Skip to content

Commit

Permalink
add crowdsec plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
d0m84 committed Jul 26, 2024
1 parent 5c1fae1 commit 60faa94
Showing 1 changed file with 229 additions and 0 deletions.
229 changes: 229 additions & 0 deletions plugins/security/crowdsec
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
=head1 NAME
crowdsec - Plugin to monitor CrowdSec
=head1 ABOUT
Requires Python 3.6
Requires CrowdSec 1.4
=head1 AUTHOR
Copyright (c) 2024 d0m84
=head1 CONFIGURATION
Add the following to your @@CONFDIR@@/munin-node:
[crowdsec]
user root
cli_path /usr/bin/cscli
=head1 LICENSE
GNU GPLv2 or any later version
=begin comment
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at
your option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
=end comment
=head1 MAGIC MARKERS
#%# family=auto
#%# capabilities=autoconf
=cut
"""
import subprocess
import json
import sys
from textwrap import dedent
import os
import hashlib
import shutil


def autoconf():
if shutil.which('cscli') is None:
print('no (cscli not found)')
else:
print('yes')


def call_cli(*args):
cli = os.environ.get('cli_path')
cli = cli if cli is not None else shutil.which('cscli')
o = subprocess.check_output([cli, '-o', 'json'] + list(args))
return json.loads(o)


class State():
def __init__(self):
self.state_file = f'{os.environ["MUNIN_PLUGSTATE"]}/crowdsec_s.json'
if not os.path.isfile(self.state_file):
self.content = {}
with open(self.state_file, 'w') as file:
json.dump(self.content, file)
else:
with open(self.state_file, 'r') as file:
self.content = json.load(file)

def read_section(self, section):
if section in self.content:
return self.content[section]
else:
return None

def write_section(self, section, data):
self.content[section] = data
with open(self.state_file, 'w') as file:
json.dump(self.content, file)


class Decisions():
def __init__(self):
_state = state.read_section('scenarios')
self.state = _state if _state is not None else {}
self.data = call_cli('decisions', 'list')
if self.data is not None:
self.decisions = len(self.data)
self.banned_ips = len(set([d['source']['ip'] for d in self.data if d['source']['scope'] == 'Ip'])) # noqa: E501
self.banned_cidrs = len(set([d['source']['ip'] for d in self.data if d['source']['scope'] == 'Range'])) # noqa: E501
self.decisions_by_scenario = {}
for d in self.data:
s = d['scenario'].replace('#', '-')
if s in self.decisions_by_scenario:
self.decisions_by_scenario[s] += 1
else:
self.decisions_by_scenario[s] = 1
else:
self.decisions, self.banned_ips, self.banned_cidrs = 0, 0, 0
self.decisions_by_scenario = {}

def config(self):
print(dedent("""
multigraph decisions
graph_title CrowdSec Decisions
graph_args --base 1000 --lower-limit 0
graph_vlabel Amount
graph_category security
banned_ips.label Banned IP addresses
banned_ips.type GAUGE
banned_ips.min 0
banned_cidrs.label Banned IP ranges
banned_cidrs.type GAUGE
banned_cidrs.min 0
decisions.label Active Decisions
decisions.type GAUGE
decisions.min 0
multigraph scenarios
graph_title CrowdSec Scenarios
graph_args --base 1000 --lower-limit 0
graph_vlabel Decisions by Scenario
graph_category security
"""))
# current active
for scenario in self.decisions_by_scenario.keys():
hash = hashlib.sha1(scenario.encode()).hexdigest()
if scenario not in self.state:
self.state[scenario] = hash
print(dedent(f"""
multigraph scenarios
{hash}.label {scenario}
{hash}.type GAUGE
{hash}.min 0
"""))
# known via state
for scenario, hash in self.state.items():
if scenario not in self.decisions_by_scenario.keys():
print(dedent(f"""
multigraph scenarios
{hash}.label {scenario}
{hash}.type GAUGE
{hash}.min 0
"""))
state.write_section('scenarios', self.state)

def current(self):
print(dedent(f"""
multigraph decisions
banned_ips.value {self.banned_ips}
banned_cidrs.value {self.banned_cidrs}
decisions.value {self.decisions}
"""))
if len(self.decisions_by_scenario) > 0:
print('multigraph scenarios')
for k, v in self.decisions_by_scenario.items():
name = hashlib.sha1(k.encode()).hexdigest()
print(f'{name}.value {v}')


class Aquisitions():
def __init__(self):
# requires crowdsec >= 1.5.3
try:
self.data = call_cli('metrics')
except json.JSONDecodeError:
self.data = {}

def config(self):
if len(self.data) > 0:
print(dedent("""
multigraph aquisitions
graph_title CrowdSec Aquisitions
graph_args --base 1000 --lower-limit 0
graph_vlabel Lines per ${graph_period}
graph_category security
parsed.label Parsed
parsed.type DERIVE
parsed.min 0
reads.label Read
reads.type DERIVE
reads.min 0
unparsed.label Unparsed
unparsed.type DERIVE
unparsed.min 0
"""))

def current(self):
if len(self.data) > 0:
r = {'reads': 0, 'parsed': 0, 'unparsed': 0}
for source in self.data['acquisition']:
for type in ['reads', 'parsed', 'unparsed']:
if type in self.data['acquisition'][source]:
r[type] += self.data['acquisition'][source][type]
print(dedent(f"""
multigraph aquisitions
reads.value {r['reads']}
unparsed.value {r['unparsed']}
parsed.value {r['parsed']}
"""))


if __name__ == "__main__":
if len(sys.argv) == 2 and sys.argv[1] == 'autoconf':
autoconf()
elif len(sys.argv) == 2 and sys.argv[1] == 'config':
state = State()
Decisions().config()
Aquisitions().config()
else:
state = State()
Decisions().current()
Aquisitions().current()

0 comments on commit 60faa94

Please sign in to comment.