monitor.py
'''
ryu-manager --ofp-tcp-listen-port 6633 --observe-links monitor.py ryu.app.ofctl_rest --config-file configs/dbscan.conf
'''
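# The command above reads runtime options from a config file via Ryu's
# oslo.config wrapper. A minimal sketch of what configs/dbscan.conf might
# contain (assumed layout, matching the options registered in
# Monitor.__init__ below):
#
#   [DEFAULT]
#   train = false
#   model = dbscan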
import copy
import csv
import os
from datetime import datetime
from threading import Thread

import pandas as pd
from ryu import cfg
from ryu.controller import ofp_event
from ryu.controller.handler import DEAD_DISPATCHER, MAIN_DISPATCHER, set_ev_cls
from ryu.lib import hub

import shortestpath
from inspector import Inspector
from loghandler import Logger
from models.dbscan import Model as DBSCAN
# from ann import Model as ANN  # re-enable to make the 'ann' model option usable

# training dataset file, appended to when the `train` config flag is set
CSV_FILE = 'train_data.csv'

log = Logger().getlogger(__name__)
class Queue:
    """Fixed-size sliding window: keeps only the newest `maxsize` items."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.list = []

    def add(self, x):
        self.list.append(x)
        if len(self.list) > self.maxsize:
            # drop the oldest entries so the window never exceeds maxsize
            self.list = self.list[-self.maxsize:]

    def qsize(self):
        return len(self.list)
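
# Example of the sliding-window behaviour:
#   q = Queue(maxsize=3)
#   for i in range(5):
#       q.add(i)
#   q.list == [2, 3, 4]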
class Monitor(shortestpath.ProjectController):
    def __init__(self, *args, **kwargs):
        super(Monitor, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)
        self.fields = {'time': '', 'datapath': '', 'in-port': '', 'eth_src': '', 'eth_dst': '', 'out-port': '',
                       'total_packets': 0, 'total_bytes': 0,
                       'duration': 0, 'priority': 0, 'out-port-1': [], 'out-port-2': [], 'out-port-3': [],
                       'out-port-4': [], 'out-port-5': [], 'out-port-6': [], 'class': 0}
        self.inspector = Inspector()
        self.que = Queue(maxsize=100)
        # read config params from the provided conf file
        CONF = cfg.CONF
        CONF.register_opts([
            cfg.StrOpt('train', default='false'),
            cfg.StrOpt('model', default='ann')])
        log.info('training param: ' + str(CONF.train))
        log.info('model param: ' + str(CONF.model))
        # set network state vars based on the provided flags and values
        self.train = CONF.train != 'false'
        if CONF.model == 'ann':
            # NOTE: requires the `ann` import at the top of this file to be
            # re-enabled; as shipped, this branch raises NameError
            self.model = ANN()
        elif CONF.model == 'dbscan':
            self.model = DBSCAN()
        else:
            self.model = None
    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                log.info('register datapath: ' + str(datapath.id))
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                log.info('unregister datapath: ' + str(datapath.id))
                del self.datapaths[datapath.id]
    def _monitor(self):
        # poll every connected switch for flow/port statistics every 15 seconds
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(15)
    def _request_stats(self, datapath):
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser
        # OpenFlow 1.0 stats requests: table_id=0xff means all tables and
        # out_port=OFPP_NONE disables the output-port filter
        req = parser.OFPFlowStatsRequest(datapath, flags=0, match=parser.OFPMatch(), table_id=0xff,
                                         out_port=ofproto.OFPP_NONE)
        datapath.send_msg(req)
        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_NONE)
        datapath.send_msg(req)
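
    # Note: only EventOFPFlowStatsReply is handled in this file; the port-stats
    # reply requested above has no handler here, so it is presumably consumed
    # by the parent class or simply ignored.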
    def _classify_data(self):
        # stringify the list-valued columns so the types match the training data
        for i in range(1, 7):
            key = 'out-port-' + str(i)
            self.fields[key] = str(self.fields[key])
        self.que.add(copy.deepcopy(self.fields))
        # wait until the window holds enough records before predicting
        if self.que.qsize() > 6:
            # predict the class of the newest record with the configured model
            df = pd.DataFrame(self.que.list)
            df.drop(columns=['time', 'eth_src', 'eth_dst', 'priority', 'class'], inplace=True)
            df = self.model.preprocess(df)
            log.info(df.tail(1))
            res = self.model.predict(df)
            log.info('response from model : ' + str(res[-1]))
            # back-verify the newest record if it is classified as malicious;
            # -1 is the conventional DBSCAN label for noise/outlier points
            if len(res) > 0 and res[-1] == -1:
                self.inspector.verify(self.fields)
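
    # The model object is expected to expose two methods (implemented by
    # models/dbscan.py; stated here as an assumption, not verified):
    #   preprocess(df) -> DataFrame ready for inference
    #   predict(df)    -> sequence of labels aligned with the rows of df,
    #                     where -1 marks an outlier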
    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body
        # turn each switch flow-table entry into a feature record
        for stat in body:
            # skip flows whose first action outputs to the controller
            # (OFPP_CONTROLLER is 0xfffd = 65533 in OpenFlow 1.0)
            if len(stat.actions) > 0 and stat.actions[0].port != 65533:
                # '%s' (epoch seconds) is a platform-dependent strftime code
                self.fields['time'] = datetime.utcnow().strftime('%s')
                self.fields['datapath'] = str(ev.msg.datapath.id)
                self.fields['in-port'] = str(stat.match.in_port)
                self.fields['eth_src'] = stat.match.dl_src
                self.fields['eth_dst'] = stat.match.dl_dst
                self.fields['out-port'] = str(stat.actions[0].port)
                self.fields['total_packets'] = str(stat.packet_count)
                self.fields['total_bytes'] = str(stat.byte_count)
                self.fields['duration'] = str(stat.duration_sec)
                self.fields['priority'] = str(stat.priority)
                for i in range(1, 7):
                    self.fields['out-port-' + str(i)] = []
                # 0 = normal traffic; set to 1 when capturing malicious traffic
                self.fields['class'] = str(0)
                # self.fields['class'] = str(1)
                # collect the link weights of this switch's out-edges from the
                # topology graph maintained by the parent controller
                out_edges = list(self.net.out_edges(ev.msg.datapath.id, data=True))
                for e in out_edges:
                    if e[2] and 'port' in e[2] and 'weight' in e[2]:
                        fld = 'out-port-' + str(e[2]['port'])
                        self.fields[fld].append(e[2]['weight'])
                if self.train:
                    # append the record to the training CSV
                    flag = os.path.isfile(CSV_FILE)
                    with open(CSV_FILE, 'a') as f:
                        header = list(self.fields.keys())
                        writer = csv.DictWriter(f, fieldnames=header)
                        # write the header only when the file is first created
                        if not flag:
                            writer.writeheader()
                        log.info(self.fields)
                        writer.writerow(self.fields)
                else:
                    # classify in a background thread so stats handling is not blocked
                    child = Thread(target=self._classify_data, daemon=True)
                    child.start()
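
# A minimal stand-in for the model interface used above: a hypothetical sketch
# for exercising the classification pipeline without the real models package,
# not part of the original application.
#
#   class DummyModel:
#       def preprocess(self, df):
#           return df
#       def predict(self, df):
#           return [0] * len(df)  # all inliers; -1 would flag an outlier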