Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 64 additions & 61 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,89 @@
import asyncio
import json
import os
import logging

import pytest
from pytest_report import PytestReport, PytestTestStatus
from telethon import TelegramClient
from telethon.sessions import StringSession

from src.utils.singleton import Singleton
from src.config_manager import ConfigManager

if os.path.exists("config_override_integration_tests.json"):
with open("config_override_integration_tests.json") as config_override:
config = json.load(config_override)["telegram"]
else:
config = json.loads(os.environ["CONFIG_OVERRIDE"])["telegram"]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

config_manager = ConfigManager("./config.json", "./config_override_integration_tests.json")
config_manager.load_config_with_override()
config = config_manager.get_telegram_config()

api_id = int(config["api_id"])
api_hash = config["api_hash"]
api_session = config["api_session"]
telegram_chat_id = int(config["error_logs_recipients"][0])
telegram_bot_name = config.get("handle", "")


class WrappedTelegramClientAsync(Singleton):
def __init__(self):
self.client = TelegramClient(
StringSession(api_session), api_id, api_hash, sequential_updates=True
)

async def __aenter__(self):
await self.client.connect()
await self.client.get_me()
return self.client

async def __aexit__(self, exc_t, exc_v, exc_tb):
await self.client.disconnect()
await self.client.disconnected


@pytest.fixture(scope="session")
async def telegram_client() -> TelegramClient:
async with WrappedTelegramClientAsync() as client:
yield client


@pytest.fixture(scope="session")
async def conversation(telegram_client):
async with telegram_client.conversation(telegram_bot_name) as conv:
yield conv

async def conversation():
"""
Provides a completely fresh Telegram client and conversation for each test function.
"""
client = TelegramClient(StringSession(api_session), api_id, api_hash, sequential_updates=True)
await client.connect()
try:
bot_entity = await client.get_entity(telegram_bot_name)
async with client.conversation(bot_entity, timeout=10) as conv:
yield conv
finally:
await client.disconnect()

def pytest_sessionfinish(session, exitstatus):
passed = exitstatus == pytest.ExitCode.OK
print("\nrun status code:", exitstatus)
passed = exitstatus == 0
logger.info(f"Pytest session finished with status code: {exitstatus}")
PytestReport().mark_finish()
asyncio.run(report_test_result(passed))
try:
asyncio.run(report_test_result(passed))
except Exception:
logger.error("FATAL: Could not send test report to Telegram.", exc_info=True)


