Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add protocol filters #36

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions cativity/cativity.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(self):
self.catsniffer = Sniffer(logger=logging.getLogger("CatSniffer"))
self.grapher = Graphs()
self.network = Network()
self.protocol_filters = ["all", "zigbee", "thread"]
self.protocol = "all"
self.capture_started = True
self.packet_received = queue.Queue()
self.channel_activity = {}
Expand All @@ -59,20 +61,20 @@ def __print_banner(self):
typer.secho(
"""
____ _ _ _ _ ____ _ _
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \ ___| |_ ___ ___| |_ ___ _ __
| | / _` | __| \ \ / / | __| | | | | | |/ _ \ __/ _ \/ __| __/ _ \| '__|
| |__| (_| | |_| |\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | |
\____\__,_|\__|_| \_/ |_|\__|\__, |____/ \___|\__\___|\___|\__\___/|_|
/ ___|__ _| |_(_)_ _(_) |_ _ _| _ \\ ___| |_ ___ ___| |_ ___ _ __
| | / _` | __| \\ \\ / / | __| | | | | | |/ _ \\ __/ _ \\/ __| __/ _ \\| '__|
| |__| (_| | |_| |\\ V /| | |_| |_| | |_| | __/ || __/ (__| || (_) | |
\\____\\__,_|\\__|_| \\_/ |_|\\__|\\__, |____/ \\___|\\__\\___|\\___|\\__\\___/|_|
|___/
""",
fg=typer.colors.BRIGHT_YELLOW,
)
typer.secho(
"A tool to analyze the channel activity fro Zigbee Networks",
"A tool to analyze the channel activity for IEEE 802.15.4 Networks",
fg=typer.colors.BRIGHT_CYAN,
)
typer.secho("Author: astrobyte", fg=typer.colors.BRIGHT_CYAN)
typer.secho("Version: 1.0", fg=typer.colors.BRIGHT_CYAN)
typer.secho("Version: 1.1", fg=typer.colors.BRIGHT_CYAN)
typer.secho("\n")

def channel_handler(self):
Expand Down Expand Up @@ -110,23 +112,55 @@ def main(
topology: bool = typer.Option(
False, help="Show the network topology", show_default=True
),
protocol: str = typer.Option(
"all",
help="Protocol to filter packets",
show_default=True,
show_choices=True,
case_sensitive=False,
),
):
if catsniffer is None:
raise UsageError("Please provide the serial path to the CatSniffer")
typer.secho(
"Please provide the serial path to the CatSniffer",
fg=typer.colors.BRIGHT_RED,
)
os._exit(1)
self.catsniffer.set_serial_path(catsniffer)

if channel is not None:
if channel < 11 or channel > 26:
raise UsageError(
"Invalid channel. Please provide a channel between 11 and 26"
typer.secho(
"Invalid channel. Please provide a channel between 11 and 26",
fg=typer.colors.BRIGHT_RED,
)
os._exit(1)
self.catsniffer.set_channel(channel)
if not topology:
self.grapher.update_channel(channel)
self.fixed_channel = True
self.channel_activity = {}
self.channel_activity[channel] = 0

if protocol not in self.protocol_filters:
typer.secho(
"Invalid protocol filter. Please provide a valid protocol:",
fg=typer.colors.BRIGHT_RED,
)
typer.secho(
f"{', '.join(self.protocol_filters)}", fg=typer.colors.BRIGHT_GREEN
)
os._exit(1)

self.protocol = protocol

if topology and protocol != "zigbee":
typer.secho(
"Topology analysis is only available for Zigbee protocol!",
fg=typer.colors.BRIGHT_RED,
)
os._exit(1)

self.__print_banner()
self.catsniffer.start_sniffer()

Expand Down Expand Up @@ -158,7 +192,12 @@ def main(
tisniffer_packet = TISnifferPacket(packet)
if tisniffer_packet.is_command_response():
continue
self.packet_received.put(tisniffer_packet.payload)

packet_filtered = self.network.get_packet_filtered(
tisniffer_packet.payload, self.protocol
)
if packet_filtered:
self.packet_received.put(packet_filtered)
if topology:
dissected_packet = self.network.dissect_packet(
tisniffer_packet.payload
Expand All @@ -180,7 +219,7 @@ def stop(self):
typer.secho("Happy Hacking!", fg=typer.colors.BRIGHT_YELLOW)


if __name__ == "__main__":
def main():
catbee = Cativity()
try:
catbee.app()
Expand All @@ -192,3 +231,7 @@ def stop(self):
except Exception as e:
typer.echo(f"Error: {e}")
os._exit(1)


if __name__ == "__main__":
main()
8 changes: 8 additions & 0 deletions cativity/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Change Log
## V1.1 - Cativity
### Added
- Protocol filter for Zigbee, Thread and All
- Setup.py for pip installations
### Fix
- Fix character error
- Fix Typos
12 changes: 12 additions & 0 deletions cativity/modules/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ def __init__(self):
self.nStats = NetworkStats()
self.grapher = Graphs()

def get_packet_filtered(self, packet, pfilter):
pkt = Dot15d4(packet)
if pfilter == "all":
return packet
elif pfilter == "thread":
if not pkt.haslayer(ZigbeeNWK) or not pkt.haslayer(ZigBeeBeacon):
return packet
else:
if pkt.haslayer(ZigbeeNWK) or pkt.haslayer(ZigBeeBeacon):
return packet
return None

def dissect_packet(self, packet):
pkt = Dot15d4(packet)
new_pkt = {}
Expand Down
19 changes: 19 additions & 0 deletions cativity/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from setuptools import setup, find_packages

setup(
name="cativity",
version="1.1",
description="Cativity: A tool for channel activity",
author="Astrobyte",
url="https://github.com/ElectronicCats/CatSniffer-Tools",
packages=find_packages(include=["modules", "modules.*"]),
py_modules=["cativity"],
entry_points={
"console_scripts": [
"cativity=cativity:main",
],
},
install_requires=["click", "pyserial", "typer", "scapy", "shellingham"],
include_package_data=True,
package_data={"modules": ["*.py"]},
)