Files
libredesk/internal/automation/automation.go
Abhinav Raut 0c01b53b09 feat(conversations): add trigram index for searching ref numbers
feat(messages): add trigram index for text content search
- feat: UI animations for conversation and messages list.
- Simplify websocket updates.
2025-01-19 23:10:53 +05:30

399 lines
12 KiB
Go

// Package automation automatically evaluates and applies rules to conversations based on events like new conversations, updates, and time triggers,
// and performs some actions if they are true.
package automation
import (
"context"
"database/sql"
"embed"
"encoding/json"
"slices"
"sync"
"time"
"github.com/abhinavxd/libredesk/internal/automation/models"
cmodels "github.com/abhinavxd/libredesk/internal/conversation/models"
"github.com/abhinavxd/libredesk/internal/dbutil"
"github.com/abhinavxd/libredesk/internal/envelope"
umodels "github.com/abhinavxd/libredesk/internal/user/models"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/zerodha/logf"
)
var (
//go:embed queries.sql
efs embed.FS
// MaxQueueSize defines the maximum size of the task queues.
MaxQueueSize = 5000
)
// TaskType represents the type of conversation task.
type TaskType string
const (
NewConversation TaskType = "new"
UpdateConversation TaskType = "update"
TimeTrigger TaskType = "time-trigger"
)
// ConversationTask represents a unit of work for processing conversations.
type ConversationTask struct {
taskType TaskType
eventType string
conversationUUID string
}
type Engine struct {
rules []models.Rule
rulesMu sync.RWMutex
q queries
lo *logf.Logger
conversationStore conversationStore
taskQueue chan ConversationTask
closed bool
closedMu sync.RWMutex
wg sync.WaitGroup
}
type Opts struct {
DB *sqlx.DB
Lo *logf.Logger
}
type conversationStore interface {
ApplyAction(action models.RuleAction, conversation cmodels.Conversation, user umodels.User) error
GetConversation(teamID int, uuid string) (cmodels.Conversation, error)
GetConversationsCreatedAfter(time.Time) ([]cmodels.Conversation, error)
}
type queries struct {
GetAll *sqlx.Stmt `query:"get-all"`
GetRule *sqlx.Stmt `query:"get-rule"`
InsertRule *sqlx.Stmt `query:"insert-rule"`
UpdateRule *sqlx.Stmt `query:"update-rule"`
DeleteRule *sqlx.Stmt `query:"delete-rule"`
ToggleRule *sqlx.Stmt `query:"toggle-rule"`
GetEnabledRules *sqlx.Stmt `query:"get-enabled-rules"`
UpdateRuleWeight *sqlx.Stmt `query:"update-rule-weight"`
UpdateRuleExecutionMode *sqlx.Stmt `query:"update-rule-execution-mode"`
}
// New initializes a new Engine.
func New(opt Opts) (*Engine, error) {
var (
q queries
e = &Engine{
lo: opt.Lo,
taskQueue: make(chan ConversationTask, MaxQueueSize),
}
)
if err := dbutil.ScanSQLFile("queries.sql", &q, opt.DB, efs); err != nil {
return nil, err
}
e.q = q
e.rules = e.queryRules()
return e, nil
}
// SetConversationStore sets conversations store.
func (e *Engine) SetConversationStore(store conversationStore) {
e.conversationStore = store
}
// ReloadRules reloads automation rules from DB.
func (e *Engine) ReloadRules() {
e.rulesMu.Lock()
defer e.rulesMu.Unlock()
e.lo.Debug("reloading automation engine rules")
e.rules = e.queryRules()
}
// Run starts the Engine with a worker pool to evaluate rules based on events.
func (e *Engine) Run(ctx context.Context, workerCount int) {
// Spawn worker pool.
for i := 0; i < workerCount; i++ {
e.wg.Add(1)
go e.worker(ctx)
}
// Ticker for timed triggers.
ticker := time.NewTicker(1 * time.Hour)
defer func() {
ticker.Stop()
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
e.lo.Info("queuing time triggers")
e.taskQueue <- ConversationTask{taskType: TimeTrigger}
}
}
}
// worker processes tasks from the taskQueue until it's closed or context is done.
func (e *Engine) worker(ctx context.Context) {
defer e.wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-e.taskQueue:
if !ok {
return
}
switch task.taskType {
case NewConversation:
e.handleNewConversation(task.conversationUUID)
case UpdateConversation:
e.handleUpdateConversation(task.conversationUUID, task.eventType)
case TimeTrigger:
e.handleTimeTrigger()
}
}
}
}
// Close signals the Engine to stop accepting any more messages and waits for all workers to finish.
func (e *Engine) Close() {
e.closedMu.Lock()
defer e.closedMu.Unlock()
if e.closed {
return
}
e.closed = true
close(e.taskQueue)
// Wait for all workers.
e.wg.Wait()
}
// GetAllRules retrieves all rules of a specific type.
func (e *Engine) GetAllRules(typ []byte) ([]models.RuleRecord, error) {
var rules = make([]models.RuleRecord, 0)
if err := e.q.GetAll.Select(&rules, typ); err != nil {
e.lo.Error("error fetching rules", "error", err)
return rules, envelope.NewError(envelope.GeneralError, "Error fetching automation rules.", nil)
}
return rules, nil
}
// GetRule retrieves a rule by ID.
func (e *Engine) GetRule(id int) (models.RuleRecord, error) {
var rule models.RuleRecord
if err := e.q.GetRule.Get(&rule, id); err != nil {
if err == sql.ErrNoRows {
return rule, envelope.NewError(envelope.InputError, "Rule not found.", nil)
}
e.lo.Error("error fetching rule", "error", err)
return rule, envelope.NewError(envelope.GeneralError, "Error fetching automation rule.", nil)
}
return rule, nil
}
// ToggleRule toggles the active status of a rule by ID.
func (e *Engine) ToggleRule(id int) error {
if _, err := e.q.ToggleRule.Exec(id); err != nil {
e.lo.Error("error toggling rule", "error", err)
return envelope.NewError(envelope.GeneralError, "Error toggling automation rule.", nil)
}
// Reload rules.
e.ReloadRules()
return nil
}
// UpdateRule updates an existing rule.
func (e *Engine) UpdateRule(id int, rule models.RuleRecord) error {
if _, err := e.q.UpdateRule.Exec(id, rule.Name, rule.Description, rule.Type, pq.Array(rule.Events), rule.Rules); err != nil {
e.lo.Error("error updating rule", "error", err)
return envelope.NewError(envelope.GeneralError, "Error updating automation rule.", nil)
}
// Reload rules.
e.ReloadRules()
return nil
}
// CreateRule creates a new rule.
func (e *Engine) CreateRule(rule models.RuleRecord) error {
if _, err := e.q.InsertRule.Exec(rule.Name, rule.Description, rule.Type, pq.Array(rule.Events), rule.Rules); err != nil {
e.lo.Error("error creating rule", "error", err)
return envelope.NewError(envelope.GeneralError, "Error creating automation rule.", nil)
}
// Reload rules.
e.ReloadRules()
return nil
}
// DeleteRule deletes a rule by ID.
func (e *Engine) DeleteRule(id int) error {
if _, err := e.q.DeleteRule.Exec(id); err != nil {
e.lo.Error("error deleting rule", "error", err)
return envelope.NewError(envelope.GeneralError, "Error deleting automation rule.", nil)
}
// Reload rules.
e.ReloadRules()
return nil
}
// UpdateRuleWeights updates the weights of the automation rules.
func (e *Engine) UpdateRuleWeights(weights map[int]int) error {
for id, weight := range weights {
if _, err := e.q.UpdateRuleWeight.Exec(id, weight); err != nil {
e.lo.Error("error updating rule weight", "error", err)
return envelope.NewError(envelope.GeneralError, "Error updating weight", nil)
}
}
// Reload rules.
e.ReloadRules()
return nil
}
// UpdateRuleExecutionMode updates the execution mode for a type of rule.
func (e *Engine) UpdateRuleExecutionMode(ruleType, mode string) error {
if _, err := e.q.UpdateRuleExecutionMode.Exec(ruleType, mode); err != nil {
e.lo.Error("error updating rule execution mode", "error", err)
return envelope.NewError(envelope.GeneralError, "Error updating rule execution mode", nil)
}
// Reload rules.
e.ReloadRules()
return nil
}
// EvaluateNewConversationRules enqueues a new conversation for rule evaluation.
func (e *Engine) EvaluateNewConversationRules(conversationUUID string) {
e.closedMu.RLock()
defer e.closedMu.RUnlock()
if e.closed {
return
}
select {
case e.taskQueue <- ConversationTask{
taskType: NewConversation,
conversationUUID: conversationUUID,
}:
default:
// Queue is full.
e.lo.Warn("EvaluateNewConversationRules: newConversationQ is full, unable to enqueue conversation")
}
}
// EvaluateConversationUpdateRules enqueues an updated conversation for rule evaluation.
func (e *Engine) EvaluateConversationUpdateRules(conversationUUID string, eventType string) {
if eventType == "" {
e.lo.Error("error evaluating conversation update rules: eventType is empty")
return
}
e.closedMu.RLock()
defer e.closedMu.RUnlock()
if e.closed {
return
}
select {
case e.taskQueue <- ConversationTask{
taskType: UpdateConversation,
conversationUUID: conversationUUID,
eventType: eventType,
}:
default:
// Queue is full.
e.lo.Warn("EvaluateConversationUpdateRules: updateConversationQ is full, unable to enqueue conversation")
}
}
// handleNewConversation handles new conversation events.
func (e *Engine) handleNewConversation(conversationUUID string) {
e.lo.Debug("handling new conversation", "uuid", conversationUUID)
conversation, err := e.conversationStore.GetConversation(0, conversationUUID)
if err != nil {
e.lo.Error("error fetching conversation for new event", "uuid", conversationUUID, "error", err)
return
}
rules := e.filterRulesByType(models.RuleTypeNewConversation, "")
if len(rules) == 0 {
e.lo.Warn("no rules to evaluate for new conversation", "uuid", conversationUUID)
return
}
e.evalConversationRules(rules, conversation)
}
// handleUpdateConversation handles update conversation events with specific eventType.
func (e *Engine) handleUpdateConversation(conversationUUID, eventType string) {
e.lo.Debug("handling update conversation", "uuid", conversationUUID, "event_type", eventType)
conversation, err := e.conversationStore.GetConversation(0, conversationUUID)
if err != nil {
e.lo.Error("error fetching conversation for update event", "uuid", conversationUUID, "error", err)
return
}
rules := e.filterRulesByType(models.RuleTypeConversationUpdate, eventType)
if len(rules) == 0 {
e.lo.Warn("no rules to evaluate for conversation update", "uuid", conversationUUID, "event_type", eventType)
return
}
e.evalConversationRules(rules, conversation)
}
// handleTimeTrigger handles time trigger events.
func (e *Engine) handleTimeTrigger() {
e.lo.Debug("handling time trigger")
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour)
conversations, err := e.conversationStore.GetConversationsCreatedAfter(thirtyDaysAgo)
if err != nil {
e.lo.Error("error fetching conversations for time trigger", "error", err)
return
}
rules := e.filterRulesByType(models.RuleTypeTimeTrigger, "")
if len(rules) == 0 {
e.lo.Warn("no rules to evaluate for time trigger")
return
}
e.lo.Debug("fetched conversations for evaluating time triggers", "conversations_count", len(conversations), "rules_count", len(rules))
for _, conversation := range conversations {
e.evalConversationRules(rules, conversation)
}
}
// queryRules fetches automation rules from the database.
func (e *Engine) queryRules() []models.Rule {
var (
rules []models.RuleRecord
filteredRules []models.Rule
)
err := e.q.GetEnabledRules.Select(&rules)
if err != nil {
e.lo.Error("error fetching automation rules", "error", err)
return filteredRules
}
for _, rule := range rules {
var rulesBatch []models.Rule
if err := json.Unmarshal([]byte(rule.Rules), &rulesBatch); err != nil {
e.lo.Error("error unmarshalling rule JSON", "error", err)
continue
}
// Set values from DB.
for i := range rulesBatch {
rulesBatch[i].Type = rule.Type
rulesBatch[i].Events = rule.Events
rulesBatch[i].ExecutionMode = rule.ExecutionMode
}
filteredRules = append(filteredRules, rulesBatch...)
}
return filteredRules
}
// filterRulesByType filters rules by type and event.
func (e *Engine) filterRulesByType(ruleType, eventType string) []models.Rule {
e.rulesMu.RLock()
defer e.rulesMu.RUnlock()
var filteredRules []models.Rule
for _, rule := range e.rules {
if rule.Type == ruleType && (eventType == "" || slices.Contains(rule.Events, eventType)) {
filteredRules = append(filteredRules, rule)
}
}
return filteredRules
}