fix[SLA]: Use conversation created_at time as start time for SLA calculation. This makes sure when SLA is changed for a conversation the elapsed time carries over.

refactor: improve error handling and update comments for clarity
This commit is contained in:
Abhinav Raut
2025-02-02 23:51:50 +05:30
parent d3c2cc2527
commit 6993d972ab
8 changed files with 63 additions and 58 deletions

View File

@@ -393,21 +393,21 @@ func handleUpdateTeamAssignee(r *fastglue.Request) error {
return sendErrorEnvelope(r, err)
}
// Apply SLA policy if team has changed and has an SLA policy.
// Evaluate automation rules on team assignment.
app.automation.EvaluateConversationUpdateRules(uuid, models.EventConversationTeamAssigned)
// Apply SLA policy if team has changed and the new team has an SLA policy.
if conversation.AssignedTeamID.Int != assigneeID && assigneeID != 0 {
team, err := app.team.Get(assigneeID)
if err != nil {
return sendErrorEnvelope(r, err)
}
if team.SLAPolicyID.Int != 0 {
if err := app.conversation.ApplySLA(conversation.UUID, conversation.ID, conversation.AssignedTeamID.Int, team.SLAPolicyID.Int, user); err != nil {
if err := app.conversation.ApplySLA(*conversation, team.SLAPolicyID.Int, user); err != nil {
return sendErrorEnvelope(r, err)
}
}
}
// Evaluate automation rules on team assignment.
app.automation.EvaluateConversationUpdateRules(uuid, models.EventConversationTeamAssigned)
return r.SendEnvelope("Team assigned successfully")
}

View File

