Files
libredesk/internal/conversation/message.go
Abhinav Raut 5361bcb24f feat: dockerize libredesk
- feat: new flag for idempotent installation `--idempotent-install`
- feat: new flag to skip yes/no prompt `--yes`
- feat: new flag for upgrades `--upgrade`
- refactor: update doc strings and sample toml file.
- chore: update .gitignore.
2025-02-22 23:33:18 +05:30

777 lines
26 KiB
Go

package conversation
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"slices"
"strings"
"time"
"github.com/abhinavxd/libredesk/internal/attachment"
amodels "github.com/abhinavxd/libredesk/internal/automation/models"
"github.com/abhinavxd/libredesk/internal/conversation/models"
"github.com/abhinavxd/libredesk/internal/envelope"
"github.com/abhinavxd/libredesk/internal/image"
"github.com/abhinavxd/libredesk/internal/inbox"
mmodels "github.com/abhinavxd/libredesk/internal/media/models"
"github.com/abhinavxd/libredesk/internal/stringutil"
umodels "github.com/abhinavxd/libredesk/internal/user/models"
"github.com/lib/pq"
"github.com/volatiletech/null/v9"
)
const (
MessageIncoming = "incoming"
MessageOutgoing = "outgoing"
MessageActivity = "activity"
SenderTypeAgent = "agent"
SenderTypeContact = "contact"
MessageStatusPending = "pending"
MessageStatusSent = "sent"
MessageStatusFailed = "failed"
MessageStatusReceived = "received"
ActivityStatusChange = "status_change"
ActivityPriorityChange = "priority_change"
ActivityAssignedUserChange = "assigned_user_change"
ActivityAssignedTeamChange = "assigned_team_change"
ActivitySelfAssign = "self_assign"
ActivityTagChange = "tag_change"
ActivitySLASet = "sla_set"
ContentTypeText = "text"
ContentTypeHTML = "html"
maxLastMessageLen = 45
maxMessagesPerPage = 30
)
// Run starts a pool of worker goroutines to handle message dispatching via inbox's channel and processes incoming messages. It scans for
// pending outgoing messages at the specified read interval and pushes them to the outgoing queue to be sent.
func (m *Manager) Run(ctx context.Context, incomingQWorkers, outgoingQWorkers, scanInterval time.Duration) {
dbScanner := time.NewTicker(scanInterval)
defer dbScanner.Stop()
for range outgoingQWorkers {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.MessageSenderWorker(ctx)
}()
}
for range incomingQWorkers {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.IncomingMessageWorker(ctx)
}()
}
// Scan pending outgoing messages and send them.
for {
select {
case <-ctx.Done():
return
case <-dbScanner.C:
var (
pendingMessages = []models.Message{}
messageIDs = m.getOutgoingProcessingMessageIDs()
)
// Get pending outgoing messages and skip the currently processing message ids.
if err := m.q.GetPendingMessages.Select(&pendingMessages, pq.Array(messageIDs)); err != nil {
m.lo.Error("error fetching pending messages from db", "error", err)
continue
}
// Prepare and push the message to the outgoing queue.
for _, message := range pendingMessages {
// Put the message ID in the processing map.
m.outgoingProcessingMessages.Store(message.ID, message.ID)
// Push the message to the outgoing message queue.
m.outgoingMessageQueue <- message
}
}
}
}
// Close signals the Manager to stop processing messages, closes channels,
// and waits for all worker goroutines to finish processing.
func (m *Manager) Close() {
m.closedMu.Lock()
defer m.closedMu.Unlock()
m.closed = true
close(m.outgoingMessageQueue)
close(m.incomingMessageQueue)
m.wg.Wait()
}
// IncomingMessageWorker processes incoming messages from the incoming message queue.
func (m *Manager) IncomingMessageWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-m.incomingMessageQueue:
if !ok {
return
}
if err := m.processIncomingMessage(msg); err != nil {
m.lo.Error("error processing incoming msg", "error", err)
}
}
}
}
// MessageSenderWorker sends outgoing pending messages.
func (m *Manager) MessageSenderWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case message, ok := <-m.outgoingMessageQueue:
if !ok {
return
}
m.sendOutgoingMessage(message)
}
}
}
// sendOutgoingMessage sends an outgoing message.
func (m *Manager) sendOutgoingMessage(message models.Message) {
defer m.outgoingProcessingMessages.Delete(message.ID)
// Helper function to handle errors
handleError := func(err error, errorMsg string) bool {
if err != nil {
m.lo.Error(errorMsg, "error", err, "message_id", message.ID)
m.UpdateMessageStatus(message.UUID, MessageStatusFailed)
return true
}
return false
}
// Get inbox
inbox, err := m.inboxStore.Get(message.InboxID)
if handleError(err, "error fetching inbox") {
return
}
// Render content in template
if err := m.RenderContentInTemplate(inbox.Channel(), &message); err != nil {
handleError(err, "error rendering content in template")
return
}
// Attach attachments to the message
if err := m.attachAttachmentsToMessage(&message); err != nil {
handleError(err, "error attaching attachments to message")
return
}
// Set message sender and receiver
message.From = inbox.FromAddress()
message.To, _ = m.GetToAddress(message.ConversationID)
message.InReplyTo, _ = m.GetLatestReceivedMessageSourceID(message.ConversationID)
// Send message
err = inbox.Send(message)
if handleError(err, "error sending message") {
return
}
// Update status and first reply time
m.UpdateMessageStatus(message.UUID, MessageStatusSent)
m.UpdateConversationFirstReplyAt(message.ConversationUUID, message.ConversationID, message.CreatedAt)
}
// RenderContentInTemplate renders message content in template.
func (m *Manager) RenderContentInTemplate(channel string, message *models.Message) error {
switch channel {
case inbox.ChannelEmail:
conversation, err := m.GetConversation(0, message.ConversationUUID)
if err != nil {
m.lo.Error("error fetching conversation", "uuid", message.ConversationUUID, "error", err)
return fmt.Errorf("fetching conversation: %w", err)
}
message.Content, err = m.template.RenderWithBaseTemplate(conversation, message.Content)
if err != nil {
m.lo.Error("could not render email content using template", "id", message.ID, "error", err)
return fmt.Errorf("could not render email content using template: %w", err)
}
default:
m.lo.Warn("unknown message channel", "channel", channel)
return fmt.Errorf("unknown message channel: %s", channel)
}
return nil
}
// GetConversationMessages retrieves messages for a specific conversation.
func (m *Manager) GetConversationMessages(conversationUUID string, page, pageSize int) ([]models.Message, int, error) {
var (
messages = make([]models.Message, 0)
qArgs []interface{}
)
qArgs = append(qArgs, conversationUUID)
query, pageSize, qArgs, err := m.generateMessagesQuery(m.q.GetMessages, qArgs, page, pageSize)
if err != nil {
m.lo.Error("error generating messages query", "error", err)
return messages, pageSize, envelope.NewError(envelope.GeneralError, "Error fetching messages", nil)
}
tx, err := m.db.BeginTxx(context.Background(), nil)
defer tx.Rollback()
if err != nil {
m.lo.Error("error preparing get messages query", "error", err)
return messages, pageSize, envelope.NewError(envelope.GeneralError, "Error fetching messages", nil)
}
if err := tx.Select(&messages, query, qArgs...); err != nil {
m.lo.Error("error fetching conversations", "error", err)
return messages, pageSize, envelope.NewError(envelope.GeneralError, "Error fetching messages", nil)
}
return messages, pageSize, nil
}
// GetMessage retrieves a message by UUID.
func (m *Manager) GetMessage(uuid string) (models.Message, error) {
var message models.Message
if err := m.q.GetMessage.Get(&message, uuid); err != nil {
m.lo.Error("error fetching message", "uuid", uuid, "error", err)
return message, envelope.NewError(envelope.GeneralError, "Error fetching message", nil)
}
return message, nil
}
// UpdateMessageStatus updates the status of a message.
func (m *Manager) UpdateMessageStatus(uuid string, status string) error {
if _, err := m.q.UpdateMessageStatus.Exec(status, uuid); err != nil {
m.lo.Error("error updating message status", "error", err, "uuid", uuid)
return err
}
// Broadcast messge status update to all conversation subscribers.
conversationUUID, _ := m.getConversationUUIDFromMessageUUID(uuid)
m.BroadcastMessageUpdate(conversationUUID, uuid, "status" /*property*/, status)
return nil
}
// MarkMessageAsPending updates message status to `Pending`, so if it's a outgoing message it can be picked up again by a worker.
func (m *Manager) MarkMessageAsPending(uuid string) error {
if err := m.UpdateMessageStatus(uuid, MessageStatusPending); err != nil {
return envelope.NewError(envelope.GeneralError, "Error retrying message", nil)
}
return nil
}
// SendPrivateNote inserts a private message in a conversation.
func (m *Manager) SendPrivateNote(media []mmodels.Media, senderID int, conversationUUID, content string) error {
// Insert Message.
message := models.Message{
ConversationUUID: conversationUUID,
SenderID: senderID,
Type: MessageOutgoing,
SenderType: SenderTypeAgent,
Status: MessageStatusSent,
Content: content,
ContentType: ContentTypeHTML,
Private: true,
Media: media,
}
return m.InsertMessage(&message)
}
// SendReply inserts a reply message in a conversation.
func (m *Manager) SendReply(media []mmodels.Media, senderID int, conversationUUID, content string, cc, bcc []string, meta map[string]interface{}) error {
cc = stringutil.RemoveEmpty(cc)
bcc = stringutil.RemoveEmpty(bcc)
// Save cc and bcc as JSON in meta.
if len(cc) > 0 {
meta["cc"] = cc
}
if len(bcc) > 0 {
meta["bcc"] = bcc
}
metaJSON, err := json.Marshal(meta)
if err != nil {
return envelope.NewError(envelope.GeneralError, "Error marshalling message meta", nil)
}
message := models.Message{
ConversationUUID: conversationUUID,
SenderID: senderID,
Type: MessageOutgoing,
SenderType: SenderTypeAgent,
Status: MessageStatusPending,
Content: content,
ContentType: ContentTypeHTML,
Private: false,
Media: media,
Meta: string(metaJSON),
}
return m.InsertMessage(&message)
}
// InsertMessage inserts a message and attaches the media to the message.
func (m *Manager) InsertMessage(message *models.Message) error {
// Private message is always sent.
if message.Private {
message.Status = MessageStatusSent
}
// Handle empty meta.
if message.Meta == "" || message.Meta == "null" {
message.Meta = "{}"
}
// Convert HTML content to text for search.
message.TextContent = stringutil.HTML2Text(message.Content)
// Insert Message.
if err := m.q.InsertMessage.QueryRow(message.Type, message.Status, message.ConversationID, message.ConversationUUID, message.Content, message.TextContent, message.SenderID, message.SenderType,
message.Private, message.ContentType, message.SourceID, message.Meta).Scan(&message.ID, &message.UUID, &message.CreatedAt); err != nil {
m.lo.Error("error inserting message in db", "error", err)
return envelope.NewError(envelope.GeneralError, "Error sending message", nil)
}
// Attach message to the media.
for _, media := range message.Media {
m.mediaStore.Attach(media.ID, mmodels.ModelMessages, message.ID)
}
// Add this user as a participant.
if err := m.addConversationParticipant(message.SenderID, message.ConversationUUID); err != nil {
return err
}
// Update conversation last message details in conversation metadata.
m.UpdateConversationLastMessage(message.ConversationID, message.ConversationUUID, message.TextContent, message.SenderType, message.CreatedAt)
// Broadcast new message.
m.BroadcastNewMessage(message)
return nil
}
// RecordAssigneeUserChange records an activity for a user assignee change.
func (m *Manager) RecordAssigneeUserChange(conversationUUID string, assigneeID int, actor umodels.User) error {
// Self assignment.
if assigneeID == actor.ID {
return m.InsertConversationActivity(ActivitySelfAssign, conversationUUID, actor.FullName(), actor)
}
// Assignment to another user.
assignee, err := m.userStore.Get(assigneeID)
if err != nil {
return err
}
return m.InsertConversationActivity(ActivityAssignedUserChange, conversationUUID, assignee.FullName(), actor)
}
// RecordAssigneeTeamChange records an activity for a team assignee change.
func (m *Manager) RecordAssigneeTeamChange(conversationUUID string, teamID int, actor umodels.User) error {
team, err := m.teamStore.Get(teamID)
if err != nil {
return err
}
return m.InsertConversationActivity(ActivityAssignedTeamChange, conversationUUID, team.Name, actor)
}
// RecordPriorityChange records an activity for a priority change.
func (m *Manager) RecordPriorityChange(priority, conversationUUID string, actor umodels.User) error {
return m.InsertConversationActivity(ActivityPriorityChange, conversationUUID, priority, actor)
}
// RecordStatusChange records an activity for a status change.
func (m *Manager) RecordStatusChange(status, conversationUUID string, actor umodels.User) error {
return m.InsertConversationActivity(ActivityStatusChange, conversationUUID, status, actor)
}
// RecordSLASet records an activity for an SLA set.
func (m *Manager) RecordSLASet(conversationUUID string, slaName string, actor umodels.User) error {
return m.InsertConversationActivity(ActivitySLASet, conversationUUID, slaName, actor)
}
// RecordTagChange records an activity for a tag change.
func (m *Manager) RecordTagChange(conversationUUID string, tag string, actor umodels.User) error {
return m.InsertConversationActivity(ActivityTagChange, conversationUUID, tag, actor)
}
// InsertConversationActivity inserts an activity message.
func (m *Manager) InsertConversationActivity(activityType, conversationUUID, newValue string, actor umodels.User) error {
content, err := m.getMessageActivityContent(activityType, newValue, actor.FullName())
if err != nil {
m.lo.Error("error could not generate activity content", "error", err)
return err
}
message := models.Message{
Type: MessageActivity,
Status: MessageStatusSent,
Content: content,
ContentType: ContentTypeText,
ConversationUUID: conversationUUID,
Private: true,
SenderID: actor.ID,
SenderType: SenderTypeAgent,
}
if err := m.InsertMessage(&message); err != nil {
m.lo.Error("error inserting activity message", "error", err)
return envelope.NewError(envelope.GeneralError, "Error inserting activity message", nil)
}
return nil
}
// getConversationUUIDFromMessageUUID returns conversation UUID from message UUID.
func (m *Manager) getConversationUUIDFromMessageUUID(uuid string) (string, error) {
var conversationUUID string
if err := m.q.GetConversationUUIDFromMessageUUID.Get(&conversationUUID, uuid); err != nil {
m.lo.Error("error fetching conversation uuid from message uuid", "uuid", uuid, "error", err)
return conversationUUID, err
}
return conversationUUID, nil
}
// getMessageActivityContent generates activity content based on the activity type.
func (m *Manager) getMessageActivityContent(activityType, newValue, actorName string) (string, error) {
var content = ""
switch activityType {
case ActivityAssignedUserChange:
content = fmt.Sprintf("Assigned to %s by %s", newValue, actorName)
case ActivityAssignedTeamChange:
content = fmt.Sprintf("Assigned to %s team by %s", newValue, actorName)
case ActivitySelfAssign:
content = fmt.Sprintf("%s self-assigned this conversation", actorName)
case ActivityPriorityChange:
content = fmt.Sprintf("%s set priority to %s", actorName, newValue)
case ActivityStatusChange:
content = fmt.Sprintf("%s marked the conversation as %s", actorName, newValue)
case ActivityTagChange:
content = fmt.Sprintf("%s added tag %s", actorName, newValue)
case ActivitySLASet:
content = fmt.Sprintf("%s set %s SLA", actorName, newValue)
default:
return "", fmt.Errorf("invalid activity type %s", activityType)
}
return content, nil
}
// processIncomingMessage handles the insertion of an incoming message and
// associated contact. It finds or creates the contact, checks for existing
// conversations, and creates a new conversation if necessary. It also
// inserts the message, uploads any attachments, and queues the conversation evaluation of automation rules.
func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
// Find or create contact and set sender ID in message.
if err := m.userStore.CreateContact(&in.Contact); err != nil {
m.lo.Error("error upserting contact", "error", err)
return err
}
in.Message.SenderID = in.Contact.ID
// Conversations exists for this message?
conversationID, err := m.findConversationID([]string{in.Message.SourceID.String})
if err != nil && err != errConversationNotFound {
return err
}
if conversationID > 0 {
return nil
}
// Find or create new conversation.
isNewConversation, err := m.findOrCreateConversation(&in.Message, in.InboxID, in.Contact.ContactChannelID, in.Contact.ID)
if err != nil {
return err
}
// Upload message attachments.
if err := m.uploadMessageAttachments(&in.Message); err != nil {
// Log error but continue processing.
m.lo.Error("error uploading message attachments", "message_source_id", in.Message.SourceID, "error", err)
}
// Insert message.
if err = m.InsertMessage(&in.Message); err != nil {
return err
}
// Evaluate automation rules for this conversation.
if isNewConversation {
m.automation.EvaluateNewConversationRules(in.Message.ConversationUUID)
} else {
m.automation.EvaluateConversationUpdateRules(in.Message.ConversationUUID, amodels.EventConversationMessageIncoming)
// Reopen conversation if it's closed, snoozed, or resolved.
systemUser, err := m.userStore.GetSystemUser()
if err != nil {
m.lo.Error("error fetching system user", "error", err)
return fmt.Errorf("error fetching system user for reopening conversation: %w", err)
}
if err := m.ReOpenConversation(in.Message.ConversationUUID, systemUser); err != nil {
m.lo.Error("error reopening conversation", "error", err)
return fmt.Errorf("error reopening conversation: %w", err)
}
}
return nil
}
// MessageExists checks if a message with the given messageID exists.
func (m *Manager) MessageExists(messageID string) (bool, error) {
_, err := m.findConversationID([]string{messageID})
if err != nil {
if errors.Is(err, errConversationNotFound) {
return false, nil
}
m.lo.Error("error fetching message from db", "error", err)
return false, err
}
return true, nil
}
// EnqueueIncoming enqueues an incoming message for inserting in db.
func (m *Manager) EnqueueIncoming(message models.IncomingMessage) error {
m.closedMu.Lock()
defer m.closedMu.Unlock()
if m.closed {
return errors.New("incoming message queue is closed")
}
select {
case m.incomingMessageQueue <- message:
return nil
default:
m.lo.Warn("WARNING: incoming message queue is full")
return errors.New("incoming message queue is full")
}
}
// GetConversationByMessageID returns conversation by message id.
func (m *Manager) GetConversationByMessageID(id int) (models.Conversation, error) {
var conversation = models.Conversation{}
if err := m.q.GetConversationByMessageID.Get(&conversation, id); err != nil {
if err == sql.ErrNoRows {
return conversation, errConversationNotFound
}
m.lo.Error("error fetching message from DB", "error", err)
return conversation, envelope.NewError(envelope.GeneralError, "Error fetching message", nil)
}
return conversation, nil
}
// generateMessagesQuery generates the SQL query for fetching messages in a conversation.
func (c *Manager) generateMessagesQuery(baseQuery string, qArgs []interface{}, page, pageSize int) (string, int, []interface{}, error) {
if page <= 0 {
return "", 0, nil, errors.New("page must be greater than 0")
}
if pageSize > maxMessagesPerPage {
pageSize = maxMessagesPerPage
}
if pageSize <= 0 {
return "", 0, nil, errors.New("page size must be greater than 0")
}
// Calculate the offset
offset := (page - 1) * pageSize
// Append LIMIT and OFFSET to query arguments
qArgs = append(qArgs, pageSize, offset)
// Include LIMIT and OFFSET in the SQL query
sqlQuery := fmt.Sprintf(baseQuery, fmt.Sprintf("LIMIT $%d OFFSET $%d", len(qArgs)-1, len(qArgs)))
return sqlQuery, pageSize, qArgs, nil
}
// uploadMessageAttachments uploads attachments for a message.
func (m *Manager) uploadMessageAttachments(message *models.Message) []error {
if len(message.Attachments) == 0 {
return nil
}
var uploadErr []error
for _, attachment := range message.Attachments {
// Check if this attachment already exists by the content ID.
if attachment.ContentID != "" {
exists, err := m.mediaStore.ContentIDExists(attachment.ContentID)
if err != nil {
m.lo.Error("error checking media existence by content ID", "content_id", attachment.ContentID, "error", err)
continue
}
if exists {
m.lo.Debug("attachment with content ID already exists", "content_id", attachment.ContentID)
continue
}
}
m.lo.Debug("uploading message attachment", "name", attachment.Name, "content_id", attachment.ContentID, "size", attachment.Size)
// Sanitize filename and upload.
attachment.Name = stringutil.SanitizeFilename(attachment.Name)
attachReader := bytes.NewReader(attachment.Content)
media, err := m.mediaStore.UploadAndInsert(
attachment.Name,
attachment.ContentType,
attachment.ContentID,
/** Linking media to message happens later **/
null.String{}, /** modelType */
null.Int{}, /** modelID **/
attachReader,
attachment.Size,
null.StringFrom(attachment.Disposition),
[]byte("{}"), /** meta **/
)
if err != nil {
uploadErr = append(uploadErr, err)
m.lo.Error("failed to upload attachment", "name", attachment.Name, "error", err)
}
// If the attachment is an image, generate and upload thumbnail.
attachmentExt := strings.TrimPrefix(strings.ToLower(filepath.Ext(attachment.Name)), ".")
if slices.Contains(image.Exts, attachmentExt) {
if err := m.uploadThumbnailForMedia(media, attachment.Content); err != nil {
uploadErr = append(uploadErr, err)
m.lo.Error("error uploading thumbnail", "error", err)
}
}
message.Media = append(message.Media, media)
}
return uploadErr
}
// findOrCreateConversation finds or creates a conversation for the given message.
func (m *Manager) findOrCreateConversation(in *models.Message, inboxID, contactChannelID, contactID int) (bool, error) {
var (
new bool
err error
conversationID int
conversationUUID string
)
// Search for existing conversation.
sourceIDs := in.References
if in.InReplyTo != "" {
sourceIDs = append(sourceIDs, in.InReplyTo)
}
conversationID, err = m.findConversationID(sourceIDs)
if err != nil && err != errConversationNotFound {
return new, err
}
// Conversation not found, create one.
if conversationID == 0 {
new = true
lastMessage := stringutil.HTML2Text(in.Content)
lastMessageAt := time.Now()
conversationID, conversationUUID, err = m.CreateConversation(contactID, contactChannelID, inboxID, lastMessage, lastMessageAt, in.Subject)
if err != nil || conversationID == 0 {
return new, err
}
in.ConversationID = conversationID
in.ConversationUUID = conversationUUID
return new, nil
}
// Get UUID.
if conversationUUID == "" {
conversationUUID, err = m.GetConversationUUID(conversationID)
if err != nil {
return new, err
}
}
in.ConversationID = conversationID
in.ConversationUUID = conversationUUID
return new, nil
}
// findConversationID finds the conversation ID from the message source ID.
func (m *Manager) findConversationID(messageSourceIDs []string) (int, error) {
if len(messageSourceIDs) == 0 {
return 0, errConversationNotFound
}
var conversationID int
if err := m.q.MessageExistsBySourceID.QueryRow(pq.Array(messageSourceIDs)).Scan(&conversationID); err != nil {
if err == sql.ErrNoRows {
return conversationID, errConversationNotFound
}
m.lo.Error("error fetching msg from DB", "error", err)
return conversationID, err
}
return conversationID, nil
}
// attachAttachmentsToMessage attaches attachment blobs to message.
func (m *Manager) attachAttachmentsToMessage(message *models.Message) error {
var attachments attachment.Attachments
// Get all media for this message.
medias, err := m.mediaStore.GetByModel(message.ID, mmodels.ModelMessages)
if err != nil {
m.lo.Error("error fetching message attachments", "error", err)
return err
}
// Fetch blobs.
for _, media := range medias {
blob, err := m.mediaStore.GetBlob(media.UUID)
if err != nil {
m.lo.Error("error fetching media blob", "error", err)
return err
}
attachment := attachment.Attachment{
Name: media.Filename,
Content: blob,
Header: attachment.MakeHeader(media.ContentType, media.UUID, media.Filename, "base64", media.Disposition.String),
}
attachments = append(attachments, attachment)
}
// Attach attachments.
message.Attachments = attachments
return nil
}
// getOutgoingProcessingMessageIDs returns the IDs of outgoing messages currently being processed.
func (m *Manager) getOutgoingProcessingMessageIDs() []int {
var out = make([]int, 0)
m.outgoingProcessingMessages.Range(func(key, _ any) bool {
if k, ok := key.(int); ok {
out = append(out, k)
}
return true
})
return out
}
// uploadThumbnailForMedia prepares and uploads a thumbnail for an image attachment.
func (m *Manager) uploadThumbnailForMedia(media mmodels.Media, content []byte) error {
// Create a reader from the content
file := bytes.NewReader(content)
// Seek to the beginning of the file
file.Seek(0, 0)
// Create the thumbnail
thumbFile, err := image.CreateThumb(image.DefThumbSize, file)
if err != nil {
return fmt.Errorf("error creating thumbnail: %w", err)
}
// Generate thumbnail name
thumbName := fmt.Sprintf("thumb_%s", media.UUID)
// Upload the thumbnail
if _, err := m.mediaStore.Upload(thumbName, media.ContentType, thumbFile); err != nil {
m.lo.Error("error uploading thumbnail", "error", err)
return fmt.Errorf("error uploading thumbnail: %w", err)
}
return nil
}