package conversation import ( "bytes" "context" "database/sql" "encoding/json" "errors" "fmt" "path/filepath" "slices" "strings" "time" "github.com/abhinavxd/libredesk/internal/attachment" amodels "github.com/abhinavxd/libredesk/internal/automation/models" "github.com/abhinavxd/libredesk/internal/conversation/models" "github.com/abhinavxd/libredesk/internal/envelope" "github.com/abhinavxd/libredesk/internal/image" "github.com/abhinavxd/libredesk/internal/inbox" mmodels "github.com/abhinavxd/libredesk/internal/media/models" "github.com/abhinavxd/libredesk/internal/sla" "github.com/abhinavxd/libredesk/internal/stringutil" umodels "github.com/abhinavxd/libredesk/internal/user/models" wmodels "github.com/abhinavxd/libredesk/internal/webhook/models" "github.com/lib/pq" "github.com/volatiletech/null/v9" ) const ( maxMessagesPerPage = 100 ) // Run starts a pool of worker goroutines to handle message dispatching via inbox's channel and processes incoming messages. It scans for // pending outgoing messages at the specified read interval and pushes them to the outgoing queue to be sent. func (m *Manager) Run(ctx context.Context, incomingQWorkers, outgoingQWorkers, scanInterval time.Duration) { dbScanner := time.NewTicker(scanInterval) defer dbScanner.Stop() for range outgoingQWorkers { m.wg.Add(1) go func() { defer m.wg.Done() m.MessageSenderWorker(ctx) }() } for range incomingQWorkers { m.wg.Add(1) go func() { defer m.wg.Done() m.IncomingMessageWorker(ctx) }() } // Scan pending outgoing messages and send them. for { select { case <-ctx.Done(): return case <-dbScanner.C: var ( pendingMessages = []models.Message{} messageIDs = m.getOutgoingProcessingMessageIDs() ) // Get pending outgoing messages and skip the currently processing message ids. if err := m.q.GetOutgoingPendingMessages.Select(&pendingMessages, pq.Array(messageIDs)); err != nil { m.lo.Error("error fetching pending messages from db", "error", err) continue } // Prepare and push the message to the outgoing queue. for _, message := range pendingMessages { // Put the message ID in the processing map. m.outgoingProcessingMessages.Store(message.ID, message.ID) // Push the message to the outgoing message queue. m.outgoingMessageQueue <- message } } } } // Close signals the Manager to stop processing messages, closes channels, // and waits for all worker goroutines to finish processing. func (m *Manager) Close() { m.closedMu.Lock() defer m.closedMu.Unlock() m.closed = true close(m.outgoingMessageQueue) close(m.incomingMessageQueue) m.wg.Wait() } // IncomingMessageWorker processes incoming messages from the incoming message queue. func (m *Manager) IncomingMessageWorker(ctx context.Context) { for { select { case <-ctx.Done(): return case msg, ok := <-m.incomingMessageQueue: if !ok { return } if err := m.processIncomingMessage(msg); err != nil { m.lo.Error("error processing incoming msg", "error", err) } } } } // MessageSenderWorker sends outgoing pending messages. func (m *Manager) MessageSenderWorker(ctx context.Context) { for { select { case <-ctx.Done(): return case message, ok := <-m.outgoingMessageQueue: if !ok { return } m.sendOutgoingMessage(message) } } } // sendOutgoingMessage sends an outgoing message. func (m *Manager) sendOutgoingMessage(message models.Message) { defer m.outgoingProcessingMessages.Delete(message.ID) // Helper function to handle errors handleError := func(err error, errorMsg string) bool { if err != nil { m.lo.Error(errorMsg, "error", err, "message_id", message.ID) m.UpdateMessageStatus(message.UUID, models.MessageStatusFailed) return true } return false } // Get inbox inbox, err := m.inboxStore.Get(message.InboxID) if handleError(err, "error fetching inbox") { return } // Render content in template if err := m.RenderMessageInTemplate(inbox.Channel(), &message); err != nil { handleError(err, "error rendering content in template") return } // Attach attachments to the message if err := m.attachAttachmentsToMessage(&message); err != nil { handleError(err, "error attaching attachments to message") return } // Set from address of the inbox message.From = inbox.FromAddress() // Set "In-Reply-To" and "References" headers, logging any errors but continuing to send the message. // Include only the last 20 messages as references to avoid exceeding header size limits. message.References, err = m.GetMessageSourceIDs(message.ConversationID, 20) if err != nil { m.lo.Error("Error fetching conversation source IDs", "error", err) } // References is sorted in DESC i.e newest message first, so reverse it to keep the references in order. slices.Reverse(message.References) // Remove the current message ID from the references. message.References = stringutil.RemoveItemByValue(message.References, message.SourceID.String) if len(message.References) > 0 { message.InReplyTo = message.References[len(message.References)-1] } // Send message err = inbox.Send(message) if handleError(err, "error sending message") { return } // Update status. m.UpdateMessageStatus(message.UUID, models.MessageStatusSent) // Skip system user replies since we only update timestamps and SLA for human replies. systemUser, err := m.userStore.GetSystemUser() if err != nil { m.lo.Error("error fetching system user", "error", err) return } if message.SenderID != systemUser.ID { conversation, err := m.GetConversation(message.ConversationID, "") if err != nil { m.lo.Error("error fetching conversation", "conversation_id", message.ConversationID, "error", err) return } now := time.Now() if conversation.FirstReplyAt.IsZero() { m.UpdateConversationFirstReplyAt(message.ConversationUUID, message.ConversationID, now) } m.UpdateConversationLastReplyAt(message.ConversationUUID, message.ConversationID, now) // Clear waiting since timestamp as agent has replied to the conversation. m.UpdateConversationWaitingSince(message.ConversationUUID, nil) // Mark latest SLA event for next response as met. metAt, err := m.slaStore.SetLatestSLAEventMetAt(conversation.AppliedSLAID.Int, sla.MetricNextResponse) if err != nil && !errors.Is(err, sla.ErrLatestSLAEventNotFound) { m.lo.Error("error setting next response SLA event `met_at`", "conversation_id", conversation.ID, "metric", sla.MetricNextResponse, "applied_sla_id", conversation.AppliedSLAID.Int, "error", err) } else if !metAt.IsZero() { m.BroadcastConversationUpdate(message.ConversationUUID, "next_response_met_at", metAt.Format(time.RFC3339)) } // Evaluate automation rules for outgoing message. m.automation.EvaluateConversationUpdateRulesByID(message.ConversationID, "", amodels.EventConversationMessageOutgoing) } } // RenderMessageInTemplate renders message content in template. func (m *Manager) RenderMessageInTemplate(channel string, message *models.Message) error { switch channel { case inbox.ChannelEmail: conversation, err := m.GetConversation(0, message.ConversationUUID) if err != nil { m.lo.Error("error fetching conversation", "uuid", message.ConversationUUID, "error", err) return fmt.Errorf("fetching conversation: %w", err) } sender, err := m.userStore.GetAgent(message.SenderID, "") if err != nil { m.lo.Error("error fetching message sender user", "sender_id", message.SenderID, "error", err) return fmt.Errorf("fetching message sender user: %w", err) } data := map[string]any{ "Conversation": map[string]any{ "ReferenceNumber": conversation.ReferenceNumber, "Subject": conversation.Subject.String, "Priority": conversation.Priority.String, "UUID": conversation.UUID, }, "Contact": map[string]any{ "FirstName": conversation.Contact.FirstName, "LastName": conversation.Contact.LastName, "FullName": conversation.Contact.FullName(), "Email": conversation.Contact.Email.String, }, "Recipient": map[string]any{ "FirstName": conversation.Contact.FirstName, "LastName": conversation.Contact.LastName, "FullName": conversation.Contact.FullName(), "Email": conversation.Contact.Email.String, }, "Author": map[string]any{ "FirstName": sender.FirstName, "LastName": sender.LastName, "FullName": sender.FullName(), "Email": sender.Email.String, }, } // For automated replies set author fields to empty strings as the recipients will see name as System. if sender.IsSystemUser() { data["Author"] = map[string]any{ "FirstName": "", "LastName": "", "FullName": "", "Email": "", } } message.Content, err = m.template.RenderEmailWithTemplate(data, message.Content) if err != nil { m.lo.Error("could not render email content using template", "id", message.ID, "error", err) return fmt.Errorf("could not render email content using template: %w", err) } default: m.lo.Warn("unknown message channel", "channel", channel) return fmt.Errorf("unknown message channel: %s", channel) } return nil } // GetConversationMessages retrieves messages for a specific conversation. func (m *Manager) GetConversationMessages(conversationUUID string, page, pageSize int) ([]models.Message, int, error) { var ( messages = make([]models.Message, 0) qArgs []interface{} ) qArgs = append(qArgs, conversationUUID) query, pageSize, qArgs, err := m.generateMessagesQuery(m.q.GetMessages, qArgs, page, pageSize) if err != nil { m.lo.Error("error generating messages query", "error", err) return messages, pageSize, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.message}"), nil) } tx, err := m.db.BeginTxx(context.Background(), &sql.TxOptions{ ReadOnly: true, }) defer tx.Rollback() if err != nil { m.lo.Error("error preparing get messages query", "error", err) return messages, pageSize, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.message}"), nil) } if err := tx.Select(&messages, query, qArgs...); err != nil { m.lo.Error("error fetching conversations", "error", err) return messages, pageSize, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.message}"), nil) } return messages, pageSize, nil } // GetMessage retrieves a message by UUID. func (m *Manager) GetMessage(uuid string) (models.Message, error) { var message models.Message if err := m.q.GetMessage.Get(&message, uuid); err != nil { m.lo.Error("error fetching message", "uuid", uuid, "error", err) return message, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.message}"), nil) } return message, nil } // UpdateMessageStatus updates the status of a message. func (m *Manager) UpdateMessageStatus(messageUUID string, status string) error { if _, err := m.q.UpdateMessageStatus.Exec(status, messageUUID); err != nil { m.lo.Error("error updating message status", "message_uuid", messageUUID, "error", err) return err } // Broadcast message status update to all conversation subscribers. conversationUUID, _ := m.getConversationUUIDFromMessageUUID(messageUUID) m.BroadcastMessageUpdate(conversationUUID, messageUUID, "status" /*property*/, status) // Trigger webhook for message update. if message, err := m.GetMessage(messageUUID); err != nil { m.lo.Error("error fetching message for webhook event", "uuid", messageUUID, "error", err) } else { m.webhookStore.TriggerEvent(wmodels.EventMessageUpdated, message) } return nil } // MarkMessageAsPending updates message status to `Pending`, enqueuing it for sending. func (m *Manager) MarkMessageAsPending(uuid string) error { if err := m.UpdateMessageStatus(uuid, models.MessageStatusPending); err != nil { m.lo.Error("error marking message as pending", "uuid", uuid, "error", err) return envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorSending", "name", "{globals.terms.message}"), nil) } return nil } // SendPrivateNote inserts a private message in a conversation. func (m *Manager) SendPrivateNote(media []mmodels.Media, senderID int, conversationUUID, content string) (models.Message, error) { message := models.Message{ ConversationUUID: conversationUUID, SenderID: senderID, Type: models.MessageOutgoing, SenderType: models.SenderTypeAgent, Status: models.MessageStatusSent, Content: content, ContentType: models.ContentTypeHTML, Private: true, Media: media, } if err := m.InsertMessage(&message); err != nil { return models.Message{}, err } return message, nil } // CreateContactMessage creates a contact message in a conversation. func (m *Manager) CreateContactMessage(media []mmodels.Media, contactID int, conversationUUID, content, contentType string) (models.Message, error) { message := models.Message{ ConversationUUID: conversationUUID, SenderID: contactID, Type: models.MessageIncoming, SenderType: models.SenderTypeContact, Status: models.MessageStatusReceived, Content: content, ContentType: contentType, Private: false, Media: media, } if err := m.InsertMessage(&message); err != nil { return models.Message{}, err } return message, nil } // QueueReply queues a reply message in a conversation. func (m *Manager) QueueReply(media []mmodels.Media, inboxID, senderID int, conversationUUID, content string, to, cc, bcc []string, meta map[string]interface{}) (models.Message, error) { var ( message = models.Message{} ) // Clear empty fields in to, cc, bcc. to = stringutil.RemoveEmpty(to) cc = stringutil.RemoveEmpty(cc) bcc = stringutil.RemoveEmpty(bcc) if len(to) == 0 { return message, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.empty", "name", "`to`"), nil) } meta["to"] = to if len(cc) > 0 { meta["cc"] = cc } if len(bcc) > 0 { meta["bcc"] = bcc } metaJSON, err := json.Marshal(meta) if err != nil { return message, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorMarshalling", "name", "{globals.terms.meta}"), nil) } // Generate unique source ID i.e. message-id for email. inbox, err := m.inboxStore.GetDBRecord(inboxID) if err != nil { return message, err } sourceID, err := stringutil.GenerateEmailMessageID(conversationUUID, inbox.From) if err != nil { m.lo.Error("error generating source message id", "error", err) return message, envelope.NewError(envelope.GeneralError, m.i18n.T("conversation.errorGeneratingMessageID"), nil) } // Insert Message. message = models.Message{ ConversationUUID: conversationUUID, SenderID: senderID, Type: models.MessageOutgoing, SenderType: models.SenderTypeAgent, Status: models.MessageStatusPending, Content: content, ContentType: models.ContentTypeHTML, Private: false, Media: media, Meta: metaJSON, SourceID: null.StringFrom(sourceID), } if err := m.InsertMessage(&message); err != nil { return models.Message{}, err } return message, nil } // InsertMessage inserts a message and attaches the media to the message. func (m *Manager) InsertMessage(message *models.Message) error { if message.Private { message.Status = models.MessageStatusSent } if len(message.Meta) == 0 || string(message.Meta) == "null" { message.Meta = json.RawMessage(`{}`) } // Handle empty content type enum, default to text. if message.ContentType == "" { message.ContentType = models.ContentTypeText } // Convert HTML content to text for search. message.TextContent = stringutil.HTML2Text(message.Content) // Insert and scan the message into the struct. if err := m.q.InsertMessage.Get(message, message.Type, message.Status, message.ConversationID, message.ConversationUUID, message.Content, message.TextContent, message.SenderID, message.SenderType, message.Private, message.ContentType, message.SourceID, message.Meta); err != nil { m.lo.Error("error inserting message in db", "error", err) return envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorInserting", "name", "{globals.terms.message}"), nil) } // Attach just inserted message to the media. for _, media := range message.Media { m.mediaStore.Attach(media.ID, mmodels.ModelMessages, message.ID) } // Add this user as a participant. m.addConversationParticipant(message.SenderID, message.ConversationUUID) // Hide CSAT message content as it contains a public link to the survey. lastMessage := message.TextContent if message.HasCSAT() { lastMessage = "Please rate your experience with us" } // Update conversation last message details in conversation. m.UpdateConversationLastMessage(message.ConversationID, message.ConversationUUID, lastMessage, message.SenderType, message.CreatedAt) // Broadcast new message. m.BroadcastNewMessage(message) // Refetch message if this message has media attachments, as media gets linked after inserting the message. if len(message.Media) > 0 { refetchedMessage, err := m.GetMessage(message.UUID) if err != nil { m.lo.Error("error fetching message after insert", "error", err) } else { // Replace the message in the struct with the refetched message. *message = refetchedMessage } } // Trigger webhook for new message created. m.webhookStore.TriggerEvent(wmodels.EventMessageCreated, message) return nil } // RecordAssigneeUserChange records an activity for a user assignee change. func (m *Manager) RecordAssigneeUserChange(conversationUUID string, assigneeID int, actor umodels.User) error { // Self assignment. if assigneeID == actor.ID { return m.InsertConversationActivity(models.ActivitySelfAssign, conversationUUID, actor.FullName(), actor) } // Assignment to another user. assignee, err := m.userStore.GetAgent(assigneeID, "") if err != nil { return err } return m.InsertConversationActivity(models.ActivityAssignedUserChange, conversationUUID, assignee.FullName(), actor) } // RecordAssigneeTeamChange records an activity for a team assignee change. func (m *Manager) RecordAssigneeTeamChange(conversationUUID string, teamID int, actor umodels.User) error { team, err := m.teamStore.Get(teamID) if err != nil { return err } return m.InsertConversationActivity(models.ActivityAssignedTeamChange, conversationUUID, team.Name, actor) } // RecordPriorityChange records an activity for a priority change. func (m *Manager) RecordPriorityChange(priority, conversationUUID string, actor umodels.User) error { return m.InsertConversationActivity(models.ActivityPriorityChange, conversationUUID, priority, actor) } // RecordStatusChange records an activity for a status change. func (m *Manager) RecordStatusChange(status, conversationUUID string, actor umodels.User) error { return m.InsertConversationActivity(models.ActivityStatusChange, conversationUUID, status, actor) } // RecordSLASet records an activity for an SLA set. func (m *Manager) RecordSLASet(conversationUUID string, slaName string, actor umodels.User) error { return m.InsertConversationActivity(models.ActivitySLASet, conversationUUID, slaName, actor) } // RecordTagAddition records an activity for a tag addition. func (m *Manager) RecordTagAddition(conversationUUID string, tag string, actor umodels.User) error { return m.InsertConversationActivity(models.ActivityTagAdded, conversationUUID, tag, actor) } // RecordTagRemoval records an activity for a tag removal. func (m *Manager) RecordTagRemoval(conversationUUID string, tag string, actor umodels.User) error { return m.InsertConversationActivity(models.ActivityTagRemoved, conversationUUID, tag, actor) } // InsertConversationActivity inserts an activity message. func (m *Manager) InsertConversationActivity(activityType, conversationUUID, newValue string, actor umodels.User) error { content, err := m.getMessageActivityContent(activityType, newValue, actor.FullName()) if err != nil { m.lo.Error("error could not generate activity content", "error", err) return envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorGenerating", "name", "{globals.terms.activityMessage}"), nil) } message := models.Message{ Type: models.MessageActivity, Status: models.MessageStatusSent, Content: content, ContentType: models.ContentTypeText, ConversationUUID: conversationUUID, Private: true, SenderID: actor.ID, SenderType: models.SenderTypeAgent, } if err := m.InsertMessage(&message); err != nil { m.lo.Error("error inserting activity message", "error", err) return envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorInserting", "name", "{globals.terms.activityMessage}"), nil) } return nil } // getConversationUUIDFromMessageUUID returns conversation UUID from message UUID. func (m *Manager) getConversationUUIDFromMessageUUID(uuid string) (string, error) { var conversationUUID string if err := m.q.GetConversationUUIDFromMessageUUID.Get(&conversationUUID, uuid); err != nil { m.lo.Error("error fetching conversation uuid from message uuid", "uuid", uuid, "error", err) return conversationUUID, err } return conversationUUID, nil } // getMessageActivityContent generates activity content based on the activity type. func (m *Manager) getMessageActivityContent(activityType, newValue, actorName string) (string, error) { var content = "" switch activityType { case models.ActivityAssignedUserChange: content = fmt.Sprintf("Assigned to %s by %s", newValue, actorName) case models.ActivityAssignedTeamChange: content = fmt.Sprintf("Assigned to %s team by %s", newValue, actorName) case models.ActivitySelfAssign: content = fmt.Sprintf("%s self-assigned this conversation", actorName) case models.ActivityPriorityChange: content = fmt.Sprintf("%s set priority to %s", actorName, newValue) case models.ActivityStatusChange: content = fmt.Sprintf("%s marked the conversation as %s", actorName, newValue) case models.ActivityTagAdded: content = fmt.Sprintf("%s added tag %s", actorName, newValue) case models.ActivityTagRemoved: content = fmt.Sprintf("%s removed tag %s", actorName, newValue) case models.ActivitySLASet: content = fmt.Sprintf("%s set %s SLA policy", actorName, newValue) default: return "", fmt.Errorf("invalid activity type %s", activityType) } return content, nil } // processIncomingMessage handles the insertion of an incoming message and // associated contact. It finds or creates the contact, checks for existing // conversations, and creates a new conversation if necessary. It also // inserts the message, uploads any attachments, and queues the conversation evaluation of automation rules. func (m *Manager) processIncomingMessage(in models.IncomingMessage) error { // Find or create contact and set sender ID in message. if err := m.userStore.CreateContact(&in.Contact); err != nil { m.lo.Error("error upserting contact", "error", err) return err } in.Message.SenderID = in.Contact.ID // Conversation already exists for this message? Skip if it does. conversationID, err := m.findConversationID([]string{in.Message.SourceID.String}) if err != nil && err != errConversationNotFound { return err } if conversationID > 0 { return nil } // Find or create new conversation. isNewConversation, err := m.findOrCreateConversation(&in.Message, in.InboxID, in.Contact.ContactChannelID, in.Contact.ID) if err != nil { return err } // Upload message attachments, on failure delete the conversation if it was just created for this message. if upErr := m.uploadMessageAttachments(&in.Message); upErr != nil { m.lo.Error("error uploading message attachments", "message_source_id", in.Message.SourceID, "error", upErr) if isNewConversation && in.Message.ConversationUUID != "" { m.lo.Info("deleting conversation as message attachment upload failed", "conversation_uuid", in.Message.ConversationUUID, "message_source_id", in.Message.SourceID) if err := m.DeleteConversation(in.Message.ConversationUUID); err != nil { return fmt.Errorf("error deleting conversation after message attachment upload failure: %w", err) } } return fmt.Errorf("error uploading message attachments: %w", upErr) } // Insert message. if err = m.InsertMessage(&in.Message); err != nil { return err } // Evaluate automation rules & send webhook events. if isNewConversation { conversation, err := m.GetConversation(in.Message.ConversationID, "") if err == nil { m.webhookStore.TriggerEvent(wmodels.EventConversationCreated, conversation) m.automation.EvaluateNewConversationRules(conversation) } return nil } // Reopen conversation if it's not Open. systemUser, err := m.userStore.GetSystemUser() if err != nil { m.lo.Error("error fetching system user", "error", err) } else { if err := m.ReOpenConversation(in.Message.ConversationUUID, systemUser); err != nil { m.lo.Error("error reopening conversation", "error", err) } } // Set waiting since timestamp, this gets cleared when agent replies to the conversation. now := time.Now() m.UpdateConversationWaitingSince(in.Message.ConversationUUID, &now) // Create SLA event for next response if a SLA is applied and has next response time set, subsequent agent replies will mark this event as met. // This cycle continues for next response time SLA metric. conversation, err := m.GetConversation(in.Message.ConversationID, "") if err != nil { m.lo.Error("error fetching conversation", "conversation_id", in.Message.ConversationID, "error", err) } else { // Trigger automations on incoming message event. m.automation.EvaluateConversationUpdateRules(conversation, amodels.EventConversationMessageIncoming) if conversation.SLAPolicyID.Int == 0 { m.lo.Info("no SLA policy applied to conversation, skipping next response SLA event creation") return nil } if deadline, err := m.slaStore.CreateNextResponseSLAEvent(conversation.ID, conversation.AppliedSLAID.Int, conversation.SLAPolicyID.Int, conversation.AssignedTeamID.Int); err != nil && !errors.Is(err, sla.ErrUnmetSLAEventAlreadyExists) { m.lo.Error("error creating next response SLA event", "conversation_id", conversation.ID, "error", err) } else if !deadline.IsZero() { m.lo.Info("next response SLA event created for conversation", "conversation_id", conversation.ID, "deadline", deadline, "sla_policy_id", conversation.SLAPolicyID.Int) m.BroadcastConversationUpdate(in.Message.ConversationUUID, "next_response_deadline_at", deadline.Format(time.RFC3339)) // Clear next response met at timestamp as this event was just created. m.BroadcastConversationUpdate(in.Message.ConversationUUID, "next_response_met_at", nil) } } return nil } // MessageExists checks if a message with the given messageID exists. func (m *Manager) MessageExists(messageID string) (bool, error) { _, err := m.findConversationID([]string{messageID}) if err != nil { if errors.Is(err, errConversationNotFound) { return false, nil } m.lo.Error("error fetching message from db", "error", err) return false, err } return true, nil } // EnqueueIncoming enqueues an incoming message for inserting in db. func (m *Manager) EnqueueIncoming(message models.IncomingMessage) error { m.closedMu.Lock() defer m.closedMu.Unlock() if m.closed { return errors.New("incoming message queue is closed") } select { case m.incomingMessageQueue <- message: return nil default: m.lo.Warn("WARNING: incoming message queue is full") return errors.New("incoming message queue is full") } } // GetConversationByMessageID returns conversation by message id. func (m *Manager) GetConversationByMessageID(id int) (models.Conversation, error) { var conversation = models.Conversation{} if err := m.q.GetConversationByMessageID.Get(&conversation, id); err != nil { if err == sql.ErrNoRows { return conversation, envelope.NewError(envelope.NotFoundError, m.i18n.Ts("globals.messages.notFound", "name", "{globals.terms.conversation}"), nil) } m.lo.Error("error fetching message from DB", "error", err) return conversation, envelope.NewError(envelope.GeneralError, m.i18n.Ts("globals.messages.errorFetching", "name", "{globals.terms.conversation}"), nil) } return conversation, nil } // generateMessagesQuery generates the SQL query for fetching messages in a conversation. func (c *Manager) generateMessagesQuery(baseQuery string, qArgs []interface{}, page, pageSize int) (string, int, []interface{}, error) { if page <= 0 { return "", 0, nil, errors.New("page must be greater than 0") } if pageSize > maxMessagesPerPage { pageSize = maxMessagesPerPage } if pageSize <= 0 { return "", 0, nil, errors.New("page size must be greater than 0") } // Calculate the offset offset := (page - 1) * pageSize // Append LIMIT and OFFSET to query arguments qArgs = append(qArgs, pageSize, offset) // Include LIMIT and OFFSET in the SQL query sqlQuery := fmt.Sprintf(baseQuery, fmt.Sprintf("LIMIT $%d OFFSET $%d", len(qArgs)-1, len(qArgs))) return sqlQuery, pageSize, qArgs, nil } // uploadMessageAttachments uploads all attachments for a message. func (m *Manager) uploadMessageAttachments(message *models.Message) error { if len(message.Attachments) == 0 { return nil } for _, attachment := range message.Attachments { // Check if this attachment already exists by the content ID, as inline images can be repeated across conversations. contentID := attachment.ContentID if contentID != "" { // Make content ID MORE unique by prefixing it with the conversation UUID, as content id is not globally unique practically, // different messages can have the same content ID, I do not have the message ID at this point, so I am using sticking with the conversation UUID // to make it more unique. contentID = message.ConversationUUID + "_" + contentID exists, uuid, err := m.mediaStore.ContentIDExists(contentID) if err != nil { m.lo.Error("error checking media existence by content ID", "content_id", contentID, "error", err) } // This attachment already exists, replace the cid:content_id with the media relative url, not using absolute path as the root path can change. if exists { m.lo.Debug("attachment with content ID already exists replacing content ID with media relative URL", "content_id", contentID, "media_uuid", uuid) message.Content = strings.ReplaceAll(message.Content, fmt.Sprintf("cid:%s", attachment.ContentID), "/uploads/"+uuid) continue } // Attachment does not exist, replace the content ID with the new more unique content ID. message.Content = strings.ReplaceAll(message.Content, fmt.Sprintf("cid:%s", attachment.ContentID), fmt.Sprintf("cid:%s", contentID)) } // Sanitize filename. attachment.Name = stringutil.SanitizeFilename(attachment.Name) m.lo.Debug("uploading message attachment", "name", attachment.Name, "content_id", contentID, "size", attachment.Size, "content_type", attachment.ContentType, "content_id", contentID, "disposition", attachment.Disposition) // Upload and insert entry in media table. attachReader := bytes.NewReader(attachment.Content) media, err := m.mediaStore.UploadAndInsert( attachment.Name, attachment.ContentType, contentID, /** Linking media to message happens later **/ null.String{}, /** modelType */ null.Int{}, /** modelID **/ attachReader, attachment.Size, null.StringFrom(attachment.Disposition), []byte("{}"), /** meta **/ ) if err != nil { m.lo.Error("failed to upload attachment", "name", attachment.Name, "error", err) return fmt.Errorf("failed to upload media %s: %w", attachment.Name, err) } // If the attachment is an image, generate and upload thumbnail. attachmentExt := strings.TrimPrefix(strings.ToLower(filepath.Ext(attachment.Name)), ".") if slices.Contains(image.Exts, attachmentExt) { if err := m.uploadThumbnailForMedia(media, attachment.Content); err != nil { m.lo.Error("error uploading thumbnail", "error", err) return fmt.Errorf("error uploading thumbnail for media %s: %w", attachment.Name, err) } } message.Media = append(message.Media, media) } return nil } // findOrCreateConversation finds or creates a conversation for the given message. func (m *Manager) findOrCreateConversation(in *models.Message, inboxID, contactChannelID, contactID int) (bool, error) { var ( new bool err error conversationID int conversationUUID string ) // Search for existing conversation using the in-reply-to and references. sourceIDs := append([]string{in.InReplyTo}, in.References...) conversationID, err = m.findConversationID(sourceIDs) if err != nil && err != errConversationNotFound { return new, err } // Conversation not found, create one. if conversationID == 0 { new = true lastMessage := stringutil.HTML2Text(in.Content) lastMessageAt := time.Now() conversationID, conversationUUID, err = m.CreateConversation(contactID, contactChannelID, inboxID, lastMessage, lastMessageAt, in.Subject, false /**append reference number to subject**/) if err != nil || conversationID == 0 { return new, err } in.ConversationID = conversationID in.ConversationUUID = conversationUUID return new, nil } // Get UUID. if conversationUUID == "" { conversationUUID, err = m.GetConversationUUID(conversationID) if err != nil { return new, err } } in.ConversationID = conversationID in.ConversationUUID = conversationUUID return new, nil } // findConversationID finds the conversation ID from the message source ID. func (m *Manager) findConversationID(messageSourceIDs []string) (int, error) { if len(messageSourceIDs) == 0 { return 0, errConversationNotFound } var conversationID int if err := m.q.MessageExistsBySourceID.QueryRow(pq.Array(messageSourceIDs)).Scan(&conversationID); err != nil { if err == sql.ErrNoRows { return conversationID, errConversationNotFound } m.lo.Error("error fetching msg from DB", "error", err) return conversationID, err } return conversationID, nil } // attachAttachmentsToMessage attaches attachment blobs to message. func (m *Manager) attachAttachmentsToMessage(message *models.Message) error { var attachments attachment.Attachments // Get all media for this message. medias, err := m.mediaStore.GetByModel(message.ID, mmodels.ModelMessages) if err != nil { m.lo.Error("error fetching message attachments", "error", err) return err } // Fetch blobs. for _, media := range medias { blob, err := m.mediaStore.GetBlob(media.UUID) if err != nil { m.lo.Error("error fetching media blob", "error", err) return err } attachment := attachment.Attachment{ Name: media.Filename, Content: blob, Header: attachment.MakeHeader(media.ContentType, media.UUID, media.Filename, "base64", media.Disposition.String), } attachments = append(attachments, attachment) } // Attach attachments. message.Attachments = attachments return nil } // getOutgoingProcessingMessageIDs returns the IDs of outgoing messages currently being processed. func (m *Manager) getOutgoingProcessingMessageIDs() []int { var out = make([]int, 0) m.outgoingProcessingMessages.Range(func(key, _ any) bool { if k, ok := key.(int); ok { out = append(out, k) } return true }) return out } // uploadThumbnailForMedia prepares and uploads a thumbnail for an image attachment. func (m *Manager) uploadThumbnailForMedia(media mmodels.Media, content []byte) error { // Create a reader from the content file := bytes.NewReader(content) // Seek to the beginning of the file file.Seek(0, 0) // Create the thumbnail thumbFile, err := image.CreateThumb(image.DefThumbSize, file) if err != nil { return fmt.Errorf("error creating thumbnail: %w", err) } // Generate thumbnail name thumbName := fmt.Sprintf("thumb_%s", media.UUID) // Upload the thumbnail if _, err := m.mediaStore.Upload(thumbName, media.ContentType, thumbFile); err != nil { m.lo.Error("error uploading thumbnail", "error", err) return fmt.Errorf("error uploading thumbnail: %w", err) } return nil } // getLatestMessage returns the latest message in a conversation. func (m *Manager) getLatestMessage(conversationID int, typ []string, status []string, excludePrivate bool) (models.Message, error) { var message models.Message if err := m.q.GetLatestMessage.Get(&message, conversationID, pq.Array(typ), pq.Array(status), excludePrivate); err != nil { if err == sql.ErrNoRows { return message, sql.ErrNoRows } m.lo.Error("error fetching latest message from DB", "error", err) return message, fmt.Errorf("fetching latest message: %w", err) } return message, nil }