Skip to content

Commit

Permalink
Merge pull request #171 from Aiven-Open/italo.garcia/mitigate-logentr…
Browse files Browse the repository at this point in the history
…y-quota
  • Loading branch information
dogukancagatay authored Jun 26, 2024
2 parents 9803f9c + 95d94d7 commit c62e2ea
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
2 changes: 2 additions & 0 deletions journalpump/senders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import re # type: ignore[no-redef]

KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30

# GCP logging also relies on this MAX message size
MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB

MAX_ERROR_MESSAGES = 8
Expand Down
23 changes: 22 additions & 1 deletion journalpump/senders/google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender):
0: "EMERGENCY",
}

# A bit on the safe side, not exactly 256KB but this
# is an approximation anyway
# according to https://cloud.google.com/logging/quotas
_LOG_ENTRY_QUOTA = 250 * 1024

# Somewhat arbitrary maximum message size choosen, this gives a 56K
# headroom for the other fields in the LogEntry
_MAX_MESSAGE_SIZE = 200 * 1024

def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
credentials = None
Expand Down Expand Up @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor):
for message in messages:
msg_str = message.decode("utf8")
msg = json.loads(msg_str)

# This might not measure exactly 256K but should be a good enough approximation to handle this error.
# We try truncating the message if it isn't possible then it is skip.
if len(message) > self._LOG_ENTRY_QUOTA:
DEFAULT_MESSAGE = "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
if "MESSAGE" in msg:
msg["MESSAGE"] = f'{msg["MESSAGE"][:self._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]'
messsage_size = len(json.dumps(msg, ensure_ascii=False).encode("utf-8"))
if messsage_size > self._LOG_ENTRY_QUOTA:
msg = {"MESSAGE": DEFAULT_MESSAGE}
else:
msg = {"MESSAGE": DEFAULT_MESSAGE}
timestamp = msg.pop("timestamp", None)
journald_priority = msg.pop("PRIORITY", None)

Expand All @@ -75,7 +96,7 @@ def send_messages(self, *, messages, cursor):
if timestamp is not None:
entry["timestamp"] = timestamp[:26] + "Z" # assume timestamp to be UTC
if journald_priority is not None:
severity = GoogleCloudLoggingSender._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
severity = self._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
entry["severity"] = severity
body["entries"].append(entry)

Expand Down
47 changes: 47 additions & 0 deletions test/test_google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,50 @@ def test_correct_timestamp(self):
cursor=None,
)
assert sender._sent_count == 1 # pylint: disable=protected-access

def test_big_logentry_is_truncated(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
message_content = "A" * 257_00
request_builder = self._generate_request_builder(
[{"jsonPayload": {"MESSAGE": message_content[: GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]}}],
)

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = {"MESSAGE": message_content}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

def test_big_logentry_sends_default(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
request_builder = self._generate_request_builder(
[
{
"jsonPayload": {
"MESSAGE": "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
}
}
]
)

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = {"MESSAGE": "A" * 200_000, "OTHER_FIELD": "B" * 200_000}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

message = {"OTHER_FIELD": "B" * 257_000}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 2

0 comments on commit c62e2ea

Please sign in to comment.