Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 120 additions & 30 deletions userbot/plugins/video_dl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import subprocess
import tempfile
import logging

import aiohttp
from pyrogram import filters
Expand All @@ -13,6 +14,8 @@

# User Agent for requests and yt-dlp
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
MAX_TITLE_LENGTH = 50
logger = logging.getLogger(__name__)

# YouTube Shorts URL regex pattern
youtube_shorts_regex = r'https?://(www\.)?youtube\.com/shorts/[a-zA-Z0-9_-]+/?(\?.*)?'
Expand Down Expand Up @@ -68,6 +71,77 @@ async def process_urls(url):

return download_url


async def download_tiktok_alternative(url, temp_dir, status_msg=None):
"""Alternative TikTok download using third-party API"""
video_id = None
if "/video/" in url:
video_id = url.split("/video/")[1].split("?")[0].split("/")[0]

if not video_id:
raise Exception("Could not extract video ID from TikTok URL")

if status_msg:
await status_msg.edit(
f"🔄 Using alternative TikTok API...\n`{url}`",
link_preview_options=LinkPreviewOptions(is_disabled=True)
)

api = {
"name": "TikWM API",
"url": "https://www.tikwm.com/api/",
"data": {"url": url, "hd": 1}
}

timeout = aiohttp.ClientTimeout(total=30)
headers = {
"User-Agent": USER_AGENT
}

last_error = None
async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
try:
if status_msg:
await status_msg.edit(
f"🔄 Trying {api['name']}...\n`{url}`",
link_preview_options=LinkPreviewOptions(is_disabled=True)
)

async with session.post(api["url"], json=api.get("data", {})) as response:
if response.status == 200:
data = await response.json()

if "data" in data and data["data"]:
video_url = data["data"].get("hdplay") or data["data"].get("play")
title = data["data"].get("title", "TikTok Video")

if video_url:
if status_msg:
await status_msg.edit(
f"⬇️ Downloading from {api['name']}...\n`{url}`",
link_preview_options=LinkPreviewOptions(is_disabled=True)
)

async with session.get(video_url) as video_response:
if video_response.status == 200:
normalized_title = title.replace(" ", "_")
sanitized_title = "".join(c for c in normalized_title if c.isalnum() or c in ('-', '_')).strip()
if not sanitized_title:
sanitized_title = "TikTok_Video"
filename = f"{sanitized_title[:MAX_TITLE_LENGTH]}[{video_id}].mp4"
filepath = os.path.join(temp_dir, filename)

with open(filepath, 'wb') as f:
f.write(await video_response.read())

return filepath

except Exception as e:
last_error = e

raise Exception(f"Alternative TikTok download failed: {last_error}") if last_error else Exception("Alternative TikTok download failed")


@UserBot.on_message(filters.regex(video_url_regex) & filters.me)
async def video_downloader(bot: UserBot, message: Message, from_reply=False):
# Extract the video URL from the message
Expand Down Expand Up @@ -123,6 +197,7 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False):

# Create a temporary directory for downloading
with tempfile.TemporaryDirectory() as temp_dir:
downloaded_via_alt = None
yt_dlp_args = [
"yt-dlp",
"--user-agent", USER_AGENT,
Expand All @@ -142,28 +217,24 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False):
yt_dlp_args.append(SOCKS5_PROXY)

try:
# Update status (without preview)
await status_msg.edit(
f"⬇️ Downloading: {download_url}",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)

# Download the video using yt-dlp
process = subprocess.run(
yt_dlp_args,
capture_output=True,
text=True,
check=False
)
if platform == "TikTok":
try:
downloaded_via_alt = await download_tiktok_alternative(download_url, temp_dir, status_msg)
except Exception as e:
logger.warning(f"TikWM API download failed for {download_url}: {e}", exc_info=True)
await status_msg.edit(
f"⚠️ Alternative TikTok method failed, trying yt-dlp...\n`{download_url}`",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)

if process.returncode != 0:
if not downloaded_via_alt:
# Update status (without preview)
await status_msg.edit(
f"⚠️ Failed to download: {process.stderr[:500]}...\n\nTrying with different options...",
f"⬇️ Downloading: {download_url}",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)

# Try with --no-check-certificate if it failed
yt_dlp_args.append('--no-check-certificate')
# Download the video using yt-dlp
process = subprocess.run(
yt_dlp_args,
capture_output=True,
Expand All @@ -173,25 +244,44 @@ async def video_downloader(bot: UserBot, message: Message, from_reply=False):

if process.returncode != 0:
await status_msg.edit(
f"❌ Download failed. Error: {process.stderr[:500]}...",
f"⚠️ Failed to download: {process.stderr[:500]}...\n\nTrying with different options...",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)

# Try with --no-check-certificate if it failed
yt_dlp_args.append('--no-check-certificate')
process = subprocess.run(
yt_dlp_args,
capture_output=True,
text=True,
check=False
)

if process.returncode != 0:
await status_msg.edit(
f"❌ Download failed. Error: {process.stderr[:500]}...",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)
await asyncio.sleep(5)
await status_msg.delete()
return

# Get the downloaded file
if downloaded_via_alt:
video_path = downloaded_via_alt
downloaded_files = [os.path.basename(downloaded_via_alt)]
else:
downloaded_files = os.listdir(temp_dir)
if not downloaded_files:
await status_msg.edit(
"❌ No files downloaded.",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)
await asyncio.sleep(5)
await status_msg.delete()
return

# Get the downloaded file
downloaded_files = os.listdir(temp_dir)
if not downloaded_files:
await status_msg.edit(
"❌ No files downloaded.",
link_preview_options=LinkPreviewOptions(is_disabled=True) # Disable link preview
)
await asyncio.sleep(5)
await status_msg.delete()
return

video_path = os.path.join(temp_dir, downloaded_files[0])
video_path = os.path.join(temp_dir, downloaded_files[0])

# Extract the caption without extension
file_name = os.path.splitext(downloaded_files[0])[0]
Expand Down