From b469f9538a96bcf1ef3037a91c8863bf1f254163 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 5 Jan 2025 09:44:05 +0100 Subject: [PATCH] filterx/filterx-scope: implement LogMessage change tracking in FilterXScope Previously message-tied variables were managed in part within expr-variable and in part within FilterXScope. Now with the message being available in FilterXScope, we can delegate this in entirety to FilterXScope. This also implements the validation of message-tied values, so if the LogMessage changes independently from FilterXScope, we will notice that too and consider the values of those variables stale. Signed-off-by: Balazs Scheidler --- lib/filterx/expr-variable.c | 87 +++++++--------- lib/filterx/filterx-scope.c | 99 +++++++++++++------ lib/filterx/filterx-scope.h | 36 ++++++- lib/filterx/filterx-variable.c | 9 +- lib/filterx/filterx-variable.h | 25 +++-- lib/filterx/func-vars.c | 13 ++- .../filterx/test_filterx_scope.py | 32 ++++++ 7 files changed, 188 insertions(+), 113 deletions(-) diff --git a/lib/filterx/expr-variable.c b/lib/filterx/expr-variable.c index 5c698395a..bc1e91444 100644 --- a/lib/filterx/expr-variable.c +++ b/lib/filterx/expr-variable.c @@ -40,31 +40,6 @@ typedef struct _FilterXVariableExpr guint32 handle_is_macro:1; } FilterXVariableExpr; -static FilterXObject * -_pull_variable_from_message(FilterXVariableExpr *self, FilterXEvalContext *context, LogMessage *msg) -{ - gssize value_len; - LogMessageValueType t; - const gchar *value = log_msg_get_value_if_set_with_type(msg, self->handle, &value_len, &t); - if (!value) - { - filterx_eval_push_error("No such name-value pair in the log message", &self->super, self->variable_name); - return NULL; - } - - if (self->handle_is_macro) - return filterx_message_value_new(value, value_len, t); - else - return filterx_message_value_new_borrowed(value, value_len, t); -} - -/* NOTE: unset on a variable that only exists in the LogMessage, without making the message writable */ -static void -_whiteout_variable(FilterXVariableExpr *self, FilterXEvalContext *context) -{ - filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, NULL); -} - static FilterXObject * _eval_variable(FilterXExpr *s) { @@ -75,7 +50,7 @@ _eval_variable(FilterXExpr *s) variable = filterx_scope_lookup_variable(context->scope, self->handle); if (variable) { - FilterXObject *value = filterx_variable_get_value(variable); + FilterXObject *value = filterx_scope_get_variable(context->scope, variable); if (!value) { filterx_eval_push_error("Variable is unset", &self->super, self->variable_name); @@ -83,13 +58,12 @@ _eval_variable(FilterXExpr *s) return value; } - if (filterx_variable_handle_is_message_tied(self->handle)) + if (self->variable_type == FX_VAR_MESSAGE_TIED) { - FilterXObject *msg_ref = _pull_variable_from_message(self, context, context->msg); - if(!msg_ref) - return NULL; - filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, msg_ref); - return msg_ref; + /* auto register message tied variables */ + variable = filterx_scope_register_variable(context->scope, self->variable_type, self->handle); + if (variable) + return filterx_variable_get_value(variable); } filterx_eval_push_error("No such variable", s, self->variable_name); @@ -100,19 +74,17 @@ static void _update_repr(FilterXExpr *s, FilterXObject *new_repr) { FilterXVariableExpr *self = (FilterXVariableExpr *) s; - FilterXScope *scope = filterx_eval_get_scope(); - FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle); + FilterXEvalContext *context = filterx_eval_get_context(); + FilterXScope *scope = context->scope; - g_assert(variable != NULL); - filterx_variable_set_value(variable, new_repr, FALSE); + FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle); + filterx_scope_set_variable(scope, variable, new_repr, FALSE); } static gboolean _assign(FilterXExpr *s, FilterXObject *new_value) { FilterXVariableExpr *self = (FilterXVariableExpr *) s; - FilterXScope *scope = filterx_eval_get_scope(); - FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle); if (self->handle_is_macro) { @@ -120,17 +92,16 @@ _assign(FilterXExpr *s, FilterXObject *new_value) return FALSE; } - if (!variable) - { - /* NOTE: we pass NULL as initial_value to make sure the new variable - * is considered changed due to the assignment */ + FilterXEvalContext *context = filterx_eval_get_context(); + FilterXScope *scope = context->scope; + FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle); - variable = filterx_scope_register_variable(scope, self->variable_type, self->handle, NULL); - } + if (!variable) + variable = filterx_scope_register_variable(scope, self->variable_type, self->handle); /* this only clones mutable objects */ new_value = filterx_object_clone(new_value); - filterx_variable_set_value(variable, new_value, TRUE); + filterx_scope_set_variable(scope, variable, new_value, TRUE); filterx_object_unref(new_value); return TRUE; } @@ -139,15 +110,17 @@ static gboolean _isset(FilterXExpr *s) { FilterXVariableExpr *self = (FilterXVariableExpr *) s; - FilterXScope *scope = filterx_eval_get_scope(); + FilterXEvalContext *context = filterx_eval_get_context(); + FilterXScope *scope = context->scope; + LogMessage *msg = context->msg; FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle); if (variable) return filterx_variable_is_set(variable); - FilterXEvalContext *context = filterx_eval_get_context(); - LogMessage *msg = context->msg; - return log_msg_is_value_set(msg, self->handle); + if (self->variable_type == FX_VAR_MESSAGE_TIED) + return log_msg_is_value_set(msg, filterx_variable_handle_to_nv_handle(self->handle)); + return FALSE; } static gboolean @@ -162,17 +135,25 @@ _unset(FilterXExpr *s) } FilterXEvalContext *context = filterx_eval_get_context(); + FilterXScope *scope = context->scope; + LogMessage *msg = context->msg; FilterXVariable *variable = filterx_scope_lookup_variable(context->scope, self->handle); if (variable) { - filterx_variable_unset_value(variable); + filterx_scope_unset_variable(scope, variable); return TRUE; } - LogMessage *msg = context->msg; - if (log_msg_is_value_set(msg, self->handle)) - _whiteout_variable(self, context); + if (self->variable_type == FX_VAR_MESSAGE_TIED) + { + if (log_msg_is_value_set(msg, self->handle)) + { + FilterXVariable *v = filterx_scope_register_variable(context->scope, self->variable_type, self->handle); + /* make sure it is considered changed */ + filterx_scope_unset_variable(context->scope, v); + } + } return TRUE; } diff --git a/lib/filterx/filterx-scope.c b/lib/filterx/filterx-scope.c index 8e3d47ee5..c72cdf01d 100644 --- a/lib/filterx/filterx-scope.c +++ b/lib/filterx/filterx-scope.c @@ -21,6 +21,7 @@ * */ #include "filterx/filterx-scope.h" +#include "filterx/object-message-value.h" #include "scratch-buffers.h" @@ -62,42 +63,28 @@ _lookup_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXVariab static gboolean _validate_variable(FilterXScope *self, FilterXVariable *variable) { + if (filterx_variable_is_declared(variable)) + return TRUE; + if (filterx_variable_is_floating(variable) && - !filterx_variable_is_declared(variable) && !filterx_variable_is_same_generation(variable, self->generation)) return FALSE; - return TRUE; -} - -FilterXVariable * -filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle) -{ - FilterXVariable *v; - if (_lookup_variable(self, handle, &v) && _validate_variable(self, v)) - return v; - return NULL; + if (filterx_variable_is_message_tied(variable) && + !filterx_variable_is_same_generation(variable, self->msg->generation)) + return FALSE; + return TRUE; } static FilterXVariable * _register_variable(FilterXScope *self, FilterXVariableType variable_type, - FilterXVariableHandle handle, - FilterXObject *initial_value) + FilterXVariableHandle handle) { FilterXVariable *v_slot; if (_lookup_variable(self, handle, &v_slot)) { - /* already present */ - if (!filterx_variable_is_same_generation(v_slot, self->generation)) - { - /* existing value is from a previous generation, override it as if - * it was a new value */ - - filterx_variable_set_generation(v_slot, self->generation); - filterx_variable_set_value(v_slot, initial_value, FALSE); - } return v_slot; } /* turn v_slot into an index */ @@ -105,26 +92,68 @@ _register_variable(FilterXScope *self, g_assert(v_index <= self->variables->len); g_assert(&g_array_index(self->variables, FilterXVariable, v_index) == v_slot); - + /* we register an unset variable here first */ FilterXVariable v; - filterx_variable_init_instance(&v, variable_type, handle, initial_value, self->generation); + filterx_variable_init_instance(&v, variable_type, handle); g_array_insert_val(self->variables, v_index, v); return &g_array_index(self->variables, FilterXVariable, v_index); } +FilterXVariable * +filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle) +{ + FilterXVariable *v; + + if (_lookup_variable(self, handle, &v) && + _validate_variable(self, v)) + return v; + return NULL; +} + +static FilterXObject * +_pull_variable_from_message(FilterXScope *self, NVHandle handle) +{ + gssize value_len; + LogMessageValueType t; + + const gchar *value = log_msg_get_value_if_set_with_type(self->msg, handle, &value_len, &t); + if (!value) + return NULL; + + if (log_msg_is_handle_macro(handle)) + { + FilterXObject *res = filterx_message_value_new(value, value_len, t); + filterx_object_make_readonly(res); + return res; + } + else + return filterx_message_value_new_borrowed(value, value_len, t); +} + + FilterXVariable * filterx_scope_register_variable(FilterXScope *self, FilterXVariableType variable_type, - FilterXVariableHandle handle, - FilterXObject *initial_value) + FilterXVariableHandle handle) { - FilterXVariable *v = _register_variable(self, variable_type, handle, initial_value); + FilterXVariable *v = _register_variable(self, variable_type, handle); /* the scope needs to be synced with the message if it holds a * message-tied variable (e.g. $MSG) */ - if (filterx_variable_handle_is_message_tied(handle)) - self->syncable = TRUE; + if (variable_type == FX_VAR_MESSAGE_TIED) + { + FilterXObject *value = _pull_variable_from_message(self, filterx_variable_handle_to_nv_handle(handle)); + self->syncable = TRUE; + + /* NOTE: value may be NULL on an error, in that case the variable becomes an unset one */ + filterx_variable_set_value(v, value, FALSE, self->msg->generation); + filterx_object_unref(value); + } + else + { + filterx_variable_set_value(v, NULL, FALSE, self->generation); + } return v; } @@ -155,7 +184,7 @@ filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, void filterx_scope_sync(FilterXScope *self, LogMessage *msg) { - + filterx_scope_set_message(self, msg); if (!self->dirty) { msg_trace("Filterx sync: not syncing as scope is not dirty", @@ -172,6 +201,8 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg) GString *buffer = scratch_buffers_alloc(); + gint msg_generation = msg->generation; + for (gint i = 0; i < self->variables->len; i++) { FilterXVariable *v = &g_array_index(self->variables, FilterXVariable, i); @@ -199,16 +230,19 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg) } else if (v->value == NULL) { + g_assert(v->generation == msg_generation); msg_trace("Filterx sync: whiteout variable, unsetting in message", evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL))); /* we need to unset */ log_msg_unset_value(msg, filterx_variable_get_nv_handle(v)); filterx_variable_unassign(v); + v->generation++; } else if (filterx_variable_is_assigned(v) || filterx_object_is_modified_in_place(v->value)) { LogMessageValueType t; + g_assert(v->generation == msg_generation); msg_trace("Filterx sync: changed variable in scope, overwriting in message", evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL))); @@ -218,13 +252,17 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg) log_msg_set_value_with_type(msg, filterx_variable_get_nv_handle(v), buffer->str, buffer->len, t); filterx_object_set_modified_in_place(v->value, FALSE); filterx_variable_unassign(v); + v->generation++; } else { msg_trace("Filterx sync: variable in scope and message in sync, not doing anything", evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL))); + v->generation++; } } + /* FIXME: hack ! */ + msg->generation = msg_generation + 1; self->dirty = FALSE; } @@ -298,7 +336,6 @@ filterx_scope_make_writable(FilterXScope **pself) *pself = new; } (*pself)->generation++; - g_assert((*pself)->generation < FILTERX_SCOPE_MAX_GENERATION); return *pself; } diff --git a/lib/filterx/filterx-scope.h b/lib/filterx/filterx-scope.h index 4d693e40a..882d415cd 100644 --- a/lib/filterx/filterx-scope.h +++ b/lib/filterx/filterx-scope.h @@ -43,9 +43,10 @@ typedef struct _FilterXScope FilterXScope; struct _FilterXScope { GAtomicCounter ref_cnt; + guint16 write_protected:1, dirty:1, syncable:1; + FilterXGenCounter generation; LogMessage *msg; GArray *variables; - guint32 generation:20, write_protected, dirty, syncable; }; typedef gboolean (*FilterXScopeForeachFunc)(FilterXVariable *variable, gpointer user_data); @@ -55,8 +56,7 @@ void filterx_scope_sync(FilterXScope *self, LogMessage *msg); FilterXVariable *filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle); FilterXVariable *filterx_scope_register_variable(FilterXScope *self, FilterXVariableType variable_type, - FilterXVariableHandle handle, - FilterXObject *initial_value); + FilterXVariableHandle handle); gboolean filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func, gpointer user_data); /* copy on write */ @@ -79,6 +79,36 @@ filterx_scope_is_dirty(FilterXScope *self) return self->dirty; } +static inline FilterXObject * +filterx_scope_get_variable(FilterXScope *self, FilterXVariable *v) +{ + return filterx_variable_get_value(v); +} + +static inline void +filterx_scope_set_variable(FilterXScope *self, FilterXVariable *v, FilterXObject *value, gboolean assignment) +{ + if (filterx_variable_is_floating(v)) + { + G_STATIC_ASSERT(sizeof(v->generation) == sizeof(self->generation)); + filterx_variable_set_value(v, value, assignment, self->generation); + } + else + { + G_STATIC_ASSERT(sizeof(v->generation) == sizeof(self->msg->generation)); + filterx_variable_set_value(v, value, assignment, self->msg->generation); + } +} + +static inline void +filterx_scope_unset_variable(FilterXScope *self, FilterXVariable *v) +{ + if (filterx_variable_is_floating(v)) + filterx_variable_unset_value(v, self->generation); + else + filterx_variable_unset_value(v, self->msg->generation); +} + static inline void filterx_scope_set_message(FilterXScope *self, LogMessage *msg) { diff --git a/lib/filterx/filterx-variable.c b/lib/filterx/filterx-variable.c index 3b5b726e2..6a2499f6f 100644 --- a/lib/filterx/filterx-variable.c +++ b/lib/filterx/filterx-variable.c @@ -47,13 +47,10 @@ filterx_variable_clear(FilterXVariable *v) void filterx_variable_init_instance(FilterXVariable *v, FilterXVariableType variable_type, - FilterXVariableHandle handle, - FilterXObject *initial_value, - guint32 generation) + FilterXVariableHandle handle) { - v->handle = handle; v->variable_type = variable_type; + v->handle = handle; v->assigned = FALSE; - v->generation = generation; - v->value = filterx_object_ref(initial_value); + v->value = NULL; } diff --git a/lib/filterx/filterx-variable.h b/lib/filterx/filterx-variable.h index b31afe52c..c4950b5b8 100644 --- a/lib/filterx/filterx-variable.h +++ b/lib/filterx/filterx-variable.h @@ -36,9 +36,8 @@ typedef enum FX_VAR_DECLARED_FLOATING, } FilterXVariableType; -#define FILTERX_SCOPE_MAX_GENERATION ((1UL << 20) - 1) - typedef guint32 FilterXVariableHandle; +typedef guint16 FilterXGenCounter; #define FILTERX_HANDLE_FLOATING_BIT (1UL << 31) @@ -71,17 +70,15 @@ typedef struct _FilterXVariable * * declared -- this variable is declared (e.g. retained for the entire input pipeline) */ - guint32 assigned:1, - variable_type:2, - generation:20; + guint16 assigned:1, + variable_type:2; + FilterXGenCounter generation; FilterXObject *value; } FilterXVariable; void filterx_variable_init_instance(FilterXVariable *v, FilterXVariableType variable_type, - FilterXVariableHandle handle, - FilterXObject *initial_value, - guint32 generation); + FilterXVariableHandle handle); void filterx_variable_clear(FilterXVariable *v); static inline gboolean @@ -115,17 +112,19 @@ filterx_variable_get_value(FilterXVariable *v) } static inline void -filterx_variable_set_value(FilterXVariable *v, FilterXObject *new_value, gboolean assignment) +filterx_variable_set_value(FilterXVariable *v, FilterXObject *new_value, gboolean assignment, + FilterXGenCounter generation) { filterx_object_unref(v->value); v->value = filterx_object_ref(new_value); v->assigned = assignment; + v->generation = generation; } static inline void -filterx_variable_unset_value(FilterXVariable *v) +filterx_variable_unset_value(FilterXVariable *v, FilterXGenCounter generation) { - filterx_variable_set_value(v, NULL, TRUE); + filterx_variable_set_value(v, NULL, TRUE, generation); } static inline gboolean @@ -147,13 +146,13 @@ filterx_variable_is_assigned(FilterXVariable *v) } static inline gboolean -filterx_variable_is_same_generation(FilterXVariable *v, guint32 generation) +filterx_variable_is_same_generation(FilterXVariable *v, FilterXGenCounter generation) { return v->generation == generation; } static inline void -filterx_variable_set_generation(FilterXVariable *v, guint32 generation) +filterx_variable_set_generation(FilterXVariable *v, FilterXGenCounter generation) { v->generation = generation; } diff --git a/lib/filterx/func-vars.c b/lib/filterx/func-vars.c index d412998db..2c1db420b 100644 --- a/lib/filterx/func-vars.c +++ b/lib/filterx/func-vars.c @@ -114,19 +114,18 @@ _load_from_dict(FilterXObject *key, FilterXObject *value, gpointer user_data) FilterXVariableType variable_type = (key_str[0] == '$') ? FX_VAR_MESSAGE_TIED : FX_VAR_DECLARED_FLOATING; FilterXVariableHandle handle = filterx_map_varname_to_handle(key_str, variable_type); - FilterXVariable *variable = NULL; - variable = filterx_scope_register_variable(scope, variable_type, handle, NULL); - - FilterXObject *cloned_value = filterx_object_clone(value); - filterx_variable_set_value(variable, cloned_value, TRUE); - filterx_object_unref(cloned_value); - + FilterXVariable *variable = filterx_scope_register_variable(scope, variable_type, handle); if (!variable) { filterx_eval_push_error("Failed to register variable", NULL, key); return FALSE; } + FilterXObject *cloned_value = filterx_object_clone(value); + filterx_scope_set_variable(scope, variable, cloned_value, TRUE); + filterx_object_unref(cloned_value); + + if (debug_flag) { LogMessageValueType type; diff --git a/tests/light/functional_tests/filterx/test_filterx_scope.py b/tests/light/functional_tests/filterx/test_filterx_scope.py index 78f8d4c6e..7fb8395f8 100644 --- a/tests/light/functional_tests/filterx/test_filterx_scope.py +++ b/tests/light/functional_tests/filterx/test_filterx_scope.py @@ -188,6 +188,38 @@ def test_message_tied_variables_do_not_propagate_to_parallel_branches(config, sy assert file_false.read_log() == "kecske\n" +def test_message_tied_variables_are_invalidated_if_message_is_changed(config, syslog_ng): + (file_true, file_false, file_final) = create_config( + config, init_exprs=[ + """ + declare foo = $MSG; + foo; + $MSG; + foo == "foobar"; + $MSG == "foobar"; + """, + ], init_log_exprs=[ + """ + rewrite { + set("foobar replacement" value("MSG")); + }; + """, + ], true_exprs=[ + """ + $MSG; + foo; + $MSG == "foobar replacement"; + foo == "foobar"; + """, + ], + ) + syslog_ng.start(config) + + assert file_true.get_stats()["processed"] == 1 + assert "processed" not in file_false.get_stats() + assert file_true.read_log() == "foobar replacement\n" + + def test_message_tied_variables_are_not_considered_changed_just_by_unmarshaling(config, syslog_ng): (file_true, file_false, file_final) = create_config( config, init_exprs=[