Skip to content

Commit

Permalink
Merge pull request #7 from arthurlt/cleanup
Browse files Browse the repository at this point in the history
removed extra comments&code, updated yt-dlp options, handle failed urls
  • Loading branch information
arthurlt authored Dec 15, 2023
2 parents dc768c6 + 733683b commit 5dfa195
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 86 deletions.
165 changes: 79 additions & 86 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from enum import Enum
from pathlib import Path
from shutil import rmtree
from typing import Any

from aiogram import Bot, Dispatcher, flags, types
from aiogram.enums import ParseMode
Expand All @@ -20,14 +21,17 @@

fifty_mb = 52428800


class yt_dlp_file(Enum):
    """File names used for the artifacts of a single yt-dlp download."""

    # Media file name (matches the "video.%(ext)s" output template with mp4).
    VIDEO = 'video.mp4'
    # Thumbnail image name.
    THUMBNAIL = 'video.jpg'
    # Metadata file name (as produced by --write-info-json).
    JSON = 'video.info.json'


class EntityTypeFilter(Filter):
"""
"""

def __init__(self, filter_type: str) -> None:
self.filter_type = filter_type

Expand All @@ -38,60 +42,59 @@ async def __call__(self, message: types.Message) -> bool:
print(message.entities)
return any([self.filter_type in entity.type for entity in message.entities])

def validate_string(string: str) -> bool:

def validate_string(string: str | None) -> bool:
"""
Checks if a string is not None and not empty.
Args:
string: The string to validate.
string: The string to validate.
Returns:
True if the string is not None and not empty, False otherwise.
True if the string is not None and not empty, False otherwise.
"""
return string is not None and len(string) > 0


def get_substring(string: str, offset: int, length: int) -> str:
    """
    Extracts a substring from a string based on offset and length.

    The end of the slice is clamped to the end of the string, so asking for
    more characters than remain returns the available tail.

    Args:
        string: The string to extract from.
        offset: The starting position of the substring.
        length: The length of the substring.

    Returns:
        The extracted substring.

    Raises:
        ValueError: If offset is negative or not a valid index into string
            (this includes any offset into an empty string).
    """
    # NOTE(review): offsets here are Python code-point indices. Telegram
    # message entity offsets are documented as UTF-16 code units -- confirm
    # callers passing entity offsets account for non-BMP characters.
    if offset < 0 or offset >= len(string):
        raise ValueError("Offset is out of bounds")
    end_index = min(offset + length, len(string))
    return string[offset:end_index]


# TODO: shrink caption to be no more than 3 lines
def generate_caption(info: dict, user: types.User, reply=False) -> str:
def generate_caption(info: dict[str, Any], user: types.User, reply=False) -> str:
"""
Generates a caption from a dictionary of information.
Args:
info: A dictionary containing the following keys:
info: A dictionary containing the following keys:
- title: The title of the content.
- description: The description of the content.
Returns:
A string containing the caption without hashtags.
A string containing the caption without hashtags.
"""
# Define the regular expression to match hashtags
regex = re.compile(r"#\w+\s*")

# Initialize the caption variable
caption = ""

# Check if the description is valid
if not validate_string(info["description"]):
# Use the title if no valid description is provided
# dict.get() doesn't throw an exception if the key is missing
if not validate_string(info.get("description")):
caption = info["title"]
else:
# Use the description if available
caption = info["description"]

if reply:
Expand All @@ -106,8 +109,10 @@ def generate_caption(info: dict, user: types.User, reply=False) -> str:
regex.sub('', caption)
)


async def has_delete_permissions(message: types.Message) -> bool:
# Something went wrong
"""
"""
if message.bot is None:
return False

Expand All @@ -121,95 +126,61 @@ async def has_delete_permissions(message: types.Message) -> bool:
else:
return False

# def launch_yt_dlp(dir="/tmp", extra_opts={}):
# if not os.path.exists(dir):
# try:
# os.makedirs(dir)
# except Exception as e:
# print(f"Exception during directory creation: {e}")
# print("Setting dir to /tmp as fallback")
# dir = "/tmp"
# ydl_opts = {
# 'final_ext': 'mp4',
# 'fragment_retries': 10,
# 'ignoreerrors': 'only_download',
# 'paths': {'home': dir},
# 'postprocessors': [
# {'key': 'FFmpegVideoRemuxer', 'preferedformat': 'mp4'}
# ],
# 'restrictfilenames': True,
# 'retries': 10,
# 'trim_file_name': 8
# }
# if extra_opts:
# ydl_opts.
# with YoutubeDL(ydl_opts) as ydl:
# return ydl


async def run_yt_dlp(video_url: str, simulate=False, dir="/tmp") -> asyncio.subprocess.Process:
    """
    Downloads a video using yt-dlp and waits for the process to finish.

    Args:
        video_url: The URL of the video to download.
        simulate: (Optional) Whether to simulate the download without actually downloading the video.
        dir: (Optional) The directory to download the video to. Falls back
            to /tmp if the directory cannot be created.

    Returns:
        The finished asyncio.subprocess.Process; inspect its returncode for
        success. stdout/stderr are not piped, so process.stdout and
        process.stderr are None and yt-dlp's output goes to the parent's
        streams.
    """
    args = [
        "--write-info-json",
        # Thumbnails (--write-thumbnail / --convert-thumbnails jpg) are left
        # off because format conversion is failing:
        # https://github.com/yt-dlp/yt-dlp/issues/6866
        #
        # Explicit --format selection is disabled in favour of sorting, e.g.
        # "bestvideo*[filesize<?30M]+bestaudio*/best[filesize<?40M]"
        "--format-sort",
        "filesize:40M",
        "--merge-output-format",
        "mp4",
        "--recode-video",
        "mp4",
        "--max-filesize",
        "50M",
        "--restrict-filenames",
        "--trim-filenames",
        "10",
        "--output",
        "video.%(ext)s",
        video_url,
    ]
    if simulate:
        args.append("--simulate")
    # exist_ok avoids the check-then-create race of os.path.exists().
    try:
        os.makedirs(dir, exist_ok=True)
    except OSError as e:
        print(f"Exception during directory creation: {e}")
        print("Setting dir to /tmp as fallback")
        dir = "/tmp"
    process = await asyncio.create_subprocess_exec(
        "yt-dlp", *args, cwd=dir
    )
    await process.wait()
    # TODO: also return paths for files
    return process

