Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output/eve: reduce fflush call count #12525

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion doc/userguide/output/eve/eve-json-output.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,32 @@ can then be processed by 3rd party tools like Logstash (ELK) or jq.
If ``ethernet`` is set to yes, then ethernet headers will be added to events
if available.

Output Buffering
~~~~~~~~~~~~~~~~

Output flushing is controlled by values in the configuration section ``heartbeat``. By default, Suricata's
output is synchronous with little possibility that written data will not be persisted. However, if ``output.buffer-size``
has a non-zero value, then some data may be written for the output, but not actually flushed. ``buffer-size`` bytes
may be held in memory and written a short time later opening the possibility -- but limited -- for output data
loss.

Hence, a heartbeat mechanism is introduced to limit the amount of time buffered data may exist before being
flushed. Control is provided to instruct Suricata's detection threads to flush their EVE output. With default
values, there is no change in output buffering and flushing behavior. ``output-flush-interval`` controls
how often Suricata's detect threads will flush output in a heartbeat fashion. A value of ``0`` means
"never"; non-zero values must be in ``[1-60]`` seconds.

Flushing should be considered when ``outputs.buffer-size`` is greater than 0 to limit the amount and
age of buffered, but not persisted, output data. Flushing is never needed when ``buffer-size`` is ``0``.


::

heartbeat:
#output-flush-interval: 0



Output types
~~~~~~~~~~~~

Expand All @@ -30,6 +56,10 @@ Output types::
# Enable for multi-threaded eve.json output; output files are amended
# with an identifier, e.g., eve.9.json. Default: off
#threaded: off
# Specify the amount of buffering, in bytes, for
# this output type. The default value 0 means "no
# buffering".
#buffer-size: 0
#prefix: "@cee: " # prefix to prepend to each log entry
# the following are valid when type: syslog above
#identity: "suricata"
Expand Down Expand Up @@ -245,7 +275,7 @@ In the ``custom`` option values from both columns can be used. The
DNS
~~~

.. note::
.. note::

As of Suricata 7.0 the v1 EVE DNS format has been removed.

Expand Down
15 changes: 15 additions & 0 deletions doc/userguide/partials/eve-log.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ outputs:
# Enable for multi-threaded eve.json output; output files are amended with
# an identifier, e.g., eve.9.json
#threaded: false
# Specify the amount of buffering, in bytes, for
# this output type. The default value 0 means "no
# buffering".
#buffer-size: 0
#prefix: "@cee: " # prefix to prepend to each log entry
# the following are valid when type: syslog above
#identity: "suricata"
Expand Down Expand Up @@ -280,3 +284,14 @@ outputs:
# event-set: false # log packets that have a decoder/stream event
# state-update: false # log packets triggering a TCP state update
# spurious-retransmission: false # log spurious retransmission packets
#
heartbeat:
# The output-flush-interval value governs how often Suricata will instruct the
# detection threads to flush their EVE output. Specify the value in seconds [1-60]
# and Suricata will initiate EVE log output flushes at that interval. A value
# of 0 means no EVE log output flushes are initiated. When the EVE output
# buffer-size value is non-zero, some EVE output that was written may remain
# buffered. The output-flush-interval governs how much buffered data exists.
#
# The default value is: 0 (never instruct detection threads to flush output)
#output-flush-interval: 0
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ noinst_HEADERS = \
ippair-storage.h \
ippair-timeout.h \
log-cf-common.h \
log-flush.h \
log-httplog.h \
log-pcap.h \
log-stats.h \
Expand Down Expand Up @@ -911,6 +912,7 @@ libsuricata_c_a_SOURCES = \
ippair-storage.c \
ippair-timeout.c \
log-cf-common.c \
log-flush.c \
log-httplog.c \
log-pcap.c \
log-stats.c \
Expand Down
12 changes: 10 additions & 2 deletions src/alert-debuglog.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,15 @@ static int AlertDebugLogLogger(ThreadVars *tv, void *thread_data, const Packet *

void AlertDebugLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertDebugLogLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertDebugLogCondition,
.ThreadInitFunc = AlertDebugLogThreadInit,
.ThreadDeinitFunc = AlertDebugLogThreadDeinit,
.ThreadExitPrintStatsFunc = NULL,
};

OutputRegisterPacketModule(LOGGER_ALERT_DEBUG, MODULE_NAME, "alert-debug", AlertDebugLogInitCtx,
AlertDebugLogLogger, AlertDebugLogCondition, AlertDebugLogThreadInit,
AlertDebugLogThreadDeinit);
&output_logger_functions);
}
14 changes: 11 additions & 3 deletions src/alert-fastlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,17 @@ int AlertFastLogger(ThreadVars *tv, void *data, const Packet *p);

void AlertFastLogRegister(void)
{
OutputRegisterPacketModule(LOGGER_ALERT_FAST, MODULE_NAME, "fast", AlertFastLogInitCtx,
AlertFastLogger, AlertFastLogCondition, AlertFastLogThreadInit,
AlertFastLogThreadDeinit);
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertFastLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertFastLogCondition,
.ThreadInitFunc = AlertFastLogThreadInit,
.ThreadDeinitFunc = AlertFastLogThreadDeinit,
.ThreadExitPrintStatsFunc = NULL,
};

