mirror of
https://github.com/abhinavxd/libredesk.git
synced 2025-11-08 16:01:53 +00:00
feat: command menu on press ctrl + k
feat: CSAT email when a conversation is marked resolved.
feat(AI) ✨ agent message rewriting with OpenAI prompts
373 lines
11 KiB
Go
373 lines
11 KiB
Go
// Package automation evaluates automation rules against conversations in response to events
// such as new conversations, conversation updates, and time triggers,
// and performs the rule's actions when its conditions evaluate to true.
|
|
package automation
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"embed"
|
|
"encoding/json"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/abhinavxd/artemis/internal/automation/models"
|
|
cmodels "github.com/abhinavxd/artemis/internal/conversation/models"
|
|
"github.com/abhinavxd/artemis/internal/dbutil"
|
|
"github.com/abhinavxd/artemis/internal/envelope"
|
|
mmodels "github.com/abhinavxd/artemis/internal/media/models"
|
|
umodels "github.com/abhinavxd/artemis/internal/user/models"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/lib/pq"
|
|
"github.com/zerodha/logf"
|
|
)
|
|
|
|
var (
|
|
//go:embed queries.sql
|
|
efs embed.FS
|
|
|
|
// MaxQueueSize defines the maximum size of the task queues.
|
|
MaxQueueSize = 5000
|
|
)
|
|
|
|
// TaskType identifies which kind of work a ConversationTask carries.
type TaskType string