async def report_test_result(passed: bool):
async with WrappedTelegramClientAsync() as client:
async with client.conversation(telegram_chat_id, timeout=30) as conv:
telegram_bot_mention = (
f"@{telegram_bot_name}" if telegram_bot_name else "Бот"
"""
Sends the test report using a new, completely isolated client.
"""
report_client = TelegramClient(StringSession(api_session), api_id, api_hash)
try:
await report_client.connect()
report_chat_entity = await report_client.get_entity(telegram_chat_id)

telegram_bot_mention = f"@{telegram_bot_name}"

report_path = "./integration_test_report.txt"
with open(report_path, "w", encoding="utf-8") as f:
json.dump(PytestReport().data, f, indent=4, ensure_ascii=False)

if passed:
caption = f"{telegram_bot_mention} протестирован. Все тесты пройдены успешно."
await report_client.send_file(report_chat_entity, report_path, caption=caption)
else:
caption = f"{telegram_bot_mention} разломан. Подробности в файле и в сообщении ниже."

failure_details = "\n".join(
["Сломались тесты:"]
+ [
f'\n--- FAIL: {test["cmd"]}\n'
f'-> {test["exception_class"]}\n'
f'-> {test["exception_message"]}'
for test in PytestReport().data.get("tests", [])
if test.get("status") == PytestTestStatus.FAILED
]
)
if passed:
message = f"{telegram_bot_mention} протестирован."
else:
message = "\n".join(
[f"{telegram_bot_mention} разломан.", "Сломались команды:"]
+ [
f"{test['cmd']}{telegram_bot_mention}\n"
f"{test['exception_class']}\n{test['exception_message']}"
for test in PytestReport().data["tests"]
if test["status"] == PytestTestStatus.FAILED
]
)
with open("./integration_test_report.txt", "w") as integration_test_report:
json.dump(
PytestReport().data,
integration_test_report,
indent=4,
sort_keys=True,
ensure_ascii=False,
)
await conv.send_file("./integration_test_report.txt", caption=message)
if not failure_details.strip() or len(PytestReport().data.get("tests", [])) == 0:
failure_details = "Детали в логах. Вероятно, тесты не были собраны, или ошибка произошла в фикстуре."

await report_client.send_file(report_chat_entity, report_path, caption=caption)
if failure_details:
await report_client.send_message(report_chat_entity, failure_details)
finally:
if report_client.is_connected():
await report_client.disconnect()
183 changes: 122 additions & 61 deletions tests/integration/test_telegram_bot.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,146 @@
import asyncio
import nest_asyncio
import time
from typing import List, Dict
import re
import os

import nest_asyncio
import pytest
from pytest_report import PytestReport, PytestTestStatus
from telethon.tl.custom.message import Message
from telethon.errors import TimeoutError
from telethon.tl.custom.conversation import Conversation
from telethon import events

from pytest_report import PytestReport, PytestTestStatus
from src.config_manager import ConfigManager
from src.sheets.sheets_client import GoogleSheetsClient
from src.strings import StringsDBClient, load

def strip_html(text: str) -> str:
"""A simple helper to remove HTML tags for plain text comparison."""
return re.sub('<[^<]+?>', '', text)

def setup_strings_for_test_run():
"""Initializes and populates the string DB right before a test."""
ConfigManager.drop_instance()
StringsDBClient.drop_instance()
GoogleSheetsClient.drop_instance()

config_manager = ConfigManager("./config.json", "./config_override_integration_tests.json")
config_manager.load_config_with_override()

strings_db_config = config_manager.get_strings_db_config()
sheets_config = config_manager.get_sheets_config()

if not (strings_db_config and sheets_config):
pytest.skip("Skipping test: strings_db_config or sheets_config is missing.")

async def _test_command(report_state, conversation, command: str, timeout=120):
test_report = {"cmd": command}
strings_db_client = StringsDBClient(strings_db_config)
sheets_client = GoogleSheetsClient(sheets_config)
strings_db_client.fetch_strings_sheet(sheets_client)

async def _test_command_flow(report_state: PytestReport, conversation: Conversation, command_flow: List[Dict], timeout=120):
"""
Tests a sequence of user actions using the dictionary-based schema.
"""
setup_strings_for_test_run()

command_str = " -> ".join([f"{step['type']}: '{step['input']}'" for step in command_flow])
test_report = {"cmd": command_str}
start_time = time.time()

last_bot_message: Message = None

try:
await conversation.send_message(command)
resp: Message = await conversation.get_response(timeout=timeout)
await asyncio.sleep(1)
test_report["response"] = "\\n".join(resp.raw_text.splitlines())
assert resp.raw_text
await conversation.send_message("/clean_chat_data")
await conversation.get_response()

for step in command_flow:
action_type = step['type']
action_input = step['input']
expected_response_id = step['expected']

if action_type == 'message':
await conversation.send_message(action_input)
last_bot_message = await conversation.get_response(timeout=timeout)

elif action_type == 'click':
if not last_bot_message or not last_bot_message.buttons:
pytest.fail(f"Action failed: Tried to click '{action_input}', but the last bot message had no buttons.")

new_message_task = asyncio.create_task(
conversation.wait_event(events.NewMessage(incoming=True), timeout=timeout)
)
edited_message_task = asyncio.create_task(
conversation.wait_event(
events.MessageEdited(incoming=True, func=lambda e: e.message.id == last_bot_message.id),
timeout=timeout
)
)

await last_bot_message.click(text=action_input)

done, pending = await asyncio.wait([new_message_task, edited_message_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()

event = done.pop().result()
last_bot_message = event.message
else:
pytest.fail(f"Unknown action type in test flow: '{action_type}'")

# Assertion logic for both action types
expected_html = load(expected_response_id)
expected_plain = strip_html(expected_html)
actual_plain = last_bot_message.raw_text.strip()

assert expected_plain in actual_plain, \
f"Action {step} failed. Expected response containing '{expected_plain}' but got '{actual_plain}'"

test_report["status"] = PytestTestStatus.OK
except BaseException as e:
test_report["status"] = PytestTestStatus.FAILED
test_report["exception_class"] = str(e.__class__)
test_report["exception_message"] = str(e)

raise
finally:
test_report["time_elapsed"] = time.time() - start_time
report_state.data["tests"].append(test_report)


class Test:
class TestTelegramBot:
report_state = PytestReport()
loop = asyncio.get_event_loop()
nest_asyncio.apply(loop)

@pytest.mark.parametrize("command", ("/mute_errors",))
def test_mute(self, conversation, command: str):
Test.loop.run_until_complete(
_test_command(Test.report_state, conversation, command)
)


@pytest.mark.parametrize(
"command",
(
"/start",
"/help",
),
"command_flow",
[
([{'type': 'message', 'input': "/start", 'expected': "start_handler__message"}]),
([{'type': 'message', 'input': "/help", 'expected': "help__commands_list"}]),
(
[
{'type': 'message', 'input': "/manage_reminders", 'expected': "manage_reminders_handler__no_reminders"},
{'type': 'click', 'input': "Создать новое", 'expected': "manager_reminders_handler__enter_chat_id"},
]
),
(
[
{'type': 'message', 'input': "/manage_all_reminders", 'expected': "manage_reminders_handler__no_reminders"},
]
),
],
)
def test_start_help(self, conversation, command: str):
Test.loop.run_until_complete(
_test_command(Test.report_state, conversation, command)
)
@pytest.mark.asyncio
async def test_command_flows(self, conversation: Conversation, command_flow: List[Dict]):
await _test_command_flow(self.report_state, conversation, command_flow)

@pytest.mark.parametrize(
"command",
(
"/get_sheets_report",
"/get_tasks_report_focalboard",
),
)
def test_not_failing_reports(self, conversation, command: str):
Test.loop.run_until_complete(
_test_command(Test.report_state, conversation, command)
)

@pytest.mark.parametrize(
"command",
("/get_tg_analytics_report",),
)
def test_not_failing_analytics(self, conversation, command: str):
Test.loop.run_until_complete(
_test_command(Test.report_state, conversation, command)
)

@pytest.mark.parametrize(
"command",
(
"/manage_reminders",
"/manage_all_reminders",
),
)
def test_reminder(self, conversation, command: str):
Test.loop.run_until_complete(
_test_command(Test.report_state, conversation, command)
)
@pytest.mark.xfail
@pytest.mark.parametrize("command", ("/bad_cmd",))
@pytest.mark.asyncio
async def test_failing_command(self, conversation: Conversation, command: str):
try:
await conversation.send_message(command)
await conversation.get_response(timeout=10)
except TimeoutError:
pass
except Exception as e:
pytest.fail(f"Test for failing command failed with unexpected exception: {e}")