See http://www.confluent.io/stream-processing-cookbook/ksql-recipes/syslog-pattern-detection-alerting
-
Docker
-
If running on Mac/Windows, allocate at least 4GB of memory to Docker. To check the current allocation:
docker system info | grep Memory
Should return a value of at least 4GB; if not, the Kafka stack will probably not work.
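The memory check can also be scripted rather than read by eye. The following is a minimal sketch, not part of the recipe: `docker_mem_ok` is a hypothetical helper that compares a byte count against a threshold given in GiB. It relies on `docker info --format '{{.MemTotal}}'`, which prints the total memory available to Docker in bytes. Docker may report slightly less than the configured allocation, so choose the threshold accordingly.

```shell
# Hypothetical helper (not part of the recipe).
# docker_mem_ok BYTES MIN_GIB
# Succeeds when BYTES is at least MIN_GIB gibibytes.
docker_mem_ok() {
  bytes="$1"
  min_gib="$2"
  min_bytes=$((min_gib * 1024 * 1024 * 1024))
  [ "$bytes" -ge "$min_bytes" ]
}

# Usage (requires a running Docker daemon), with 4 as the example threshold:
# docker_mem_ok "$(docker info --format '{{.MemTotal}}')" 4 \
#   && echo "Docker has enough memory" \
#   || echo "Increase the memory allocated to Docker"
```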
-
Clone this repository
git clone https://github.com/confluentinc/ksql-recipes-try-it-at-home.git
-
Launch:
cd ksql-recipes-try-it-at-home/syslog-pattern-detection-alerting
docker-compose up -d
-
Run KSQL CLI:
docker-compose exec ksql-cli ksql http://ksql-server:8088
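If the CLI cannot connect right after launch, the KSQL server may still be starting. The sketch below shows one way to wait for it; `retry` is a hypothetical helper, and the usage line assumes that port 8088 of the ksql-server container is published on localhost, which the compose file may or may not do. The `/info` endpoint is part of the KSQL server REST API.

```shell
# Hypothetical helper (not part of the recipe).
# retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds or ATTEMPTS runs have been used up.
retry() {
  attempts="$1"
  delay="$2"
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    # Pause between attempts, but not after the last one.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}

# Usage (assumes the KSQL server is reachable at localhost:8088):
# retry 30 2 curl -sf -o /dev/null http://localhost:8088/info \
#   && echo "KSQL server is up"
```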
-
Register the existing syslog topic for use as a KSQL Stream called syslog:
CREATE STREAM syslog \
(TYPE VARCHAR, HOST VARCHAR, MESSAGE VARCHAR, SEVERITY INT, TAG VARCHAR, FACILITY INT, REMOTEADDRESS VARCHAR, DATE BIGINT) \
WITH (KAFKA_TOPIC='syslog', VALUE_FORMAT='JSON');
-
Inspect the first few messages as they arrive:
ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG LIMIT 20;
rpi-02 | CRON | pam_unix(cron:session): session opened for user smmsp by (uid=0)
rpi-02 | /USR/SBIN/CRON | (smmsp) CMD (test -x /etc/init.d/sendmail && /usr/share/sendmail/sendmail cron-msp)
[...]
-
It's easy to filter out noise:
ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG \
WHERE TAG !='CRON' \
AND TAG !='/USR/SBIN/CRON' \
LIMIT 20;
rpi-02 | minissdpd | device not found for removing : uuid:RKU-42XXX-1GU4A6067130::upnp:rootdevice
rpi-02 | minissdpd | device not found for removing : uuid:RKU-42XXX-1GU4A6067130
[...]
-
It's also easy to filter to include just specific types of message; in this example, SSH connections:
ksql> SELECT HOST, TAG, MESSAGE FROM SYSLOG \
WHERE TAG ='sshd' \
LIMIT 20;
rpi-03 | sshd | Invalid user xbmc from 186.249.209.22
rpi-03 | sshd | input_userauth_request: invalid user xbmc [preauth]
-
Create a Kafka topic of just SSH connections, populated in real time from the source syslog topic:
CREATE STREAM SYSLOG_SSHD AS \
SELECT * FROM SYSLOG \
WHERE TAG ='sshd';
-
Create a Kafka topic of SSH brute-force attempts, daisy-chained from the first:
CREATE STREAM SYSLOG_SSHD_BRUTEFORCE_ATTACK AS \
SELECT HOST, TAG, MESSAGE FROM SYSLOG_SSHD \
WHERE MESSAGE LIKE 'Invalid user%';
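To see which rows each of the two chained predicates keeps, the filters can be mimicked offline on a few sample rows. This is only an illustrative sketch using awk, not how KSQL evaluates the queries: the function names are hypothetical, the rows are copied from the example output above and rewritten as HOST|TAG|MESSAGE, and the LIKE 'Invalid user%' pattern is modelled as a case-sensitive prefix match.

```shell
# Sample rows taken from the example output above, as HOST|TAG|MESSAGE.
sample_rows() {
  cat <<'EOF'
rpi-02|CRON|pam_unix(cron:session): session opened for user smmsp by (uid=0)
rpi-03|sshd|Invalid user xbmc from 186.249.209.22
rpi-03|sshd|input_userauth_request: invalid user xbmc [preauth]
EOF
}

# Mimics: WHERE TAG = 'sshd'
filter_sshd() {
  awk -F'|' '$2 == "sshd"'
}

# Mimics: WHERE MESSAGE LIKE 'Invalid user%'
# (the message must start with exactly "Invalid user")
filter_invalid_user() {
  awk -F'|' 'index($3, "Invalid user") == 1'
}

# Chain the two filters, as the two derived streams are chained.
sample_rows | filter_sshd | filter_invalid_user
```

Only the first sshd row passes both filters. The second sshd row is dropped because its message does not begin with "Invalid user": the phrase appears later in the message and in lower case.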
-
Observe that two new topics have been created, each of which contains a live feed of derived syslog data based on the predicate specified:
ksql> LIST TOPICS;

 Kafka Topic                   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------------------------
 syslog                        | true       | 1          | 1                  | 2         | 2
 SYSLOG_SSHD                   | true       | 4          | 1                  | 0         | 0
 SYSLOG_SSHD_BRUTEFORCE_ATTACK | true       | 4          | 1                  | 0         | 0
- Kafka Connect syslog connector
- Blog series We ❤️ syslogs: Real-time syslog Processing with Apache Kafka and KSQL