fix[IMAP]: Pass context to allow early exit from IMAP processing loops on cancellation

This commit is contained in:
Abhinav Raut
2025-02-02 14:17:23 +05:30
parent 4eeabae134
commit a699619d7a

View File

@@ -38,7 +38,7 @@ func (e *Email) ReadIncomingMessages(ctx context.Context, cfg IMAPConfig) error
case <-ctx.Done():
return nil
case <-readTicker.C:
if err := e.processMailbox(cfg); err != nil {
if err := e.processMailbox(ctx, cfg); err != nil && err != context.Canceled {
e.lo.Error("error processing mailbox", "error", err)
}
}
@@ -46,7 +46,7 @@ func (e *Email) ReadIncomingMessages(ctx context.Context, cfg IMAPConfig) error
}
// processMailbox processes emails in the specified mailbox.
func (e *Email) processMailbox(cfg IMAPConfig) error {
func (e *Email) processMailbox(ctx context.Context, cfg IMAPConfig) error {
e.lo.Debug("processing emails from mailbox", "mailbox", cfg.Mailbox, "inbox_id", e.Identifier())
client, err := imapclient.DialTLS(cfg.Host+":"+fmt.Sprint(cfg.Port), &imapclient.Options{})
if err != nil {
@@ -70,7 +70,7 @@ func (e *Email) processMailbox(cfg IMAPConfig) error {
return fmt.Errorf("error searching messages: %w", err)
}
return e.fetchAndProcessMessages(client, searchData, e.Identifier())
return e.fetchAndProcessMessages(ctx, client, searchData, e.Identifier())
}
// searchMessages searches for messages in the specified time range.
@@ -89,7 +89,7 @@ func (e *Email) searchMessages(client *imapclient.Client, since time.Time) (*ima
}
// fetchAndProcessMessages fetches and processes messages based on the search results.
func (e *Email) fetchAndProcessMessages(client *imapclient.Client, searchData *imap.SearchData, inboxID int) error {
func (e *Email) fetchAndProcessMessages(ctx context.Context, client *imapclient.Client, searchData *imap.SearchData, inboxID int) error {
seqSet := imap.SeqSet{}
seqSet.AddRange(searchData.Min, searchData.Max)
@@ -101,25 +101,29 @@ func (e *Email) fetchAndProcessMessages(client *imapclient.Client, searchData *i
fetchCmd := client.Fetch(seqSet, fetchOptions)
for {
msg := fetchCmd.Next()
if msg == nil {
break
}
select {
case <-ctx.Done():
return ctx.Err()
default:
msg := fetchCmd.Next()
if msg == nil {
break
}
for fetchItem := msg.Next(); fetchItem != nil; fetchItem = msg.Next() {
if item, ok := fetchItem.(imapclient.FetchItemDataEnvelope); ok {
if err := e.processEnvelope(client, item.Envelope, msg.SeqNum, inboxID); err != nil {
e.lo.Error("error processing envelope", "error", err)
// Process message envelope.
for fetchItem := msg.Next(); fetchItem != nil; fetchItem = msg.Next() {
if item, ok := fetchItem.(imapclient.FetchItemDataEnvelope); ok {
if err := e.processEnvelope(ctx, client, item.Envelope, msg.SeqNum, inboxID); err != nil && err != context.Canceled {
e.lo.Error("error processing envelope", "error", err)
}
}
}
}
}
return nil
}
// processEnvelope processes an email envelope.
func (e *Email) processEnvelope(client *imapclient.Client, env *imap.Envelope, seqNum uint32, inboxID int) error {
func (e *Email) processEnvelope(ctx context.Context, client *imapclient.Client, env *imap.Envelope, seqNum uint32, inboxID int) error {
if len(env.From) == 0 {
e.lo.Warn("no sender received for email", "message_id", env.MessageID)
return nil
@@ -191,14 +195,21 @@ func (e *Email) processEnvelope(client *imapclient.Client, env *imap.Envelope, s
return nil
}
for fullFetchItem := fullMsg.Next(); fullFetchItem != nil; fullFetchItem = fullMsg.Next() {
if fullItem, ok := fullFetchItem.(imapclient.FetchItemDataBodySection); ok {
e.lo.Debug("fetching full message body", "message_id", env.MessageID)
return e.processFullMessage(fullItem, incomingMsg)
// Fetch full message.
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
for fullFetchItem := fullMsg.Next(); fullFetchItem != nil; fullFetchItem = fullMsg.Next() {
if fullItem, ok := fullFetchItem.(imapclient.FetchItemDataBodySection); ok {
e.lo.Debug("fetching full message body", "message_id", env.MessageID)
return e.processFullMessage(fullItem, incomingMsg)
}
}
return nil
}
}
return nil
}
// processFullMessage processes the full email message.