Skip to content

Commit

Permalink
SEC-471: Fixed a bug which would not limit fluent-bit to ship logs be…
Browse files Browse the repository at this point in the history
…cause it was unable to load PII fields and Encryption Keys (which are encrypted). It now relies on temporary(cached) JSON files when the backend service APIs are unavailable.
  • Loading branch information
arasic committed Feb 2, 2025
1 parent 9cee826 commit 4538d9b
Showing 1 changed file with 249 additions and 36 deletions.
285 changes: 249 additions & 36 deletions plugins/filter_encrypt/encrypt.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,109 @@
#include <msgpack.h>
#include "encrypt.h"

#ifdef _WIN32
#include <windows.h>
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h>
#endif

#if defined(__unix__) || defined(__APPLE__)
#include <sys/stat.h>
#include <sys/types.h>
#endif

#define DO_DEBUG 0

/* Define the configuration directory based on the platform */
#ifdef _WIN32
#define FLUENT_BIT_CONFIG_DIR "C:/Program Files/fluent-bit"
#elif defined(__APPLE__)
#define FLUENT_BIT_CONFIG_DIR "/opt/fluent-bit/etc/fluent-bit"
#else
#define FLUENT_BIT_CONFIG_DIR "/etc/fluent-bit"
#endif

/* Construct the file path for the PII fields cache */
static const char *get_cache_file_path()
{
static char cache_path[256] = {0};
snprintf(cache_path, sizeof(cache_path), "%s/pii_cache.json", FLUENT_BIT_CONFIG_DIR);
return cache_path;
}

/* Construct the file path for the encryption keys cache */
static const char *get_enc_keys_cache_file_path()
{
static char cache_path[256] = {0};
snprintf(cache_path, sizeof(cache_path), "%s/enc_keys_cache.json", FLUENT_BIT_CONFIG_DIR);
return cache_path;
}

/* Reads the cached PII fields from a file */
static flb_sds_t read_cached_pii_fields(const char *cache_file)
{
FILE *fp = fopen(cache_file, "r");
if (!fp) {
flb_error("[filter_encrypt] Failed to open cache file: %s", cache_file);
return NULL;
}

fseek(fp, 0, SEEK_END);
long filesize = ftell(fp);
fseek(fp, 0, SEEK_SET);
if (filesize <= 0) {
flb_error("[filter_encrypt] Cache file %s is empty or inaccessible", cache_file);
fclose(fp);
return NULL;
}

char *buffer = flb_malloc(filesize + 1);
if (!buffer) {
fclose(fp);
flb_error("[filter_encrypt] Failed to allocate memory for reading cache file");
return NULL;
}
size_t read_size = fread(buffer, 1, filesize, fp);
buffer[read_size] = '\0';
fclose(fp);

flb_info("[filter_encrypt] Loaded cached PII fields from file: %s", cache_file);
return flb_sds_create_len(buffer, read_size);
}

/* Writes the retrieved PII fields to a local cache file with restricted permissions (if desired) */
static int write_cached_pii_fields(const char *cache_file, flb_sds_t response_json)
{
FILE *fp;
#ifdef __unix__
mode_t old_umask = umask(0077);
fp = fopen(cache_file, "w");
umask(old_umask);
#else
fp = fopen(cache_file, "w");
#endif
if (!fp) {
flb_error("[filter_encrypt] Failed to open cache file for writing: %s", cache_file);
return -1;
}
size_t len = flb_sds_len(response_json);
size_t written = fwrite(response_json, 1, len, fp);
fclose(fp);
if (written != len) {
flb_error("[filter_encrypt] Failed to write complete data to cache file: %s", cache_file);
return -1;
}
flb_info("[filter_encrypt] Successfully updated cache file: %s with retrieved PII fields", cache_file);
return 0;
}

