From b0e727badde07d9093a7a6753c63612dddaaaae2 Mon Sep 17 00:00:00 2001 From: Jakub Cabal Date: Fri, 3 Jan 2025 13:17:47 +0100 Subject: [PATCH] chore(core-app-uvm): remove old revision of UVM packet generator --- core/comp/app/app_uvm/pkt_gen/config.py | 53 --- core/comp/app/app_uvm/pkt_gen/layers/trill.py | 89 ----- core/comp/app/app_uvm/pkt_gen/parser.py | 361 ------------------ core/comp/app/app_uvm/pkt_gen/parser_dfs.py | 80 ---- core/comp/app/app_uvm/pkt_gen/parser_rand.py | 40 -- core/comp/app/app_uvm/pkt_gen/pkt_gen.py | 67 ---- 6 files changed, 690 deletions(-) delete mode 100644 core/comp/app/app_uvm/pkt_gen/config.py delete mode 100644 core/comp/app/app_uvm/pkt_gen/layers/trill.py delete mode 100644 core/comp/app/app_uvm/pkt_gen/parser.py delete mode 100644 core/comp/app/app_uvm/pkt_gen/parser_dfs.py delete mode 100644 core/comp/app/app_uvm/pkt_gen/parser_rand.py delete mode 100755 core/comp/app/app_uvm/pkt_gen/pkt_gen.py diff --git a/core/comp/app/app_uvm/pkt_gen/config.py b/core/comp/app/app_uvm/pkt_gen/config.py deleted file mode 100644 index 7189c5022..000000000 --- a/core/comp/app/app_uvm/pkt_gen/config.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple pakcet generator. Packet are generated by random walk. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - - -def json_object_get(json, path): - index = 0 - obj = json - while index < len(path) and obj is not None: - obj = obj.get(path[index]) - index += 1 - - return obj - - -class packet_config: - def __init__(self, constraints=None): - #init values - self.trill = 1 - self.vlan = 4 - self.mpls = 4 - self.ipv6ext = 4 - - self.constraints = constraints - - mpls_stack = self.object_get(["mpls", "stack"]) - if mpls_stack is not None: - self.mpls = int(mpls_stack.get("max")) - - vlan_stack = self.object_get(["vlan", "stack"]) - if vlan_stack is not None: - self.vlan = int(vlan_stack.get("max")) - - ipv6ext_stack = self.object_get(["ipv6ext", "stack"]) - if ipv6ext_stack is not None: - self.ipv6ext = int(ipv6ext.get("max")) - - def copy(self): - ret = packet_config(self.constraints) - ret.trill = self.trill - ret.vlan = self.vlan - ret.mpls = self.mpls - ret.ipv6ext = self.ipv6ext - return ret - - def object_get(self, path): - return json_object_get(self.constraints, path) diff --git a/core/comp/app/app_uvm/pkt_gen/layers/trill.py b/core/comp/app/app_uvm/pkt_gen/layers/trill.py deleted file mode 100644 index eff88647e..000000000 --- a/core/comp/app/app_uvm/pkt_gen/layers/trill.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple pakcet generator. Packet are generated by random walk. -# In this file is resolving how prototoclols follows. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - -import scapy.all -import scapy.packet -import scapy.fields -import scapy.utils -import scapy.volatile - - -class MyStrLenField(scapy.fields.StrLenField): - - __slots__ = ["rand_item"] - - def __init__( - self, - name, # type: str - default, # type: bytes - length_from=None, # type: Optional[Callable[[Packet], int]] - max_length=None, # type: Optional[Any] - ): - super(MyStrLenField, self).__init__(name, default, length_from=length_from, max_length=max_length) - - def randval(self): - # Randomization is move to Trill protocol. - return b"" - - -class Trill(scapy.packet.Packet): - name = "Trill" - fields_desc = [ - #16bits - scapy.fields.BitField("version", 0, 2), - scapy.fields.BitField("res", 0, 2), - scapy.fields.BitField("m", 0, 1), - scapy.fields.BitField("opt_length", 0, 5), - scapy.fields.BitField("hop_count", 0, 6), - #16bits - scapy.fields.ShortField("src_trill_id", 0), - #16bits - scapy.fields.ShortField("dst_trill_id", 0), - #Variable lengtih - MyStrLenField("data", None, length_from=lambda pkt: pkt.opt_length) - ] - - def do_build(self): - field_opt_length = self.opt_length - field_data = self.data - - if (isinstance(field_opt_length, scapy.volatile.RandNum)): - if field_data is None: # randomize opt_length and data - field_opt_length = field_opt_length._fix() - field_data = scapy.volatile.RandBin(field_opt_length*4) - else: # get length from data - field_data += b"\0" * ((-len(field_data)) % 4) - field_opt_length = len(field_data)/4 - else: # randomize data depends on length - if field_data is None: - field_data = scapy.volatile.RandBin(field_opt_length*4) - else: # Cannot randomize because all is set - pass - - if not self.explicit: - self = next(iter(self)) - - self.opt_length = field_opt_length - self.data = field_data - - pkt = self.self_build() - for t in self.post_transforms: - pkt = t(pkt) - pay = self.do_build_payload() - if self.raw_packet_cache is None: - return self.post_build(pkt, pay) - else: - return pkt + pay - - -scapy.packet.bind_layers(scapy.all.Ether, Trill, type=0x22f3) -scapy.packet.bind_layers(scapy.all.Dot1Q, Trill, type=0x22f3) -scapy.packet.bind_layers(Trill, scapy.all.Ether) diff --git a/core/comp/app/app_uvm/pkt_gen/parser.py b/core/comp/app/app_uvm/pkt_gen/parser.py deleted file mode 100644 index e1eaa971f..000000000 --- a/core/comp/app/app_uvm/pkt_gen/parser.py +++ /dev/null @@ -1,361 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple pakcet generator. Packet are generated by random walk. -# In this file is resolving how prototoclols follows. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - -from config import json_object_get -from layers import trill -import sys -import scapy.all -import scapy.utils -import scapy.volatile -import scapy.contrib.mpls -import random -import ipaddress -import json - - -class base_node: - def __init__(self, name): - self.name = name - - def name_get(self): - return self.name - - def protocol_add(self, config): - return None - - def protocol_next(self, config): - return {} - - -################################# -# PAYLOAD protocols -################################# -class Empty(base_node): - def __init__(self): - super().__init__("Empty") - - -class Payload(base_node): - def __init__(self): - super().__init__("Payload") - - def protocol_add(self, config): - return scapy.all.Raw() - - -class TRILL(base_node): - def __init__(self): - super().__init__("TRILL") - - def protocol_add(self, config): - return trill.Trill(version=0, res=0) - - def protocol_next(self, config): - if config.trill != 0: - config.trill -= 1 - proto = {"ETH" : 1} - return proto - - -################################# -# L7 protocols -################################# -class ICMPv4(base_node): - def __init__(self): - super().__init__("ICMPv4") - - def protocol_add(self, config): - return scapy.all.ICMP() - - -class ICMPv6(base_node): - def __init__(self): - super().__init__("ICMPv6") - - def protocol_add(self, config): - return scapy.all.ICMPv6Unknown() - - -class UDP(base_node): - def __init__(self): - super().__init__("UDP") - - def protocol_add(self, config): - return scapy.all.UDP() - - def protocol_next(self, config): - proto = {"Empty" : 1, "Payload" : 1} - cfg_obj = config.object_get([self.name, "weight"]) - if cfg_obj is not None: - proto.update(cfg_obj) - return proto - - -class TCP(base_node): - def __init__(self): - super().__init__("TCP") - - def protocol_add(self, config): - return scapy.all.TCP() - - def protocol_next(self, config): - proto = {"Empty" : 1, "Payload" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - return proto - - -class SCTP(base_node): - def __init__(self): - super().__init__("SCTP") - - def protocol_add(self, config): - return scapy.all.SCTP() - - def protocol_next(self, config): - proto = {"Empty" : 1, "Payload" : 1} - cfg_obj = config.object_get([self.name, "weight"]) - if cfg_obj is not None: - proto.update(cfg_obj) - return proto - - -################################# -# IP protocols -################################# -class IPv4(base_node): - def __init__(self): - super().__init__("IPv4") - - def protocol_add(self, config): - src = None - dst = None - - src_rand = config.object_get([self.name, "values", "src"]) - if src_rand is not None: - val_range = random.choice(src_rand) - src_min = int(val_range.get("min"), 0) - src_max = int(val_range.get("max"), 0) - src = str(ipaddress.IPv4Address(random.randint(src_min, src_max))) - - dst_rand = config.object_get([self.name, "values", "dst"]) - if dst_rand is not None: - val_range = random.choice(dst_rand) - dst_min = int(val_range.get("min"), 0) - dst_max = int(val_range.get("max"), 0) - dst = str(ipaddress.IPv4Address(random.randint(dst_min, dst_max))) - - return scapy.all.IP(version=4, src=src, dst=dst) - - def protocol_next(self, config): - proto = {"Payload" : 1, "Empty" : 1, "ICMPv4" : 1, "UDP" : 1, "TCP" : 1, "SCTP" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - return proto - - -class IPv6Ext(base_node): - def __init__(self): - super().__init__("IPv6Ext") - - def protocol_add(self, config): - possible_protocols = [scapy.all.IPv6ExtHdrDestOpt(), scapy.all.IPv6ExtHdrFragment(id=random.randint(0, 2**32-1)), scapy.all.IPv6ExtHdrHopByHop(), scapy.all.IPv6ExtHdrRouting()] - return random.choice(possible_protocols) - - def protocol_next(self, config): - proto = {"Payload" : 1, "Empty" : 1, "ICMPv4" : 1, "ICMPv6" : 1, "UDP" : 1, "TCP" : 1, "SCTP" : 1, "IPv6Ext" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - # Check if it is last generated IPv6Ext - if config.ipv6ext != 0: - config.ipv6ext -= 1 - if config.ipv6ext == 0: - proto["IPv6Ext"] = 0 - - return proto - - -class IPv6(base_node): - def __init__(self): - super().__init__("IPv6") - - def protocol_add(self, config): - src = None - dst = None - - src_rand = config.object_get([self.name, "values", "src"]) - if src_rand is not None: - val_range = random.choice(src_rand) - src_min = int(val_range.get("min"), 0) - src_max = int(val_range.get("max"), 0) - src = str(ipaddress.IPv6Address(random.randint(src_min, src_max))) - - dst_rand = config.object_get([self.name, "values", "dst"]) - if dst_rand is not None: - val_range = random.choice(dst_rand) - dst_min = int(val_range.get("min"), 0) - dst_max = int(val_range.get("max"), 0) - dst = str(ipaddress.IPv6Address(random.randint(dst_min, dst_max))) - - return scapy.all.IPv6(version=6, src=src, dst=dst) - - def protocol_next(self, config): - proto = {"Payload" : 1, "Empty" : 1, "ICMPv4" : 1, "ICMPv6" : 1, "UDP" : 1, "TCP" : 1, "SCTP" : 1, "IPv6Ext" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - return proto - -################################# -# ETHERNET protocols -################################# - - -class MPLS(base_node): - def __init__(self): - super().__init__("MPLS") - - def protocol_add(self, config): - return scapy.contrib.mpls.MPLS() - - def protocol_next(self, config): - proto = {"IPv4" : 1, "IPv6" : 1, "MPLS" : 1, "Empty" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - # Check if it is last generated MPLS - if config.mpls != 0: - config.mpls -= 1 - if config.mpls == 0: - proto["MPLS"] = 0 - - return proto - - -class PPP(base_node): - def __init__(self): - super().__init__("PPP") - - def protocol_add(self, config): - return scapy.all.PPPoE()/scapy.all.PPP() - - def protocol_next(self, config): - proto = {"IPv4" : 1, "IPv6" : 1, "MPLS" : 1, "Empty" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - - return proto - - -class VLAN(base_node): - def __init__(self): - super().__init__("VLAN") - - def protocol_add(self, config): - possible_protocols = [scapy.all.Dot1Q(), scapy.all.Dot1AD()] - return random.choice(possible_protocols) - - def protocol_next(self, config): - proto = {"IPv4" : 1, "IPv6" : 1, "VLAN" : 1 , "TRILL" : 1, "MPLS" : 1, "Empty" : 1, "PPP" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - # check if it is last generated VLAN - if config.vlan != 0: - config.vlan -= 1 - if config.vlan == 0: - proto["VLAN"] = 0 - if config.trill == 0: - proto["TRILL"] = 0 - - return proto - - -class ETH(base_node): - def __init__(self): - super().__init__("ETH") - - def protocol_add(self, config): - return scapy.all.Ether(src=scapy.volatile.RandMAC(), dst=scapy.volatile.RandMAC()) - - def protocol_next(self, config): - proto = {"IPv4" : 1, "IPv6" : 1, "VLAN" : 1, "TRILL" : 1, "MPLS" : 1, "Empty" : 1, "PPP" : 1} - proto_weight = config.object_get([self.name, "weight"]) - if proto_weight is not None: - proto.update(proto_weight) - - if config.trill == 0: - proto["TRILL"] = 0 - - return proto - - -class parser: - def __init__(self, pcap_file, cfg, seed): - self.protocols = {"ETH" : ETH(), "VLAN" : VLAN(), "TRILL" : TRILL(), "PPP" : PPP(), "MPLS" : MPLS(), "IPv6" : IPv6(), "IPv6Ext" : IPv6Ext(), - "IPv4" : IPv4(), "TCP" : TCP(), "UDP" : UDP(), "ICMPv6" : ICMPv6(), "ICMPv4" : ICMPv4(), "SCTP" : SCTP(), - "Payload" : Payload(), "Empty" : Empty()} - self.pcap_file = scapy.utils.PcapWriter(pcap_file, append=False, sync=True) - self.cfg = None - if cfg is not None: - conf_file = open(cfg) - json_cfg = conf_file.read() - conf_file.close() - self.cfg = json.loads(json_cfg) - random.seed(seed) - - pkt_size_min = json_object_get(self.cfg, ["packet", "size_min"]) - if pkt_size_min is not None: - self.pkt_size_min = pkt_size_min - else: - self.pkt_size_min = 60 - - pkt_err_probability = json_object_get(self.cfg, ["packet", "err_probability"]) - if pkt_err_probability is not None: - self.pkt_err_probability = pkt_err_probability - else: - self.pkt_err_probability = 0 - - def __del__(self): - self.pcap_file.close() - - def gen(self): - print("This is Null packet generator and shouldn't be used", file=sys.stderr) - pass - - def proto_weight_get(self, dict_items): - proto = [] - weight = [] - for key in dict_items: - proto.append(key) - weight.append(dict_items[key]) - - return (proto, weight) - - def write(self, packet): - packet_fuzz = scapy.packet.fuzz(packet) - packet_wr = b"" - try: - packet_wr = packet_fuzz.build() - except Exception: - packet_wr = packet.build() - - # GENERATE ERROR PACKETS - if random.randint(0, 99) < self.pkt_err_probability: - packet_wr = packet_wr[0:random.randint(0, len(packet_wr))] - # SET MINIMAL SIZE - if len(packet_wr) < self.pkt_size_min: - packet_wr += bytes(self.pkt_size_min - len(packet_wr)) - self.pcap_file.write(packet_wr) diff --git a/core/comp/app/app_uvm/pkt_gen/parser_dfs.py b/core/comp/app/app_uvm/pkt_gen/parser_dfs.py deleted file mode 100644 index 1aab3a8db..000000000 --- a/core/comp/app/app_uvm/pkt_gen/parser_dfs.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple packet generator. Packet are generated by dfs algorithm. -# In this file is resolving how prototoclols follows. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - -from config import packet_config -from parser import parser as Parser - - -class dfs_item: - def __init__(self, protocol, cfg): - self.protocol = protocol - self.index = 0 - self.cfg = cfg.copy() - proto_next = protocol.protocol_next(self.cfg) - self.protocols_next = [] - for it in proto_next: - if (proto_next[it] != 0): - self.protocols_next.append(it) - - def last(self): - return len(self.protocols_next) == 0 - - def next(self): - if self.protocols_next is None or self.index >= len(self.protocols_next): - return None - - ret = self.protocols_next[self.index] - self.index += 1 - return ret - - -class parser_dfs(Parser): - def __init__(self, pcap_file, cfg, seed): - super().__init__(pcap_file, cfg, seed) - - def gen(self): - next_items = [] - - cfg_act = packet_config(self.cfg) - item = dfs_item(self.protocols["ETH"], cfg_act) - next_items.append(item) - - packets = 0 - while len(next_items) > 0: - # get last item - item = next_items[-1] - # get next generated protocol - proto_next = item.next() - - if not item.last(): - if proto_next is not None: - item_next = dfs_item(self.protocols[proto_next], item.cfg) - next_items.append(item_next) - else: - #remove last index there is no next protocol - del next_items[-1] - else: - #generate packet - packets += 1 - packet = scapy.packet.Packet() - - for it in next_items: - pkt_proto = it.protocol.protocol_add(it.cfg) - if pkt_proto is not None: - packet = packet / pkt_proto - - #write packet - self.write(packet) - #remove last index - del next_items[-1] - - print("PACKETS %d" % (packets)) - pass diff --git a/core/comp/app/app_uvm/pkt_gen/parser_rand.py b/core/comp/app/app_uvm/pkt_gen/parser_rand.py deleted file mode 100644 index d1f76b622..000000000 --- a/core/comp/app/app_uvm/pkt_gen/parser_rand.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple packet generator. Packet are generated by random walk. -# In this file is resolving how prototoclols follows. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - -from config import packet_config -from parser import parser as Parser - - -class parser_rand(Parser): - def __init__(self, pcap_file, cfg, seed, packets): - super().__init__(pcap_file, cfg, seed) - self.packets = packets - - def gen(self): - for x in range(self.packets): - cfg = packet_config(self.cfg) - proto_act = self.protocols["ETH"] - packet = scapy.packet.Packet() - - while proto_act is not None: - pkt_proto = proto_act.protocol_add(cfg) - if pkt_proto is not None: - packet = packet/pkt_proto - proto_next = proto_act.protocol_next(cfg) - if len(proto_next) > 0: - (proto_next_indexs, proto_next_weights) = self.proto_weight_get(proto_next) - protocol_next_name = random.choices(proto_next_indexs, proto_next_weights)[0] - proto_act = self.protocols.get(protocol_next_name) - else: - proto_act = None - - # End While - self.write(packet) diff --git a/core/comp/app/app_uvm/pkt_gen/pkt_gen.py b/core/comp/app/app_uvm/pkt_gen/pkt_gen.py deleted file mode 100755 index 9a1ae1490..000000000 --- a/core/comp/app/app_uvm/pkt_gen/pkt_gen.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/python3 - -# SPDX-License-Identifier: BSD-3-Clause -# -# simple pakcet generator. Packet are generated by random walk. -# -# Copyright (C) 2022 CESNET -# Author(s): -# Radek Iša - - -from parser_rand import parser_rand as Parser_rand -from parser_dfs import parser_dfs as Parser_dfs -import argparse -import time -import enum - - -class parse_alg(enum.Enum): - noe = 'none' - dfs = 'dfs' - rand = 'rand' - - def __str__(self): - return self.value - - @staticmethod - def values(): - ret = [] - arg_list = list(parse_alg) - for it in arg_list: - ret.append(it.value) - return ret - - -def main(): - #parse options - arg_parser = argparse.ArgumentParser() - arg_parser.add_argument("-f", "--file_output", type=str, - help="Set output file", required=True) - arg_parser.add_argument("-p", "--packets", type=int, - help="number of generated packets", default=20) - arg_parser.add_argument("-a", "--algorithm", type=parse_alg, - help=("parse algorithms possible values [" + ' '.join(parse_alg.values()) + "]"), default="rand") - arg_parser.add_argument("-s", "--seed", type=int, - help="set seed to random generator", default=int(time.time()*1000)) - arg_parser.add_argument("-c", "--conf", type=str, - help="Configrutation of random genertor for protocols in JSON", default=None) - - args = arg_parser.parse_args() - print("SEED : " + f'{args.seed}') - print("ALGORITHM : " + f'{args.algorithm}') - - #args.seed = 1667909888.37288 ./pkt_gen.py -f test.pcap -p 189 result in error - gen = parser(args.file_output, args.conf, args.seed) - - if (args.algorithm == parse_alg.rand): - gen = Parser_rand(args.file_output, args.conf, args.seed, args.packets) - if (args.algorithm == parse_alg.dfs): - gen = Parser_dfs(args.file_output, args.conf, args.seed) - - #run generator - gen.gen() - - -if __name__ == "__main__": - main()