diff --git a/src/bot.py b/src/bot.py index c0ac228..5b7ff23 100644 --- a/src/bot.py +++ b/src/bot.py @@ -156,6 +156,8 @@ def init_handlers(self): def asyncify(func): async def wrapper(*args, **kwargs): results = func(*args, **kwargs) + if asyncio.iscoroutine(results): + return await results return results return wrapper diff --git a/src/db/db_client.py b/src/db/db_client.py index baff241..80043d3 100644 --- a/src/db/db_client.py +++ b/src/db/db_client.py @@ -456,7 +456,7 @@ def _get_now_msk_naive() -> datetime: @staticmethod def _make_next_reminder_ts(weekday_num: int, time: str): hour, minute = map(int, time.split(":")) - today = datetime.today() + today = DBClient._get_now_msk_naive() next_reminder = today + timedelta(days=(weekday_num - today.weekday())) next_reminder = next_reminder.replace( diff --git a/src/jobs/backfill_telegram_user_ids_job.py b/src/jobs/backfill_telegram_user_ids_job.py index 8e571a0..3e005c2 100644 --- a/src/jobs/backfill_telegram_user_ids_job.py +++ b/src/jobs/backfill_telegram_user_ids_job.py @@ -1,5 +1,5 @@ +import asyncio import logging -import time from typing import Callable from ..app_context import AppContext @@ -10,7 +10,7 @@ class BackfillTelegramUserIdsJob(BaseJob): @staticmethod - def _execute( + async def _execute( app_context: AppContext, send: Callable[[str], None], called_from_handler=False ): """ @@ -75,9 +75,9 @@ def _execute( # Try to resolve username to user_id # Add delay to avoid rate limiting (telethon has limits) - time.sleep(0.2) # 200ms delay between requests + await asyncio.sleep(0.2) # 200ms delay between requests - result = tg_client.resolve_telegram_username(telegram_username) + result = await tg_client.resolve_telegram_username(telegram_username) if not result: # Result is None if: diff --git a/src/jobs/base_job.py b/src/jobs/base_job.py index 54c79a8..c0465de 100644 --- a/src/jobs/base_job.py +++ b/src/jobs/base_job.py @@ -28,13 +28,46 @@ def execute( try: logging_func(f"Job {module} started...") - cls._execute( + import inspect + import asyncio + + # This returns either a result (sync) or coroutine (async) + res = cls._execute( app_context, send, called_from_handler, *args if args else [], **kwargs if kwargs else {}, ) + + if inspect.isawaitable(res): + try: + # Check if we are running in the main event loop (e.g. called from Handler) + loop = asyncio.get_running_loop() + if loop.is_running(): + # We are in an async context (Handler). + # Return the coroutine so the caller (asyncify) can await it. + return res + except RuntimeError: + # No running loop. We are likely in a Scheduler thread. + pass + + # If we are here, we need to run the coroutine synchronously/threadsafe + # The coroutine likely uses TgClient which is bound to the Main Loop. + # So we must schedule it on the Main Loop and wait for result. + + # Check if app_context has tg_client and valid loop + if hasattr(app_context, "tg_client") and hasattr( + app_context.tg_client, "api_client" + ): + main_loop = app_context.tg_client.api_client.loop + if main_loop and main_loop.is_running(): + future = asyncio.run_coroutine_threadsafe(res, main_loop) + return future.result() + + # Fallback (e.g. testing or no loop running): run locally + return asyncio.run(res) + logging_func(f"Job {module} finished") except Exception as e: # should not raise exception, so that schedule module won't go mad retrying diff --git a/src/jobs/hr_check_chat_consistency_frozen_job.py b/src/jobs/hr_check_chat_consistency_frozen_job.py index b890476..839c914 100644 --- a/src/jobs/hr_check_chat_consistency_frozen_job.py +++ b/src/jobs/hr_check_chat_consistency_frozen_job.py @@ -13,11 +13,11 @@ class HRCheckChatConsistencyFrozenJob(BaseJob): @staticmethod - def _execute( + async def _execute( app_context: AppContext, send: Callable[[str], None], called_from_handler=False ): # get users that are in the main chat - chat_users = app_context.tg_client.get_main_chat_users() + chat_users = await app_context.tg_client.get_main_chat_users() # get Frozen users frozen_members = app_context.role_manager.get_members_for_role( Roles.FROZEN_MEMBER diff --git a/src/jobs/hr_check_chat_consistency_job.py b/src/jobs/hr_check_chat_consistency_job.py index 83f0dfd..7826a17 100644 --- a/src/jobs/hr_check_chat_consistency_job.py +++ b/src/jobs/hr_check_chat_consistency_job.py @@ -12,11 +12,11 @@ class HRCheckChatConsistencyJob(BaseJob): @staticmethod - def _execute( + async def _execute( app_context: AppContext, send: Callable[[str], None], called_from_handler=False ): # get users that are in the main chat - chat_users = app_context.tg_client.get_main_chat_users() + chat_users = await app_context.tg_client.get_main_chat_users() # get users that _should_ be in the main chat <==> roles include Active or Frozen active_members = app_context.role_manager.get_members_for_role( Roles.ACTIVE_MEMBER diff --git a/src/jobs/tg_analytics_report_job.py b/src/jobs/tg_analytics_report_job.py index 122a029..ad88f09 100644 --- a/src/jobs/tg_analytics_report_job.py +++ b/src/jobs/tg_analytics_report_job.py @@ -14,19 +14,30 @@ class TgAnalyticsReportJob(BaseJob): @staticmethod - def _execute( + async def _execute( app_context: AppContext, send: Callable[[str], None], called_from_handler=False ): - app_context.tg_client.api_client.loop.run_until_complete( - app_context.tg_client.api_client.connect() - ) - stats = app_context.tg_client.api_client.loop.run_until_complete( - app_context.tg_client.api_client.get_stats(app_context.tg_client.channel) - ) - entity = app_context.tg_client.api_client.loop.run_until_complete( - app_context.tg_client.api_client.get_entity(app_context.tg_client.channel) - ) - app_context.tg_client.api_client.disconnect() + # We use the raw api_client here, so we must manage connection manually + if not app_context.tg_client.api_client.is_connected(): + await app_context.tg_client.api_client.connect() + + try: + stats = await app_context.tg_client.api_client.get_stats( + app_context.tg_client.channel + ) + entity = await app_context.tg_client.api_client.get_entity( + app_context.tg_client.channel + ) + finally: + # Do not disconnect as other jobs might be using the client locally? + # But TgClient generally keeps it disconnected? + # Based on previous code, it disconnected. Let's keep it safe. + # Wait, TgClient.resolve_telegram_username connects and disconnects. + # So we should probably disconnect to avoid state accumulation if it's meant to be ephemeral. + # BUT if we are running the bot (TelegramSender), isn't the client connected? + # No, TelegramSender uses `bot` instance. TgClient uses `TelegramClient` (Userbot/MTProto). + # So they are separate. + await app_context.tg_client.api_client.disconnect() new_posts_count = len(stats.recent_message_interactions) followers_stats = TgAnalyticsReportJob._get_followers_stats(stats) message = load( diff --git a/src/tg/handler_registry.py b/src/tg/handler_registry.py index dfd92a2..6a8955a 100644 --- a/src/tg/handler_registry.py +++ b/src/tg/handler_registry.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Callable, Optional, Literal -from ...consts import CommandCategories +from ..consts import CommandCategories from . import handlers diff --git a/src/tg/handlers/__init__.py b/src/tg/handlers/__init__.py index 3ab3803..380b372 100644 --- a/src/tg/handlers/__init__.py +++ b/src/tg/handlers/__init__.py @@ -44,6 +44,6 @@ # Plain text message handler from .user_message_handler import ( handle_callback_query, - handle_new_members, handle_user_message, ) +from .flow_handlers import handle_new_members diff --git a/src/tg/handlers/flow_handlers.py b/src/tg/handlers/flow_handlers.py index b9e16b6..a377c7a 100644 --- a/src/tg/handlers/flow_handlers.py +++ b/src/tg/handlers/flow_handlers.py @@ -806,6 +806,16 @@ def _handle_edit_action(self, chat_title, chat_id): return None def handle(self) -> Optional[PlainTextUserAction]: + # If user clicked a button (like "Create New" again) instead of typing + if not self.user_input: + if self.button: + # If valid known button from previous step, maybe we should just ignore or re-ask + # For now, let's just ask to enter ID again to avoid crash + reply(load("manager_reminders_handler__enter_chat_id"), self.update) + return PlainTextUserAction.MANAGE_REMINDERS__ENTER_CHAT_ID + # If no input and no button (shouldn't happen?), return same state + return PlainTextUserAction.MANAGE_REMINDERS__ENTER_CHAT_ID + try: chat_id = int(self.user_input) chat_title = DBClient().get_chat_name(chat_id) @@ -904,6 +914,15 @@ def handle(self) -> Optional[PlainTextUserAction]: self.command_data[consts.ManageRemindersData.CHOSEN_REMINDER_ID] ) DBClient().update_reminder(reminder_id, weekday=weekday_num, time=time) + weekday_name = self.command_data[consts.ManageRemindersData.WEEKDAY_NAME] + reply( + load( + "manage_reminders_handler__success_time", + weekday_name=weekday_name, + time=time, + ), + self.update, + ) return None button_yes = telegram.InlineKeyboardButton( @@ -932,7 +951,7 @@ def handle(self) -> Optional[PlainTextUserAction]: weekday_name = self.command_data[consts.ManageRemindersData.WEEKDAY_NAME] time = self.command_data[consts.ManageRemindersData.TIME] - if self.button == consts.ButtonValues.MANAGE_REMINDERS__TOGGLE_POLL__YES: + if self.button == consts.ButtonValues.MANAGE_REMINDERS__POLL__YES: if text is None: reminder_id = int( self.command_data[consts.ManageRemindersData.CHOSEN_REMINDER_ID] diff --git a/src/tg/handlers/user_message_handler.py b/src/tg/handlers/user_message_handler.py index d4dd006..a88b41d 100644 --- a/src/tg/handlers/user_message_handler.py +++ b/src/tg/handlers/user_message_handler.py @@ -12,13 +12,13 @@ logger = logging.getLogger(__name__) -def handle_callback_query( +async def handle_callback_query( update: telegram.Update, tg_context: telegram.ext.CallbackContext ): """ Handler for handling button callbacks. Redirects to handle_user_message """ - update.callback_query.answer() + await update.callback_query.answer() handle_user_message(update, tg_context, ButtonValues(update.callback_query.data)) diff --git a/src/tg/tg_client.py b/src/tg/tg_client.py index e06b0c0..aa0e922 100644 --- a/src/tg/tg_client.py +++ b/src/tg/tg_client.py @@ -38,17 +38,17 @@ def _update_from_config(self): self.sysblok_chats = self._tg_config["sysblok_chats"] self.channel = self._tg_config["channel"] - def _get_chat_users(self, chat_id: str) -> List[User]: - with self.api_client: - users = self.api_client.loop.run_until_complete( - self.api_client.get_participants(chat_id) - ) + async def _get_chat_users(self, chat_id: str) -> List[User]: + async with self.api_client: + users = await self.api_client.get_participants(chat_id) return users - def get_main_chat_users(self) -> List[User]: - return self._get_chat_users(self.sysblok_chats["main_chat"]) + async def get_main_chat_users(self) -> List[User]: + return await self._get_chat_users(self.sysblok_chats["main_chat"]) - def resolve_telegram_username(self, username: str) -> Optional[Tuple[int, str]]: + async def resolve_telegram_username( + self, username: str + ) -> Optional[Tuple[int, str]]: """ Resolve Telegram username to user ID using telethon. @@ -67,12 +67,10 @@ def resolve_telegram_username(self, username: str) -> Optional[Tuple[int, str]]: if not was_connected: # Connect if not already connected - self.api_client.loop.run_until_complete(self.api_client.connect()) + await self.api_client.connect() try: - entity = self.api_client.loop.run_until_complete( - self.api_client.get_entity(normalized) - ) + entity = await self.api_client.get_entity(normalized) # Only process User entities, skip channels, groups, bots, etc. if entity and isinstance(entity, User): @@ -88,7 +86,7 @@ def resolve_telegram_username(self, username: str) -> Optional[Tuple[int, str]]: finally: # Only disconnect if we connected it if not was_connected: - self.api_client.disconnect() + await self.api_client.disconnect() except Exception as e: logger.warning(f"Failed to resolve username {username}: {e}", exc_info=True) return None