// Task types understood by the engine's workers.
const (
	// NewConversation marks a task queued for a newly created conversation.
	NewConversation TaskType = "new"
	// UpdateConversation marks a task queued for a conversation update event.
	UpdateConversation TaskType = "update"
	// TimeTrigger marks the periodic task queued by the engine's ticker.
	TimeTrigger TaskType = "time-trigger"
)
|
|
|
|
// ConversationTask represents a unit of work for processing conversations.
|
|
type ConversationTask struct {
|
|
taskType TaskType
|
|
eventType string
|
|
conversationUUID string
|
|
}
|
|
|
|
type Engine struct {
|
|
rules []models.Rule
|
|
rulesMu sync.RWMutex
|
|
q queries
|
|
lo *logf.Logger
|
|
conversationStore ConversationStore
|
|
slaStore SLAStore
|
|
systemUser umodels.User
|
|
taskQueue chan ConversationTask
|
|
closed bool
|
|
closedMu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type Opts struct {
|
|
DB *sqlx.DB
|
|
Lo *logf.Logger
|
|
}
|
|
|
|
type ConversationStore interface {
|
|
GetConversation(id int, uuid string) (cmodels.Conversation, error)
|
|
GetConversationsCreatedAfter(t time.Time) ([]cmodels.Conversation, error)
|
|
UpdateConversationTeamAssignee(uuid string, teamID int, actor umodels.User) error
|
|
UpdateConversationUserAssignee(uuid string, assigneeID int, actor umodels.User) error
|
|
UpdateConversationStatus(uuid string, status, snoozeDur []byte, actor umodels.User) error
|
|
UpdateConversationPriority(uuid string, priority []byte, actor umodels.User) error
|
|
SendPrivateNote(media []mmodels.Media, senderID int, conversationUUID, content string) error
|
|
SendReply(media []mmodels.Media, senderID int, conversationUUID, content, meta string) error
|
|
RecordSLASet(conversationUUID string, actor umodels.User) error
|
|
}
|
|
|
|
// SLAStore is the SLA operation the automation engine depends on.
type SLAStore interface {
	// ApplySLA applies the SLA identified by slaID to the given conversation.
	ApplySLA(conversationID, slaID int) error
}
|
|
|
|
type queries struct {
|
|
GetAll *sqlx.Stmt `query:"get-all"`
|
|
GetRule *sqlx.Stmt `query:"get-rule"`
|
|
InsertRule *sqlx.Stmt `query:"insert-rule"`
|
|
UpdateRule *sqlx.Stmt `query:"update-rule"`
|
|
DeleteRule *sqlx.Stmt `query:"delete-rule"`
|
|
ToggleRule *sqlx.Stmt `query:"toggle-rule"`
|
|
GetEnabledRules *sqlx.Stmt `query:"get-enabled-rules"`
|
|
}
|
|
|
|
// New initializes a new Engine.
|
|
func New(systemUser umodels.User, opt Opts) (*Engine, error) {
|
|
var (
|
|
q queries
|
|
e = &Engine{
|
|
systemUser: systemUser,
|
|
lo: opt.Lo,
|
|
taskQueue: make(chan ConversationTask, MaxQueueSize),
|
|
}
|
|
)
|
|
if err := dbutil.ScanSQLFile("queries.sql", &q, opt.DB, efs); err != nil {
|
|
return nil, err
|
|
}
|
|
e.q = q
|
|
e.rules = e.queryRules()
|
|
return e, nil
|
|
}
|
|
|
|
// SetConversationStore sets conversations store.
|
|
func (e *Engine) SetConversationStore(store ConversationStore, slaStore SLAStore) {
|
|
e.conversationStore = store
|
|
e.slaStore = slaStore
|
|
}
|
|
|
|
// ReloadRules reloads automation rules from DB.
|
|
func (e *Engine) ReloadRules() {
|
|
e.rulesMu.Lock()
|
|
defer e.rulesMu.Unlock()
|
|
e.lo.Debug("reloading automation engine rules")
|
|
e.rules = e.queryRules()
|
|
}
|
|
|
|
// Run starts the Engine with a worker pool to evaluate rules based on events.
|
|
func (e *Engine) Run(ctx context.Context, workerCount int) {
|
|
// Start the worker pool
|
|
for i := 0; i < workerCount; i++ {
|
|
e.wg.Add(1)
|
|
go e.worker(ctx)
|
|
}
|
|
|
|
// Ticker for timed triggers.
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer func() {
|
|
ticker.Stop()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
e.lo.Info("queuing time triggers")
|
|
e.taskQueue <- ConversationTask{taskType: TimeTrigger}
|
|
}
|
|
}
|
|
}
|
|
|
|
// worker processes tasks from the taskQueue until it's closed or context is done.
|
|
func (e *Engine) worker(ctx context.Context) {
|
|
defer e.wg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task, ok := <-e.taskQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
switch task.taskType {
|
|
case NewConversation:
|
|
e.handleNewConversation(task.conversationUUID)
|
|
case UpdateConversation:
|
|
e.handleUpdateConversation(task.conversationUUID, task.eventType)
|
|
case TimeTrigger:
|
|
e.handleTimeTrigger()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close signals the Engine to stop accepting any more messages and waits for all workers to finish.
|
|
func (e *Engine) Close() {
|
|
e.closedMu.Lock()
|
|
defer e.closedMu.Unlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
e.closed = true
|
|
close(e.taskQueue)
|
|
// Wait for all workers.
|
|
e.wg.Wait()
|
|
}
|
|
|
|
// GetAllRules retrieves all rules of a specific type.
|
|
func (e *Engine) GetAllRules(typ []byte) ([]models.RuleRecord, error) {
|
|
var rules = make([]models.RuleRecord, 0)
|
|
if err := e.q.GetAll.Select(&rules, typ); err != nil {
|
|
e.lo.Error("error fetching rules", "error", err)
|
|
return rules, envelope.NewError(envelope.GeneralError, "Error fetching automation rules.", nil)
|
|
}
|
|
return rules, nil
|
|
}
|
|
|
|
// GetRule retrieves a rule by ID.
|
|
func (e *Engine) GetRule(id int) (models.RuleRecord, error) {
|
|
var rule models.RuleRecord
|
|
if err := e.q.GetRule.Get(&rule, id); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return rule, envelope.NewError(envelope.InputError, "Rule not found.", nil)
|
|
}
|
|
e.lo.Error("error fetching rule", "error", err)
|
|
return rule, envelope.NewError(envelope.GeneralError, "Error fetching automation rule.", nil)
|
|
}
|
|
return rule, nil
|
|
}
|
|
|
|
// ToggleRule toggles the active status of a rule by ID.
|
|
func (e *Engine) ToggleRule(id int) error {
|
|
if _, err := e.q.ToggleRule.Exec(id); err != nil {
|
|
e.lo.Error("error toggling rule", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, "Error toggling automation rule.", nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// UpdateRule updates an existing rule.
|
|
func (e *Engine) UpdateRule(id int, rule models.RuleRecord) error {
|
|
if _, err := e.q.UpdateRule.Exec(id, rule.Name, rule.Description, rule.Type, pq.Array(rule.Events), rule.Rules); err != nil {
|
|
e.lo.Error("error updating rule", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, "Error updating automation rule.", nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// CreateRule creates a new rule.
|
|
func (e *Engine) CreateRule(rule models.RuleRecord) error {
|
|
if _, err := e.q.InsertRule.Exec(rule.Name, rule.Description, rule.Type, pq.Array(rule.Events), rule.Rules); err != nil {
|
|
e.lo.Error("error creating rule", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, "Error creating automation rule.", nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// DeleteRule deletes a rule by ID.
|
|
func (e *Engine) DeleteRule(id int) error {
|
|
if _, err := e.q.DeleteRule.Exec(id); err != nil {
|
|
e.lo.Error("error deleting rule", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, "Error deleting automation rule.", nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// handleNewConversation handles new conversation events.
|
|
func (e *Engine) handleNewConversation(conversationUUID string) {
|
|
conversation, err := e.conversationStore.GetConversation(0, conversationUUID)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversation for new event", "uuid", conversationUUID, "error", err)
|
|
return
|
|
}
|
|
rules := e.filterRulesByType(models.RuleTypeNewConversation, "")
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
|
|
// handleUpdateConversation handles update conversation events with specific eventType.
|
|
func (e *Engine) handleUpdateConversation(conversationUUID, eventType string) {
|
|
conversation, err := e.conversationStore.GetConversation(0, conversationUUID)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversation for update event", "uuid", conversationUUID, "error", err)
|
|
return
|
|
}
|
|
rules := e.filterRulesByType(models.RuleTypeConversationUpdate, eventType)
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
|
|
// handleTimeTrigger handles time trigger events.
|
|
func (e *Engine) handleTimeTrigger() {
|
|
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour)
|
|
conversations, err := e.conversationStore.GetConversationsCreatedAfter(thirtyDaysAgo)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversations for time trigger", "error", err)
|
|
return
|
|
}
|
|
rules := e.filterRulesByType(models.RuleTypeTimeTrigger, "")
|
|
e.lo.Debug("fetched conversations for evaluating time triggers", "conversations_count", len(conversations), "rules_count", len(rules))
|
|
for _, conversation := range conversations {
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
}
|
|
|
|
// EvaluateNewConversationRules enqueues a new conversation for rule evaluation.
|
|
func (e *Engine) EvaluateNewConversationRules(conversationUUID string) {
|
|
e.closedMu.RLock()
|
|
defer e.closedMu.RUnlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
select {
|
|
case e.taskQueue <- ConversationTask{
|
|
taskType: NewConversation,
|
|
conversationUUID: conversationUUID,
|
|
}:
|
|
default:
|
|
// Queue is full.
|
|
e.lo.Warn("EvaluateNewConversationRules: newConversationQ is full, unable to enqueue conversation")
|
|
}
|
|
}
|
|
|
|
// EvaluateConversationUpdateRules enqueues an updated conversation for rule evaluation.
|
|
func (e *Engine) EvaluateConversationUpdateRules(conversationUUID string, eventType string) {
|
|
if eventType == "" {
|
|
e.lo.Error("error evaluating conversation update rules: eventType is empty")
|
|
return
|
|
}
|
|
e.closedMu.RLock()
|
|
defer e.closedMu.RUnlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
select {
|
|
case e.taskQueue <- ConversationTask{
|
|
taskType: UpdateConversation,
|
|
conversationUUID: conversationUUID,
|
|
eventType: eventType,
|
|
}:
|
|
default:
|
|
// Queue is full.
|
|
e.lo.Warn("EvaluateConversationUpdateRules: updateConversationQ is full, unable to enqueue conversation")
|
|
}
|
|
}
|
|
|
|
// queryRules fetches automation rules from the database.
|
|
func (e *Engine) queryRules() []models.Rule {
|
|
var (
|
|
rules []models.RuleRecord
|
|
filteredRules []models.Rule
|
|
)
|
|
err := e.q.GetEnabledRules.Select(&rules)
|
|
if err != nil {
|
|
e.lo.Error("error fetching automation rules", "error", err)
|
|
return filteredRules
|
|
}
|
|
|
|
for _, rule := range rules {
|
|
var rulesBatch []models.Rule
|
|
if err := json.Unmarshal([]byte(rule.Rules), &rulesBatch); err != nil {
|
|
e.lo.Error("error unmarshalling rule JSON", "error", err)
|
|
continue
|
|
}
|
|
// Set the Type and event for each rule in the batch.
|
|
for i := range rulesBatch {
|
|
rulesBatch[i].Type = rule.Type
|
|
rulesBatch[i].Events = rule.Events
|
|
}
|
|
filteredRules = append(filteredRules, rulesBatch...)
|
|
}
|
|
return filteredRules
|
|
}
|
|
|
|
// filterRulesByType filters rules by type and event.
|
|
func (e *Engine) filterRulesByType(ruleType, eventType string) []models.Rule {
|
|
e.rulesMu.RLock()
|
|
defer e.rulesMu.RUnlock()
|
|
|
|
var filteredRules []models.Rule
|
|
for _, rule := range e.rules {
|
|
if rule.Type == ruleType && (eventType == "" || slices.Contains(rule.Events, eventType)) {
|
|
filteredRules = append(filteredRules, rule)
|
|
}
|
|
}
|
|
return filteredRules
|
|
}
|