-
Notifications
You must be signed in to change notification settings - Fork 132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding audit-search.py to the utilities directory. #1117
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
#!/usr/bin/env python | ||
""" NIPAP Audit searcher | ||
==================== | ||
This is a quick and dirty script to search the audit log db | ||
for specific IP addresses and subnets, and in that way, get the audit trail | ||
for it. | ||
""" | ||
|
||
import psycopg2 | ||
import psycopg2.extras | ||
import re | ||
import IPy | ||
import nipap | ||
import ConfigParser | ||
import datetime | ||
import sys | ||
|
||
|
||
class AuditLog: | ||
|
||
_con_pg = None | ||
_curs_pg = None | ||
|
||
def __init__(self): | ||
self._cfg = ConfigParser.ConfigParser() | ||
self._cfg.read("/etc/nipap/nipap.conf") | ||
self._connect_db() | ||
|
||
def _is_ipv4(self, ip): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (2) |
||
""" Return true if given arg is a valid IPv4 address | ||
""" | ||
try: | ||
p = IPy.IP(ip) | ||
except ValueError: | ||
return False | ||
|
||
if p.version() == 4: | ||
return True | ||
return False | ||
|
||
def _is_ipv6(self, ip): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (3) |
||
""" Return true if given arg is a valid IPv6 address | ||
""" | ||
try: | ||
p = IPy.IP(ip) | ||
except ValueError: | ||
return False | ||
|
||
if p.version() == 6: | ||
return True | ||
return False | ||
|
||
def _get_afi(self, ip): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (3) |
||
""" Return address-family (4 or 6) for IP or None if invalid address | ||
""" | ||
|
||
parts = unicode(ip).split("/") | ||
if len(parts) == 1: | ||
# just an address | ||
if self._is_ipv4(ip): | ||
return 4 | ||
elif self._is_ipv6(ip): | ||
return 6 | ||
else: | ||
return None | ||
elif len(parts) == 2: | ||
# a prefix! | ||
try: | ||
pl = int(parts[1]) | ||
except ValueError: | ||
# if casting parts[1] to int failes, this is not a prefix.. | ||
return None | ||
|
||
if self._is_ipv4(parts[0]): | ||
if pl >= 0 and pl <= 32: | ||
# prefix mask must be between 0 and 32 | ||
return 4 | ||
# otherwise error | ||
return None | ||
elif self._is_ipv6(parts[0]): | ||
if pl >= 0 and pl <= 128: | ||
# prefix mask must be between 0 and 128 | ||
return 6 | ||
# otherwise error | ||
return None | ||
else: | ||
return None | ||
else: | ||
# more than two parts.. this is neither an address or a prefix | ||
return None | ||
|
||
def _connect_db(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (3) |
||
# Open the database | ||
|
||
db_args = {} | ||
db_args['host'] = self._cfg.get('nipapd', 'db_host') | ||
db_args['database'] = self._cfg.get('nipapd', 'db_name') | ||
db_args['user'] = self._cfg.get('nipapd', 'db_user') | ||
db_args['password'] = self._cfg.get('nipapd', 'db_pass') | ||
db_args['sslmode'] = self._cfg.get('nipapd', 'db_sslmode') | ||
db_args['port'] = self._cfg.get('nipapd', 'db_port') | ||
if db_args['host'] is not None and db_args['host'] == '': | ||
db_args['host'] = None | ||
for key in db_args.copy(): | ||
if db_args[key] is None: | ||
del(db_args[key]) | ||
|
||
while True: | ||
try: | ||
self._con_pg = psycopg2.connect(**db_args) | ||
self._curs_pg = self._con_pg.cursor( | ||
cursor_factory=psycopg2.extras.DictCursor | ||
) | ||
self._register_inet() | ||
psycopg2.extras.register_hstore( | ||
self._con_pg, globally=True, unicode=True | ||
) | ||
except psycopg2.Error as exc: | ||
raise "Error" | ||
except psycopg2.Warning as warn: | ||
raise "Warning: %s" % warn | ||
|
||
break | ||
|
||
def _register_inet(self, oid=None, conn_or_curs=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (2) |
||
""" Create the INET type and an Inet adapter.""" | ||
from psycopg2 import extensions as _ext | ||
if not oid: | ||
oid = 869 | ||
_ext.INET = _ext.new_type((oid, ), "INET", | ||
lambda data, cursor: | ||
data and Inet(data) or None) | ||
_ext.register_type(_ext.INET, self._con_pg) | ||
return _ext.INET | ||
|
||
def _format_log_prefix(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (2) |
||
if type(data) is not dict: | ||
print("Not a dict") | ||
raise | ||
output = "{:<15} {}\n".format("Log id", data['id']) | ||
output += "{:<15} {}\n".format("Author:", data['username']) | ||
output += "{:<15} {}\n".format("Date:", | ||
data['timestamp'].strftime('%c')) | ||
if re.search(".*attr:.*", data['description']): | ||
res = re.match(r'(.*\d\b).*attr: (\{.*\})', data['description']) | ||
description = res.group(1) | ||
dataset = res.group(2) | ||
output += "{:<15} {}\n".format("Description:", description) | ||
parsed = eval(dataset) | ||
output += "New data:\n" | ||
for k, v in parsed.items(): | ||
output += "{:<16}{:<20}: {:<32}\n".format("", k, v) | ||
else: | ||
output += "{:<15} {}\n".format("Description:", data['description']) | ||
return output | ||
|
||
def search_log_prefix(self, prefix): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (2) |
||
if self._get_afi(prefix) is None: | ||
raise ValueError("Invalid Prefix") | ||
|
||
sql = "SELECT * FROM ip_net_log" | ||
|
||
sql += " WHERE prefix_prefix <<= %s" | ||
data = (prefix,) | ||
|
||
self._curs_pg.execute(sql, data) | ||
res = list() | ||
for row in self._curs_pg: | ||
res.append(dict(row)) | ||
|
||
output = "Audit log for prefix {}\n".format(prefix) | ||
for entry in res: | ||
output += self._format_log_prefix(entry) | ||
output += "\n" | ||
|
||
return output | ||
|
||
a = AuditLog() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. too many blank lines (3) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. expected 2 blank lines after class or function definition, found 1 |
||
|
||
if len(sys.argv) == 1: | ||
print "Use with prefix to search in audit log" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SyntaxError: invalid syntax There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We try to use py3 syntax so that print would be |
||
sys.exit() | ||
|
||
prefix = sys.argv[1] | ||
print(a.search_log_prefix(prefix)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expected 2 blank lines, found 1