-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Guarantee your own commits via a blocking payer #441
Conversation
WalkthroughThis pull request introduces several new components related to cursor tracking and metadata API testing. A new test file Changes
Sequence DiagramsequenceDiagram
participant Client
participant NodeCursorTracker
participant Node
Client->>NodeCursorTracker: BlockUntilDesiredCursorReached
NodeCursorTracker->>Node: Subscribe to Cursor Stream
loop Wait for Desired Cursor
Node-->>NodeCursorTracker: Cursor Update
alt Cursor Matches Criteria
NodeCursorTracker-->>Client: Return Success
else Cursor Not Reached
NodeCursorTracker->>Node: Continue Listening
end
end
Suggested reviewers
Tip 🌐 Web search-backed reviews and chat
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (5)
pkg/api/payer/nodeCursorTracker.go (1)
28-29
: Address the TODO comment regarding stream lifecycle.The current implementation creates and tears down the stream for every request, which could be inefficient for high-frequency operations.
Consider implementing a connection pool or stream reuse mechanism to improve performance. Would you like me to propose an implementation?
pkg/api/metadata/cursor_test.go (2)
87-87
: Extract sleep durations as constants.Hardcoded sleep durations should be defined as constants for better maintainability.
+const ( + // Add buffer to worker poll time for test stability + testSleepBuffer = 100 * time.Millisecond + testSleepDuration = message.SubscribeWorkerPollTime + testSleepBuffer +) + func insertInitialRows(t *testing.T, store *sql.DB) { testutils.InsertGatewayEnvelopes(t, store, []queries.InsertGatewayEnvelopeParams{ allRows[0], allRows[1], }) - time.Sleep(message.SubscribeWorkerPollTime + 100*time.Millisecond) + time.Sleep(testSleepDuration) }Also applies to: 93-93
96-135
: Add more test coverage.The current tests only cover basic functionality. Consider adding tests for:
- Error cases (invalid requests, network failures)
- Edge cases (empty cursor, maximum sequence IDs)
- Concurrent cursor updates
Would you like me to propose additional test cases?
pkg/api/payer/service.go (2)
223-223
: Consolidate sequence ID assignment.The
desiredSequenceId
is assigned in multiple switch cases. Consider consolidating this assignment after the switch statement.hash = logMessage.Raw.TxHash unsignedOriginatorEnvelope = buildUnsignedOriginatorEnvelopeFromChain( clientEnvelope.Aad().TargetOriginator, logMessage.SequenceId, logMessage.Message, ) - desiredSequenceId = logMessage.SequenceId case topic.TOPIC_KIND_IDENTITY_UPDATES_V1: // ... existing code ... - desiredSequenceId = logMessage.SequenceId default: return nil, status.Errorf( codes.InvalidArgument, "Unknown blockchain message for topic %s", targetTopic.String(), ) } + switch kind { + case topic.TOPIC_KIND_GROUP_MESSAGES_V1: + desiredSequenceId = logMessage.(*groupmessages.GroupMessagesMessageSent).SequenceId + case topic.TOPIC_KIND_IDENTITY_UPDATES_V1: + desiredSequenceId = logMessage.(*identityupdates.IdentityUpdatesIdentityUpdateCreated).SequenceId + }Also applies to: 240-240
258-266
: Improve error handling for cursor tracking.The error from
BlockUntilDesiredCursorReached
should include more context about the failure.err = s.nodeCursorTracker.BlockUntilDesiredCursorReached( ctx, expectedNode, desiredOriginatorId, desiredSequenceId, ) if err != nil { - return nil, err + return nil, status.Errorf( + codes.Internal, + "failed to wait for cursor (node: %d, originator: %d, sequence: %d): %v", + expectedNode, + desiredOriginatorId, + desiredSequenceId, + err, + ) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pkg/api/metadata/cursor_test.go
(1 hunks)pkg/api/payer/nodeCursorTracker.go
(1 hunks)pkg/api/payer/service.go
(6 hunks)pkg/testutils/api/api.go
(4 hunks)
🧰 Additional context used
🪛 golangci-lint (1.62.2)
pkg/api/metadata/cursor_test.go
23-23: var topicC
is unused
(unused)
🪛 GitHub Check: Lint-Go
pkg/api/metadata/cursor_test.go
[failure] 23-23:
var topicC
is unused (unused)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Build pre-baked anvil-xmtpd
- GitHub Check: Test (Node)
🔇 Additional comments (3)
pkg/api/payer/nodeCursorTracker.go (1)
56-58
: 🛠️ Refactor suggestionRemove redundant error check.
The error condition is already checked at line 49. The additional check at line 56 is redundant and can be simplified.
- if err != nil || resp == nil || resp.LatestSync == nil { + if resp == nil || resp.LatestSync == nil { return status.Errorf(codes.Internal, "error getting node cursor: %v", err) }Likely invalid or redundant comment.
pkg/testutils/api/api.go (1)
70-87
: LGTM!The implementation follows the established pattern of other API clients and is well-structured.
Also applies to: 182-191
pkg/api/payer/service.go (1)
180-182
: Address the TODO comment for originator ID determination.The hardcoded originator ID should be determined from the chain as noted in the TODO comment.
Let's verify if we can determine this from the chain:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
pkg/api/payer/publish_test.go (2)
41-61
: Consider making the sleep duration configurable.The hard-coded sleep duration in
Recv()
might make tests flaky or slow. Consider making it configurable or using a more deterministic approach.type MockSubscribeSyncCursorClient struct { metadata_api.MetadataApi_SubscribeSyncCursorClient updates []*metadata_api.GetSyncCursorResponse err error index int + sleepDuration time.Duration } func (m *MockSubscribeSyncCursorClient) Recv() (*metadata_api.GetSyncCursorResponse, error) { if m.err != nil { return nil, m.err } if m.index < len(m.updates) { resp := m.updates[m.index] m.index++ return resp, nil } // Simulate an open stream without new messages - time.Sleep(50 * time.Millisecond) + if m.sleepDuration > 0 { + time.Sleep(m.sleepDuration) + } return nil, nil }
111-124
: Consider adding error scenarios in the mock stream.The test only covers the happy path. Consider adding test cases for error scenarios and timeout conditions.
mockStream := &MockSubscribeSyncCursorClient{ updates: []*metadata_api.GetSyncCursorResponse{ { LatestSync: &envelopesProto.Cursor{ NodeIdToSequenceId: map[uint32]uint64{1: sequenceId}, }, }, }, + // Add error scenarios + // err: errors.New("simulated error"), }pkg/api/payer/nodeCursorTracker.go (1)
72-74
: Simplify error handling logic.The error check is redundant as it's already handled in the previous if statement.
- if err != nil || resp == nil || resp.LatestSync == nil { + if resp == nil || resp.LatestSync == nil { return status.Errorf(codes.Internal, "error getting node cursor: %v", err) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
.mockery.yaml
(1 hunks)pkg/api/payer/nodeCursorTracker.go
(1 hunks)pkg/api/payer/publish_test.go
(6 hunks)pkg/api/payer/service.go
(6 hunks)pkg/mocks/metadata_api/mock_MetadataApiClient.go
(1 hunks)pkg/server/server.go
(1 hunks)pkg/testutils/api/api.go
(4 hunks)
🔇 Additional comments (11)
pkg/api/payer/publish_test.go (1)
31-39
: LGTM! Well-structured mock implementation.The
FixedMetadataApiClientConstructor
provides a clean way to inject mock clients for testing.pkg/mocks/metadata_api/mock_MetadataApiClient.go (1)
1-188
: LGTM! Auto-generated mock implementation.The mock implementation is auto-generated by mockery and follows standard patterns. No issues found.
.mockery.yaml (1)
7-9
: LGTM! Correct mockery configuration.The MetadataApiClient interface is correctly added to the mockery configuration, following the established pattern.
pkg/api/payer/nodeCursorTracker.go (3)
11-37
: Well-structured interface and implementation!The code follows Go best practices with clean interface definition, proper dependency injection, and a clear constructor pattern.
45-45
: Address the TODO comment about stream lifecycle.Consider implementing a connection pool or stream reuse mechanism to avoid creating and tearing down streams for every request.
Would you like me to help design a connection pooling solution for the streams?
51-54
:⚠️ Potential issueAdd stream cleanup to prevent resource leaks.
The stream should be properly closed using
CloseSend()
to prevent resource leaks.stream, err := client.SubscribeSyncCursor(ctx, &metadata_api.GetSyncCursorRequest{}) if err != nil { return err } + defer stream.CloseSend()
Likely invalid or redundant comment.
pkg/testutils/api/api.go (3)
70-87
: LGTM! Well-implemented client creation.The implementation follows the established patterns and properly handles resource cleanup.
142-144
: LGTM! Clean service registration.The metadata service is properly registered with appropriate error handling.
183-191
: LGTM! Test client implementation.The test client follows the same pattern as other test clients, ensuring consistency.
pkg/api/payer/service.go (2)
188-188
: Remove hardcoded originator ID.The TODO comment indicates this should be determined from the chain. This should be addressed before production deployment.
Would you like me to help implement the chain-based originator ID determination?
265-273
: LGTM! Proper cursor tracking integration.The cursor tracking is well-integrated into the publishing flow, ensuring consistency before proceeding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (4)
pkg/api/payer/publish_test.go (3)
39-49
: Add documentation for the mock client.Consider adding a comment explaining the purpose and behavior of this mock client, particularly how it simulates cursor updates.
51-67
: Simplify blocking mechanism and protect shared state.A few suggestions to improve the implementation:
- The select statement with a single case can be simplified.
- The index field should be protected for concurrent access.
Consider this implementation:
// Recv simulates receiving cursor updates over time. func (m *MockSubscribeSyncCursorClient) Recv() (*metadata_api.GetSyncCursorResponse, error) { if m.err != nil { return nil, m.err } if m.index < len(m.updates) { resp := m.updates[m.index] m.index++ return resp, nil } // block forever - select { - case <-m.ctx.Done(): - return nil, m.ctx.Err() - } + <-m.ctx.Done() + return nil, m.ctx.Err() }Also consider adding a mutex to protect the index field if concurrent access is possible:
type MockSubscribeSyncCursorClient struct { metadata_api.MetadataApi_SubscribeSyncCursorClient ctx context.Context updates []*metadata_api.GetSyncCursorResponse err error index int + mu sync.Mutex }
🧰 Tools
🪛 golangci-lint (1.62.2)
63-63: S1000: should use a simple channel send/receive instead of
select
with a single case(gosimple)
🪛 GitHub Check: Lint-Go
[failure] 63-63:
S1000: should use a simple channel send/receive instead ofselect
with a single case (gosimple)
117-129
: Add error case testing and set mock stream context.The test could be more robust:
- The mock stream's context is not set, which could lead to resource leaks.
- No error cases are tested.
Consider adding error case tests and setting the context:
mockStream := &MockSubscribeSyncCursorClient{ + ctx: ctx, // Set the context updates: []*metadata_api.GetSyncCursorResponse{ { LatestSync: &envelopesProto.Cursor{ NodeIdToSequenceId: map[uint32]uint64{1: sequenceId}, }, }, }, }
Also consider adding a test case for error handling:
t.Run("handles cursor subscription error", func(t *testing.T) { mockStream := &MockSubscribeSyncCursorClient{ ctx: ctx, err: fmt.Errorf("subscription error"), } metaMocks.On("SubscribeSyncCursor", mock.Anything, mock.Anything). Return(mockStream, nil). Once() // Test error handling })pkg/api/payer/nodeCursorTracker_test.go (1)
16-140
: Consider refactoring tests for better maintainability.The test cases share similar setup code and could benefit from table-driven tests to reduce duplication and make it easier to add new test cases.
Consider refactoring to use table-driven tests:
func TestCursorTracker(t *testing.T) { tests := []struct { name string updates []*metadata_api.GetSyncCursorResponse originator uint32 sequence uint64 useTimeout bool expectedError bool }{ { name: "basic success", updates: []*metadata_api.GetSyncCursorResponse{ { LatestSync: &envelopesProto.Cursor{ NodeIdToSequenceId: map[uint32]uint64{1: 1}, }, }, }, originator: 1, sequence: 1, expectedError: false, }, // Add more test cases here } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() if tt.useTimeout { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, 10*time.Millisecond) defer cancel() } tracker := constructTracker(t, ctx, tt.updates) err := tracker.BlockUntilDesiredCursorReached( ctx, 1, tt.originator, tt.sequence, ) if tt.expectedError { require.Error(t, err) } else { require.NoError(t, err) } }) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
pkg/api/payer/nodeCursorTracker.go
(1 hunks)pkg/api/payer/nodeCursorTracker_test.go
(1 hunks)pkg/api/payer/publish_test.go
(4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/api/payer/nodeCursorTracker.go
🧰 Additional context used
🪛 golangci-lint (1.62.2)
pkg/api/payer/publish_test.go
63-63: S1000: should use a simple channel send/receive instead of select
with a single case
(gosimple)
🪛 GitHub Check: Lint-Go
pkg/api/payer/publish_test.go
[failure] 63-63:
S1000: should use a simple channel send/receive instead of select
with a single case (gosimple)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Test (Node)
🔇 Additional comments (6)
pkg/api/payer/publish_test.go (4)
14-14
: LGTM!The new imports are correctly added and align with the new metadata API functionality being tested.
Also applies to: 18-18
29-37
: LGTM!Clean implementation of a test double that follows testing best practices by providing a fixed mock client.
81-92
: LGTM!The metadata client mock setup is well-structured and correctly integrated into the service builder.
174-174
: LGTM!The change correctly handles the new metadata mock return value from buildPayerService.
pkg/api/payer/nodeCursorTracker_test.go (2)
1-14
: LGTM! Package structure and imports are well organized.The package follows Go conventions for test files and includes all necessary imports.
147-149
: Add missing mock type definition.The
MockSubscribeSyncCursorClient
type is used but not defined in this file.Please ensure the mock type is defined with all necessary methods. Consider adding:
type MockSubscribeSyncCursorClient struct { updates []*metadata_api.GetSyncCursorResponse ctx context.Context index int } func (m *MockSubscribeSyncCursorClient) Recv() (*metadata_api.GetSyncCursorResponse, error) { if m.index >= len(m.updates) { return nil, io.EOF } update := m.updates[m.index] m.index++ return update, nil } // Add other required interface methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
pkg/api/payer/nodeCursorTracker.go (1)
46-46
: Consider implementing connection pooling.The TODO comment suggests improving stream lifecycle management. Consider implementing a connection pool to reuse streams instead of creating and tearing them down for every request.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
pkg/api/payer/nodeCursorTracker.go
(1 hunks)pkg/api/payer/nodeCursorTracker_test.go
(1 hunks)pkg/server/server.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/server/server.go
🔇 Additional comments (10)
pkg/api/payer/nodeCursorTracker.go (4)
12-27
: LGTM! Clean interface and implementation.The interface is well-defined, and the implementation correctly handles client creation and error propagation.
29-38
: LGTM! Well-structured tracker implementation.The struct and constructor are well-designed with appropriate field types and initialization.
52-55
: Consider adding stream cleanup.The stream created by
SubscribeSyncCursor
should be properly closed to prevent resource leaks.
75-104
: LGTM! Robust implementation of cursor tracking.The core logic effectively handles:
- Concurrent operations with proper channel usage
- Context cancellation and error propagation
- Edge cases like missing originator ID
pkg/api/payer/nodeCursorTracker_test.go (6)
16-36
: LGTM! Clear and effective basic test.The test effectively verifies the happy path with clear setup and assertions.
87-147
: LGTM! Comprehensive server shutdown tests.The tests effectively verify server shutdown scenarios with proper error assertions and message verification.
149-197
: LGTM! Well-structured edge case tests.The tests effectively verify behavior when originator or sequence doesn't exist, with proper timeout handling.
199-229
: Fix incorrect context usage in multi-stage test.Similar to the previous tests, this test uses a cancelled context (
testutils.CancelledContext()
) but expects no error.
231-257
: LGTM! Well-implemented test helper.The helper function effectively sets up the test environment with proper mocking and context handling.
38-86
:⚠️ Potential issueFix incorrect error expectations in shutdown tests.
The shutdown tests use cancelled contexts but expect no error (
require.NoError
). This is counterintuitive as cancelled contexts should result in errors.Apply these changes:
func TestCursorTrackerClientShutsDown(t *testing.T) { // ... - require.NoError(t, err) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") } func TestCursorTrackerClientShutsDownAfterExecution(t *testing.T) { // ... - require.NoError(t, err) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") }Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/api/payer/publish_test.go (1)
77-88
: Add error handling for metadata client constructor.While the implementation is correct, it would be more robust to handle potential errors from the metadata client constructor.
Apply this diff to improve error handling:
metaMocks := metadataMocks.NewMockMetadataApiClient(t) metadataConstructor := &FixedMetadataApiClientConstructor{ mockClient: metaMocks, } var interf payer.MetadataApiClientConstructor = metadataConstructor + if metaMocks == nil { + t.Fatal("Failed to create metadata mock client") + } payerService, err := payer.NewPayerApiService( ctx, log, mockRegistry, privKey, mockMessagePublisher, &interf, )🧰 Tools
🪛 GitHub Actions: Test
[error] Test package failed with exit code 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/api/payer/publish_test.go
(4 hunks)
🧰 Additional context used
🪛 GitHub Actions: Test
pkg/api/payer/publish_test.go
[error] 61-61: Panic error: runtime error: invalid memory address or nil pointer dereference in MockSubscribeSyncCursorClient.Recv()
[error] Test package failed with exit code 1
🔇 Additional comments (1)
pkg/api/payer/publish_test.go (1)
14-14
: LGTM! Clean imports and type declarations.The new imports and type declarations are well-organized and follow Go conventions.
Also applies to: 18-18, 26-26, 29-31
🧰 Tools
🪛 GitHub Actions: Test
[error] Test package failed with exit code 1
FYI @mchenani |
|
The goal of this PR is to not return a commit (or any blockchain payload) as successfull to the client until at least one node acknowledged the write. This is important for clients as they like to re-read their writes.
This functionality uses the new metadata API to check when the correct sequenceID reached the desired state.
Summary by CodeRabbit
New Features
Tests
NodeCursorTracker
functionality, covering various scenarios.MetadataApiClient
.publish
functionality with new mock clients.Chores
Fixes #418