refactor entire backend

This commit is contained in:
Abhinav Raut
2024-07-25 02:03:56 +05:30
parent 8b763fb167
commit 7ac9a2fe67
41 changed files with 800 additions and 752 deletions

View File

@@ -68,7 +68,7 @@ func handleGetConversation(r *fastglue.Request) error {
)
c, err := app.conversationManager.Get(uuid)
if err != nil {
return r.SendErrorEnvelope(http.StatusInternalServerError, err.Error(), nil, "")
return sendErrorEnvelope(r, err)
}
return r.SendEnvelope(c)
}
@@ -80,7 +80,7 @@ func handleUpdateAssigneeLastSeen(r *fastglue.Request) error {
)
err := app.conversationManager.UpdateAssigneeLastSeen(uuid)
if err != nil {
return r.SendErrorEnvelope(http.StatusInternalServerError, err.Error(), nil, "")
return sendErrorEnvelope(r, err)
}
return r.SendEnvelope(true)
}
@@ -92,7 +92,7 @@ func handleGetConversationParticipants(r *fastglue.Request) error {
)
p, err := app.conversationManager.GetParticipants(uuid)
if err != nil {
return r.SendErrorEnvelope(http.StatusInternalServerError, err.Error(), nil, "")
return sendErrorEnvelope(r, err)
}
return r.SendEnvelope(p)
}
@@ -170,7 +170,6 @@ func handleAddConversationTags(r *fastglue.Request) error {
app.lo.Error("unmarshalling tag ids", "error", err)
return r.SendErrorEnvelope(fasthttp.StatusInternalServerError, "error adding tags", nil, "")
}
if err := app.conversationManager.UpsertTags(uuid, tagIDs); err != nil {
return sendErrorEnvelope(r, err)
}

View File

