wip: oidc

This commit is contained in:
Abhinav Raut
2024-07-31 04:02:34 +05:30
parent 3d6d513ba9
commit ead54665fb
39 changed files with 798 additions and 549 deletions

View File

@@ -49,7 +49,7 @@ const (
maxMessagesPerPage = 30
)
// ListenAndDispatch starts worker pool to process incoming and outgoing messages.
// ListenAndDispatch starts worker pool to process incoming and send pending outgoing messages.
func (m *Manager) ListenAndDispatch(ctx context.Context, dispatchConcurrency, readerConcurrency int, readInterval time.Duration) {
// Spawn a worker goroutine pool to dispatch messages.
for range dispatchConcurrency {
@@ -155,7 +155,7 @@ func (m *Manager) MessageDispatchWorker(ctx context.Context) {
err = inbox.Send(message)
// Update status.
var newStatus = MessageStatusFailed
var newStatus = MessageStatusSent
if err != nil {
newStatus = MessageStatusFailed
m.lo.Error("error sending message", "error", err, "inbox_id", message.InboxID)
@@ -167,9 +167,6 @@ func (m *Manager) MessageDispatchWorker(ctx context.Context) {
m.UpdateConversationFirstReplyAt(message.ConversationUUID, message.ConversationID, message.CreatedAt)
}
// Broadcast update to all subscribers.
m.wsHub.BroadcastMessagePropUpdate(message.ConversationUUID, message.UUID, "status" /*message field*/, newStatus)
// Delete from processing map.
m.outgoingProcessingMessages.Delete(message.ID)
}
@@ -221,6 +218,10 @@ func (m *Manager) UpdateMessageStatus(uuid string, status string) error {
m.lo.Error("error updating message status in DB", "error", err, "uuid", uuid)
return err
}
// Broadcast messge status update to all conversation subscribers.
conversationUUID, _ := m.getConversationUUIDFromMessageUUID(uuid)
m.wsHub.BroadcastMessagePropUpdate(conversationUUID, uuid, "status" /*message field*/, status)
return nil
}
@@ -234,6 +235,7 @@ func (m *Manager) MarkMessageAsPending(uuid string) error {
// InsertMessage inserts a message and attaches the attachments to the message.
func (m *Manager) InsertMessage(message *models.Message) error {
// Private message is always sent.
if message.Private {
message.Status = MessageStatusSent
}
@@ -242,7 +244,7 @@ func (m *Manager) InsertMessage(message *models.Message) error {
message.Meta = "{}"
}
// InsertMessage.
// Insert Message.
if err := m.q.InsertMessage.QueryRow(message.Type, message.Status, message.ConversationID, message.ConversationUUID, message.Content, message.SenderID, message.SenderType,
message.Private, message.ContentType, message.SourceID, message.InboxID, message.Meta).Scan(&message.ID, &message.UUID, &message.CreatedAt); err != nil {
m.lo.Error("error inserting message in db", "error", err)
@@ -251,8 +253,18 @@ func (m *Manager) InsertMessage(message *models.Message) error {
// Attach message to the media.
for _, media := range message.Media {
m.mediaStore.AttachToModel(media.ID, mmodels.ModelMessages, message.ID)
m.mediaStore.Attach(media.ID, mmodels.ModelMessages, message.ID)
}
// Add this user as a participant.
m.AddConversationParticipant(message.SenderID, message.ConversationUUID)
// Update conversation meta with the last message details.
message.TrimmedContent = stringutil.Trim(message.Content, 45)
m.UpdateConversationLastMessage(0, message.ConversationUUID, message.TrimmedContent, message.CreatedAt)
// Broadcast new message to all conversation subscribers.
m.BroadcastNewConversationMessage(message)
return nil
}
@@ -325,15 +337,19 @@ func (m *Manager) InsertConversationActivity(activityType, conversationUUID, new
// InsertMessage message in DB.
m.InsertMessage(&message)
// Broadcast the new message to all subscribers.
m.BroadcastNewConversationMessage(message, content)
// Update the last message in conversation meta.
m.UpdateConversationLastMessage(0, conversationUUID, content, message.CreatedAt)
return nil
}
// getConversationUUIDFromMessageUUID returns conversation UUID from message UUID.
func (m *Manager) getConversationUUIDFromMessageUUID(uuid string) (string, error) {
var conversationUUID string
if err := m.q.GetConversationUUIDFromMessageUUID.Get(&conversationUUID, uuid); err != nil {
m.lo.Error("error fetching conversation uuid from message uuid", "uuid", uuid, "error", err)
return conversationUUID, err
}
return conversationUUID, nil
}
// getMessageActivityContent generates activity content based on the activity type.
func (m *Manager) getMessageActivityContent(activityType, newValue, actorName string) (string, error) {
var content = ""
@@ -384,24 +400,12 @@ func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
// Insert message.
if err = m.InsertMessage(&in.Message); err != nil {
return fmt.Errorf("inserting conversation message: %w", err)
return err
}
// Upload attachments.
if err := m.uploadMessageAttachments(&in.Message); err != nil {
return fmt.Errorf("uploading message attachments: %w", err)
}
// Send WS update to all subscribers.
if in.Message.ConversationUUID != "" {
var content string
if isNewConversation {
content = stringutil.Trim(in.Message.Subject, maxLastMessageLen)
} else {
content = stringutil.Trim(in.Message.Content, maxLastMessageLen)
}
m.BroadcastNewConversationMessage(in.Message, content)
m.UpdateConversationLastMessage(in.Message.ConversationID, in.Message.ConversationUUID, content, in.Message.CreatedAt)
return err
}
// Evaluate automation rules for this conversation.
@@ -480,7 +484,7 @@ func (m *Manager) uploadMessageAttachments(message *models.Message) error {
}
// Attach message to media.
if err := m.mediaStore.AttachToModel(media.ID, mmodels.ModelMessages, message.ID); err != nil {
if err := m.mediaStore.Attach(media.ID, mmodels.ModelMessages, message.ID); err != nil {
m.lo.Error("error attaching message to media", "error", err)
return err
}
@@ -610,6 +614,6 @@ func (m *Manager) getOutgoingProcessingMessageIDs() []int {
}
// BroadcastNewConversationMessage broadcasts a new conversation message to subscribers.
func (m *Manager) BroadcastNewConversationMessage(message models.Message, content string) {
m.wsHub.BroadcastNewConversationMessage(message.ConversationUUID, content, message.UUID, time.Now().Format(time.RFC3339), message.Private)
func (m *Manager) BroadcastNewConversationMessage(message *models.Message) {
m.wsHub.BroadcastNewConversationMessage(message.ConversationUUID, message.TrimmedContent, message.UUID, message.CreatedAt.Format(time.RFC3339), message.Type, message.Private)
}