/* Reduced timeout HTTP request.
* Note: In your current Fluent Bit version the HTTP client does not expose a timeout field.
* If a newer fluent-bit version adds timeout support, you can set it here.
*/
static flb_sds_t make_http_request(struct flb_config *config,
const char* HOST,
const int PORT,
Expand All @@ -79,7 +179,7 @@ static flb_sds_t make_http_request(struct flb_config *config,
struct flb_upstream *upstream = NULL;
struct flb_http_client *client = NULL;
struct flb_tls *tls = NULL;
int flb_upstream_flag = FLB_IO_TLS; // using HTTPS (TLS)
int flb_upstream_flag = FLB_IO_TLS; /* using HTTPS (TLS) */
size_t b_sent;
int ret;
struct flb_connection *u_conn = NULL;
Expand All @@ -89,19 +189,19 @@ static flb_sds_t make_http_request(struct flb_config *config,
/* Create TLS context */
tls = flb_tls_create(FLB_TLS_CLIENT_MODE, /* mode */
FLB_FALSE, /* verify */
-1, /* debug */
NULL, /* vhost */
NULL, /* ca_path */
NULL, /* ca_file */
NULL, /* crt_file */
NULL, /* key_file */
NULL); /* key_passwd */
-1, /* debug */
NULL, /* vhost */
NULL, /* ca_path */
NULL, /* ca_file */
NULL, /* crt_file */
NULL, /* key_file */
NULL); /* key_passwd */
if (!tls) {
flb_error("[filter_encrypt] error initializing TLS context");
goto cleanup;
}
} else {
// using HTTP (no HTTPS/TLS)
/* using HTTP (no HTTPS/TLS) */
flb_upstream_flag = FLB_IO_TCP;
}

Expand All @@ -128,15 +228,19 @@ static flb_sds_t make_http_request(struct flb_config *config,
NULL, 0,
HOST, PORT,
NULL, 0);

if (!client) {
flb_error("[filter_encrypt] could not create http client");
goto cleanup;
}

/* If your Fluent Bit version supported setting a timeout, it would be set here.
* e.g. client->timeout = 2;
*/

/* Add headers */
for (size_t i = 0; i < num_headers; i += 2) {
flb_http_add_header(client, headers[i], strlen(headers[i]), headers[i + 1], strlen(headers[i + 1]));
flb_http_add_header(client, headers[i], strlen(headers[i]),
headers[i + 1], strlen(headers[i + 1]));
}

/* Perform the HTTP request */
Expand All @@ -154,7 +258,7 @@ static flb_sds_t make_http_request(struct flb_config *config,
resp = flb_sds_create_len(client->resp.payload, client->resp.payload_size);
flb_debug("\nresp: %s\n", resp);

cleanup:
cleanup:
if (client) {
flb_http_client_destroy(client);
}
Expand All @@ -168,7 +272,9 @@ static flb_sds_t make_http_request(struct flb_config *config,
return resp;
}

flb_sds_t api_retrieve_encryption_keys(struct flb_config* config, bool is_startup) {
/* Updated API retrieval for PII fields to use the cache if the endpoint is unavailable */
flb_sds_t api_retrieve_pii_fields(struct flb_config* config, bool is_startup)
{
struct HashMapEntry* organization_key_kv = get(FLB_FILTER_ENCRYPT_ORGANIZATION_KEY);
const char *organization_key = organization_key_kv->data;
struct HashMapEntry* api_access_kv = get(FLB_FILTER_ENCRYPT_API_ACCESS_KEY);
Expand All @@ -178,35 +284,123 @@ flb_sds_t api_retrieve_encryption_keys(struct flb_config* config, bool is_startu

struct HashMapEntry* item_backend_host = get(FLB_FILTER_ENCRYPT_HOST);
const char *backend_server_host_value = item_backend_host->data;
struct HashMapEntry* item_backend_path = get(FLB_FILTER_ENCRYPT_URI_ENC_KEYS);
struct HashMapEntry* item_backend_path = get(FLB_FILTER_ENCRYPT_URI_PII_FIELDS);
const char *backend_server_path_value = item_backend_path->data;
struct HashMapEntry* item_backend_port = get(FLB_FILTER_ENCRYPT_PORT);
const char *backend_server_port_value = item_backend_port->data;

flb_debug("port = %s", backend_server_port_value);

const uintmax_t backend_server_port_numeric = strtoumax(backend_server_port_value, NULL, 10);

const char* headers[] = {
"User-Agent", "Fluent-Bit",
FLB_FILTER_ENCRYPT_HEADER_X_ORGANIZATION_KEY, organization_key,
FLB_FILTER_ENCRYPT_HEADER_X_ACCESS_KEY, api_access_key,
FLB_FILTER_ENCRYPT_HEADER_X_SECRET_KEY, api_secret_key
"User-Agent", "Fluent-Bit",
FLB_FILTER_ENCRYPT_HEADER_X_ORGANIZATION_KEY, organization_key,
FLB_FILTER_ENCRYPT_HEADER_X_ACCESS_KEY, api_access_key,
FLB_FILTER_ENCRYPT_HEADER_X_SECRET_KEY, api_secret_key
};

flb_sds_t resp = make_http_request(config, backend_server_host_value, backend_server_port_numeric,
backend_server_path_value, headers, sizeof(headers) / sizeof(headers[0]));
if(resp == (char *)-1) {
flb_error("[filter_encrypt] Unable to connect to the key server: http(s)://%s:%d", backend_server_host_value, backend_server_port_numeric);
if(is_startup) {
exit(EXIT_FAILURE);
flb_sds_t resp = make_http_request(config,
backend_server_host_value,
backend_server_port_numeric,
backend_server_path_value,
headers,
sizeof(headers) / sizeof(headers[0]));

const char *cache_file = get_cache_file_path();

/* Check for a NULL response (i.e. the endpoint is unavailable) */
if (!resp) {
flb_error("[filter_encrypt] Unable to connect to the backend server: http(s)://%s:%d",
backend_server_host_value, backend_server_port_numeric);
flb_info("[filter_encrypt] Falling back to cached PII fields from: %s", cache_file);
/* Attempt to load from cache instead of failing */
resp = read_cached_pii_fields(cache_file);
if (resp == NULL) {
flb_error("[filter_encrypt] No cached PII fields available, and API endpoint http(s)://%s:%d is unreachable",
backend_server_host_value, backend_server_port_numeric);
if (is_startup) {
exit(EXIT_FAILURE);
}
}
}
else {
/* If API request was successful, update the cache file */
if (write_cached_pii_fields(cache_file, resp) < 0) {
flb_error("[filter_encrypt] Failed to update cache file: %s", cache_file);
}
}
return resp;
}

/**
* Helper function to get a cross-platform temporary file path for the encryption keys cache
*/
static flb_sds_t read_cached_enc_keys(const char *cache_file)
{
FILE *fp = fopen(cache_file, "r");
if (!fp) {
flb_error("[filter_encrypt] Failed to open encryption keys cache file: %s", cache_file);
return NULL;
}

fseek(fp, 0, SEEK_END);
long filesize = ftell(fp);
fseek(fp, 0, SEEK_SET);
if (filesize <= 0) {
flb_error("[filter_encrypt] Encryption keys cache file %s is empty or inaccessible", cache_file);
fclose(fp);
return NULL;
}

char *buffer = flb_malloc(filesize + 1);
if (!buffer) {
fclose(fp);
flb_error("[filter_encrypt] Failed to allocate memory for reading encryption keys cache file");
return NULL;
}
size_t read_size = fread(buffer, 1, filesize, fp);
buffer[read_size] = '\0';
fclose(fp);

flb_info("[filter_encrypt] Loaded cached encryption keys from file: %s", cache_file);
return flb_sds_create_len(buffer, read_size);
}

/**
* Writes the retrieved encryption keys to a local encryption keys cache file
* with restricted file permissions (e.g. 0600 on Unix).
*/
static int write_cached_enc_keys(const char *cache_file, flb_sds_t response_json)
{
FILE *fp;
#ifdef __unix__
/* Save current umask and set to 0077, so new file gets mode 0600 (rw-------) */
mode_t old_umask = umask(0077);
fp = fopen(cache_file, "w");
umask(old_umask);
#else
fp = fopen(cache_file, "w");
#endif
if (!fp) {
flb_error("[filter_encrypt] Failed to open encryption keys cache file for writing: %s", cache_file);
return -1;
}
size_t len = flb_sds_len(response_json);
size_t written = fwrite(response_json, 1, len, fp);
fclose(fp);
if (written != len) {
flb_error("[filter_encrypt] Failed to write complete data to encryption keys cache file: %s", cache_file);
return -1;
}
flb_info("[filter_encrypt] Successfully updated encryption keys cache file: %s", cache_file);
return 0;
}

flb_sds_t api_retrieve_pii_fields(struct flb_config* config, bool is_startup) {
/**
* Updated API retrieval for encryption keys to use the cache if the endpoint is unavailable.
*/
flb_sds_t api_retrieve_encryption_keys(struct flb_config* config, bool is_startup)
{
struct HashMapEntry* organization_key_kv = get(FLB_FILTER_ENCRYPT_ORGANIZATION_KEY);
const char *organization_key = organization_key_kv->data;
struct HashMapEntry* api_access_kv = get(FLB_FILTER_ENCRYPT_API_ACCESS_KEY);
Expand All @@ -216,18 +410,19 @@ flb_sds_t api_retrieve_pii_fields(struct flb_config* config, bool is_startup) {

struct HashMapEntry* item_backend_host = get(FLB_FILTER_ENCRYPT_HOST);
const char *backend_server_host_value = item_backend_host->data;
struct HashMapEntry* item_backend_path = get(FLB_FILTER_ENCRYPT_URI_PII_FIELDS);
struct HashMapEntry* item_backend_path = get(FLB_FILTER_ENCRYPT_URI_ENC_KEYS);
const char *backend_server_path_value = item_backend_path->data;
struct HashMapEntry* item_backend_port = get(FLB_FILTER_ENCRYPT_PORT);
const char *backend_server_port_value = item_backend_port->data;

flb_debug("port = %s", backend_server_port_value);
const uintmax_t backend_server_port_numeric = strtoumax(backend_server_port_value, NULL, 10);

const char* headers[] = {
"User-Agent", "Fluent-Bit",
FLB_FILTER_ENCRYPT_HEADER_X_ORGANIZATION_KEY, organization_key,
FLB_FILTER_ENCRYPT_HEADER_X_ACCESS_KEY, api_access_key,
FLB_FILTER_ENCRYPT_HEADER_X_SECRET_KEY, api_secret_key
"User-Agent", "Fluent-Bit",
FLB_FILTER_ENCRYPT_HEADER_X_ORGANIZATION_KEY, organization_key,
FLB_FILTER_ENCRYPT_HEADER_X_ACCESS_KEY, api_access_key,
FLB_FILTER_ENCRYPT_HEADER_X_SECRET_KEY, api_secret_key
};

flb_sds_t resp = make_http_request(config,
Expand All @@ -237,10 +432,27 @@ flb_sds_t api_retrieve_pii_fields(struct flb_config* config, bool is_startup) {
headers,
sizeof(headers) / sizeof(headers[0]));

if (resp == (char *)-1) {
flb_error("[filter_encrypt] Unable to connect to the backend server: http(s)://%s:%d", backend_server_host_value, backend_server_port_numeric);
if (is_startup) {
exit(EXIT_FAILURE);
const char *cache_file = get_enc_keys_cache_file_path();

/* Check for a NULL response (i.e. the endpoint is unavailable) */
if (!resp) {
flb_error("[filter_encrypt] Unable to connect to the key server: http(s)://%s:%d",
backend_server_host_value, backend_server_port_numeric);
flb_info("[filter_encrypt] Falling back to cached encryption keys from: %s", cache_file);
/* Attempt to load from cache instead of failing */
resp = read_cached_enc_keys(cache_file);
if (resp == NULL) {
flb_error("[filter_encrypt] No cached encryption keys available, and API endpoint http(s)://%s:%d is unreachable",
backend_server_host_value, backend_server_port_numeric);
if (is_startup) {
exit(EXIT_FAILURE);
}
}
}
else {
/* If API request was successful, update the cache file */
if (write_cached_enc_keys(cache_file, resp) < 0) {
flb_error("[filter_encrypt] Failed to update encryption keys cache file: %s", cache_file);
}
}
return resp;
Expand Down Expand Up @@ -539,7 +751,8 @@ void set_encryption_keys(char *aes_det_key_ptr, char *ip_encryption_key_ptr) {
}


static int setup(struct flb_filter_encrypt* ctx, struct flb_filter_instance* f_ins, struct flb_config* config) {
static int setup(struct flb_filter_encrypt* ctx, struct flb_filter_instance* f_ins, struct flb_config* config)
{
struct mk_list* head;
struct mk_list* split;
struct flb_kv* kv;
Expand Down

0 comments on commit 4538d9b

Please sign in to comment.