mirror of
https://github.com/abhinavxd/libredesk.git
synced 2025-11-06 23:13:28 +00:00
419 lines
14 KiB
Go
419 lines
14 KiB
Go
// Package automation automatically evaluates and applies rules to conversations based on events like new conversations, updates, and time triggers,
|
|
// and performs some actions if they are true.
|
|
package automation
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"embed"
|
|
"encoding/json"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/abhinavxd/libredesk/internal/automation/models"
|
|
cmodels "github.com/abhinavxd/libredesk/internal/conversation/models"
|
|
"github.com/abhinavxd/libredesk/internal/dbutil"
|
|
"github.com/abhinavxd/libredesk/internal/envelope"
|
|
umodels "github.com/abhinavxd/libredesk/internal/user/models"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/knadh/go-i18n"
|
|
"github.com/lib/pq"
|
|
"github.com/zerodha/logf"
|
|
)
|
|
|
|
var (
|
|
//go:embed queries.sql
|
|
efs embed.FS
|
|
// MaxQueueSize is the maximum size of the task queue.
|
|
MaxQueueSize = 5000
|
|
)
|
|
|
|
// TaskType represents the type of conversation task.
|
|
type TaskType string
|
|
|
|
const (
|
|
NewConversation TaskType = "new"
|
|
UpdateConversation TaskType = "update"
|
|
TimeTrigger TaskType = "time-trigger"
|
|
)
|
|
|
|
// ConversationTask represents a unit of work for processing conversations.
|
|
type ConversationTask struct {
|
|
taskType TaskType
|
|
eventType string
|
|
conversation cmodels.Conversation
|
|
}
|
|
|
|
type Engine struct {
|
|
rules []models.Rule
|
|
rulesMu sync.RWMutex
|
|
q queries
|
|
lo *logf.Logger
|
|
i18n *i18n.I18n
|
|
conversationStore conversationStore
|
|
taskQueue chan ConversationTask
|
|
closed bool
|
|
closedMu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type Opts struct {
|
|
DB *sqlx.DB
|
|
Lo *logf.Logger
|
|
I18n *i18n.I18n
|
|
}
|
|
|
|
type conversationStore interface {
|
|
ApplyAction(action models.RuleAction, conversation cmodels.Conversation, user umodels.User) error
|
|
GetConversation(teamID int, uuid string) (cmodels.Conversation, error)
|
|
GetConversationsCreatedAfter(time.Time) ([]cmodels.Conversation, error)
|
|
}
|
|
|
|
type queries struct {
|
|
GetAll *sqlx.Stmt `query:"get-all"`
|
|
GetRule *sqlx.Stmt `query:"get-rule"`
|
|
InsertRule *sqlx.Stmt `query:"insert-rule"`
|
|
UpdateRule *sqlx.Stmt `query:"update-rule"`
|
|
DeleteRule *sqlx.Stmt `query:"delete-rule"`
|
|
ToggleRule *sqlx.Stmt `query:"toggle-rule"`
|
|
GetEnabledRules *sqlx.Stmt `query:"get-enabled-rules"`
|
|
UpdateRuleWeight *sqlx.Stmt `query:"update-rule-weight"`
|
|
UpdateRuleExecutionMode *sqlx.Stmt `query:"update-rule-execution-mode"`
|
|
}
|
|
|
|
// New initializes a new Engine.
|
|
func New(opt Opts) (*Engine, error) {
|
|
var (
|
|
q queries
|
|
e = &Engine{
|
|
lo: opt.Lo,
|
|
i18n: opt.I18n,
|
|
taskQueue: make(chan ConversationTask, MaxQueueSize),
|
|
}
|
|
)
|
|
if err := dbutil.ScanSQLFile("queries.sql", &q, opt.DB, efs); err != nil {
|
|
return nil, err
|
|
}
|
|
e.q = q
|
|
e.rules = e.queryRules()
|
|
return e, nil
|
|
}
|
|
|
|
// SetConversationStore sets conversations store.
|
|
func (e *Engine) SetConversationStore(store conversationStore) {
|
|
e.conversationStore = store
|
|
}
|
|
|
|
// ReloadRules reloads automation rules from DB.
|
|
func (e *Engine) ReloadRules() {
|
|
e.rulesMu.Lock()
|
|
defer e.rulesMu.Unlock()
|
|
e.lo.Debug("reloading automation engine rules")
|
|
e.rules = e.queryRules()
|
|
}
|
|
|
|
// Run starts the Engine with a worker pool to evaluate rules based on events.
|
|
func (e *Engine) Run(ctx context.Context, workerCount int) {
|
|
// Spawn worker pool.
|
|
for i := 0; i < workerCount; i++ {
|
|
e.wg.Add(1)
|
|
go e.worker(ctx)
|
|
}
|
|
|
|
// Hourly ticker for timed triggers.
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer func() {
|
|
ticker.Stop()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
e.lo.Info("queuing time triggers")
|
|
e.taskQueue <- ConversationTask{taskType: TimeTrigger}
|
|
}
|
|
}
|
|
}
|
|
|
|
// worker processes tasks from the taskQueue until it's closed or context is done.
|
|
func (e *Engine) worker(ctx context.Context) {
|
|
defer e.wg.Done()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task, ok := <-e.taskQueue:
|
|
if !ok {
|
|
return
|
|
}
|
|
switch task.taskType {
|
|
case NewConversation:
|
|
e.handleNewConversation(task.conversation)
|
|
case UpdateConversation:
|
|
e.handleUpdateConversation(task.conversation, task.eventType)
|
|
case TimeTrigger:
|
|
e.handleTimeTrigger()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close signals the Engine to stop accepting any more messages and waits for all workers to finish.
|
|
func (e *Engine) Close() {
|
|
e.closedMu.Lock()
|
|
defer e.closedMu.Unlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
e.closed = true
|
|
close(e.taskQueue)
|
|
// Wait for all workers.
|
|
e.wg.Wait()
|
|
}
|
|
|
|
// GetAllRules retrieves all rules of a specific type.
|
|
func (e *Engine) GetAllRules(typ []byte) ([]models.RuleRecord, error) {
|
|
var rules = make([]models.RuleRecord, 0)
|
|
if err := e.q.GetAll.Select(&rules, typ); err != nil {
|
|
e.lo.Error("error fetching rules", "error", err)
|
|
return rules, envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorFetching", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
return rules, nil
|
|
}
|
|
|
|
// GetRule retrieves a rule by ID.
|
|
func (e *Engine) GetRule(id int) (models.RuleRecord, error) {
|
|
var rule models.RuleRecord
|
|
if err := e.q.GetRule.Get(&rule, id); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return rule, envelope.NewError(envelope.InputError, e.i18n.Ts("globals.messages.notFound", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
e.lo.Error("error fetching rule", "error", err)
|
|
return rule, envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorFetching", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
return rule, nil
|
|
}
|
|
|
|
// ToggleRule toggles the active status of a rule by ID.
|
|
func (e *Engine) ToggleRule(id int) (models.RuleRecord, error) {
|
|
var result models.RuleRecord
|
|
if err := e.q.ToggleRule.Get(&result, id); err != nil {
|
|
e.lo.Error("error toggling rule", "error", err)
|
|
return models.RuleRecord{}, envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorUpdating", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return result, nil
|
|
}
|
|
|
|
// UpdateRule updates an existing rule.
|
|
func (e *Engine) UpdateRule(id int, rule models.RuleRecord) (models.RuleRecord, error) {
|
|
if rule.Events == nil {
|
|
rule.Events = pq.StringArray{}
|
|
}
|
|
var result models.RuleRecord
|
|
if err := e.q.UpdateRule.Get(&result, id, rule.Name, rule.Description, rule.Type, rule.Events, rule.Rules, rule.Enabled); err != nil {
|
|
e.lo.Error("error updating rule", "error", err)
|
|
return models.RuleRecord{}, envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorUpdating", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return result, nil
|
|
}
|
|
|
|
// CreateRule creates a new rule.
|
|
func (e *Engine) CreateRule(rule models.RuleRecord) (models.RuleRecord, error) {
|
|
if rule.Events == nil {
|
|
rule.Events = pq.StringArray{}
|
|
}
|
|
var result models.RuleRecord
|
|
if err := e.q.InsertRule.Get(&result, rule.Name, rule.Description, rule.Type, rule.Events, rule.Rules); err != nil {
|
|
e.lo.Error("error creating rule", "error", err)
|
|
return models.RuleRecord{}, envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorCreating", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return result, nil
|
|
}
|
|
|
|
// DeleteRule deletes a rule by ID.
|
|
func (e *Engine) DeleteRule(id int) error {
|
|
if _, err := e.q.DeleteRule.Exec(id); err != nil {
|
|
e.lo.Error("error deleting rule", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorDeleting", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// UpdateRuleWeights updates the weights of the automation rules.
|
|
func (e *Engine) UpdateRuleWeights(weights map[int]int) error {
|
|
for id, weight := range weights {
|
|
if _, err := e.q.UpdateRuleWeight.Exec(id, weight); err != nil {
|
|
e.lo.Error("error updating rule weight", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorUpdating", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// UpdateRuleExecutionMode updates the execution mode for a type of rule.
|
|
func (e *Engine) UpdateRuleExecutionMode(ruleType, mode string) error {
|
|
if _, err := e.q.UpdateRuleExecutionMode.Exec(ruleType, mode); err != nil {
|
|
e.lo.Error("error updating rule execution mode", "error", err)
|
|
return envelope.NewError(envelope.GeneralError, e.i18n.Ts("globals.messages.errorUpdating", "name", e.i18n.Ts("globals.terms.rule")), nil)
|
|
}
|
|
// Reload rules.
|
|
e.ReloadRules()
|
|
return nil
|
|
}
|
|
|
|
// EvaluateNewConversationRules enqueues a new conversation for rule evaluation.
|
|
func (e *Engine) EvaluateNewConversationRules(conversation cmodels.Conversation) {
|
|
e.closedMu.RLock()
|
|
defer e.closedMu.RUnlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
select {
|
|
case e.taskQueue <- ConversationTask{
|
|
taskType: NewConversation,
|
|
conversation: conversation,
|
|
}:
|
|
default:
|
|
// Queue is full.
|
|
e.lo.Warn("EvaluateNewConversationRules: newConversationQ is full, unable to enqueue conversation")
|
|
}
|
|
}
|
|
|
|
// EvaluateConversationUpdateRules enqueues a conversation for rule evaluation, this function exists along with EvaluateConversationUpdateRulesByID to reduce DB queries for fetching conversations.
|
|
func (e *Engine) EvaluateConversationUpdateRules(conversation cmodels.Conversation, eventType string) {
|
|
if eventType == "" {
|
|
e.lo.Error("error evaluating conversation update rules: eventType is empty")
|
|
return
|
|
}
|
|
e.closedMu.RLock()
|
|
defer e.closedMu.RUnlock()
|
|
if e.closed {
|
|
return
|
|
}
|
|
select {
|
|
case e.taskQueue <- ConversationTask{
|
|
taskType: UpdateConversation,
|
|
eventType: eventType,
|
|
conversation: conversation,
|
|
}:
|
|
default:
|
|
// Queue is full.
|
|
e.lo.Warn("EvaluateConversationUpdateRules: updateConversationQ is full, unable to enqueue conversation")
|
|
}
|
|
}
|
|
|
|
// EvaluateConversationUpdateRulesByID fetches conversation by ID and enqueues for rule evaluation,
|
|
// This function is useful when callers want to fresh fetch the conversation from the database instead of passing it directly as they might have a stale copy.
|
|
func (e *Engine) EvaluateConversationUpdateRulesByID(conversationID int, conversationUUID, eventType string) {
|
|
conversation, err := e.conversationStore.GetConversation(conversationID, conversationUUID)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversation", "conversation_id", conversationID, "error", err)
|
|
return
|
|
}
|
|
e.EvaluateConversationUpdateRules(conversation, eventType)
|
|
}
|
|
|
|
// handleNewConversation handles new conversation events.
|
|
func (e *Engine) handleNewConversation(conversation cmodels.Conversation) {
|
|
e.lo.Debug("handling new conversation for automation rule evaluation", "uuid", conversation.UUID)
|
|
rules := e.filterRulesByType(models.RuleTypeNewConversation, "")
|
|
if len(rules) == 0 {
|
|
e.lo.Info("no rules to evaluate for new conversation rule evaluation", "uuid", conversation.UUID)
|
|
return
|
|
}
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
|
|
// handleUpdateConversation handles update conversation events with specific eventType.
|
|
func (e *Engine) handleUpdateConversation(conversation cmodels.Conversation, eventType string) {
|
|
e.lo.Debug("handling update conversation for automation rule evaluation", "uuid", conversation.UUID, "event_type", eventType)
|
|
rules := e.filterRulesByType(models.RuleTypeConversationUpdate, eventType)
|
|
if len(rules) == 0 {
|
|
e.lo.Info("no rules to evaluate for conversation update", "uuid", conversation.UUID, "event_type", eventType)
|
|
return
|
|
}
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
|
|
// handleTimeTrigger handles time trigger events.
|
|
func (e *Engine) handleTimeTrigger() {
|
|
e.lo.Info("running time trigger evaluation for automation rules")
|
|
thirtyDaysAgo := time.Now().Add(-30 * 24 * time.Hour)
|
|
conversations, err := e.conversationStore.GetConversationsCreatedAfter(thirtyDaysAgo)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversations for time trigger", "error", err)
|
|
return
|
|
}
|
|
rules := e.filterRulesByType(models.RuleTypeTimeTrigger, "")
|
|
if len(rules) == 0 {
|
|
e.lo.Info("no rules to evaluate for time trigger")
|
|
return
|
|
}
|
|
e.lo.Info("fetched conversations for evaluating time triggers", "conversations_count", len(conversations), "rules_count", len(rules))
|
|
for _, c := range conversations {
|
|
// Fetch entire conversation.
|
|
conversation, err := e.conversationStore.GetConversation(0, c.UUID)
|
|
if err != nil {
|
|
e.lo.Error("error fetching conversation for time trigger", "uuid", c.UUID, "error", err)
|
|
continue
|
|
}
|
|
e.evalConversationRules(rules, conversation)
|
|
}
|
|
}
|
|
|
|
// queryRules fetches automation rules from the database.
|
|
func (e *Engine) queryRules() []models.Rule {
|
|
var (
|
|
rules []models.RuleRecord
|
|
filteredRules []models.Rule
|
|
)
|
|
err := e.q.GetEnabledRules.Select(&rules)
|
|
if err != nil {
|
|
e.lo.Error("error fetching automation rules", "error", err)
|
|
return filteredRules
|
|
}
|
|
|
|
for _, rule := range rules {
|
|
var rulesBatch []models.Rule
|
|
if err := json.Unmarshal([]byte(rule.Rules), &rulesBatch); err != nil {
|
|
e.lo.Error("error unmarshalling rule JSON", "error", err)
|
|
continue
|
|
}
|
|
// Set values from DB.
|
|
for i := range rulesBatch {
|
|
rulesBatch[i].Type = rule.Type
|
|
rulesBatch[i].Events = rule.Events
|
|
rulesBatch[i].ExecutionMode = rule.ExecutionMode
|
|
}
|
|
filteredRules = append(filteredRules, rulesBatch...)
|
|
}
|
|
return filteredRules
|
|
}
|
|
|
|
// filterRulesByType filters rules by type and event.
|
|
func (e *Engine) filterRulesByType(ruleType, eventType string) []models.Rule {
|
|
e.rulesMu.RLock()
|
|
defer e.rulesMu.RUnlock()
|
|
|
|
var filteredRules []models.Rule
|
|
for _, rule := range e.rules {
|
|
if rule.Type == ruleType && (eventType == "" || slices.Contains(rule.Events, eventType)) {
|
|
filteredRules = append(filteredRules, rule)
|
|
}
|
|
}
|
|
return filteredRules
|
|
}
|