diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index c721395eec79..856c29b0c535 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -86,7 +86,7 @@ public class AvroGenericRecordToStorageApiProto { .put(Schema.Type.STRING, Object::toString) .put(Schema.Type.BOOLEAN, Function.identity()) .put(Schema.Type.ENUM, o -> o.toString()) - .put(Schema.Type.BYTES, o -> ByteString.copyFrom(((ByteBuffer) o).duplicate())) + .put(Schema.Type.BYTES, AvroGenericRecordToStorageApiProto::convertBytes) .build(); // A map of supported logical types to their encoding functions. @@ -145,6 +145,16 @@ static ByteString convertDecimal(LogicalType logicalType, Object value) { return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal); } + static ByteString convertBytes(Object value) { + if (value instanceof byte[]) { + // for backward compatibility + // this is not accepted by the avro spec, but users may have abused it + return ByteString.copyFrom((byte[]) value); + } else { + return ByteString.copyFrom(((ByteBuffer) value).duplicate()); + } + } + /** * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data * through BigQuery Storage API. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index 472173c67412..c073dec4752d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -77,6 +77,7 @@ enum TestEnum { SchemaBuilder.record("TestRecord") .fields() .optionalBytes("bytesValue") + .optionalBytes("byteBufferValue") .requiredInt("intValue") .optionalLong("longValue") .optionalFloat("floatValue") @@ -138,64 +139,71 @@ enum TestEnum { .build()) .addField( FieldDescriptorProto.newBuilder() - .setName("intvalue") + .setName("bytebuffervalue") .setNumber(2) + .setType(Type.TYPE_BYTES) + .setLabel(Label.LABEL_OPTIONAL) + .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("intvalue") + .setNumber(3) .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("longvalue") - .setNumber(3) + .setNumber(4) .setType(Type.TYPE_INT64) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("floatvalue") - .setNumber(4) + .setNumber(5) .setType(Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("doublevalue") - .setNumber(5) + .setNumber(6) .setType(Type.TYPE_DOUBLE) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("stringvalue") - .setNumber(6) + .setNumber(7) .setType(Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("booleanvalue") - .setNumber(7) + .setNumber(8) .setType(Type.TYPE_BOOL) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("arrayvalue") - .setNumber(8) + .setNumber(9) .setType(Type.TYPE_STRING) .setLabel(Label.LABEL_REPEATED) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("enumvalue") - .setNumber(9) + .setNumber(10) .setType(Type.TYPE_STRING) .setLabel(Label.LABEL_OPTIONAL) .build()) .addField( FieldDescriptorProto.newBuilder() .setName("fixedvalue") - .setNumber(10) + .setNumber(11) .setType(Type.TYPE_BYTES) .setLabel(Label.LABEL_REQUIRED) .build()) @@ -309,7 +317,8 @@ enum TestEnum { Instant now = Instant.now(); baseRecord = new GenericRecordBuilder(BASE_SCHEMA) - .set("bytesValue", ByteBuffer.wrap(BYTES)) + .set("bytesValue", BYTES) + .set("byteBufferValue", ByteBuffer.wrap(BYTES)) .set("intValue", (int) 3) .set("longValue", (long) 4) .set("floatValue", (float) 3.14) @@ -346,6 +355,7 @@ enum TestEnum { baseProtoExpectedFields = ImmutableMap.builder() .put("bytesvalue", ByteString.copyFrom(BYTES)) + .put("bytebuffervalue", ByteString.copyFrom(BYTES)) .put("intvalue", (long) 3) .put("longvalue", (long) 4) .put("floatvalue", (double) 3.14)