mirror of
https://github.com/abhinavxd/libredesk.git
synced 2025-11-02 04:53:41 +00:00
use http client with set timeout for webhook requests
remove db calls from `TriggerEvent`
This commit is contained in:
@@ -12,12 +12,14 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/abhinavxd/libredesk/internal/dbutil"
|
||||
"github.com/abhinavxd/libredesk/internal/envelope"
|
||||
"github.com/abhinavxd/libredesk/internal/version"
|
||||
"github.com/abhinavxd/libredesk/internal/webhook/models"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/knadh/go-i18n"
|
||||
@@ -56,9 +58,8 @@ type Opts struct {
|
||||
|
||||
// DeliveryTask represents a webhook delivery task
|
||||
type DeliveryTask struct {
|
||||
WebhookID int
|
||||
Event models.WebhookEvent
|
||||
Payload any
|
||||
Event models.WebhookEvent
|
||||
Payload any
|
||||
}
|
||||
|
||||
// queries contains prepared SQL queries.
|
||||
@@ -88,7 +89,15 @@ func New(opts Opts) (*Manager, error) {
|
||||
db: opts.DB,
|
||||
deliveryQueue: make(chan DeliveryTask, opts.QueueSize),
|
||||
httpClient: &http.Client{
|
||||
Timeout: opts.Timeout,
|
||||
Timeout: 10 * time.Second,
|
||||
Transport: &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 3 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: 3 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Second,
|
||||
},
|
||||
},
|
||||
workers: opts.Workers,
|
||||
}, nil
|
||||
@@ -164,14 +173,11 @@ func (m *Manager) SendTestWebhook(id int) error {
|
||||
return envelope.NewError(envelope.NotFoundError, m.i18n.Ts("globals.messages.notFound", "name", "webhook"), nil)
|
||||
}
|
||||
|
||||
m.deliverWebhook(DeliveryTask{
|
||||
WebhookID: webhook.ID,
|
||||
Event: models.EventWebhookTest,
|
||||
m.deliverSingleWebhook(webhook, DeliveryTask{
|
||||
Event: models.EventWebhookTest,
|
||||
Payload: map[string]any{
|
||||
"id": webhook.ID,
|
||||
"name": webhook.Name,
|
||||
"url": webhook.URL,
|
||||
"events": webhook.Events,
|
||||
"id": webhook.ID,
|
||||
"name": webhook.Name,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -186,22 +192,13 @@ func (m *Manager) TriggerEvent(event models.WebhookEvent, data any) {
|
||||
return
|
||||
}
|
||||
|
||||
webhooks, err := m.getWebhooksByEvent(string(event))
|
||||
if err != nil {
|
||||
m.lo.Error("error fetching webhooks for event", "event", event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, webhook := range webhooks {
|
||||
select {
|
||||
case m.deliveryQueue <- DeliveryTask{
|
||||
WebhookID: webhook.ID,
|
||||
Event: event,
|
||||
Payload: data,
|
||||
}:
|
||||
default:
|
||||
m.lo.Warn("webhook delivery queue is full, dropping webhook delivery", "webhook_id", webhook.ID, "event", event, "queue_size", len(m.deliveryQueue))
|
||||
}
|
||||
select {
|
||||
case m.deliveryQueue <- DeliveryTask{
|
||||
Event: event,
|
||||
Payload: data,
|
||||
}:
|
||||
default:
|
||||
m.lo.Warn("webhook delivery queue is full, dropping webhook delivery", "event", event, "queue_size", len(m.deliveryQueue))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,36 +240,43 @@ func (m *Manager) worker(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// deliverWebhook delivers a webhook by making an HTTP request.
|
||||
// deliverWebhook delivers webhooks for an event by making HTTP requests.
|
||||
func (m *Manager) deliverWebhook(task DeliveryTask) {
|
||||
webhook, err := m.Get(task.WebhookID)
|
||||
webhooks, err := m.getWebhooksByEvent(string(task.Event))
|
||||
if err != nil {
|
||||
m.lo.Error("error fetching webhook for delivery", "webhook_id", task.WebhookID, "error", err)
|
||||
m.lo.Error("error fetching webhooks for event", "event", task.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, webhook := range webhooks {
|
||||
m.deliverSingleWebhook(webhook, task)
|
||||
}
|
||||
}
|
||||
|
||||
// deliverSingleWebhook delivers a webhook to a single endpoint.
|
||||
func (m *Manager) deliverSingleWebhook(webhook models.Webhook, task DeliveryTask) {
|
||||
basePayload := map[string]any{
|
||||
"event": task.Event,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
||||
"payload": task.Payload,
|
||||
}
|
||||
|
||||
payloadBytes, err := json.Marshal(basePayload)
|
||||
if err != nil {
|
||||
m.lo.Error("error marshaling webhook payload", "webhook_id", task.WebhookID, "event", task.Event, "error", err)
|
||||
m.lo.Error("error marshaling webhook payload", "webhook_id", webhook.ID, "event", task.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Create HTTP request
|
||||
req, err := http.NewRequest("POST", webhook.URL, bytes.NewReader(payloadBytes))
|
||||
if err != nil {
|
||||
m.lo.Error("error creating webhook request", "webhook_id", task.WebhookID, "url", webhook.URL, "event", task.Event, "error", err)
|
||||
m.lo.Error("error creating webhook request", "webhook_id", webhook.ID, "url", webhook.URL, "event", task.Event, "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Set headers
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "Libredesk-Webhook/1.0")
|
||||
req.Header.Set("User-Agent", "Libredesk-Webhook/"+version.Version)
|
||||
|
||||
// Add signature if secret is provided
|
||||
if webhook.Secret != "" {
|
||||
@@ -281,7 +285,7 @@ func (m *Manager) deliverWebhook(task DeliveryTask) {
|
||||
}
|
||||
|
||||
m.lo.Debug("delivering webhook",
|
||||
"webhook_id", task.WebhookID,
|
||||
"webhook_id", webhook.ID,
|
||||
"url", webhook.URL,
|
||||
"event", task.Event,
|
||||
"payload", string(payloadBytes),
|
||||
@@ -292,7 +296,7 @@ func (m *Manager) deliverWebhook(task DeliveryTask) {
|
||||
resp, err := m.httpClient.Do(req)
|
||||
if err != nil {
|
||||
m.lo.Error("webhook delivery failed - HTTP request error",
|
||||
"webhook_id", task.WebhookID,
|
||||
"webhook_id", webhook.ID,
|
||||
"url", webhook.URL,
|
||||
"event", task.Event,
|
||||
"error", err)
|
||||
@@ -303,7 +307,7 @@ func (m *Manager) deliverWebhook(task DeliveryTask) {
|
||||
// Read response body
|
||||
responseBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
m.lo.Error("error reading webhook response", "webhook_id", task.WebhookID, "error", err)
|
||||
m.lo.Error("error reading webhook response", "webhook_id", webhook.ID, "error", err)
|
||||
responseBody = []byte(fmt.Sprintf("Error reading response: %v", err))
|
||||
}
|
||||
|
||||
@@ -312,13 +316,13 @@ func (m *Manager) deliverWebhook(task DeliveryTask) {
|
||||
|
||||
if success {
|
||||
m.lo.Info("webhook delivered successfully",
|
||||
"webhook_id", task.WebhookID,
|
||||
"webhook_id", webhook.ID,
|
||||
"event", task.Event,
|
||||
"url", webhook.URL,
|
||||
"status_code", resp.StatusCode)
|
||||
} else {
|
||||
m.lo.Error("webhook delivery failed",
|
||||
"webhook_id", task.WebhookID,
|
||||
"webhook_id", webhook.ID,
|
||||
"event", task.Event,
|
||||
"url", webhook.URL,
|
||||
"status_code", resp.StatusCode,
|
||||
|
||||
Reference in New Issue
Block a user