-
Notifications
You must be signed in to change notification settings - Fork 277
/
Copy pathadd_fail2ban_object.py
executable file
·86 lines (77 loc) · 3.27 KB
/
add_fail2ban_object.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pymisp import ExpandedPyMISP, MISPEvent
from pymisp.tools import Fail2BanObject
import argparse
from base64 import b64decode
from io import BytesIO
import os
from datetime import date, datetime
from dateutil.parser import parse
try:
from keys import misp_url, misp_key, misp_verifycert
except Exception:
misp_url = 'URL'
misp_key = 'AUTH_KEY'
misp_verifycert = True
def create_new_event():
me = MISPEvent()
me.info = "Fail2Ban blocking"
me.add_tag(args.tag)
start = datetime.now()
me.add_attribute('datetime', start.isoformat(), comment='Start Time')
return me
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Add Fail2ban object.')
parser.add_argument("-b", "--banned_ip", required=True, help="Banned IP address.")
parser.add_argument("-a", "--attack_type", required=True, help="Type of attack.")
parser.add_argument("-t", "--tag", required=True, help="Tag to search on MISP.")
parser.add_argument("-p", "--processing_timestamp", help="Processing timestamp.")
parser.add_argument("-f", "--failures", help="Amount of failures that lead to the ban.")
parser.add_argument("-s", "--sensor", help="Sensor identifier.")
parser.add_argument("-v", "--victim", help="Victim identifier.")
parser.add_argument("-l", "--logline", help="Logline (base64 encoded).")
parser.add_argument("-F", "--logfile", help="Path to a logfile to attach.")
parser.add_argument("-n", "--force_new", action='store_true', default=False, help="Force new MISP event.")
parser.add_argument("-d", "--disable_new", action='store_true', default=False, help="Do not create a new Event.")
args = parser.parse_args()
pymisp = ExpandedPyMISP(misp_url, misp_key, misp_verifycert, debug=True)
event_id = -1
me = None
if args.force_new:
me = create_new_event()
else:
response = pymisp.search_index(tags=args.tag, timestamp='1h', pythonify=True)
if response:
if args.disable_new:
event_id = response[0].id
else:
last_event_date = parse(response[0].date).date()
nb_attr = response[0].attribute_count
if last_event_date < date.today() or int(nb_attr) > 1000:
me = create_new_event()
else:
event_id = response[0].id
else:
me = create_new_event()
parameters = {'banned-ip': args.banned_ip, 'attack-type': args.attack_type}
if args.processing_timestamp:
parameters['processing-timestamp'] = args.processing_timestamp
if args.failures:
parameters['failures'] = args.failures
if args.sensor:
parameters['sensor'] = args.sensor
if args.victim:
parameters['victim'] = args.victim
if args.logline:
parameters['logline'] = b64decode(args.logline).decode()
if args.logfile:
with open(args.logfile, 'rb') as f:
parameters['logfile'] = {'value': os.path.basename(args.logfile),
'data': BytesIO(f.read())}
f2b = Fail2BanObject(parameters=parameters, standalone=False)
if me:
me.add_object(f2b)
pymisp.add_event(me)
elif event_id:
a = pymisp.add_object(event_id, f2b)