Skip to content

Commit

Permalink
Issue 29: Unit test from streamcut API (#28)
Browse files Browse the repository at this point in the history
Add unit test for get_streamcut() API which gives all reader positions in stream segment for given readergroup

Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak authored Feb 29, 2024
1 parent 3ab7eb6 commit cee08b4
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions tests/pravega_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,52 @@ async def test_largeEvents(self):
count+=1
self.assertEqual(b'a'*100000, event.data(), "Invalid event data")
r1.release_segment(segment_slice)

# Test to read the events and get streamCut
# Enable RG to read from given streamcut.
async def test_getStreamcut(self):
suffix = str(random.randint(0, 100))
scope = "testReaderSC"
stream = "testStream" + suffix
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager = pravega_client.StreamManager("tcp://127.0.0.1:9090")
pravega_client.StreamReaderGroupConfig(True, scope, stream)
print("Creating a scope")
scope_result = stream_manager.create_scope(scope)
print(scope_result)
print("Creating a stream ", stream)
stream_result = stream_manager.create_stream(scope, stream, 1)
print(stream_result)
w1 = stream_manager.create_writer(scope, stream)

print("Write 2 events")
w1.write_event("event1")
w1.write_event("event2")
w1.flush()

# Create a reader Group points to tail of the stream.
reader_group = stream_manager.create_reader_group("rgnew" + suffix, scope, stream, True)

# get Streamcut which points to 3rd event
rsm = reader_group.get_streamcut()
print(rsm)
# write 3 more events
w1.write_event("event3")
w1.write_event("event4")
w1.write_event("event5")
w1.flush()
# Create ReaderGroup by passing streamcut which points to 3rd event.
reader_group2 = stream_manager.create_reader_group("rgnew" + suffix, scope, stream, False, rsm)
# consume the segment slice for events. Read all 3 events from given slice.
r1 = reader_group2.create_reader("reader-1")
segment_slice = await r1.get_segment_slice_async()
count = 0
event_num = 2
for event in segment_slice:
count += 1
event_num += 1
self.assertEqual(b'event' + str(event_num).encode(), event.data(), "Invalid event data")

self.assertEqual(count, 3, "Three events are expected")
r1.release_segment(segment_slice)
r1.reader_offline()

0 comments on commit cee08b4

Please sign in to comment.