diff --git a/pkg/transfer/elasticsearch/backend.go b/pkg/transfer/elasticsearch/backend.go index 0047b7153..12dddae83 100644 --- a/pkg/transfer/elasticsearch/backend.go +++ b/pkg/transfer/elasticsearch/backend.go @@ -195,7 +195,7 @@ func (b *BulkHandler) Flush(ctx context.Context, results []interface{}) (count i // 处理跨时间间隔 if index != lastIndex && lastIndex != "" { - cnt, err := b.flush(ctx, index, records) + cnt, err := b.flush(ctx, lastIndex, records) records = records[:0] count += cnt errs.Add(err) diff --git a/pkg/transfer/template/etl/flat_batch.go b/pkg/transfer/template/etl/flat_batch.go index 115240a9a..1a57da761 100644 --- a/pkg/transfer/template/etl/flat_batch.go +++ b/pkg/transfer/template/etl/flat_batch.go @@ -86,7 +86,7 @@ func (p *FlatBatchHandler) Process(d define.Payload, outputChan chan<- define.Pa err := d.To(&originMap) if err != nil { p.CounterFails.Inc() - logging.Errorf("%v convert payload %#v error %v", p, d, err) + logging.MinuteErrorfSampling(p.String(), "%v convert payload %#v error %v", p, d, err) return }