Skip to content

Commit

Permalink
feat(batch): support columnar writer
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jan 22, 2025
1 parent 353c5e4 commit e60a793
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
6 changes: 3 additions & 3 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func init() {
modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) {
return urlencoded.NewConverter(props)
})
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
modules.RegisterWriterConverter(message.FormatDelimited, func(ctx api.StreamContext, avscPath string, _ bool, _ map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
return delimited.NewCsvWriter(ctx, props)
})
}
Expand All @@ -65,9 +65,9 @@ func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string,
return nil, fmt.Errorf("format type %s not supported", t)
}

func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, isColumnar bool, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
if cw, ok := modules.ConvertWriters[format]; ok {
return cw(ctx, schemaId, schema, props)
return cw(ctx, schemaId, isColumnar, schema, props)
}
c, err := GetOrCreateConverter(ctx, format, schemaId, schema, props)
if err != nil {
Expand Down
27 changes: 23 additions & 4 deletions internal/topo/node/batch_writer_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,33 @@ import (
// Output: RawTuple
type BatchWriterOp struct {
*defaultSinkNode
writer message.ConvertWriter
writer message.ConvertWriter
columnar bool
// save lastRow to get the props
lastRow any
}

func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error) {
nctx := ctx.(*context.DefaultContext).WithOpId(name)
c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, schema, nil)
c, err := converter.GetConvertWriter(nctx, sc.Format, sc.SchemaId, sc.BatchColumnar, schema, nil)
if err != nil {
return nil, err
}
if sc.BatchColumnar {
cw, ok := c.(message.ColWriter)
if !ok {
return nil, fmt.Errorf("format %s does not suppor columnar writer", sc.Format)
}
c = cw
}
err = c.New(nctx)
if err != nil {
return nil, fmt.Errorf("writer fail to initialize new file: %s", err)
}
return &BatchWriterOp{
defaultSinkNode: newDefaultSinkNode(name, rOpt),
writer: c,
columnar: sc.BatchColumnar,
}, nil
}

Expand Down Expand Up @@ -103,7 +112,12 @@ func (o *BatchWriterOp) Exec(ctx api.StreamContext, errCh chan<- error) {
}
case xsql.Row:
o.onProcessStart(ctx, data)
e := o.writer.Write(ctx, dt.ToMap())
var e error
if o.columnar {
e = o.writer.(message.ColWriter).ColWrite(ctx, dt.ToMap())
} else {
e = o.writer.Write(ctx, dt.ToMap())
}
if e != nil {
o.onError(ctx, e)
}
Expand All @@ -112,7 +126,12 @@ func (o *BatchWriterOp) Exec(ctx api.StreamContext, errCh chan<- error) {
count++
case xsql.Collection:
o.onProcessStart(ctx, data)
e := o.writer.Write(ctx, dt.ToMaps())
var e error
if o.columnar {
e = o.writer.(message.ColWriter).ColWrite(ctx, dt.ToMaps())
} else {
e = o.writer.Write(ctx, dt.ToMaps())
}
if e != nil {
o.onError(ctx, e)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/node/props.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
// Copyright 2024-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@ type SinkConf struct {
DataField string `json:"dataField"`
BatchSize int `json:"batchSize"`
LingerInterval cast.DurationConf `json:"lingerInterval"`
BatchColumnar bool `json:"batchColumnar"`
Compression string `json:"compression"`
CompressionProps map[string]any `json:"compressionProps"`
Encryption string `json:"encryption"`
Expand Down
7 changes: 6 additions & 1 deletion pkg/message/artifacts.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +46,11 @@ type ConvertWriter interface {
Flush(ctx api.StreamContext) ([]byte, error)
}

type ColWriter interface {
ConvertWriter
ColWrite(ctx api.StreamContext, d any) error
}

// PartialDecoder decodes a field partially
type PartialDecoder interface {
DecodeField(ctx api.StreamContext, b []byte, f string) (any, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/modules/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func IsFormatSupported(format string) bool {
// ConvertWriters are sink converter to use together with batch
var ConvertWriters = map[string]ConvertWriterProvider{}

type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)
type ConvertWriterProvider func(ctx api.StreamContext, schemaId string, isColumnar bool, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error)

func RegisterWriterConverter(name string, provider ConvertWriterProvider) {
ConvertWriters[name] = provider
Expand Down

0 comments on commit e60a793

Please sign in to comment.