OutputRegisterPacketModule(
LOGGER_ALERT_FAST, MODULE_NAME, "fast", AlertFastLogInitCtx, &output_logger_functions);
AlertFastLogRegisterTests();
}

Expand Down
11 changes: 9 additions & 2 deletions src/alert-syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,15 @@ static int AlertSyslogLogger(ThreadVars *tv, void *thread_data, const Packet *p)
void AlertSyslogRegister (void)
{
#ifndef OS_WIN32
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = AlertSyslogLogger,
.FlushFunc = NULL,
.ConditionFunc = AlertSyslogCondition,
.ThreadInitFunc = AlertSyslogThreadInit,
.ThreadDeinitFunc = AlertSyslogThreadDeinit,
.ThreadExitPrintStatsFunc = NULL,
};
OutputRegisterPacketModule(LOGGER_ALERT_SYSLOG, MODULE_NAME, "syslog", AlertSyslogInitCtx,
AlertSyslogLogger, AlertSyslogCondition, AlertSyslogThreadInit,
AlertSyslogThreadDeinit);
&output_logger_functions);
#endif /* !OS_WIN32 */
}
3 changes: 3 additions & 0 deletions src/decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1315,9 +1315,12 @@ void DecodeUnregisterCounters(void);
#define PKT_FIRST_ALERTS BIT_U32(29)
#define PKT_FIRST_TAG BIT_U32(30)

#define PKT_PSEUDO_LOG_FLUSH BIT_U32(31) /**< Detect/log flush for protocol upgrade */

/** \brief return 1 if the packet is a pseudo packet */
#define PKT_IS_PSEUDOPKT(p) \
((p)->flags & (PKT_PSEUDO_STREAM_END|PKT_PSEUDO_DETECTLOG_FLUSH))
#define PKT_IS_FLUSHPKT(p) ((p)->flags & (PKT_PSEUDO_LOG_FLUSH))

#define PKT_SET_SRC(p, src_val) ((p)->pkt_src = src_val)

Expand Down
51 changes: 43 additions & 8 deletions src/detect-engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -2300,15 +2300,50 @@ int DetectEngineInspectPktBufferGeneric(
}

/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
* \brief inject a pseudo packet into each detect thread
* if the thread should flush its output logs.
*/
static void InjectPackets(ThreadVars **detect_tvs,
DetectEngineThreadCtx **new_det_ctx,
int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread isn't using the new ctx yet,
* this speeds up the process */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called when a heartbeat log-flush request has been made
* and it should process a pseudo packet and flush its output logs
* to speed the process. */
#if DEBUG
int count = 0;
#endif
for (int i = 0; i < no_of_detect_tvs; i++) {
if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
p->flags |= PKT_PSEUDO_STREAM_END;
p->flags |= PKT_PSEUDO_LOG_FLUSH;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
PacketQueue *q = detect_tvs[i]->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
}
SCLogDebug("leaving: thread notification count = %d", count);
}

/** \internal
* \brief inject a pseudo packet into each detect thread
* -that doesn't use the new det_ctx yet
* -*or*, if the thread should flush its output logs.
*/
static void InjectPackets(
ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called if
* - A thread isn't using a DE ctx and should
* - Or, it should process a pseudo packet and flush its output logs.
* to speed the process. */
for (int i = 0; i < no_of_detect_tvs; i++) {
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (detect_tvs[i]->inq != NULL) {
Expand Down
2 changes: 2 additions & 0 deletions src/detect-engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,6 @@ void DetectEngineStateResetTxs(Flow *f);

void DeStateRegisterTests(void);

/* packet injection */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
#endif /* SURICATA_DETECT_ENGINE_H */
28 changes: 28 additions & 0 deletions src/flow-worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ typedef struct FlowWorkerThreadData_ {

SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);

SC_ATOMIC_DECLARE(bool, flush_ack);

void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data. */

Expand Down Expand Up @@ -554,6 +556,15 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)

SCLogDebug("packet %"PRIu64, p->pcap_cnt);

if ((PKT_IS_FLUSHPKT(p))) {
SCLogDebug("thread %s flushing", tv->printable_name);
OutputLoggerFlush(tv, p, fw->output_thread);
/* Ack if a flush was requested */
bool notset = false;
SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
return TM_ECODE_OK;
}

/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
Expand Down Expand Up @@ -723,6 +734,23 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
return SC_ATOMIC_GET(fw->detect_thread);
}

void *FlowWorkerGetThreadData(void *flow_worker)
{
return (FlowWorkerThreadData *)flow_worker;
}

bool FlowWorkerGetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
return SC_ATOMIC_GET(fw->flush_ack) == true;
}

void FlowWorkerSetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
SC_ATOMIC_SET(fw->flush_ack, false);
}

const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {
Expand Down
3 changes: 3 additions & 0 deletions src/flow-worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);

void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
void *FlowWorkerGetThreadData(void *flow_worker);
bool FlowWorkerGetFlushAck(void *flow_worker);
void FlowWorkerSetFlushAck(void *flow_worker);

void TmModuleFlowWorkerRegister (void);

Expand Down
Loading
Loading