-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7e3aa05
commit e7f0cf9
Showing
7 changed files
with
470 additions
and
253 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
# Standard Python Libraries | ||
from PyQt6.QtCore import Qt | ||
from PyQt6.QtWidgets import ( | ||
QDialog, | ||
QTableWidget, | ||
QTableWidgetItem, | ||
QVBoxLayout, | ||
QLabel, | ||
QHeaderView, | ||
QPushButton, | ||
QHBoxLayout | ||
) | ||
from typing import Union, Literal | ||
|
||
|
||
class InterfaceSelectionData: | ||
def __init__(self, | ||
index: int, | ||
interface_name: str, | ||
packets_sent: Union[Literal["N/A"], int], | ||
packets_recv: Union[Literal["N/A"], int], | ||
ip_address: Union[Literal["N/A"], str], | ||
mac_address: Union[Literal["N/A"], str], | ||
adapter_name: Union[Literal["N/A"], str], | ||
manufacturer: Union[Literal["N/A"], str], | ||
is_arp = False | ||
): | ||
self.index = index | ||
self.interface_name = interface_name | ||
self.packets_sent = packets_sent | ||
self.packets_recv = packets_recv | ||
self.ip_address = ip_address | ||
self.mac_address = mac_address | ||
self.adapter_name = adapter_name | ||
self.manufacturer = manufacturer | ||
self.is_arp = is_arp | ||
|
||
class InterfaceSelectionDialog(QDialog): | ||
def __init__(self, screen_width: int, screen_height: int, interfaces: list[InterfaceSelectionData]): | ||
super().__init__() | ||
|
||
# Set up the window | ||
self.setWindowTitle("Capture Network Interface Selection - Session Sniffer") | ||
# Set a minimum size for the window | ||
self.setMinimumSize(800, 600) | ||
self.resize_window_for_screen(screen_width, screen_height) | ||
|
||
# Raise and activate window to ensure it gets focus | ||
self.raise_() | ||
self.activateWindow() | ||
|
||
# Custom variables | ||
self.selected_interface_data = None | ||
self.interfaces = interfaces # Store the list of interface data | ||
|
||
# Layout for the dialog | ||
layout = QVBoxLayout() | ||
|
||
# Table widget for displaying interfaces | ||
self.table = QTableWidget() | ||
self.table.setColumnCount(7) | ||
self.table.setHorizontalHeaderLabels( | ||
["Interface Name", "Packets Sent", "Packets Received", "IP Address", "MAC Address", "Adapter Name", "Manufacturer"] | ||
) | ||
self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents) | ||
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeMode.ResizeToContents) | ||
self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents) | ||
self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeMode.ResizeToContents) | ||
self.table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeMode.ResizeToContents) | ||
self.table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeMode.Stretch) | ||
self.table.horizontalHeader().setSectionResizeMode(6, QHeaderView.ResizeMode.Stretch) | ||
self.table.horizontalHeader().setStretchLastSection(True) | ||
self.table.verticalHeader().setVisible(False) | ||
self.table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers) | ||
self.table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows) | ||
self.table.setSelectionMode(QTableWidget.SelectionMode.SingleSelection) | ||
|
||
# Populate the table with interface data | ||
for idx, interface in enumerate(self.interfaces): | ||
self.table.insertRow(idx) | ||
|
||
item = QTableWidgetItem(interface.interface_name) | ||
self.table.setItem(idx, 0, item) | ||
|
||
item = QTableWidgetItem(str(interface.packets_sent)) | ||
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) | ||
self.table.setItem(idx, 1, item) | ||
|
||
item = QTableWidgetItem(str(interface.packets_recv)) | ||
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) | ||
self.table.setItem(idx, 2, item) | ||
|
||
item = QTableWidgetItem(interface.ip_address) | ||
self.table.setItem(idx, 3, item) | ||
|
||
item = QTableWidgetItem(interface.mac_address) | ||
self.table.setItem(idx, 4, item) | ||
|
||
item = QTableWidgetItem(interface.adapter_name) | ||
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) | ||
self.table.setItem(idx, 5, item) | ||
|
||
item = QTableWidgetItem(interface.manufacturer) | ||
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) | ||
self.table.setItem(idx, 6, item) | ||
|
||
# Bottom layout for buttons | ||
bottom_layout = QHBoxLayout() | ||
instruction_label = QLabel("Select the network interface you want to sniff.") | ||
instruction_label.setStyleSheet("font-size: 15pt;") | ||
|
||
self.select_button = QPushButton("Start Sniffing") | ||
self.select_button.setStyleSheet("font-size: 18pt;") | ||
self.select_button.setEnabled(False) # Initially disabled | ||
|
||
# Set fixed size for the button | ||
self.select_button.setFixedSize(300, 50) # Adjusted width and height for slightly larger button | ||
|
||
self.select_button.clicked.connect(self.select_interface) | ||
|
||
bottom_layout.addWidget(instruction_label) | ||
bottom_layout.addWidget(self.select_button) | ||
|
||
# Center the button in the layout | ||
bottom_layout.setAlignment(instruction_label, Qt.AlignmentFlag.AlignCenter) | ||
bottom_layout.setAlignment(self.select_button, Qt.AlignmentFlag.AlignCenter) | ||
|
||
# Add widgets to layout | ||
layout.addWidget(self.table) | ||
layout.addLayout(bottom_layout) | ||
self.setLayout(layout) | ||
|
||
# Connect selection change signal to enable/disable Select button | ||
self.table.selectionModel().selectionChanged.connect(self.update_select_button_state) | ||
|
||
def resize_window_for_screen(self, screen_width: int, screen_height: int): | ||
# Resize the window based on screen size | ||
if screen_width >= 2560 and screen_height >= 1440: | ||
self.resize(1400, 900) | ||
elif screen_width >= 1920 and screen_height >= 1080: | ||
self.resize(1200, 720) | ||
elif screen_width >= 1024 and screen_height >= 768: | ||
self.resize(940, 680) | ||
|
||
def update_select_button_state(self): | ||
# Check if any row is selected | ||
selected_row = self.table.currentRow() | ||
if selected_row != -1: | ||
self.select_button.setEnabled(True) # Enable the Select button | ||
else: | ||
self.select_button.setEnabled(False) # Disable the Select button | ||
|
||
def select_interface(self): | ||
selected_row = self.table.currentRow() | ||
if selected_row != -1: | ||
# Retrieve the selected interface data | ||
self.selected_interface_data = self.interfaces[selected_row] # Get the selected interface data | ||
self.accept() # Close the dialog and set its result to QDialog.Accepted | ||
|
||
def show_interface_selection_dialog(screen_width: int, screen_height: int, interfaces: list[InterfaceSelectionData]): | ||
# Create and show the interface selection dialog | ||
dialog = InterfaceSelectionDialog(screen_width, screen_height, interfaces) | ||
if dialog.exec() == QDialog.DialogCode.Accepted: # Blocks until the dialog is accepted or rejected | ||
return dialog.selected_interface_data | ||
else: | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,93 @@ | ||
# External/Third-party Python Libraries | ||
from pypresence import Presence, DiscordNotFound, PipeClosed, ResponseTimeout, exceptions | ||
|
||
# Standard Python Libraries | ||
import time | ||
from typing import Optional | ||
from pypresence import Presence | ||
from pypresence.exceptions import DiscordNotFound, PipeClosed, ResponseTimeout | ||
|
||
DISCORD_PRESENCE_CLIENT_ID = 1313304495958261781 | ||
DISCORD_RPC_TITLE = "Sniffin' my babies IPs" | ||
DISCORD_RPC_BUTTONS = [ | ||
{ | ||
"label": "GitHub Repo", | ||
"url": "https://github.com/BUZZARDGTA/Session-Sniffer" | ||
} | ||
] | ||
|
||
class DiscordRPCManager: | ||
import queue | ||
import threading | ||
from typing import Union | ||
|
||
|
||
QueueHint = queue.SimpleQueue[Union[str, None, "ShutdownSignal"]] | ||
|
||
|
||
class ShutdownSignal(object): | ||
"""A unique type to represent the shutdown signal.""" | ||
pass | ||
|
||
|
||
_SHUT_DOWN = ShutdownSignal() | ||
|
||
|
||
class DiscordRPC: | ||
"""Manages Discord Rich Presence updates and connection.""" | ||
|
||
def __init__(self): | ||
self.discord_rpc = Presence(DISCORD_PRESENCE_CLIENT_ID) | ||
self.is_connected = False | ||
self.start_time: Optional[int] = None | ||
self.last_update_time: Optional[float] = None | ||
def __init__(self, client_id: int): | ||
self._RPC = Presence(client_id) | ||
self._closed = False | ||
self._queue: QueueHint = queue.SimpleQueue() | ||
|
||
def connect(self): | ||
"""Attempts to connect to Discord RPC.""" | ||
if self.is_connected: | ||
return True | ||
self.connection_status = threading.Event() | ||
|
||
try: | ||
self.discord_rpc.connect() | ||
self.is_connected = True | ||
except DiscordNotFound: | ||
self.is_connected = False | ||
self._thread = threading.Thread(target=_run, args=(self._RPC, self._queue, self.connection_status)) | ||
self._thread.start() | ||
|
||
return self.is_connected | ||
self.last_update_time: float | None = None | ||
|
||
def update(self, state_message: Optional[str] = None): | ||
def update(self, state_message: str | None = None): | ||
""" | ||
Attempts to update the Discord Rich Presence. | ||
Args: | ||
state_message (optional): If provided, the state message to display in Discord presence. | ||
""" | ||
if not self.connect(): | ||
self.last_update_time = time.perf_counter() | ||
if self._closed: | ||
return | ||
|
||
if self.start_time is None: | ||
self.start_time = int(time.time()) | ||
self.last_update_time = time.monotonic() | ||
|
||
if self._thread.is_alive(): | ||
self._queue.put(state_message) | ||
|
||
def close(self): | ||
"""Remove the Discord Rich Presence.""" | ||
if self._closed: | ||
return | ||
|
||
self._closed = True | ||
self._queue.put(_SHUT_DOWN) | ||
self._thread.join(timeout=3) | ||
|
||
|
||
def _run(RPC: Presence, queue: QueueHint, connection_status: threading.Event): | ||
DISCORD_RPC_TITLE = "Sniffin' my babies IPs" | ||
START_TIME = time.time() | ||
DISCORD_RPC_BUTTONS = [ | ||
{"label": "GitHub Repo", "url": "https://github.com/BUZZARDGTA/Session-Sniffer"}, | ||
] | ||
|
||
while True: | ||
status_message = queue.get() | ||
if status_message is _SHUT_DOWN: | ||
if connection_status.is_set(): | ||
RPC.clear() | ||
RPC.close() | ||
return | ||
|
||
if not connection_status.is_set(): | ||
try: | ||
RPC.connect() | ||
except (DiscordNotFound, exceptions.DiscordError): | ||
continue | ||
else: | ||
connection_status.set() | ||
|
||
try: | ||
self.discord_rpc.update( | ||
**({} if state_message is None else {"state": state_message}), | ||
details = DISCORD_RPC_TITLE, | ||
start = self.start_time, | ||
#large_image = "image_name", # Name of the uploaded image in Discord app assets | ||
#large_text = "Hover text for large image", # Tooltip for the large image | ||
#small_image = "image_name_small", # Optional small image | ||
#small_text = "Hover text for small image", # Tooltip for the small image | ||
buttons = DISCORD_RPC_BUTTONS | ||
RPC.update( | ||
state=status_message or None, | ||
details=DISCORD_RPC_TITLE, | ||
start=START_TIME, | ||
buttons=DISCORD_RPC_BUTTONS, | ||
) | ||
except (PipeClosed, ResponseTimeout): | ||
self.is_connected = False | ||
self.start_time = None | ||
|
||
self.last_update_time = time.perf_counter() | ||
connection_status.clear() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.