Datasino: Random Data Generator for arbitrary data types

1. TL;DR

1.1. Examples

# Generate 4 s-expressions for a schema containing a tagged union:
% docker run rixed/datasino --quiet --with-newlines --count 4 --schema '{thing:[foo|bar|baz string]?[]; thung:string[2]}' --encoding s-expression
(0 () ("hcmdj" "ffhbm"))
(2 (null null) ("xdqyt" "rwsnv"))
(3 ((0 ) (1 ) null) ("fwfvw" "sxcdo"))
(3 (null null null) ("tfirz" "svqhy"))

# Generate 4 CSV lines for a schema involving a list (whose length is written first) and a nullable boolean:
% docker run rixed/datasino --quiet --with-newlines --count 4 --schema '{name:string; age:u8; roles:string[]; present:bool?}' --encoding csv
kbizh,44,1,rsbrx,\N
egjrr,62,0,,T
vehmj,84,2,zvqya,jefqu,F
ddrbq,41,3,fvohq,sxfqq,rvdth,\N

# Generate 4 JSON values for a recursive type:
% docker run rixed/datasino --quiet --with-newlines --count 4 --schema '{name:string; sex:[male|female]; parent:this?}' --encoding json
{"name":"rkpdggbam","sex":"male","parent":null}
{"name":"gcyz","sex":"female","parent":{"name":"ofxpyhgzo","sex":"male","parent":null}}
{"name":"l","sex":"male","parent":{"name":"twjfxfdbpow","sex":"male","parent":{"name":"nufds","sex":"male","parent":{"name":"mjgjije","sex":"male","parent":null}}}}
{"name":"vqy","sex":"female","parent":{"name":"drbqpkmrvdthals","sex":"female","parent":{"name":"fvohqn","sex":"female","parent":null}}}

# Demonstrate how to use a map function to transform the value, here to set the length field to that of the name:
% docker run rixed/datasino --quiet --with-newlines --count 4 --schema '{name:string; length:u8}' --encoding json \
    --map '(fun ("this")
             (copy-rec (param 0)
               (string "length") (string-length (get-field "name" (param 0)))))'
{"name":"izhsocrsbrxod","length":13}
{"name":"gjrrkpdggbamzi","length":14}
{"name":"tnonn","length":5}
{"name":"cgrumesmcuprtea","length":15}

1.2. Supported encodings

  • JSON

  • CSV (some types are not serializable)

  • S-Expressions

  • ClickHouse (row-binary)

1.3. Supported types

  • boolean (bool)

  • characters (char, no fancy UTF-8 just a byte)

  • float (float, for 64-bit floating point numbers)

  • string (string, just bytes)

  • unsigned integers (u8, u16, u24, u32, u40, u48, u56, u64 and u128)

  • signed integers (i8 to i128)

  • vectors (type[N] for N values of arbitrary type type. Example: string[2] for a pair of strings)

  • arrays (type[], like vectors but of varying dimension. Example: string[2][] for an array of pairs of strings)

  • tuples ((type1;type2;type3;...) where type1, type2, type3 etc are arbitrary types. Example: (string;(bool;bool);i32) for a triplet of a string, a pair of bools, and an integer)

  • records ({ name1: type1; name2: type2; ... } like a tuple with named fields. Example: { names:string[]; age:u8 })

  • sum types ([ name1 type1 | name2 type2 | ... ] where type1, type2 etc are optional arbitrary types. Example: [black|white|rgb { red:u8; green:u8; blue:u8 }]).

If followed by a question mark (?) a type is nullable. Example: u16?[3] is a vector of 3 possibly null unsigned integers.

The special keyword this corresponds to the current type and can be used to build recursive values (beware your stack!). Example: { species:string; subspecies:this[] }.

1.4. Supported outputs

  • Plain file

  • Kafka

1.5. Command line options

General purpose options
       --help[=FMT] (default=auto)
           Show this help in format FMT. The value FMT must be one of `auto',
           `pager', `groff' or `plain'. With `auto', the format is `pager` or
           `plain' whenever the TERM env var is `dumb' or undefined.

       -c VAL, --count=VAL (absent=-1 or COUNT env)
           If >= 0, exit after than many values has been written.

       -q, --quiet (absent QUIET env)
           Do not print actual output rate on stdout.

       -r VAL, --rate-limit=VAL (absent=0. or RATE_LIMIT env)
           Maximum number of generated values per seconds.

       --stutter=VAL (absent=0. or STUTTER env)
           Reuse each generated value that many time.

       --seed=VAL, --random-seed=VAL (absent RANDOM_SEED env)
           Initial value to initialize the random number generator with.

       --version
           Show version information.
Configuring the output values
       -s TYPE, --schema=TYPE (required)
           The type of the data to be generated (inline or @file).

       -m VAL, --map=VAL (absent MAP env)
           Optional function to convert/modify input values of the schema type
           before emission.

       -e null|row-binary|s-expression|csv|json,
       --encoding=null|ringbuf|row-binary|s-expression|csv|json
       (absent=s-expression or ENCODING env)
           Encoding format for output.

       --with-newlines (absent JSON_NEWLINES env)
           Append a newline after every JSON/S-expression value.
Configuring the sink
       --discard
           Discard generated values.


       -o VAL, --output-file=VAL
           File name where to append the generated values.

       --max-count=VAL (absent=0 or MAX_COUNT env)
           Rotate the current output file/kafka message after that number of
           values

       --max-size=VAL (absent=0 or MAX_SIZE env)
           Rotate the current output file/kafka message after that size (in
           bytes)

       --kafka-brokers=VAL (absent KAFKA_BROKERS env)
           Initial Kafka brokers.

       --kafka-compression-codec=VAL (absent=inherit or
       KAFKA_COMPRESSION_CODEC env)
           Compression codec to use.

       --kafka-compression-level=VAL (absent=-1 or KAFKA_COMPRESSION_LEVEL
       env)
           Compression level to use (-1..12, -1 being default level).

       --kafka-timeout=VAL (absent=0. or KAFKA_TIMEOUT env)
           Timeout when sending a Kafka message.

       --kafka-topic=VAL (absent KAFKA_TOPIC env)
           Kafka topic to publish to.

       --kafka-wait-confirmation (absent KAFKA_WAIT_CONFIRMATION env)
           Wait for delivery after sending each message.

       --partitions=VAL (absent KAFKA_PARTITIONS env)
           Kafka partitions where to send messages to (in a round-robbin
           manner).

       --csv-clickhouse-syntax (absent CSV_CLICKHOUSE_SYNTAX env)
           Should CSV encoder uses clickhouse syntax for compound types.

       --csv-null=VAL (absent= or CSV_NULL env)
           String to use as NULL.

       --csv-quote=VAL (absent CSV_QUOTE env)
           Character to use to quote strings.

       --csv-separator=VAL (absent=',' or CSV_SEPARATOR env)
           Character to use as a separator.

See datasino --help for more.

Here ends the TL;DR section. What follows is the source code of datasino, in literate programming style.

2. Overview

Datasino is a simple tool. Given a data_type, a flow_rate, an encoding and a target sink, it generates random values of that type at that rate, and sends them, encoded as instructed, to that sink.

All those required inputs are read from the command line with cmdliner. Data type specifications and value generators come from the dessser library.

3. Notation and glossary

First let’s open a few modules and shorten some common ones:

external modules
open Batteries
open Cmdliner

module DE = DessserExpressions
module DH = DessserOCamlBackEndHelpers
module DL = DessserStdLib
module DM = DessserMasks
module DT = DessserTypes
module DU = DessserCompilationUnit

Also, many names will be shortened when they appear in variable names that are used frequently.

Table 1. Glossary

shorthand   for
t           some type
thing_t     the type of thing
mn          a maybe_nullable type

4. Inputs from the command line

Let’s start by defining all the required command line arguments, the first of which being the data type, or schema.

4.1. Schema

For convenience let’s accept data type specifications both in dessser’s internal format and in ClickHouse’s somewhat informal NamesAndTypes format. The dessser library has a parser for each format, as well as a parser that accepts either of the two.

Also, let’s accept either inline schemas or schemas stored in a file, using the '@' character as an indication that the command line value is actually a file name, as curl does.

By connecting cmdliner’s parser and pretty-printer with dessser’s, we can obtain dessser representations of data types directly from cmdliner.

command line arguments
let schema =
  let env = Term.env_info "SCHEMA" in
  let doc = "The type of the data to be generated (inline or @file)." in
  let i = Arg.info ~doc ~env ~docv:"TYPE" [ "s" ; "schema" ] in
  Arg.(required (opt (some mn_t) None i))

With the mn_t type of command line parameter defined from dessser parser and pretty printer:

command line custom types
(* [string_or_file_content s] returns either [s] or, if [s] starts with
  '@', the content of that file name, à la curl: *)
let string_or_file_content s =
  if String.length s > 0 && s.[0] = '@' then
    DessserTools.read_whole_file (String.lchop s)
  else
    s

let mn_t =
  let parse s =
    let s = string_or_file_content s in
    match DessserParser.mn_of_string ~any_format:true (* <1> *) s with
    | exception e ->
        Stdlib.Error (`Msg (Printexc.to_string e))
    | mn ->
        Stdlib.Ok mn
  and print fmt mn =
    Format.fprintf fmt "%s" (DT.mn_to_string mn)
  in
  Arg.conv ~docv:"TYPE" (parse, print)
  1. This is what makes the dessser parser accept both ClickHouse’s format and its own to specify the schema.

4.2. Constraints on values

If all one wants is to fill some data pipeline with data, random values of the proper type may be sufficient, but in many cases one will wish to have at least some data fields with realistic data.

That’s why it is possible to modify the generated random sample before it’s sent, so that some fields can be adapted/overwritten.

The most straightforward way to do so is to accept a user-defined function on the command line. The user has to spell out this function in libdessser’s intermediate language, though. Nothing then prevents this function, which takes a value of type schema as input, from returning a value of another type, and that is the type that will ultimately be emitted. Since this function is really just a map function, let’s call it that:

command line arguments
let map =
  let env = Term.env_info "MAP" in
  let doc = "Optional function to convert/modify input values of the schema \
             type before emission." in
  let i = Arg.info ~doc ~env [ "m" ; "map" ] in
  Arg.(value (opt (some expr_t) None i))

where expr_t is a command line parser for libdessser’s expressions that we can build in about the same way the mn_t parser was built earlier:

command line custom types
let expr_t =
  let parse s =
    let s = string_or_file_content s in
    match DessserParser.expr_of_string s with
    | exception e ->
        Stdlib.Error (`Msg (Printexc.to_string e))
    | e ->
        Stdlib.Ok e
  and print fmt e =
    Format.fprintf fmt "%s" (DE.to_string e)
  in
  Arg.conv ~docv:"EXPRESSION" (parse, print)

Of course when this map function is unspecified no such transformation of values takes place (which is equivalent to plugging in the identity function).

See the last example of the TL;DR section (section 1.1) to get a first idea of the syntax of libdessser’s intermediate language.

4.3. Rate limit

Optionally we might want to control the speed at which data is generated (by default datasino will just spin as fast as possible).

To limit the speed is trivial: a single rate_limit will do (using negative or zero values to mean "no limit").

command line arguments
let rate_limit =
  let env = Term.env_info "RATE_LIMIT" in
  let doc = "Maximum number of generated values per seconds." in
  let i = Arg.info ~doc ~env [ "r" ; "rate-limit" ] in
  Arg.(value (opt float 0. i))

It is then easy enough for datasino to pause in between messages to meet this restriction.
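
A minimal sketch of such a pause, not necessarily how datasino implements it: the limiter below remembers when the next value is due and sleeps until then. The name make_rate_limiter and the now and sleep parameters are assumptions introduced for this illustration; injecting the clock keeps the example free of dependencies (in a real program they could be Unix.gettimeofday and Unix.sleepf).

```ocaml
(* Hypothetical helper: returns a function to call after each value.
   [now] returns the current time in seconds and [sleep] pauses for the
   given number of seconds. A non-positive [rate_limit] means "no limit". *)
let make_rate_limiter ~now ~sleep rate_limit =
  if rate_limit <= 0. then (fun () -> ())
  else begin
    let period = 1. /. rate_limit in
    (* Time at which the next value is due: *)
    let next = ref (now ()) in
    fun () ->
      next := !next +. period ;
      let wait = !next -. now () in
      (* Only pause when we are ahead of schedule: *)
      if wait > 0. then sleep wait
  end
```

Tracking the due time rather than sleeping a fixed period after each value lets the limiter catch up when generating a value took longer than expected.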

But making data generation faster is less trivial, as randomly generating data takes some time. The simplest way to increase throughput beyond that is to reuse previously generated values and send them several times in a row, to artificially inflate the flow rate a bit like bad cameras inflate image resolution by interpolating pixels, except datasino will not even bother interpolating.

command line arguments
let stutter =
  let env = Term.env_info "STUTTER" in
  let doc = "Reuse each generated value that many time." in
  let i = Arg.info ~doc ~env [ "stutter" ] in
  Arg.(value (opt float 0. i))

Using a floating-point value here again gives more control over the actual data rate (obviously each value is repeated an integral number of times, but the average need not be an integer).
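
One possible way to turn a fractional stutter into whole repetitions is to carry the fractional part over from one value to the next. This is only a sketch under that assumption (datasino may well use another scheme, for instance a random draw), and make_stutter is a name made up for the illustration.

```ocaml
(* Hypothetical helper: returns a function giving, for each freshly
   generated value, how many extra times it should be reused.
   [stutter] is assumed to be non-negative. *)
let make_stutter stutter =
  let carry = ref 0. in
  fun () ->
    carry := !carry +. stutter ;
    (* Whole number of reuses owed so far: *)
    let extra = int_of_float !carry in
    (* Keep the fractional remainder for the next value: *)
    carry := !carry -. float_of_int extra ;
    extra
```

With a stutter of 0.5, successive calls return 0, 1, 0, 1 and so on: each value is emitted 1.5 times on average.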

Finally, we might want to output only a given number of values before exiting, thus the count parameter:

command line arguments
let count =
  let env = Term.env_info "COUNT" in
  let doc = "If >= 0, exit after than many values has been written." in
  let i = Arg.info ~doc ~env [ "c" ; "count" ] in
  Arg.(value (opt int ~-1 i))

4.4. Encoding

Dessser generates values as normal heap-allocated values and can then serialize them into various encodings. Let’s now choose that encoding.

command line arguments
let encoding =
  let encodings =
    [ "null", Null ; (* <1> *)
      "ringbuf", RingBuff ;
      "row-binary", RowBinary ;
      "s-expression", SExpr ;
      "csv", CSV ;
      "json", Json ] in
  let env = Term.env_info "ENCODING" in
  let doc = "Encoding format for output." in
  let docv = docv_of_enum encodings in
  let i = Arg.info ~doc ~docv ~env [ "e" ; "encoding" ] in
  Arg.(value (opt (enum encodings) SExpr i))
  1. The Null encoding could be useful to measure the speed of generating values without serializing or sending.

given:

helper functions
type encodings = Null | RowBinary | SExpr | RingBuff | CSV | Json

and:

helper functions
let docv_of_enum l =
  IO.to_string (
    List.print ~first:"" ~last:"" ~sep:"|" (fun oc (n, _) ->
      String.print oc n)
  ) l
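
For readers without Batteries at hand, the same string can be built with the standard library alone. The sketch below is an equivalent under that assumption, not the code datasino uses; docv_of_enum_stdlib is a name made up for the illustration.

```ocaml
(* Join the names of an enumeration with '|', ignoring the values: *)
let docv_of_enum_stdlib l =
  String.concat "|" (List.map fst l)

let () =
  (* Prints: null|csv|json *)
  print_endline (docv_of_enum_stdlib [ "null", 0 ; "csv", 1 ; "json", 2 ])
```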

4.5. Target

Finally, the target, or sink, can be either a file name (or name template), a Kafka broker or the special discard command, which can also be useful for benchmarking.

command line arguments
let output_file =
  let doc = "File name where to append the generated values." in
  let i = Arg.info ~doc [ "o" ; "output-file" ] in
  Arg.(value (opt string "" i))

let discard =
  let doc = "Discard generated values." in
  let i = Arg.info ~doc [ "discard" ] in
  Arg.(value (flag i))

let kafka_brokers =
  let env = Term.env_info "KAFKA_BROKERS" in
  let doc = "Initial Kafka brokers." in
  let i = Arg.info ~doc ~env [ "kafka-brokers" ] in
  Arg.(value (opt string "" i))

let kafka_topic =
  let env = Term.env_info "KAFKA_TOPIC" in
  let i = Arg.info ~doc:"Kafka topic to publish to."
                   ~env [ "kafka-topic" ] in
  Arg.(value (opt string "" i))

let kafka_partitions =
  let env = Term.env_info "KAFKA_PARTITIONS" in
  let i = Arg.info ~doc:"Kafka partitions where to send messages to \
                         (in a round-robbin manner)."
                   ~env [ "partitions" ] in
  Arg.(value (opt (list int) [] i))

let kafka_timeout =
  let env = Term.env_info "KAFKA_TIMEOUT" in
  let i = Arg.info ~doc:"Timeout when sending a Kafka message."
                   ~env [ "kafka-timeout" ] in
  Arg.(value (opt float 0. i))

let kafka_wait_confirm =
  let env = Term.env_info "KAFKA_WAIT_CONFIRMATION" in
  let doc = "Wait for delivery after sending each message." in
  let i = Arg.info ~doc ~env [ "kafka-wait-confirmation" ] in
  Arg.(value (flag i))

let kafka_compression_codec =
  let env = Term.env_info "KAFKA_COMPRESSION_CODEC" in
  let doc = "Compression codec to use." in
  let i = Arg.info ~doc ~env [ "kafka-compression-codec" ] in
  Arg.(value (opt string default_kafka_compression_codec i))

let kafka_compression_level =
  let env = Term.env_info "KAFKA_COMPRESSION_LEVEL" in
  let doc = "Compression level to use (-1..12, -1 being default level)." in
  let i = Arg.info ~doc ~env [ "kafka-compression-level" ] in
  Arg.(value (opt int ~-1 i))

where:

default command line values
let default_kafka_compression_codec = "inherit"

Instead of appending every value to a single file, it is sometimes useful to have a new file created every now and then. The same options also control how many values to write per Kafka message.

command line arguments
let max_size =
  let env = Term.env_info "MAX_SIZE" in
  let doc = "Rotate the current output file/kafka message after that size \
             (in bytes)" in
  let i = Arg.info ~doc ~env [ "max-size" ] in
  Arg.(value (opt int 0 (* <1> *) i))

let max_count =
  let env = Term.env_info "MAX_COUNT" in
  let doc = "Rotate the current output file/kafka message after that number \
             of values" in
  let i = Arg.info ~doc ~env [ "max-count" ] in
  Arg.(value (opt int 0 (* <1> *) i))
  1. 0 can be used to mean "no limit".

When values are sent to Kafka, no limit actually means writing only one value per message, whereas when writing to a file it means writing all values into that file.
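
These rotation rules can be summarized by a small predicate. This is a sketch only: the sink type and the size and count parameters are assumptions made for the illustration, whereas the actual is_full function of datasino receives the buffer itself.

```ocaml
(* Hypothetical simplification of the possible targets: *)
type sink = File | Kafka

(* [size] is the number of bytes and [count] the number of values written
   since the last rotation; at least one value is assumed to be written. *)
let is_full sink ~max_size ~max_count ~size ~count =
  let no_limit = max_size <= 0 && max_count <= 0 in
  let over_size = max_size > 0 && size >= max_size
  and over_count = max_count > 0 && count >= max_count in
  match sink with
  | File -> over_size || over_count
  (* Without any limit, every Kafka message carries a single value: *)
  | Kafka -> no_limit || over_size || over_count
```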

Since a single instance of datasino can have only one target, some of those options are mutually exclusive. A simple check function can verify that one and only one target is configured:

command line check
let check_command_line output_file discard kafka_brokers kafka_topic kafka_partitions
                       kafka_timeout kafka_wait_confirm kafka_compression_codec
                       kafka_compression_level =
  let use_file = output_file <> "" in
  let use_kafka = kafka_brokers <> "" in
  let mention_kafka =
    kafka_topic <> "" || kafka_partitions <> [] ||
    kafka_timeout <> 0. || kafka_wait_confirm ||
    kafka_compression_codec <> default_kafka_compression_codec ||
    kafka_compression_level <> ~-1 in
  if use_file && discard ||
     use_file && use_kafka ||
     use_kafka && discard then
    raise (Failure "More than one target is configured") ;
  if mention_kafka && not use_kafka then
    raise (Failure "kafka options given but kafka is not the target?") ;
  if kafka_compression_level < -1 || kafka_compression_level > 12 then
    raise (Failure "--kafka-compression-level must be between -1 and 12")

4.6. Miscellaneous options

4.6.1. Verbosity

Datasino regularly prints its actual output rate on stdout unless instructed to be quiet (recommended if one intends to output data on stdout):

command line arguments
let quiet =
  let env = Term.env_info "QUIET" in
  let doc = "Do not print actual output rate on stdout." in
  let i = Arg.info ~doc ~env [ "q" ; "quiet" ] in
  Arg.(value (flag i))

4.6.2. Random generator seed

For the sake of reproducibility, any random data generator must print (unless --quiet) the seed used to initialize the random number generator and must allow setting a specific one.

command line arguments
let random_seed =
  let env = Term.env_info "RANDOM_SEED" in
  let doc = "Initial value to initialize the random number generator with." in
  let i = Arg.info ~doc ~env [ "seed" ; "random-seed" ] in
  Arg.(value (opt (some int) None i))
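
The point of an explicit seed can be illustrated with OCaml’s standard Random module alone: initializing the generator twice with the same seed yields the same sequence. The sample function is a hypothetical helper for this illustration and is not part of datasino.

```ocaml
(* Draw [n] pseudo-random integers below 100 after seeding the generator: *)
let sample seed n =
  Random.init seed ;
  List.init n (fun _ -> Random.int 100)

let () =
  (* The same seed reproduces the same values: *)
  assert (sample 42 5 = sample 42 5)
```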

5. Main function

These are all the command line arguments that are needed. We rely on cmdliner to parse them all and call the start function:

main function
let () =
  let start_cmd =
    let doc = "Datasino - random data generator" in
    Term.(
      (const start
        $ quiet
        $ random_seed
        $ schema
        $ map
        $ rate_limit
        $ stutter
        $ count
        $ encoding
        $ output_file
        $ discard
        $ kafka_brokers
        $ kafka_topic
        $ kafka_partitions
        $ kafka_timeout
        $ kafka_wait_confirm
        $ kafka_compression_codec
        $ kafka_compression_level
        $ max_size
        $ max_count
        (* ...extra command line arguments... *)),
      info "datasino" ~version ~doc)
  in
  Term.eval start_cmd |> Term.exit

After displaying the version of the program (always useful when all we have are the logs), the first thing this start function should do is to call the check_command_line function:

start function
let start
      quiet random_seed schema map rate_limit stutter count encoding
      output_file discard kafka_brokers kafka_topic kafka_partitions
      kafka_timeout kafka_wait_confirm kafka_compression_codec
      kafka_compression_level max_size max_count
      (* ...extra command line parameters... *) =
  if not quiet then Printf.printf "Datasino v%s\n%!" version ;
  let seed = random_seed |? Unix.(int_of_float (time ()) + getpid ()) in
  Random.init seed ;
  if not quiet && random_seed = None then
    Printf.printf "Random seed: %d\n%!" seed ;
  check_command_line
    output_file discard
    kafka_brokers kafka_topic kafka_partitions kafka_timeout kafka_wait_confirm
    kafka_compression_codec kafka_compression_level ;

For simplicity datasino is going to append values in a single buffer which, once large enough, will eventually be handed over to some output function. The main loop will therefore look like:

main loop, take 1
let main_loop random_value map serialize is_full output rate_limit count buffer =
  let rec loop buffer count =
    if count <> 0 then
      let v = random_value () in
      let v = map v in
      let buffer = serialize buffer v in
      let buffer =
        if is_full buffer then output buffer
        else buffer in
      rate_limit () ;
      let count = if count > 0 then count - 1 else count in
      loop buffer count in
  loop buffer count

The buffer is a persistent, functional-style value of type DH.Pointer.t, the type used by dessser serializers.

The start function must thus prepare five functions:

  1. one that generates random values of the requested type (random_value);

  2. one that, given a buffer and a generated value, encodes this value in the requested format (serialize);

  3. one that tells whether the buffer is ready to be sent (is_full);

  4. one that sends the buffer to the desired target (output);

  5. and finally, one that waits some time to comply with the rate limit (rate_limit).

Alas, the above code cannot be type checked! Indeed, the variable v has type schema, which is known only at runtime. That’s why the functions random_value (which returns a value of unknown type), map (which accepts it and returns a value of yet another unknown type), and serialize (which accepts that latter value) have to be generated at runtime.

A way around this is to combine those three functions into a single one that directly generates a random value, maps it and serializes it, so that the datasino program itself can be compiled without knowing the actual type of v. This changes the above main loop into:

main loop
let main_loop serialize_random_value is_full output rate_limit count buffer =
  let rec loop buffer count =
    if count <> 0 then
      let buffer = serialize_random_value buffer in
      let buffer =
        if is_full buffer then output buffer
        else buffer in
      rate_limit () ;
      let count = if count > 0 then count - 1 else count in
      loop buffer count in
  loop buffer count
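
To see this loop run, here is a toy instantiation. Everything except main_loop is an assumption made for the illustration: values are small integers, the buffer is a plain string instead of a DH.Pointer.t, and the three functions are composed statically whereas datasino composes them in generated code.

```ocaml
(* Same main loop as above, repeated so that the example is self-contained: *)
let main_loop serialize_random_value is_full output rate_limit count buffer =
  let rec loop buffer count =
    if count <> 0 then
      let buffer = serialize_random_value buffer in
      let buffer =
        if is_full buffer then output buffer
        else buffer in
      rate_limit () ;
      let count = if count > 0 then count - 1 else count in
      loop buffer count in
  loop buffer count

(* Toy stand-ins for the generated functions: *)
let random_value () = Random.int 10
let map v = v * 2
let serialize buffer v = buffer ^ string_of_int v ^ ";"

(* Once composed, the type of the values no longer appears:
   this function merely turns a buffer into a buffer. *)
let serialize_random_value buffer =
  serialize buffer (map (random_value ()))

(* Buffers handed over to the output function, most recent first: *)
let flushed = ref []

let () =
  let is_full buffer = String.length buffer >= 8 in
  let output buffer = flushed := buffer :: !flushed ; "" in
  main_loop serialize_random_value is_full output (fun () -> ()) 20 ""
```

The type of serialize_random_value is string -> string here: main_loop can be compiled without any knowledge of the integer values flowing through it.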

5.1. The value generator

The dessser library offers a value generator already. More exactly, it has a function that returns the code of a function returning a random value of any type. That’s because dessser is a meta-programming tool: it generates code that’s tailored to specific data types. So despite the fact datasino works on any data type (ie. the schema is known only at runtime), the code that will manipulate data will be as efficient as if the data type was known at compile time. To achieve this, datasino will generate some code and then compile it and dynamically load it.

And since we will have several such functions we want to generate at run time, we will build a single compilation unit with all of them so there is only one external compilation and only one library to be dynamically loaded.

First, a compilation unit is created:

start function
  let compunit = DU.make "datasino" in

Before generating the random value generator, it’s best to declare that the type named "t" (the default name for a type in dessser, which the keyword this will refer to) is in fact our schema. With this, the random value generator can make sense of the this keyword and call itself recursively to generate recursive values:

start function
  DT.add_type_as "t" schema.DT.typ ;

With this in place the random value generator can now be added into the compilation unit:

start function
  let compunit, _, _ (* <1> *) =
    DL.func_random schema |>
    DU.add_identifier_of_expression compunit ~name:"random_value" in
  1. add_identifier_of_expression returns not only the new compilation unit but also the identifier (as a dessser expression) for the added expression, and the name of this identifier. We will not use the identifier because we are not going to call this function from another piece of generated code, and we already know the name since we chose it ourselves: "random_value".

We will get back to this function and how datasino can actually call it when we compile and load that compilation unit.

5.2. The mapping function

As above, the user-provided mapping function, if actually present on the command line, must also be added to the compilation unit.

While at it, we also retrieve the output type that is going to be serialized (enc_schema).

This also gives us the opportunity to type-check the expression passed on the command line, resulting in a better error message than the one the compiler would output in case something does not align properly.

Indeed, the command line parser just ensured it was a valid expression but never actually checked that it is a function accepting values of the specified type schema.

For this we make use of a few of libdessser’s introspection functions, which are outside the scope of this document but whose behavior is hopefully clear enough.

start function
  let compunit, enc_schema =
    match map with
    | None ->
        compunit, schema
    | Some f ->
        let enc_schema =
          match DE.(type_of no_env f) with
          | DT.{ typ = TFunction ([| in_t |], out_t) ; nullable = false ; _ } ->
              if not (DT.eq_mn in_t schema) then
                Printf.sprintf2 "Passed map function must accept values of the \
                                 specified schema, not %a"
                  DT.print_mn in_t |>
                failwith ;
              out_t
          | map_t ->
              Printf.sprintf2 "Passed map function must be a function accepting \
                               values of the specified schema, but this was \
                               passed: %a"
                DT.print_mn map_t |>
              failwith
        and compunit, _, _ =
          DU.add_identifier_of_expression compunit ~name:"map" f in
        compunit, enc_schema in
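
The shape of this check can be shown on a toy type representation. The typ type and output_type_of_map below are assumptions made for the illustration and are much simpler than dessser’s own types; they only mirror the logic of the match above.

```ocaml
(* Toy type representation, standing in for dessser's: *)
type typ = U8 | Str | Function of typ array * typ

(* Returns the output type of [map_t] if it is a function of a single
   argument of type [schema], or an error message otherwise. *)
let output_type_of_map ~schema map_t =
  match map_t with
  | Function ([| in_t |], out_t) ->
      if in_t = schema then Ok out_t
      else Error "map function must accept values of the specified schema"
  | _ ->
      Error "map must be a function of a single argument"
```

For instance, with a schema of U8, a map of type Function ([| U8 |], Str) is accepted and the type to serialize becomes Str, whereas a map expecting a Str is rejected.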

5.3. The Serializer

The next step is to build the serializer function. Again, the serializer will be tailored to the specific schema and encoding, so that’s another function to be added to the compilation unit compunit.

This function is built by applying a functor that returns a module specific to the chosen encoding, which exports a function named serialize that returns the code to serialize any value of a given type. Its signature is almost what is needed:

serialize signature
val serialize : ?config:Ser.config (* <1> *) ->
                ?with_fieldmask:bool (* <2> *) ->
                ?type_name:string (* <3> *) ->
                T.mn (* <4> *) ->
                U.t (* <5> *) ->
                U.t * E.t * string (* <6> *)
  1. Each encoding has different configuration options and we’d like to eventually control all of them from datasino command line.

  2. Masks, or field-masks, are a way to generate code that can dynamically skip some fields of data structures. This feature is not used in datasino.

  3. The type name (defaults to "t") so that it can be used recursively in the type definition. Here we will leave the default so that the this keyword, which refers to "t" by default, will refer to the whole schema if recursion is needed.

  4. This is the type of the values that need to be serialized, ie. enc_schema.

  5. The compilation unit into which the serializing function is to be added.

  6. The return value is the new compilation unit, the identifier for that function and its name. The function identifier is returned as an expression (E.t).

Given we are not going to use dynamic field masks, the function created by serialize will have this signature:

signature of the function generated by serialize
---
$schema -> DH.Pointer.t -> DH.Pointer.t
---

where $schema is a placeholder for the actual type we asked for (enc_schema at this stage, which is different from the schema passed on the command line if the map function changed the values into another type).

What dessser calls "pointer" is merely a byte buffer under the hood (for OCaml backend at least).

As the configuration of each encoder has its own type, we have to hide this configuration in a place where the actual module type is known, and return only the final, generic serialize function. This results in code that’s more robust than elegant:

start function
  (* ...encoder configuration functions... *)
  let serialize =
    match encoding with
    | Null ->
        let module Ser = DessserDevNull.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(null_config ())
    | RingBuff ->
        let module Ser = DessserRamenRingBuffer.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(ringbuf_config ())
    | RowBinary ->
        let module Ser = DessserRowBinary.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(rowbinary_config ())
    | SExpr ->
        let module Ser = DessserSExpr.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(sexpr_config ())
    | CSV ->
        let module Ser = DessserCsv.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(csv_config ())
    | Json ->
        let module Ser = DessserJson.Ser in
        let module Serializer = DessserHeapValue.Serialize (Ser) in
        Serializer.serialize ?config:(json_config ()) in
  let compunit, ser_id, _ =
    serialize ~with_fieldmask:false enc_schema compunit in
  (* Rather have a function called "serialize": *)
  let compunit, _, _ =
    DE.Ops.func2 enc_schema DT.ptr (fun v dst ->
      DE.Ops.apply ser_id [ v ; dst ]) |>
    DU.add_identifier_of_expression compunit ~name:"serialize" in

with the various XXX_config functions returning the specific configuration record based on the command line parameters, most of them still to be done:

encoder configuration functions
let null_config () = None
and ringbuf_config () = None
and rowbinary_config () = None
and sexpr_config () =
  Some {DessserConfigs.SExpr.default with
          newline = if with_newlines then Some '\n' else None }
and csv_config () =
  Some { DessserConfigs.Csv.default with
           separator ; null ; quote ; clickhouse_syntax }
and json_config () =
  Some { DessserConfigs.Json.default with
           newline = if with_newlines then Some '\n' else None } in

given those additional command line parameters to control CSV and JSON encodings:

command line arguments
let separator =
  let env = Term.env_info "CSV_SEPARATOR" in
  let doc = "Character to use as a separator." in
  let i = Arg.info ~doc ~env [ "csv-separator" ] in
  Arg.(value (opt better_char ',' i))

let null =
  let env = Term.env_info "CSV_NULL" in
  let doc = "String to use as NULL." in
  let i = Arg.info ~doc ~env [ "csv-null" ] in
  Arg.(value (opt string "\\N" i))

let quote =
  let env = Term.env_info "CSV_QUOTE" in
  let doc = "Character to use to quote strings." in
  let i = Arg.info ~doc ~env [ "csv-quote" ] in
  Arg.(value (opt (some better_char) None i))

let clickhouse_syntax =
  let env = Term.env_info "CSV_CLICKHOUSE_SYNTAX" in
  let doc = "Should the CSV encoder use ClickHouse syntax for compound types." in
  let i = Arg.info ~doc ~env [ "csv-clickhouse-syntax" ] in
  Arg.(value (flag i))

let with_newlines =
  let env = Term.env_info "JSON_NEWLINES" in
  let doc = "Append a newline after every JSON/S-expression value." in
  let i = Arg.info ~doc ~env [ "with-newlines" ] in
  Arg.(value (flag i))
extra command line arguments
$ separator
$ null
$ quote
$ clickhouse_syntax
$ with_newlines
extra command line parameters
separator null quote clickhouse_syntax with_newlines

In the arguments above, the type better_char is used so that non-printable characters, such as tabs, can be entered easily (whereas cmdliner’s default char type accepts only single characters). It is defined as:

command line custom types
let better_char =
  let parse = function
    | "\\t" ->
        Stdlib.Ok '\t'
    (* TODO: other special chars *)
    | s when String.length s = 1 ->
        Stdlib.Ok s.[0]
    | s ->
        Stdlib.Error (`Msg (Printf.sprintf "Not a character: %S" s))
  and print fmt c =
    Format.fprintf fmt "%C" c
  in
  Arg.conv ~docv:"CHAR" (parse, print)
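
To see what this parser accepts, here is a minimal standalone sketch of the same parsing logic, without cmdliner. The name parse_char is made up for this illustration and is not part of datasino:

```ocaml
(* Standalone copy of the parsing logic of better_char, without cmdliner.
   parse_char is a hypothetical name used only for this illustration. *)
let parse_char = function
  | "\\t" -> Ok '\t'                       (* the two characters \ and t *)
  | s when String.length s = 1 -> Ok s.[0] (* any single character *)
  | s -> Error (`Msg (Printf.sprintf "Not a character: %S" s))

let () =
  (* Try a tab escape, a plain character and an invalid input: *)
  List.iter (fun s ->
    match parse_char s with
    | Ok c -> Printf.printf "%S -> %C\n" s c
    | Error (`Msg m) -> Printf.printf "%S -> error: %s\n" s m
  ) [ "\\t" ; ";" ; "ab" ]
```

So passing --csv-separator '\t' on the command line selects a tab, while any string longer than one character is otherwise rejected.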

5.4. Generating serialize_random_value

Remember that datasino itself can only manipulate the combination of serialize applied to random_value (as opposed to generating the value first and then serializing it), so that the actual type of the value never appears in datasino’s own code.

Let’s therefore generate this serialize_random_value function from the three functions random_value, map then serialize:

start function
  let compunit, _, _ =
    DE.Ops.func1 DT.ptr (fun dst ->
      let open DE.Ops in
      let v (* <1> *) = apply (identifier "random_value") [] in
      let v =
        if map = None then v else apply (identifier "map") [ v ] in
      apply (identifier "serialize") [ v ; dst ]) |>
    DU.add_identifier_of_expression compunit ~name:"serialize_random_value" in

Notice that in <1> the type of v is a compile time dessser expression, not a value of the runtime type schema, so we are in the clear.

We will see later, when it comes to runtime compilation, how datasino will get a handle on the actual function.

5.5. Knowing when to send

The is_full function in the main loop does not depend on the specifics of the specified data type and therefore need not be specialized at runtime. It can be easily and efficiently implemented from the command line parameters alone:

start function
  let is_full =
    if max_count > 0 then
      let count = ref 0 in
      fun _buffer ->
        count := (!count + 1) mod max_count ;
        !count = 0
    else if max_size > 0 then
      fun buffer ->
        DH.Pointer.offset buffer >= max_size
    else
      fun _buffer ->
        true in

Notice that when there is no limit, the message is full after every value.
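
The three cases can be tried in isolation with the following sketch, which assumes the buffer is reduced to its current offset (a plain int) rather than a DH.Pointer.t. The name make_is_full is made up for this illustration:

```ocaml
(* Simplified model of is_full: the buffer is represented by its current
   offset (an int) instead of a DH.Pointer.t. make_is_full is a hypothetical
   name used only for this illustration. *)
let make_is_full ~max_count ~max_size =
  if max_count > 0 then
    let count = ref 0 in
    fun _offset ->
      count := (!count + 1) mod max_count ;
      !count = 0
  else if max_size > 0 then
    fun offset -> offset >= max_size
  else
    fun _offset -> true

let () =
  let is_full = make_is_full ~max_count:3 ~max_size:0 in
  (* Called once per value: the message is full after every third value. *)
  List.iter (fun offset ->
    Printf.printf "%b " (is_full offset)
  ) [ 10 ; 20 ; 30 ; 10 ; 20 ; 30 ] ;
  print_newline ()
```

Note that the count limit takes precedence: when max_count is positive, max_size is not consulted at all.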

5.6. The output function

The output function, which operates on a mere byte buffer, can likewise be derived from the command line parameters alone. As each output technique is a bit verbose, let’s split them into distinct functions:

start function
let max_msg_size = (* <1> *)
  if max_size > 0 then max_size + 10_000
  else 10_000_000 in
let output =
  if discard then
    ignore
  else if kafka_brokers <> "" then
    output_to_kafka quiet kafka_brokers kafka_topic kafka_partitions kafka_timeout
                    kafka_wait_confirm kafka_compression_codec kafka_compression_level
                    max_msg_size
  else if output_file <> "" then
    output_to_file output_file max_count max_size
  else (* output to stdout by default *)
    output_to_file "/dev/stdout" max_count max_size
  in

With the specific function to output into a file defined a bit earlier as:

output functions
let output_to_file output_file max_count max_size =
  let single_file = max_count = 0 && max_size = 0 in
  let fd = ref None in
  let file_seq = ref ~-1 in (* to name multiple output files *)
  fun buffer ->
    if !fd = None then (
      let file_name =
        if single_file then output_file
        else (
          incr file_seq ;
          output_file ^"."^ string_of_int !file_seq) in
      fd := Some (open_file file_name)) ;
    write_buffer (Option.get !fd) buffer ;
    if not single_file then (
      rotate_file (Option.get !fd) ;
      fd := None)

open_file and rotate_file will take care of creating the files according to the configuration, and will be defined later on.
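
As an illustration of the naming rule only (no file is opened here), the following sketch lists the names that successive messages would be written to. The helper output_names is hypothetical and exists only for this example:

```ocaml
(* Names of the files receiving the first n messages, following the naming
   rule of output_to_file. output_names is a hypothetical helper used only
   for this illustration; it does not touch the file system. *)
let output_names ~max_count ~max_size output_file n =
  let single_file = max_count = 0 && max_size = 0 in
  List.init n (fun seq ->
    if single_file then output_file
    else output_file ^"."^ string_of_int seq)

let () =
  (* With a limit set, each message goes to its own numbered file: *)
  List.iter print_endline
    (output_names ~max_count:100 ~max_size:0 "out.csv" 3)
```

Numbering starts at 0 because file_seq is initialized to -1 and incremented before each use.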

As for Kafka, we merely rely on the bindings to the rdkafka client library:

output functions
let output_to_kafka quiet brokers topic partitions timeout wait_confirm
                    compression_codec compression_level max_msg_size =
  let open Kafka in
  if not quiet then Printf.printf "Connecting to Kafka at %s\n%!" brokers ;
  let delivery_callback msg_id = function
    | None -> (* No error *) ()
    | Some err_code ->
        Printf.eprintf "delivery_callback: msg_id=%d, Error: %s\n%!"
          msg_id (kafka_err_string err_code) in
  let handler =
    new_producer ~delivery_callback [
      "metadata.broker.list", brokers ;
      "message.max.bytes", string_of_int max_msg_size ;
      "compression.codec", compression_codec ;
      "compression.level", string_of_int compression_level ] in
  let producer =
    Kafka.new_topic handler topic [
      "message.timeout.ms",
        string_of_int (int_of_float (timeout *. 1000.)) ;
    ] in
  let msg_id = ref 0 in
  let had_err = ref false in
  let partitions = if partitions = [] then [| 0 |]
                   else Array.of_list partitions in
  let next_partition = ref 0 in
  fun buffer ->
    let bytes = (fst buffer).DH.Pointer.impl.to_bytes () in
    let len = snd buffer in
    let str = Bytes.sub_string bytes 0 len in (* producer will not keep a ref on this *)
    let rec send () =
      try
        Kafka.produce producer ~msg_id:!msg_id partitions.(!next_partition) str ;
        next_partition := (!next_partition + 1) mod Array.length partitions ;
        if wait_confirm then Kafka.wait_delivery handler ; (* <1> *)
        incr msg_id
      with Kafka.Error (Kafka.QUEUE_FULL, _) ->
        if not !had_err then
          Printf.eprintf "Kafka queue is full, slowing down...\n%!" ;
        had_err := true ;
        Unix.sleepf 0.01 ;
        send () in
    send ()
    (* TODO: on exit, release all producers *)

Notice in <1> that this wait could be done only occasionally with little gain.
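
The way messages are spread over partitions can be isolated in a few lines. This is a sketch of the round-robin selection only, with no Kafka involved; make_partition_picker is a name invented for this illustration:

```ocaml
(* Round-robin selection of the target partition, as done in
   output_to_kafka. make_partition_picker is a hypothetical name used only
   for this illustration. *)
let make_partition_picker partitions =
  (* Default to partition 0 when none is specified: *)
  let partitions =
    if partitions = [] then [| 0 |] else Array.of_list partitions in
  let next_partition = ref 0 in
  fun () ->
    let p = partitions.(!next_partition) in
    next_partition := (!next_partition + 1) mod Array.length partitions ;
    p

let () =
  let pick = make_partition_picker [ 3 ; 5 ] in
  for _i = 1 to 5 do
    Printf.printf "%d " (pick ())
  done ;
  print_newline ()
```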

We now have all the possible output functions but all is not quite done yet, as the output function was supposed to return the emptied buffer:

start function
let output buffer =
  output buffer ;
  DH.Pointer.reset buffer in

5.7. The rate limiter

One simple yet accurate way to limit the rate to a given number of values per second is to sleep long enough from time to time (say, every 10 values) to make sure the actual rate does not exceed the limit. We could sleep between any two messages, but then, for rates that are high enough, the inaccuracy of the sleep duration would become of the same order of magnitude as the delay between two messages.

Let’s merely sleep once every N messages where N is the rate limit itself, i.e. sleep about once a second.

start function
  let rate_limit =
    if rate_limit <= 0. then
      ignore
    else
      let sleep_every = int_of_float (ceil rate_limit) in
      let period = float_of_int sleep_every /. rate_limit in
      let start = ref (Unix.gettimeofday ()) in
      let count = ref 0 in
      fun () ->
        incr count ;
        if !count = sleep_every then (
          count := 0 ;
          let now = Unix.gettimeofday () in
          let dt = now -. !start in
          if dt >= period then (
            (* We are late *)
            start := now
          ) else (
            Unix.sleepf (period -. dt) ;
            start := Unix.gettimeofday ()
          )
        ) in
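
To make the arithmetic concrete, this sketch computes the two derived quantities for a few rate limits. For instance, with a limit of 2.5 values per second, the clock is checked every 3 values and those 3 values must take at least 1.2 seconds. The function name sleep_schedule is made up for this illustration:

```ocaml
(* How often to check the clock, and how long each batch of values should
   last, for a given rate limit (in values per second). sleep_schedule is a
   hypothetical name used only for this illustration. *)
let sleep_schedule rate_limit =
  let sleep_every = int_of_float (ceil rate_limit) in
  let period = float_of_int sleep_every /. rate_limit in
  sleep_every, period

let () =
  List.iter (fun rate ->
    let sleep_every, period = sleep_schedule rate in
    Printf.printf "rate %g/s: check every %d values, period %gs\n"
      rate sleep_every period
  ) [ 0.5 ; 2.5 ; 1000. ]
```

Rounding the batch size up with ceil is what keeps the period at one second or slightly more, even for fractional rates.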

While we are at it, we’d like to periodically display the past rates, in a loadavg way, that is: the average over the last 10 seconds, the average over the last minute, over the last 5 minutes, and the total average. For this we need four counters, and a function that is called every time rate_limit is:

start function
  let display_rates =
    let avg_tot = Avg.make ()
    and avg_5m = Avg.make ~rotate_every:(mins 5) ()
    and avg_1m = Avg.make ~rotate_every:(mins 1) ()
    and avg_10s = Avg.make ~rotate_every:10. () in
    fun () ->
      let now = Unix.gettimeofday () in
      let display =
        Avg.update avg_tot now ||| (* <1> *)
        Avg.update avg_5m now |||
        Avg.update avg_1m now |||
        Avg.update avg_10s now in
      if not quiet && display then
        Printf.printf "%sRates: 10s: %a, 1min: %a, 5min: %a, global: %a\n%!"
          prefix (* <2> *)
          Avg.print avg_10s
          Avg.print avg_1m
          Avg.print avg_5m
          Avg.print avg_tot in
  let rate_limit () =
    display_rates () ;
    rate_limit () in

with a special Avg module whose values basically store a starting time and a counter:

helper functions
module Avg =
struct
  type t =
    { mutable start : float (* timestamp *) ;
      mutable count : int ;
      rotate_every : float option (* seconds *) ;
      mutable last_avg : float }

  let make ?rotate_every () =
    { start = Unix.gettimeofday () ;
      count = 0 ;
      rotate_every ;
      last_avg = ~-.1. }

  let update t now =
    let dt = now -. t.start in
    t.count <- t.count + 1 ;
    match t.rotate_every with
    | None ->
        t.last_avg <- float_of_int t.count /. dt ;
        false
    | Some r ->
        if dt >= r then (
          t.last_avg <- float_of_int (t.count - 1) /. r ;
          while now -. t.start >= r do
            t.start <- t.start +. r
          done ;
          t.count <- 1 ;
          true
        ) else (
          false
        )

  let print oc t =
    if t.last_avg >= 0. then
      Printf.fprintf oc "%g" t.last_avg
    else
      String.print oc "n.a."
end
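
The rotation logic is easier to follow when replayed with made-up timestamps. The sketch below is a variant of the module above in which the clock is passed explicitly instead of being read with Unix.gettimeofday; Avg_sketch and its ~now parameter are assumptions of this example, not part of datasino:

```ocaml
(* Same logic as Avg, but the clock is passed explicitly so that the
   behaviour can be replayed with made-up timestamps. Avg_sketch and the
   ~now parameter are hypothetical, for illustration only. *)
module Avg_sketch =
struct
  type t =
    { mutable start : float ;
      mutable count : int ;
      rotate_every : float option ;
      mutable last_avg : float }

  let make ?rotate_every ~now () =
    { start = now ; count = 0 ; rotate_every ; last_avg = ~-.1. }

  let update t now =
    let dt = now -. t.start in
    t.count <- t.count + 1 ;
    match t.rotate_every with
    | None ->
        t.last_avg <- float_of_int t.count /. dt ;
        false
    | Some r ->
        if dt >= r then (
          (* The current value belongs to the next window: *)
          t.last_avg <- float_of_int (t.count - 1) /. r ;
          while now -. t.start >= r do
            t.start <- t.start +. r
          done ;
          t.count <- 1 ;
          true
        ) else false
end

let () =
  let avg = Avg_sketch.make ~rotate_every:10. ~now:0. () in
  (* One value per second: the window rotates on the 10th update. *)
  for i = 1 to 10 do
    let rotated = Avg_sketch.update avg (float_of_int i) in
    if rotated then
      Printf.printf "t=%d: rotated, average = %g values/s\n"
        i avg.Avg_sketch.last_avg
  done
```

In this replay the reported average is 0.9 rather than 1, because the value arriving exactly at the rotation time is counted in the following window.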

Did you notice earlier in <1> this weird operator that looks a bit like the or operator (||)? It is indeed the or operator, just without short-circuiting, as we want the side effects of every update function to take place even when the first one returns true (meaning the result needs to be printed). To avoid short-circuiting it is enough to rename the operator:

helper functions
let (|||) = (||)
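
The difference can be observed with a function that counts how many times it is called. In this small sketch, calls and touch are names made up for the demonstration:

```ocaml
(* Once renamed, the operator is an ordinary function, so both operands are
   evaluated (in an unspecified order) before it is applied. calls and touch
   are hypothetical names used only for this demonstration. *)
let ( ||| ) = ( || )

let calls = ref 0
let touch result = incr calls ; result

let () =
  calls := 0 ;
  ignore (touch true || touch true) ;
  Printf.printf "with ||:  %d call(s)\n" !calls ;
  calls := 0 ;
  ignore (touch true ||| touch true) ;
  Printf.printf "with |||: %d call(s)\n" !calls
```

With || only the first operand is evaluated, whereas with ||| both are.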

Notice also in <2> that an arbitrary prefix is printed in front of each log line. This comes in handy when running several instances of datasino in parallel to generate various streams of data, and can be set from the command line:

command line arguments
let prefix =
  let env = Term.env_info "PREFIX" in
  let doc = "Any string to prefix the stdout logs with." in
  let i = Arg.info ~doc ~env [ "prefix" ] in
  Arg.(value (opt string "" i))
extra command line arguments
$ prefix
extra command line parameters
prefix

All the required functions have now been defined, but two of them still have to be actually compiled and dynamically loaded. Let’s get down to it now.

5.8. Runtime code generation

The dessser library has a function that compiles and dynamically loads a compilation unit like compunit. The difficulty is that the compilation unit has to call into datasino to register the serialize_random_value function we are interested in, because OCaml’s dynamic linker offers no way to reach the symbols of the loaded code from the host program (for type safety).

Therefore the two endpoints of this registration process have to be added.

Inside datasino, a simple reference to a placeholder function waits to be replaced with the actual runtime function by the dynamically loaded code:

registering callback
let gen_serialize_random_value : (DH.Pointer.t -> DH.Pointer.t) ref =
  ref (fun _buffer -> assert false)

And so we need to add to the compunit some code that changes this reference. Fortunately, dessser allows adding arbitrary code to a compilation unit, which is a bit like the asm directive of meta-programming:

start function
  let compunit =
    DU.add_verbatim_definition compunit ~name:"registration"
                               ~dependencies:["serialize_random_value"]
                               ~backend:DessserBackEndOCaml.id
                               (fun ~recurs ~rec_seq oc _printer ->
      Printf.fprintf oc
        "%s registration = \
           Datasino_main.gen_serialize_random_value := serialize_random_value\n"
        (DessserBackEndOCaml.let_of ~recurs ~rec_seq)) in
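
The registration pattern itself does not depend on dessser and can be sketched in plain OCaml. In this illustration a regular function stands in for the dynamic loading step, and the names registered and simulate_load are invented for the example:

```ocaml
(* Host side: a reference holding a placeholder until the real function
   is registered. registered is a hypothetical name. *)
let registered : (string -> string) ref =
  ref (fun _ -> failwith "nothing registered yet")

(* Loaded side: in datasino this assignment is a top-level definition of the
   generated module, executed as a side effect of loading it. Here a plain
   function (simulate_load, hypothetical) stands in for the loading step. *)
let simulate_load () =
  registered := String.uppercase_ascii

let () =
  simulate_load () ;
  (* Dereference only after loading, so as to get the real function: *)
  let f = !registered in
  print_endline (f "ready")
```

The important point is the order of operations: the reference must be dereferenced after the load has completed, otherwise the placeholder is what gets called.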

The dessser library has a function called compile_and_load that compiles a compilation unit as a shared object and dynamically loads the result. It also takes as a parameter a set of search paths so that the generated module can find the headers and libraries it needs. In our case, it needs to find the datasino libraries, whose location can be given by a new command line argument:

command line arguments
let extra_search_paths =
  let env = Term.env_info "EXTRA_SEARCH_PATHS" in
  let doc = "Where to find datasino libraries." in
  let i = Arg.info ~doc ~env [ "I" ; "extra-search-paths" ] in
  Arg.(value (opt_all string [] i))
extra command line arguments
$ extra_search_paths
extra command line parameters
extra_search_paths

While at it, compile_and_load also accepts an option to keep temporary files (useful to inspect the generated code), so let’s take this from the command line as well:

command line arguments
let keep_temp_files =
  let env = Term.env_info "KEEP_TEMP_FILES" in
  let doc = "Whether intermediary generated files should be kept around \
             for inspection." in
  let i = Arg.info ~doc ~env [ "keep-temp-files" ] in
  Arg.(value (flag i))
extra command line arguments
$ keep_temp_files
extra command line parameters
keep_temp_files

So if all goes well, calling compile_and_load will now compile and load the compilation unit, at which point the initialization of the registration top-level variable will set datasino’s gen_serialize_random_value reference to the actual function from the freshly compiled compilation unit. By the time compile_and_load returns, the actual function will be ready for duty.

start function
  DessserBackEndOCaml.compile_and_load ~extra_search_paths ~keep_temp_files
                                       compunit ;
  let serialize_random_value = !gen_serialize_random_value in

Et voilà! Rarely can so many things go wrong in so few lines.

5.9. Cheating for a higher data rate

The stutter parameter allows datasino to reuse the same random value several times to obtain a higher throughput on the cheap. The serialize_random_value function is the right place to implement this: it keeps the main loop simple, and we can reuse not only the value but the serialized bytes directly, saving even more CPU:

start function
  let serialize_random_value =
    (* Store the last serialized value: *)
    let last_value = Bytes.create max_msg_size
    (* Its length: *)
    and last_value_len = ref 0
    (* Count down how many repetitions are still allowed: *)
    and allowance = ref 0. in (* <2> *)
    fun buffer ->
      if !allowance > 1. then (
        allowance := !allowance -. 1. ;
        (* Copy the last saved value into the passed in buffer: *)
        let bytes = (fst buffer).DH.Pointer.impl.to_string () |> Bytes.unsafe_of_string in
        Bytes.blit last_value 0 bytes (snd buffer) !last_value_len ;
        DH.Pointer.skip buffer !last_value_len
      ) else (
        let start = snd buffer in
        let buffer = serialize_random_value buffer in
        if stutter > 0. then (
          (* Copy the new value in last_value: *)
          let len = (snd buffer) - start in
          let bytes = (fst buffer).DH.Pointer.impl.to_string () |> Bytes.unsafe_of_string in
          Bytes.blit bytes start last_value 0 len ;
          last_value_len := len ;
          allowance := !allowance +. stutter
        ) (* else don't bother *) ;
        buffer
      ) in
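
The effect of the allowance counter is easier to see on a simplified model where "serializing a random value" merely returns the next integer. The following sketch keeps the same control flow but drops the buffers; make_stuttering is a name made up for this illustration:

```ocaml
(* Model of the stutter logic where generating a value just returns a fresh
   integer instead of filling a buffer. make_stuttering is a hypothetical
   name used only for this illustration. *)
let make_stuttering ~stutter generate =
  let last_value = ref 0
  and allowance = ref 0. in
  fun () ->
    if !allowance > 1. then (
      (* Reuse the saved value: *)
      allowance := !allowance -. 1. ;
      !last_value
    ) else (
      let v = generate () in
      if stutter > 0. then (
        last_value := v ;
        allowance := !allowance +. stutter
      ) ;
      v
    )

let () =
  let next = ref 0 in
  let generate () = incr next ; !next in
  let get = make_stuttering ~stutter:2. generate in
  for _i = 1 to 9 do
    Printf.printf "%d " (get ())
  done ;
  print_newline ()
```

With a stutter of 2 this model settles into emitting each generated value three times (once fresh, twice reused), after a first value that is emitted only twice. Because the allowance is a float, fractional stutter values work as well, the reuse then happening only for some of the values.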

5.10. Calling the main loop

Now that all the required functions are available, the main loop can be called:

start function
  let buffer = DH.pointer_of_buffer max_msg_size in
  main_loop serialize_random_value is_full output rate_limit count buffer

6. Filling in the boring details

A few trivial functions have been left aside but need to be filled in in order for datasino to compile.

6.1. Miscellaneous

We made use of this award winning minutes to seconds calculator:

helper functions
let mins m = float_of_int (60 * m)

6.2. File functions

open_file takes a file name and returns a Unix file descriptor. When writing into a file we want the file to be created if it does not exist and appended to otherwise. So the simplest version could be:

simple open_file
let open_file name =
  Unix.(openfile name [ O_WRONLY ; O_APPEND ; O_CREAT ] 0o640)

Although this serves the use case where we want to append data to an existing file (such as a fifo or a character device), it may not be practical when producing actual files. Then, it’s usually preferable to have files appear atomically, only once complete. So when the file does not already exist, it is better to create a temporary file first and then rename it.

So instead of a mere file descriptor we will make the type for opened files a bit more sophisticated:

file functions
type opened_file =
  { fd : Unix.file_descr ;
    name : string ;
    opened_name : string }

Here, opened_name being different from name tells rotate_file, which closes the file, that the file should then be renamed. open_file could then be defined as:

file functions
let open_file name =
  let open Unix in
  let opened_name =
    if file_exists name then name else tmp_name name in
  { fd = openfile opened_name [ O_WRONLY ; O_APPEND ; O_CREAT ] 0o640 ;
    name ; opened_name }

With:

helper functions
let file_exists name =
  let open Unix in
  try
    ignore (stat name) ;
    true
  with Unix_error (ENOENT, _, _) ->
    false

let tmp_name name =
  let rec retry n =
    let ext =
      if n = 1 then ".tmp" else ".tmp."^ string_of_int n in
    let tmp_name = name ^ ext in
    if file_exists tmp_name then retry (n + 1) else tmp_name in
  retry 1
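
To check which name gets picked when some temporary files are already present, here is a sketch of the same search where the existence test is passed as a parameter, so that no file system access is needed. The name tmp_name_with and its ~exists parameter are assumptions of this example:

```ocaml
(* Same search as tmp_name, with the existence test passed as a parameter
   so that no file system access is needed. tmp_name_with and ~exists are
   hypothetical, for illustration only. *)
let tmp_name_with ~exists name =
  let rec retry n =
    let ext =
      if n = 1 then ".tmp" else ".tmp."^ string_of_int n in
    let candidate = name ^ ext in
    if exists candidate then retry (n + 1) else candidate in
  retry 1

let () =
  (* Pretend that two temporary files are already there: *)
  let taken = [ "out.csv.tmp" ; "out.csv.tmp.2" ] in
  let exists f = List.mem f taken in
  print_endline (tmp_name_with ~exists "out.csv")
```

Here the first free candidate is out.csv.tmp.3. Keep in mind that testing for existence and then creating the file is not atomic, so two processes writing to the same directory could in principle pick the same temporary name.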

write_buffer is given an opened file and a "pointer" (DH.Pointer.t) and its sole job is to write the content of that pointer into the file:

file functions
let write_buffer file buffer =
  let bytes = (fst buffer).DH.Pointer.impl.to_bytes () in
  let len = snd buffer in
  let len' = Unix.write file.fd bytes 0 len in
  assert (len = len')

rotate_file should close the current file, and maybe rename it.

file functions
let rotate_file file =
  let open Unix in
  Unix.close file.fd ;
  if file.opened_name <> file.name then
    Unix.rename file.opened_name file.name

6.3. Kafka functions

The last gap we need to fill is a few helper functions related to Kafka:

kafka functions
let kafka_err_string =
  let open Kafka in
  function
  | BAD_MSG -> "BAD_MSG"
  | BAD_COMPRESSION -> "BAD_COMPRESSION"
  | DESTROY -> "DESTROY"
  | FAIL -> "FAIL"
  | TRANSPORT -> "TRANSPORT"
  | CRIT_SYS_RESOURCE -> "CRIT_SYS_RESOURCE"
  | RESOLVE -> "RESOLVE"
  | MSG_TIMED_OUT -> "MSG_TIMED_OUT"
  | UNKNOWN_PARTITION -> "UNKNOWN_PARTITION"
  | FS -> "FS"
  | UNKNOWN_TOPIC -> "UNKNOWN_TOPIC"
  | ALL_BROKERS_DOWN -> "ALL_BROKERS_DOWN"
  | INVALID_ARG -> "INVALID_ARG"
  | TIMED_OUT -> "TIMED_OUT"
  | QUEUE_FULL -> "QUEUE_FULL"
  | ISR_INSUFF -> "ISR_INSUFF"
  | UNKNOWN -> "UNKNOWN"
  | OFFSET_OUT_OF_RANGE -> "OFFSET_OUT_OF_RANGE"
  | INVALID_MSG -> "INVALID_MSG"
  | UNKNOWN_TOPIC_OR_PART -> "UNKNOWN_TOPIC_OR_PART"
  | INVALID_MSG_SIZE -> "INVALID_MSG_SIZE"
  | LEADER_NOT_AVAILABLE -> "LEADER_NOT_AVAILABLE"
  | NOT_LEADER_FOR_PARTITION -> "NOT_LEADER_FOR_PARTITION"
  | REQUEST_TIMED_OUT -> "REQUEST_TIMED_OUT"
  | BROKER_NOT_AVAILABLE -> "BROKER_NOT_AVAILABLE"
  | REPLICA_NOT_AVAILABLE -> "REPLICA_NOT_AVAILABLE"
  | MSG_SIZE_TOO_LARGE -> "MSG_SIZE_TOO_LARGE"
  | STALE_CTRL_EPOCH -> "STALE_CTRL_EPOCH"
  | OFFSET_METADATA_TOO_LARGE -> "OFFSET_METADATA_TOO_LARGE"
  | CONF_UNKNOWN -> "CONF_UNKNOWN"
  | CONF_INVALID -> "CONF_INVALID"

7. Knitting it all together

Given the amount of work done in the dessser library, datasino itself is quite a short program. The code will nonetheless be split in three modules:

  1. datasino_cli.ml for all command line argument management,

  2. datasino_main.ml for the main function of the program and

  3. datasino_tools.ml for the various helper functions.

datasino_cli.ml
(* ...external modules... *)
open Datasino_config
open Datasino_tools
open Datasino_main

(* ...command line custom types... *)
(* ...command line arguments... *)
(* ...main function... *)
datasino_main.ml
(* ...external modules... *)
open Datasino_config
open Datasino_tools

(* ...registering callback... *)
(* ...main loop... *)
(* ...default command line values... *)
(* ...command line check... *)
(* ...output functions... *)
(* ...start function... *)
datasino_tools.ml
(* ...external modules... *)

exception Not_implemented of string
let todo msg =
  raise (Not_implemented msg)

(* ...helper functions... *)
(* ...file functions... *)
(* ...kafka functions... *)