diff --git a/journalpump/senders/base.py b/journalpump/senders/base.py index 5a3b00c..5c8a89c 100644 --- a/journalpump/senders/base.py +++ b/journalpump/senders/base.py @@ -14,6 +14,8 @@ import re # type: ignore[no-redef] KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30 + +# GCP logging also relies on this MAX message size MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB MAX_ERROR_MESSAGES = 8 diff --git a/journalpump/senders/google_cloud_logging.py b/journalpump/senders/google_cloud_logging.py index 264cabf..2930dfa 100644 --- a/journalpump/senders/google_cloud_logging.py +++ b/journalpump/senders/google_cloud_logging.py @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender): 0: "EMERGENCY", } + # A bit on the safe side, not exactly 256KB but this + # is an approximation anyway + # according to https://cloud.google.com/logging/quotas + _LOG_ENTRY_QUOTA = 250 * 1024 + + # Somewhat arbitrary maximum message size choosen, this gives a 56K + # headroom for the other fields in the LogEntry + _MAX_MESSAGE_SIZE = 200 * 1024 + def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs): super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs) credentials = None @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor): for message in messages: msg_str = message.decode("utf8") msg = json.loads(msg_str) + + # This might not measure exactly 256K but should be a good enough approximation to handle this error. + # We try truncating the message if it isn't possible then it is skip. + if len(message) > self._LOG_ENTRY_QUOTA: + DEFAULT_MESSAGE = "Log entry can't be logged because its size is greater than GCP logging quota of 256K" + if "MESSAGE" in msg: + msg["MESSAGE"] = f'{msg["MESSAGE"][:self._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]' + messsage_size = len(json.dumps(msg, ensure_ascii=False).encode("utf-8")) + if messsage_size > self._LOG_ENTRY_QUOTA: + msg = {"MESSAGE": DEFAULT_MESSAGE} + else: + msg = {"MESSAGE": DEFAULT_MESSAGE} timestamp = msg.pop("timestamp", None) journald_priority = msg.pop("PRIORITY", None) @@ -75,7 +96,7 @@ def send_messages(self, *, messages, cursor): if timestamp is not None: entry["timestamp"] = timestamp[:26] + "Z" # assume timestamp to be UTC if journald_priority is not None: - severity = GoogleCloudLoggingSender._SEVERITY_MAPPING.get(journald_priority, "DEFAULT") + severity = self._SEVERITY_MAPPING.get(journald_priority, "DEFAULT") entry["severity"] = severity body["entries"].append(entry) diff --git a/test/test_google_cloud_logging.py b/test/test_google_cloud_logging.py index 6b4e517..23651e1 100644 --- a/test/test_google_cloud_logging.py +++ b/test/test_google_cloud_logging.py @@ -144,3 +144,50 @@ def test_correct_timestamp(self): cursor=None, ) assert sender._sent_count == 1 # pylint: disable=protected-access + + def test_big_logentry_is_truncated(self): + """Check that message was not marked as sent if GoogleApi returns error""" + message_content = "A" * 257_00 + request_builder = self._generate_request_builder( + [{"jsonPayload": {"MESSAGE": message_content[: GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]}}], + ) + + sender = GoogleCloudLoggingSender( + name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = {"MESSAGE": message_content} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + def test_big_logentry_sends_default(self): + """Check that message was not marked as sent if GoogleApi returns error""" + request_builder = self._generate_request_builder( + [ + { + "jsonPayload": { + "MESSAGE": "Log entry can't be logged because its size is greater than GCP logging quota of 256K" + } + } + ] + ) + + sender = GoogleCloudLoggingSender( + name="googlecloudlogging", + reader=mock.Mock(), + stats=mock.Mock(), + field_filter=None, + config=self.CONFIG, + googleapiclient_request_builder=request_builder, + ) + message = {"MESSAGE": "A" * 200_000, "OTHER_FIELD": "B" * 200_000} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 1 + + message = {"OTHER_FIELD": "B" * 257_000} + sender.send_messages(messages=[json.dumps(message).encode()], cursor=None) + assert sender._sent_count == 2