
Commit

add metadata to messages
rekby committed Dec 15, 2023
1 parent f1d4f44 commit 4685172
Showing 27 changed files with 384 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,3 +1,4 @@
* Added per-message metadata support for the topic API
* Context for call options now has the same lifetime as the driver (previously it had the same lifetime as the context passed to the Open call).
* Extended metrics (fill database.sql callbacks, recognize TLI error)
* Refactored config prefix in metrics
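A hypothetical usage sketch of the new per-message metadata (not part of this commit's diff): it assumes the public topicwriter.Message and topicreader.Message types expose the same Metadata map[string][]byte that the internal types below gain, and that the writer/reader handles were obtained from db.Topic() in the usual way.

package main

import (
	"context"
	"strings"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
)

// writeWithMetadata attaches arbitrary key/value metadata to one message.
// The Metadata field on topicwriter.Message is an assumption mirroring the
// MetadataItems field added to the internal writer MessageData below.
func writeWithMetadata(ctx context.Context, w *topicwriter.Writer) error {
	return w.Write(ctx, topicwriter.Message{
		Data:     strings.NewReader("payload"),
		Metadata: map[string][]byte{"trace-id": []byte("abc123")},
	})
}

// readMetadata reads one message and looks up a metadata key. Per the reader
// changes below, Metadata is nil when a message carries no metadata, and a
// lookup on a nil map simply returns nil.
func readMetadata(ctx context.Context, r *topicreader.Reader) ([]byte, error) {
	msg, err := r.ReadMessage(ctx)
	if err != nil {
		return nil, err
	}
	return msg.Metadata["trace-id"], nil
}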
2 changes: 1 addition & 1 deletion examples/go.mod
@@ -49,7 +49,7 @@ require (
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd // indirect
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a // indirect
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 // indirect
github.com/ydb-platform/ydb-go-yc-metadata v0.5.4 // indirect
golang.org/x/crypto v0.13.0 // indirect
1 change: 1 addition & 0 deletions examples/go.sum
@@ -1197,6 +1197,7 @@ github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKV
github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4=
github.com/ydb-platform/ydb-go-sdk-metrics v0.16.3 h1:30D5jErLAiGjchVG2D9JiCLbST5LpAiyS7DoUtHkWsU=
2 changes: 1 addition & 1 deletion go.mod
@@ -6,7 +6,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.4.1
github.com/google/uuid v1.3.0
github.com/jonboulle/clockwork v0.3.0
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a
golang.org/x/sync v0.3.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/grpc v1.57.1
4 changes: 2 additions & 2 deletions go.sum
@@ -66,8 +66,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd h1:dzWP1Lu+A40W883dK/Mr3xyDSM/2MggS8GtHT0qgAnE=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231012155159-f85a672542fd/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a h1:9wx+kCrCQCdwmDe1AFW5yAHdzlo+RV7lcy6y7Zq661s=
github.com/ydb-platform/ydb-go-genproto v0.0.0-20231215113745-46f6d30f974a/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
6 changes: 6 additions & 0 deletions internal/grpcwrapper/rawtopic/rawtopiccommon/… (new file; exact file name not shown in this view)
@@ -0,0 +1,6 @@
package rawtopiccommon

type MetadataItem struct {
Key string
Value []byte
}
10 changes: 10 additions & 0 deletions internal/grpcwrapper/rawtopic/rawtopicreader/messages.go
@@ -245,6 +245,15 @@ func (r *ReadResponse) fromProto(p *Ydb_Topic.StreamReadMessage_ReadResponse) er
dstMessage.Data = srcMessage.Data
dstMessage.UncompressedSize = srcMessage.UncompressedSize
dstMessage.MessageGroupID = srcMessage.MessageGroupId
if len(srcMessage.MetadataItems) > 0 {
dstMessage.MetadataItems = make([]rawtopiccommon.MetadataItem, 0, len(srcMessage.MetadataItems))
for _, protoItem := range srcMessage.MetadataItems {
dstMessage.MetadataItems = append(dstMessage.MetadataItems, rawtopiccommon.MetadataItem{
Key: protoItem.Key,
Value: protoItem.Value[:len(protoItem.Value):len(protoItem.Value)],
})
}
}
}
}
}
@@ -273,6 +282,7 @@ type MessageData struct {
Data []byte
UncompressedSize int64
MessageGroupID string
MetadataItems []rawtopiccommon.MetadataItem
}

//
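A side note on the Value: protoItem.Value[:len(protoItem.Value):len(protoItem.Value)] copy in fromProto above: the full (three-index) slice expression does not duplicate the bytes, it only caps the slice's capacity, so a later append by SDK code allocates a fresh array instead of writing into the gRPC-owned buffer. A standalone sketch of that behaviour:

package main

import "fmt"

func main() {
	src := []byte("abcdef")

	capped := src[:3:3]          // len 3, cap 3 — still aliases src's first bytes
	capped = append(capped, 'X') // capacity exhausted: append allocates a new array
	fmt.Println(string(capped))  // "abcX"
	fmt.Println(string(src))     // "abcdef" — original buffer untouched

	plain := src[:3]           // len 3, cap 6 — shares the whole backing array
	plain = append(plain, 'Y') // writes into src's storage
	fmt.Println(string(src))   // "abcYef" — shared buffer was modified
}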
9 changes: 9 additions & 0 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go
@@ -171,6 +171,7 @@ type MessageData struct {
CreatedAt time.Time
UncompressedSize int64
Partitioning Partitioning
MetadataItems []rawtopiccommon.MetadataItem
Data []byte
}

@@ -185,6 +186,14 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess
if err != nil {
return nil, err
}

for _, item := range d.MetadataItems {

[CI annotation — GitHub Actions / golangci-lint, check failure on line 190 in internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go: rangeValCopy: each iteration copies 40 bytes (consider pointers or indexing) (gocritic)]
res.MetadataItems = append(res.MetadataItems, &Ydb_Topic.MetadataItem{
Key: item.Key,
Value: item.Value,
})
}

return res, nil
}

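One way to address the gocritic rangeValCopy finding noted above would be to index the slice instead of copying each MetadataItem (a string header plus a slice header, 40 bytes) into the loop variable. This is a sketch with stand-in types, not the committed fix:

package main

import "fmt"

type metadataItem struct { // stand-in for rawtopiccommon.MetadataItem
	Key   string
	Value []byte
}

type protoItem struct { // stand-in for *Ydb_Topic.MetadataItem
	Key   string
	Value []byte
}

// toProtoItems mirrors the loop in ToProto, but indexes the slice so no
// per-iteration struct copy is made.
func toProtoItems(items []metadataItem) []*protoItem {
	res := make([]*protoItem, 0, len(items))
	for i := range items {
		res = append(res, &protoItem{Key: items[i].Key, Value: items[i].Value})
	}
	return res
}

func main() {
	out := toProtoItems([]metadataItem{{Key: "trace-id", Value: []byte("abc123")}})
	fmt.Println(out[0].Key, string(out[0].Value))
}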
7 changes: 7 additions & 0 deletions internal/topic/topicreaderinternal/batch.go
@@ -86,6 +86,13 @@ func newBatchFromStream(
dstMess.commitRange.commitOffsetStart = prevOffset + 1
dstMess.commitRange.commitOffsetEnd = sMess.Offset + 1

if len(sMess.MetadataItems) > 0 {
dstMess.Metadata = make(map[string][]byte, len(sMess.MetadataItems))
for _, item := range sMess.MetadataItems {

[CI annotation — GitHub Actions / golangci-lint, check failure on line 91 in internal/topic/topicreaderinternal/batch.go: rangeValCopy: each iteration copies 40 bytes (consider pointers or indexing) (gocritic)]
dstMess.Metadata[item.Key] = item.Value
}
}

prevOffset = sMess.Offset
}

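The map fill above gives duplicated metadata keys last-write-wins semantics: a later MetadataItem with the same key overwrites the earlier value, which is exactly what the "doubled-key" case in the reader test further down expects. A minimal standalone illustration:

package main

import "fmt"

type metadataItem struct { // stand-in for rawtopiccommon.MetadataItem
	Key   string
	Value []byte
}

func main() {
	items := []metadataItem{
		{Key: "doubled-key", Value: []byte("bad")},
		{Key: "doubled-key", Value: []byte("good")},
	}
	meta := make(map[string][]byte, len(items))
	for i := range items { // indexing also sidesteps the rangeValCopy warning
		meta[items[i].Key] = items[i].Value
	}
	fmt.Println(string(meta["doubled-key"])) // good — the last item wins
}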
10 changes: 10 additions & 0 deletions internal/topic/topicreaderinternal/message.go
@@ -9,6 +9,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xbytes"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

@@ -28,6 +29,7 @@ type PublicMessage struct {
Offset int64
WrittenAt time.Time
ProducerID string
Metadata map[string][]byte // Metadata, nil if no metadata

commitRange commitRange
data oneTimeReader
@@ -135,6 +137,14 @@ func (pmb *PublicMessageBuilder) CreatedAt(createdAt time.Time) *PublicMessageBu
return pmb
}

func (pmb *PublicMessageBuilder) Metadata(metadata map[string][]byte) *PublicMessageBuilder {
pmb.mess.Metadata = make(map[string][]byte, len(metadata))
for key, val := range metadata {
pmb.mess.Metadata[key] = xbytes.Clone(val)
}
return pmb
}

// MessageGroupID set message MessageGroupID
func (pmb *PublicMessageBuilder) MessageGroupID(messageGroupID string) *PublicMessageBuilder {
pmb.mess.MessageGroupID = messageGroupID
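The new Metadata builder method deep-copies every value via xbytes.Clone, so callers cannot mutate an already-built test message through the slices they passed in. Assuming xbytes.Clone behaves like the standard library's bytes.Clone (a nil-preserving copy), a sketch of why that matters:

package main

import "fmt"

// cloneBytes models the behaviour assumed of xbytes.Clone: nil stays nil,
// anything else is copied into fresh storage.
func cloneBytes(b []byte) []byte {
	if b == nil {
		return nil
	}
	return append([]byte{}, b...)
}

func main() {
	original := []byte("v1")
	meta := map[string][]byte{"key": cloneBytes(original)}

	original[0] = 'X'                // caller mutates its own slice afterwards...
	fmt.Println(string(meta["key"])) // ...but the stored copy still prints "v1"
}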
88 changes: 84 additions & 4 deletions internal/topic/topicreaderinternal/stream_reader_impl_test.go
@@ -583,7 +583,7 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
prevOffset := e.partitionSession.lastReceivedMessageOffset()

sendDataRequestCompleted := make(empty.Chan)
dataSize := 4
dataSize := 6
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize}).Do(func(_ interface{}) {
close(sendDataRequestCompleted)
})
@@ -639,17 +639,60 @@
},
},
},
{
Codec: rawtopiccommon.CodecRaw,
WriteSessionMeta: map[string]string{"a": "b", "c": "d"},
WrittenAt: testTime(7),
MessageData: []rawtopicreader.MessageData{
{
Offset: prevOffset + 30,
SeqNo: 5,
CreatedAt: testTime(5),
Data: []byte("test"),
UncompressedSize: 4,
MessageGroupID: "1",
MetadataItems: []rawtopiccommon.MetadataItem{
{
Key: "first",
Value: []byte("first-value"),
},
{
Key: "second",
Value: []byte("second-value"),
},
},
},
{
Offset: prevOffset + 31,
SeqNo: 6,
CreatedAt: testTime(5),
Data: []byte("4567"),
UncompressedSize: 4,
MessageGroupID: "1",
MetadataItems: []rawtopiccommon.MetadataItem{
{
Key: "doubled-key",
Value: []byte("bad"),
},
{
Key: "doubled-key",
Value: []byte("good"),
},
},
},
},
},
},
},
},
},
)

expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987")}
expectedData := [][]byte{[]byte("123"), []byte("4567"), []byte("098"), []byte("0987"), []byte("test"), []byte("4567")}
expectedBatch := &PublicBatch{
commitRange: commitRange{
commitOffsetStart: prevOffset + 1,
commitOffsetEnd: prevOffset + 21,
commitOffsetEnd: prevOffset + 32,
partitionSession: e.partitionSession,
},
Messages: []*PublicMessage{
@@ -713,11 +756,48 @@ func TestTopicStreamReaderImpl_ReadMessages(t *testing.T) {
partitionSession: e.partitionSession,
},
},
{
SeqNo: 5,
CreatedAt: testTime(5),
MessageGroupID: "1",
Metadata: map[string][]byte{
"first": []byte("first-value"),
"second": []byte("second-value"),
},
Offset: prevOffset.ToInt64() + 30,
WrittenAt: testTime(7),
WriteSessionMetadata: map[string]string{"a": "b", "c": "d"},
UncompressedSize: 4,
rawDataLen: 4,
commitRange: commitRange{
commitOffsetStart: prevOffset + 21,
commitOffsetEnd: prevOffset + 31,
partitionSession: e.partitionSession,
},
},
{
SeqNo: 6,
CreatedAt: testTime(5),
MessageGroupID: "1",
Metadata: map[string][]byte{
"doubled-key": []byte("good"),
},
Offset: prevOffset.ToInt64() + 31,
WrittenAt: testTime(7),
WriteSessionMetadata: map[string]string{"a": "b", "c": "d"},
UncompressedSize: 4,
rawDataLen: 4,
commitRange: commitRange{
commitOffsetStart: prevOffset + 31,
commitOffsetEnd: prevOffset + 32,
partitionSession: e.partitionSession,
},
},
},
}

opts := newReadMessageBatchOptions()
opts.MinCount = 4
opts.MinCount = 6
batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
require.NoError(t, err)

9 changes: 4 additions & 5 deletions internal/topic/topicwriterinternal/encoders.go
@@ -119,7 +119,7 @@ func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (r
len(messages),
trace.TopicWriterCompressMessagesReasonCompressData,
)
err = readInParallelWithCodec(messages, codec, s.parallelCompressors)
err = cacheMessages(messages, codec, s.parallelCompressors)
onCompressDone(err)
}
return codec, err
@@ -188,7 +188,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt
len(messages),
trace.TopicWriterCompressMessagesReasonCodecsMeasure,
)
err := readInParallelWithCodec(messages, codec, s.parallelCompressors)
err := cacheMessages(messages, codec, s.parallelCompressors)
onCompressDone(err)
if err != nil {
return codecUnknown, err
Expand All @@ -215,8 +215,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt
return s.allowedCodecs[minSizeIndex], nil
}

func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopiccommon.Codec, parallel int) error {
workerCount := parallel
func cacheMessages(messages []messageWithDataContent, codec rawtopiccommon.Codec, workerCount int) error {
if len(messages) < workerCount {
workerCount = len(messages)
}
@@ -253,7 +252,7 @@ func readInParallelWithCodec(messages []messageWithDataContent, codec rawtopicco
if localErr != nil {
return
}
_, localErr = task.GetEncodedBytes(codec)
localErr = task.CacheMessageData(codec)
if localErr != nil {
resErrMutex.WithLock(func() {
resErr = localErr
24 changes: 12 additions & 12 deletions internal/topic/topicwriterinternal/encoders_test.go
@@ -51,13 +51,13 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < smallCount; i++ {
data := make([]byte, smallSize)
message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders)
message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders)
messages = append(messages, message)
}

for i := 0; i < largeCount; i++ {
data := make([]byte, largeSize)
message := newMessageDataWithContent(Message{Data: bytes.NewReader(data)}, testCommonEncoders)
message := newMessageDataWithContent(PublicMessage{Data: bytes.NewReader(data)}, testCommonEncoders)
messages = append(messages, message)
}

@@ -152,30 +152,30 @@

func TestCompressMessages(t *testing.T) {
t.Run("NoMessages", func(t *testing.T) {
require.NoError(t, readInParallelWithCodec(nil, rawtopiccommon.CodecRaw, 1))
require.NoError(t, cacheMessages(nil, rawtopiccommon.CodecRaw, 1))
})

t.Run("RawOk", func(t *testing.T) {
messages := newTestMessagesWithContent(1)
require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1))
require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1))
})
t.Run("RawError", func(t *testing.T) {
mess := newMessageDataWithContent(Message{}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{}, testCommonEncoders)
_, err := mess.GetEncodedBytes(rawtopiccommon.CodecGzip)
require.NoError(t, err)
messages := []messageWithDataContent{mess}
require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecRaw, 1))
require.Error(t, cacheMessages(messages, rawtopiccommon.CodecRaw, 1))
})

const messageCount = 10
t.Run("GzipOneThread", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}

require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, 1))
require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, 1))
for i := 0; i < messageCount; i++ {
require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec)
}
@@ -185,11 +185,11 @@ func TestCompressMessages(t *testing.T) {
t.Run("GzipOk", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}

require.NoError(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount))
require.NoError(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount))
for i := 0; i < messageCount; i++ {
require.Equal(t, rawtopiccommon.CodecGzip, messages[i].bufCodec)
}
@@ -198,11 +198,11 @@ func TestCompressMessages(t *testing.T) {
t.Run("GzipErr", func(t *testing.T) {
var messages []messageWithDataContent
for i := 0; i < messageCount; i++ {
mess := newMessageDataWithContent(Message{Data: strings.NewReader("asdf")}, testCommonEncoders)
mess := newMessageDataWithContent(PublicMessage{Data: strings.NewReader("asdf")}, testCommonEncoders)
messages = append(messages, mess)
}
messages[0].dataWasRead = true

require.Error(t, readInParallelWithCodec(messages, rawtopiccommon.CodecGzip, parallelCount))
require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount))
})
}