@@ -136,12 +136,13 @@ func main() {
loadSettings(settings)
var (
autoAssignInterval = ko.MustDuration("autoassigner.interval")
autoAssignInterval = ko.MustDuration("autoassigner.autoassign_interval")
unsnoozeInterval = ko.MustDuration("conversation.unsnooze_interval")
automationWrk = ko.MustInt("automation.worker_count")
automationWorkers = ko.MustInt("automation.worker_count")
messageOutgoingQWorkers = ko.MustDuration("message.outgoing_queue_workers")
messageIncomingQWorkers = ko.MustDuration("message.incoming_queue_workers")
messageOutgoingScanInterval = ko.MustDuration("message.message_outoing_scan_interval")
slaEvaluationInterval = ko.MustDuration("sla.evaluation_interval")
lo = initLogger("libredesk")
wsHub = ws.NewHub()
rdb = initRedis()
@@ -167,14 +168,13 @@ func main() {
automation.SetConversationStore(conversation)
startInboxes(ctx, inbox, conversation)
go automation.Run(ctx, automationWrk)
go automation.Run(ctx, automationWorkers)
go autoassigner.Run(ctx, autoAssignInterval)
go conversation.Run(ctx, messageIncomingQWorkers, messageOutgoingQWorkers, messageOutgoingScanInterval)
go conversation.RunUnsnoozer(ctx, unsnoozeInterval)
go media.DeleteUnlinkedMedia(ctx)
go notifier.Run(ctx)
go sla.Run(ctx)
go sla.Run(ctx, slaEvaluationInterval)
go media.DeleteUnlinkedMedia(ctx)
var app = &App{
lo: lo,
@@ -230,25 +230,23 @@ func main() {
// Wait for shutdown signal.
<-ctx.Done()
colorlog.Red("Shutting down the server. Please wait....")
colorlog.Red("Shutting down HTTP server...")
s.Shutdown()
colorlog.Red("Server shutdown complete.")
colorlog.Red("Shutting down services. Please wait....")
colorlog.Red("Shutting down inboxes...")
inbox.Close()
colorlog.Red("Inbox shutdown complete.")
colorlog.Red("Shutting down automation...")
automation.Close()
colorlog.Red("Automation shutdown complete.")
colorlog.Red("Shutting down autoassigner...")
autoassigner.Close()
colorlog.Red("Autoassigner shutdown complete.")
colorlog.Red("Shutting down notifier...")
notifier.Close()
colorlog.Red("Notifier shutdown complete.")
colorlog.Red("Shutting down conversation...")
conversation.Close()
colorlog.Red("Conversation shutdown complete.")
colorlog.Red("Shutting down SLA...")
sla.Close()
colorlog.Red("SLA shutdown complete.")
colorlog.Red("Shutting down database...")
db.Close()
colorlog.Red("Database shutdown complete.")
colorlog.Red("Shutting down redis...")
rdb.Close()
colorlog.Red("Redis shutdown complete.")
colorlog.Green("Shutdown complete.")
}

View File

@@ -117,7 +117,7 @@ func (e *Engine) Run(ctx context.Context, workerCount int) {
go e.worker(ctx)
}
// Ticker for timed triggers.
// Hourly ticker for timed triggers.
ticker := time.NewTicker(1 * time.Hour)
defer func() {
ticker.Stop()

View File

@@ -40,9 +40,9 @@ import (
var (
//go:embed queries.sql
efs embed.FS
ErrConversationNotFound = errors.New("conversation not found")
ConversationsListAllowedFilterFields = []string{"status_id", "priority_id", "assigned_team_id", "assigned_user_id", "inbox_id"}
ConversationStatusesFilterFields = []string{"id", "name"}
errConversationNotFound = errors.New("conversation not found")
conversationsListAllowedFilterFields = []string{"status_id", "priority_id", "assigned_team_id", "assigned_user_id", "inbox_id"}
conversationStatusesFilterFields = []string{"id", "name"}
csatReplyMessage = "Please rate your experience with us: <a href=\"%s\">Rate now</a>"
)
@@ -78,7 +78,7 @@ type Manager struct {
}
type slaStore interface {
ApplySLA(conversationID, assignedTeamID, slaID int) (slaModels.SLAPolicy, error)
ApplySLA(startTime time.Time, conversationID, assignedTeamID, slaID int) (slaModels.SLAPolicy, error)
}
type statusStore interface {
@@ -380,7 +380,7 @@ func (c *Manager) ReOpenConversation(conversationUUID string, actor umodels.User
// Record the status change as an activity.
if err := c.RecordStatusChange(models.StatusOpen, conversationUUID, actor); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording status change", nil)
return err
}
}
return nil
@@ -687,8 +687,8 @@ func (c *Manager) makeConversationsListQuery(userID int, teamIDs []int, listType
Page: page,
PageSize: pageSize,
}, filtersJSON, dbutil.AllowedFields{
"conversations": ConversationsListAllowedFilterFields,
"conversation_statuses": ConversationStatusesFilterFields,
"conversations": conversationsListAllowedFilterFields,
"conversation_statuses": conversationStatusesFilterFields,
})
}
@@ -716,7 +716,7 @@ func (m *Manager) GetLatestReceivedMessageSourceID(conversationID int) (string,
func (m *Manager) SendAssignedConversationEmail(userIDs []int, conversation models.Conversation) error {
agent, err := m.userStore.Get(userIDs[0])
if err != nil {
m.lo.Error("error fetching agent", "error", err)
m.lo.Error("error fetching agent", "user_id", userIDs[0], "error", err)
return fmt.Errorf("fetching agent: %w", err)
}
@@ -760,16 +760,16 @@ func (m *Manager) UnassignOpen(userID int) error {
}
// ApplySLA applies the SLA policy to a conversation.
func (m *Manager) ApplySLA(conversationUUID string, conversationID, assignedTeamID, policyID int, actor umodels.User) error {
policy, err := m.slaStore.ApplySLA(conversationID, assignedTeamID, policyID)
func (m *Manager) ApplySLA(conversation models.Conversation, policyID int, actor umodels.User) error {
policy, err := m.slaStore.ApplySLA(conversation.CreatedAt, conversation.ID, conversation.AssignedTeamID.Int, policyID)
if err != nil {
m.lo.Error("error applying SLA", "error", err)
m.lo.Error("error applying SLA to conversation", "conversation_id", conversation.ID, "policy_id", policyID, "error", err)
return envelope.NewError(envelope.GeneralError, "Error applying SLA", nil)
}
// Record the SLA application as an activity.
if err := m.RecordSLASet(conversationUUID, policy.Name, actor); err != nil {
return envelope.NewError(envelope.GeneralError, "Error recording SLA application", nil)
if err := m.RecordSLASet(conversation.UUID, policy.Name, actor); err != nil {
return err
}
return nil
}
@@ -816,7 +816,7 @@ func (m *Manager) ApplyAction(action amodels.RuleAction, conv models.Conversatio
return m.SendReply([]mmodels.Media{}, user.ID, conv.UUID, action.Value[0], nil, nil, nil)
case amodels.ActionSetSLA:
slaID, _ := strconv.Atoi(action.Value[0])
return m.ApplySLA(conv.UUID, conv.ID, conv.AssignedTeamID.Int, slaID, user)
return m.ApplySLA(conv, slaID, user)
case amodels.ActionSetTags:
return m.UpsertConversationTags(conv.UUID, action.Value)
case amodels.ActionSendCSAT:

View File

@@ -421,8 +421,10 @@ func (m *Manager) InsertConversationActivity(activityType, conversationUUID, new
SenderType: SenderTypeUser,
}
// InsertMessage message in DB.
m.InsertMessage(&message)
if err := m.InsertMessage(&message); err != nil {
m.lo.Error("error inserting activity message", "error", err)
return envelope.NewError(envelope.GeneralError, "Error inserting activity message", nil)
}
return nil
}
@@ -474,7 +476,7 @@ func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
// Conversations exists for this message?
conversationID, err := m.findConversationID([]string{in.Message.SourceID.String})
if err != nil && err != ErrConversationNotFound {
if err != nil && err != errConversationNotFound {
return err
}
if conversationID > 0 {
@@ -521,7 +523,7 @@ func (m *Manager) processIncomingMessage(in models.IncomingMessage) error {
func (m *Manager) MessageExists(messageID string) (bool, error) {
_, err := m.findConversationID([]string{messageID})
if err != nil {
if errors.Is(err, ErrConversationNotFound) {
if errors.Is(err, errConversationNotFound) {
return false, nil
}
m.lo.Error("error fetching message from db", "error", err)
@@ -552,7 +554,7 @@ func (m *Manager) GetConversationByMessageID(id int) (models.Conversation, error
var conversation = models.Conversation{}
if err := m.q.GetConversationByMessageID.Get(&conversation, id); err != nil {
if err == sql.ErrNoRows {
return conversation, ErrConversationNotFound
return conversation, errConversationNotFound
}
m.lo.Error("error fetching message from DB", "error", err)
return conversation, envelope.NewError(envelope.GeneralError, "Error fetching message", nil)
@@ -652,7 +654,7 @@ func (m *Manager) findOrCreateConversation(in *models.Message, inboxID, contactC
sourceIDs = append(sourceIDs, in.InReplyTo)
}
conversationID, err = m.findConversationID(sourceIDs)
if err != nil && err != ErrConversationNotFound {
if err != nil && err != errConversationNotFound {
return new, err
}
@@ -684,12 +686,12 @@ func (m *Manager) findOrCreateConversation(in *models.Message, inboxID, contactC
// findConversationID finds the conversation ID from the message source ID.
func (m *Manager) findConversationID(messageSourceIDs []string) (int, error) {
if len(messageSourceIDs) == 0 {
return 0, ErrConversationNotFound
return 0, errConversationNotFound
}
var conversationID int
if err := m.q.MessageExistsBySourceID.QueryRow(pq.Array(messageSourceIDs)).Scan(&conversationID); err != nil {
if err == sql.ErrNoRows {
return conversationID, ErrConversationNotFound
return conversationID, errConversationNotFound
}
m.lo.Error("error fetching msg from DB", "error", err)
return conversationID, err

View File

@@ -136,7 +136,6 @@ func (e *Email) processEnvelope(ctx context.Context, client *imapclient.Client,
}
if exists {
e.lo.Debug("message already exists", "message_id", env.MessageID)
return nil
}

View File

@@ -43,6 +43,8 @@ next_sla_deadline_at = LEAST(
WHERE id IN (SELECT conversation_id FROM new_sla);
-- name: get-pending-slas
-- Get all the applied SLAs that are not yet breached or met and is also set on the conversation.
-- This make sure when SLA is changed, we don't update the breached or met status of the previous SLA.
SELECT a.id, a.first_response_deadline_at, c.first_reply_at as first_response_at,
a.resolution_deadline_at, c.resolved_at as resolved_at
FROM applied_slas a

View File

@@ -137,15 +137,15 @@ func (m *Manager) Update(id int, name, description string, firstResponseTime, re
return nil
}
// getBusinessHours returns the business hours ID and timezone for a team.
func (m *Manager) getBusinessHours(assignedTeamID int) (bmodels.BusinessHours, string, error) {
// getBusinessHoursAndTimezone returns the business hours ID and timezone for a team, falling back to app settings.
func (m *Manager) getBusinessHoursAndTimezone(assignedTeamID int) (bmodels.BusinessHours, string, error) {
var (
businessHrsID int
timezone string
bh bmodels.BusinessHours
)
// Fetch from team if assigned.
// Fetch from team if assignedTeamID is provided.
if assignedTeamID != 0 {
team, err := m.teamStore.Get(assignedTeamID)
if err != nil {
@@ -193,18 +193,19 @@ func (m *Manager) getBusinessHours(assignedTeamID int) (bmodels.BusinessHours, s
func (m *Manager) CalculateDeadlines(startTime time.Time, slaPolicyID, assignedTeamID int) (Deadlines, error) {
var deadlines Deadlines
businessHrs, timezone, err := m.getBusinessHours(assignedTeamID)
businessHrs, timezone, err := m.getBusinessHoursAndTimezone(assignedTeamID)
if err != nil {
return deadlines, err
}
m.lo.Info("calculating deadlines", "business_hours", businessHrs.Hours, "timezone", timezone, "always_open", businessHrs.IsAlwaysOpen)
m.lo.Info("calculating deadlines", "timezone", timezone, "business_hours_always_open", businessHrs.IsAlwaysOpen, "business_hours", businessHrs.Hours)
sla, err := m.Get(slaPolicyID)
if err != nil {
return deadlines, err
}
// Helper function to calculate deadlines by parsing the duration string.
calculateDeadline := func(durationStr string) (time.Time, error) {
if durationStr == "" {
return time.Time{}, nil
@@ -219,6 +220,7 @@ func (m *Manager) CalculateDeadlines(startTime time.Time, slaPolicyID, assignedT
}
return deadline, nil
}
if deadlines.FirstResponse, err = calculateDeadline(sla.FirstResponseTime); err != nil {
return deadlines, err
}
@@ -229,10 +231,10 @@ func (m *Manager) CalculateDeadlines(startTime time.Time, slaPolicyID, assignedT
}
// ApplySLA applies an SLA policy to a conversation.
func (m *Manager) ApplySLA(conversationID, assignedTeamID, slaPolicyID int) (models.SLAPolicy, error) {
func (m *Manager) ApplySLA(startTime time.Time, conversationID, assignedTeamID, slaPolicyID int) (models.SLAPolicy, error) {
var sla models.SLAPolicy
deadlines, err := m.CalculateDeadlines(time.Now(), slaPolicyID, assignedTeamID)
deadlines, err := m.CalculateDeadlines(startTime, slaPolicyID, assignedTeamID)
if err != nil {
return sla, err
}
@@ -263,11 +265,11 @@ func (m *Manager) GetLatestDeadlines(conversationID int) (time.Time, time.Time,
}
// Run starts the SLA evaluation loop and evaluates pending SLAs.
func (m *Manager) Run(ctx context.Context) {
func (m *Manager) Run(ctx context.Context, evalInterval time.Duration) {
m.wg.Add(1)
defer m.wg.Done()
ticker := time.NewTicker(2 * time.Minute)
ticker := time.NewTicker(evalInterval)
defer ticker.Stop()
for {
@@ -289,6 +291,7 @@ func (m *Manager) Close() error {
}
// evaluatePendingSLAs fetches unbreached SLAs and evaluates them.
// Here evaluation means checking if the SLA deadlines have been met or breached and updating timestamps accordingly.
func (m *Manager) evaluatePendingSLAs(ctx context.Context) error {
var pendingSLAs []models.AppliedSLA
if err := m.q.GetPendingSLAs.SelectContext(ctx, &pendingSLAs); err != nil {
@@ -306,6 +309,7 @@ func (m *Manager) evaluatePendingSLAs(ctx context.Context) error {
}
}
}
m.lo.Info("evaluated pending SLAs", "count", len(pendingSLAs))
return nil
}
@@ -318,15 +322,15 @@ func (m *Manager) evaluateSLA(sla models.AppliedSLA) error {
}
if !metAt.Valid && now.After(deadline) {
_, err := m.q.UpdateBreach.Exec(sla.ID, slaType)
return err
return fmt.Errorf("updating SLA breach: %w", err)
}
if metAt.Valid {
if metAt.Time.After(deadline) {
_, err := m.q.UpdateBreach.Exec(sla.ID, slaType)
return err
return fmt.Errorf("updating SLA breach: %w", err)
}
_, err := m.q.UpdateMet.Exec(sla.ID, slaType)
return err
return fmt.Errorf("updating SLA met: %w", err)
}
return nil
}