@@ -63,7 +63,7 @@ func initHandlers(g *fastglue.Fastglue, hub *ws.Hub) {
g.GET("/api/tags", auth(handleGetTags))
// i18n.
g.GET("/api/lang/{lang}", auth(handleGetI18nLang))
g.GET("/api/lang/{lang}", handleGetI18nLang)
// Websocket.
g.GET("/api/ws", auth(func(r *fastglue.Request) error {

View File

@@ -193,7 +193,6 @@ func initConversations(i18n *i18n.I18n, hub *ws.Hub, n notifier.Notifier, db *sq
c, err := conversation.New(hub, i18n, n, conversation.Opts{
DB: db,
Lo: initLogger("conversation_manager"),
ReferenceNumPattern: ko.String("app.constants.conversation_reference_number_pattern"),
})
if err != nil {
log.Fatalf("error initializing conversation manager: %v", err)
@@ -280,7 +279,7 @@ func initMessages(db *sqlx.DB,
}
func initTeamManager(db *sqlx.DB) *team.Manager {
var lo = initLogger("team_manager")
var lo = initLogger("team-manager")
mgr, err := team.New(team.Opts{
DB: db,
Lo: lo,
@@ -358,10 +357,14 @@ func initAutomationEngine(db *sqlx.DB, userManager *user.Manager) *automation.En
return engine
}
func initAutoAssigner(teamManager *team.Manager, conversationManager *conversation.Manager) *autoassigner.Engine {
e, err := autoassigner.New(teamManager, conversationManager, initLogger("autoassigner"))
func initAutoAssigner(teamManager *team.Manager, userManager *user.Manager, conversationManager *conversation.Manager) *autoassigner.Engine {
systemUser, err := userManager.GetSystemUser()
if err != nil {
log.Fatalf("error initializing auto assigner engine: %v", err)
log.Fatalf("error fetching system user: %v", err)
}
e, err := autoassigner.New(teamManager, conversationManager, systemUser, initLogger("autoassigner"))
if err != nil {
log.Fatalf("error initializing auto assigner: %v", err)
}
return e
}

View File

@@ -95,7 +95,7 @@ func main() {
conversationManager = initConversations(i18n, wsHub, notifier, db)
automationEngine = initAutomationEngine(db, userManager)
messageManager = initMessages(db, wsHub, userManager, teamManager, contactManager, attachmentManager, conversationManager, inboxManager, automationEngine, templateManager)
autoassigner = initAutoAssigner(teamManager, conversationManager)
autoassigner = initAutoAssigner(teamManager, userManager, conversationManager)
)
// Set message store for conversation manager.

View File

@@ -17,17 +17,17 @@ func handleGetMessages(r *fastglue.Request) error {
uuid = r.RequestCtx.UserValue("uuid").(string)
)
msgs, err := app.messageManager.GetConversationMessages(uuid)
messages, err := app.messageManager.GetConversationMessages(uuid)
if err != nil {
return sendErrorEnvelope(r, err)
}
for i := range msgs {
for j := range msgs[i].Attachments {
msgs[i].Attachments[j].URL = app.attachmentManager.Store.GetURL(msgs[i].Attachments[j].UUID)
for i := range messages {
for j := range messages[i].Attachments {
messages[i].Attachments[j].URL = app.attachmentManager.Store.GetURL(messages[i].Attachments[j].UUID)
}
}
return r.SendEnvelope(msgs)
return r.SendEnvelope(messages)
}
func handleGetMessage(r *fastglue.Request) error {
@@ -35,18 +35,16 @@ func handleGetMessage(r *fastglue.Request) error {
app = r.Context.(*App)
uuid = r.RequestCtx.UserValue("uuid").(string)
)
msgs, err := app.messageManager.GetMessage(uuid)
messages, err := app.messageManager.GetMessage(uuid)
if err != nil {
return sendErrorEnvelope(r, err)
}
for i := range msgs {
for j := range msgs[i].Attachments {
msgs[i].Attachments[j].URL = app.attachmentManager.Store.GetURL(msgs[i].Attachments[j].UUID)
for i := range messages {
for j := range messages[i].Attachments {
messages[i].Attachments[j].URL = app.attachmentManager.Store.GetURL(messages[i].Attachments[j].UUID)
}
}
return r.SendEnvelope(msgs)
return r.SendEnvelope(messages)
}
func handleRetryMessage(r *fastglue.Request) error {

2
go.mod
View File

@@ -12,6 +12,7 @@ require (
github.com/knadh/goyesql/v2 v2.2.0
github.com/knadh/koanf/parsers/json v0.1.0
github.com/knadh/koanf/parsers/toml v0.1.0
github.com/knadh/koanf/providers/confmap v0.1.0
github.com/knadh/koanf/providers/file v0.1.0
github.com/knadh/koanf/providers/posflag v0.1.0
github.com/knadh/koanf/providers/rawbytes v0.1.0
@@ -47,7 +48,6 @@ require (
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect

View File

@@ -12,6 +12,7 @@ import (
"github.com/abhinavxd/artemis/internal/conversation"
"github.com/abhinavxd/artemis/internal/conversation/models"
"github.com/abhinavxd/artemis/internal/team"
umodels "github.com/abhinavxd/artemis/internal/user/models"
"github.com/mr-karan/balance"
"github.com/zerodha/logf"
)
@@ -27,6 +28,7 @@ type Engine struct {
// Mutex to protect the balancer map
mu sync.Mutex
systemUser umodels.User
conversationManager *conversation.Manager
teamManager *team.Manager
lo *logf.Logger
@@ -34,10 +36,11 @@ type Engine struct {
// New initializes a new Engine instance, set up with the provided team manager,
// conversation manager, and logger.
func New(teamManager *team.Manager, conversationManager *conversation.Manager, lo *logf.Logger) (*Engine, error) {
func New(teamManager *team.Manager, conversationManager *conversation.Manager, systemUser umodels.User, lo *logf.Logger) (*Engine, error) {
var e = Engine{
conversationManager: conversationManager,
teamManager: teamManager,
systemUser: systemUser,
lo: lo,
mu: sync.Mutex{},
}
@@ -121,6 +124,7 @@ func (e *Engine) assignConversations() error {
}
for _, conversation := range unassigned {
// Get user.
uid, err := e.getUserFromPool(conversation)
if err != nil {
e.lo.Error("error fetching user from balancer pool", "error", err)
@@ -133,8 +137,8 @@ func (e *Engine) assignConversations() error {
e.lo.Error("error converting user id from string to int", "error", err)
}
// Assign conversation to this user.
e.conversationManager.UpdateUserAssigneeBySystem(conversation.UUID, userID)
// Assign conversation.
e.conversationManager.UpdateUserAssignee(conversation.UUID, userID, e.systemUser)
}
return nil
}

View File

@@ -1,5 +1,5 @@
// Package automation provides a framework for automatically evaluating and applying
// rules to conversations based on events like new conversations, updates, and time triggers.
// Package automation automatically evaluates and applies rules to conversations based on events like new conversations, updates, and time triggers,
// and performs some actions if they are true.
package automation
import (
@@ -50,20 +50,17 @@ type ConversationStore interface {
UpdatePriority(uuid string, priority []byte, actor umodels.User) error
}
type UserStore interface {
GetSystemUser() (umodels.User, error)
}
type queries struct {
GetRules *sqlx.Stmt `query:"get-rules"`
GetAll *sqlx.Stmt `query:"get-all"`
GetRule *sqlx.Stmt `query:"get-rule"`
InsertRule *sqlx.Stmt `query:"insert-rule"`
UpdateRule *sqlx.Stmt `query:"update-rule"`
DeleteRule *sqlx.Stmt `query:"delete-rule"`
ToggleRule *sqlx.Stmt `query:"toggle-rule"`
GetAll *sqlx.Stmt `query:"get-all"`
GetRule *sqlx.Stmt `query:"get-rule"`
InsertRule *sqlx.Stmt `query:"insert-rule"`
UpdateRule *sqlx.Stmt `query:"update-rule"`
DeleteRule *sqlx.Stmt `query:"delete-rule"`
ToggleRule *sqlx.Stmt `query:"toggle-rule"`
GetEnabledRules *sqlx.Stmt `query:"get-enabled-rules"`
}
// New initializes a new Engine.
func New(systemUser umodels.User, opt Opts) (*Engine, error) {
var (
q queries
@@ -82,10 +79,12 @@ func New(systemUser umodels.User, opt Opts) (*Engine, error) {
return e, nil
}
// SetConversationStore sets the conversation store.
func (e *Engine) SetConversationStore(store ConversationStore) {
e.conversationStore = store
}
// ReloadRules reloads automation rules.
func (e *Engine) ReloadRules() {
e.rulesMu.Lock()
defer e.rulesMu.Unlock()
@@ -93,11 +92,12 @@ func (e *Engine) ReloadRules() {
e.rules = e.queryRules()
}
// Serve starts the Engine to evaluate rules based on events.
func (e *Engine) Serve(ctx context.Context) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
// Create separate semaphores for each channel
// Create separate semaphores for each channel.
maxWorkers := 10
newConversationSemaphore := make(chan struct{}, maxWorkers)
updateConversationSemaphore := make(chan struct{}, maxWorkers)
@@ -121,6 +121,7 @@ func (e *Engine) Serve(ctx context.Context) {
}
}
// GetAllRules retrieves all rules of a specific type.
func (e *Engine) GetAllRules(typ []byte) ([]models.RuleRecord, error) {
var rules = make([]models.RuleRecord, 0)
if err := e.q.GetAll.Select(&rules, typ); err != nil {
@@ -130,6 +131,7 @@ func (e *Engine) GetAllRules(typ []byte) ([]models.RuleRecord, error) {
return rules, nil
}
// GetRule retrieves a rule by ID.
func (e *Engine) GetRule(id int) (models.RuleRecord, error) {
var rule = models.RuleRecord{}
if err := e.q.GetRule.Get(&rule, id); err != nil {
@@ -142,6 +144,7 @@ func (e *Engine) GetRule(id int) (models.RuleRecord, error) {
return rule, nil
}
// ToggleRule toggles the active status of a rule by ID.
func (e *Engine) ToggleRule(id int) error {
if _, err := e.q.ToggleRule.Exec(id); err != nil {
e.lo.Error("error toggling rule", "error", err)
@@ -152,6 +155,7 @@ func (e *Engine) ToggleRule(id int) error {
return nil
}
// UpdateRule updates an existing rule.
func (e *Engine) UpdateRule(id int, rule models.RuleRecord) error {
if _, err := e.q.UpdateRule.Exec(id, rule.Name, rule.Description, rule.Type, rule.Rules); err != nil {
e.lo.Error("error updating rule", "error", err)
@@ -162,6 +166,7 @@ func (e *Engine) UpdateRule(id int, rule models.RuleRecord) error {
return nil
}
// CreateRule creates a new rule.
func (e *Engine) CreateRule(rule models.RuleRecord) error {
if _, err := e.q.InsertRule.Exec(rule.Name, rule.Description, rule.Type, rule.Rules); err != nil {
e.lo.Error("error creating rule", "error", err)
@@ -172,6 +177,7 @@ func (e *Engine) CreateRule(rule models.RuleRecord) error {
return nil
}
// DeleteRule deletes a rule by ID.
func (e *Engine) DeleteRule(id int) error {
if _, err := e.q.DeleteRule.Exec(id); err != nil {
e.lo.Error("error deleting rule", "error", err)
@@ -182,6 +188,7 @@ func (e *Engine) DeleteRule(id int) error {
return nil
}
// handleNewConversation handles new conversation events.
func (e *Engine) handleNewConversation(conversationUUID string, semaphore chan struct{}) {
defer func() { <-semaphore }()
conversation, err := e.conversationStore.Get(conversationUUID)
@@ -192,6 +199,7 @@ func (e *Engine) handleNewConversation(conversationUUID string, semaphore chan s
e.evalConversationRules(rules, conversation)
}
// handleUpdateConversation handles update conversation events.
func (e *Engine) handleUpdateConversation(conversationUUID string, semaphore chan struct{}) {
defer func() { <-semaphore }()
conversation, err := e.conversationStore.Get(conversationUUID)
@@ -203,8 +211,10 @@ func (e *Engine) handleUpdateConversation(conversationUUID string, semaphore cha
e.evalConversationRules(rules, conversation)
}
// handleTimeTrigger handles time trigger events.
func (e *Engine) handleTimeTrigger(semaphore chan struct{}) {
defer func() { <-semaphore }()
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour)
conversations, err := e.conversationStore.GetRecentConversations(thirtyDaysAgo)
if err != nil {
@@ -216,6 +226,7 @@ func (e *Engine) handleTimeTrigger(semaphore chan struct{}) {
}
}
// EvaluateNewConversationRules enqueues a new conversation for rule evaluation.
func (e *Engine) EvaluateNewConversationRules(conversationUUID string) {
select {
case e.newConversationQ <- conversationUUID:
@@ -225,6 +236,7 @@ func (e *Engine) EvaluateNewConversationRules(conversationUUID string) {
}
}
// EvaluateConversationUpdateRules enqueues an updated conversation for rule evaluation.
func (e *Engine) EvaluateConversationUpdateRules(conversationUUID string) {
select {
case e.updateConversationQ <- conversationUUID:
@@ -234,17 +246,20 @@ func (e *Engine) EvaluateConversationUpdateRules(conversationUUID string) {
}
}
// queryRules fetches automation rules from the database.
func (e *Engine) queryRules() []models.Rule {
var (
rulesJSON []string
rules []models.Rule
)
err := e.q.GetRules.Select(&rulesJSON)
err := e.q.GetEnabledRules.Select(&rulesJSON)
if err != nil {
e.lo.Error("error fetching automation rules", "error", err)
return nil
return rules
}
e.lo.Debug("fetched rules from db", "count", len(rulesJSON))
for _, ruleJSON := range rulesJSON {
var rulesBatch []models.Rule
if err := json.Unmarshal([]byte(ruleJSON), &rulesBatch); err != nil {
@@ -256,6 +271,7 @@ func (e *Engine) queryRules() []models.Rule {
return rules
}
// filterRulesByType filters rules by type.
func (e *Engine) filterRulesByType(ruleType string) []models.Rule {
e.rulesMu.RLock()
defer e.rulesMu.RUnlock()

View File

@@ -9,27 +9,31 @@ import (
cmodels "github.com/abhinavxd/artemis/internal/conversation/models"
)
// evalConversationRules evaluates a list of rules against a given conversation.
// If all the groups of a rule pass their evaluations based on the defined logical operations,
// the corresponding actions are executed.
func (e *Engine) evalConversationRules(rules []models.Rule, conversation cmodels.Conversation) {
e.lo.Debug("num rules", "count", len(rules), "rules", rules)
for _, rule := range rules {
e.lo.Debug("eval rule", "groups", len(rule.Groups), "rule", rule)
// At max there can be only 2 groups.
if len(rule.Groups) > 2 {
e.lo.Warn("more than 2 groups found for rules")
continue
}
var results []bool
for _, group := range rule.Groups {
e.lo.Debug("evaluating group rule", "logical_op", group.LogicalOp)
result := e.evaluateGroup(group.Rules, group.LogicalOp, conversation)
e.lo.Debug("group evaluation status", "status", result)
results = append(results, result)
}
if evaluateFinalResult(results, rule.GroupOperator) {
e.lo.Debug("rule fully evalauted, executing actions")
e.lo.Debug("rule fully evaluated, executing actions")
// All group rule evaluations successful, execute the actions.
for _, action := range rule.Actions {
e.executeActions(conversation, action)
e.applyAction(action, conversation)
}
} else {
e.lo.Debug("rule evaluation failed, NOT executing actions")
@@ -37,7 +41,8 @@ func (e *Engine) evalConversationRules(rules []models.Rule, conversation cmodels
}
}
// evaluateFinalResult
// evaluateFinalResult computes the final result of multiple group evaluations
// based on the specified logical operator (AND/OR).
func evaluateFinalResult(results []bool, operator string) bool {
if operator == models.OperatorAnd {
for _, result := range results {
@@ -58,7 +63,8 @@ func evaluateFinalResult(results []bool, operator string) bool {
return false
}
// evaluateGroup
// evaluateGroup evaluates a set of rules within a group against a given conversation
// based on the specified logical operator (AND/OR).
func (e *Engine) evaluateGroup(rules []models.RuleDetail, operator string, conversation cmodels.Conversation) bool {
switch operator {
case models.OperatorAnd:
@@ -83,6 +89,7 @@ func (e *Engine) evaluateGroup(rules []models.RuleDetail, operator string, conve
return false
}
// evaluateRule evaluates a single rule against a given conversation.
func (e *Engine) evaluateRule(rule models.RuleDetail, conversation cmodels.Conversation) bool {
var (
valueToCompare string
@@ -129,9 +136,9 @@ func (e *Engine) evaluateRule(rule models.RuleDetail, conversation cmodels.Conve
case models.RuleNotContains:
conditionMet = !strings.Contains(valueToCompare, rule.Value)
case models.RuleSet:
conditionMet = bool(len(valueToCompare) > 0)
conditionMet = len(valueToCompare) > 0
case models.RuleNotSet:
conditionMet = !bool(len(valueToCompare) > 0)
conditionMet = len(valueToCompare) == 0
default:
e.lo.Error("rule logical operator not recognized", "operator", rule.Operator)
return false
@@ -139,13 +146,7 @@ func (e *Engine) evaluateRule(rule models.RuleDetail, conversation cmodels.Conve
return conditionMet
}
func (e *Engine) executeActions(conversation cmodels.Conversation, action models.RuleAction) {
err := e.applyAction(action, conversation)
if err != nil {
e.lo.Error("error executing rule action", "action", action.Action, "error", err)
}
}
// applyAction applies a specific action to the given conversation.
func (e *Engine) applyAction(action models.RuleAction, conversation cmodels.Conversation) error {
switch action.Type {
case models.ActionAssignTeam:

View File

@@ -1,4 +1,4 @@
-- name: get-rules
-- name: get-enabled-rules
select
rules
from automation_rules where disabled is not TRUE;

View File

@@ -1,8 +1,10 @@
// Package cannedresp provides functionality to manage canned responses in the system.
package cannedresp
import (
"embed"
"github.com/abhinavxd/artemis/internal/cannedresp/models"
"github.com/abhinavxd/artemis/internal/dbutil"
"github.com/abhinavxd/artemis/internal/envelope"
"github.com/jmoiron/sqlx"
@@ -14,17 +16,13 @@ var (
efs embed.FS
)
// Manager handles the operations related to canned responses.
type Manager struct {
q queries
lo *logf.Logger
}
type CannedResponse struct {
ID string `db:"id" json:"id"`
Title string `db:"title" json:"title"`
Content string `db:"content" json:"content"`
}
// Opts holds the options for creating a new Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
@@ -34,6 +32,7 @@ type queries struct {
GetAll *sqlx.Stmt `query:"get-all"`
}
// New initializes a new Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -47,8 +46,9 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
func (t *Manager) GetAll() ([]CannedResponse, error) {
var c []CannedResponse
// GetAll retrieves all canned responses.
func (t *Manager) GetAll() ([]models.CannedResponse, error) {
var c = make([]models.CannedResponse, 0)
if err := t.q.GetAll.Select(&c); err != nil {
t.lo.Error("error fetching canned responses", "error", err)
return c, envelope.NewError(envelope.GeneralError, "Error fetching canned responses", nil)

View File

@@ -0,0 +1,8 @@
package models
// CannedResponse represents a canned response with an ID, title, and content.
type CannedResponse struct {
ID string `db:"id" json:"id"`
Title string `db:"title" json:"title"`
Content string `db:"content" json:"content"`
}

View File

@@ -1,3 +1,4 @@
// Package contact provides functionality to manage contacts in the system.
package contact
import (
@@ -14,20 +15,23 @@ var (
efs embed.FS
)
// Manager handles the operations related to contacts.
type Manager struct {
q queries
lo *logf.Logger
}
// Opts holds the options for creating a new Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
}
type queries struct {
InsertContact *sqlx.Stmt `query:"insert-contact"`
UpsertContact *sqlx.Stmt `query:"upsert-contact"`
}
// New initializes a new Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -41,11 +45,12 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
// Upsert inserts or updates a contact and returns the contact ID.
func (m *Manager) Upsert(con models.Contact) (int, error) {
var contactID int
if err := m.q.InsertContact.QueryRow(con.Source, con.SourceID, con.InboxID,
if err := m.q.UpsertContact.QueryRow(con.Source, con.SourceID, con.InboxID,
con.FirstName, con.LastName, con.Email, con.PhoneNumber, con.AvatarURL).Scan(&contactID); err != nil {
m.lo.Error("inserting contact", "error", err)
m.lo.Error("error upserting contact", "error", err)
return contactID, err
}
return contactID, nil

View File

@@ -12,7 +12,7 @@ type Contact struct {
FirstName string `db:"first_name" json:"first_name"`
LastName string `db:"last_name" json:"last_name"`
Email string `db:"email" json:"email"`
PhoneNumber *string `db:"phone_number" json:"phone_number"`
PhoneNumber null.String `db:"phone_number" json:"phone_number"`
AvatarURL null.String `db:"avatar_url" json:"avatar_url"`
InboxID int `db:"inbox_id" json:"inbox_id"`
Source string `db:"source" json:"source"`

View File

@@ -1,12 +1,12 @@
-- name: insert-contact
-- Step 1: Check if contact exists
-- name: upsert-contact
-- Check if contact exists
WITH existing_contact AS (
SELECT contact_id
FROM public.contact_methods
WHERE source = $1 AND source_id = $2
)
-- Step 2: Insert contact if it does not exist
-- Insert contact if it does not exist
, ins_contact AS (
INSERT INTO public.contacts (first_name, last_name, email, phone_number, avatar_url)
SELECT $4, $5, $6, $7, $8
@@ -14,7 +14,7 @@ WITH existing_contact AS (
RETURNING id
)
-- Step 3: Determine which contact ID to use
-- Determine which contact ID to use
, final_contact AS (
SELECT contact_id AS id FROM existing_contact
UNION ALL
@@ -22,7 +22,7 @@ WITH existing_contact AS (
LIMIT 1
)
-- Step 4: Insert contact method if it does not exist
-- Insert contact method if it does not exist
, ins_contact_method AS (
INSERT INTO public.contact_methods (contact_id, source, source_id, inbox_id)
SELECT id, $1, $2, $3
@@ -30,6 +30,6 @@ WITH existing_contact AS (
ON CONFLICT DO NOTHING
)
-- Step 5: Return the final contact ID
-- Return the final contact ID
SELECT id AS contact_id FROM final_contact;

View File

@@ -1,3 +1,4 @@
// Package conversation provides functionality to manage conversations in the system.
package conversation
import (
@@ -25,62 +26,23 @@ import (
)
var (
// Embedded filesystem
//go:embed queries.sql
efs embed.FS
StatusOpen = "Open"
StatusResolved = "Resolved"
StatusProcessing = "Processing"
StatusSpam = "Spam"
PriorityLow = "Low"
PriortiyMedium = "Medium"
PriorityHigh = "High"
statuses = []string{
StatusOpen,
StatusResolved,
StatusProcessing,
StatusSpam,
}
priorities = []string{
PriorityLow,
PriortiyMedium,
PriorityHigh,
}
filters = map[string]string{
"status_open": " c.status = 'Open'",
"status_processing": " c.status = 'Processing'",
"status_spam": " c.status = 'Spam'",
"status_resolved": " c.status = 'Resolved'",
"status_all": " 1=1 ",
}
validOrderBy = []string{"created_at", "priority", "status", "last_message_at"}
validOrder = []string{"ASC", "DESC"}
assigneeTypeTeam = "team"
assigneeTypeUser = "user"
allConversations = "all"
assignedConversations = "assigned"
teamConversations = "unassigned"
)
const (
maxConversationsPageSize = 50
)
// MessageStore interface defines methods to record changes in conversation assignees, priority, and status.
type MessageStore interface {
RecordAssigneeUserChange(conversationUUID string, assigneeID int, actor umodels.User) error
RecordAssigneeUserChangeBySystem(conversationUUID string, assigneeID int) error
RecordAssigneeTeamChange(conversationUUID string, teamID int, actor umodels.User) error
RecordPriorityChange(priority, conversationUUID string, actor umodels.User) error
RecordStatusChange(status, conversationUUID string, actor umodels.User) error
}
// Manager handles the operations related to conversations.
type Manager struct {
lo *logf.Logger
db *sqlx.DB
@@ -92,10 +54,10 @@ type Manager struct {
ReferenceNumPattern string
}
// Opts holds the options for creating a new Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
ReferenceNumPattern string
DB *sqlx.DB
Lo *logf.Logger
}
type queries struct {
@@ -105,9 +67,9 @@ type queries struct {
GetConversation *sqlx.Stmt `query:"get-conversation"`
GetRecentConversations *sqlx.Stmt `query:"get-recent-conversations"`
GetUnassigned *sqlx.Stmt `query:"get-unassigned"`
GetConversationParticipants *sqlx.Stmt `query:"get-conversation-participants"`
GetConversations string `query:"get-conversations"`
GetConversationsUUIDs string `query:"get-conversations-uuids"`
GetConversationParticipants *sqlx.Stmt `query:"get-conversation-participants"`
GetAssignedConversations *sqlx.Stmt `query:"get-assigned-conversations"`
GetAssigneeStats *sqlx.Stmt `query:"get-assignee-stats"`
GetNewConversationsStats *sqlx.Stmt `query:"get-new-conversations-stats"`
@@ -124,53 +86,56 @@ type queries struct {
DeleteTags *sqlx.Stmt `query:"delete-tags"`
}
func New(hub *ws.Hub, i18n *i18n.I18n, notfier notifier.Notifier, opts Opts) (*Manager, error) {
// New initializes a new Manager.
func New(hub *ws.Hub, i18n *i18n.I18n, notifier notifier.Notifier, opts Opts) (*Manager, error) {
var q queries
if err := dbutil.ScanSQLFile("queries.sql", &q, opts.DB, efs); err != nil {
return nil, err
}
c := &Manager{
q: q,
hub: hub,
i18n: i18n,
notifier: notfier,
db: opts.DB,
lo: opts.Lo,
ReferenceNumPattern: opts.ReferenceNumPattern,
q: q,
hub: hub,
i18n: i18n,
notifier: notifier,
db: opts.DB,
lo: opts.Lo,
}
return c, nil
}
// SetMessageStore sets the message store for the Manager.
func (c *Manager) SetMessageStore(store MessageStore) {
c.messageStore = store
}
// Create creates a new conversation and returns its ID and UUID.
func (c *Manager) Create(contactID int, inboxID int, meta []byte) (int, string, error) {
var (
id int
uuid string
refNum, _ = c.generateRefNum(c.ReferenceNumPattern)
refNum, _ = stringutil.RandomNumericString(16)
)
if err := c.q.InsertConversation.QueryRow(refNum, contactID, StatusOpen, inboxID, meta).Scan(&id, &uuid); err != nil {
if err := c.q.InsertConversation.QueryRow(refNum, contactID, models.StatusOpen, inboxID, meta).Scan(&id, &uuid); err != nil {
c.lo.Error("error inserting new conversation into the DB", "error", err)
return id, uuid, err
}
return id, uuid, nil
}
// Get retrieves a conversation by its UUID.
func (c *Manager) Get(uuid string) (models.Conversation, error) {
var conversation models.Conversation
if err := c.q.GetConversation.Get(&conversation, uuid); err != nil {
if err == sql.ErrNoRows {
c.lo.Error("conversation not found", "uuid", uuid)
return conversation, err
return conversation, envelope.NewError(envelope.InputError, "Conversation not found.", nil)
}
c.lo.Error("error fetching conversation", "error", err)
return conversation, err
return conversation, envelope.NewError(envelope.InputError, "Error fetching conversation.", nil)
}
return conversation, nil
}
// GetRecentConversations retrieves conversations created after the specified time.
func (c *Manager) GetRecentConversations(time time.Time) ([]models.Conversation, error) {
var conversations []models.Conversation
if err := c.q.GetRecentConversations.Select(&conversations, time); err != nil {
@@ -184,25 +149,28 @@ func (c *Manager) GetRecentConversations(time time.Time) ([]models.Conversation,
return conversations, nil
}
// UpdateAssigneeLastSeen updates the last seen timestamp of an assignee.
func (c *Manager) UpdateAssigneeLastSeen(uuid string) error {
if _, err := c.q.UpdateAssigneeLastSeen.Exec(uuid); err != nil {
c.lo.Error("error updating conversation", "error", err)
return err
return envelope.NewError(envelope.GeneralError, "Error updating assignee last seen.", nil)
}
// Broadcast the property update.
c.hub.BroadcastConversationPropertyUpdate(uuid, "assignee_last_seen_at", time.Now().Format(time.DateTime))
c.hub.BroadcastConversationPropertyUpdate(uuid, "assignee_last_seen_at", time.Now().Format(time.RFC3339))
return nil
}
// GetParticipants retrieves the participants of a conversation.
func (c *Manager) GetParticipants(uuid string) ([]models.ConversationParticipant, error) {
conv := make([]models.ConversationParticipant, 0)
if err := c.q.GetConversationParticipants.Select(&conv, uuid); err != nil {
c.lo.Error("error fetching conversation", "error", err)
return conv, err
return conv, envelope.NewError(envelope.GeneralError, "Error fetching conversation participants", nil)
}
return conv, nil
}
// AddParticipant adds a participant to a conversation.
func (c *Manager) AddParticipant(userID int, convUUID string) error {
if _, err := c.q.InsertConverstionParticipant.Exec(userID, convUUID); err != nil {
if pgErr, ok := err.(*pq.Error); ok && pgErr.Code == "23505" {
@@ -213,6 +181,7 @@ func (c *Manager) AddParticipant(userID int, convUUID string) error {
return nil
}
// UpdateMeta updates the metadata of a conversation.
func (c *Manager) UpdateMeta(conversationID int, conversationUUID string, meta map[string]string) error {
metaJSON, err := json.Marshal(meta)
if err != nil {
@@ -226,6 +195,7 @@ func (c *Manager) UpdateMeta(conversationID int, conversationUUID string, meta m
return nil
}
// UpdateLastMessage updates the last message and its timestamp in a conversation.
func (c *Manager) UpdateLastMessage(conversationID int, conversationUUID, lastMessage string, lastMessageAt time.Time) error {
return c.UpdateMeta(conversationID, conversationUUID, map[string]string{
"last_message": lastMessage,
@@ -233,6 +203,7 @@ func (c *Manager) UpdateLastMessage(conversationID int, conversationUUID, lastMe
})
}
// UpdateFirstReplyAt updates the first reply timestamp in a conversation.
func (c *Manager) UpdateFirstReplyAt(conversationUUID string, conversationID int, at time.Time) error {
if _, err := c.q.UpdateFirstReplyAt.Exec(conversationID, at); err != nil {
c.lo.Error("error updating conversation first reply at", "error", err)
@@ -243,16 +214,18 @@ func (c *Manager) UpdateFirstReplyAt(conversationUUID string, conversationID int
return nil
}
// GetUnassigned retrieves unassigned conversations.
func (c *Manager) GetUnassigned() ([]models.Conversation, error) {
var conv []models.Conversation
if err := c.q.GetUnassigned.Select(&conv); err != nil {
if err != sql.ErrNoRows {
return conv, fmt.Errorf("fetching unassigned converastions: %w", err)
return conv, fmt.Errorf("fetching unassigned conversations: %w", err)
}
}
return conv, nil
}
// GetID retrieves the ID of a conversation by its UUID.
func (c *Manager) GetID(uuid string) (int, error) {
var id int
if err := c.q.GetID.QueryRow(uuid).Scan(&id); err != nil {
@@ -265,6 +238,7 @@ func (c *Manager) GetID(uuid string) (int, error) {
return id, nil
}
// GetUUID retrieves the UUID of a conversation by its ID.
func (c *Manager) GetUUID(id int) (string, error) {
var uuid string
if err := c.q.GetUUID.QueryRow(id).Scan(&uuid); err != nil {
@@ -277,6 +251,7 @@ func (c *Manager) GetUUID(id int) (string, error) {
return uuid, nil
}
// GetInboxID retrieves the inbox ID of a conversation by its UUID.
func (c *Manager) GetInboxID(uuid string) (int, error) {
var id int
if err := c.q.GetInboxID.QueryRow(uuid).Scan(&id); err != nil {
@@ -289,18 +264,22 @@ func (c *Manager) GetInboxID(uuid string) (int, error) {
return id, nil
}
// GetAll retrieves all conversations with optional filtering, ordering, and pagination.
func (c *Manager) GetAll(order, orderBy, filter string, page, pageSize int) ([]models.Conversation, error) {
return c.GetConversations(0, allConversations, order, orderBy, filter, page, pageSize)
return c.GetConversations(0, models.AllConversations, order, orderBy, filter, page, pageSize)
}
// GetAssigned retrieves conversations assigned to a specific user with optional filtering, ordering, and pagination.
func (c *Manager) GetAssigned(userID int, order, orderBy, filter string, page, pageSize int) ([]models.Conversation, error) {
return c.GetConversations(userID, assignedConversations, order, orderBy, filter, page, pageSize)
return c.GetConversations(userID, models.AssignedConversations, order, orderBy, filter, page, pageSize)
}
// GetTeamConversations retrieves conversations assigned to a team the user is part of with optional filtering, ordering, and pagination.
func (c *Manager) GetTeamConversations(userID int, order, orderBy, filter string, page, pageSize int) ([]models.Conversation, error) {
return c.GetConversations(userID, teamConversations, order, orderBy, filter, page, pageSize)
return c.GetConversations(userID, models.AssigneeTypeTeam, order, orderBy, filter, page, pageSize)
}
// GetConversations retrieves conversations based on user ID, type, and optional filtering, ordering, and pagination.
func (c *Manager) GetConversations(userID int, typ, order, orderBy, filter string, page, pageSize int) ([]models.Conversation, error) {
var conversations = make([]models.Conversation, 0)
@@ -311,7 +290,7 @@ func (c *Manager) GetConversations(userID int, typ, order, orderBy, filter strin
order = "DESC"
}
sqlStr, qArgs, err := c.generateConversationsQuery(userID, c.q.GetConversations, typ, order, orderBy, filter, page, pageSize)
query, qArgs, err := c.generateConversationsListQuery(userID, c.q.GetConversations, typ, order, orderBy, filter, page, pageSize)
if err != nil {
c.lo.Error("error generating conversations query", "error", err)
return conversations, envelope.NewError(envelope.GeneralError, c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.entities.conversations}"), nil)
@@ -324,17 +303,18 @@ func (c *Manager) GetConversations(userID int, typ, order, orderBy, filter strin
return conversations, envelope.NewError(envelope.GeneralError, c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.entities.conversations}"), nil)
}
if err := tx.Select(&conversations, sqlStr, qArgs...); err != nil {
if err := tx.Select(&conversations, query, qArgs...); err != nil {
c.lo.Error("error fetching conversations", "error", err)
return conversations, envelope.NewError(envelope.GeneralError, c.i18n.Ts("globals.messages.errorFetching", "name", "{globals.entities.conversations}"), nil)
}
return conversations, nil
}
// GetConversationUUIDs retrieves the UUIDs of conversations based on user ID, type, and optional filtering, ordering, and pagination.
func (c *Manager) GetConversationUUIDs(userID, page, pageSize int, typ, filter string) ([]string, error) {
var ids = make([]string, 0)
sqlStr, qArgs, err := c.generateConversationsQuery(userID, c.q.GetConversationsUUIDs, typ, "", "", filter, page, pageSize)
query, qArgs, err := c.generateConversationsListQuery(userID, c.q.GetConversationsUUIDs, typ, "", "", filter, page, pageSize)
if err != nil {
c.lo.Error("error generating conversations query", "error", err)
return ids, err
@@ -347,13 +327,14 @@ func (c *Manager) GetConversationUUIDs(userID, page, pageSize int, typ, filter s
return ids, err
}
if err := tx.Select(&ids, sqlStr, qArgs...); err != nil {
if err := tx.Select(&ids, query, qArgs...); err != nil {
c.lo.Error("error fetching conversation uuids", "error", err)
return ids, err
}
return ids, nil
}
// GetAssignedConversations retrieves conversations assigned to a specific user.
func (c *Manager) GetAssignedConversations(userID int) ([]models.Conversation, error) {
var conversations []models.Conversation
if err := c.q.GetAssignedConversations.Select(&conversations, userID); err != nil {
@@ -363,13 +344,14 @@ func (c *Manager) GetAssignedConversations(userID int) ([]models.Conversation, e
return conversations, nil
}
// UpdateUserAssignee updates the assignee of a conversation to a specific user.
func (c *Manager) UpdateUserAssignee(uuid string, assigneeID int, actor umodels.User) error {
if err := c.UpdateAssignee(uuid, assigneeID, assigneeTypeUser); err != nil {
if err := c.UpdateAssignee(uuid, assigneeID, models.AssigneeTypeUser); err != nil {
return envelope.NewError(envelope.GeneralError, "Error updating assignee", nil)
}
// Send notification to assignee.
c.notifier.SendAssignedConversationNotification([]int{assigneeID}, uuid)
go c.notifier.SendAssignedConversationNotification([]int{assigneeID}, uuid)
if err := c.messageStore.RecordAssigneeUserChange(uuid, assigneeID, actor); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording assignee change", nil)
@@ -377,22 +359,9 @@ func (c *Manager) UpdateUserAssignee(uuid string, assigneeID int, actor umodels.
return nil
}
func (c *Manager) UpdateUserAssigneeBySystem(uuid string, assigneeID int) error {
if err := c.UpdateAssignee(uuid, assigneeID, assigneeTypeUser); err != nil {
return envelope.NewError(envelope.GeneralError, "Error updating assignee", nil)
}
// Send notification to assignee.
c.notifier.SendAssignedConversationNotification([]int{assigneeID}, uuid)
if err := c.messageStore.RecordAssigneeUserChangeBySystem(uuid, assigneeID); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording assignee change", nil)
}
return nil
}
// UpdateTeamAssignee updates the assignee of a conversation to a specific team.
func (c *Manager) UpdateTeamAssignee(uuid string, teamID int, actor umodels.User) error {
if err := c.UpdateAssignee(uuid, teamID, assigneeTypeTeam); err != nil {
if err := c.UpdateAssignee(uuid, teamID, models.AssigneeTypeTeam); err != nil {
return envelope.NewError(envelope.GeneralError, "Error updating assignee", nil)
}
if err := c.messageStore.RecordAssigneeTeamChange(uuid, teamID, actor); err != nil {
@@ -401,15 +370,16 @@ func (c *Manager) UpdateTeamAssignee(uuid string, teamID int, actor umodels.User
return nil
}
// UpdateAssignee updates the assignee of a conversation.
func (c *Manager) UpdateAssignee(uuid string, assigneeID int, assigneeType string) error {
switch assigneeType {
case assigneeTypeUser:
case models.AssigneeTypeUser:
if _, err := c.q.UpdateAssignedUser.Exec(uuid, assigneeID); err != nil {
c.lo.Error("error updating conversation assignee", "error", err)
return fmt.Errorf("error updating assignee")
}
c.hub.BroadcastConversationPropertyUpdate(uuid, "assigned_user_id", strconv.Itoa(assigneeID))
case assigneeTypeTeam:
case models.AssigneeTypeTeam:
if _, err := c.q.UpdateAssignedTeam.Exec(uuid, assigneeID); err != nil {
c.lo.Error("error updating conversation assignee", "error", err)
return fmt.Errorf("error updating assignee")
@@ -421,40 +391,39 @@ func (c *Manager) UpdateAssignee(uuid string, assigneeID int, assigneeType strin
return nil
}
// UpdatePriority updates the priority of a conversation.
func (c *Manager) UpdatePriority(uuid string, priority []byte, actor umodels.User) error {
var priorityStr = string(priority)
if !slices.Contains(priorities, priorityStr) {
if !slices.Contains(models.ValidPriorities, priorityStr) {
return envelope.NewError(envelope.GeneralError, "Invalid `priority` value", nil)
}
if _, err := c.q.UpdatePriority.Exec(uuid, priority); err != nil {
c.lo.Error("error updating conversation priority", "error", err)
return envelope.NewError(envelope.GeneralError, "Error updating priority", nil)
}
if err := c.messageStore.RecordPriorityChange(priorityStr, uuid, actor); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording priority change", nil)
}
return nil
}
// UpdateStatus updates the status of a conversation.
func (c *Manager) UpdateStatus(uuid string, status []byte, actor umodels.User) error {
var statusStr = string(status)
if !slices.Contains(statuses, statusStr) {
if !slices.Contains(models.ValidStatuses, statusStr) {
return envelope.NewError(envelope.GeneralError, "Invalid `status` value", nil)
}
if _, err := c.q.UpdateStatus.Exec(uuid, status); err != nil {
c.lo.Error("error updating conversation status", "error", err)
return envelope.NewError(envelope.GeneralError, "Error updating status", nil)
}
if err := c.messageStore.RecordStatusChange(statusStr, uuid, actor); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording status change", nil)
}
return nil
}
// GetAssigneeStats retrieves the statistics of conversations assigned to a specific user.
func (c *Manager) GetAssigneeStats(userID int) (models.ConversationCounts, error) {
var counts = models.ConversationCounts{}
if err := c.q.GetAssigneeStats.Get(&counts, userID); err != nil {
@@ -467,6 +436,7 @@ func (c *Manager) GetAssigneeStats(userID int) (models.ConversationCounts, error
return counts, nil
}
// GetNewConversationsStats retrieves the statistics of new conversations.
func (c *Manager) GetNewConversationsStats() ([]models.NewConversationsStats, error) {
var stats []models.NewConversationsStats
if err := c.q.GetNewConversationsStats.Select(&stats); err != nil {
@@ -479,13 +449,12 @@ func (c *Manager) GetNewConversationsStats() ([]models.NewConversationsStats, er
return stats, nil
}
// UpsertTags updates the tags associated with a conversation.
func (t *Manager) UpsertTags(uuid string, tagIDs []int) error {
// Delete tags that have been removed & add new tags.
if _, err := t.q.DeleteTags.Exec(uuid, pq.Array(tagIDs)); err != nil {
t.lo.Error("error deleting conversation tags", "error", err)
return envelope.NewError(envelope.GeneralError, "Error adding tags", nil)
}
for _, tagID := range tagIDs {
if _, err := t.q.AddTag.Exec(uuid, tagID); err != nil {
t.lo.Error("error adding tags to conversation", "error", err)
@@ -495,7 +464,8 @@ func (t *Manager) UpsertTags(uuid string, tagIDs []int) error {
return nil
}
func (c *Manager) generateConversationsQuery(userID int, baseQuery, typ, order, orderBy, filter string, page, pageSize int) (string, []interface{}, error) {
// generateConversationsListQuery generates the SQL query to list conversations with optional filtering, ordering, and pagination.
func (c *Manager) generateConversationsListQuery(userID int, baseQuery, typ, order, orderBy, filter string, page, pageSize int) (string, []interface{}, error) {
var (
qArgs []interface{}
cond string
@@ -503,28 +473,28 @@ func (c *Manager) generateConversationsQuery(userID int, baseQuery, typ, order,
// Set condition based on the type.
switch typ {
case assignedConversations:
case models.AssignedConversations:
cond = "AND c.assigned_user_id = $1"
qArgs = append(qArgs, userID)
case teamConversations:
case models.TeamConversations:
cond = "AND c.assigned_user_id IS NULL AND c.assigned_team_id IN (SELECT team_id FROM team_members WHERE user_id = $1)"
qArgs = append(qArgs, userID)
case allConversations:
case models.AllConversations:
// No conditions.
default:
return "", nil, errors.New("invalid type of conversation")
}
if filterClause, ok := filters[filter]; ok {
if filterClause, ok := models.ValidFilters[filter]; ok {
cond += " AND " + filterClause
}
// Ensure orderBy & order is valid.
var orderByClause = ""
if slices.Contains(validOrderBy, orderBy) {
if slices.Contains(models.ValidOrderBy, orderBy) {
orderByClause = fmt.Sprintf(" ORDER BY %s ", orderBy)
}
if orderByClause > "" && slices.Contains(validOrder, order) {
if orderByClause > "" && slices.Contains(models.ValidOrder, order) {
orderByClause += order
}
@@ -543,22 +513,3 @@ func (c *Manager) generateConversationsQuery(userID int, baseQuery, typ, order,
return sqlQuery, qArgs, nil
}
func (c *Manager) generateRefNum(pattern string) (string, error) {
if len(pattern) <= 5 {
pattern = "01234567890"
}
randomNumbers, err := stringutil.RandomNumericString(len(pattern))
if err != nil {
return "", err
}
result := []byte(pattern)
randomIndex := 0
for i := range result {
if result[i] == '#' {
result[i] = randomNumbers[randomIndex]
randomIndex++
}
}
return string(result), nil
}

View File

@@ -7,6 +7,49 @@ import (
"github.com/volatiletech/null/v9"
)
var (
StatusOpen = "Open"
StatusResolved = "Resolved"
StatusProcessing = "Processing"
StatusSpam = "Spam"
PriorityLow = "Low"
PriortiyMedium = "Medium"
PriorityHigh = "High"
ValidStatuses = []string{
StatusOpen,
StatusResolved,
StatusProcessing,
StatusSpam,
}
ValidPriorities = []string{
PriorityLow,
PriortiyMedium,
PriorityHigh,
}
ValidFilters = map[string]string{
"status_open": " c.status = 'Open'",
"status_processing": " c.status = 'Processing'",
"status_spam": " c.status = 'Spam'",
"status_resolved": " c.status = 'Resolved'",
"status_all": " 1=1 ",
}
ValidOrderBy = []string{"created_at", "priority", "status", "last_message_at"}
ValidOrder = []string{"ASC", "DESC"}
AssigneeTypeTeam = "team"
AssigneeTypeUser = "user"
AllConversations = "all"
AssignedConversations = "assigned"
TeamConversations = "unassigned"
)
type Conversation struct {
ID int `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
@@ -51,4 +94,4 @@ type ConversationCounts struct {
type NewConversationsStats struct {
Date time.Time `db:"date" json:"date"`
NewConversations int `db:"new_conversations" json:"new_conversations"`
}
}

View File

@@ -1,52 +1,39 @@
// Package dbutil provides utility functions for database operations.
package dbutil
import (
"errors"
"fmt"
"io/fs"
"github.com/jmoiron/sqlx"
"github.com/knadh/goyesql/v2"
goyesqlx "github.com/knadh/goyesql/v2/sqlx"
"github.com/lib/pq"
)
var (
// ErrUniqueViolation indicates a unique constraint violation.
ErrUniqueViolation = errors.New("unique constraint violation")
ErrEmailExists = errors.New("email already exists")
// ErrEmailExists indicates that an email already exists.
ErrEmailExists = errors.New("email already exists")
)
// ScanSQLFile scans a goyesql .sql file from the given fs to the given struct.
// ScanSQLFile scans a goyesql .sql file from the given fs and prepares the queries in the given struct.
func ScanSQLFile(path string, o interface{}, db *sqlx.DB, f fs.FS) error {
// Read the SQL file from the embedded filesystem.
b, err := fs.ReadFile(f, path)
if err != nil {
return err
}
// Parse the SQL file.
q, err := goyesql.ParseBytes(b)
if err != nil {
return err
}
// Prepare queries.
// Scan the parsed queries into the provided struct and prepare them.
if err := goyesqlx.ScanToStruct(o, q, db.Unsafe()); err != nil {
return err
}
return nil
}
// HandlePGError checks for common Postgres errors like unique constraint violations.
func HandlePGError(err error) error {
if pgErr, ok := err.(*pq.Error); ok {
switch pgErr.Code {
case "23505":
// Unique violation
switch pgErr.Constraint {
case "users_email_unique":
return fmt.Errorf("%w: %s", ErrEmailExists, pgErr.Detail)
default:
return fmt.Errorf("%w: %s", ErrUniqueViolation, pgErr.Detail)
}
}
}
return err
}

View File

@@ -1,3 +1,4 @@
// Package envelope provides custom error types and utility functions for API error handling.
package envelope
import "net/http"
@@ -13,19 +14,18 @@ const (
// Error is the error type used for all API errors.
type Error struct {
Code int
ErrorType string
Message string
Data interface{}
Code int // HTTP status code.
ErrorType string // Type of the error.
Message string // Error message.
Data interface{} // Additional data related to the error.
}
// Error returns the error message and satisfies Go error type.
// Error returns the error message and satisfies the Go error interface.
func (e Error) Error() string {
return e.Message
}
// NewError creates and returns a new instace of Error
// with custom error metadata.
// NewError creates and returns a new instance of Error with custom error metadata.
func NewError(etype string, message string, data interface{}) error {
err := Error{
Message: message,
@@ -51,7 +51,7 @@ func NewError(etype string, message string, data interface{}) error {
return err
}
// NewErrorWithCode creates and returns a new instace of Error, with custom error metadata and http status code.
// NewErrorWithCode creates and returns a new instance of Error with custom error metadata and an HTTP status code.
func NewErrorWithCode(etype string, code int, message string, data interface{}) error {
return Error{
Message: message,
@@ -60,4 +60,3 @@ func NewErrorWithCode(etype string, code int, message string, data interface{})
Code: code,
}
}

View File

@@ -1,4 +1,4 @@
// Package email is the email inbox with multiple SMTP servers and IMAP clients.
// Package email provides functionality for an email inbox with multiple SMTP servers and IMAP clients.
package email
import (
@@ -13,14 +13,14 @@ const (
ChannelEmail = "email"
)
// Config holds the email inbox config with multiple smtp servers & imap clients.
// Config holds the email inbox configuration with multiple SMTP servers and IMAP clients.
type Config struct {
SMTP []SMTPConfig `json:"smtp"`
IMAP []IMAPConfig `json:"imap"`
From string
From string `json:"from"`
}
// SMTP represents an SMTP server's credentials with the smtppool options.
// SMTPConfig represents an SMTP server's credentials with the smtppool options.
type SMTPConfig struct {
Username string `json:"username"`
Password string `json:"password"`
@@ -28,11 +28,10 @@ type SMTPConfig struct {
TLSType string `json:"tls_type"`
TLSSkipVerify bool `json:"tls_skip_verify"`
EmailHeaders map[string]string `json:"email_headers"`
// SMTP pool options.
smtppool.Opt `json:",squash"`
smtppool.Opt `json:",squash"` // SMTP pool options.
}
// IMAPConfig holds imap client credentials & config.
// IMAPConfig holds IMAP client credentials and configuration.
type IMAPConfig struct {
Host string `json:"host"`
Port int `json:"port"`
@@ -42,15 +41,15 @@ type IMAPConfig struct {
ReadInterval string `json:"read_interval"`
}
// Email is the email inbox with multiple SMTP servers and IMAP clients.
// Email represents the email inbox with multiple SMTP servers and IMAP clients.
type Email struct {
id int
smtpPools []*smtppool.Pool
imapCfg []IMAPConfig
headers map[string]string
lo *logf.Logger
from string
msgStore inbox.MessageStore
id int
smtpPools []*smtppool.Pool
imapCfg []IMAPConfig
headers map[string]string
lo *logf.Logger
from string
messageStore inbox.MessageStore
}
// Opts holds the options required for the email inbox.
@@ -67,20 +66,19 @@ func New(store inbox.MessageStore, opts Opts) (*Email, error) {
if err != nil {
return nil, err
}
e := &Email{
id: opts.ID,
headers: opts.Headers,
from: opts.Config.From,
imapCfg: opts.Config.IMAP,
lo: opts.Lo,
smtpPools: pools,
msgStore: store,
id: opts.ID,
headers: opts.Headers,
from: opts.Config.From,
imapCfg: opts.Config.IMAP,
lo: opts.Lo,
smtpPools: pools,
messageStore: store,
}
return e, nil
}
// Identifier returns the unique identifier of the inbox which is the database id.
// Identifier returns the unique identifier of the inbox which is the database ID.
func (e *Email) Identifier() int {
return e.id
}
@@ -93,10 +91,10 @@ func (e *Email) Close() error {
return nil
}
// Receive starts receiver for each IMAP client.
// Receive starts reading incoming messages for each IMAP client.
func (e *Email) Receive(ctx context.Context) error {
for _, cfg := range e.imapCfg {
e.ReadIncomingMessages(ctx, cfg)
go e.ReadIncomingMessages(ctx, cfg)
}
return nil
}
@@ -106,7 +104,7 @@ func (e *Email) FromAddress() string {
return e.from
}
// FromAddress returns the channel name for this inbox.
// Channel returns the channel name for this inbox.
func (e *Email) Channel() string {
return ChannelEmail
}

View File

@@ -22,14 +22,15 @@ const (
DefaultReadInterval = time.Duration(5 * time.Minute)
)
// ReadIncomingMessages reads and processes incoming messages from an IMAP server based on the provided configuration.
func (e *Email) ReadIncomingMessages(ctx context.Context, cfg IMAPConfig) error {
dur, err := time.ParseDuration(cfg.ReadInterval)
readInterval, err := time.ParseDuration(cfg.ReadInterval)
if err != nil {
e.lo.Warn("could not IMAP read interval, using the default read interval of 5 minutes.", "error", err)
dur = DefaultReadInterval
e.lo.Warn("could not parse IMAP read interval, using the default read interval of 5 minutes.", "error", err)
readInterval = DefaultReadInterval
}
readTicker := time.NewTicker(dur)
readTicker := time.NewTicker(readInterval)
defer readTicker.Stop()
for {
@@ -44,34 +45,36 @@ func (e *Email) ReadIncomingMessages(ctx context.Context, cfg IMAPConfig) error
}
}
// processMailbox processes emails in the specified mailbox.
func (e *Email) processMailbox(cfg IMAPConfig) error {
c, err := imapclient.DialTLS(cfg.Host+":"+fmt.Sprint(cfg.Port), &imapclient.Options{})
client, err := imapclient.DialTLS(cfg.Host+":"+fmt.Sprint(cfg.Port), &imapclient.Options{})
if err != nil {
return fmt.Errorf("error connecting to IMAP server: %w", err)
}
defer c.Logout()
defer client.Logout()
if err := c.Login(cfg.Username, cfg.Password).Wait(); err != nil {
if err := client.Login(cfg.Username, cfg.Password).Wait(); err != nil {
return fmt.Errorf("error logging in to the IMAP server: %w", err)
}
if _, err := c.Select(cfg.Mailbox, &imap.SelectOptions{ReadOnly: true}).Wait(); err != nil {
if _, err := client.Select(cfg.Mailbox, &imap.SelectOptions{ReadOnly: true}).Wait(); err != nil {
return fmt.Errorf("error selecting mailbox: %w", err)
}
since := time.Now().Add(time.Hour * 12 * -1)
before := time.Now().Add(time.Hour * 24)
since := time.Now().Add(-12 * time.Hour)
before := time.Now().Add(24 * time.Hour)
searchData, err := e.searchMessages(c, since, before)
searchData, err := e.searchMessages(client, since, before)
if err != nil {
return fmt.Errorf("error searching messages: %w", err)
}
return e.fetchAndProcessMessages(c, searchData, e.Identifier())
return e.fetchAndProcessMessages(client, searchData, e.Identifier())
}
func (e *Email) searchMessages(c *imapclient.Client, since, before time.Time) (*imap.SearchData, error) {
searchCMD := c.Search(&imap.SearchCriteria{
// searchMessages searches for messages in the specified time range.
func (e *Email) searchMessages(client *imapclient.Client, since, before time.Time) (*imap.SearchData, error) {
searchCMD := client.Search(&imap.SearchCriteria{
Since: since,
Before: before,
},
@@ -85,7 +88,8 @@ func (e *Email) searchMessages(c *imapclient.Client, since, before time.Time) (*
return searchCMD.Wait()
}
func (e *Email) fetchAndProcessMessages(c *imapclient.Client, searchData *imap.SearchData, inboxID int) error {
// fetchAndProcessMessages fetches and processes messages based on the search results.
func (e *Email) fetchAndProcessMessages(client *imapclient.Client, searchData *imap.SearchData, inboxID int) error {
seqSet := imap.SeqSet{}
seqSet.AddRange(searchData.Min, searchData.Max)
@@ -94,7 +98,7 @@ func (e *Email) fetchAndProcessMessages(c *imapclient.Client, searchData *imap.S
Envelope: true,
}
fetchCmd := c.Fetch(seqSet, fetchOptions)
fetchCmd := client.Fetch(seqSet, fetchOptions)
for {
msg := fetchCmd.Next()
@@ -104,7 +108,7 @@ func (e *Email) fetchAndProcessMessages(c *imapclient.Client, searchData *imap.S
for fetchItem := msg.Next(); fetchItem != nil; fetchItem = msg.Next() {
if item, ok := fetchItem.(imapclient.FetchItemDataEnvelope); ok {
if err := e.processEnvelope(c, item.Envelope, msg.SeqNum, inboxID); err != nil {
if err := e.processEnvelope(client, item.Envelope, msg.SeqNum, inboxID); err != nil {
e.lo.Error("error processing envelope", "error", err)
}
}
@@ -114,13 +118,14 @@ func (e *Email) fetchAndProcessMessages(c *imapclient.Client, searchData *imap.S
return nil
}
func (e *Email) processEnvelope(c *imapclient.Client, env *imap.Envelope, seqNum uint32, inboxID int) error {
// processEnvelope processes an email envelope.
func (e *Email) processEnvelope(client *imapclient.Client, env *imap.Envelope, seqNum uint32, inboxID int) error {
if len(env.From) == 0 {
e.lo.Debug("no sender found for email", "message_id", env.MessageID)
return nil
}
exists, err := e.msgStore.MessageExists(env.MessageID)
exists, err := e.messageStore.MessageExists(env.MessageID)
if exists || err != nil {
return nil
}
@@ -130,8 +135,7 @@ func (e *Email) processEnvelope(c *imapclient.Client, env *imap.Envelope, seqNum
Channel: e.Channel(),
SenderType: message.SenderTypeContact,
Type: message.TypeIncoming,
Meta: "{}",
InboxID: int(inboxID),
InboxID: inboxID,
Status: message.StatusReceived,
Subject: env.Subject,
SourceID: null.StringFrom(env.MessageID),
@@ -140,9 +144,9 @@ func (e *Email) processEnvelope(c *imapclient.Client, env *imap.Envelope, seqNum
Source: e.Channel(),
SourceID: env.From[0].Addr(),
Email: env.From[0].Addr(),
InboxID: int(inboxID),
InboxID: inboxID,
},
InboxID: int(inboxID),
InboxID: inboxID,
}
incomingMsg.Contact.FirstName, incomingMsg.Contact.LastName = getContactName(env.From[0])
@@ -152,7 +156,7 @@ func (e *Email) processEnvelope(c *imapclient.Client, env *imap.Envelope, seqNum
seqSet := imap.SeqSet{}
seqSet.AddNum(seqNum)
fullFetchCmd := c.Fetch(seqSet, fetchOptions)
fullFetchCmd := client.Fetch(seqSet, fetchOptions)
fullMsg := fullFetchCmd.Next()
if fullMsg == nil {
return nil
@@ -160,29 +164,30 @@ func (e *Email) processEnvelope(c *imapclient.Client, env *imap.Envelope, seqNum
for fullFetchItem := fullMsg.Next(); fullFetchItem != nil; fullFetchItem = fullMsg.Next() {
if fullItem, ok := fullFetchItem.(imapclient.FetchItemDataBodySection); ok {
return e.processFullMessage(fullItem, &incomingMsg)
return e.processFullMessage(fullItem, incomingMsg)
}
}
return nil
}
func (e *Email) processFullMessage(item imapclient.FetchItemDataBodySection, incomingMsg *models.IncomingMessage) error {
envel, err := enmime.ReadEnvelope(item.Literal)
// processFullMessage processes the full email message.
func (e *Email) processFullMessage(item imapclient.FetchItemDataBodySection, incomingMsg models.IncomingMessage) error {
envelope, err := enmime.ReadEnvelope(item.Literal)
if err != nil {
return fmt.Errorf("error parsing email envelope: %w", err)
}
if len(envel.HTML) > 0 {
incomingMsg.Message.Content = envel.HTML
if len(envelope.HTML) > 0 {
incomingMsg.Message.Content = envelope.HTML
incomingMsg.Message.ContentType = message.ContentTypeHTML
} else if len(envel.Text) > 0 {
incomingMsg.Message.Content = envel.Text
} else if len(envelope.Text) > 0 {
incomingMsg.Message.Content = envelope.Text
incomingMsg.Message.ContentType = message.ContentTypeText
}
inReplyTo := strings.ReplaceAll(strings.ReplaceAll(envel.GetHeader("In-Reply-To"), "<", ""), ">", "")
references := strings.Fields(envel.GetHeader("References"))
inReplyTo := strings.ReplaceAll(strings.ReplaceAll(envelope.GetHeader("In-Reply-To"), "<", ""), ">", "")
references := strings.Fields(envelope.GetHeader("References"))
for i, ref := range references {
references[i] = strings.Trim(strings.TrimSpace(ref), " <>")
}
@@ -190,37 +195,37 @@ func (e *Email) processFullMessage(item imapclient.FetchItemDataBodySection, inc
incomingMsg.Message.InReplyTo = inReplyTo
incomingMsg.Message.References = references
for _, j := range envel.Attachments {
for _, att := range envelope.Attachments {
incomingMsg.Message.Attachments = append(incomingMsg.Message.Attachments, amodels.Attachment{
Filename: j.FileName,
Header: j.Header,
Content: j.Content,
ContentType: j.ContentType,
ContentID: j.ContentID,
Filename: att.FileName,
Header: att.Header,
Content: att.Content,
ContentType: att.ContentType,
ContentID: att.ContentID,
ContentDisposition: attachment.DispositionAttachment,
Size: strconv.Itoa(len(j.Content)),
Size: strconv.Itoa(len(att.Content)),
})
}
for _, j := range envel.Inlines {
for _, att := range envelope.Inlines {
incomingMsg.Message.Attachments = append(incomingMsg.Message.Attachments, amodels.Attachment{
Filename: j.FileName,
Header: j.Header,
Content: j.Content,
ContentType: j.ContentType,
ContentID: j.ContentID,
Filename: att.FileName,
Header: att.Header,
Content: att.Content,
ContentType: att.ContentType,
ContentID: att.ContentID,
ContentDisposition: attachment.DispositionInline,
Size: strconv.Itoa(len(j.Content)),
Size: strconv.Itoa(len(att.Content)),
})
}
if err := e.msgStore.ProcessMessage(*incomingMsg); err != nil {
return fmt.Errorf("error processing message: %w", err)
if err := e.messageStore.ProcessIncomingMessage(incomingMsg); err != nil {
return err
}
return nil
}
// getContactName extracts the contact's first and last name from the IMAP address.
func getContactName(imapAddr imap.Address) (string, string) {
from := strings.TrimSpace(imapAddr.Name)
names := strings.Fields(from)

View File

@@ -2,7 +2,6 @@ package email
import (
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net/smtp"
@@ -20,7 +19,7 @@ const (
dispositionInline = "inline"
)
// New returns an SMTP e-mail channels from the given SMTP server configcfg.
// NewSmtpPool returns a smtppool
func NewSmtpPool(configs []SMTPConfig) ([]*smtppool.Pool, error) {
pools := make([]*smtppool.Pool, 0, len(configs))
@@ -34,12 +33,13 @@ func NewSmtpPool(configs []SMTPConfig) ([]*smtppool.Pool, error) {
case "login":
auth = &smtppool.LoginAuth{Username: cfg.Username, Password: cfg.Password}
case "", "none":
// No authentication
default:
return nil, fmt.Errorf("unknown SMTP auth type '%s'", cfg.AuthProtocol)
}
cfg.Opt.Auth = auth
// TLS config.
// TLS config
if cfg.TLSType != "none" {
cfg.TLSConfig = &tls.Config{}
if cfg.TLSSkipVerify {
@@ -48,11 +48,12 @@ func NewSmtpPool(configs []SMTPConfig) ([]*smtppool.Pool, error) {
cfg.TLSConfig.ServerName = cfg.Host
}
// SSL/TLS, not STARTTLcfg.
// SSL/TLS, not STARTTLS
if cfg.TLSType == "TLS" {
cfg.Opt.SSL = true
}
}
pool, err := smtppool.New(cfg.Opt)
if err != nil {
return nil, err
@@ -63,83 +64,73 @@ func NewSmtpPool(configs []SMTPConfig) ([]*smtppool.Pool, error) {
return pools, nil
}
// Send sends an email.
// Send sends an email using one of the configured SMTP servers.
func (e *Email) Send(m models.Message) error {
// If there are more than one SMTP servers, send to a random one from the list.
// Select a random SMTP server if there are multiple
var (
ln = len(e.smtpPools)
srv *smtppool.Pool
serverCount = len(e.smtpPools)
server *smtppool.Pool
)
if ln > 1 {
srv = e.smtpPools[rand.Intn(ln)]
if serverCount > 1 {
server = e.smtpPools[rand.Intn(serverCount)]
} else {
srv = e.smtpPools[0]
server = e.smtpPools[0]
}
// Are there attachments?
var files []smtppool.Attachment
// Prepare attachments if there are any
var attachments []smtppool.Attachment
if m.Attachments != nil {
files = make([]smtppool.Attachment, 0, len(m.Attachments))
for _, f := range m.Attachments {
a := smtppool.Attachment{
Filename: f.Filename,
Header: f.Header,
Content: make([]byte, len(f.Content)),
attachments = make([]smtppool.Attachment, 0, len(m.Attachments))
for _, file := range m.Attachments {
attachment := smtppool.Attachment{
Filename: file.Filename,
Header: file.Header,
Content: make([]byte, len(file.Content)),
}
copy(a.Content, f.Content)
files = append(files, a)
copy(attachment.Content, file.Content)
attachments = append(attachments, attachment)
}
}
em := smtppool.Email{
email := smtppool.Email{
From: m.From,
To: m.To,
Subject: m.Subject,
Attachments: files,
Attachments: attachments,
Headers: textproto.MIMEHeader{},
}
// Attach SMTP level headercfg.
for k, v := range e.headers {
em.Headers.Set(k, v)
// Attach SMTP level headers
for key, value := range e.headers {
email.Headers.Set(key, value)
}
// Attach e-mail level headercfg.
for k, v := range m.Headers {
em.Headers.Set(k, v[0])
// Attach email level headers
for key, value := range m.Headers {
email.Headers.Set(key, value[0])
}
// Others.
// Set In-Reply-To and References headers
if m.InReplyTo != "" {
em.Headers.Set(headerInReplyTo, "<"+m.InReplyTo+">")
email.Headers.Set(headerInReplyTo, "<"+m.InReplyTo+">")
}
references := ""
// Set references message ids
var references string
for _, ref := range m.References {
references += "<" + ref + "> "
}
em.Headers.Set(headerReferences, references)
fmt.Printf("%+v EMAIL HEADERS -> headers", em.Headers)
email.Headers.Set(headerReferences, references)
// Set email content
switch m.ContentType {
case "plain":
em.Text = []byte(m.Content)
email.Text = []byte(m.Content)
default:
em.HTML = []byte(m.Content)
email.HTML = []byte(m.Content)
if len(m.AltContent) > 0 {
em.Text = []byte(m.AltContent)
email.Text = []byte(m.AltContent)
}
}
jsonData, err := json.MarshalIndent(em, "", " ")
if err != nil {
fmt.Println("Error marshalling to JSON:", err)
}
fmt.Println()
fmt.Println()
fmt.Println(string(jsonData))
fmt.Println()
fmt.Println()
return srv.Send(em)
return server.Send(email)
}

View File

@@ -1,3 +1,4 @@
// Package inbox provides functionality to manage inboxes in the system.
package inbox
import (
@@ -18,10 +19,11 @@ var (
//go:embed queries.sql
efs embed.FS
// ErrInboxNotFound is returned when an inbox is not found.
ErrInboxNotFound = errors.New("inbox not found")
)
// Closer provides function for closing an inbox.
// Closer provides a function for closing an inbox.
type Closer interface {
Close() error
}
@@ -46,18 +48,19 @@ type Inbox interface {
Channel() string
}
// MessageStore defines methods for storing and processing messages.
type MessageStore interface {
MessageExists(string) (bool, error)
ProcessMessage(models.IncomingMessage) error
ProcessIncomingMessage(models.IncomingMessage) error
}
// Opts contains the options for the initializing the inbox manager.
// Opts contains the options for initializing the inbox manager.
type Opts struct {
QueueSize int
Concurrency int
}
// Manager manages the inbox.
// Manager manages the inboxes.
type Manager struct {
queries queries
inboxes map[int]Inbox
@@ -79,7 +82,7 @@ type queries struct {
func New(lo *logf.Logger, db *sqlx.DB) (*Manager, error) {
var q queries
// Scan the sql file into the queries struct.
// Scan the SQL file into the queries struct.
if err := dbutil.ScanSQLFile("queries.sql", &q, db, efs); err != nil {
return nil, err
}
@@ -106,17 +109,17 @@ func (m *Manager) Get(id int) (Inbox, error) {
return i, nil
}
// GetByID returns inbox by the given ID.
// GetByID returns an inbox from the DB by the given ID.
func (m *Manager) GetByID(id int) (imodels.Inbox, error) {
var inbox imodels.Inbox
if err := m.queries.GetByID.Get(&inbox, id); err != nil {
m.lo.Error("fetching active inboxes", "error", err)
m.lo.Error("fetching inbox by ID", "error", err)
return inbox, err
}
return inbox, nil
}
// GetActive returns all active inboxes.
// GetActive returns all active inboxes from the DB.
func (m *Manager) GetActive() ([]imodels.Inbox, error) {
var inboxes []imodels.Inbox
if err := m.queries.GetActive.Select(&inboxes); err != nil {
@@ -126,7 +129,7 @@ func (m *Manager) GetActive() ([]imodels.Inbox, error) {
return inboxes, nil
}
// GetAll returns all inboxes.
// GetAll returns all inboxes from the DB.
func (m *Manager) GetAll() ([]imodels.Inbox, error) {
var inboxes = make([]imodels.Inbox, 0)
if err := m.queries.GetAll.Select(&inboxes); err != nil {
@@ -136,7 +139,7 @@ func (m *Manager) GetAll() ([]imodels.Inbox, error) {
return inboxes, nil
}
// Create creates an inbox.
// Create creates an inbox in the DB.
func (m *Manager) Create(inbox imodels.Inbox) error {
if _, err := m.queries.InsertInbox.Exec(inbox.Channel, inbox.Config, inbox.Name, inbox.From, nil); err != nil {
m.lo.Error("error creating inbox", "error", err)
@@ -145,6 +148,7 @@ func (m *Manager) Create(inbox imodels.Inbox) error {
return nil
}
// Update updates an inbox in the DB.
func (m *Manager) Update(id int, inbox imodels.Inbox) error {
if _, err := m.queries.Update.Exec(id, inbox.Channel, inbox.Config, inbox.Name, inbox.From); err != nil {
m.lo.Error("error updating inbox", "error", err)
@@ -153,6 +157,7 @@ func (m *Manager) Update(id int, inbox imodels.Inbox) error {
return nil
}
// Toggle toggles the status of an inbox in the DB.
func (m *Manager) Toggle(id int) error {
if _, err := m.queries.Toggle.Exec(id); err != nil {
m.lo.Error("error toggling inbox", "error", err)
@@ -161,6 +166,7 @@ func (m *Manager) Toggle(id int) error {
return nil
}
// Delete soft deletes an inbox in the DB.
func (m *Manager) Delete(id int) error {
if _, err := m.queries.SoftDelete.Exec(id); err != nil {
m.lo.Error("error deleting inbox", "error", err)
@@ -169,7 +175,7 @@ func (m *Manager) Delete(id int) error {
return nil
}
// Receive starts receiver for each inbox.
// Receive starts the receiver for each inbox.
func (m *Manager) Receive(ctx context.Context) {
for _, inb := range m.inboxes {
go inb.Receive(ctx)

View File

@@ -14,16 +14,13 @@ import (
"github.com/abhinavxd/artemis/internal/attachment"
"github.com/abhinavxd/artemis/internal/automation"
"github.com/abhinavxd/artemis/internal/contact"
"github.com/abhinavxd/artemis/internal/conversation"
cmodels "github.com/abhinavxd/artemis/internal/contact/models"
"github.com/abhinavxd/artemis/internal/dbutil"
"github.com/abhinavxd/artemis/internal/envelope"
"github.com/abhinavxd/artemis/internal/inbox"
"github.com/abhinavxd/artemis/internal/message/models"
"github.com/abhinavxd/artemis/internal/systeminfo"
"github.com/abhinavxd/artemis/internal/team"
tmodels "github.com/abhinavxd/artemis/internal/team/models"
"github.com/abhinavxd/artemis/internal/template"
"github.com/abhinavxd/artemis/internal/user"
umodels "github.com/abhinavxd/artemis/internal/user/models"
"github.com/abhinavxd/artemis/internal/ws"
"github.com/jmoiron/sqlx"
@@ -67,15 +64,16 @@ const (
maxLastMessageLen = 45
)
// Manager handles message-related operations.
type Manager struct {
q queries
lo *logf.Logger
contactMgr *contact.Manager
attachmentMgr *attachment.Manager
conversationMgr *conversation.Manager
inboxMgr *inbox.Manager
userMgr *user.Manager
teamMgr *team.Manager
contactStore contactStore
inboxStore inboxStore
conversationStore conversationStore
userStore userStore
teamStore teamStore
attachmentManager *attachment.Manager
automationEngine *automation.Engine
wsHub *ws.Hub
templateManager *template.Manager
@@ -84,6 +82,7 @@ type Manager struct {
outgoingProcessingMessages sync.Map
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
@@ -91,6 +90,30 @@ type Opts struct {
OutgoingMsgQueueSize int
}
type teamStore interface {
GetTeam(int) (tmodels.Team, error)
}
type userStore interface {
Get(int, string) (umodels.User, error)
}
type contactStore interface {
Upsert(cmodels.Contact) (int, error)
}
type conversationStore interface {
UpdateFirstReplyAt(string, int, time.Time) error
UpdateLastMessage(int, string, string, time.Time) error
Create(int, int, []byte) (int, string, error)
GetUUID(int) (string, error)
}
type inboxStore interface {
Get(int) (inbox.Inbox, error)
}
// queries contains prepared SQL queries.
type queries struct {
GetMessage *sqlx.Stmt `query:"get-message"`
GetMessages *sqlx.Stmt `query:"get-messages"`
@@ -103,14 +126,15 @@ type queries struct {
MessageExists *sqlx.Stmt `query:"message-exists"`
}
// New creates and returns a new instance of the Manager.
func New(
wsHub *ws.Hub,
userMgr *user.Manager,
teamMgr *team.Manager,
contactMgr *contact.Manager,
attachmentMgr *attachment.Manager,
inboxMgr *inbox.Manager,
conversationMgr *conversation.Manager,
userStore userStore,
teamStore teamStore,
contactStore contactStore,
attachmentManager *attachment.Manager,
inboxStore inboxStore,
conversationStore conversationStore,
automationEngine *automation.Engine,
templateManager *template.Manager,
opts Opts,
@@ -124,12 +148,12 @@ func New(
q: q,
lo: opts.Lo,
wsHub: wsHub,
userMgr: userMgr,
teamMgr: teamMgr,
contactMgr: contactMgr,
attachmentMgr: attachmentMgr,
conversationMgr: conversationMgr,
inboxMgr: inboxMgr,
userStore: userStore,
teamStore: teamStore,
contactStore: contactStore,
conversationStore: conversationStore,
attachmentManager: attachmentManager,
inboxStore: inboxStore,
automationEngine: automationEngine,
templateManager: templateManager,
incomingMsgQ: make(chan models.IncomingMessage, opts.IncomingMsgQueueSize),
@@ -138,24 +162,27 @@ func New(
}, nil
}
// GetConversationMessages retrieves messages for a specific conversation.
func (m *Manager) GetConversationMessages(uuid string) ([]models.Message, error) {
var messages []models.Message
if err := m.q.GetMessages.Select(&messages, uuid); err != nil {
m.lo.Error("fetching messages from DB", "conversation_uuid", uuid, "error", err)
m.lo.Error("error fetching messages from DB", "conversation_uuid", uuid, "error", err)
return nil, envelope.NewError(envelope.GeneralError, "Error fetching messages", nil)
}
return messages, nil
}
// GetMessage retrieves a message by UUID.
func (m *Manager) GetMessage(uuid string) ([]models.Message, error) {
var messages []models.Message
if err := m.q.GetMessage.Select(&messages, uuid); err != nil {
m.lo.Error("fetching messages from DB", "conversation_uuid", uuid, "error", err)
m.lo.Error("error fetching message from DB", "message_uuid", uuid, "error", err)
return nil, envelope.NewError(envelope.GeneralError, "Error fetching message", nil)
}
return messages, nil
}
// UpdateMessageStatus updates the status of a message.
func (m *Manager) UpdateMessageStatus(uuid string, status string) error {
if _, err := m.q.UpdateMessageStatus.Exec(status, uuid); err != nil {
m.lo.Error("error updating message status in DB", "error", err, "uuid", uuid)
@@ -164,8 +191,12 @@ func (m *Manager) UpdateMessageStatus(uuid string, status string) error {
return nil
}
// RetryMessage retries sending a message by updating its status to pending.
func (m *Manager) RetryMessage(uuid string) error {
return m.UpdateMessageStatus(uuid, StatusPending)
if err := m.UpdateMessageStatus(uuid, StatusPending); err != nil {
return envelope.NewError(envelope.GeneralError, "Error retrying message", nil)
}
return nil
}
// RecordMessage inserts a message and attaches the attachments to the message.
@@ -177,24 +208,23 @@ func (m *Manager) RecordMessage(msg *models.Message) error {
if err := m.q.InsertMessage.QueryRow(msg.Type, msg.Status, msg.ConversationID, msg.ConversationUUID, msg.Content, msg.SenderID, msg.SenderType,
msg.Private, msg.ContentType, msg.SourceID, msg.InboxID, msg.Meta).Scan(&msg.ID, &msg.UUID, &msg.CreatedAt); err != nil {
m.lo.Error("error inserting message in db", "error", err)
return fmt.Errorf("inserting message: %w", err)
return envelope.NewError(envelope.GeneralError, "Error sending message", nil)
}
// Attach the attachments.
if err := m.attachmentMgr.AttachMessage(msg.Attachments, msg.ID); err != nil {
if err := m.attachmentManager.AttachMessage(msg.Attachments, msg.ID); err != nil {
m.lo.Error("error attaching attachments to the message", "error", err)
return errors.New("error attaching attachments to the message")
return envelope.NewError(envelope.GeneralError, "Error sending message", nil)
}
return nil
}
// StartDispatcher is a blocking function that must be invoked with a goroutine.
// It scans DB per second for pending outbound messages and sends them using the inbox channel.
// StartDispatcher starts a worker pool to dispatch pending outbound messages.
func (m *Manager) StartDispatcher(ctx context.Context, concurrency int, readInterval time.Duration) {
// Spawn goroutine worker pool.
for range concurrency {
go m.DispatchWorker()
for i := 0; i < concurrency; i++ {
go m.DispatchWorker(ctx)
}
// Scan pending messages from the DB.
@@ -234,54 +264,71 @@ func (m *Manager) StartDispatcher(ctx context.Context, concurrency int, readInte
}
}
func (m *Manager) DispatchWorker() {
for message := range m.outgoingMessageQueue {
inbox, err := m.inboxMgr.Get(message.InboxID)
if err != nil {
m.lo.Error("error fetching inbox", "error", err, "inbox_id", message.InboxID)
m.outgoingProcessingMessages.Delete(message.ID)
m.UpdateMessageStatus(message.UUID, StatusFailed)
continue
}
// DispatchWorker dispatches outgoing pending messages.
func (m *Manager) DispatchWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case message, ok := <-m.outgoingMessageQueue:
if !ok {
return
}
message.From = inbox.FromAddress()
// Get inbox.
inbox, err := m.inboxStore.Get(message.InboxID)
if err != nil {
m.handleDispatchErrors(message, "error fetching inbox", err)
continue
}
if err := m.attachAttachments(&message); err != nil {
m.lo.Error("error attaching attachments to message", "error", err)
m.outgoingProcessingMessages.Delete(message.ID)
m.UpdateMessageStatus(message.UUID, StatusFailed)
continue
}
// Attach attachments.
if err := m.attachAttachments(&message); err != nil {
m.handleDispatchErrors(message, "error fetching inbox", err)
continue
}
message.To, _ = m.GetToAddress(message.ConversationID, inbox.Channel())
if inbox.Channel() == "email" {
// Get from, to addresses and inReplyTo.
message.From = inbox.FromAddress()
message.To, _ = m.GetToAddress(message.ConversationID, inbox.Channel())
message.InReplyTo, _ = m.GetInReplyTo(message.ConversationID)
// Send.
err = inbox.Send(message)
var newStatus = StatusFailed
if err != nil {
newStatus = StatusFailed
m.lo.Error("error sending message", "error", err, "inbox_id", message.InboxID)
}
// Update status.
m.UpdateMessageStatus(message.UUID, newStatus)
// Update first reply at.
switch newStatus {
case StatusSent:
m.conversationStore.UpdateFirstReplyAt(message.ConversationUUID, message.ConversationID, message.CreatedAt)
}
// Broadcast update.
m.wsHub.BroadcastMessagePropUpdate(message.ConversationUUID, message.UUID, "status" /*message field*/, newStatus)
// Delete from processing map.
m.outgoingProcessingMessages.Delete(message.ID)
}
err = inbox.Send(message)
var newStatus = StatusSent
if err != nil {
newStatus = StatusFailed
m.lo.Error("error sending message", "error", err, "inbox_id", message.InboxID)
}
m.UpdateMessageStatus(message.UUID, newStatus)
switch newStatus {
case StatusSent:
m.conversationMgr.UpdateFirstReplyAt(message.ConversationUUID, message.ConversationID, message.CreatedAt)
}
// Broadcast message status update to the subscribers.
m.wsHub.BroadcastMessagePropUpdate(message.ConversationUUID, message.UUID, "status" /*message field*/, newStatus)
// Remove message from the processing list.
m.outgoingProcessingMessages.Delete(message.ID)
}
}
// handleDispatchErrors logs an error, updates the message status to failed,
// and removes the message from the outgoing processing map.
func (m *Manager) handleDispatchErrors(message models.Message, logMessage string, err error) {
m.lo.Error(logMessage, "error", err, "inbox_id", message.InboxID)
m.outgoingProcessingMessages.Delete(message.ID)
m.UpdateMessageStatus(message.UUID, StatusFailed)
}
// GetToAddress retrieves the recipient addresses for a message.
func (m *Manager) GetToAddress(convID int, channel string) ([]string, error) {
var addr []string
if err := m.q.GetToAddress.Select(&addr, convID, channel); err != nil {
@@ -291,27 +338,24 @@ func (m *Manager) GetToAddress(convID int, channel string) ([]string, error) {
return addr, nil
}
// GetInReplyTo retrieves the In-Reply-To header value for a message.
func (m *Manager) GetInReplyTo(convID int) (string, error) {
var out string
if err := m.q.GetInReplyTo.Get(&out, convID); err != nil {
if err == sql.ErrNoRows {
m.lo.Error("in reply to not found", "error", err, "conversation_id", convID)
return out, nil
}
m.lo.Error("error fetching in reply to", "error", err, "conversation_id", convID)
return out, err
}
return out, nil
}
// StartDBInserts is a blocking function that must be invoked with a goroutine,
// it spawns worker pools for inserting incoming messages.
// StartDBInserts starts a worker pool to insert incoming messages into the database.
func (m *Manager) StartDBInserts(ctx context.Context, concurrency int) {
for range concurrency {
for i := 0; i < concurrency; i++ {
go m.InsertWorker(ctx)
}
}
// InsertWorker processes incoming messages and inserts them into the database.
func (m *Manager) InsertWorker(ctx context.Context) {
for msg := range m.incomingMsgQ {
select {
@@ -325,48 +369,41 @@ func (m *Manager) InsertWorker(ctx context.Context) {
}
}
// RecordAssigneeUserChange records an activity for a user assignee change.
func (m *Manager) RecordAssigneeUserChange(conversationUUID string, assigneeID int, actor umodels.User) error {
// Self assign.
if assigneeID == actor.ID {
return m.RecordActivity(ActivitySelfAssign, conversationUUID, actor.FullName(), actor)
}
// Assignment to other user.
assignee, err := m.userMgr.GetUser(assigneeID, "")
// Assignment to another user.
assignee, err := m.userStore.Get(assigneeID, "")
if err != nil {
return err
}
return m.RecordActivity(ActivityAssignedUserChange, conversationUUID, assignee.FullName(), actor)
}
func (m *Manager) RecordAssigneeUserChangeBySystem(conversationUUID string, assigneeID int) error {
assignee, err := m.userMgr.GetUser(assigneeID, "")
if err != nil {
return err
}
system, err := m.userMgr.GetUser(0, systeminfo.SystemUserUUID)
if err != nil {
return err
}
return m.RecordActivity(ActivityAssignedUserChange, conversationUUID, assignee.FullName(), system)
}
// RecordAssigneeTeamChange records an activity for a team assignee change.
func (m *Manager) RecordAssigneeTeamChange(conversationUUID string, teamID int, actor umodels.User) error {
team, err := m.teamMgr.GetTeam(teamID)
team, err := m.teamStore.GetTeam(teamID)
if err != nil {
return err
}
return m.RecordActivity(ActivityAssignedTeamChange, conversationUUID, team.Name, actor)
}
// RecordPriorityChange records an activity for a priority change.
func (m *Manager) RecordPriorityChange(priority, conversationUUID string, actor umodels.User) error {
return m.RecordActivity(ActivityPriorityChange, conversationUUID, priority, actor)
}
// RecordStatusChange records an activity for a status change.
func (m *Manager) RecordStatusChange(status, conversationUUID string, actor umodels.User) error {
return m.RecordActivity(ActivityStatusChange, conversationUUID, status, actor)
}
// RecordActivity records an activity message.
func (m *Manager) RecordActivity(activityType, conversationUUID, newValue string, actor umodels.User) error {
content, err := m.getActivityContent(activityType, newValue, actor.FullName())
if err != nil {
@@ -383,7 +420,6 @@ func (m *Manager) RecordActivity(activityType, conversationUUID, newValue string
Private: true,
SenderID: actor.ID,
SenderType: SenderTypeUser,
Meta: "{}",
}
// Record message in DB.
@@ -393,11 +429,12 @@ func (m *Manager) RecordActivity(activityType, conversationUUID, newValue string
m.BroadcastNewConversationMessage(message, content)
// Update the last message in conversation meta.
m.conversationMgr.UpdateLastMessage(0, conversationUUID, content, message.CreatedAt)
m.conversationStore.UpdateLastMessage(0, conversationUUID, content, message.CreatedAt)
return nil
}
// getActivityContent generates activity content based on the activity type.
func (m *Manager) getActivityContent(activityType, newValue, actorName string) (string, error) {
var content = ""
switch activityType {
@@ -419,30 +456,31 @@ func (m *Manager) getActivityContent(activityType, newValue, actorName string) (
return content, nil
}
// processIncomingMessage processes an incoming message.
func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
var (
trimmedMsg = m.TrimMsg(in.Message.Content)
convMeta = map[string]string{
trimmedMsg = m.TrimMsg(in.Message.Content)
conversationMeta = map[string]string{
"subject": in.Message.Subject,
"last_message": trimmedMsg,
"last_message_at": time.Now().Format(time.RFC3339),
}
)
convMetaJSON, err := json.Marshal(convMeta)
meta, err := json.Marshal(conversationMeta)
if err != nil {
m.lo.Error("error marshalling conversation meta", "error", err)
return err
}
senderID, err := m.contactMgr.Upsert(in.Contact)
senderID, err := m.contactStore.Upsert(in.Contact)
if err != nil {
m.lo.Error("error upserting contact", "error", err)
return err
}
in.Message.SenderID = senderID
// This message already exists? Return.
// Check if this message already exists.
conversationID, err := m.findConversationID([]string{in.Message.SourceID.String})
if err != nil && err != ErrConversationNotFound {
return err
@@ -451,7 +489,7 @@ func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
return nil
}
isNewConversation, err := m.findOrCreateConversation(&in.Message, in.InboxID, senderID, convMetaJSON)
isNewConversation, err := m.findOrCreateConversation(&in.Message, in.InboxID, senderID, meta)
if err != nil {
return err
}
@@ -465,15 +503,15 @@ func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
}
// Send WS update.
if in.Message.ConversationUUID > "" {
var content = ""
if in.Message.ConversationUUID != "" {
var content string
if isNewConversation {
content = m.TrimMsg(in.Message.Subject)
} else {
content = m.TrimMsg(in.Message.Content)
}
m.BroadcastNewConversationMessage(in.Message, content)
m.conversationMgr.UpdateLastMessage(in.Message.ConversationID, in.Message.ConversationUUID, content, in.Message.CreatedAt)
m.conversationStore.UpdateLastMessage(in.Message.ConversationID, in.Message.ConversationUUID, content, in.Message.CreatedAt)
}
// Evaluate automation rules for this conversation.
@@ -499,16 +537,18 @@ func (m *Manager) MessageExists(messageID string) (bool, error) {
return true, nil
}
// ProcessMessage enqueues an incoming message for processing.
func (m *Manager) ProcessMessage(message models.IncomingMessage) error {
// ProcessIncomingMessage enqueues an incoming message for processing.
func (m *Manager) ProcessIncomingMessage(message models.IncomingMessage) error {
select {
case m.incomingMsgQ <- message:
return nil
default:
return fmt.Errorf("failed to enqueue message: %v", message.Message.ID)
m.lo.Error("incoming message queue is full")
return errors.New("incoming message queue is full")
}
}
// TrimMsg trims and shortens a message content.
func (m *Manager) TrimMsg(msg string) string {
plain := strings.Trim(strings.TrimSpace(html2text.HTML2Text(msg)), " \t\n\r\v\f")
if len(plain) > maxLastMessageLen {
@@ -518,6 +558,7 @@ func (m *Manager) TrimMsg(msg string) string {
return plain
}
// uploadAttachments uploads attachments for a message.
func (m *Manager) uploadAttachments(in *models.Message) error {
var (
hasInline = false
@@ -526,7 +567,7 @@ func (m *Manager) uploadAttachments(in *models.Message) error {
)
for _, att := range in.Attachments {
reader := bytes.NewReader(att.Content)
url, _, _, err := m.attachmentMgr.Upload(msgUUID, att.Filename, att.ContentType, att.ContentDisposition, att.Size, reader)
url, _, _, err := m.attachmentManager.Upload(msgUUID, att.Filename, att.ContentType, att.ContentDisposition, att.Size, reader)
if err != nil {
m.lo.Error("error uploading attachment", "message_uuid", msgUUID, "error", err)
return errors.New("error uploading attachments for incoming message")
@@ -537,7 +578,7 @@ func (m *Manager) uploadAttachments(in *models.Message) error {
}
}
// Update the msg content the `cid:content_id` urls have been replaced.
// Update the msg content if the `cid:content_id` URLs have been replaced.
if hasInline {
if _, err := m.q.UpdateMessageContent.Exec(in.Content, msgID); err != nil {
m.lo.Error("error updating message content", "message_uuid", msgUUID)
@@ -547,6 +588,7 @@ func (m *Manager) uploadAttachments(in *models.Message) error {
return nil
}
// findOrCreateConversation finds or creates a conversation for the given message.
func (m *Manager) findOrCreateConversation(in *models.Message, inboxID int, contactID int, meta []byte) (bool, error) {
var (
new bool
@@ -557,7 +599,7 @@ func (m *Manager) findOrCreateConversation(in *models.Message, inboxID int, cont
// Search for existing conversation.
sourceIDs := in.References
if in.InReplyTo > "" {
if in.InReplyTo != "" {
sourceIDs = append(sourceIDs, in.InReplyTo)
}
conversationID, err = m.findConversationID(sourceIDs)
@@ -568,7 +610,7 @@ func (m *Manager) findOrCreateConversation(in *models.Message, inboxID int, cont
// Conversation not found, create one.
if conversationID == 0 {
new = true
conversationID, conversationUUID, err = m.conversationMgr.Create(contactID, inboxID, meta)
conversationID, conversationUUID, err = m.conversationStore.Create(contactID, inboxID, meta)
if err != nil || conversationID == 0 {
return new, err
}
@@ -579,7 +621,7 @@ func (m *Manager) findOrCreateConversation(in *models.Message, inboxID int, cont
// Set UUID if not available.
if conversationUUID == "" {
conversationUUID, err = m.conversationMgr.GetUUID(conversationID)
conversationUUID, err = m.conversationStore.GetUUID(conversationID)
if err != nil {
return new, err
}
@@ -598,29 +640,24 @@ func (m *Manager) findConversationID(sourceIDs []string) (int, error) {
if err := m.q.MessageExists.QueryRow(pq.Array(sourceIDs)).Scan(&conversationID); err != nil {
if err == sql.ErrNoRows {
return conversationID, ErrConversationNotFound
} else {
m.lo.Error("error fetching msg from DB", "error", err)
return conversationID, err
}
m.lo.Error("error fetching msg from DB", "error", err)
return conversationID, err
}
return conversationID, nil
}
// attachAttachments attaches files to a message.
func (m *Manager) attachAttachments(msg *models.Message) error {
var attachments, err = m.attachmentMgr.GetMessageAttachments(msg.ID)
attachments, err := m.attachmentManager.GetMessageAttachments(msg.ID)
if err != nil {
m.lo.Error("error fetching message attachments", "error", err)
return err
}
// TODO: set attachment headers and replace the inline image src url w
// src="cid:ii_lxxsfhtp0"
// a.Header.Set("Content-Disposition", "inline")
// a.Header.Set("Content-ID", "<"+f.CID+">")
// Fetch the blobs and attach the attachments to the message.
for i, att := range attachments {
attachments[i].Content, err = m.attachmentMgr.Store.GetBlob(att.UUID)
attachments[i].Content, err = m.attachmentManager.Store.GetBlob(att.UUID)
if err != nil {
m.lo.Error("error fetching blob for attachment", "attachment_uuid", att.UUID, "message_id", msg.ID)
return err
@@ -631,7 +668,7 @@ func (m *Manager) attachAttachments(msg *models.Message) error {
return nil
}
// getOutgoingProcessingMsgIDs returns outgoing msg ids currently being processed.
// getOutgoingProcessingMsgIDs returns the IDs of outgoing messages currently being processed.
func (m *Manager) getOutgoingProcessingMsgIDs() []int {
var out = make([]int, 0)
m.outgoingProcessingMessages.Range(func(key, _ any) bool {
@@ -643,6 +680,7 @@ func (m *Manager) getOutgoingProcessingMsgIDs() []int {
return out
}
// BroadcastNewConversationMessage broadcasts a new conversation message to subscribers.
func (m *Manager) BroadcastNewConversationMessage(message models.Message, trimmedContent string) {
m.wsHub.BroadcastNewConversationMessage(message.ConversationUUID, trimmedContent, message.UUID, time.Now().Format(time.RFC3339), message.Private)
}

View File

@@ -2,8 +2,8 @@ package notifier
// Notifier defines the interface for sending notifications.
type Notifier interface {
SendMessage(userID []int, subject, content string) error
SendAssignedConversationNotification(userID []int, convUUID string) error
SendMessage(userIDs []int, subject, content string) error
SendAssignedConversationNotification(userIDs []int, convUUID string) error
}
// TemplateRenderer defines the interface for rendering templates.
@@ -11,7 +11,7 @@ type TemplateRenderer interface {
RenderDefault(data interface{}) (subject, content string, err error)
}
// UserEmailFetcher defines the interfaces for fetchign user email address.
// UserEmailFetcher defines the interface for fetching user email addresses.
type UserEmailFetcher interface {
GetEmail(id int, uuid string) (string, error)
}

View File

@@ -5,6 +5,7 @@ import (
"math/rand"
"net/textproto"
amodels "github.com/abhinavxd/artemis/internal/attachment/models"
"github.com/abhinavxd/artemis/internal/inbox/channel/email"
"github.com/abhinavxd/artemis/internal/message/models"
notifier "github.com/abhinavxd/artemis/internal/notification"
@@ -12,22 +13,23 @@ import (
"github.com/zerodha/logf"
)
// Email
// Email implements the Notifier interface for sending emails.
type Email struct {
lo *logf.Logger
from string
smtpPools []*smtppool.Pool
userStore notifier.UserStore
TemplateRenderer notifier.TemplateRenderer
templateRenderer notifier.TemplateRenderer
}
// Opts contains options for creating a new Email notifier.
type Opts struct {
Lo *logf.Logger
FromEmail string
}
// New creates a new instance of email Notifier.
func New(smtpConfig []email.SMTPConfig, userStore notifier.UserStore, TemplateRenderer notifier.TemplateRenderer, opts Opts) (*Email, error) {
// New creates a new instance of the Email notifier.
func New(smtpConfig []email.SMTPConfig, userStore notifier.UserStore, templateRenderer notifier.TemplateRenderer, opts Opts) (*Email, error) {
pools, err := email.NewSmtpPool(smtpConfig)
if err != nil {
return nil, err
@@ -37,24 +39,18 @@ func New(smtpConfig []email.SMTPConfig, userStore notifier.UserStore, TemplateRe
smtpPools: pools,
from: opts.FromEmail,
userStore: userStore,
TemplateRenderer: TemplateRenderer,
templateRenderer: templateRenderer,
}, nil
}
// SendMessage sends an email using the default template to multiple users.
func (e *Email) SendMessage(userIDs []int, subject, content string) error {
var recipientEmails []string
for i := 0; i < len(userIDs); i++ {
userEmail, err := e.userStore.GetEmail(userIDs[i], "")
if err != nil {
e.lo.Error("error fetching user email", "error", err)
return err
}
recipientEmails = append(recipientEmails, userEmail)
recipientEmails, err := e.getUserEmails(userIDs)
if err != nil {
return err
}
// Render with default template.
templateBody, templateSubject, err := e.TemplateRenderer.RenderDefault(map[string]string{
templateBody, templateSubject, err := e.templateRenderer.RenderDefault(map[string]string{
"Content": content,
})
if err != nil {
@@ -72,14 +68,10 @@ func (e *Email) SendMessage(userIDs []int, subject, content string) error {
To: recipientEmails,
}
err = e.Send(m)
if err != nil {
e.lo.Error("error sending email notification", "error", err)
return err
}
return nil
return e.send(m)
}
// SendAssignedConversationNotification sends a email notification for an assigned conversation to the passed user.
func (e *Email) SendAssignedConversationNotification(userIDs []int, convUUID string) error {
subject := "New conversation assigned to you"
link := fmt.Sprintf("http://localhost:5173/conversations/%s", convUUID)
@@ -87,30 +79,40 @@ func (e *Email) SendAssignedConversationNotification(userIDs []int, convUUID str
return e.SendMessage(userIDs, subject, content)
}
// Send sends an email message.
func (e *Email) Send(m models.Message) error {
var (
ln = len(e.smtpPools)
srv *smtppool.Pool
)
if ln > 1 {
srv = e.smtpPools[rand.Intn(ln)]
} else {
srv = e.smtpPools[0]
// getUserEmails fetches the email addresses of the specified user IDs.
func (e *Email) getUserEmails(userIDs []int) ([]string, error) {
var recipientEmails []string
for _, userID := range userIDs {
userEmail, err := e.userStore.GetEmail(userID, "")
if err != nil {
e.lo.Error("error fetching user email", "error", err)
return nil, err
}
recipientEmails = append(recipientEmails, userEmail)
}
return recipientEmails, nil
}
// send sends an email message.
func (e *Email) send(m models.Message) error {
srv := e.selectSmtpPool()
em := e.prepareEmail(m)
return srv.Send(em)
}
// selectSmtpPool selects a random SMTP pool if multiple are available.
func (e *Email) selectSmtpPool() *smtppool.Pool {
if len(e.smtpPools) > 1 {
return e.smtpPools[rand.Intn(len(e.smtpPools))]
}
return e.smtpPools[0]
}
// prepareEmail prepares the email message with attachments and headers.
func (e *Email) prepareEmail(m models.Message) smtppool.Email {
var files []smtppool.Attachment
if m.Attachments != nil {
files = make([]smtppool.Attachment, 0, len(m.Attachments))
for _, f := range m.Attachments {
a := smtppool.Attachment{
Filename: f.Filename,
Header: f.Header,
Content: make([]byte, len(f.Content)),
}
copy(a.Content, f.Content)
files = append(files, a)
}
files = e.prepareAttachments(m.Attachments)
}
em := smtppool.Email{
@@ -134,5 +136,20 @@ func (e *Email) Send(m models.Message) error {
em.Text = []byte(m.AltContent)
}
}
return srv.Send(em)
return em
}
// prepareAttachments prepares email attachments.
func (e *Email) prepareAttachments(attachments []amodels.Attachment) []smtppool.Attachment {
files := make([]smtppool.Attachment, 0, len(attachments))
for _, f := range attachments {
a := smtppool.Attachment{
Filename: f.Filename,
Header: f.Header,
Content: make([]byte, len(f.Content)),
}
copy(a.Content, f.Content)
files = append(files, a)
}
return files
}

View File

@@ -1,3 +1,4 @@
// Package role handles role-related operations including creating, updating, fetching, and deleting roles.
package role
import (
@@ -16,16 +17,19 @@ var (
efs embed.FS
)
// Manager handles role-related operations.
type Manager struct {
q queries
lo *logf.Logger
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
}
// queries contains prepared SQL queries.
type queries struct {
Get *sqlx.Stmt `query:"get-role"`
GetAll *sqlx.Stmt `query:"get-all"`
@@ -34,6 +38,7 @@ type queries struct {
Update *sqlx.Stmt `query:"update-role"`
}
// New creates and returns a new instance of the Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -47,6 +52,7 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
// GetAll retrieves all roles.
func (t *Manager) GetAll() ([]models.Role, error) {
var roles = make([]models.Role, 0)
if err := t.q.GetAll.Select(&roles); err != nil {
@@ -56,6 +62,7 @@ func (t *Manager) GetAll() ([]models.Role, error) {
return roles, nil
}
// Get retrieves a role by ID.
func (t *Manager) Get(id int) (models.Role, error) {
var role = models.Role{}
if err := t.q.Get.Get(&role, id); err != nil {
@@ -65,6 +72,7 @@ func (t *Manager) Get(id int) (models.Role, error) {
return role, nil
}
// Delete deletes a role by ID.
func (t *Manager) Delete(id int) error {
if _, err := t.q.Delete.Exec(id); err != nil {
t.lo.Error("error deleting role", "error", err)
@@ -73,6 +81,7 @@ func (t *Manager) Delete(id int) error {
return nil
}
// Create creates a new role.
func (u *Manager) Create(r models.Role) error {
if _, err := u.q.Insert.Exec(r.Name, r.Description, pq.Array(r.Permissions)); err != nil {
u.lo.Error("error inserting role", "error", err)
@@ -81,6 +90,7 @@ func (u *Manager) Create(r models.Role) error {
return nil
}
// Update updates an existing role.
func (u *Manager) Update(id int, r models.Role) error {
if _, err := u.q.Update.Exec(id, r.Name, r.Description, pq.Array(r.Permissions)); err != nil {
u.lo.Error("error updating role", "error", err)

View File

@@ -1,3 +1,4 @@
// Package setting handles the management of application settings.
package setting
import (
@@ -15,18 +16,22 @@ var (
efs embed.FS
)
// Manager handles setting-related operations.
type Manager struct {
q queries
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
}
// queries contains prepared SQL queries.
type queries struct {
GetAll *sqlx.Stmt `query:"get-all"`
}
// New creates and returns a new instance of the Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -39,6 +44,7 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
// GetAll retrieves all settings as a models.Settings struct.
func (m *Manager) GetAll() (models.Settings, error) {
var (
b types.JSONText
@@ -56,10 +62,9 @@ func (m *Manager) GetAll() (models.Settings, error) {
return out, nil
}
// GetAllJSON retrieves all settings as JSON.
func (m *Manager) GetAllJSON() (types.JSONText, error) {
var (
b types.JSONText
)
var b types.JSONText
if err := m.q.GetAll.Get(&b); err != nil {
return b, err

View File

@@ -1,3 +1,4 @@
// Package stringutil provides string utility functions.
package stringutil
import (
@@ -6,11 +7,9 @@ import (
// RandomAlNumString generates a random alphanumeric string of length n.
func RandomAlNumString(n int) (string, error) {
const (
dictionary = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
)
const dictionary = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
var bytes = make([]byte, n)
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err
@@ -23,13 +22,11 @@ func RandomAlNumString(n int) (string, error) {
return string(bytes), nil
}
// RandomNumericString generates a random digit numeric string of length n.
// RandomNumericString generates a random numeric string of length n.
func RandomNumericString(n int) (string, error) {
const (
dictionary = "0123456789"
)
const dictionary = "0123456789"
var bytes = make([]byte, n)
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {
return "", err

View File

@@ -1,5 +0,0 @@
package systeminfo
const (
SystemUserUUID = "00000000-0000-0000-0000-000000000000"
)

View File

@@ -0,0 +1,9 @@
package models
import "time"
type Tag struct {
ID int `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Name string `db:"name" json:"name"`
}

View File

@@ -1,11 +1,12 @@
// Package tag handles the management of tags in the system.
package tag
import (
"embed"
"fmt"
"time"
"github.com/abhinavxd/artemis/internal/dbutil"
"github.com/abhinavxd/artemis/internal/tag/models"
"github.com/jmoiron/sqlx"
"github.com/zerodha/logf"
)
@@ -15,28 +16,26 @@ var (
efs embed.FS
)
type Tag struct {
ID int `db:"id" json:"id"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
Name string `db:"name" json:"name"`
}
// Manager handles tag-related operations.
type Manager struct {
q queries
lo *logf.Logger
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
}
// queries contains prepared SQL queries.
type queries struct {
GetAllTags *sqlx.Stmt `query:"get-all-tags"`
InsertTag *sqlx.Stmt `query:"insert-tag"`
DeleteTag *sqlx.Stmt `query:"delete-tag"`
}
// New creates and returns a new instance of the Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -50,15 +49,17 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
func (t *Manager) GetAll() ([]Tag, error) {
var tt []Tag
if err := t.q.GetAllTags.Select(&tt); err != nil {
// GetAll retrieves all tags.
func (t *Manager) GetAll() ([]models.Tag, error) {
var tags []models.Tag
if err := t.q.GetAllTags.Select(&tags); err != nil {
t.lo.Error("fetching tags", "error", err)
return tt, fmt.Errorf("error fetching tags")
return tags, fmt.Errorf("error fetching tags")
}
return tt, nil
return tags, nil
}
// AddTag adds a new tag.
func (t *Manager) AddTag(name string) error {
if _, err := t.q.InsertTag.Exec(name); err != nil {
t.lo.Error("inserting tag", "error", err)
@@ -67,6 +68,7 @@ func (t *Manager) AddTag(name string) error {
return nil
}
// DeleteTag deletes a tag by ID.
func (t *Manager) DeleteTag(id int) error {
if _, err := t.q.DeleteTag.Exec(id); err != nil {
t.lo.Error("deleting tag", "error", err)

View File

@@ -1,3 +1,4 @@
// Package team handles the management of teams and their members.
package team
import (
@@ -19,16 +20,19 @@ var (
efs embed.FS
)
// Manager handles team-related operations.
type Manager struct {
lo *logf.Logger
q queries
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
}
// queries contains prepared SQL queries.
type queries struct {
GetTeams *sqlx.Stmt `query:"get-teams"`
GetTeam *sqlx.Stmt `query:"get-team"`
@@ -37,6 +41,7 @@ type queries struct {
GetTeamMembers *sqlx.Stmt `query:"get-team-members"`
}
// New creates and returns a new instance of the Manager.
func New(opts Opts) (*Manager, error) {
var q queries
@@ -50,6 +55,7 @@ func New(opts Opts) (*Manager, error) {
}, nil
}
// GetAll retrieves all teams.
func (u *Manager) GetAll() ([]models.Team, error) {
var teams []models.Team
if err := u.q.GetTeams.Select(&teams); err != nil {
@@ -62,6 +68,7 @@ func (u *Manager) GetAll() ([]models.Team, error) {
return teams, nil
}
// GetTeam retrieves a team by ID.
func (u *Manager) GetTeam(id int) (models.Team, error) {
var team models.Team
if err := u.q.GetTeam.Get(&team, id); err != nil {
@@ -75,6 +82,7 @@ func (u *Manager) GetTeam(id int) (models.Team, error) {
return team, nil
}
// CreateTeam creates a new team.
func (u *Manager) CreateTeam(t models.Team) error {
if _, err := u.q.InsertTeam.Exec(t.Name); err != nil {
u.lo.Error("error inserting team", "error", err)
@@ -83,6 +91,7 @@ func (u *Manager) CreateTeam(t models.Team) error {
return nil
}
// UpdateTeam updates an existing team.
func (u *Manager) UpdateTeam(id int, t models.Team) error {
if _, err := u.q.UpdateTeam.Exec(id, t.Name); err != nil {
u.lo.Error("error updating team", "error", err)
@@ -91,6 +100,7 @@ func (u *Manager) UpdateTeam(id int, t models.Team) error {
return nil
}
// GetTeamMembers retrieves members of a team by team name.
func (u *Manager) GetTeamMembers(name string) ([]umodels.User, error) {
var users []umodels.User
if err := u.q.GetTeamMembers.Select(&users, name); err != nil {

View File

@@ -5,14 +5,14 @@ import (
"text/template"
)
// RenderDefault renders the system default template with the data.
// RenderDefault renders the system default template with the provided data.
func (m *Manager) RenderDefault(data interface{}) (string, string, error) {
templ, err := m.GetDefaultTemplate()
if err != nil {
return "", "", err
}
tmpl, err := template.New("").Parse(templ.Body)
tmpl, err := template.New("default").Parse(templ.Body)
if err != nil {
return "", "", err
}

View File

@@ -1,3 +1,4 @@
// Package template manages email templates including insertion and retrieval.
package template
import (
@@ -13,16 +14,19 @@ var (
efs embed.FS
)
// Manager handles template-related operations.
type Manager struct {
q queries
}
// queries contains prepared SQL queries.
type queries struct {
InsertTemplate *sqlx.Stmt `query:"insert-template"`
GetTemplate *sqlx.Stmt `query:"get-template"`
GetDefaultTemplate *sqlx.Stmt `query:"get-default-template"`
}
// New creates and returns a new instance of the Manager.
func New(db *sqlx.DB) (*Manager, error) {
var q queries
@@ -33,6 +37,7 @@ func New(db *sqlx.DB) (*Manager, error) {
return &Manager{q}, nil
}
// InsertTemplate inserts a new template with the given name, subject, and body.
func (m *Manager) InsertTemplate(name, subject, body string) error {
if _, err := m.q.InsertTemplate.Exec(name, subject, body); err != nil {
return err
@@ -40,6 +45,7 @@ func (m *Manager) InsertTemplate(name, subject, body string) error {
return nil
}
// GetTemplate retrieves a template by its name.
func (m *Manager) GetTemplate(name string) (models.Template, error) {
var template models.Template
if err := m.q.GetTemplate.Get(&template, name); err != nil {
@@ -48,6 +54,7 @@ func (m *Manager) GetTemplate(name string) (models.Template, error) {
return template, nil
}
// GetDefaultTemplate retrieves the default template.
func (m *Manager) GetDefaultTemplate() (models.Template, error) {
var template models.Template
if err := m.q.GetDefaultTemplate.Get(&template); err != nil {

View File

@@ -1,8 +0,0 @@
-- name: insert-upload
INSERT INTO uploads
(filename)
VALUES($1)
RETURNING id;
-- name: delete-upload
DELETE FROM uploads WHERE id = $1;

View File

@@ -1,80 +0,0 @@
package upload
import (
"embed"
"fmt"
"io"
"github.com/abhinavxd/artemis/internal/attachment"
"github.com/abhinavxd/artemis/internal/dbutil"
"github.com/jmoiron/sqlx"
"github.com/zerodha/logf"
)
var (
// Embedded filesystem
//go:embed queries.sql
efs embed.FS
uriUploads = "/upload/%s"
)
// Manager is the uploads manager.
type Manager struct {
Store attachment.Store
lo *logf.Logger
queries queries
appBaseURL string
}
type Opts struct {
Lo *logf.Logger
DB *sqlx.DB
AppBaseURL string
}
// New creates a new attachment manager instance.
func New(store attachment.Store, opt Opts) (*Manager, error) {
var q queries
// Scan SQL file
if err := dbutil.ScanSQLFile("queries.sql", &q, opt.DB, efs); err != nil {
return nil, err
}
return &Manager{
queries: q,
Store: store,
lo: opt.Lo,
appBaseURL: opt.AppBaseURL,
}, nil
}
type queries struct {
Insert *sqlx.Stmt `query:"insert-upload"`
Delete *sqlx.Stmt `query:"delete-upload"`
}
// Upload inserts the attachment details into the db and uploads the attachment.
func (m *Manager) Upload(fileName, contentType string, content io.ReadSeeker) (string, error) {
var uuid string
if err := m.queries.Insert.QueryRow(fileName).Scan(&uuid); err != nil {
m.lo.Error("error inserting upload", "error", err)
return uuid, err
}
if _, err := m.Store.Put(uuid, contentType, content); err != nil {
m.Delete(uuid)
return uuid, err
}
return m.appBaseURL + fmt.Sprintf(uriUploads, uuid), nil
}
// AttachMessage attaches given attachments to a message.
func (m *Manager) Delete(uuid string) error {
if err := m.Store.Delete(uuid); err != nil {
return err
}
return nil
}

View File

@@ -32,6 +32,7 @@ const (
SystemUserUUID = "00000000-0000-0000-0000-000000000000"
)
// Manager handles user-related operations.
type Manager struct {
lo *logf.Logger
i18n *i18n.I18n
@@ -39,13 +40,14 @@ type Manager struct {
bcryptCost int
}
// Opts contains options for initializing the Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
BcryptCost int
}
// Prepared queries.
// queries contains prepared SQL queries.
type queries struct {
CreateUser *sqlx.Stmt `query:"create-user"`
GetUsers *sqlx.Stmt `query:"get-users"`
@@ -57,6 +59,7 @@ type queries struct {
SetUserPassword *sqlx.Stmt `query:"set-user-password"`
}
// New creates and returns a new instance of the Manager.
func New(i18n *i18n.I18n, opts Opts) (*Manager, error) {
var q queries
@@ -72,6 +75,7 @@ func New(i18n *i18n.I18n, opts Opts) (*Manager, error) {
}, nil
}
// Login authenticates a user by email and password.
func (u *Manager) Login(email string, password []byte) (models.User, error) {
var user models.User
@@ -90,6 +94,7 @@ func (u *Manager) Login(email string, password []byte) (models.User, error) {
return user, nil
}
// GetUsers retrieves all users.
func (u *Manager) GetUsers() ([]models.User, error) {
var users []models.User
if err := u.q.GetUsers.Select(&users); err != nil {
@@ -103,10 +108,9 @@ func (u *Manager) GetUsers() ([]models.User, error) {
return users, nil
}
// Create creates a new user.
func (u *Manager) Create(user *models.User) error {
var (
password, _ = u.generatePassword()
)
password, _ := u.generatePassword()
user.Email = strings.ToLower(user.Email)
if _, err := u.q.CreateUser.Exec(user.Email, user.FirstName, user.LastName, password, user.TeamID, user.AvatarURL, pq.Array(user.Roles)); err != nil {
u.lo.Error("error creating user", "error", err)
@@ -115,7 +119,8 @@ func (u *Manager) Create(user *models.User) error {
return nil
}
func (u *Manager) GetUser(id int, uuid string) (models.User, error) {
// Get retrieves a user by ID or UUID.
func (u *Manager) Get(id int, uuid string) (models.User, error) {
var uu interface{}
if uuid != "" {
uu = uuid
@@ -132,10 +137,12 @@ func (u *Manager) GetUser(id int, uuid string) (models.User, error) {
return user, nil
}
// GetSystemUser retrieves the system user.
func (u *Manager) GetSystemUser() (models.User, error) {
return u.GetUser(0, SystemUserUUID)
return u.Get(0, SystemUserUUID)
}
// UpdateUser updates an existing user.
func (u *Manager) UpdateUser(id int, user models.User) error {
if _, err := u.q.UpdateUser.Exec(id, user.FirstName, user.LastName, user.Email, user.TeamID, pq.Array(user.Roles)); err != nil {
u.lo.Error("error updating user", "error", err)
@@ -144,6 +151,7 @@ func (u *Manager) UpdateUser(id int, user models.User) error {
return nil
}
// GetEmail retrieves the email of a user by ID or UUID.
func (u *Manager) GetEmail(id int, uuid string) (string, error) {
var uu interface{}
if uuid != "" {
@@ -161,6 +169,7 @@ func (u *Manager) GetEmail(id int, uuid string) (string, error) {
return email, nil
}
// GetPermissions retrieves the permissions of a user by ID.
func (u *Manager) GetPermissions(id int) ([]string, error) {
var permissions []string
if err := u.q.GetPermissions.Select(&permissions, id); err != nil {
@@ -170,24 +179,23 @@ func (u *Manager) GetPermissions(id int) ([]string, error) {
return permissions, nil
}
// verifyPassword compares the provided password with the stored password hash.
func (u *Manager) verifyPassword(pwd []byte, pwdHash string) error {
err := bcrypt.CompareHashAndPassword([]byte(pwdHash), pwd)
if err != nil {
if err := bcrypt.CompareHashAndPassword([]byte(pwdHash), pwd); err != nil {
return fmt.Errorf("invalid username or password")
}
return nil
}
// setPassword sets a new password for a user.
func (u *Manager) setPassword(uid int, pwd string) error {
// Bcrypt does not operate over 72 bytes.
if len(pwd) > 72 {
return ErrPasswordTooLong
}
bytes, err := u.generatePassword()
bytes, err := bcrypt.GenerateFromPassword([]byte(pwd), u.bcryptCost)
if err != nil {
return err
}
// Update password in db.
if _, err := u.q.SetUserPassword.Exec(bytes, uid); err != nil {
u.lo.Error("setting password", "error", err)
return fmt.Errorf("error setting password")
@@ -195,8 +203,9 @@ func (u *Manager) setPassword(uid int, pwd string) error {
return nil
}
// generatePassword generates a random password and returns its bcrypt hash.
func (u *Manager) generatePassword() ([]byte, error) {
var password, _ = stringutil.RandomAlNumString(16)
password, _ := stringutil.RandomAlNumString(16)
bytes, err := bcrypt.GenerateFromPassword([]byte(password), u.bcryptCost)
if err != nil {
u.lo.Error("error generating bcrypt password", "error", err)

View File

@@ -12,23 +12,26 @@ import (
)
const (
// SubscribeConversations to last 1000 conversations.
// SubscribeConversations to the last 1000 conversations.
// TODO: Move to config.
maxConversationsPagesToSub = 10
maxConversationsPageSize = 100
)
// SafeBool is a thread-safe boolean.
type SafeBool struct {
flag bool
mu sync.Mutex
}
// Set sets the value of the SafeBool.
func (b *SafeBool) Set(value bool) {
b.mu.Lock()
b.flag = value
b.mu.Unlock()
}
// Get returns the value of the SafeBool.
func (b *SafeBool) Get() bool {
b.mu.Lock()
defer b.mu.Unlock()
@@ -43,7 +46,7 @@ type Client struct {
// Hub.
Hub *Hub
// Ws Conn.
// WebSocket connection.
Conn *websocket.Conn
// To prevent pushes to the channel.
@@ -53,6 +56,7 @@ type Client struct {
Send chan Message
}
// Serve handles heartbeats and sending messages to the client.
func (c *Client) Serve(heartBeatDuration time.Duration) {
var heartBeatTicker = time.NewTicker(heartBeatDuration)
defer heartBeatTicker.Stop()
@@ -61,30 +65,31 @@ Loop:
select {
case <-heartBeatTicker.C:
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
fmt.Println("error writing msg", err)
fmt.Println("error writing message", err)
return
}
case o, ok := <-c.Send:
case msg, ok := <-c.Send:
if !ok {
break Loop
}
c.Conn.WriteMessage(o.messageType, o.data)
c.Conn.WriteMessage(msg.messageType, msg.data)
}
}
c.Conn.Close()
}
// Listen listens for incoming messages from the client.
func (c *Client) Listen() {
for {
t, msg, err := c.Conn.ReadMessage()
msgType, msg, err := c.Conn.ReadMessage()
if err != nil {
break
}
if t == websocket.TextMessage {
if msgType == websocket.TextMessage {
c.processIncomingMessage(msg)
} else {
fmt.Printf("closing chan invalid msg type. %d \n", t)
fmt.Printf("closing channel due to invalid message type. %d \n", msgType)
c.Hub.RemoveClient(c)
c.close()
return
@@ -94,67 +99,68 @@ func (c *Client) Listen() {
c.close()
}
// processIncomingMessage processes incoming messages from client.
func (c *Client) processIncomingMessage(b []byte) {
var r models.IncomingReq
// processIncomingMessage processes incoming messages from the client.
func (c *Client) processIncomingMessage(data []byte) {
var req models.IncomingReq
if err := json.Unmarshal(b, &r); err != nil {
if err := json.Unmarshal(data, &req); err != nil {
return
}
switch r.Action {
switch req.Action {
case models.ActionConversationsSub:
var req = models.ConversationsSubscribe{}
if err := json.Unmarshal(b, &req); err != nil {
var subReq models.ConversationsSubscribe
if err := json.Unmarshal(data, &subReq); err != nil {
return
}
// First remove all user conversation subscriptions.
c.RemoveAllUserConversationSubscriptions(c.ID)
// Add the new subcriptions.
for page := range maxConversationsPagesToSub {
page++
conversationUUIDs, err := c.Hub.conversationStore.GetConversationUUIDs(c.ID, page, maxConversationsPageSize, req.Type, req.PreDefinedFilter)
// Add the new subscriptions.
for page := 1; page <= maxConversationsPagesToSub; page++ {
conversationUUIDs, err := c.Hub.conversationStore.GetConversationUUIDs(c.ID, page, maxConversationsPageSize, subReq.Type, subReq.PreDefinedFilter)
if err != nil {
log.Println("error fetching convesation ids", err)
log.Println("error fetching conversation ids", err)
continue
}
c.SubscribeConversations(c.ID, conversationUUIDs)
}
case models.ActionConversationSub:
var req = models.ConversationSubscribe{}
if err := json.Unmarshal(b, &req); err != nil {
var subReq models.ConversationSubscribe
if err := json.Unmarshal(data, &subReq); err != nil {
return
}
c.SubscribeConversations(c.ID, []string{req.UUID})
c.SubscribeConversations(c.ID, []string{subReq.UUID})
case models.ActionConversationUnSub:
var req = models.ConversationUnsubscribe{}
if err := json.Unmarshal(b, &req); err != nil {
var unsubReq models.ConversationUnsubscribe
if err := json.Unmarshal(data, &unsubReq); err != nil {
return
}
c.UnsubscribeConversation(c.ID, req.UUID)
c.UnsubscribeConversation(c.ID, unsubReq.UUID)
default:
fmt.Println("new incoming ws msg ", string(b))
fmt.Println("new incoming websocket message ", string(data))
}
}
// close closes the client connection and removes all subscriptions.
func (c *Client) close() {
c.RemoveAllUserConversationSubscriptions(c.ID)
c.Closed.Set(true)
close(c.Send)
}
// SubscribeConversations subscribes the client to the specified conversations.
func (c *Client) SubscribeConversations(userID int, conversationUUIDs []string) {
for _, conversationUUID := range conversationUUIDs {
// Initialize the slice if it doesn't exist
if c.Hub.ConversationSubs[conversationUUID] == nil {
c.Hub.ConversationSubs[conversationUUID] = []int{}
if c.Hub.conversationSubs[conversationUUID] == nil {
c.Hub.conversationSubs[conversationUUID] = []int{}
}
// Check if userID already exists
exists := false
for _, id := range c.Hub.ConversationSubs[conversationUUID] {
for _, id := range c.Hub.conversationSubs[conversationUUID] {
if id == userID {
exists = true
break
@@ -163,35 +169,37 @@ func (c *Client) SubscribeConversations(userID int, conversationUUIDs []string)
// Add userID if it doesn't exist
if !exists {
c.Hub.ConversationSubs[conversationUUID] = append(c.Hub.ConversationSubs[conversationUUID], userID)
c.Hub.conversationSubs[conversationUUID] = append(c.Hub.conversationSubs[conversationUUID], userID)
}
}
}
// UnsubscribeConversation unsubscribes the client from the specified conversation.
func (c *Client) UnsubscribeConversation(userID int, conversationUUID string) {
if userIDs, ok := c.Hub.ConversationSubs[conversationUUID]; ok {
if userIDs, ok := c.Hub.conversationSubs[conversationUUID]; ok {
for i, id := range userIDs {
if id == userID {
c.Hub.ConversationSubs[conversationUUID] = append(userIDs[:i], userIDs[i+1:]...)
c.Hub.conversationSubs[conversationUUID] = append(userIDs[:i], userIDs[i+1:]...)
break
}
}
if len(c.Hub.ConversationSubs[conversationUUID]) == 0 {
delete(c.Hub.ConversationSubs, conversationUUID)
if len(c.Hub.conversationSubs[conversationUUID]) == 0 {
delete(c.Hub.conversationSubs, conversationUUID)
}
}
}
// RemoveAllUserConversationSubscriptions removes all conversation subscriptions for the user.
func (c *Client) RemoveAllUserConversationSubscriptions(userID int) {
for conversationID, userIDs := range c.Hub.ConversationSubs {
for conversationUUID, userIDs := range c.Hub.conversationSubs {
for i, id := range userIDs {
if id == userID {
c.Hub.ConversationSubs[conversationID] = append(userIDs[:i], userIDs[i+1:]...)
c.Hub.conversationSubs[conversationUUID] = append(userIDs[:i], userIDs[i+1:]...)
break
}
}
if len(c.Hub.ConversationSubs[conversationID]) == 0 {
delete(c.Hub.ConversationSubs, conversationID)
if len(c.Hub.conversationSubs[conversationUUID]) == 0 {
delete(c.Hub.conversationSubs, conversationUUID)
}
}
}

View File

@@ -1,5 +1,6 @@
package models
// Action constants for WebSocket messages.
const (
ActionConversationsSub = "conversations_sub"
ActionConversationSub = "conversation_sub"
@@ -10,26 +11,33 @@ const (
MessageTypeConversationPropertyUpdate = "conv_prop_update"
)
// IncomingReq represents an incoming WebSocket request.
type IncomingReq struct {
Action string `json:"a"`
}
// ConversationsSubscribe represents a request to subscribe to conversations.
type ConversationsSubscribe struct {
Type string `json:"t"`
PreDefinedFilter string `json:"pf"`
}
// ConversationSubscribe represents a request to subscribe to a single conversation.
type ConversationSubscribe struct {
UUID string `json:"uuid"`
}
// ConversationUnsubscribe represents a request to unsubscribe from a single conversation.
type ConversationUnsubscribe struct {
UUID string `json:"uuid"`
}
// ConvSubUnsubReq represents a request to subscribe or unsubscribe from multiple conversations.
type ConvSubUnsubReq struct {
UUIDs []string `json:"v"`
}
// Message represents a WebSocket message.
type Message struct {
Type string `json:"typ"`
Data interface{} `json:"d"`

View File

@@ -1,3 +1,4 @@
// Package ws handles WebSocket connections and broadcasting messages to clients.
package ws
import (
@@ -12,57 +13,60 @@ import (
// Hub maintains the set of registered clients and their subscriptions.
type Hub struct {
clients map[int][]*Client
clientsMutex sync.Mutex
// Map of conversation uuid to a set of subscribed user IDs.
ConversationSubs map[string][]int
SubMut sync.Mutex
clients map[int][]*Client
clientsMutex sync.Mutex
conversationSubs map[string][]int
subMutex sync.Mutex
conversationStore ConversationStore
}
// ConversationStore defines the interface for retrieving conversation UUIDs.
type ConversationStore interface {
GetConversationUUIDs(userID, page, pageSize int, typ, predefinedFilter string) ([]string, error)
}
// NewHub creates a new Hub.
func NewHub() *Hub {
return &Hub{
clients: make(map[int][]*Client, 10000),
clientsMutex: sync.Mutex{},
ConversationSubs: make(map[string][]int),
SubMut: sync.Mutex{},
conversationSubs: make(map[string][]int),
subMutex: sync.Mutex{},
}
}
// Message represents a WebSocket message.
type Message struct {
messageType int
data []byte
}
// PushMessage represents a message to be pushed to clients.
type PushMessage struct {
Data []byte `json:"data"`
Users []int `json:"users"`
MaxUsers int `json:"max_users"`
}
// SetConversationStore sets the conversation store for the hub.
func (h *Hub) SetConversationStore(store ConversationStore) {
h.conversationStore = store
}
func (h *Hub) AddClient(c *Client) {
// AddClient adds a new client to the hub.
func (h *Hub) AddClient(client *Client) {
h.clientsMutex.Lock()
defer h.clientsMutex.Unlock()
h.clients[c.ID] = append(h.clients[c.ID], c)
h.clients[client.ID] = append(h.clients[client.ID], client)
}
// RemoveClient removes a client from the hub.
func (h *Hub) RemoveClient(client *Client) {
h.clientsMutex.Lock()
defer h.clientsMutex.Unlock()
if clients, ok := h.clients[client.ID]; ok {
for i, c := range clients {
if c == client {
// Remove the client from the slice
h.clients[client.ID] = append(clients[:i], clients[i+1:]...)
break
}
@@ -70,7 +74,7 @@ func (h *Hub) RemoveClient(client *Client) {
}
}
// ClientAlreadyConnected returns true if the client with this id is already connected else returns false.
// ClientAlreadyConnected returns true if the client with this ID is already connected.
func (h *Hub) ClientAlreadyConnected(id int) bool {
h.clientsMutex.Lock()
defer h.clientsMutex.Unlock()
@@ -78,26 +82,24 @@ func (h *Hub) ClientAlreadyConnected(id int) bool {
return ok
}
// PushMessage broadcasts a JSON data packet to all or some userIDs (with an optional max cap).
// PushMessage broadcasts a JSON data packet to specified userIDs (with an optional max cap).
// If userIDs is empty, the broadcast goes to all users.
func (h *Hub) PushMessage(m PushMessage) {
if len(m.Users) != 0 {
// The message has to go to specific userIDs.
func (h *Hub) PushMessage(msg PushMessage) {
if len(msg.Users) != 0 {
h.clientsMutex.Lock()
for _, userID := range m.Users {
for _, c := range h.clients[userID] {
c.Conn.WriteMessage(websocket.TextMessage, m.Data)
for _, userID := range msg.Users {
for _, client := range h.clients[userID] {
client.Conn.WriteMessage(websocket.TextMessage, msg.Data)
}
}
h.clientsMutex.Unlock()
} else {
// Message goes to all connected users.
n := 0
h.clientsMutex.Lock()
for _, cls := range h.clients {
for _, c := range cls {
c.Conn.WriteMessage(websocket.TextMessage, m.Data)
if m.MaxUsers > 0 && n >= m.MaxUsers {
for _, clients := range h.clients {
for _, client := range clients {
client.Conn.WriteMessage(websocket.TextMessage, msg.Data)
if msg.MaxUsers > 0 && n >= msg.MaxUsers {
break
}
}
@@ -107,8 +109,9 @@ func (h *Hub) PushMessage(m PushMessage) {
}
}
func (c *Hub) BroadcastNewConversationMessage(conversationUUID, trimmedMessage, messageUUID, lastMessageAt string, private bool) {
userIDs, ok := c.ConversationSubs[conversationUUID]
// BroadcastNewConversationMessage broadcasts a new conversation message.
func (h *Hub) BroadcastNewConversationMessage(conversationUUID, trimmedMessage, messageUUID, lastMessageAt string, private bool) {
userIDs, ok := h.conversationSubs[conversationUUID]
if !ok || len(userIDs) == 0 {
return
}
@@ -124,11 +127,12 @@ func (c *Hub) BroadcastNewConversationMessage(conversationUUID, trimmedMessage,
},
}
c.marshalAndPush(message, userIDs)
h.marshalAndPush(message, userIDs)
}
func (c *Hub) BroadcastMessagePropUpdate(conversationUUID, messageUUID, prop, value string) {
userIDs, ok := c.ConversationSubs[conversationUUID]
// BroadcastMessagePropUpdate broadcasts an update to a message property.
func (h *Hub) BroadcastMessagePropUpdate(conversationUUID, messageUUID, prop, value string) {
userIDs, ok := h.conversationSubs[conversationUUID]
if !ok || len(userIDs) == 0 {
return
}
@@ -142,28 +146,30 @@ func (c *Hub) BroadcastMessagePropUpdate(conversationUUID, messageUUID, prop, va
},
}
c.marshalAndPush(message, userIDs)
h.marshalAndPush(message, userIDs)
}
func (c *Hub) BroadcastConversationAssignment(userID int, conversationUUID string, avatarUrl string, firstName, lastName, lastMessage, inboxName string, lastMessageAt time.Time, unreadMessageCount int) {
// BroadcastConversationAssignment broadcasts the assignment of a conversation.
func (h *Hub) BroadcastConversationAssignment(userID int, conversationUUID, avatarURL, firstName, lastName, lastMessage, inboxName string, lastMessageAt time.Time, unreadMessageCount int) {
message := models.Message{
Type: models.MessageTypeNewConversation,
Data: map[string]interface{}{
"uuid": conversationUUID,
"avatar_url": avatarUrl,
"avatar_url": avatarURL,
"first_name": firstName,
"last_name": lastName,
"last_message": lastMessage,
"last_message_at": time.Now().Format(time.RFC3339),
"last_message_at": lastMessageAt.Format(time.RFC3339),
"inbox_name": inboxName,
"unread_message_count": unreadMessageCount,
},
}
c.marshalAndPush(message, []int{userID})
h.marshalAndPush(message, []int{userID})
}
func (c *Hub) BroadcastConversationPropertyUpdate(conversationUUID, prop string, value string) {
userIDs, ok := c.ConversationSubs[conversationUUID]
// BroadcastConversationPropertyUpdate broadcasts an update to a conversation property.
func (h *Hub) BroadcastConversationPropertyUpdate(conversationUUID, prop, value string) {
userIDs, ok := h.conversationSubs[conversationUUID]
if !ok || len(userIDs) == 0 {
return
}
@@ -177,19 +183,20 @@ func (c *Hub) BroadcastConversationPropertyUpdate(conversationUUID, prop string,
},
}
c.marshalAndPush(message, userIDs)
h.marshalAndPush(message, userIDs)
}
func (c *Hub) marshalAndPush(message models.Message, userIDs []int) {
messageB, err := json.Marshal(message)
// marshalAndPush marshals a message and pushes it to the specified user IDs.
func (h *Hub) marshalAndPush(message models.Message, userIDs []int) {
messageBytes, err := json.Marshal(message)
if err != nil {
return
}
fmt.Println("pushing ws msg", string(messageB), "type", message.Type, "to_user_ids", userIDs, "connected_userIds", len(c.clients))
fmt.Println("pushing ws msg", string(messageBytes), "type", message.Type, "to_user_ids", userIDs, "connected_userIds", len(h.clients))
c.PushMessage(PushMessage{
Data: messageB,
h.PushMessage(PushMessage{
Data: messageBytes,
Users: userIDs,
})
}