diff --git a/userbot/plugins/video_dl.py b/userbot/plugins/video_dl.py index 29566a5b..695433eb 100644 --- a/userbot/plugins/video_dl.py +++ b/userbot/plugins/video_dl.py @@ -3,6 +3,7 @@ import re import subprocess import tempfile +import logging import aiohttp from pyrogram import filters @@ -13,6 +14,8 @@ # User Agent for requests and yt-dlp USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +MAX_TITLE_LENGTH = 50 +logger = logging.getLogger(__name__) # YouTube Shorts URL regex pattern youtube_shorts_regex = r'https?://(www\.)?youtube\.com/shorts/[a-zA-Z0-9_-]+/?(\?.*)?' @@ -68,6 +71,77 @@ async def process_urls(url): return download_url + +async def download_tiktok_alternative(url, temp_dir, status_msg=None): + """Alternative TikTok download using third-party API""" + video_id = None + if "/video/" in url: + video_id = url.split("/video/")[1].split("?")[0].split("/")[0] + + if not video_id: + raise Exception("Could not extract video ID from TikTok URL") + + if status_msg: + await status_msg.edit( + f"🔄 Using alternative TikTok API...\n`{url}`", + link_preview_options=LinkPreviewOptions(is_disabled=True) + ) + + api = { + "name": "TikWM API", + "url": "https://www.tikwm.com/api/", + "data": {"url": url, "hd": 1} + } + + timeout = aiohttp.ClientTimeout(total=30) + headers = { + "User-Agent": USER_AGENT + } + + last_error = None + async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session: + try: + if status_msg: + await status_msg.edit( + f"🔄 Trying {api['name']}...\n`{url}`", + link_preview_options=LinkPreviewOptions(is_disabled=True) + ) + + async with session.post(api["url"], json=api.get("data", {})) as response: + if response.status == 200: + data = await response.json() + + if "data" in data and data["data"]: + video_url = data["data"].get("hdplay") or data["data"].get("play") + title = data["data"].get("title", "TikTok Video") + + if video_url: + if status_msg: + await status_msg.edit( + f"⬇️ Downloading from {api['name']}...\n`{url}`", + link_preview_options=LinkPreviewOptions(is_disabled=True) + ) + + async with session.get(video_url) as video_response: + if video_response.status == 200: + normalized_title = title.replace(" ", "_") + sanitized_title = "".join(c for c in normalized_title if c.isalnum() or c in ('-', '_')).strip() + if not sanitized_title: + sanitized_title = "TikTok_Video" + filename = f"{sanitized_title[:MAX_TITLE_LENGTH]}[{video_id}].mp4" + filepath = os.path.join(temp_dir, filename) + + with open(filepath, 'wb') as f: + f.write(await video_response.read()) + + return filepath + + except Exception as e: + last_error = e + + raise Exception(f"Alternative TikTok download failed: {last_error}") if last_error else Exception("Alternative TikTok download failed") + + @UserBot.on_message(filters.regex(video_url_regex) & filters.me) async def video_downloader(bot: UserBot, message: Message, from_reply=False): # Extract the video URL from the message @@ -123,6 +197,7 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False): # Create a temporary directory for downloading with tempfile.TemporaryDirectory() as temp_dir: + downloaded_via_alt = None yt_dlp_args = [ "yt-dlp", "--user-agent", USER_AGENT, @@ -142,28 +217,24 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False): yt_dlp_args.append(SOCKS5_PROXY) try: - # Update status (without preview) - await status_msg.edit( - f"⬇️ Downloading: {download_url}", - link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview - ) - - # Download the video using yt-dlp - process = subprocess.run( - yt_dlp_args, - capture_output=True, - text=True, - check=False - ) + if platform == "TikTok": + try: + downloaded_via_alt = await download_tiktok_alternative(download_url, temp_dir, status_msg) + except Exception as e: + logger.warning(f"TikWM API download failed for {download_url}: {e}", exc_info=True) + await status_msg.edit( + f"⚠️ Alternative TikTok method failed, trying yt-dlp...\n`{download_url}`", + link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview + ) - if process.returncode != 0: + if not downloaded_via_alt: + # Update status (without preview) await status_msg.edit( - f"⚠️ Failed to download: {process.stderr[:500]}...\n\nTrying with different options...", + f"⬇️ Downloading: {download_url}", link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview ) - # Try with --no-check-certificate if it failed - yt_dlp_args.append('--no-check-certificate') + # Download the video using yt-dlp process = subprocess.run( yt_dlp_args, capture_output=True, @@ -173,25 +244,44 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False): if process.returncode != 0: await status_msg.edit( - f"❌ Download failed. Error: {process.stderr[:500]}...", + f"⚠️ Failed to download: {process.stderr[:500]}...\n\nTrying with different options...", + link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview + ) + + # Try with --no-check-certificate if it failed + yt_dlp_args.append('--no-check-certificate') + process = subprocess.run( + yt_dlp_args, + capture_output=True, + text=True, + check=False + ) + + if process.returncode != 0: + await status_msg.edit( + f"❌ Download failed. Error: {process.stderr[:500]}...", + link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview + ) + await asyncio.sleep(5) + await status_msg.delete() + return + + # Get the downloaded file + if downloaded_via_alt: + video_path = downloaded_via_alt + downloaded_files = [os.path.basename(downloaded_via_alt)] + else: + downloaded_files = os.listdir(temp_dir) + if not downloaded_files: + await status_msg.edit( + "❌ No files downloaded.", link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview ) await asyncio.sleep(5) await status_msg.delete() return - # Get the downloaded file - downloaded_files = os.listdir(temp_dir) - if not downloaded_files: - await status_msg.edit( - "❌ No files downloaded.", - link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview - ) - await asyncio.sleep(5) - await status_msg.delete() - return - - video_path = os.path.join(temp_dir, downloaded_files[0]) + video_path = os.path.join(temp_dir, downloaded_files[0]) # Extract the caption without extension file_name = os.path.splitext(downloaded_files[0])[0]