async def try_delete(message: types.Message) -> bool:
    """
    Attempts to delete a message without propagating failures.

    Args:
        message: The message to delete.

    Returns:
        True if the delete call completed without raising, False otherwise.
    """
    try:
        await message.delete()
    except Exception as e:
        # Best-effort: a failed delete is reported, not raised.
        print(f"Exception during delete: {e}")
        return False
    else:
        return True

# TODO: support text_link type
# looks like that then provides the url via the entity.url
# TODO: if we're able to send the video delete the original message
# make sure the OP gets credit
# also make the caption link to the video
# TODO: specifically handle slideshow tiktoks
@dp.message(EntityTypeFilter('url'))
@flags.chat_action(action="upload_video")
Expand All @@ -222,28 +193,36 @@ async def url_handler(message: types.Message) -> None:
return
if message.from_user is None:
return
can_delete = await has_delete_permissions(message)
failed_urls = []
for entity in message.entities:
print(entity)
if entity.type != "url":
print(f"Message entity wasn't a url type")
continue
url = get_substring(message.text, entity.offset, entity.length)
download_dir = f"/tmp/yt-dlp-{message.message_id}-{hash(url)}";
download_dir = f"/tmp/yt-dlp-{message.message_id}-{hash(url)}"
video_file = Path(f"{download_dir}/video.mp4")
print(f"{url} received from {message.from_user.username} in {message.chat.title}")

download_result = await run_yt_dlp(video_url=url,dir=download_dir)
download_result = await run_yt_dlp(video_url=url, dir=download_dir)
if download_result.returncode != 0:
print(f"yt-dlp failed to download {url}\n{download_result.stderr}")
failed_urls.append(url)
continue

try:
with open(f"{download_dir}/video.info.json") as j:
video_info = json.load(j)
except Exception as e:
print(f"Exception during opening JSON: {e}")
failed_urls.append(url)
continue

# TODO: make this a try/except block
if not video_file.is_file():
print(f"video_file is missing")
failed_urls.append(url)
continue

# TODO: upload video file and respond separately
Expand All @@ -253,20 +232,36 @@ async def url_handler(message: types.Message) -> None:
duration=int(video_info['duration']),
width=video_info['width'],
height=video_info['height'],
# TODO: send bool for reply to get different caption when replying
caption=generate_caption(video_info, message.from_user),
caption=generate_caption(
video_info, message.from_user, reply=not can_delete),
disable_notification=True,
reply_to_message_id=None if await try_delete(message) else message.message_id
reply_to_message_id=None if can_delete else message.message_id
)
except Exception as e:
print(f"Exception during answer_video: {e}")
await message.reply(
text="I'm sorry, there was an error... \n" + message.text,
disable_web_page_preview=False,
allow_sending_without_reply=True
)
failed_urls.append(url)
continue
finally:
rmtree(download_dir)
if failed_urls:
print(f"URLs that failed: {failed_urls}")
if len(failed_urls) == len([["url" in entity.type for entity in message.entities]]):
print(f"All provided URLs failed processing")
return
for url in failed_urls:
try:
await message.answer(
text=f"There was an error processing this:\n{url}",
disable_notification=True,
disable_web_page_preview=False
)
except Exception as e:
print(f"Exception during answer: {e}")
try:
await message.delete()
except Exception as e:
print(f"Exception during delete: {e}")


@dp.message(CommandStart())
async def command_start_handler(message: types.Message) -> None:
Expand All @@ -275,28 +270,26 @@ async def command_start_handler(message: types.Message) -> None:
"""
await message.answer(f"DM me your videos, or add me to your group chats!")


@dp.message(Command("test"))
async def command_test_handler(message: types.Message) -> None:
    """
    Scratch handler for the /test command; intentionally does nothing.
    """
    # Put stuff you're testing in here
    pass


async def main() -> None:
    """
    Creates the bot from the BOT_TOKEN environment variable and starts polling.

    Raises:
        ValueError: If BOT_TOKEN is unset or empty.
    """
    # Bot token can be obtained via https://t.me/BotFather
    TOKEN = os.getenv("BOT_TOKEN")

    if not validate_string(TOKEN):
        raise ValueError('ENV not set')

    # Initialize Bot instance with a default parse mode which will be passed to all API calls
    # TOKEN is known to be a non-empty str here; the checker cannot see that
    # through validate_string(), hence the ignore.
    bot = Bot(TOKEN, parse_mode=ParseMode.HTML)  # pyright: ignore
    # And the run events dispatching
    await dp.start_polling(bot)


if __name__ == "__main__":
    # Log to stdout at INFO level, then run the bot until polling stops.
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
1 change: 1 addition & 0 deletions vidfetch-bot.container
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Description=A Telegram bot
[Container]
Image=ghcr.io/arthurlt/vidfetch_bot:latest
Environment="BOT_TOKEN="
Pull=newer
#AutoUpdate=registry

[Service]
Expand Down

0 comments on commit 5dfa195

Please sign in to comment.