Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding handling of Nulled lists to beam_row_from_dict #33830

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,11 +1606,13 @@ def beam_row_from_dict(row: dict, schema):
# This requires that each row has all the fields in the schema.
# However, it's possible that some nullable fields don't appear in the row.
# For this case, we create the field with a `None` value
if name not in row and mode == "NULLABLE":
# None is also set when a repeated field is missing as BigQuery
# converts Null Repeated fields to empty lists
if name not in row and (mode == "NULLABLE" or mode == "REPEATED"):
row[name] = None

value = row[name]
if type in ["RECORD", "STRUCT"]:
if type in ["RECORD", "STRUCT"] and value:
# if this is a list of records, we create a list of Beam Rows
if mode == "REPEATED":
list_of_beam_rows = []
Expand Down
11 changes: 10 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,15 @@ def test_dict_to_beam_row_all_types_repeated(self):
self.assertEqual(expected_beam_row, beam_row_from_dict(dict_row, schema))

def test_dict_to_beam_row_all_types_nullable(self):
schema = {"fields": self.get_schema_fields_with_mode("nullable")}
schema_fields_with_nested = [{
"name": "nested_record",
"type": "record",
"mode": "repeated",
"fields": self.get_schema_fields_with_mode("nullable")
}]
schema_fields_with_nested.extend(
self.get_schema_fields_with_mode("nullable"))
schema = {"fields": schema_fields_with_nested}
dict_row = {k: None for k in self.DICT_ROW}

# input dict row with missing nullable fields should still yield a full
Expand All @@ -876,6 +884,7 @@ def test_dict_to_beam_row_all_types_nullable(self):
del dict_row['bool']

expected_beam_row = beam.Row(
nested_record=None,
str=None,
bool=None,
bytes=None,
Expand Down
Loading