-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher_example.py
32 lines (27 loc) · 1.15 KB
/
publisher_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import config
import datetime
from lab_share_lib.rabbit.schema_registry import SchemaRegistry
from lab_share_lib.rabbit.basic_publisher import BasicPublisher
from lab_share_lib.rabbit.avro_encoder import AvroEncoderBinaryMessage
from lab_share_lib.constants import RABBITMQ_HEADER_VALUE_ENCODER_TYPE_BINARY
TIMESTAMP = datetime.datetime.utcnow()
MESSAGE = f"This is the message sent from the publisher at {TIMESTAMP}"
SUBJECT = "example_1_message"
RABBITMQ_ROUTING_KEY = "testing_routing_key.34"
registry = SchemaRegistry(config.REDPANDA_BASE_URI, verify=False)
publisher = BasicPublisher(
config.RABBIT_SERVER_DETAILS, publish_retry_delay=5, publish_max_retries=36, verify_cert=False
)
encoder = AvroEncoderBinaryMessage(registry, SUBJECT)
# When using a BinaryFileEncoder, you can set the compression codec to "snappy" or "deflate"
# encoder.set_compression_codec("snappy")
encoded_message = encoder.encode([MESSAGE], version="latest")
print(f"Sending message: {MESSAGE}")
publisher.publish_message(
"test_exchange",
RABBITMQ_ROUTING_KEY,
encoded_message.body,
SUBJECT,
encoded_message.version,
RABBITMQ_HEADER_VALUE_ENCODER_TYPE_BINARY,
)