From 195cc48eb8634df6770de218a35984fe94e65acc Mon Sep 17 00:00:00 2001 From: AstroAir Date: Sun, 17 Nov 2024 12:55:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20unzip=20=E5=92=8C=20ffmpeg?= =?UTF-8?q?=20=E6=A8=A1=E5=9D=97=E7=9A=84=E6=B5=8B=E8=AF=95=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E6=B6=B5=E7=9B=96=E8=A7=A3=E5=8E=8B=E3=80=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E8=BD=AC=E6=8D=A2=E3=80=81=E9=9F=B3=E9=A2=91?= =?UTF-8?q?=E6=8F=90=E5=8F=96=E7=AD=89=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=B9?= =?UTF-8?q?=E8=BF=9B=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86=E5=92=8C=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/lithium.pytools/tests/test_7z.py | 108 +++++ modules/lithium.pytools/tests/test_daemon.py | 286 +++++++++++++ modules/lithium.pytools/tests/test_ffmpeg.py | 126 ++++++ modules/lithium.pytools/tests/test_ftp.py | 125 ++++++ modules/lithium.pytools/tests/test_sftp.py | 191 +++++++++ modules/lithium.pytools/tests/test_ssh.py | 146 +++++++ modules/lithium.pytools/tests/test_unzip.py | 99 +++++ modules/lithium.pytools/tools/daemon.py | 413 +++++++++++++++++++ modules/lithium.pytools/tools/ffmpeg.py | 408 ++++++++++++++++++ modules/lithium.pytools/tools/ftp.py | 323 +++++++++++++++ modules/lithium.pytools/tools/sevenzip.py | 410 ++++++++++++++++++ modules/lithium.pytools/tools/sftp.py | 413 +++++++++++++++++++ modules/lithium.pytools/tools/ssh.py | 404 ++++++++++++++++++ modules/lithium.pytools/tools/unzip.py | 340 +++++++++++++++ src/atom | 2 +- 15 files changed, 3793 insertions(+), 1 deletion(-) create mode 100644 modules/lithium.pytools/tests/test_7z.py create mode 100644 modules/lithium.pytools/tests/test_daemon.py create mode 100644 modules/lithium.pytools/tests/test_ffmpeg.py create mode 100644 modules/lithium.pytools/tests/test_ftp.py create mode 100644 modules/lithium.pytools/tests/test_sftp.py create mode 100644 modules/lithium.pytools/tests/test_ssh.py create mode 100644 modules/lithium.pytools/tests/test_unzip.py create mode 100644 modules/lithium.pytools/tools/daemon.py create mode 100644 modules/lithium.pytools/tools/ffmpeg.py create mode 100644 modules/lithium.pytools/tools/ftp.py create mode 100644 modules/lithium.pytools/tools/sevenzip.py create mode 100644 modules/lithium.pytools/tools/sftp.py create mode 100644 modules/lithium.pytools/tools/ssh.py create mode 100644 modules/lithium.pytools/tools/unzip.py diff --git a/modules/lithium.pytools/tests/test_7z.py b/modules/lithium.pytools/tests/test_7z.py new file mode 100644 index 00000000..0cde5906 --- /dev/null +++ b/modules/lithium.pytools/tests/test_7z.py @@ -0,0 +1,108 @@ +import pytest +from pathlib import Path +from unittest.mock import patch, MagicMock +from seven import SevenZipWrapper, SevenZipValidationError, SevenZipCompressionError, SevenZipExtractionError, SevenZipListError, SevenZipTestError, SevenZipError +import subprocess +@pytest.fixture +def seven_zip(): + return SevenZipWrapper(executable="7z") + +def test_init_valid_executable(seven_zip): + assert seven_zip.executable == "7z" + +def test_init_invalid_executable(): + with patch("shutil.which", return_value=None): + with pytest.raises(SevenZipValidationError): + SevenZipWrapper(executable="invalid_executable") + +def test_validate_files_exist(seven_zip): + with patch("pathlib.Path.exists", return_value=True): + seven_zip._validate_files_exist(["file1.txt", "file2.txt"]) + +def test_validate_files_exist_invalid(seven_zip): + with patch("pathlib.Path.exists", side_effect=[True, False]): + with pytest.raises(SevenZipValidationError): + seven_zip._validate_files_exist(["file1.txt", "file2.txt"]) + +def test_validate_archive_exists(seven_zip): + with patch("pathlib.Path.exists", return_value=True): + seven_zip._validate_archive_exists("archive.7z") + +def test_validate_archive_exists_invalid(seven_zip): + with patch("pathlib.Path.exists", return_value=False): + with pytest.raises(SevenZipValidationError): + seven_zip._validate_archive_exists("archive.7z") + +@patch("subprocess.run") +def test_compress(mock_run, seven_zip): + mock_run.return_value = MagicMock(returncode=0, stdout="Success", stderr="") + with patch("pathlib.Path.exists", return_value=True): + seven_zip.compress(["file1.txt"], "archive.7z") + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_compress_failure(mock_run, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(SevenZipCompressionError): + seven_zip.compress(["file1.txt"], "archive.7z") + + +@patch("subprocess.run") +def test_list_contents(mock_run, seven_zip): + mock_run.return_value = MagicMock(returncode=0, stdout="file1.txt\nfile2.txt", stderr="") + with patch("pathlib.Path.exists", return_value=True): + contents = seven_zip.list_contents("archive.7z") + assert contents == "file1.txt\nfile2.txt" + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_list_contents_failure(mock_run, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(SevenZipListError): + seven_zip.list_contents("archive.7z") + +@patch("subprocess.run") +def test_test_archive(mock_run, seven_zip): + mock_run.return_value = MagicMock(returncode=0, stdout="Everything is Ok", stderr="") + with patch("pathlib.Path.exists", return_value=True): + is_valid = seven_zip.test_archive("archive.7z") + assert is_valid + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_test_archive_failure(mock_run, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(SevenZipTestError): + seven_zip.test_archive("archive.7z") + +@patch("pathlib.Path.unlink") +def test_delete_archive(mock_unlink, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + seven_zip.delete_archive("archive.7z") + mock_unlink.assert_called_once() + +@patch("pathlib.Path.unlink", side_effect=Exception("Error")) +def test_delete_archive_failure(mock_unlink, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(SevenZipError): + seven_zip.delete_archive("archive.7z") + +@patch("subprocess.run") +def test_update_archive_add(mock_run, seven_zip): + mock_run.return_value = MagicMock(returncode=0, stdout="Success", stderr="") + with patch("pathlib.Path.exists", return_value=True): + seven_zip.update_archive("archive.7z", ["file1.txt"], add=True, delete=False) + mock_run.assert_called_once() + +@patch("subprocess.run") +def test_update_archive_delete(mock_run, seven_zip): + mock_run.return_value = MagicMock(returncode=0, stdout="Success", stderr="") + with patch("pathlib.Path.exists", return_value=True): + seven_zip.update_archive("archive.7z", ["file1.txt"], add=False, delete=True) + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_update_archive_failure(mock_run, seven_zip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(SevenZipError): + seven_zip.update_archive("archive.7z", ["file1.txt"], add=True, delete=False) \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_daemon.py b/modules/lithium.pytools/tests/test_daemon.py new file mode 100644 index 00000000..65ba6b65 --- /dev/null +++ b/modules/lithium.pytools/tests/test_daemon.py @@ -0,0 +1,286 @@ +import signal +import pytest +from unittest.mock import patch, MagicMock, mock_open +import os +import sys +import subprocess +import psutil +from daemon import DaemonProcess, write_pid, read_pid, is_daemon_running, stop_daemon, start_daemon, status_daemon, DEFAULT_CONFIG + +@pytest.fixture +def config(): + return { + "process_name": "python", + "script_path": "target_script.py", + "restart_interval": 5, + "cpu_threshold": 80, + "memory_threshold": 500, + "max_restarts": 3, + "monitor_interval": 5 + } + +@pytest.fixture +def daemon_process(config): + return DaemonProcess(config) + +def test_init_valid_config(daemon_process): + assert daemon_process.config["process_name"] == "python" + assert daemon_process.config["script_path"] == "target_script.py" + +def test_init_invalid_script_path(config): + config["script_path"] = "invalid_script.py" + with pytest.raises(FileNotFoundError): + DaemonProcess(config) + +@patch("subprocess.Popen") +def test_start_target_process(mock_popen, daemon_process): + mock_popen.return_value.pid = 1234 + daemon_process.start_target_process() + mock_popen.assert_called_once_with( + [sys.executable, "target_script.py"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + assert daemon_process.process.pid == 1234 + +@patch("subprocess.Popen") +def test_is_process_running(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + daemon_process.start_target_process() + assert daemon_process.is_process_running() is True + + mock_popen.return_value.poll.return_value = 1 + assert daemon_process.is_process_running() is False + +@patch("psutil.Process") +@patch("subprocess.Popen") +def test_monitor_process_health(mock_popen, mock_psutil, daemon_process): + mock_popen.return_value.pid = 1234 + mock_psutil.return_value.cpu_percent.return_value = 50 + mock_psutil.return_value.memory_info.return_value.rss = 400 * 1024 * 1024 + daemon_process.start_target_process() + daemon_process.monitor_process_health() + mock_psutil.assert_called_once_with(1234) + +@patch("psutil.Process") +@patch("subprocess.Popen") +def test_monitor_process_health_exceed_thresholds(mock_popen, mock_psutil, daemon_process): + mock_popen.return_value.pid = 1234 + mock_psutil.return_value.cpu_percent.return_value = 90 + mock_psutil.return_value.memory_info.return_value.rss = 600 * 1024 * 1024 + daemon_process.start_target_process() + with patch.object(daemon_process, 'restart_process') as mock_restart: + daemon_process.monitor_process_health() + mock_restart.assert_called() + +@patch("subprocess.Popen") +def test_restart_process(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + daemon_process.start_target_process() + daemon_process.restart_process() + assert daemon_process.restart_count == 1 + +@patch("subprocess.Popen") +def test_monitor_loop(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + with patch.object(daemon_process, 'is_process_running', return_value=True): + with patch.object(daemon_process, 'monitor_process_health'): + with patch("time.sleep", return_value=None): + with patch.object(daemon_process, 'cleanup'): + daemon_process.monitor_loop() + daemon_process.monitor_process_health.assert_called() + +@patch("subprocess.Popen") +def test_cleanup(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + daemon_process.start_target_process() + daemon_process.cleanup() + mock_popen.return_value.terminate.assert_called_once() + +@patch("builtins.open", new_callable=mock_open) +def test_write_pid(mock_open): + with patch("os.getpid", return_value=1234): + write_pid() + mock_open.assert_called_once_with("/tmp/daemon.pid", 'w', encoding='utf-8') + mock_open().write.assert_called_once_with("1234") + +@patch("builtins.open", new_callable=mock_open, read_data="1234") +def test_read_pid(mock_open): + pid = read_pid() + assert pid == 1234 + +@patch("psutil.pid_exists", return_value=True) +@patch("psutil.Process") +@patch("builtins.open", new_callable=mock_open, read_data="1234") +def test_is_daemon_running(mock_open, mock_psutil, mock_pid_exists, config): + mock_psutil.return_value.name.return_value = "python" + assert is_daemon_running(config) is True + +@patch("psutil.Process") +@patch("builtins.open", new_callable=mock_open, read_data="1234") +def test_stop_daemon(mock_open, mock_psutil): + mock_psutil.return_value.send_signal = MagicMock() + mock_psutil.return_value.wait = MagicMock() + stop_daemon() + mock_psutil.return_value.send_signal.assert_called_once_with(signal.SIGTERM) + +@patch("os.fork", side_effect=[0, 0]) +@patch("os.setsid") +@patch("builtins.open", new_callable=mock_open) +@patch("daemon.DaemonProcess") +def test_start_daemon(mock_daemon_process, mock_open, mock_setsid, mock_fork, config): + start_daemon(config) + mock_daemon_process.assert_called_once_with(config) + +@patch("psutil.pid_exists", return_value=True) +@patch("psutil.Process") +@patch("builtins.open", new_callable=mock_open, read_data="1234") +def test_status_daemon(mock_open, mock_psutil, mock_pid_exists): + mock_psutil.return_value.name.return_value = "python" + with patch("builtins.print") as mock_print: + status_daemon() + mock_print.assert_called_with("Daemon is running, PID: 1234") + @patch("daemon.DaemonProcess.start_target_process") + def test_monitor_loop_process_not_running(mock_start, daemon_process): + with patch.object(daemon_process, 'is_process_running', return_value=False): + with patch.object(daemon_process, 'restart_process') as mock_restart: + with patch("time.sleep", return_value=None): + with pytest.raises(SystemExit): + daemon_process.monitor_loop() + mock_restart.assert_called_once() + mock_start.assert_called_once() + + @patch("daemon.DaemonProcess.monitor_process_health") + def test_monitor_loop_process_running(mock_monitor_health, daemon_process): + with patch.object(daemon_process, 'is_process_running', return_value=True): + with patch("time.sleep", return_value=None): + with patch.object(daemon_process, 'cleanup'): + # To prevent an infinite loop, run the loop only once + with patch("builtins.iter", return_value=range(1)): + daemon_process.monitor_loop() + mock_monitor_health.assert_called_once() + + @patch("psutil.Process") + @patch("daemon.DaemonProcess.start_target_process") + def test_monitor_process_health_cpu_exceeds(mock_start, mock_psutil, daemon_process): + mock_proc = MagicMock() + mock_psutil.return_value = mock_proc + mock_proc.cpu_percent.return_value = 90 + mock_proc.memory_info.return_value.rss = 400 * 1024 * 1024 + daemon_process.start_target_process() + daemon_process.monitor_process_health() + mock_popen = daemon_process.process + mock_proc.cpu_percent.assert_called_once_with(interval=1) + daemon_process.restart_process.assert_called_once() + + @patch("psutil.Process") + @patch("daemon.DaemonProcess.start_target_process") + def test_monitor_process_health_memory_exceeds(mock_start, mock_psutil, daemon_process): + mock_proc = MagicMock() + mock_psutil.return_value = mock_proc + mock_proc.cpu_percent.return_value = 50 + mock_proc.memory_info.return_value.rss = 600 * 1024 * 1024 + daemon_process.start_target_process() + daemon_process.monitor_process_health() + mock_popen = daemon_process.process + mock_proc.cpu_percent.assert_called_once_with(interval=1) + daemon_process.restart_process.assert_called_once() + + @patch("psutil.Process", side_effect=psutil.NoSuchProcess(pid=1234)) + def test_monitor_process_health_no_such_process(mock_psutil, daemon_process): + daemon_process.start_target_process() + with patch.object(daemon_process, 'restart_process') as mock_restart: + daemon_process.monitor_process_health() + mock_restart.assert_called_once() + + @patch("psutil.Process", side_effect=psutil.AccessDenied(pid=1234)) + def test_monitor_process_health_access_denied(mock_psutil, daemon_process, caplog): + daemon_process.start_target_process() + daemon_process.monitor_process_health() + assert "Access denied when accessing process information." in caplog.text + + def test_is_process_running_none(daemon_process): + daemon_process.process = None + assert not daemon_process.is_process_running() + + @patch("psutil.Process") + def test_is_process_running_true(mock_psutil, daemon_process): + mock_proc = MagicMock() + mock_proc.poll.return_value = None + daemon_process.process = mock_proc + assert daemon_process.is_process_running() is True + mock_proc.poll.assert_called_once() + + @patch("psutil.Process") + def test_is_process_running_false(mock_psutil, daemon_process): + mock_proc = MagicMock() + mock_proc.poll.return_value = 1 + daemon_process.process = mock_proc + assert daemon_process.is_process_running() is False + mock_proc.poll.assert_called_once() + + @patch("subprocess.Popen") + def test_restart_process_below_max_restarts(mock_popen, daemon_process): + daemon_process.restart_count = 2 + mock_popen.return_value.pid = 5678 + daemon_process.process = mock_popen.return_value + daemon_process.restart_process() + assert daemon_process.restart_count == 3 + mock_popen.assert_called_with( + [sys.executable, "target_script.py"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + @patch("subprocess.Popen") + def test_restart_process_max_restarts_reached(mock_popen, daemon_process): + daemon_process.restart_count = 3 + with patch("daemon.DaemonProcess.cleanup") as mock_cleanup: + with patch("sys.exit") as mock_exit: + daemon_process.restart_process() + mock_cleanup.assert_called_once() + mock_exit.assert_called_once_with("Daemon terminated: exceeded maximum restart count.") + + @patch("subprocess.Popen") + def test_cleanup_terminate_running_process(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + daemon_process.process = mock_popen.return_value + with patch.object(daemon_process, 'is_process_running', return_value=True): + daemon_process.cleanup() + mock_popen.return_value.terminate.assert_called_once() + mock_popen.return_value.wait.assert_called_once_with(timeout=5) + mock_popen.return_value.terminate.assert_called_once() + + @patch("subprocess.Popen") + def test_cleanup_force_kill_process_on_timeout(mock_popen, daemon_process): + mock_popen.return_value.poll.return_value = None + daemon_process.process = mock_popen.return_value + mock_popen.return_value.wait.side_effect = subprocess.TimeoutExpired(cmd='cmd', timeout=5) + with patch.object(daemon_process, 'is_process_running', return_value=True): + with patch("time.sleep", return_value=None): + daemon_process.cleanup() + mock_popen.return_value.terminate.assert_called_once() + mock_popen.return_value.kill.assert_called_once() + mock_popen.return_value.wait.assert_called() + + @patch("os.remove") + @patch("os.path.exists", return_value=True) + def test_cleanup_remove_pid_file(mock_exists, mock_remove, daemon_process): + daemon_process.cleanup() + mock_remove.assert_called_once_with("/tmp/daemon.pid") + + @patch("subprocess.Popen") + def test_start_target_process_exception(mock_popen, daemon_process, caplog): + mock_popen.side_effect = Exception("Start failed") + with pytest.raises(Exception): + daemon_process.start_target_process() + assert "Failed to start target process: Start failed" in caplog.text + + @patch("subprocess.Popen", side_effect=subprocess.TimeoutExpired(cmd='cmd', timeout=5)) + def test_restart_process_force_kill_on_timeout(mock_popen, daemon_process, caplog): + daemon_process.restart_count = 1 + daemon_process.process = mock_popen.return_value + with patch.object(daemon_process, 'is_process_running', return_value=True): + with pytest.raises(subprocess.TimeoutExpired): + daemon_process.restart_process() + assert "Process PID: None force killed." in caplog.text \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_ffmpeg.py b/modules/lithium.pytools/tests/test_ffmpeg.py new file mode 100644 index 00000000..1333fa21 --- /dev/null +++ b/modules/lithium.pytools/tests/test_ffmpeg.py @@ -0,0 +1,126 @@ +import pytest +from unittest.mock import patch, MagicMock +from ffmpeg import FFmpegWrapper +import ffmpeg +import asyncio + +# test_ffmpeg.py + + +@pytest.fixture +def ffmpeg_wrapper(): + return FFmpegWrapper() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_run_ffmpeg_success(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + command = ffmpeg.input("input.mp4").output("output.mp4") + await ffmpeg_wrapper._run_ffmpeg(command) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run", side_effect=ffmpeg.Error("error", b"stderr")) +async def test_run_ffmpeg_failure(mock_run, ffmpeg_wrapper): + command = ffmpeg.input("input.mp4").output("output.mp4") + with pytest.raises(ffmpeg.Error): + await ffmpeg_wrapper._run_ffmpeg(command) + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_convert_format(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.convert_format("input.mp4", "output.mp4") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_extract_audio(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.extract_audio("input.mp4", "output.mp3") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_trim_video(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.trim_video("input.mp4", "output.mp4", 10, 20) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_resize_video(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.resize_video("input.mp4", "output.mp4", 1920, 1080) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_extract_frames(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.extract_frames("input.mp4", "frame_%04d.png", 1) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_merge_videos(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.merge_videos(["input1.mp4", "input2.mp4"], "output.mp4") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_merge_audios(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.merge_audios(["input1.mp3", "input2.mp3"], "output.mp3") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_add_watermark(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.add_watermark("input.mp4", "output.mp4", "watermark.png", "topright") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_add_subtitles(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.add_subtitles("input.mp4", "output.mp4", "subtitles.srt") + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_change_speed(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.change_speed("input.mp4", "output.mp4", 2.0) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.probe") +async def test_extract_video_info(mock_probe, ffmpeg_wrapper): + mock_probe.return_value = {"streams": []} + info = await ffmpeg_wrapper.extract_video_info("input.mp4") + assert info == {"streams": []} + mock_probe.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_add_background_music(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.add_background_music("input.mp4", "audio.mp3", "output.mp4", 0.5) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_overlay_image(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.overlay_image("input.mp4", "overlay.png", "output.mp4", 10, 10) + mock_run.assert_called_once() + +@pytest.mark.asyncio +@patch("ffmpeg.run") +async def test_adjust_brightness_contrast(mock_run, ffmpeg_wrapper): + mock_run.return_value = None + await ffmpeg_wrapper.adjust_brightness_contrast("input.mp4", "output.mp4", 0.1, 1.2) + mock_run.assert_called_once() \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_ftp.py b/modules/lithium.pytools/tests/test_ftp.py new file mode 100644 index 00000000..728d8dd9 --- /dev/null +++ b/modules/lithium.pytools/tests/test_ftp.py @@ -0,0 +1,125 @@ +import pytest +from ftplib import FTP, error_perm, all_errors +from unittest.mock import patch, MagicMock +from ftp import FTPClient + +@pytest.fixture +def ftp_client(): + return FTPClient(host='test_host', username='test_user', password='test_pass') + +def test_connect_success(ftp_client): + with patch.object(FTP, 'connect', return_value=None), \ + patch.object(FTP, 'login', return_value=None): + assert ftp_client.connect() is True + assert ftp_client._is_connected is True + +def test_connect_failure(ftp_client): + with patch.object(FTP, 'connect', side_effect=all_errors): + assert ftp_client.connect() is False + assert ftp_client._is_connected is False + +def test_disconnect_success(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client._is_connected = True + ftp_client.disconnect() + ftp_client.ftp.quit.assert_called_once() + assert ftp_client._is_connected is False + +def test_disconnect_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client._is_connected = True + ftp_client.ftp.quit.side_effect = all_errors + ftp_client.disconnect() + ftp_client.ftp.quit.assert_called_once() + assert ftp_client._is_connected is False + +def test_list_files_success(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.dir = MagicMock(side_effect=lambda path, callback: callback('drwxr-xr-x 1 owner group 0 Jan 1 00:00 test_dir')) + files = ftp_client.list_files() + assert len(files) == 1 + assert files[0]['name'] == 'test_dir' + assert files[0]['type'] == 'dir' + +def test_list_files_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.dir.side_effect = error_perm + files = ftp_client.list_files() + assert files == [] + +def test_download_file_success(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.size.return_value = 100 + with patch('builtins.open', new_callable=MagicMock), \ + patch('os.path.getsize', return_value=0), \ + patch('tqdm.tqdm', return_value=MagicMock()): + assert ftp_client.download_file('remote_file') is True + +def test_download_file_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.size.side_effect = all_errors + with patch('builtins.open', new_callable=MagicMock): + assert ftp_client.download_file('remote_file') is False + +def test_upload_file_success(ftp_client): + ftp_client.ftp = MagicMock() + with patch('builtins.open', new_callable=MagicMock), \ + patch('os.path.getsize', return_value=100), \ + patch('tqdm.tqdm', return_value=MagicMock()): + assert ftp_client.upload_file('local_file') is True + +def test_upload_file_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.storbinary.side_effect = all_errors + with patch('builtins.open', new_callable=MagicMock): + assert ftp_client.upload_file('local_file') is False + +def test_delete_file_success(ftp_client): + ftp_client.ftp = MagicMock() + assert ftp_client.delete_file('test_file') is True + ftp_client.ftp.delete.assert_called_once_with('test_file') + +def test_delete_file_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.delete.side_effect = all_errors + assert ftp_client.delete_file('test_file') is False + +def test_change_directory_success(ftp_client): + ftp_client.ftp = MagicMock() + assert ftp_client.change_directory('test_dir') is True + ftp_client.ftp.cwd.assert_called_once_with('test_dir') + +def test_change_directory_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.cwd.side_effect = all_errors + assert ftp_client.change_directory('test_dir') is False + +def test_make_directory_success(ftp_client): + ftp_client.ftp = MagicMock() + assert ftp_client.make_directory('test_dir') is True + ftp_client.ftp.mkd.assert_called_once_with('test_dir') + +def test_make_directory_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.mkd.side_effect = all_errors + assert ftp_client.make_directory('test_dir') is False + +def test_get_current_directory_success(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.pwd.return_value = '/test_dir' + assert ftp_client.get_current_directory() == '/test_dir' + +def test_get_current_directory_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.pwd.side_effect = all_errors + assert ftp_client.get_current_directory() == '' + +def test_rename_file_success(ftp_client): + ftp_client.ftp = MagicMock() + assert ftp_client.rename_file('old_name', 'new_name') is True + ftp_client.ftp.rename.assert_called_once_with('old_name', 'new_name') + +def test_rename_file_failure(ftp_client): + ftp_client.ftp = MagicMock() + ftp_client.ftp.rename.side_effect = all_errors + assert ftp_client.rename_file('old_name', 'new_name') is False \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_sftp.py b/modules/lithium.pytools/tests/test_sftp.py new file mode 100644 index 00000000..e9f7501d --- /dev/null +++ b/modules/lithium.pytools/tests/test_sftp.py @@ -0,0 +1,191 @@ +import pytest +from unittest.mock import patch, MagicMock +from paramiko import SFTPClient, SSHException +from sftp import SFTPClientWrapper + +@pytest.fixture +def sftp_client(): + return SFTPClientWrapper(hostname='test_host', username='test_user', password='test_pass') + +def test_connect_success(sftp_client): + with patch.object(SFTPClientWrapper, 'connect', return_value=None): + sftp_client.connect() + assert sftp_client.client is not None + assert sftp_client.sftp is not None + +def test_connect_failure(sftp_client): + with patch.object(SFTPClientWrapper, 'connect', side_effect=SSHException): + with pytest.raises(SSHException): + sftp_client.connect() + +def test_upload_file_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.upload_file('local_path', 'remote_path') + sftp_client.sftp.put.assert_called_once_with('local_path', 'remote_path') + +def test_upload_file_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.put.side_effect = Exception + sftp_client.upload_file('local_path', 'remote_path') + sftp_client.sftp.put.assert_called_once_with('local_path', 'remote_path') + +def test_download_file_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.download_file('remote_path', 'local_path') + sftp_client.sftp.get.assert_called_once_with('remote_path', 'local_path') + +def test_download_file_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.get.side_effect = Exception + sftp_client.download_file('remote_path', 'local_path') + sftp_client.sftp.get.assert_called_once_with('remote_path', 'local_path') + +def test_upload_directory_success(sftp_client): + sftp_client.sftp = MagicMock() + with patch('os.walk', return_value=[('root', [], ['file1', 'file2'])]), \ + patch.object(SFTPClientWrapper, 'create_directory', return_value=None), \ + patch.object(SFTPClientWrapper, 'upload_file', return_value=None): + sftp_client.upload_directory('local_dir', 'remote_dir') + sftp_client.create_directory.assert_called() + sftp_client.upload_file.assert_called() + +def test_upload_directory_failure(sftp_client): + sftp_client.sftp = MagicMock() + with patch('os.walk', return_value=[('root', [], ['file1', 'file2'])]), \ + patch.object(SFTPClientWrapper, 'create_directory', side_effect=Exception), \ + patch.object(SFTPClientWrapper, 'upload_file', return_value=None): + sftp_client.upload_directory('local_dir', 'remote_dir') + sftp_client.create_directory.assert_called() + +def test_download_directory_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir_attr.return_value = [MagicMock(filename='file1', st_mode=0), MagicMock(filename='dir1', st_mode=stat.S_IFDIR)] + with patch('os.makedirs', return_value=None), \ + patch.object(SFTPClientWrapper, 'download_file', return_value=None), \ + patch.object(SFTPClientWrapper, 'download_directory', return_value=None): + sftp_client.download_directory('remote_dir', 'local_dir') + sftp_client.download_file.assert_called() + sftp_client.download_directory.assert_called() + +def test_download_directory_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir_attr.side_effect = Exception + with patch('os.makedirs', return_value=None): + sftp_client.download_directory('remote_dir', 'local_dir') + sftp_client.sftp.listdir_attr.assert_called_once_with('remote_dir') + +def test_create_directory_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.create_directory('remote_path') + sftp_client.sftp.mkdir.assert_called_once_with('remote_path') + +def test_create_directory_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.mkdir.side_effect = Exception + sftp_client.create_directory('remote_path') + sftp_client.sftp.mkdir.assert_called_once_with('remote_path') + +def test_remove_directory_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir_attr.return_value = [MagicMock(filename='file1', st_mode=0), MagicMock(filename='dir1', st_mode=stat.S_IFDIR)] + with patch.object(SFTPClientWrapper, 'remove_directory', return_value=None): + sftp_client.remove_directory('remote_path') + sftp_client.sftp.remove.assert_called() + sftp_client.sftp.rmdir.assert_called_once_with('remote_path') + +def test_remove_directory_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir_attr.side_effect = Exception + sftp_client.remove_directory('remote_path') + sftp_client.sftp.listdir_attr.assert_called_once_with('remote_path') + +def test_get_file_info_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.return_value = MagicMock(st_size=100, st_mtime=1000, st_mode=stat.S_IFREG) + sftp_client.get_file_info('remote_path') + sftp_client.sftp.stat.assert_called_once_with('remote_path') + +def test_get_file_info_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.side_effect = Exception + sftp_client.get_file_info('remote_path') + sftp_client.sftp.stat.assert_called_once_with('remote_path') + +def test_resume_upload_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.return_value = MagicMock(st_size=50) + with patch('os.path.getsize', return_value=100), \ + patch('builtins.open', new_callable=MagicMock): + sftp_client.resume_upload('local_path', 'remote_path') + sftp_client.sftp.putfo.assert_called() + +def test_resume_upload_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.side_effect = Exception + with patch('os.path.getsize', return_value=100), \ + patch('builtins.open', new_callable=MagicMock): + sftp_client.resume_upload('local_path', 'remote_path') + sftp_client.sftp.putfo.assert_not_called() + +def test_list_files_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir.return_value = ['file1', 'file2'] + files = sftp_client.list_files('remote_path') + assert files == ['file1', 'file2'] + sftp_client.sftp.listdir.assert_called_once_with('remote_path') + +def test_list_files_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.listdir.side_effect = Exception + files = sftp_client.list_files('remote_path') + assert files == [] + sftp_client.sftp.listdir.assert_called_once_with('remote_path') + +def test_move_file_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.move_file('remote_src', 'remote_dest') + sftp_client.sftp.rename.assert_called_once_with('remote_src', 'remote_dest') + +def test_move_file_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.rename.side_effect = Exception + sftp_client.move_file('remote_src', 'remote_dest') + sftp_client.sftp.rename.assert_called_once_with('remote_src', 'remote_dest') + +def test_delete_file_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.delete_file('remote_path') + sftp_client.sftp.remove.assert_called_once_with('remote_path') + +def test_delete_file_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.remove.side_effect = Exception + sftp_client.delete_file('remote_path') + sftp_client.sftp.remove.assert_called_once_with('remote_path') + +def test_path_exists_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.return_value = True + assert sftp_client.path_exists('remote_path') is True + sftp_client.sftp.stat.assert_called_once_with('remote_path') + +def test_path_exists_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.sftp.stat.side_effect = FileNotFoundError + assert sftp_client.path_exists('remote_path') is False + sftp_client.sftp.stat.assert_called_once_with('remote_path') + +def test_disconnect_success(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.client = MagicMock() + sftp_client.disconnect() + sftp_client.sftp.close.assert_called_once() + sftp_client.client.close.assert_called_once() + +def test_disconnect_failure(sftp_client): + sftp_client.sftp = MagicMock() + sftp_client.client = MagicMock() + sftp_client.sftp.close.side_effect = Exception + sftp_client.disconnect() + sftp_client.sftp.close.assert_called_once() + sftp_client.client.close.assert_called_once() \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_ssh.py b/modules/lithium.pytools/tests/test_ssh.py new file mode 100644 index 00000000..b6ff5734 --- /dev/null +++ b/modules/lithium.pytools/tests/test_ssh.py @@ -0,0 +1,146 @@ +import pytest +from unittest.mock import patch, MagicMock +from ssh import SSHClient, SSHConnectionError, SSHCommandError, SFTPError, SSHError +import paramiko +@pytest.fixture +def ssh_client(): + return SSHClient(hostname="localhost", port=22, username="user", password="pass") + +@patch("paramiko.SSHClient") +def test_connect_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.connect = MagicMock() + mock_ssh_client.return_value.open_sftp = MagicMock() + ssh_client.connect() + mock_ssh_client.return_value.connect.assert_called_once() + mock_ssh_client.return_value.open_sftp.assert_called_once() + +@patch("paramiko.SSHClient") +def test_connect_authentication_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.connect.side_effect = paramiko.AuthenticationException + with pytest.raises(SSHConnectionError): + ssh_client.connect() + +@patch("paramiko.SSHClient") +def test_connect_no_valid_connections(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.connect.side_effect = IndexError + with pytest.raises(SSHConnectionError): + ssh_client.connect() + +@patch("paramiko.SSHClient") +def test_connect_ssh_exception(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.connect.side_effect = paramiko.SSHException + with pytest.raises(SSHConnectionError): + ssh_client.connect() + +@patch("paramiko.SSHClient") +def test_connect_general_exception(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.connect.side_effect = Exception + with pytest.raises(SSHConnectionError): + ssh_client.connect() + +@patch("paramiko.SSHClient") +def test_execute_command_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.exec_command.return_value = (MagicMock(), MagicMock(), MagicMock()) + mock_ssh_client.return_value.exec_command.return_value[1].read.return_value = b"output" + mock_ssh_client.return_value.exec_command.return_value[2].read.return_value = b"" + mock_ssh_client.return_value.exec_command.return_value[1].channel.recv_exit_status.return_value = 0 + ssh_client.connect() + output, error = ssh_client.execute_command("ls") + assert output == "output" + assert error == "" + +@patch("paramiko.SSHClient") +def test_execute_command_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.exec_command.return_value = (MagicMock(), MagicMock(), MagicMock()) + mock_ssh_client.return_value.exec_command.return_value[1].read.return_value = b"" + mock_ssh_client.return_value.exec_command.return_value[2].read.return_value = b"error" + mock_ssh_client.return_value.exec_command.return_value[1].channel.recv_exit_status.return_value = 1 + ssh_client.connect() + with pytest.raises(SSHCommandError): + ssh_client.execute_command("ls") + +@patch("paramiko.SSHClient") +def test_upload_file_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.put = MagicMock() + ssh_client.connect() + ssh_client.upload_file("local.txt", "remote.txt") + mock_ssh_client.return_value.open_sftp.return_value.put.assert_called_once_with("local.txt", "remote.txt") + +@patch("paramiko.SSHClient") +def test_upload_file_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.put.side_effect = FileNotFoundError + ssh_client.connect() + with pytest.raises(SFTPError): + ssh_client.upload_file("local.txt", "remote.txt") + +@patch("paramiko.SSHClient") +def test_download_file_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.get = MagicMock() + ssh_client.connect() + ssh_client.download_file("remote.txt", "local.txt") + mock_ssh_client.return_value.open_sftp.return_value.get.assert_called_once_with("remote.txt", "local.txt") + +@patch("paramiko.SSHClient") +def test_download_file_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.get.side_effect = FileNotFoundError + ssh_client.connect() + with pytest.raises(SFTPError): + ssh_client.download_file("remote.txt", "local.txt") + +@patch("paramiko.SSHClient") +def test_list_remote_directory_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.listdir.return_value = ["file1.txt", "file2.txt"] + ssh_client.connect() + files = ssh_client.list_remote_directory("remote_dir") + assert files == ["file1.txt", "file2.txt"] + +@patch("paramiko.SSHClient") +def test_list_remote_directory_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.listdir.side_effect = FileNotFoundError + ssh_client.connect() + with pytest.raises(SFTPError): + ssh_client.list_remote_directory("remote_dir") + +@patch("paramiko.SSHClient") +def test_create_remote_directory_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.mkdir = MagicMock() + ssh_client.connect() + ssh_client.create_remote_directory("remote_dir") + mock_ssh_client.return_value.open_sftp.return_value.mkdir.assert_called_once_with("remote_dir") + +@patch("paramiko.SSHClient") +def test_create_remote_directory_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.mkdir.side_effect = Exception + ssh_client.connect() + with pytest.raises(SFTPError): + ssh_client.create_remote_directory("remote_dir") + +@patch("paramiko.SSHClient") +def test_delete_remote_file_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.remove = MagicMock() + ssh_client.connect() + ssh_client.delete_remote_file("remote.txt") + mock_ssh_client.return_value.open_sftp.return_value.remove.assert_called_once_with("remote.txt") + +@patch("paramiko.SSHClient") +def test_delete_remote_file_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.open_sftp.return_value.remove.side_effect = FileNotFoundError + ssh_client.connect() + with pytest.raises(SFTPError): + ssh_client.delete_remote_file("remote.txt") + +@patch("paramiko.SSHClient") +def test_close_success(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.close = MagicMock() + mock_ssh_client.return_value.open_sftp.return_value.close = MagicMock() + ssh_client.connect() + ssh_client.close() + mock_ssh_client.return_value.close.assert_called_once() + mock_ssh_client.return_value.open_sftp.return_value.close.assert_called_once() + +@patch("paramiko.SSHClient") +def test_close_failure(mock_ssh_client, ssh_client): + mock_ssh_client.return_value.close.side_effect = Exception + ssh_client.connect() + with pytest.raises(SSHError): + ssh_client.close() \ No newline at end of file diff --git a/modules/lithium.pytools/tests/test_unzip.py b/modules/lithium.pytools/tests/test_unzip.py new file mode 100644 index 00000000..eb92bb8c --- /dev/null +++ b/modules/lithium.pytools/tests/test_unzip.py @@ -0,0 +1,99 @@ +import pytest +from pathlib import Path +from unittest.mock import patch, MagicMock +from unzip import UnzipWrapper, UnzipValidationError, UnzipExtractionError, UnzipListError, UnzipIntegrityError, UnzipDeleteError, UnzipError +import subprocess + +@pytest.fixture +def unzip(): + return UnzipWrapper(executable="unzip") + +def test_init_valid_executable(unzip): + assert unzip.executable == "unzip" + +def test_init_invalid_executable(): + with patch("shutil.which", return_value=None): + with pytest.raises(UnzipValidationError): + UnzipWrapper(executable="invalid_executable") + +def test_validate_archive_exists(unzip): + with patch("pathlib.Path.exists", return_value=True): + unzip._validate_archive_exists("archive.zip") + +def test_validate_archive_exists_invalid(unzip): + with patch("pathlib.Path.exists", return_value=False): + with pytest.raises(UnzipValidationError): + unzip._validate_archive_exists("archive.zip") + +@patch("subprocess.run") +def test_extract(mock_run, unzip): + mock_run.return_value = MagicMock(returncode=0, stdout="Success", stderr="") + with patch("pathlib.Path.exists", return_value=True): + unzip.extract("archive.zip", "destination") + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_extract_failure(mock_run, unzip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(UnzipExtractionError): + unzip.extract("archive.zip", "destination") + +@patch("subprocess.run") +def test_list_contents(mock_run, unzip): + mock_run.return_value = MagicMock(returncode=0, stdout="file1.txt\nfile2.txt", stderr="") + with patch("pathlib.Path.exists", return_value=True): + contents = unzip.list_contents("archive.zip") + assert contents == "file1.txt\nfile2.txt" + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_list_contents_failure(mock_run, unzip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(UnzipListError): + unzip.list_contents("archive.zip") + +@patch("subprocess.run") +def test_test_integrity(mock_run, unzip): + mock_run.return_value = MagicMock(returncode=0, stdout="Everything is Ok", stderr="") + with patch("pathlib.Path.exists", return_value=True): + is_valid = unzip.test_integrity("archive.zip") + assert is_valid + mock_run.assert_called_once() + +@patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "cmd")) +def test_test_integrity_failure(mock_run, unzip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(UnzipIntegrityError): + unzip.test_integrity("archive.zip") + +@patch("pathlib.Path.unlink") +def test_delete_archive(mock_unlink, unzip): + with patch("pathlib.Path.exists", return_value=True): + unzip.delete_archive("archive.zip") + mock_unlink.assert_called_once() + +@patch("pathlib.Path.unlink", side_effect=Exception("Error")) +def test_delete_archive_failure(mock_unlink, unzip): + with patch("pathlib.Path.exists", return_value=True): + with pytest.raises(UnzipDeleteError): + unzip.delete_archive("archive.zip") + +@patch("pathlib.Path.exists", return_value=False) +def test_extract_nonexistent_archive(mock_exists, unzip): + with pytest.raises(UnzipValidationError): + unzip.extract("nonexistent.zip", "destination") + +@patch("pathlib.Path.exists", return_value=True) +def test_list_contents_nonexistent_archive(mock_exists, unzip): + with pytest.raises(UnzipListError): + unzip.list_contents("nonexistent.zip") + +@patch("pathlib.Path.exists", return_value=True) +def test_test_nonexistent_archive(mock_exists, unzip): + with pytest.raises(UnzipIntegrityError): + unzip.test_integrity("nonexistent.zip") + +@patch("pathlib.Path.exists", return_value=True) +def test_delete_nonexistent_archive(mock_exists, unzip): + with pytest.raises(UnzipDeleteError): + unzip.delete_archive("nonexistent.zip") \ No newline at end of file diff --git a/modules/lithium.pytools/tools/daemon.py b/modules/lithium.pytools/tools/daemon.py new file mode 100644 index 00000000..1e5a6ca0 --- /dev/null +++ b/modules/lithium.pytools/tools/daemon.py @@ -0,0 +1,413 @@ +import argparse +import os +import signal +import subprocess +import sys +import time +from datetime import datetime + +import psutil +from loguru import logger + +# Configure Loguru +logger.remove() # Remove the default logger +logger.add( + "/tmp/daemon.log", + rotation="10 MB", # Rotate log file when it reaches 10MB + retention="10 days", # Retain logs for the last 10 days + level="INFO", + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}" +) + +# Daemon configuration parameters +DEFAULT_CONFIG = { + "process_name": "python", # Name of the process to monitor + "script_path": "target_script.py", # Path to the target script + "restart_interval": 5, # Restart interval in seconds + "cpu_threshold": 80, # CPU usage threshold in percentage + "memory_threshold": 500, # Memory usage threshold in MB + "max_restarts": 3, # Maximum number of restarts + "monitor_interval": 5 # Monitoring interval in seconds +} + +PID_FILE = "/tmp/daemon.pid" + + +class DaemonProcess: + def __init__(self, config): + self.config = config + self.restart_count = 0 + self.process = None + + # Check if the target script exists + if not os.path.isfile(self.config["script_path"]): + logger.critical( + f"Target script does not exist: {self.config['script_path']}") + raise FileNotFoundError( + f"Target script does not exist: {self.config['script_path']}") + + def start_target_process(self): + """ + Start the target process. + """ + try: + self.process = subprocess.Popen( + [sys.executable, self.config["script_path"]], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + logger.info(f"Target process started, PID: {self.process.pid}") + except Exception as e: + logger.exception(f"Failed to start target process: {e}") + raise + + def is_process_running(self): + """ + Check if the target process is running. + :return: Boolean indicating if the process exists. + """ + if self.process is None: + logger.debug("Target process instance is None.") + return False + + # Check if the process is still running + running = self.process.poll() is None + logger.debug( + f"Process status: {'Running' if running else 'Stopped'} (PID: {self.process.pid})") + return running + + def monitor_process_health(self): + """ + Monitor the CPU and memory usage of the process. Restart if thresholds are exceeded. + """ + try: + proc = psutil.Process(self.process.pid) + cpu_usage = proc.cpu_percent(interval=1) + memory_usage = proc.memory_info().rss / (1024 * 1024) # Convert to MB + logger.info( + f"Process PID: {proc.pid}, CPU: {cpu_usage}%, Memory: {memory_usage:.2f}MB") + + # Check if CPU and memory usage exceed thresholds + if cpu_usage > self.config["cpu_threshold"]: + logger.warning( + f"CPU usage exceeded threshold ({cpu_usage}% > {self.config['cpu_threshold']}%), restarting process...") + self.restart_process() + + if memory_usage > self.config["memory_threshold"]: + logger.warning( + f"Memory usage exceeded threshold ({memory_usage}MB > {self.config['memory_threshold']}MB), restarting process...") + self.restart_process() + + except psutil.NoSuchProcess: + logger.warning( + "Process does not exist, may have crashed. Will restart...") + self.restart_process() + except psutil.AccessDenied: + logger.error("Access denied when accessing process information.") + except Exception as e: + logger.exception( + f"Unknown error occurred while monitoring process health: {e}") + + def restart_process(self): + """ + Restart the target process, track restart counts, and check against maximum restarts. + """ + if self.restart_count < self.config["max_restarts"]: + try: + # Terminate existing process + if self.process and self.is_process_running(): + self.process.terminate() + try: + self.process.wait(timeout=5) + logger.info( + f"Process PID: {self.process.pid} terminated.") + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + logger.warning( + f"Process PID: {self.process.pid} force killed.") + + # Increment restart count and restart process + self.restart_count += 1 + logger.info( + f"Restarting target process (Restart count: {self.restart_count})") + time.sleep(self.config["restart_interval"]) + self.start_target_process() + except Exception as e: + logger.exception(f"Failed to restart process: {e}") + else: + logger.error("Maximum restart count reached, stopping daemon.") + self.cleanup() + sys.exit("Daemon terminated: exceeded maximum restart count.") + + def monitor_loop(self): + """ + Main loop of the daemon for continuous monitoring of the target process. + """ + self.start_target_process() + + try: + while True: + # Check if the process is running + if not self.is_process_running(): + logger.warning( + "Target process is not running, restarting...") + self.restart_process() + else: + # Monitor the health of the process + self.monitor_process_health() + + # Check at specified monitoring intervals + time.sleep(self.config["monitor_interval"]) + except KeyboardInterrupt: + logger.info("Daemon received interrupt signal, exiting...") + except Exception as e: + logger.exception(f"Error occurred while running daemon: {e}") + finally: + self.cleanup() + + def cleanup(self): + """ + Clean up resources and terminate the target process. + """ + if self.process and self.is_process_running(): + try: + self.process.terminate() + self.process.wait(timeout=5) + logger.info("Target process terminated.") + except subprocess.TimeoutExpired: + self.process.kill() + self.process.wait() + logger.warning("Target process force killed.") + except Exception as e: + logger.exception( + f"Error occurred while terminating process: {e}") + + # Remove PID file + if os.path.exists(PID_FILE): + os.remove(PID_FILE) + logger.debug("PID file removed.") + + +def write_pid(): + """ + Write the daemon's PID to the PID file. + """ + pid = os.getpid() + with open(PID_FILE, 'w', encoding='utf-8') as f: + f.write(str(pid)) + logger.debug(f"PID {pid} written to {PID_FILE}") + + +def read_pid(): + """ + Read the PID from the PID file. + :return: PID as an integer. + """ + try: + with open(PID_FILE, 'r', encoding='utf-8') as f: + pid = int(f.read().strip()) + return pid + except Exception as e: + logger.error(f"Failed to read PID file: {e}") + return None + + +def is_daemon_running(config): + """ + Check if the daemon is currently running. + :param config: Configuration dictionary. + :return: Boolean indicating if the daemon is running. + """ + pid = read_pid() + if pid and psutil.pid_exists(pid): + try: + proc = psutil.Process(pid) + if proc.name() == config["process_name"]: + return True + except psutil.NoSuchProcess: + return False + return False + + +def stop_daemon(): + """ + Stop the daemon process. + """ + pid = read_pid() + if not pid: + logger.error( + "PID file not found or PID is invalid. Daemon may not be running.") + print("Daemon is not running.") + return + + try: + proc = psutil.Process(pid) + logger.info(f"Sending SIGTERM signal to daemon PID: {pid}") + proc.send_signal(signal.SIGTERM) + proc.wait(timeout=10) + logger.info("Daemon has been stopped.") + print("Daemon has been stopped.") + except psutil.NoSuchProcess: + logger.error("Specified daemon process does not exist.") + print("Daemon does not exist.") + except psutil.TimeoutExpired: + logger.warning("Daemon did not respond, sending SIGKILL signal.") + proc.kill() + print("Daemon has been forcefully stopped.") + except Exception as e: + logger.exception(f"Error occurred while stopping daemon: {e}") + print(f"Error occurred while stopping daemon: {e}") + finally: + if os.path.exists(PID_FILE): + os.remove(PID_FILE) + logger.debug("PID file removed.") + + +def start_daemon(config): + """ + Start the daemon process. + """ + if is_daemon_running(config): + logger.error("Daemon is already running.") + print("Daemon is already running.") + sys.exit(1) + + # Fork the first child + try: + pid = os.fork() + if pid > 0: + # Parent process exits + sys.exit(0) + except OSError as e: + logger.exception(f"First fork failed: {e}") + sys.exit(1) + + # Create a new session + os.setsid() + + # Fork the second child + try: + pid = os.fork() + if pid > 0: + # Second parent process exits + sys.exit(0) + except OSError as e: + logger.exception(f"Second fork failed: {e}") + sys.exit(1) + + # Redirect standard file descriptors to /dev/null + sys.stdout.flush() + sys.stderr.flush() + with open('/dev/null', 'rb', 0) as f: + os.dup2(f.fileno(), sys.stdin.fileno()) + with open('/dev/null', 'ab', 0) as f: + os.dup2(f.fileno(), sys.stdout.fileno()) + os.dup2(f.fileno(), sys.stderr.fileno()) + + # Write PID file + write_pid() + + # Instantiate daemon process and start monitoring + daemon = DaemonProcess(config) + + # Set up signal handlers + def handle_signal(signum, frame): + logger.info(f"Received signal {signum}, preparing to exit daemon...") + daemon.cleanup() + sys.exit(0) + + signal.signal(signal.SIGTERM, handle_signal) + signal.signal(signal.SIGINT, handle_signal) + + # Start monitoring loop + daemon.monitor_loop() + + +def status_daemon(): + """ + Check the status of the daemon process. + """ + if is_daemon_running(DEFAULT_CONFIG): + pid = read_pid() + logger.info(f"Daemon is running, PID: {pid}") + print(f"Daemon is running, PID: {pid}") + else: + logger.info("Daemon is not running.") + print("Daemon is not running.") + + +def parse_arguments(): + """ + Parse command-line arguments. + :return: argparse.Namespace + """ + parser = argparse.ArgumentParser(description="Daemon Management Tool") + subparsers = parser.add_subparsers(dest="command", help="Sub-commands") + + # Start command + start_parser = subparsers.add_parser("start", help="Start daemon") + start_parser.add_argument( + "--process_name", type=str, default=DEFAULT_CONFIG["process_name"], + help="Name of the process to monitor" + ) + start_parser.add_argument( + "--script_path", type=str, default=DEFAULT_CONFIG["script_path"], + help="Path to the target script" + ) + start_parser.add_argument( + "--restart_interval", type=int, default=DEFAULT_CONFIG["restart_interval"], + help="Restart interval in seconds" + ) + start_parser.add_argument( + "--cpu_threshold", type=float, default=DEFAULT_CONFIG["cpu_threshold"], + help="CPU usage threshold (%)" + ) + start_parser.add_argument( + "--memory_threshold", type=float, default=DEFAULT_CONFIG["memory_threshold"], + help="Memory usage threshold (MB)" + ) + start_parser.add_argument( + "--max_restarts", type=int, default=DEFAULT_CONFIG["max_restarts"], + help="Maximum number of restarts" + ) + start_parser.add_argument( + "--monitor_interval", type=int, default=DEFAULT_CONFIG["monitor_interval"], + help="Monitoring interval in seconds" + ) + + # Stop command + subparsers.add_parser("stop", help="Stop daemon") + + # Status command + subparsers.add_parser("status", help="Check daemon status") + + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_arguments() + + if args.command == "start": + config = { + "process_name": args.process_name, + "script_path": args.script_path, + "restart_interval": args.restart_interval, + "cpu_threshold": args.cpu_threshold, + "memory_threshold": args.memory_threshold, + "max_restarts": args.max_restarts, + "monitor_interval": args.monitor_interval + } + try: + start_daemon(config) + except Exception as e: + logger.critical(f"Failed to start daemon: {e}") + sys.exit(1) + elif args.command == "stop": + stop_daemon() + elif args.command == "status": + status_daemon() + else: + print("Use -h for help.") + sys.exit(1) diff --git a/modules/lithium.pytools/tools/ffmpeg.py b/modules/lithium.pytools/tools/ffmpeg.py new file mode 100644 index 00000000..c152b761 --- /dev/null +++ b/modules/lithium.pytools/tools/ffmpeg.py @@ -0,0 +1,408 @@ +# ffmpeg.py + +import ffmpeg +import asyncio +from concurrent.futures import ThreadPoolExecutor +import argparse +from loguru import logger +import sys +import os + + +class FFmpegWrapper: + def __init__(self): + self.executor = ThreadPoolExecutor() # 用于并行执行阻塞任务 + logger.info("FFmpegWrapper initialized with ThreadPoolExecutor.") + + async def _run_ffmpeg(self, command): + """ + 异步运行 ffmpeg 命令 + """ + try: + loop = asyncio.get_running_loop() + logger.debug( + f"Executing FFmpeg command: {' '.join(command.compile())}") + await loop.run_in_executor(self.executor, command.run) + logger.info("FFmpeg command executed successfully.") + except ffmpeg.Error as e: + logger.error(f"FFmpeg execution failed: {e.stderr.decode()}") + raise + except Exception as e: + logger.exception(f"Unexpected error during FFmpeg execution: {e}") + raise + + async def convert_format(self, input_file, output_file, codec="libx264"): + """转换视频格式""" + try: + command = ffmpeg.input(input_file).output( + output_file, vcodec=codec) + logger.debug( + f"Converting format: {input_file} to {output_file} with codec {codec}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to convert format from {input_file} to {output_file}: {e}") + raise + + async def extract_audio(self, input_file, output_file): + """从视频中提取音频""" + try: + command = ffmpeg.input(input_file).output( + output_file, acodec='copy') + logger.debug( + f"Extracting audio from {input_file} to {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to extract audio from {input_file} to {output_file}: {e}") + raise + + async def trim_video(self, input_file, output_file, start_time, duration): + """剪切视频""" + try: + command = ffmpeg.input( + input_file, ss=start_time, t=duration).output(output_file) + logger.debug( + f"Trimming video {input_file}: start_time={start_time}, duration={duration}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to trim video {input_file} to {output_file}: {e}") + raise + + async def resize_video(self, input_file, output_file, width, height): + """调整视频分辨率""" + try: + command = ffmpeg.input(input_file).filter( + 'scale', width, height).output(output_file) + logger.debug( + f"Resizing video {input_file} to {width}x{height}, output {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to resize video {input_file} to {output_file}: {e}") + raise + + async def extract_frames(self, input_file, output_pattern, fps=1): + """提取视频帧""" + try: + command = ffmpeg.input(input_file).filter( + 'fps', fps=fps).output(output_pattern) + logger.debug( + f"Extracting frames from {input_file} to {output_pattern} at {fps} FPS.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to extract frames from {input_file} to {output_pattern}: {e}") + raise + + async def merge_videos(self, input_files, output_file): + """合并多个视频""" + try: + inputs = [ffmpeg.input(file) for file in input_files] + command = ffmpeg.concat(*inputs, v=1, a=1).output(output_file) + logger.debug(f"Merging videos {input_files} into {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to merge videos {input_files} into {output_file}: {e}") + raise + + async def merge_audios(self, input_files, output_file): + """合并多个音频文件""" + try: + inputs = [ffmpeg.input(file) for file in input_files] + command = ffmpeg.filter( + inputs, 'amix', inputs=len(inputs)).output(output_file) + logger.debug(f"Merging audios {input_files} into {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to merge audios {input_files} into {output_file}: {e}") + raise + + async def add_watermark(self, input_file, output_file, watermark_image, position="topright"): + """添加水印""" + try: + overlay_position = { + "topleft": "10:10", + "topright": "main_w-overlay_w-10:10", + "bottomleft": "10:main_h-overlay_h-10", + "bottomright": "main_w-overlay_w-10:main_h-overlay_h-10" + } + position_args = overlay_position.get(position, "10:10") + command = ffmpeg.input(input_file).overlay(watermark_image, x=position_args.split( + ":")[0], y=position_args.split(":")[1]).output(output_file) + logger.debug( + f"Adding watermark {watermark_image} to {input_file} at position {position}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error(f"Failed to add watermark to {input_file}: {e}") + raise + + async def add_subtitles(self, input_file, output_file, subtitle_file): + """添加字幕""" + try: + command = ffmpeg.input(input_file).output( + output_file, vf=f"subtitles={subtitle_file}") + logger.debug( + f"Adding subtitles from {subtitle_file} to {input_file}, output {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error(f"Failed to add subtitles to {input_file}: {e}") + raise + + async def change_speed(self, input_file, output_file, speed_factor): + """改变播放速度""" + try: + command = ffmpeg.input(input_file).filter( + 'setpts', f"{1/speed_factor}*PTS").output(output_file) + logger.debug( + f"Changing speed of {input_file} by a factor of {speed_factor}, output {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error(f"Failed to change speed of {input_file}: {e}") + raise + + # 新增功能示例 + async def extract_video_info(self, input_file): + """提取视频信息""" + try: + logger.debug(f"Extracting video info from {input_file}.") + probe = ffmpeg.probe(input_file) + logger.info(f"Video info extracted: {probe}") + return probe + except ffmpeg.Error as e: + logger.error( + f"Failed to extract video info from {input_file}: {e.stderr.decode()}") + raise + except Exception as e: + logger.exception( + f"Unexpected error while extracting video info: {e}") + raise + + async def add_background_music(self, input_video, input_audio, output_file, volume=0.5): + """为视频添加背景音乐""" + try: + logger.debug( + f"Adding background music from {input_audio} to {input_video}, output {output_file}.") + command = ( + ffmpeg + .input(input_video) + .input(input_audio) + .filter('amix', inputs=2, duration='first', dropout_transition=3) + .output(output_file, vcodec='copy', acodec='aac', audio_bitrate='192k') + ) + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to add background music to {input_video}: {e}") + raise + + # 更多功能 + async def overlay_image(self, input_video, overlay_image, output_file, x=10, y=10): + """在视频上叠加图片""" + try: + command = ffmpeg.input(input_video).overlay( + overlay_image, x=x, y=y).output(output_file) + logger.debug( + f"Overlaying image {overlay_image} on {input_video} at position ({x}, {y}), output {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to overlay image {overlay_image} on {input_video}: {e}") + raise + + async def adjust_brightness_contrast(self, input_file, output_file, brightness=0, contrast=1): + """调整视频亮度和对比度""" + try: + command = ffmpeg.input(input_file).filter( + 'eq', brightness=brightness, contrast=contrast).output(output_file) + logger.debug( + f"Adjusting brightness to {brightness} and contrast to {contrast} for {input_file}, output {output_file}.") + await self._run_ffmpeg(command) + except Exception as e: + logger.error( + f"Failed to adjust brightness and contrast for {input_file}: {e}") + raise + + +def parse_args(): + parser = argparse.ArgumentParser(description="FFmpeg Wrapper CLI") + subparsers = parser.add_subparsers( + dest="command", help="Available commands") + + # Convert Format + parser_convert = subparsers.add_parser( + "convert", help="Convert video format") + parser_convert.add_argument("input", help="Input file path") + parser_convert.add_argument("output", help="Output file path") + parser_convert.add_argument( + "--codec", default="libx264", help="Video codec (default: libx264)") + + # Extract Audio + parser_extract_audio = subparsers.add_parser( + "extract-audio", help="Extract audio from video") + parser_extract_audio.add_argument("input", help="Input video file path") + parser_extract_audio.add_argument("output", help="Output audio file path") + + # Trim Video + parser_trim = subparsers.add_parser("trim", help="Trim video") + parser_trim.add_argument("input", help="Input video file path") + parser_trim.add_argument("output", help="Output video file path") + parser_trim.add_argument("start_time", type=int, + help="Start time in seconds") + parser_trim.add_argument("duration", type=int, help="Duration in seconds") + + # Resize Video + parser_resize = subparsers.add_parser("resize", help="Resize video") + parser_resize.add_argument("input", help="Input video file path") + parser_resize.add_argument("output", help="Output video file path") + parser_resize.add_argument("width", type=int, help="Width in pixels") + parser_resize.add_argument("height", type=int, help="Height in pixels") + + # Extract Frames + parser_extract_frames = subparsers.add_parser( + "extract-frames", help="Extract frames from video") + parser_extract_frames.add_argument("input", help="Input video file path") + parser_extract_frames.add_argument( + "output", help="Output frames pattern (e.g., frame_%04d.png)") + parser_extract_frames.add_argument( + "--fps", type=int, default=1, help="Frames per second (default: 1)") + + # Merge Videos + parser_merge_videos = subparsers.add_parser( + "merge-videos", help="Merge multiple videos") + parser_merge_videos.add_argument( + "inputs", nargs='+', help="Input video file paths") + parser_merge_videos.add_argument("output", help="Output video file path") + + # Merge Audios + parser_merge_audios = subparsers.add_parser( + "merge-audios", help="Merge multiple audio files") + parser_merge_audios.add_argument( + "inputs", nargs='+', help="Input audio file paths") + parser_merge_audios.add_argument("output", help="Output audio file path") + + # Add Watermark + parser_watermark = subparsers.add_parser( + "add-watermark", help="Add watermark to video") + parser_watermark.add_argument("input", help="Input video file path") + parser_watermark.add_argument("output", help="Output video file path") + parser_watermark.add_argument( + "watermark", help="Watermark image file path") + parser_watermark.add_argument("--position", default="topright", choices=[ + "topleft", "topright", "bottomleft", "bottomright"], help="Position of watermark (default: topright)") + + # Add Subtitles + parser_subtitles = subparsers.add_parser( + "add-subtitles", help="Add subtitles to video") + parser_subtitles.add_argument("input", help="Input video file path") + parser_subtitles.add_argument("output", help="Output video file path") + parser_subtitles.add_argument("subtitle", help="Subtitle file path") + + # Change Speed + parser_speed = subparsers.add_parser( + "change-speed", help="Change playback speed of video") + parser_speed.add_argument("input", help="Input video file path") + parser_speed.add_argument("output", help="Output video file path") + parser_speed.add_argument( + "speed", type=float, help="Speed factor (e.g., 2 for double speed)") + + # Extract Video Info + parser_info = subparsers.add_parser( + "extract-info", help="Extract video information") + parser_info.add_argument("input", help="Input video file path") + + # Add Background Music + parser_bg_music = subparsers.add_parser( + "add-bg-music", help="Add background music to video") + parser_bg_music.add_argument("video", help="Input video file path") + parser_bg_music.add_argument("audio", help="Input audio file path") + parser_bg_music.add_argument("output", help="Output video file path") + parser_bg_music.add_argument( + "--volume", type=float, default=0.5, help="Volume of background music (default: 0.5)") + + # Overlay Image + parser_overlay = subparsers.add_parser( + "overlay-image", help="Overlay image on video") + parser_overlay.add_argument("video", help="Input video file path") + parser_overlay.add_argument("image", help="Overlay image file path") + parser_overlay.add_argument("output", help="Output video file path") + parser_overlay.add_argument( + "--x", type=int, default=10, help="X position (default: 10)") + parser_overlay.add_argument( + "--y", type=int, default=10, help="Y position (default: 10)") + + # Adjust Brightness and Contrast + parser_brightness = subparsers.add_parser( + "adjust-bc", help="Adjust brightness and contrast of video") + parser_brightness.add_argument("input", help="Input video file path") + parser_brightness.add_argument("output", help="Output video file path") + parser_brightness.add_argument( + "--brightness", type=float, default=0, help="Brightness level (default: 0)") + parser_brightness.add_argument( + "--contrast", type=float, default=1, help="Contrast level (default: 1)") + + return parser.parse_args() + + +async def main(): + args = parse_args() + ffmpeg_wrapper = FFmpegWrapper() + + if args.command == "convert": + await ffmpeg_wrapper.convert_format(args.input, args.output, args.codec) + + elif args.command == "extract-audio": + await ffmpeg_wrapper.extract_audio(args.input, args.output) + + elif args.command == "trim": + await ffmpeg_wrapper.trim_video(args.input, args.output, args.start_time, args.duration) + + elif args.command == "resize": + await ffmpeg_wrapper.resize_video(args.input, args.output, args.width, args.height) + + elif args.command == "extract-frames": + await ffmpeg_wrapper.extract_frames(args.input, args.output, args.fps) + + elif args.command == "merge-videos": + await ffmpeg_wrapper.merge_videos(args.inputs, args.output) + + elif args.command == "merge-audios": + await ffmpeg_wrapper.merge_audios(args.inputs, args.output) + + elif args.command == "add-watermark": + await ffmpeg_wrapper.add_watermark(args.input, args.output, args.watermark, args.position) + + elif args.command == "add-subtitles": + await ffmpeg_wrapper.add_subtitles(args.input, args.output, args.subtitle) + + elif args.command == "change-speed": + await ffmpeg_wrapper.change_speed(args.input, args.output, args.speed) + + elif args.command == "extract-info": + info = await ffmpeg_wrapper.extract_video_info(args.input) + print(info) + + elif args.command == "add-bg-music": + await ffmpeg_wrapper.add_background_music(args.video, args.audio, args.output, args.volume) + + elif args.command == "overlay-image": + await ffmpeg_wrapper.overlay_image(args.video, args.image, args.output, args.x, args.y) + + elif args.command == "adjust-bc": + await ffmpeg_wrapper.adjust_brightness_contrast(args.input, args.output, args.brightness, args.contrast) + + else: + print("使用 -h 查看帮助信息。") + + +if __name__ == "__main__": + logger.add("ffmpeg_wrapper.log", rotation="5 MB", retention="7 days", level="DEBUG", + format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}") + try: + asyncio.run(main()) + except Exception as e: + logger.critical(f"An unhandled exception occurred: {e}") + sys.exit(1) diff --git a/modules/lithium.pytools/tools/ftp.py b/modules/lithium.pytools/tools/ftp.py new file mode 100644 index 00000000..8b57abe0 --- /dev/null +++ b/modules/lithium.pytools/tools/ftp.py @@ -0,0 +1,323 @@ +import os +import sys +import argparse +import threading +import queue +from typing import Optional, List +from datetime import datetime +from ftplib import FTP, error_perm, all_errors +from tqdm import tqdm +from concurrent.futures import ThreadPoolExecutor +from loguru import logger + +# Set up logging +logger.add("ftp_client.log", format="{time} {level} {message}", level="INFO") +logger.add(sys.stdout, format="{time} {level} {message}", level="INFO") + + +class FTPClient: + def __init__(self, host: str, username: str = 'anonymous', + password: str = '', port: int = 21, timeout: int = 30): + self.host = host + self.username = username + self.password = password + self.port = port + self.timeout = timeout + self.ftp: Optional[FTP] = None + self._is_connected = False + + def connect(self) -> bool: + try: + self.ftp = FTP() + self.ftp.connect(self.host, self.port, self.timeout) + self.ftp.login(self.username, self.password) + self._is_connected = True + logger.info(f"Connected to FTP server: {self.host}") + return True + except all_errors as e: + logger.error(f"Connection failed: {e}") + return False + + def __enter__(self): + if not self._is_connected: + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect() + + def disconnect(self): + if self.ftp and self._is_connected: + try: + self.ftp.quit() + logger.info("FTP connection closed") + except all_errors as e: + logger.error(f"Error while disconnecting: {e}") + finally: + self._is_connected = False + + def list_files(self, path: str = '.', recursive: bool = False) -> List[dict]: + files = [] + try: + if recursive: + self._list_recursive(path, files) + else: + entries = [] + self.ftp.dir(path, entries.append) + for entry in entries: + parts = entry.split(maxsplit=8) + if len(parts) < 9: + continue + name = parts[8] + file_info = { + 'name': name, + 'size': parts[4], + 'date': ' '.join(parts[5:8]), + 'permissions': parts[0], + 'type': 'dir' if parts[0].startswith('d') else 'file' + } + files.append(file_info) + return files + except error_perm as e: + logger.error(f"Failed to list files: {e}") + return [] + + def _list_recursive(self, path: str, files: list): + try: + current_files = self.list_files(path) + for file_info in current_files: + full_path = f"{path}/{file_info['name']}" + file_info['path'] = full_path + files.append(file_info) + if file_info['type'] == 'dir': + self._list_recursive(full_path, files) + except error_perm as e: + logger.error(f"Failed to list files recursively: {e}") + + def download_file(self, remote_filename: str, local_filename: Optional[str] = None, + callback: Optional[callable] = None, resume: bool = False): + local_filename = local_filename or os.path.basename(remote_filename) + try: + existing_size = os.path.getsize( + local_filename) if resume and os.path.exists(local_filename) else 0 + total_size = self.ftp.size(remote_filename) + if resume and existing_size < total_size: + with open(local_filename, 'ab') as f: + with tqdm(total=total_size, unit='B', unit_scale=True, desc=f"Downloading {remote_filename}", initial=existing_size) as pbar: + def _callback(data): + f.write(data) + pbar.update(len(data)) + if callback: + callback(len(data)) + + self.ftp.retrbinary( + f"RETR {remote_filename}", _callback, rest=existing_size) + else: + with open(local_filename, 'wb') as f: + with tqdm(total=total_size, unit='B', unit_scale=True, desc=f"Downloading {remote_filename}") as pbar: + def _callback(data): + f.write(data) + pbar.update(len(data)) + if callback: + callback(len(data)) + + self.ftp.retrbinary( + f"RETR {remote_filename}", _callback) + logger.info(f"File {remote_filename} downloaded successfully") + return True + except all_errors as e: + logger.error(f"Error downloading file: {e}") + return False + + def upload_file(self, local_filename: str, remote_filename: Optional[str] = None, + callback: Optional[callable] = None): + remote_filename = remote_filename or os.path.basename(local_filename) + try: + file_size = os.path.getsize(local_filename) + with open(local_filename, 'rb') as f: + with tqdm(total=file_size, unit='B', unit_scale=True, + desc=f"Uploading {local_filename}") as pbar: + def _callback(data): + pbar.update(len(data)) + if callback: + callback(len(data)) + + self.ftp.storbinary( + f"STOR {remote_filename}", f, 8192, _callback) + logger.info(f"File {local_filename} uploaded successfully") + return True + except all_errors as e: + logger.error(f"Error uploading file: {e}") + return False + + def batch_transfer(self, operations: List[dict], max_workers: int = 5): + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for op in operations: + if op['type'] == 'upload': + futures.append( + executor.submit(self.upload_file, op['source'], op.get('dest'))) + elif op['type'] == 'download': + futures.append( + executor.submit(self.download_file, op['source'], op.get('dest'))) + + for future in futures: + try: + future.result() + except Exception as e: + logger.error(f"Transfer operation failed: {e}") + + def delete_file(self, filename: str): + try: + self.ftp.delete(filename) + logger.info(f"File deleted: {filename}") + return True + except all_errors as e: + logger.error(f"Error deleting file: {e}") + return False + + def change_directory(self, path: str): + try: + self.ftp.cwd(path) + logger.info(f"Changed directory to: {path}") + return True + except all_errors as e: + logger.error(f"Error changing directory: {e}") + return False + + def make_directory(self, dirname: str): + try: + self.ftp.mkd(dirname) + logger.info(f"Directory created: {dirname}") + return True + except all_errors as e: + logger.error(f"Error creating directory: {e}") + return False + + def get_current_directory(self) -> str: + try: + return self.ftp.pwd() + except all_errors as e: + logger.error(f"Failed to get current directory: {e}") + return "" + + def rename_file(self, from_name: str, to_name: str) -> bool: + try: + self.ftp.rename(from_name, to_name) + logger.info(f"File renamed: {from_name} -> {to_name}") + return True + except all_errors as e: + logger.error(f"Error renaming file: {e}") + return False + + +def create_parser(): + parser = argparse.ArgumentParser( + description='FTP client command line tool') + parser.add_argument('--host', required=True, help='FTP server address') + parser.add_argument('--port', type=int, default=21, help='FTP server port') + parser.add_argument('--username', default='anonymous', help='Username') + parser.add_argument('--password', default='', help='Password') + + subparsers = parser.add_subparsers( + dest='command', help='Available commands') + + # ls command + ls_parser = subparsers.add_parser('ls', help='List files') + ls_parser.add_argument('--path', default='.', help='Path to list') + ls_parser.add_argument( + '--recursive', action='store_true', help='List files recursively') + + # get command + get_parser = subparsers.add_parser('get', help='Download file') + get_parser.add_argument('remote_file', help='Remote file path') + get_parser.add_argument('--local-file', help='Local file path') + get_parser.add_argument( + '--resume', action='store_true', help='Resume download') + + # put command + put_parser = subparsers.add_parser('put', help='Upload file') + put_parser.add_argument('local_file', help='Local file path') + put_parser.add_argument('--remote-file', help='Remote file path') + + # rm command + rm_parser = subparsers.add_parser('rm', help='Delete file') + rm_parser.add_argument('filename', help='Filename to delete') + + # cd command + cd_parser = subparsers.add_parser('cd', help='Change directory') + cd_parser.add_argument('path', help='Target directory path') + + # mkdir command + mkdir_parser = subparsers.add_parser('mkdir', help='Create directory') + mkdir_parser.add_argument('dirname', help='Directory name to create') + + # pwd command + subparsers.add_parser('pwd', help='Show current directory') + + # rename command + rename_parser = subparsers.add_parser('rename', help='Rename file') + rename_parser.add_argument('from_name', help='Original filename') + rename_parser.add_argument('to_name', help='New filename') + + # batch command + batch_parser = subparsers.add_parser( + 'batch', help='Batch upload/download files') + batch_parser.add_argument( + 'operations_file', help='Path to operations list file') + + return parser + + +def main(): + parser = create_parser() + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + with FTPClient(args.host, args.username, args.password, args.port) as client: + if not client._is_connected: + return + + if args.command == 'ls': + files = client.list_files(args.path, args.recursive) + for file in files: + print( + f"{file['permissions']} {file['size']:>8} {file['date']} {file['name']}") + + elif args.command == 'get': + client.download_file( + args.remote_file, args.local_file, resume=args.resume) + + elif args.command == 'put': + client.upload_file(args.local_file, args.remote_file) + + elif args.command == 'rm': + client.delete_file(args.filename) + + elif args.command == 'cd': + client.change_directory(args.path) + + elif args.command == 'mkdir': + client.make_directory(args.dirname) + + elif args.command == 'pwd': + print(client.get_current_directory()) + + elif args.command == 'rename': + client.rename_file(args.from_name, args.to_name) + + elif args.command == 'batch': + try: + with open(args.operations_file, 'r') as f: + operations = [eval(line.strip()) + for line in f if line.strip()] + client.batch_transfer(operations) + except Exception as e: + logger.error(f"Batch operation failed: {e}") + + +if __name__ == '__main__': + main() diff --git a/modules/lithium.pytools/tools/sevenzip.py b/modules/lithium.pytools/tools/sevenzip.py new file mode 100644 index 00000000..480b7de6 --- /dev/null +++ b/modules/lithium.pytools/tools/sevenzip.py @@ -0,0 +1,410 @@ +import subprocess +from pathlib import Path +from typing import Union, List, Optional, Literal +from loguru import logger +import shutil +import argparse +import sys + +# Define custom exception classes + + +class SevenZipError(Exception): + """Base exception class for SevenZipWrapper""" + + +class SevenZipCompressionError(SevenZipError): + """Exception for compression failures""" + + +class SevenZipExtractionError(SevenZipError): + """Exception for extraction failures""" + + +class SevenZipListError(SevenZipError): + """Exception for listing contents failures""" + + +class SevenZipTestError(SevenZipError): + """Exception for testing archive failures""" + + +class SevenZipValidationError(SevenZipError): + """Exception for parameter validation failures""" + + +# Define supported operation types for 7z +Action = Literal['compress', 'extract', 'list', 'test', 'delete', 'update'] + + +class SevenZipWrapper: + def __init__(self, executable: str = "7z"): + """ + Initialize the 7z command-line tool wrapper + :param executable: Path to the 7z executable, defaults to "7z" in the system PATH + """ + self.executable = executable + if not shutil.which(self.executable): + logger.error(f"Executable not found: {self.executable}") + raise SevenZipValidationError(f"Executable does not exist: {self.executable}") + logger.info(f"Using 7z executable: {self.executable}") + + def _run_command(self, args: List[str]) -> subprocess.CompletedProcess: + """ + Run the 7z command and capture output + :param args: Arguments to pass to 7z + :return: subprocess.CompletedProcess object containing the return code and output information + """ + command = [self.executable] + args + logger.debug(f"Running command: {' '.join(command)}") + try: + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + logger.debug(f"Command output: {result.stdout}") + return result + except subprocess.CalledProcessError as e: + logger.error(f"7z command failed with exit code {e.returncode}") + logger.error(f"Error output: {e.stderr}") + raise SevenZipError(f"Command execution failed: {' '.join(command)}") from e + + def _validate_files_exist(self, files: List[Union[str, Path]]) -> None: + """ + Validate that all files exist + :param files: List of file paths + """ + for file in files: + if not Path(file).exists(): + logger.error(f"File does not exist: {file}") + raise SevenZipValidationError(f"File does not exist: {file}") + logger.debug("All files to be compressed exist") + + def _validate_archive_exists(self, archive: Union[str, Path]) -> None: + """ + Validate that the archive exists + :param archive: Path to the archive + """ + if not Path(archive).exists(): + logger.error(f"Archive does not exist: {archive}") + raise SevenZipValidationError(f"Archive does not exist: {archive}") + logger.debug("Archive exists") + + def compress(self, files: List[Union[str, Path]], archive: Union[str, Path], level: int = 5, + password: Optional[str] = None) -> None: + """ + Compress files into the specified archive + :param files: List of files to compress + :param archive: Path to the archive + :param level: Compression level (0-9), default is 5 + :param password: Password for the archive (optional) + """ + self._validate_files_exist(files) + args = ["a", "-t7z", f"-mx={level}", str(archive)] + if password: + args.extend(["-p" + password, "-mhe=on"]) # Encrypt archive with password and filenames + args.extend(str(f) for f in files) + logger.info(f"Compressing {files} into {archive} with compression level {level}") + try: + self._run_command(args) + logger.success(f"Successfully compressed {files} into {archive}") + except SevenZipError as e: + logger.error(f"Compression process failed: {e}") + raise SevenZipCompressionError("Compression failed") from e + + def extract(self, archive: Union[str, Path], destination: Union[str, Path], + password: Optional[str] = None, force: bool = False) -> None: + """ + Extract the specified archive to the destination folder + :param archive: Path to the archive + :param destination: Path to the extraction destination + :param password: Password for the archive (optional) + :param force: Whether to forcibly overwrite the destination directory if it exists + """ + self._validate_archive_exists(archive) + destination_path = Path(destination) + + if destination_path.exists(): + if force: + logger.warning(f"Destination directory exists and will be removed: {destination}") + shutil.rmtree(destination_path) + logger.info(f"Removed destination directory: {destination}") + else: + logger.error(f"Destination directory exists: {destination}") + raise SevenZipValidationError(f"Destination directory exists: {destination}") + + destination_path.parent.mkdir(parents=True, exist_ok=True) + args = ["x", str(archive), f"-o{destination}"] + if password: + args.append("-p" + password) + logger.info(f"Extracting {archive} to {destination}") + try: + self._run_command(args) + logger.success(f"Successfully extracted {archive} to {destination}") + except SevenZipError as e: + logger.error(f"Extraction process failed: {e}") + raise SevenZipExtractionError("Extraction failed") from e + + def list_contents(self, archive: Union[str, Path], + password: Optional[str] = None) -> str: + """ + List the contents of the archive + :param archive: Path to the archive + :param password: Password for the archive (optional) + :return: String representation of the archive contents + """ + self._validate_archive_exists(archive) + args = ["l", str(archive)] + if password: + args.append("-p" + password) + logger.info(f"Listing contents of {archive}") + try: + result = self._run_command(args) + logger.success(f"Successfully listed contents of {archive}") + return result.stdout + except SevenZipError as e: + logger.error(f"Listing contents failed: {e}") + raise SevenZipListError("Listing contents failed") from e + + def test_archive(self, archive: Union[str, Path], + password: Optional[str] = None) -> bool: + """ + Test the integrity of the archive + :param archive: Path to the archive + :param password: Password for the archive (optional) + :return: Whether the test was successful + """ + self._validate_archive_exists(archive) + args = ["t", str(archive)] + if password: + args.append("-p" + password) + logger.info(f"Testing integrity of {archive}") + try: + result = self._run_command(args) + is_valid = "Everything is Ok" in result.stdout + if is_valid: + logger.success(f"Integrity test passed for {archive}") + else: + logger.warning(f"Integrity test failed for {archive}") + return is_valid + except SevenZipError as e: + logger.error(f"Integrity test failed: {e}") + raise SevenZipTestError("Integrity test failed") from e + + def execute(self, action: Action, files: Optional[List[Union[str, Path]]] = None, + archive: Optional[Union[str, Path]] = None, + destination: Optional[Union[str, Path]] = None, + level: int = 5, + password: Optional[str] = None, + force: bool = False, **kwargs) -> Optional[str]: + """ + Execute the corresponding 7z command based on the action type + :param action: Action type ('compress', 'extract', 'list', 'test', 'delete', 'update') + :param files: List of files to compress (only for 'compress' and 'update') + :param archive: Path to the archive + :param destination: Path to the extraction destination (only for 'extract') + :param level: Compression level (only for 'compress') + :param password: Password for the archive (optional) + :param force: Whether to forcibly overwrite the destination directory (only for 'extract') + :return: If the action is 'list', returns the archive contents; otherwise, no return value + """ + logger.info(f"Executing action: {action}") + try: + match action: + case 'compress': + if files is None or archive is None: + raise SevenZipValidationError( + "Compress action requires 'files' and 'archive' parameters") + self.compress(files, archive, level, password) + case 'extract': + if archive is None or destination is None: + raise SevenZipValidationError( + "Extract action requires 'archive' and 'destination' parameters") + self.extract(archive, destination, password, force) + case 'list': + if archive is None: + raise SevenZipValidationError( + "List action requires 'archive' parameter") + return self.list_contents(archive, password) + case 'test': + if archive is None: + raise SevenZipValidationError( + "Test action requires 'archive' parameter") + return self.test_archive(archive, password) + case 'delete': + if archive is None: + raise SevenZipValidationError( + "Delete action requires 'archive' parameter") + self.delete_archive(archive) + case 'update': + if archive is None or files is None: + raise SevenZipValidationError( + "Update action requires 'archive' and 'files' parameters") + add = kwargs.get('add', True) + delete = kwargs.get('delete', False) + self.update_archive(archive, files, add, delete) + case _: + raise SevenZipValidationError(f"Unsupported action type: {action}") + except SevenZipError as e: + logger.error(f"Action {action} failed: {e}") + raise e + + def delete_archive(self, archive: Union[str, Path]) -> None: + """ + Delete the specified archive + :param archive: Path to the archive + """ + self._validate_archive_exists(archive) + try: + Path(archive).unlink() + logger.info(f"Deleted archive: {archive}") + logger.success(f"Successfully deleted {archive}") + except Exception as e: + logger.error(f"Failed to delete archive: {e}") + raise SevenZipError(f"Failed to delete archive: {archive}") from e + + def update_archive(self, archive: Union[str, Path], files: List[Union[str, Path]], + add: bool = True, delete: bool = False) -> None: + """ + Update the archive by adding or deleting files + :param archive: Path to the archive + :param files: List of files to add or delete + :param add: Whether to add files + :param delete: Whether to delete files + """ + self._validate_archive_exists(archive) + if add: + self._validate_files_exist(files) + args = ["u", str(archive)] + args.extend(str(f) for f in files) + logger.info(f"Adding files to {archive}: {files}") + try: + self._run_command(args) + logger.success(f"Successfully added files to {archive}") + except SevenZipError as e: + logger.error(f"Failed to update archive: {e}") + raise SevenZipError("Failed to update archive") from e + if delete: + args = ["d", str(archive)] + args.extend(str(f) for f in files) + logger.info(f"Deleting files from {archive}: {files}") + try: + self._run_command(args) + logger.success(f"Successfully deleted files from {archive}") + except SevenZipError as e: + logger.error(f"Failed to update archive: {e}") + raise SevenZipError("Failed to update archive") from e + + +def main(): + parser = argparse.ArgumentParser(description="7z Command-Line Tool Wrapper") + subparsers = parser.add_subparsers(dest='action', help='Action types') + + # Compress + compress_parser = subparsers.add_parser('compress', help='Compress files') + compress_parser.add_argument( + '-f', '--files', nargs='+', required=True, help='List of files to compress') + compress_parser.add_argument( + '-a', '--archive', required=True, help='Path to the archive') + compress_parser.add_argument('-l', '--level', type=int, default=5, choices=range(0, 10), + help='Compression level (0-9), default is 5') + compress_parser.add_argument('-p', '--password', help='Password for the archive (optional)') + + # Extract + extract_parser = subparsers.add_parser('extract', help='Extract files') + extract_parser.add_argument('-a', '--archive', required=True, help='Path to the archive') + extract_parser.add_argument( + '-d', '--destination', required=True, help='Path to the extraction destination') + extract_parser.add_argument('-p', '--password', help='Password for the archive (optional)') + extract_parser.add_argument( + '--force', action='store_true', help='Force overwrite if the destination directory exists') + + # List contents + list_parser = subparsers.add_parser('list', help='List archive contents') + list_parser.add_argument('-a', '--archive', required=True, help='Path to the archive') + list_parser.add_argument('-p', '--password', help='Password for the archive (optional)') + + # Test integrity + test_parser = subparsers.add_parser('test', help='Test archive integrity') + test_parser.add_argument('-a', '--archive', required=True, help='Path to the archive') + test_parser.add_argument('-p', '--password', help='Password for the archive (optional)') + + # Delete archive + delete_parser = subparsers.add_parser('delete', help='Delete archive') + delete_parser.add_argument('-a', '--archive', required=True, help='Path to the archive') + + # Update archive + update_parser = subparsers.add_parser('update', help='Update archive') + update_parser.add_argument('-a', '--archive', required=True, help='Path to the archive') + update_parser.add_argument( + '-f', '--files', nargs='+', required=True, help='List of files to add or delete') + update_group = update_parser.add_mutually_exclusive_group(required=True) + update_group.add_argument('--add', action='store_true', help='Add files') + update_group.add_argument('--delete', action='store_true', help='Delete files') + + args = parser.parse_args() + + if not args.action: + parser.print_help() + sys.exit(1) + + try: + zipper = SevenZipWrapper() + except SevenZipValidationError as e: + logger.critical(e) + sys.exit(1) + + try: + if args.action == 'compress': + zipper.execute( + action='compress', + files=args.files, + archive=args.archive, + level=args.level, + password=args.password + ) + elif args.action == 'extract': + zipper.execute( + action='extract', + archive=args.archive, + destination=args.destination, + password=args.password, + force=args.force + ) + elif args.action == 'list': + content = zipper.execute( + action='list', + archive=args.archive, + password=args.password + ) + print("Archive Contents:\n", content) + elif args.action == 'test': + is_valid = zipper.execute( + action='test', + archive=args.archive, + password=args.password + ) + print(f"Is the archive valid? {is_valid}") + elif args.action == 'delete': + zipper.execute( + action='delete', + archive=args.archive + ) + elif args.action == 'update': + zipper.execute( + action='update', + archive=args.archive, + files=args.files, + add=args.add, + delete=args.delete + ) + except SevenZipError as e: + logger.error(e) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/modules/lithium.pytools/tools/sftp.py b/modules/lithium.pytools/tools/sftp.py new file mode 100644 index 00000000..ebd5ba59 --- /dev/null +++ b/modules/lithium.pytools/tools/sftp.py @@ -0,0 +1,413 @@ +from datetime import datetime +import os +import argparse +import stat +import sys +import paramiko +from paramiko import SFTPClient, SSHException +from loguru import logger + +# Configure Loguru logger +logger.add("sftp_client.log", format="{time} {level} {message}", level="INFO") +logger.add(sys.stdout, format="{time} {level} {message}", level="INFO") + + +class SFTPClientWrapper: + def __init__(self, hostname, username, password=None, port=22, key_file=None): + """ + Initialize the SFTP client wrapper. + + :param hostname: The hostname of the SFTP server. + :param username: The username to connect to the SFTP server. + :param password: The password to connect to the SFTP server (optional if using key_file). + :param port: The port to connect to the SFTP server (default is 22). + :param key_file: The path to the private key file for key-based authentication (optional). + """ + self.hostname = hostname + self.username = username + self.password = password + self.port = port + self.key_file = key_file + self.sftp = None + self.client = None + + def connect(self): + """ + Connect to the SFTP server using the provided credentials. + """ + try: + self.client = paramiko.SSHClient() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + if self.key_file: + private_key = paramiko.RSAKey.from_private_key_file( + self.key_file) + self.client.connect( + hostname=self.hostname, + port=self.port, + username=self.username, + pkey=private_key + ) + else: + self.client.connect( + hostname=self.hostname, + port=self.port, + username=self.username, + password=self.password + ) + self.sftp = self.client.open_sftp() + logger.info(f"Connected to SFTP server: {self.hostname}") + except (SSHException, Exception) as e: + logger.error(f"Failed to connect to SFTP server: {e}") + self.disconnect() + raise e + + def upload_file(self, local_path, remote_path): + """ + Upload a single file to the SFTP server. + + :param local_path: The local file path to upload. + :param remote_path: The remote file path to upload to. + """ + try: + self.sftp.put(local_path, remote_path) + logger.info(f"Uploaded: {local_path} -> {remote_path}") + except Exception as e: + logger.error(f"Failed to upload file '{local_path}': {e}") + + def download_file(self, remote_path, local_path): + """ + Download a single file from the SFTP server. + + :param remote_path: The remote file path to download. + :param local_path: The local file path to download to. + """ + try: + self.sftp.get(remote_path, local_path) + logger.info(f"Downloaded: {remote_path} -> {local_path}") + except Exception as e: + logger.error(f"Failed to download file '{remote_path}': {e}") + + def upload_directory(self, local_dir, remote_dir): + """ + Recursively upload a directory to the SFTP server. + + :param local_dir: The local directory path to upload. + :param remote_dir: The remote directory path to upload to. + """ + try: + for root, _, files in os.walk(local_dir): + rel_path = os.path.relpath(root, local_dir) + rel_path = "" if rel_path == "." else rel_path + remote_sub_dir = os.path.join( + remote_dir, rel_path).replace("\\", "/") + self.create_directory(remote_sub_dir) + for file in files: + local_file = os.path.join(root, file) + remote_file = os.path.join( + remote_sub_dir, file).replace("\\", "/") + self.upload_file(local_file, remote_file) + logger.info(f"Uploaded directory: {local_dir} -> {remote_dir}") + except Exception as e: + logger.error(f"Failed to upload directory '{local_dir}': {e}") + + def download_directory(self, remote_dir, local_dir): + """ + Recursively download a directory from the SFTP server. + + :param remote_dir: The remote directory path to download. + :param local_dir: The local directory path to download to. + """ + try: + os.makedirs(local_dir, exist_ok=True) + for item in self.sftp.listdir_attr(remote_dir): + remote_item = os.path.join( + remote_dir, item.filename).replace("\\", "/") + local_item = os.path.join(local_dir, item.filename) + if stat.S_ISDIR(item.st_mode): + self.download_directory(remote_item, local_item) + else: + self.download_file(remote_item, local_item) + logger.info(f"Downloaded directory: {remote_dir} -> {local_dir}") + except Exception as e: + logger.error(f"Failed to download directory '{remote_dir}': {e}") + + def create_directory(self, remote_path): + """ + Create a remote directory on the SFTP server. + + :param remote_path: The remote directory path to create. + """ + try: + self.sftp.mkdir(remote_path) + logger.info(f"Created directory: {remote_path}") + except IOError: + logger.warning( + f"Directory already exists or cannot be created: {remote_path}") + except Exception as e: + logger.error(f"Failed to create directory '{remote_path}': {e}") + + def remove_directory(self, remote_path): + """ + Recursively remove a remote directory from the SFTP server. + + :param remote_path: The remote directory path to remove. + """ + try: + for item in self.sftp.listdir_attr(remote_path): + remote_item = os.path.join( + remote_path, item.filename).replace("\\", "/") + if stat.S_ISDIR(item.st_mode): + self.remove_directory(remote_item) + else: + self.sftp.remove(remote_item) + logger.info(f"Removed file: {remote_item}") + self.sftp.rmdir(remote_path) + logger.info(f"Removed directory: {remote_path}") + except Exception as e: + logger.error(f"Failed to remove directory '{remote_path}': {e}") + + def get_file_info(self, remote_path): + """ + Get information about a remote file or directory. + + :param remote_path: The remote file or directory path. + """ + try: + info = self.sftp.stat(remote_path) + file_type = 'Directory' if stat.S_ISDIR(info.st_mode) else 'File' + logger.info(f"Information for {remote_path}:") + logger.info(f" Type: {file_type}") + logger.info(f" Size: {info.st_size} bytes") + logger.info( + f" Last modified: {datetime.fromtimestamp(info.st_mtime)}") + logger.info(f" Permissions: {oct(info.st_mode)}") + except FileNotFoundError: + logger.error(f"Path not found: {remote_path}") + except Exception as e: + logger.error(f"Failed to get file info for '{remote_path}': {e}") + + def resume_upload(self, local_path, remote_path): + """ + Resume an interrupted file upload to the SFTP server. + + :param local_path: The local file path to upload. + :param remote_path: The remote file path to upload to. + """ + try: + file_size = os.path.getsize(local_path) + try: + remote_size = self.sftp.stat(remote_path).st_size + except FileNotFoundError: + remote_size = 0 + + if remote_size < file_size: + with open(local_path, "rb") as f: + f.seek(remote_size) + self.sftp.putfo(f, remote_path, file_size=file_size, + callback=self._progress_callback(remote_size, file_size)) + logger.info(f"Resumed upload: {local_path} -> {remote_path}") + else: + logger.info(f"File already fully uploaded: {remote_path}") + except Exception as e: + logger.error(f"Failed to resume upload '{local_path}': {e}") + + def _progress_callback(self, initial, total): + """ + Internal method to provide a progress callback. + + :param initial: The initial amount of data transferred. + :param total: The total size of the file being transferred. + """ + def callback(transferred, total_size): + progress = (initial + transferred) / total + logger.info(f"Upload progress: {progress*100:.2f}%") + return callback + + def list_files(self, remote_path): + """ + List files in a remote directory on the SFTP server. + + :param remote_path: The remote directory path. + :return: A list of files in the remote directory. + """ + try: + files = self.sftp.listdir(remote_path) + logger.info(f"Files in '{remote_path}': {files}") + return files + except Exception as e: + logger.error(f"Failed to list files in '{remote_path}': {e}") + return [] + + def move_file(self, remote_src, remote_dest): + """ + Move or rename a remote file on the SFTP server. + + :param remote_src: The source remote file path. + :param remote_dest: The destination remote file path. + """ + try: + self.sftp.rename(remote_src, remote_dest) + logger.info(f"Moved/Renamed: {remote_src} -> {remote_dest}") + except Exception as e: + logger.error( + f"Failed to move/rename '{remote_src}' to '{remote_dest}': {e}") + + def delete_file(self, remote_path): + """ + Delete a remote file from the SFTP server. + + :param remote_path: The remote file path to delete. + """ + try: + self.sftp.remove(remote_path) + logger.info(f"Deleted file: {remote_path}") + except Exception as e: + logger.error(f"Failed to delete file '{remote_path}': {e}") + + def path_exists(self, remote_path): + """ + Check if a remote path exists on the SFTP server. + + :param remote_path: The remote path to check. + :return: True if the path exists, False otherwise. + """ + try: + self.sftp.stat(remote_path) + return True + except FileNotFoundError: + return False + except Exception as e: + logger.error(f"Error checking if path exists '{remote_path}': {e}") + return False + + def disconnect(self): + """ + Disconnect from the SFTP server. + """ + try: + if self.sftp: + self.sftp.close() + logger.info("SFTP connection closed") + if self.client: + self.client.close() + logger.info("SSH client disconnected") + except Exception as e: + logger.error(f"Error while disconnecting: {e}") + + +def parse_arguments(): + """ + Parse command-line arguments. + + :return: Parsed arguments. + """ + parser = argparse.ArgumentParser( + description="SFTP Client Command Line Tool") + parser.add_argument("hostname", help="SFTP server hostname") + parser.add_argument("username", help="SFTP username") + parser.add_argument("--password", help="SFTP password", default=None) + parser.add_argument("--port", type=int, + help="SFTP server port", default=22) + parser.add_argument( + "--key-file", help="Path to private key file", default=None) + + subparsers = parser.add_subparsers( + dest="command", help="Available commands") + + # Upload directory + upload_dir_parser = subparsers.add_parser( + "upload-dir", help="Upload a directory to the server") + upload_dir_parser.add_argument("local_dir", help="Local directory path") + upload_dir_parser.add_argument("remote_dir", help="Remote directory path") + + # Download directory + download_dir_parser = subparsers.add_parser( + "download-dir", help="Download a directory from the server") + download_dir_parser.add_argument( + "remote_dir", help="Remote directory path") + download_dir_parser.add_argument("local_dir", help="Local directory path") + + # Create directory + mkdir_parser = subparsers.add_parser( + "mkdir", help="Create a directory on the server") + mkdir_parser.add_argument("remote_path", help="Remote directory path") + + # Remove directory + rmdir_parser = subparsers.add_parser( + "rmdir", help="Remove a directory from the server") + rmdir_parser.add_argument("remote_path", help="Remote directory path") + + # Get file info + info_parser = subparsers.add_parser( + "info", help="Get file or directory info") + info_parser.add_argument( + "remote_path", help="Remote file or directory path") + + # Resume upload + resume_parser = subparsers.add_parser( + "resume-upload", help="Resume an interrupted file upload") + resume_parser.add_argument("local_path", help="Local file path") + resume_parser.add_argument("remote_path", help="Remote file path") + + # List files + list_parser = subparsers.add_parser( + "list", help="List files in a remote directory") + list_parser.add_argument("remote_path", help="Remote directory path") + + # Move file + move_parser = subparsers.add_parser( + "move", help="Move or rename a remote file") + move_parser.add_argument("remote_src", help="Source remote path") + move_parser.add_argument("remote_dest", help="Destination remote path") + + # Delete file + delete_parser = subparsers.add_parser( + "delete", help="Delete a remote file") + delete_parser.add_argument("remote_path", help="Remote file path") + + return parser.parse_args() + + +def main(): + """ + Main function to execute the SFTP client operations based on command-line arguments. + """ + args = parse_arguments() + + client = SFTPClientWrapper( + hostname=args.hostname, + username=args.username, + password=args.password, + port=args.port, + key_file=args.key_file + ) + + try: + client.connect() + if args.command == "upload-dir": + client.upload_directory(args.local_dir, args.remote_dir) + elif args.command == "download-dir": + client.download_directory(args.remote_dir, args.local_dir) + elif args.command == "mkdir": + client.create_directory(args.remote_path) + elif args.command == "rmdir": + client.remove_directory(args.remote_path) + elif args.command == "info": + client.get_file_info(args.remote_path) + elif args.command == "resume-upload": + client.resume_upload(args.local_path, args.remote_path) + elif args.command == "list": + client.list_files(args.remote_path) + elif args.command == "move": + client.move_file(args.remote_src, args.remote_dest) + elif args.command == "delete": + client.delete_file(args.remote_path) + else: + logger.warning( + "Unknown command. Use --help to see available commands.") + finally: + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/modules/lithium.pytools/tools/ssh.py b/modules/lithium.pytools/tools/ssh.py new file mode 100644 index 00000000..9625ac04 --- /dev/null +++ b/modules/lithium.pytools/tools/ssh.py @@ -0,0 +1,404 @@ +import paramiko +from paramiko.ssh_exception import SSHException, AuthenticationException, NoValidConnectionsError +from pathlib import Path +from typing import Union, List, Optional, Tuple +from loguru import logger +import sys +import argparse + + +# Define custom exception classes +class SSHError(Exception): + """Base exception class for SSHClient""" + + +class SSHConnectionError(SSHError): + """Exception for SSH connection failures""" + + +class SSHCommandError(SSHError): + """Exception for SSH command execution failures""" + + +class SSHPermissionError(SSHError): + """Exception for SSH permission related errors""" + + +class SFTPError(SSHError): + """Exception for SFTP operation failures""" + + +class SSHClient: + def __init__( + self, + hostname: str, + port: int, + username: str, + password: Optional[str] = None, + key_file: Optional[str] = None, + ): + """ + Initialize the SSH client. + :param hostname: Hostname or IP address. + :param port: Port number, typically 22. + :param username: Username for SSH login. + :param password: Password for SSH (if using password authentication). + :param key_file: Path to the private key file (if using key authentication). + """ + self.hostname = hostname + self.port = port + self.username = username + self.password = password + self.key_file = key_file + self.client = None + self.sftp = None + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def connect(self): + """ + Connect to the remote SSH server. + """ + logger.info( + f"Attempting to connect to {self.hostname}:{self.port} as {self.username}" + ) + try: + self.client = paramiko.SSHClient() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + connect_params = { + "hostname": self.hostname, + "port": self.port, + "username": self.username, + "password": self.password, + "key_filename": self.key_file, + "look_for_keys": False, + "allow_agent": False, + } + self.client.connect(**connect_params) + self.sftp = self.client.open_sftp() + logger.success( + f"Successfully connected to {self.hostname}:{self.port}") + except AuthenticationException as e: + logger.error(f"Authentication failed: {e}") + raise SSHConnectionError("Authentication failed") from e + except NoValidConnectionsError as e: + logger.error( + f"Unable to connect to {self.hostname}:{self.port}: {e}") + raise SSHConnectionError( + f"Unable to connect to {self.hostname}:{self.port}") from e + except SSHException as e: + logger.error(f"SSH connection failed: {e}") + raise SSHConnectionError("SSH connection failed") from e + except Exception as e: + logger.error(f"Connection failed: {e}") + raise SSHConnectionError("Connection failed") from e + + def execute_command( + self, command: str, timeout: Optional[float] = None + ) -> Tuple[str, str]: + """ + Execute a command on the remote SSH server. + :param command: The command to execute. + :param timeout: Optional timeout for command execution. + :return: A tuple containing standard output and standard error. + """ + if self.client is None: + logger.error( + "Not connected to any SSH server. Call connect() first.") + raise SSHCommandError("Not connected to any SSH server.") + + logger.info(f"Executing command: {command}") + try: + stdin, stdout, stderr = self.client.exec_command( + command, timeout=timeout) + output = stdout.read().decode("utf-8") + error = stderr.read().decode("utf-8") + logger.debug(f"Command output: {output}") + if error: + logger.warning(f"Command error output: {error}") + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + logger.error( + f"Command '{command}' failed with exit status {exit_status}" + ) + raise SSHCommandError( + f"Command '{command}' failed with exit status {exit_status}" + ) + logger.success(f"Command '{command}' executed successfully.") + return output, error + except SSHException as e: + logger.error(f"Failed to execute command '{command}': {e}") + raise SSHCommandError( + f"Failed to execute command '{command}'") from e + except Exception as e: + logger.error(f"Unexpected error during command execution: {e}") + raise SSHCommandError( + "Unexpected error during command execution") from e + + def upload_file( + self, local_path: Union[str, Path], remote_path: Union[str, Path] + ): + """ + Upload a file to the remote SSH server using SFTP. + :param local_path: Path to the local file. + :param remote_path: Path on the remote server where the file will be uploaded. + """ + if self.sftp is None: + logger.error( + "SFTP session is not established. Call connect() first.") + raise SFTPError("SFTP session is not established.") + + logger.info(f"Uploading {local_path} to {remote_path}") + try: + self.sftp.put(str(local_path), str(remote_path)) + logger.success( + f"Successfully uploaded {local_path} to {remote_path}") + except FileNotFoundError as e: + logger.error(f"Local file not found: {e}") + raise SFTPError("Local file not found") from e + except SSHException as e: + logger.error(f"SFTP upload failed: {e}") + raise SFTPError("SFTP upload failed") from e + except Exception as e: + logger.error(f"Unexpected error during file upload: {e}") + raise SFTPError("Unexpected error during file upload") from e + + def download_file( + self, remote_path: Union[str, Path], local_path: Union[str, Path] + ): + """ + Download a file from the remote SSH server using SFTP. + :param remote_path: Path on the remote server to download. + :param local_path: Path on the local machine where the file will be saved. + """ + if self.sftp is None: + logger.error( + "SFTP session is not established. Call connect() first.") + raise SFTPError("SFTP session is not established.") + + logger.info(f"Downloading {remote_path} to {local_path}") + try: + self.sftp.get(str(remote_path), str(local_path)) + logger.success( + f"Successfully downloaded {remote_path} to {local_path}") + except FileNotFoundError as e: + logger.error(f"Remote file not found: {e}") + raise SFTPError("Remote file not found") from e + except SSHException as e: + logger.error(f"SFTP download failed: {e}") + raise SFTPError("SFTP download failed") from e + except Exception as e: + logger.error(f"Unexpected error during file download: {e}") + raise SFTPError("Unexpected error during file download") from e + + def list_remote_directory( + self, remote_path: Union[str, Path] + ) -> List[str]: + """ + List the contents of a remote directory using SFTP. + :param remote_path: Path on the remote server to list. + :return: A list of file and directory names. + """ + if self.sftp is None: + logger.error( + "SFTP session is not established. Call connect() first.") + raise SFTPError("SFTP session is not established.") + + logger.info(f"Listing directory: {remote_path}") + try: + files = self.sftp.listdir(str(remote_path)) + logger.success(f"Successfully listed directory: {remote_path}") + logger.debug(f"Directory contents: {files}") + return files + except FileNotFoundError as e: + logger.error(f"Remote directory not found: {e}") + raise SFTPError("Remote directory not found") from e + except SSHException as e: + logger.error(f"SFTP list directory failed: {e}") + raise SFTPError("SFTP list directory failed") from e + except Exception as e: + logger.error(f"Unexpected error during listing directory: {e}") + raise SFTPError("Unexpected error during listing directory") from e + + def create_remote_directory(self, remote_path: Union[str, Path]): + """ + Create a directory on the remote SSH server using SFTP. + :param remote_path: Path on the remote server where the directory will be created. + """ + if self.sftp is None: + logger.error( + "SFTP session is not established. Call connect() first.") + raise SFTPError("SFTP session is not established.") + + logger.info(f"Creating remote directory: {remote_path}") + try: + self.sftp.mkdir(str(remote_path)) + logger.success( + f"Successfully created remote directory: {remote_path}") + except FileExistsError: + logger.warning(f"Remote directory already exists: {remote_path}") + except SSHException as e: + logger.error(f"SFTP mkdir failed: {e}") + raise SFTPError("SFTP mkdir failed") from e + except Exception as e: + logger.error(f"Unexpected error during creating directory: {e}") + raise SFTPError( + "Unexpected error during creating directory") from e + + def delete_remote_file(self, remote_path: Union[str, Path]): + """ + Delete a file on the remote SSH server using SFTP. + :param remote_path: Path on the remote server of the file to delete. + """ + if self.sftp is None: + logger.error( + "SFTP session is not established. Call connect() first.") + raise SFTPError("SFTP session is not established.") + + logger.info(f"Deleting remote file: {remote_path}") + try: + self.sftp.remove(str(remote_path)) + logger.success(f"Successfully deleted remote file: {remote_path}") + except FileNotFoundError as e: + logger.error(f"Remote file not found: {e}") + raise SFTPError("Remote file not found") from e + except SSHException as e: + logger.error(f"SFTP delete failed: {e}") + raise SFTPError("SFTP delete failed") from e + except Exception as e: + logger.error(f"Unexpected error during file deletion: {e}") + raise SFTPError("Unexpected error during file deletion") from e + + def close(self): + """ + Close the SSH and SFTP connections. + """ + logger.info("Closing SSH and SFTP connections.") + try: + if self.sftp: + self.sftp.close() + logger.debug("SFTP connection closed.") + if self.client: + self.client.close() + logger.debug("SSH connection closed.") + logger.success("All connections closed successfully.") + except Exception as e: + logger.error(f"Error while closing connections: {e}") + raise SSHError("Error while closing connections") from e + + +def parse_args(): + """ + Parse command-line arguments. + :return: Parsed arguments. + """ + parser = argparse.ArgumentParser(description="SSH Client Tool") + parser.add_argument("--hostname", required=True, + help="Hostname or IP address") + parser.add_argument("--port", type=int, default=22, help="SSH port number") + parser.add_argument("--username", required=True, help="SSH username") + + auth_group = parser.add_mutually_exclusive_group(required=True) + auth_group.add_argument("--password", help="SSH password") + auth_group.add_argument("--key_file", help="Path to SSH private key file") + + subparsers = parser.add_subparsers( + dest="command", help="Available commands") + + # Execute command + exec_parser = subparsers.add_parser( + "exec", help="Execute a command on the remote server") + exec_parser.add_argument("cmd", help="Command to execute") + exec_parser.add_argument("--timeout", type=float, + default=None, help="Command timeout in seconds") + + # Upload file + upload_parser = subparsers.add_parser( + "upload", help="Upload a file to the remote server") + upload_parser.add_argument("local_path", help="Path to the local file") + upload_parser.add_argument("remote_path", help="Path on the remote server") + + # Download file + download_parser = subparsers.add_parser( + "download", help="Download a file from the remote server") + download_parser.add_argument( + "remote_path", help="Path on the remote server") + download_parser.add_argument( + "local_path", help="Path to save the downloaded file locally") + + # List directory + list_parser = subparsers.add_parser( + "list", help="List contents of a remote directory") + list_parser.add_argument("remote_path", help="Remote directory path") + + # Create directory + mkdir_parser = subparsers.add_parser( + "mkdir", help="Create a directory on the remote server") + mkdir_parser.add_argument( + "remote_path", help="Path of the remote directory to create") + + # Delete file + delete_parser = subparsers.add_parser( + "delete", help="Delete a file on the remote server") + delete_parser.add_argument( + "remote_path", help="Path of the remote file to delete") + + return parser.parse_args() + + +def main(): + """ + Main function to execute the SSH client operations based on command-line arguments. + """ + args = parse_args() + + try: + with SSHClient( + hostname=args.hostname, + port=args.port, + username=args.username, + password=args.password, + key_file=args.key_file, + ) as ssh: + if args.command == "exec": + output, error = ssh.execute_command( + args.cmd, timeout=args.timeout) + if output: + print("Output:\n", output) + if error: + print("Error:\n", error) + + elif args.command == "upload": + ssh.upload_file(args.local_path, args.remote_path) + + elif args.command == "download": + ssh.download_file(args.remote_path, args.local_path) + + elif args.command == "list": + contents = ssh.list_remote_directory(args.remote_path) + print("Directory Contents:") + for item in contents: + print(item) + + elif args.command == "mkdir": + ssh.create_remote_directory(args.remote_path) + + elif args.command == "delete": + ssh.delete_remote_file(args.remote_path) + + else: + logger.error("No valid command provided.") + sys.exit(1) + + except SSHError as e: + logger.critical(f"SSH operation failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/modules/lithium.pytools/tools/unzip.py b/modules/lithium.pytools/tools/unzip.py new file mode 100644 index 00000000..a265258b --- /dev/null +++ b/modules/lithium.pytools/tools/unzip.py @@ -0,0 +1,340 @@ +import subprocess +from pathlib import Path +from typing import Union, List, Optional +from loguru import logger +import shutil +import argparse +import sys + + +# Define custom exception classes +class UnzipError(Exception): + """Base exception class for UnzipWrapper""" + + +class UnzipExtractionError(UnzipError): + """Exception for extraction failures""" + + +class UnzipListError(UnzipError): + """Exception for listing contents failures""" + + +class UnzipValidationError(UnzipError): + """Exception for parameter validation failures""" + + +class UnzipIntegrityError(UnzipError): + """Exception for archive integrity test failures""" + + +class UnzipDeleteError(UnzipError): + """Exception for deletion failures""" + + +class UnzipWrapper: + def __init__(self, executable: str = "unzip"): + """ + Initialize the unzip command-line tool wrapper + :param executable: Path to the unzip executable, defaults to "unzip" in the system PATH + """ + self.executable = executable + if not shutil.which(self.executable): + logger.error(f"Executable not found: {self.executable}") + raise UnzipValidationError( + f"Executable does not exist: {self.executable}") + logger.info(f"Using unzip executable: {self.executable}") + + def _run_command(self, args: List[str]) -> subprocess.CompletedProcess: + """ + Run the unzip command and capture output + :param args: Arguments to pass to unzip + :return: subprocess.CompletedProcess object containing the return code and output information + """ + command = [self.executable] + args + logger.debug(f"Running command: {' '.join(command)}") + try: + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True + ) + logger.debug(f"Command output: {result.stdout}") + return result + except subprocess.CalledProcessError as e: + logger.error(f"unzip command failed with exit code {e.returncode}") + logger.error(f"Error output: {e.stderr}") + raise UnzipError( + f"Command execution failed: {' '.join(command)}") from e + + def _validate_archive_exists(self, archive: Union[str, Path]) -> None: + """ + Validate that the archive exists + :param archive: Path to the archive + """ + if not Path(archive).exists(): + logger.error(f"Archive does not exist: {archive}") + raise UnzipValidationError(f"Archive does not exist: {archive}") + logger.debug("Archive exists") + + def extract(self, archive: Union[str, Path], destination: Union[str, Path], + password: Optional[str] = None, force: bool = False) -> None: + """ + Extract the specified archive to the destination folder + :param archive: Path to the archive + :param destination: Path to the extraction destination + :param password: Password for the archive (optional) + :param force: Whether to forcibly overwrite the destination directory if it exists + """ + logger.info( + f"Starting extraction of {archive} to {destination} with force={force}") + self._validate_archive_exists(archive) + destination_path = Path(destination) + + if destination_path.exists(): + if force: + logger.warning( + f"Destination directory exists and will be removed: {destination}") + try: + shutil.rmtree(destination_path) + logger.info( + f"Removed destination directory: {destination}") + except Exception as e: + logger.error( + f"Failed to remove destination directory: {e}") + raise UnzipExtractionError( + f"Failed to remove destination directory: {destination}") from e + else: + logger.error(f"Destination directory exists: {destination}") + raise UnzipValidationError( + f"Destination directory exists: {destination}") + + try: + destination_path.mkdir(parents=True, exist_ok=True) + args = ["-o", str(archive), "-d", str(destination)] + if password: + args.extend(["-P", password]) + logger.debug(f"Unzip arguments: {args}") + self._run_command(args) + logger.success( + f"Successfully extracted {archive} to {destination}") + except UnzipError as e: + logger.error(f"Extraction process failed: {e}") + raise UnzipExtractionError("Extraction failed") from e + + def list_contents(self, archive: Union[str, Path], + password: Optional[str] = None) -> str: + """ + List the contents of the archive + :param archive: Path to the archive + :param password: Password for the archive (optional) + :return: String representation of the archive contents + """ + logger.info(f"Listing contents of {archive}") + self._validate_archive_exists(archive) + args = ["-l", str(archive)] + if password: + args.extend(["-P", password]) + try: + result = self._run_command(args) + logger.success(f"Successfully listed contents of {archive}") + return result.stdout + except UnzipError as e: + logger.error(f"Listing contents failed: {e}") + raise UnzipListError("Listing contents failed") from e + + def test_integrity(self, archive: Union[str, Path], + password: Optional[str] = None) -> bool: + """ + Test the integrity of the archive + :param archive: Path to the archive + :param password: Password for the archive (optional) + :return: Whether the test was successful + """ + logger.info(f"Testing integrity of {archive}") + self._validate_archive_exists(archive) + args = ["-t", str(archive)] + if password: + args.extend(["-P", password]) + try: + result = self._run_command(args) + is_valid = "No errors detected" in result.stderr or "Everything is Ok" in result.stdout + if is_valid: + logger.success(f"Integrity test passed for {archive}") + else: + logger.warning(f"Integrity test failed for {archive}") + return is_valid + except UnzipError as e: + logger.error(f"Integrity test failed: {e}") + raise UnzipIntegrityError("Integrity test failed") from e + + def delete_archive(self, archive: Union[str, Path]) -> None: + """ + Delete the specified archive + :param archive: Path to the archive + """ + logger.info(f"Deleting archive: {archive}") + self._validate_archive_exists(archive) + try: + Path(archive).unlink() + logger.success(f"Successfully deleted archive: {archive}") + except Exception as e: + logger.error(f"Failed to delete archive: {e}") + raise UnzipDeleteError( + f"Failed to delete archive: {archive}") from e + + def update_archive(self, archive: Union[str, Path], files: List[Union[str, Path]], + add: bool = True, delete: bool = False) -> None: + """ + Update the archive by adding or deleting files + Note: unzip does not support updating archives. This method is a placeholder. + :param archive: Path to the archive + :param files: List of files to add or delete + :param add: Whether to add files + :param delete: Whether to delete files + """ + logger.warning("Update operation is not supported by unzip.") + raise NotImplementedError( + "Update operation is not supported by unzip.") + + def execute(self, action: str, files: Optional[List[Union[str, Path]]] = None, + archive: Optional[Union[str, Path]] = None, + destination: Optional[Union[str, Path]] = None, + password: Optional[str] = None, + force: bool = False, **kwargs) -> Optional[str]: + """ + Execute the corresponding unzip command based on the action type + :param action: Action type ('extract', 'list', 'test', 'delete') + :param files: List of files to add or delete (not supported) + :param archive: Path to the archive + :param destination: Path to the extraction destination (only for 'extract') + :param password: Password for the archive (optional) + :param force: Whether to forcibly overwrite the destination directory (only for 'extract') + :return: If the action is 'list', returns the archive contents; if 'test', returns integrity status; otherwise, no return value + """ + logger.info(f"Executing action: {action}") + try: + if action == 'extract': + if archive is None or destination is None: + raise UnzipValidationError( + "Extract action requires 'archive' and 'destination' parameters") + self.extract(archive, destination, password, force) + elif action == 'list': + if archive is None: + raise UnzipValidationError( + "List action requires 'archive' parameter") + return self.list_contents(archive, password) + elif action == 'test': + if archive is None: + raise UnzipValidationError( + "Test action requires 'archive' parameter") + return self.test_integrity(archive, password) + elif action == 'delete': + if archive is None: + raise UnzipValidationError( + "Delete action requires 'archive' parameter") + self.delete_archive(archive) + elif action == 'update': + if archive is None or files is None: + raise UnzipValidationError( + "Update action requires 'archive' and 'files' parameters") + add = kwargs.get('add', True) + delete = kwargs.get('delete', False) + self.update_archive(archive, files, add, delete) + else: + logger.error(f"Unsupported action type: {action}") + raise UnzipValidationError( + f"Unsupported action type: {action}") + except UnzipError as e: + logger.error(f"Action {action} failed: {e}") + raise e + + +def main(): + parser = argparse.ArgumentParser( + description="Unzip Command-Line Tool Wrapper") + subparsers = parser.add_subparsers(dest='action', help='Action types') + + # Extract action + extract_parser = subparsers.add_parser('extract', help='Extract files') + extract_parser.add_argument( + '-a', '--archive', required=True, help='Path to the archive') + extract_parser.add_argument( + '-d', '--destination', required=True, help='Path to the extraction destination') + extract_parser.add_argument( + '-p', '--password', help='Password for the archive (optional)') + extract_parser.add_argument( + '--force', action='store_true', help='Force overwrite if the destination directory exists') + + # List contents action + list_parser = subparsers.add_parser('list', help='List archive contents') + list_parser.add_argument( + '-a', '--archive', required=True, help='Path to the archive') + list_parser.add_argument( + '-p', '--password', help='Password for the archive (optional)') + + # Test integrity action + test_parser = subparsers.add_parser( + 'test', help='Test archive integrity') + test_parser.add_argument( + '-a', '--archive', required=True, help='Path to the archive') + test_parser.add_argument( + '-p', '--password', help='Password for the archive (optional)') + + # Delete action + delete_parser = subparsers.add_parser('delete', help='Delete archive') + delete_parser.add_argument( + '-a', '--archive', required=True, help='Path to the archive') + + args = parser.parse_args() + + if not args.action: + parser.print_help() + sys.exit(1) + + try: + zipper = UnzipWrapper() + except UnzipValidationError as e: + logger.critical(e) + sys.exit(1) + + try: + if args.action == 'extract': + zipper.execute( + action='extract', + archive=args.archive, + destination=args.destination, + password=args.password, + force=args.force + ) + elif args.action == 'list': + content = zipper.execute( + action='list', + archive=args.archive, + password=args.password + ) + print("Archive Contents:\n", content) + elif args.action == 'test': + is_valid = zipper.execute( + action='test', + archive=args.archive, + password=args.password + ) + print(f"Is the archive valid? {is_valid}") + elif args.action == 'delete': + zipper.execute( + action='delete', + archive=args.archive + ) + else: + logger.error(f"Unsupported action type: {args.action}") + sys.exit(1) + except UnzipError as e: + logger.error(f"Action {args.action} failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/atom b/src/atom index cd7e56b2..8324c44b 160000 --- a/src/atom +++ b/src/atom @@ -1 +1 @@ -Subproject commit cd7e56b2e08881fb37a9dc8a6f654c67a498f131 +Subproject commit 8324c44b6589305faf953822c60c4be4d79a916e