Skip to content

Commit

Permalink
pass extrametadata as record instead of keys and values
Browse files Browse the repository at this point in the history
  • Loading branch information
aykut-bozkurt committed Jul 24, 2024
1 parent 8c59fc1 commit 0b86a09
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions lang/c/src/datafile.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ static int write_header(avro_file_writer_t w, avro_value_t *extra_meta)
avro_writer_t schema_writer;
const avro_encoding_t *enc = &avro_binary_encoding;
int64_t schema_len;
size_t extra_meta_index;
size_t extra_meta_size = 0;

const char *json_schema = NULL;
size_t json_schema_len = 0;
if (extra_meta != NULL)
{
check(rval, avro_value_get_size(extra_meta, &extra_meta_size));
avro_value_t meta_schema = {0};
check(rval, avro_value_get_by_name(extra_meta, "avroschema", &meta_schema, NULL));
check(rval, avro_value_get_string(&meta_schema, &json_schema, &json_schema_len));
}

/* Generate random sync */
Expand All @@ -95,34 +97,30 @@ static int write_header(avro_file_writer_t w, avro_value_t *extra_meta)
check(rval, avro_write(w->writer, "Obj", 3));
check(rval, avro_write(w->writer, &version, 1));

check(rval, enc->write_long(w->writer, 2 + extra_meta_size));
check(rval, enc->write_long(w->writer, 2));
check(rval, enc->write_string(w->writer, "avro.codec"));
check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
check(rval, enc->write_string(w->writer, "avro.schema"));
schema_writer =
avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
rval = avro_schema_to_json(w->writers_schema, schema_writer);

if (json_schema) {
rval = avro_write(schema_writer, (char *)json_schema, strlen(json_schema));
}
else {
rval = avro_schema_to_json(w->writers_schema, schema_writer);
}

if (rval) {
avro_writer_free(schema_writer);
return rval;
}

schema_len = avro_writer_tell(schema_writer);
avro_writer_free(schema_writer);
check(rval,
enc->write_bytes(w->writer, w->schema_buf, schema_len));

for (extra_meta_index = 0; extra_meta_index < extra_meta_size; extra_meta_index++) {
avro_value_t child;
const char *key;
const void *value;
size_t value_len;

check(rval, avro_value_get_by_index(extra_meta, extra_meta_index, &child, &key));
check(rval, avro_value_get_bytes(&child, &value, &value_len));
check(rval, enc->write_string(w->writer, key));
check(rval, enc->write_bytes(w->writer, value, value_len));
}

check(rval, enc->write_long(w->writer, 0));
return write_sync(w);
}
Expand Down

0 comments on commit 0b86a09

Please sign in to comment.