Send batched unread messages in conversation continuity email after contact has not seen a conversation for configured amount of time, instead of sending a single message immediately when contact is disconnected on websocket.

Make the batching interval and time threshold for unread messages configurable
This commit is contained in:
Abhinav Raut
2025-10-03 01:54:17 +05:30
parent 1de54fe110
commit 16ca6b6df7
20 changed files with 566 additions and 187 deletions

View File

@@ -226,11 +226,30 @@ func initConversations(
template *tmpl.Manager,
webhook *webhook.Manager,
) *conversation.Manager {
continuityConfig := &conversation.ContinuityConfig{}
if ko.Exists("conversation.continuity.batch_check_interval") {
continuityConfig.BatchCheckInterval = ko.MustDuration("conversation.continuity.batch_check_interval")
}
if ko.Exists("conversation.continuity.offline_threshold") {
continuityConfig.OfflineThreshold = ko.MustDuration("conversation.continuity.offline_threshold")
}
if ko.Exists("conversation.continuity.min_email_interval") {
continuityConfig.MinEmailInterval = ko.MustDuration("conversation.continuity.min_email_interval")
}
if ko.Exists("conversation.continuity.max_messages_per_email") {
continuityConfig.MaxMessagesPerEmail = ko.MustInt("conversation.continuity.max_messages_per_email")
}
c, err := conversation.New(hub, i18n, notif, sla, status, priority, inboxStore, userStore, teamStore, mediaStore, settings, csat, automationEngine, template, webhook, conversation.Opts{
DB: db,
Lo: initLogger("conversation_manager"),
OutgoingMessageQueueSize: ko.MustInt("message.outgoing_queue_size"),
IncomingMessageQueueSize: ko.MustInt("message.incoming_queue_size"),
ContinuityConfig: continuityConfig,
})
if err != nil {
log.Fatalf("error initializing conversation manager: %v", err)

View File

@@ -217,6 +217,7 @@ func main() {
go autoassigner.Run(ctx, autoAssignInterval)
go conversation.Run(ctx, messageIncomingQWorkers, messageOutgoingQWorkers, messageOutgoingScanInterval)
go conversation.RunUnsnoozer(ctx, unsnoozeInterval)
go conversation.RunContinuity(ctx)
go webhook.Run(ctx)
go notifier.Run(ctx)
go sla.Run(ctx, slaEvaluationInterval)

View File

@@ -119,6 +119,12 @@ timeout = "15s"
# How often to check for conversations to unsnooze
unsnooze_interval = "5m"
[conversation.continuity]
offline_threshold = "10m"
batch_check_interval = "5m"
max_messages_per_email = 10
min_email_interval = "15m"
[sla]
# How often to evaluate SLA compliance for conversations
evaluation_interval = "5m"

View File

@@ -1,5 +1,8 @@
<template>
<div class="libredesk-widget-app text-foreground bg-background" :class="{ 'dark': widgetStore.config.dark_mode }">
<div
class="libredesk-widget-app text-foreground bg-background"
:class="{ dark: widgetStore.config.dark_mode }"
>
<div class="widget-container">
<MainLayout />
</div>
@@ -28,25 +31,32 @@ onMounted(async () => {
if (widgetConfig) {
widgetStore.updateConfig(widgetConfig)
}
initializeWebSocket()
widgetStore.openWidget()
widgetStore.setOpen(true)
setupParentMessageListeners()
await chatStore.fetchConversations()
// Notify parent window that Vue app is ready
window.parent.postMessage({
type: 'VUE_APP_READY'
}, '*')
window.parent.postMessage(
{
type: 'VUE_APP_READY'
},
'*'
)
})
// Listen for messages from parent window (widget.js)
const setupParentMessageListeners = () => {
window.addEventListener('message', (event) => {
if (event.data.type === 'SET_MOBILE_STATE') {
if (event.data.type == 'WIDGET_CLOSED') {
widgetStore.setOpen(false)
} else if (event.data.type === 'WIDGET_OPENED') {
widgetStore.setOpen(true)
} else if (event.data.type === 'SET_MOBILE_STATE') {
widgetStore.setMobileFullScreen(event.data.isMobile)
} else if (event.data.type === 'WIDGET_EXPANDED') {
widgetStore.setExpanded(event.data.isExpanded)

View File

@@ -87,7 +87,7 @@
</div>
<!-- Message metadata -->
<div class="text-xs text-muted-foreground mt-1 flex items-center gap-2">
<div class="text-[10px] text-muted-foreground mt-1 flex items-center gap-2">
<!-- Agent name and time for agent messages -->
<span v-if="message.author.type === 'agent'">
{{ message.author.first_name }} {{ message.author.last_name }}
@@ -245,6 +245,9 @@ const scrollToBottom = () => {
}
onMounted(() => {
// Update last seen timestamp when conversation is opened
chatStore.updateCurrentConversationLastSeen()
// Check initial scroll position
checkIfAtBottom()
@@ -254,7 +257,8 @@ onMounted(() => {
}, 200)
})
// Only auto-scroll for user's own messages or when at bottom
// Auto-scroll for user's own messages or when already at bottom
// On any new message addition, update last seen timestamp for this conversation
watch(
() => chatStore.getCurrentConversationMessages,
(newMessages, oldMessages) => {
@@ -269,8 +273,13 @@ watch(
return
}
// Check if new messages were added
if (oldMessages && newMessages.length > oldMessages.length) {
// If widget is open, do:
// - Check if new messages were added and handle scrolling behavior
// - Also update the last seen timestamp if the widget is open
if (oldMessages && newMessages.length > oldMessages.length && widgetStore.isOpen) {
// Always on new message, update last seen timestamp if conversation is open
chatStore.updateCurrentConversationLastSeen()
const newMessage = newMessages[newMessages.length - 1]
// Auto-scroll if:

View File

@@ -182,7 +182,6 @@ const sendMessage = async () => {
emit('error', handleHTTPError(error).message)
} finally {
isSending.value = false
await chatStore.updateCurrentConversationLastSeen()
}
}

View File

@@ -22,7 +22,9 @@ export const useChatStore = defineStore('chat', () => {
messageCacheVersion.value // Force reactivity tracking
const convId = currentConversation.value?.uuid
if (!convId) return []
return messageCache.getAllPagesMessages(convId)
const messages = messageCache.getAllPagesMessages(convId)
// Filter out continuity email messages
return messages.filter(msg => !msg.meta?.continuity_email)
})
const hasConversations = computed(() => conversations.value?.length > 0)
const getConversations = computed(() => {
@@ -139,7 +141,6 @@ export const useChatStore = defineStore('chat', () => {
return false
} finally {
isLoadingConversation.value = false
await updateCurrentConversationLastSeen()
}
return true
}

View File

@@ -23,8 +23,8 @@ export const useWidgetStore = defineStore('widget', () => {
isInChatView.value = false
}
const openWidget = () => {
isOpen.value = true
const setOpen = (open) => {
isOpen.value = open
}
const closeWidget = () => {
@@ -126,7 +126,7 @@ export const useWidgetStore = defineStore('widget', () => {
// Actions
toggleWidget,
openWidget,
setOpen,
closeWidget,
navigateToChat,
navigateToMessages,

View File

@@ -159,6 +159,8 @@ export class WidgetWebSocketClient {
try {
this.socket.send(JSON.stringify({
type: 'ping',
jwt: this.jwt,
inbox_id: this.inboxId ? parseInt(this.inboxId, 10) : null
}))
if (Date.now() - this.lastPong > 60000) {
console.warn('No pong received in 60 seconds, closing widget connection')

View File

@@ -586,6 +586,8 @@
"admin.inbox.livechat.prechatForm.availableFields": "Available Custom Attributes",
"admin.inbox.livechat.conversationContinuity": "Conversation continuity email inbox",
"admin.inbox.livechat.conversationContinuity.description": "When contacts go offline, replies will be sent from this email inbox. The contacts can continue the same conversation by replying to the email or in the chat widget when they return to your site.",
"admin.inbox.livechat.continuityEmailFooter": "Reply directly to this email to continue the conversation.",
"admin.inbox.livechat.continuityEmailSubject": "New messages from {site_name}",
"admin.agent.deleteConfirmation": "This will permanently delete the agent. Consider disabling the account instead.",
"admin.agent.apiKey.description": "Generate API keys for this agent to access libredesk programmatically.",
"admin.agent.apiKey.noKey": "No API key has been generated for this agent.",

View File

@@ -0,0 +1,294 @@
package conversation
import (
"context"
"encoding/json"
"fmt"
"html"
"slices"
"strings"
"time"
"github.com/abhinavxd/libredesk/internal/attachment"
"github.com/abhinavxd/libredesk/internal/conversation/models"
"github.com/abhinavxd/libredesk/internal/stringutil"
"github.com/volatiletech/null/v9"
)
// RunContinuity starts a goroutine that sends continuity emails containing unread outgoing messages to contacts who have been offline for a configured duration.
func (m *Manager) RunContinuity(ctx context.Context) {
m.lo.Info("starting conversation continuity processor", "check_interval", m.continuityConfig.BatchCheckInterval)
ticker := time.NewTicker(m.continuityConfig.BatchCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := m.processContinuityEmails(); err != nil {
m.lo.Error("error processing continuity emails", "error", err)
}
}
}
}
// processContinuityEmails finds offline livechat conversations and sends batched unread messages emails to contacts
func (m *Manager) processContinuityEmails() error {
var (
offlineThresholdMinutes = int(m.continuityConfig.OfflineThreshold.Minutes())
minEmailIntervalMinutes = int(m.continuityConfig.MinEmailInterval.Minutes())
maxMessagesPerEmail = m.continuityConfig.MaxMessagesPerEmail
conversations []models.ContinuityConversation
)
m.lo.Debug("fetching offline conversations for continuity emails", "offline_threshold_minutes", offlineThresholdMinutes, "min_email_interval_minutes", minEmailIntervalMinutes)
if err := m.q.GetOfflineLiveChatConversations.Select(&conversations, offlineThresholdMinutes, minEmailIntervalMinutes); err != nil {
return fmt.Errorf("error fetching offline conversations: %w", err)
}
m.lo.Debug("fetched offline conversations for continuity emails", "count", len(conversations))
for _, conv := range conversations {
m.lo.Info("sending continuity email for conversation", "conversation_uuid", conv.UUID, "contact_email", conv.ContactEmail)
if err := m.sendContinuityEmail(conv, maxMessagesPerEmail); err != nil {
m.lo.Error("error sending continuity email", "conversation_uuid", conv.UUID, "error", err)
continue
}
}
return nil
}
// sendContinuityEmail sends a batched continuity email for a conversation
func (m *Manager) sendContinuityEmail(conv models.ContinuityConversation, maxMessages int) error {
var (
message models.Message
cleanUp = false
)
if conv.ContactEmail.String == "" {
m.lo.Debug("no contact email for conversation, skipping continuity email", "conversation_uuid", conv.UUID)
return fmt.Errorf("no contact email for conversation")
}
// Cleanup inserted message on failure
defer func() {
if cleanUp {
if _, delErr := m.q.DeleteMessage.Exec(message.ID, message.UUID); delErr != nil {
m.lo.Error("error cleaning up failed continuity message",
"error", delErr,
"message_id", message.ID,
"message_uuid", message.UUID,
"conversation_uuid", conv.UUID)
}
}
}()
m.lo.Debug("fetching unread messages for continuity email", "conversation_uuid", conv.UUID, "contact_last_seen_at", conv.ContactLastSeenAt, "max_messages", maxMessages)
var unreadMessages []models.ContinuityUnreadMessage
if err := m.q.GetUnreadMessages.Select(&unreadMessages, conv.ID, conv.ContactLastSeenAt, maxMessages); err != nil {
return fmt.Errorf("error fetching unread messages: %w", err)
}
m.lo.Debug("fetched unread messages for continuity email", "conversation_uuid", conv.UUID, "unread_count", len(unreadMessages))
if len(unreadMessages) == 0 {
m.lo.Debug("no unread messages found for conversation, skipping continuity email", "conversation_uuid", conv.UUID)
return nil
}
// Get linked email inbox
if !conv.LinkedEmailInboxID.Valid {
return fmt.Errorf("no linked email inbox configured for livechat inbox")
}
linkedEmailInbox, err := m.inboxStore.Get(conv.LinkedEmailInboxID.Int)
if err != nil {
return fmt.Errorf("error fetching linked email inbox: %w", err)
}
// Build email content with all unread messages
emailContent := m.buildContinuityEmailContent(unreadMessages)
// Collect attachments from all unread messages
attachments, err := m.collectAttachmentsFromMessages(unreadMessages)
if err != nil {
m.lo.Error("error collecting attachments from messages", "conversation_uuid", conv.UUID, "error", err)
return fmt.Errorf("error collecting attachments for continuity email: %w", err)
}
// Generate email subject with site name, this subject is translated
siteName := "Support"
if siteNameJSON, err := m.settingsStore.Get("app.site_name"); err == nil {
siteName = strings.Trim(strings.TrimSpace(string(siteNameJSON)), "\"")
}
emailSubject := m.i18n.Ts("admin.inbox.livechat.continuityEmailSubject", "site_name", siteName)
// Generate unique Message-ID for threading
sourceID, err := stringutil.GenerateEmailMessageID(conv.UUID, linkedEmailInbox.FromAddress())
if err != nil {
return fmt.Errorf("error generating message ID: %w", err)
}
// Get system user for sending the email
systemUser, err := m.userStore.GetSystemUser()
if err != nil {
return fmt.Errorf("error fetching system user: %w", err)
}
metaJSON, err := json.Marshal(map[string]any{
"continuity_email": true,
})
if err != nil {
m.lo.Error("error marshalling continuity email meta", "error", err, "conversation_uuid", conv.UUID)
return fmt.Errorf("error marshalling continuity email meta: %w", err)
}
// Create message for sending
message = models.Message{
InboxID: conv.LinkedEmailInboxID.Int,
ConversationID: conv.ID,
ConversationUUID: conv.UUID,
SenderID: systemUser.ID,
Type: models.MessageOutgoing,
SenderType: models.SenderTypeAgent,
Status: models.MessageStatusSent,
Content: emailContent,
ContentType: models.ContentTypeHTML,
Private: false,
SourceID: null.StringFrom(sourceID),
MessageReceiverID: conv.ContactID,
From: linkedEmailInbox.FromAddress(),
To: []string{conv.ContactEmail.String},
Subject: emailSubject,
Meta: metaJSON,
Attachments: attachments,
}
// Set Reply-To header for conversation continuity
emailAddress, err := stringutil.ExtractEmail(linkedEmailInbox.FromAddress())
if err == nil {
emailUserPart := strings.Split(emailAddress, "@")
if len(emailUserPart) == 2 {
message.Headers = map[string][]string{
"Reply-To": {fmt.Sprintf("%s+%s@%s", emailUserPart[0], conv.UUID, emailUserPart[1])},
}
}
}
// Insert message into database
if err := m.InsertMessage(&message); err != nil {
return fmt.Errorf("error inserting continuity message: %w", err)
}
// Get all message source IDs for References header and threading
references, err := m.GetMessageSourceIDs(conv.ID, 100)
if err != nil {
m.lo.Error("error fetching conversation source IDs for continuity email", "error", err)
references = []string{}
}
// References is sorted in DESC i.e newest message first, so reverse it to keep the references in order.
slices.Reverse(references)
// Filter out livechat references (ones without @) and keep only the last 20
var filteredReferences []string
for _, ref := range references {
if strings.Contains(ref, "@") {
filteredReferences = append(filteredReferences, ref)
// Keep only the last 20 references, remove the first one if exceeding
if len(filteredReferences) > 20 {
filteredReferences = filteredReferences[1:]
}
}
}
message.References = filteredReferences
// Set In-Reply-To if we have references
if len(filteredReferences) > 0 {
message.InReplyTo = filteredReferences[len(filteredReferences)-1]
}
// Render message template
if err := m.RenderMessageInTemplate(linkedEmailInbox.Channel(), &message); err != nil {
// Clean up the inserted message on failure
cleanUp = true
m.lo.Error("error rendering email template for continuity email", "error", err, "message_id", message.ID, "message_uuid", message.UUID, "conversation_uuid", conv.UUID)
return fmt.Errorf("error rendering email template: %w", err)
}
// Send the email
if err := linkedEmailInbox.Send(message); err != nil {
// Clean up the inserted message on failure
cleanUp = true
m.lo.Error("error sending continuity email", "error", err, "message_id", message.ID, "message_uuid", message.UUID, "conversation_uuid", conv.UUID)
return fmt.Errorf("error sending continuity email: %w", err)
}
// Mark in DB that continuity email was sent now
if _, err := m.q.UpdateContinuityEmailTracking.Exec(conv.ID); err != nil {
m.lo.Error("error updating continuity email tracking", "conversation_uuid", conv.UUID, "error", err)
return fmt.Errorf("error updating continuity email tracking: %w", err)
}
m.lo.Info("sent conversation continuity email",
"conversation_uuid", conv.UUID,
"contact_email", conv.ContactEmail,
"message_count", len(unreadMessages),
"linked_email_inbox_id", conv.LinkedEmailInboxID.Int)
return nil
}
// buildContinuityEmailContent creates email content with conversation summary and unread messages
func (m *Manager) buildContinuityEmailContent(unreadMessages []models.ContinuityUnreadMessage) string {
var content strings.Builder
for _, msg := range unreadMessages {
// Get sender display name
senderName := "Agent"
if msg.SenderFirstName.Valid || msg.SenderLastName.Valid {
firstName := strings.TrimSpace(msg.SenderFirstName.String)
lastName := strings.TrimSpace(msg.SenderLastName.String)
fullName := strings.TrimSpace(firstName + " " + lastName)
if fullName != "" {
senderName = fullName
}
}
// Format timestamp
timestamp := msg.CreatedAt.Format("Mon, Jan 2, 2006 at 3:04 PM")
// Add message header with agent name and timestamp
content.WriteString(fmt.Sprintf("<p><strong>%s</strong> <em>%s</em></p>\n",
html.EscapeString(senderName),
html.EscapeString(timestamp)))
// Add message content
content.WriteString(msg.Content)
content.WriteString("\n<br/>\n")
}
// Add footer with reply instructions, footer is translated
content.WriteString("<hr/>\n")
content.WriteString(fmt.Sprintf("<p><em>%s</em></p>\n", html.EscapeString(m.i18n.T("admin.inbox.livechat.continuityEmailFooter"))))
return content.String()
}
// collectAttachmentsFromMessages collects all attachments from unread messages for the continuity email
func (m *Manager) collectAttachmentsFromMessages(unreadMessages []models.ContinuityUnreadMessage) (attachment.Attachments, error) {
var allAttachments attachment.Attachments
for _, msg := range unreadMessages {
msgAttachments, err := m.fetchMessageAttachments(msg.ID)
if err != nil {
m.lo.Error("error fetching attachments for message", "error", err, "message_id", msg.ID)
continue
}
allAttachments = append(allAttachments, msgAttachments...)
}
return allAttachments, nil
}

View File

@@ -81,6 +81,7 @@ type Manager struct {
closed bool
closedMu sync.RWMutex
wg sync.WaitGroup
continuityConfig ContinuityConfig
}
// WidgetConversationView represents the conversation data for widget clients
@@ -148,6 +149,7 @@ type inboxStore interface {
type settingsStore interface {
GetAppRootURL() (string, error)
GetByPrefix(prefix string) (types.JSONText, error)
Get(key string) (types.JSONText, error)
}
type csatStore interface {
@@ -160,12 +162,21 @@ type webhookStore interface {
TriggerEvent(event wmodels.WebhookEvent, data any)
}
// ContinuityConfig holds configuration for conversation continuity emails
type ContinuityConfig struct {
BatchCheckInterval time.Duration
OfflineThreshold time.Duration
MinEmailInterval time.Duration
MaxMessagesPerEmail int
}
// Opts holds the options for creating a new Manager.
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
OutgoingMessageQueueSize int
IncomingMessageQueueSize int
ContinuityConfig *ContinuityConfig
}
// New initializes a new conversation Manager.
@@ -192,6 +203,30 @@ func New(
return nil, err
}
// Default continuity config
continuityConfig := ContinuityConfig{
BatchCheckInterval: 2 * time.Minute,
OfflineThreshold: 3 * time.Minute,
MinEmailInterval: 15 * time.Minute,
MaxMessagesPerEmail: 10,
}
// Override with provided config if available
if opts.ContinuityConfig != nil {
if opts.ContinuityConfig.BatchCheckInterval > 0 {
continuityConfig.BatchCheckInterval = opts.ContinuityConfig.BatchCheckInterval
}
if opts.ContinuityConfig.OfflineThreshold > 0 {
continuityConfig.OfflineThreshold = opts.ContinuityConfig.OfflineThreshold
}
if opts.ContinuityConfig.MinEmailInterval > 0 {
continuityConfig.MinEmailInterval = opts.ContinuityConfig.MinEmailInterval
}
if opts.ContinuityConfig.MaxMessagesPerEmail > 0 {
continuityConfig.MaxMessagesPerEmail = opts.ContinuityConfig.MaxMessagesPerEmail
}
}
c := &Manager{
q: q,
wsHub: wsHub,
@@ -214,6 +249,7 @@ func New(
incomingMessageQueue: make(chan models.IncomingMessage, opts.IncomingMessageQueueSize),
outgoingMessageQueue: make(chan models.Message, opts.OutgoingMessageQueueSize),
outgoingProcessingMessages: sync.Map{},
continuityConfig: continuityConfig,
}
return c, nil
@@ -262,11 +298,17 @@ type queries struct {
GetOutgoingPendingMessages *sqlx.Stmt `query:"get-outgoing-pending-messages"`
GetMessageSourceIDs *sqlx.Stmt `query:"get-message-source-ids"`
GetConversationUUIDFromMessageUUID *sqlx.Stmt `query:"get-conversation-uuid-from-message-uuid"`
MessageExistsBySourceID *sqlx.Stmt `query:"message-exists-by-source-id"`
GetConversationByMessageID *sqlx.Stmt `query:"get-conversation-by-message-id"`
InsertMessage *sqlx.Stmt `query:"insert-message"`
UpdateMessageStatus *sqlx.Stmt `query:"update-message-status"`
UpdateMessageSourceID *sqlx.Stmt `query:"update-message-source-id"`
MessageExistsBySourceID *sqlx.Stmt `query:"message-exists-by-source-id"`
GetConversationByMessageID *sqlx.Stmt `query:"get-conversation-by-message-id"`
DeleteMessage *sqlx.Stmt `query:"delete-message"`
// Conversation continuity queries.
GetOfflineLiveChatConversations *sqlx.Stmt `query:"get-offline-livechat-conversations"`
GetUnreadMessages *sqlx.Stmt `query:"get-unread-messages"`
UpdateContinuityEmailTracking *sqlx.Stmt `query:"update-continuity-email-tracking"`
}
// CreateConversation creates a new conversation and returns its ID and UUID.

View File

@@ -181,16 +181,9 @@ func (m *Manager) sendOutgoingMessage(message models.Message) {
// Send message
err = inb.Send(message)
if err != nil {
if err == livechat.ErrClientNotConnected {
// Continue conversation over email.
if err := m.sendConversationContinuityEmail(message); err != nil {
m.lo.Error("error sending conversation continuity email", "message_id", message.ID, "error", err)
}
} else {
handleError(err, "error sending message")
return
}
if err != nil && err != livechat.ErrClientNotConnected {
handleError(err, "error sending message")
return
}
// Update status as sent.
@@ -430,12 +423,13 @@ func (m *Manager) SendReply(media []mmodels.Media, inboxID, senderID, contactID
return models.Message{}, envelope.NewError(envelope.GeneralError, m.i18n.T("conversation.errorGeneratingMessageID"), nil)
}
case inbox.ChannelLiveChat:
sourceID, err = stringutil.RandomAlphanumeric(35)
// TODO: Is source id needed for live chat messages?
sourceID, err = stringutil.RandomAlphanumeric(16)
if err != nil {
m.lo.Error("error generating random source id", "error", err)
return models.Message{}, envelope.NewError(envelope.GeneralError, m.i18n.T("conversation.errorGeneratingMessageID"), nil)
}
sourceID = "livechat-" + sourceID
sourceID = "livechat-" + conversationUUID + "-" + sourceID
}
// Marshal meta.
@@ -980,24 +974,23 @@ func (m *Manager) findConversationID(messageSourceIDs []string) (int, error) {
return conversationID, nil
}
// attachAttachmentsToMessage attaches attachment blobs to message.
func (m *Manager) attachAttachmentsToMessage(message *models.Message) error {
// fetchMessageAttachments fetches attachments for a single message ID - extracted for reuse
func (m *Manager) fetchMessageAttachments(messageID int) (attachment.Attachments, error) {
var attachments attachment.Attachments
// Get all media for this message.
medias, err := m.mediaStore.GetByModel(message.ID, mmodels.ModelMessages)
// Get all media for this message
medias, err := m.mediaStore.GetByModel(messageID, mmodels.ModelMessages)
if err != nil {
m.lo.Error("error fetching message attachments", "error", err)
return err
return attachments, fmt.Errorf("error fetching message attachments: %w", err)
}
// Fetch blobs.
// Fetch blobs for each media item
for _, media := range medias {
blob, err := m.mediaStore.GetBlob(media.UUID)
if err != nil {
m.lo.Error("error fetching media blob", "error", err)
return err
return attachments, fmt.Errorf("error fetching media blob: %w", err)
}
attachment := attachment.Attachment{
Name: media.Filename,
UUID: media.UUID,
@@ -1010,6 +1003,17 @@ func (m *Manager) attachAttachmentsToMessage(message *models.Message) error {
attachments = append(attachments, attachment)
}
return attachments, nil
}
// attachAttachmentsToMessage attaches attachment blobs to message.
func (m *Manager) attachAttachmentsToMessage(message *models.Message) error {
attachments, err := m.fetchMessageAttachments(message.ID)
if err != nil {
m.lo.Error("error fetching message attachments", "error", err)
return err
}
// Attach attachments.
message.Attachments = attachments
@@ -1118,120 +1122,3 @@ func (m *Manager) ProcessIncomingMessageHooks(conversationUUID string, isNewConv
}
return nil
}
// updateMessageSourceID updates the source ID of a message.
func (m *Manager) updateMessageSourceID(id int, sourceID string) error {
_, err := m.q.UpdateMessageSourceID.Exec(sourceID, id)
return err
}
// sendConversationContinuityEmail sends an email to the contact for conversation continuity, supposed to be called after contacts go offline in live chat.
func (m *Manager) sendConversationContinuityEmail(message models.Message) error {
// Get contact and make sure it has a valid email
contact, err := m.userStore.Get(message.MessageReceiverID, "", "")
if err != nil {
return fmt.Errorf("error fetching contact for email: %w", err)
}
if contact.Email.String == "" {
m.lo.Info("contact has no email for conversation continuity", "contact_id", contact.ID, "conversation_uuid", message.ConversationUUID)
return fmt.Errorf("contact has no email for conversation continuity")
}
// Get the original livechat inbox to check for linked email inbox
originalInbox, err := m.inboxStore.GetDBRecord(message.InboxID)
if err != nil {
return fmt.Errorf("error fetching original inbox: %w", err)
}
// Check if livechat inbox has a linked email inbox
if !originalInbox.LinkedEmailInboxID.Valid {
m.lo.Info("no linked email inbox configured for livechat inbox", "inbox_id", message.InboxID)
return fmt.Errorf("no linked email inbox configured for livechat inbox")
}
// Get the linked email inbox
linkedEmailInbox, err := m.inboxStore.Get(originalInbox.LinkedEmailInboxID.Int)
if err != nil {
return fmt.Errorf("error fetching linked email inbox: %w", err)
}
linkedEmailInboxRecord, err := m.inboxStore.GetDBRecord(originalInbox.LinkedEmailInboxID.Int)
if err != nil {
return fmt.Errorf("error fetching linked email inbox record: %w", err)
}
if !linkedEmailInboxRecord.Enabled {
return fmt.Errorf("linked email inbox is disabled")
}
// Create a new message for the email inbox with proper threading headers
emailMessage := message
emailMessage.InboxID = originalInbox.LinkedEmailInboxID.Int
emailMessage.From = linkedEmailInbox.FromAddress()
// Generate Message-ID for threading using conversation UUID and random suffix
// Extract clean email address and domain from From address
emailAddress, err := stringutil.ExtractEmail(linkedEmailInbox.FromAddress())
if err != nil {
return fmt.Errorf("error extracting email from inbox address: %w", err)
}
// Embed UUID in Message-ID for threading (format: {uuid}.{timestamp}@domain)
emailUserPart := strings.Split(emailAddress, "@")
if len(emailUserPart) == 2 {
emailMessage.SourceID = null.StringFrom(fmt.Sprintf("%s.%d@%s", message.ConversationUUID, time.Now().UnixNano(), emailUserPart[1]))
if err := m.updateMessageSourceID(message.ID, emailMessage.SourceID.String); err != nil {
m.lo.Error("error updating message source ID", "error", err)
return fmt.Errorf("error updating message source ID: %w", err)
}
}
// Get all message source IDs for References header
references, err := m.GetMessageSourceIDs(message.ConversationID, 50)
if err != nil {
m.lo.Error("Error fetching conversation source IDs for email fallback", "error", err)
// Continue without references rather than fail
references = []string{}
}
// Filter out references that do not have `@` meaning they are live chat messages
var filteredReferences []string
for _, ref := range references {
if strings.Contains(ref, "@") {
filteredReferences = append(filteredReferences, ref)
}
}
// Build References and In-Reply-To headers
emailMessage.References = filteredReferences
if len(references) > 0 {
emailMessage.InReplyTo = references[len(references)-1]
}
// Render content of message in template
if err := m.RenderMessageInTemplate(linkedEmailInbox.Channel(), &emailMessage); err != nil {
return fmt.Errorf("error rendering email template: %w", err)
}
// Set email recipients
emailMessage.To = []string{contact.Email.String}
// Set Reply-To address with conversation UUID for routing replies
emailMessage.Headers = map[string][]string{
"Reply-To": {fmt.Sprintf("%s+%s@%s", strings.Split(emailAddress, "@")[0], message.ConversationUUID, strings.Split(emailAddress, "@")[1])},
}
// Send the email via the linked email inbox
if err := linkedEmailInbox.Send(emailMessage); err != nil {
return fmt.Errorf("error sending conversation continuity email: %w", err)
}
m.lo.Info("sent conversation continuity email",
"original_inbox_id", message.InboxID,
"linked_email_inbox_id", originalInbox.LinkedEmailInboxID.Int,
"contact_email", contact.Email.String,
"conversation_uuid", message.ConversationUUID)
return nil
}

View File

@@ -53,6 +53,26 @@ var (
ContentTypeHTML = "html"
)
type ContinuityConversation struct {
ID int `db:"id"`
UUID string `db:"uuid"`
ContactID int `db:"contact_id"`
InboxID int `db:"inbox_id"`
ContactLastSeenAt time.Time `db:"contact_last_seen_at"`
LastContinuityEmailSentAt null.Time `db:"last_continuity_email_sent_at"`
ContactEmail null.String `db:"contact_email"`
ContactFirstName null.String `db:"contact_first_name"`
ContactLastName null.String `db:"contact_last_name"`
LinkedEmailInboxID null.Int `db:"linked_email_inbox_id"`
}
type ContinuityUnreadMessage struct {
Message
SenderFirstName null.String `db:"sender.first_name"`
SenderLastName null.String `db:"sender.last_name"`
SenderType string `db:"sender.type"`
}
type LastChatMessage struct {
Content string `db:"content" json:"content"`
CreatedAt time.Time `db:"created_at" json:"created_at"`

View File

@@ -503,6 +503,12 @@ WHERE m.id = $1;
DELETE FROM conversations WHERE uuid = $1;
-- MESSAGE queries.
-- name: delete-message
DELETE FROM conversation_messages WHERE CASE
WHEN $1 > 0 THEN id = $1
ELSE uuid = $2
END;
-- name: get-message-source-ids
SELECT
source_id
@@ -582,6 +588,7 @@ ORDER BY m.created_at;
-- name: get-messages
SELECT
COUNT(*) OVER() AS total,
m.id,
m.created_at,
m.updated_at,
m.status,
@@ -668,4 +675,68 @@ ORDER BY m.created_at DESC
LIMIT 1;
-- name: update-message-source-id
UPDATE conversation_messages SET source_id = $1 WHERE id = $2;
UPDATE conversation_messages SET source_id = $1 WHERE id = $2;
-- name: get-offline-livechat-conversations
SELECT
c.id,
c.uuid,
c.contact_id,
c.inbox_id,
c.contact_last_seen_at,
c.last_continuity_email_sent_at,
i.linked_email_inbox_id,
u.email as contact_email,
u.first_name as contact_first_name,
u.last_name as contact_last_name
FROM conversations c
JOIN users u ON u.id = c.contact_id
JOIN inboxes i ON i.id = c.inbox_id
WHERE i.channel = 'livechat'
AND i.enabled = TRUE
AND i.linked_email_inbox_id IS NOT NULL
AND c.contact_last_seen_at IS NOT NULL
AND c.contact_last_seen_at > NOW() - INTERVAL '1 hour'
AND c.contact_last_seen_at < NOW() - MAKE_INTERVAL(mins => $1)
AND EXISTS (
SELECT 1 FROM conversation_messages cm
WHERE cm.conversation_id = c.id
AND cm.created_at > c.contact_last_seen_at
AND cm.type = 'outgoing'
AND cm.private = false
)
AND u.email > ''
AND (c.last_continuity_email_sent_at IS NULL
OR c.last_continuity_email_sent_at < NOW() - MAKE_INTERVAL(mins => $2));
-- name: get-unread-messages
SELECT
m.id,
m.created_at,
m.updated_at,
m.status,
m.type,
m.content,
m.text_content,
m.uuid,
m.private,
m.sender_id,
m.sender_type,
m.meta,
u.first_name as "sender.first_name",
u.last_name as "sender.last_name",
u.type as "sender.type"
FROM conversation_messages m
LEFT JOIN users u ON u.id = m.sender_id
WHERE m.conversation_id = $1
AND m.created_at > $2
AND m.type = 'outgoing'
AND m.private = false
ORDER BY m.created_at ASC
LIMIT $3;
-- name: update-continuity-email-tracking
UPDATE conversations
SET contact_last_seen_at = NOW(),
last_continuity_email_sent_at = NOW()
WHERE id = $1;

View File

@@ -20,7 +20,7 @@ where id = $1 and deleted_at is NULL
RETURNING *;
-- name: soft-delete
UPDATE inboxes set deleted_at = now(), config = '{}' where id = $1 and deleted_at is NULL;
UPDATE inboxes set deleted_at = now(), config = '{}', enabled = false where id = $1 and deleted_at is NULL;
-- name: toggle
UPDATE inboxes

View File

@@ -157,5 +157,22 @@ func V0_8_0(db *sqlx.DB, fs stuffbin.FileSystem, ko *koanf.Koanf) error {
return err
}
// Add column to track last continuity email sent
_, err = db.Exec(`
ALTER TABLE conversations ADD COLUMN IF NOT EXISTS last_continuity_email_sent_at TIMESTAMPTZ NULL;
`)
if err != nil {
return err
}
// Add index for continuity email tracking
_, err = db.Exec(`
CREATE INDEX IF NOT EXISTS index_conversations_on_last_continuity_email_sent_at
ON conversations (last_continuity_email_sent_at);
`)
if err != nil {
return err
}
return nil
}

View File

@@ -3,7 +3,6 @@ package stringutil
import (
"crypto/rand"
"encoding/base64"
"fmt"
"net/mail"
"net/url"
@@ -101,11 +100,11 @@ func RemoveEmpty(s []string) []string {
return r
}
// GenerateEmailMessageID generates a RFC-compliant Message-ID for an email, does not include the angle brackets.
// The client is expected to wrap the returned string in angle brackets.
func GenerateEmailMessageID(messageID string, fromAddress string) (string, error) {
if messageID == "" {
return "", fmt.Errorf("messageID cannot be empty")
// GenerateEmailMessageID generates an RFC-compliant Message-ID for an email without angle brackets.
// The uuid parameter is a unique identifier, typically a conversation UUID v4.
func GenerateEmailMessageID(uuid string, fromAddress string) (string, error) {
if uuid == "" {
return "", fmt.Errorf("uuid cannot be empty")
}
// Parse from address
@@ -121,26 +120,16 @@ func GenerateEmailMessageID(messageID string, fromAddress string) (string, error
}
domain := parts[1]
// Generate cryptographic random component
random := make([]byte, 8)
if _, err := rand.Read(random); err != nil {
return "", fmt.Errorf("failed to generate random bytes: %w", err)
// Random component
randomStr, err := RandomAlphanumeric(11)
if err != nil {
return "", fmt.Errorf("failed to generate random string: %w", err)
}
// Sanitize messageID for email Message-ID
cleaner := regexp.MustCompile(`[^\w.-]`) // Allow only alphanum, ., -, _
cleanmessageID := cleaner.ReplaceAllString(messageID, "_")
// Ensure cleaned messageID isn't empty
if cleanmessageID == "" {
return "", fmt.Errorf("messageID became empty after sanitization")
}
// Build RFC-compliant Message-ID
return fmt.Sprintf("%s-%d-%s@%s",
cleanmessageID,
time.Now().UnixNano(), // Nanosecond precision
strings.TrimRight(base64.URLEncoding.EncodeToString(random), "="), // URL-safe base64 without padding
uuid,
time.Now().UnixNano(),
randomStr,
domain,
), nil
}

View File

@@ -239,7 +239,8 @@ CREATE TABLE conversations (
last_interaction_at TIMESTAMPTZ NULL,
next_sla_deadline_at TIMESTAMPTZ NULL,
snoozed_until TIMESTAMPTZ NULL
snoozed_until TIMESTAMPTZ NULL,
last_continuity_email_sent_at TIMESTAMPTZ NULL
);
CREATE INDEX index_conversations_on_assigned_user_id ON conversations (assigned_user_id);
CREATE INDEX index_conversations_on_assigned_team_id ON conversations (assigned_team_id);
@@ -253,6 +254,7 @@ CREATE INDEX index_conversations_on_last_message_at ON conversations (last_messa
CREATE INDEX index_conversations_on_last_interaction_at ON conversations (last_interaction_at);
CREATE INDEX index_conversations_on_next_sla_deadline_at ON conversations (next_sla_deadline_at);
CREATE INDEX index_conversations_on_waiting_since ON conversations (waiting_since);
CREATE INDEX index_conversations_on_last_continuity_email_sent_at ON conversations (last_continuity_email_sent_at);
DROP TABLE IF EXISTS conversation_messages CASCADE;
CREATE TABLE conversation_messages (

View File

@@ -267,8 +267,16 @@
toggle () {
if (this.isChatVisible) {
this.hideChat();
// Send WIDGET_CLOSED event to iframe
if (this.iframe && this.iframe.contentWindow) {
this.iframe.contentWindow.postMessage({ type: 'WIDGET_CLOSED' }, '*');
}
} else {
this.showChat();
// Send WIDGET_OPENED event to iframe
if (this.iframe && this.iframe.contentWindow) {
this.iframe.contentWindow.postMessage({ type: 'WIDGET_OPENED' }, '*');
}
}
}