Add example argument
IyeOnline committed Jan 7, 2025
1 parent a50c756 commit 191d5b7
Showing 2 changed files with 69 additions and 43 deletions.
28 changes: 15 additions & 13 deletions README.md
@@ -1,7 +1,8 @@
# Tenzir Example Plugin

This is an example plugin for Tenzir, adding a `read_custom_log` operator that
parses a custom line-based log format.
We think that learning how to build a plugin is best done by example. This example plugin implements a simple operator `read_custom_log` that parses a [custom line-based log format](integration/data/inputs/sample.log).

The operator's C++ implementation can be found in [`builtins/operators/read_custom_log.cpp`](builtins/operators/read_custom_log.cpp) and is extensively commented.

## Build and run

@@ -10,29 +11,28 @@ your additional plugin.

Use `docker compose run --build tenzir '<pipeline>'` to interact with the node
on the command-line, or set the following environment variables to connect your
node to app.tenzir.com:
node to [app.tenzir.com](app):

```
```bash
export TENZIR_PLUGINS__PLATFORM__API_KEY='<api-key>'
export TENZIR_PLUGINS__PLATFORM__TENANT_ID='<tenant-id>'
```

## Learn how to write a plugin

We think that learning how to build a plugin is best done by example. Tenzir
ships with a variety of [plugins][plugins-source] and
[builtins][builtins-source] to get inspired by and to learn from.

If you have any questions, feel free to reach out in the [#developers channel
on Discord][discord].

## Run tests

Every plugin defines additional tests using
[BATS](https://bats-core.readthedocs.io/en/stable/writing-tests.html). Use
`docker compose run --build tests` to execute your tests and update the
reference files automatically.

## Further Resources

Tenzir ships with a variety of [plugins][plugins-source] and
[builtins][builtins-source] to get inspired by and to learn from.

If you have any questions, feel free to reach out in the [#developers channel
on Discord][discord].

## Contribute your plugin

If you want to upstream your plugin so that it is bundled with every Tenzir
@@ -42,6 +42,8 @@ dependencies, consider contributing it as a builtin instead. Builtins are
located in the [`libtenzir/builtins/` directory in the `tenzir/tenzir`
repository][builtins-source].

[tenzir]: https://github.com/tenzir/tenzir
[app]: https://app.tenzir.com
[plugins-source]: https://github.com/tenzir/tenzir/tree/main/plugins
[builtins-source]: https://github.com/tenzir/tenzir/tree/main/libtenzir/builtins
[discord]: https://docs.tenzir.com/discord
84 changes: 54 additions & 30 deletions builtins/operators/read_custom_log.cpp
@@ -38,15 +38,15 @@
#include <tenzir/to_lines.hpp>
#include <tenzir/tql2/plugin.hpp>

// Next, plaese jump to the buttom to read the description of the
// Next, jump to the bottom to read the description of the
// `read_custom_log_plugin` class.

namespace tenzir::plugins::example {
namespace {

// The operator instance for the `read_custom_log` operator. This class
// implements the `crtp_operator`, which in turn implements most of the
// `operator_base` interface for us. We only need to implement the `operator()`,
// `operator_base` interface for us. We only need to implement `operator()`,
// mapping a generator of elements to another generator of elements.
//
// Note that many other functions are available to be overridden, such as
@@ -58,32 +58,34 @@ class read_custom_log_operator final
public:
// This provides a constructor for the operator. Note that operator instances
// _must_ be default-constructible without any arguments so that they can be
// transferred between processes. If you wish to provide arguments from the
// corresponding plugin class's `make` function, you can do so by providing an
// additional constructor that takes additional arguments.
// transferred between processes.
read_custom_log_operator() = default;

// Since our operator also accepts a parameter, we provide a second
// constructor accepting that parameter. This is what we will use in our code.
read_custom_log_operator(tenzir::duration time_offset)
: time_offset_{time_offset} {
}

// The name of the operator. Must be unique.
auto name() const -> std::string override { return "read_custom_log"; }

// Specifies how the operator handles optimization. Check the
// `operator_base::optimize` documentation for more information. For this
// particular operator, we don't do any optimization and choose to act as an
// optimization barrier in the pipeline.
auto optimize(const expression &filter, event_order order) const
-> optimize_result override {
auto optimize(const expression& filter,
event_order order) const -> optimize_result override {
TENZIR_UNUSED(filter, order);
return do_not_optimize(*this);
}

// Handles serialization and deserialization of the operator instance. This
// function _must_ capture all member variables of the instance.
friend auto inspect(auto &f, read_custom_log_operator &x) -> bool {
friend auto inspect(auto& f, read_custom_log_operator& x) -> bool {
return f.object(x).fields(
// If we had any members to serialize here, we'd list them as follows:
// f.field("foo", x.foo),
// f.field("bar", x.bar),
// ...
f.field("time_offset", x.time_offset_)
/// If there were further members, they _must_ be added here.
);
}

@@ -101,17 +103,17 @@ class read_custom_log_operator final
// generator of monostate denotes that an operator is a sink.
//
// The operator control plane is an escape hatch that allows operators to
// interact with whatever resides outside of the data plane.
// interact with whatever resides outside of the passed in or generated data.
//
// In this case, we're reading a custom line-based log format, so we'll be
// taking in bytes and returning events.
//
// Note that this function is marked `const`, which means that it is not
// allowed to modify any members of the operator instance. Store mutable state
// in the function instead.
auto operator()(generator<chunk_ptr> input,
operator_control_plane &ctrl) const
-> generator<table_slice> {
auto
operator()(generator<chunk_ptr> input,
operator_control_plane& ctrl) const -> generator<table_slice> {
// Since we have a line-based format, we'll adapt our generator of chunks
// into a generator of views onto lines using the `to_lines` function from
// libtenzir, and then for readability will continue with the `read_lines`
@@ -120,9 +122,11 @@ class read_custom_log_operator final
}

private:
auto read_lines(generator<std::optional<std::string_view>> input,
operator_control_plane &ctrl) const
-> generator<table_slice> {
// This function accepts lines, which are much easier to work with for our
// format. We dispatch to this from `operator()`.
auto
read_lines(generator<std::optional<std::string_view>> input,
operator_control_plane& ctrl) const -> generator<table_slice> {
// We set up a builder to create events in, and give it a fixed schema. For
// more advanced use cases, consider using the `multi_series_builder`
// instead. As a reminder, this is what our log format looks like:
@@ -139,7 +143,10 @@ class read_custom_log_operator final
}};
// We want to buffer events for no more than 250ms before returning them.
// For this, we need to store when we last returned events.
// Without this logic, the operator would not yield any results until all
// input has been processed.
auto last_yield = std::chrono::steady_clock::now();
// This is the main input loop, which will run until the input stream ends.
for (auto &&line : input) {
// Whenever we read a new line, we first check if we've passed our
// timeout.
@@ -153,8 +160,7 @@ class read_custom_log_operator final
last_yield = std::chrono::steady_clock::now();
}
if (!line) {
// We're playing friendly and yielding control back to the scheduler if
// we can't read a line.
// If we did not get a line, we yield control back to the scheduler.
co_yield {};
continue;
}
@@ -172,8 +178,8 @@ class read_custom_log_operator final
}
}

auto parse_line(std::string_view line, series_builder &builder,
diagnostic_handler &dh) const -> void {
auto parse_line(std::string_view line, series_builder& builder,
diagnostic_handler& dh) const -> void {
// Here, we can now finally parse the line piece by piece. Tenzir also ships
// with a parser combinator library, which can be used to parse more complex
// formats. But for this one, we'll just go through the line iteratively.
@@ -201,7 +207,8 @@ class read_custom_log_operator final
.emit(dh);
return;
}
parts[i] = parts[i].substr(1, parts[i].size() - 2);
parts[i].remove_prefix(1);
parts[i].remove_suffix(1);
}
// For the last section, we'll check whether we begin with a dash and a
// space and will then just leave the rest as-is.
@@ -211,7 +218,7 @@ class read_custom_log_operator final
.emit(dh);
return;
}
parts[4] = parts[4].substr(2);
parts[4].remove_prefix(2);
// For the first section, we need to additionally parse the timestamp. We'll
// use Tenzir's built-in timestamp parser for this.
auto timestamp = time{};
@@ -221,15 +228,24 @@ class read_custom_log_operator final
.emit(dh);
return;
}
// Apply our time offset.
timestamp += time_offset_;
// Now, we can finally build the event.
auto event = builder.record();
event.field("timestamp", timestamp);
event.field("log_level", parts[1]);
event.field("user_id", parts[2]);
event.field("action_type", parts[3]);
event.field("message", parts[4]);
TENZIR_WARN("parsed line {}", line);
// Nested fields could be added by calling
// `event.field("my_field").record()` and operating on that.
//
// We write a simple log message for debugging purposes.
TENZIR_TRACE("parsed line {}", line);
}

// Our data member for the time offset.
tenzir::duration time_offset_{};
};

// The `read_custom_log_plugin` class is the plugin that registers the operator
@@ -248,14 +264,22 @@ class read_custom_log_plugin final
public:
auto make(invocation inv, session ctx) const
-> failure_or<operator_ptr> override {
// We don't accept any arguments, so we can just parse the invocation with
// an empty parser. If you want to accept positional or named arguments,
// call the .positional and .named methods on the parser.
using namespace std::chrono_literals;
// We create an `argument_parser2` to parse arguments.
auto parser = argument_parser2::operator_(name());
// Our operator accepts an optional duration that offsets the timestamps in
// the log. The offset defaults to zero.
// We add the argument using `named_optional`. You can also use `positional`
// and/or `named`; see `argument_parser2.hpp`. If you don't need any
// arguments, you can skip this part.
auto time_offset = tenzir::duration{0s};
parser.named_optional("time_offset", time_offset);
// We let the argument_parser parse our arguments. The TRY macro ensures
// that a parsing failure will stop the setup.
TRY(parser.parse(inv, ctx));
// Create the operator instance, passing in any arguments that the operator
// requires.
return std::make_unique<read_custom_log_operator>();
return std::make_unique<read_custom_log_operator>(time_offset);
}
};
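
The switch from `substr` to `remove_prefix`/`remove_suffix` in `parse_line` can be tried out in isolation. The following is a minimal standalone sketch, not the plugin's code: it uses only the standard library, the helper names `strip_delimiters` and `split_line` are hypothetical, and the line layout `[a] [b] [c] [d] - message` is an assumption inferred from the comments in the diff (the actual format is defined by the sample log in the repository).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <optional>
#include <string_view>

// Hypothetical helper (not part of the plugin): strips one delimiter from each
// end of `part` in place. `remove_prefix`/`remove_suffix` only move the bounds
// of the view; asking them to remove more characters than the view holds is
// undefined behavior, hence the precondition check.
inline void strip_delimiters(std::string_view& part) {
  assert(part.size() >= 2);
  part.remove_prefix(1);
  part.remove_suffix(1);
}

// Hypothetical splitter for the ASSUMED layout `[a] [b] [c] [d] - message`.
// Returns std::nullopt if the line does not match that layout.
inline auto split_line(std::string_view line)
  -> std::optional<std::array<std::string_view, 5>> {
  auto parts = std::array<std::string_view, 5>{};
  for (std::size_t i = 0; i < 4; ++i) {
    if (line.empty() || line.front() != '[') {
      return std::nullopt;
    }
    const auto close = line.find(']');
    if (close == std::string_view::npos) {
      return std::nullopt;
    }
    // Take the bracketed section, then drop the brackets without copying.
    parts[i] = line.substr(0, close + 1);
    strip_delimiters(parts[i]);
    line.remove_prefix(close + 1);
    // Each bracketed section is followed by a single space.
    if (line.empty() || line.front() != ' ') {
      return std::nullopt;
    }
    line.remove_prefix(1);
  }
  // The final section must start with a dash and a space.
  if (line.substr(0, 2) != "- ") {
    return std::nullopt;
  }
  line.remove_prefix(2);
  parts[4] = line;
  return parts;
}
```

All five parts are views into the original line, so the caller has to keep the underlying buffer alive for as long as the parts are in use.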

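
The comments in `read_lines` describe buffering events for no more than 250ms before yielding them. The sketch below illustrates that idea with the standard library only; it is not how the plugin implements it. The class name `timed_buffer` is hypothetical, strings stand in for events, and the current time is passed in explicitly so that the behavior is deterministic (the plugin itself reads `std::chrono::steady_clock::now()`).

```cpp
#include <chrono>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the builder plus `last_yield` bookkeeping.
class timed_buffer {
public:
  using clock = std::chrono::steady_clock;

  timed_buffer(clock::duration timeout, clock::time_point start)
    : timeout_{timeout}, last_yield_{start} {
  }

  // Buffers one event.
  void add(std::string event) {
    pending_.push_back(std::move(event));
  }

  // Returns all buffered events once at least `timeout` has elapsed since the
  // last flush and restarts the timer; returns an empty vector otherwise.
  auto flush_if_due(clock::time_point now) -> std::vector<std::string> {
    if (now - last_yield_ < timeout_) {
      return {};
    }
    last_yield_ = now;
    return std::exchange(pending_, {});
  }

private:
  clock::duration timeout_;
  clock::time_point last_yield_;
  std::vector<std::string> pending_;
};
```

Checking the timeout on every incoming line, as the operator does, bounds the latency of buffered events while input keeps arriving, and still lets several events share one batch.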