diff --git a/internal/converter/converter.go b/internal/converter/converter.go
index f0c3bf3ccf..7e1b304d1f 100644
--- a/internal/converter/converter.go
+++ b/internal/converter/converter.go
@@ -43,7 +43,7 @@ func init() {
 	modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
 		return urlencoded.NewConverter(props)
 	})
-	modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
+	modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ bool, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
 		return delimited.NewCsvWriter(ctx, props)
 	})
 }
@@ -65,9 +65,9 @@ func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string,
 	return nil, fmt.Errorf("format type %s not supported", t)
 }
 
-func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
+func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, isColumnar bool, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
 	if cw, ok := modules.ConvertWriters[format]; ok {
-		return cw(ctx, schemaId, schema, props)
+		return cw(ctx, schemaId, isColumnar, schema, props)
 	}
 	c, err := GetOrCreateConverter(ctx, format, schemaId, schema, props)
 	if err != nil {
diff --git a/internal/topo/node/batch_writer_op.go b/internal/topo/node/batch_writer_op.go
index 2366ed3466..00cf75975f 100644
--- a/internal/topo/node/batch_writer_op.go
+++ b/internal/topo/node/batch_writer_op.go
@@ -35,17 +35,25 @@ import (
 // Output: RawTuple
 type BatchWriterOp struct {
 	*defaultSinkNode
-	writer message.ConvertWriter
+	writer   message.ConvertWriter
+	columnar bool
 	// save lastRow to get the props
 	lastRow any
 }
 
 func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error) {
 	nctx := ctx.(*context.DefaultContext).WithOpId(name)
-	c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, schema, nil)
+	c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, sc.BatchColumnar, schema, nil)
 	if err != nil {
 		return nil, err
 	}
+	if sc.BatchColumnar {
+		cw, ok := c.(message.ColWriter)
+		if !ok {
+			return nil, fmt.Errorf("format %s does not support columnar writer", sc.Format)
+		}
+		c = cw
+	}
 	err = c.New(nctx)
 	if err != nil {
 		return nil, fmt.Errorf("writer fail to initialize new file: %s", err)
@@ -53,6 +61,7 @@ func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption,
 	return &BatchWriterOp{
 		defaultSinkNode: newDefaultSinkNode(name, rOpt),
 		writer:          c,
+		columnar:        sc.BatchColumnar,
 	}, nil
 }
 
@@ -103,7 +112,12 @@ func (o *BatchWriterOp) Exec(ctx api.StreamContext, errCh chan<- error) {
 			}
 		case xsql.Row:
 			o.onProcessStart(ctx, data)
-			e := o.writer.Write(ctx, dt.ToMap())
+			var e error
+			if o.columnar {
+				e = o.writer.(message.ColWriter).ColWrite(ctx, dt.ToMap())
+			} else {
+				e = o.writer.Write(ctx, dt.ToMap())
+			}
 			if e != nil {
 				o.onError(ctx, e)
 			}
@@ -112,7 +126,12 @@
 			count++
 		case xsql.Collection:
 			o.onProcessStart(ctx, data)
-			e := o.writer.Write(ctx, dt.ToMaps())
+			var e error
+			if o.columnar {
+				e = o.writer.(message.ColWriter).ColWrite(ctx, dt.ToMaps())
+			} else {
+				e = o.writer.Write(ctx, dt.ToMaps())
+			}
 			if e != nil {
 				o.onError(ctx, e)
 			}
diff --git a/internal/topo/node/props.go b/internal/topo/node/props.go
index 26078ceff1..b758802cb2 100644
--- a/internal/topo/node/props.go
+++ b/internal/topo/node/props.go
@@ -1,4 +1,4 @@
-// Copyright 2024 EMQ Technologies Co., Ltd.
+// Copyright 2024-2025 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@ type SinkConf struct {
 	DataField        string            `json:"dataField"`
 	BatchSize        int               `json:"batchSize"`
 	LingerInterval   cast.DurationConf `json:"lingerInterval"`
+	BatchColumnar    bool              `json:"batchColumnar"`
 	Compression      string            `json:"compression"`
 	CompressionProps map[string]any    `json:"compressionProps"`
 	Encryption       string            `json:"encryption"`
diff --git a/pkg/message/artifacts.go b/pkg/message/artifacts.go
index 16edab0e52..41f4026e26 100644
--- a/pkg/message/artifacts.go
+++ b/pkg/message/artifacts.go
@@ -1,4 +1,4 @@
-// Copyright 2021-2024 EMQ Technologies Co., Ltd.
+// Copyright 2021-2025 EMQ Technologies Co., Ltd.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -46,6 +46,11 @@
 	Flush(ctx api.StreamContext) ([]byte, error)
 }
 
+type ColWriter interface {
+	ConvertWriter
+	ColWrite(ctx api.StreamContext, d any) error
+}
+
 // PartialDecoder decodes a field partially
 type PartialDecoder interface {
 	DecodeField(ctx api.StreamContext, b []byte, f string) (any, error)
diff --git a/pkg/modules/converter.go b/pkg/modules/converter.go
index cf479197df..441c4fb66e 100644
--- a/pkg/modules/converter.go
+++ b/pkg/modules/converter.go
@@ -42,7 +42,7 @@ func IsFormatSupported(format string) bool {
 // ConvertWriters are sink converter to use together with batch
 var ConvertWriters = map[string]ConvertWriterProvider{}
 
-type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)
+type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, isColumnar bool, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)
 
 func RegisterWriterConverter(name string, provider ConvertWriterProvider) {
 	ConvertWriters[name] = provider
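
For context, here is a minimal sketch of what a format-side implementation of the new `message.ColWriter` interface could look like. Everything below is an illustrative assumption: the package `myformat`, the type `colBufWriter`, its buffer-and-transpose strategy, and the JSON serialization in `Flush` are invented, not the actual CSV writer touched by this patch. Only the interface shapes (`New`/`Write`/`Flush` from `ConvertWriter`, plus `ColWrite`) and the payload types (`map[string]any` from `Row.ToMap()`, `[]map[string]any` from `Collection.ToMaps()`) come from the diff itself; import paths assume the v2 module layout.

```go
package myformat

import (
	"encoding/json"
	"fmt"

	"github.com/lf-edge/ekuiper/contract/v2/api"
	"github.com/lf-edge/ekuiper/v2/pkg/message"
)

// colBufWriter is a made-up writer that buffers incoming rows column by
// column and emits the transposed result on Flush. It exists only to show
// the shape of the new interface.
type colBufWriter struct {
	cols map[string][]any // column name -> buffered values
}

// Compile-time check that the sketch satisfies the new interface.
var _ message.ColWriter = (*colBufWriter)(nil)

// New resets the column buffers for the next batch.
func (w *colBufWriter) New(_ api.StreamContext) error {
	w.cols = make(map[string][]any)
	return nil
}

// Write keeps the existing row-oriented path working; this sketch simply
// delegates to the columnar path.
func (w *colBufWriter) Write(ctx api.StreamContext, d any) error {
	return w.ColWrite(ctx, d)
}

// ColWrite accepts the same payloads BatchWriterOp produces: a single row
// (map[string]any) or a collection ([]map[string]any).
func (w *colBufWriter) ColWrite(_ api.StreamContext, d any) error {
	appendRow := func(row map[string]any) {
		for k, v := range row {
			w.cols[k] = append(w.cols[k], v)
		}
	}
	switch dt := d.(type) {
	case map[string]any:
		appendRow(dt)
	case []map[string]any:
		for _, row := range dt {
			appendRow(row)
		}
	default:
		return fmt.Errorf("colBufWriter: unsupported payload type %T", d)
	}
	return nil
}

// Flush serializes the column buffers (JSON here, purely for illustration)
// and clears them for the next batch.
func (w *colBufWriter) Flush(_ api.StreamContext) ([]byte, error) {
	b, err := json.Marshal(w.cols)
	w.cols = make(map[string][]any)
	return b, err
}
```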
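And a hypothetical registration of that sketch through the updated `ConvertWriterProvider` signature, mirroring the delimited entry in `converter.go`'s `init()`. The format name `"mycolfmt"` is made up; the snippet just shows where the new `isColumnar` argument lands:

```go
package myformat

import (
	"github.com/lf-edge/ekuiper/contract/v2/api"
	"github.com/lf-edge/ekuiper/v2/pkg/ast"
	"github.com/lf-edge/ekuiper/v2/pkg/message"
	"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

// init registers colBufWriter under a made-up format name. The provider now
// also receives isColumnar, so a format could pick a different writer per
// mode; colBufWriter supports both, so it is returned unconditionally.
func init() {
	modules.RegisterWriterConverter("mycolfmt", func(_ api.StreamContext, _ string, _ bool, _ map[string]*ast.JsonStreamField, _ map[string]any) (message.ConvertWriter, error) {
		return &colBufWriter{}, nil
	})
}
```

With such a format registered, a sink that sets the new `batchColumnar: true` property gets the columnar path end to end: `NewBatchWriterOp` asserts the `ColWriter` capability once at construction, and `Exec` routes each `Row`/`Collection` through `ColWrite` instead of `Write`.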