mirror of
				https://github.com/openobserve/goflow2.git
				synced 2025-11-03 21:43:13 +00:00 
			
		
		
		
	add async
This commit is contained in:
		@@ -4,12 +4,13 @@ import (
 | 
				
			|||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"encoding/json"
 | 
						"encoding/json"
 | 
				
			||||||
	"flag"
 | 
						"flag"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"math"
 | 
						"math"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						log "github.com/sirupsen/logrus"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/netsampler/goflow2/v2/transport"
 | 
						"github.com/netsampler/goflow2/v2/transport"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -59,10 +60,17 @@ func (d *HTTPDriver) Send(key, data []byte) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		maxRetries := 3
 | 
							maxRetries := 3
 | 
				
			||||||
		delay := time.Millisecond * 500 // initial delay
 | 
							delay := time.Millisecond * 500 // initial delay
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							var wg sync.WaitGroup
 | 
				
			||||||
 | 
							wg.Add(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			for i := 0; i < maxRetries; i++ {
 | 
								for i := 0; i < maxRetries; i++ {
 | 
				
			||||||
				req, err := http.NewRequest("POST", d.httpDestination, bytes.NewBuffer(jsonData))
 | 
									req, err := http.NewRequest("POST", d.httpDestination, bytes.NewBuffer(jsonData))
 | 
				
			||||||
				if err != nil {
 | 
									if err != nil {
 | 
				
			||||||
				return err
 | 
										return
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				req.Header.Set("Content-Type", "application/json")
 | 
									req.Header.Set("Content-Type", "application/json")
 | 
				
			||||||
@@ -70,27 +78,23 @@ func (d *HTTPDriver) Send(key, data []byte) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
				client := &http.Client{}
 | 
									client := &http.Client{}
 | 
				
			||||||
				resp, err := client.Do(req)
 | 
									resp, err := client.Do(req)
 | 
				
			||||||
			if err != nil {
 | 
									if err != nil || (resp.StatusCode < 200 || resp.StatusCode >= 300) {
 | 
				
			||||||
					if i == maxRetries-1 {
 | 
										if i == maxRetries-1 {
 | 
				
			||||||
					return err
 | 
											log.Error(err)
 | 
				
			||||||
				}
 | 
											return
 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			defer resp.Body.Close()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			// If the status code is not in the 2xx range, retry
 | 
					 | 
				
			||||||
			if resp.StatusCode < 200 || resp.StatusCode >= 300 {
 | 
					 | 
				
			||||||
				if i == maxRetries-1 {
 | 
					 | 
				
			||||||
					return fmt.Errorf("failed to send data, status code: %d", resp.StatusCode)
 | 
					 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
					time.Sleep(delay * time.Duration(math.Pow(2, float64(i)))) // exponential backoff
 | 
										time.Sleep(delay * time.Duration(math.Pow(2, float64(i)))) // exponential backoff
 | 
				
			||||||
					continue
 | 
										continue
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									defer resp.Body.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
				// reset batchData
 | 
									// reset batchData
 | 
				
			||||||
				d.batchData = d.batchData[:0]
 | 
									d.batchData = d.batchData[:0]
 | 
				
			||||||
				break
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							wg.Wait()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user