-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_handler.py
More file actions
940 lines (774 loc) Β· 48.8 KB
/
message_handler.py
File metadata and controls
940 lines (774 loc) Β· 48.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
#!/usr/bin/env python3
"""
Message Handler for SimpleX Bot
Handles message processing, routing, and command execution
"""
import logging
import time
from typing import Dict, Any, Optional, Callable, List
from datetime import datetime
from file_download_manager import FileDownloadManager
from message_context import MessageContext
from background_task_processor import BackgroundTaskProcessor
class MessageHandler:
"""Handles incoming message processing and routing"""
def __init__(self,
command_registry: 'CommandRegistry',
file_download_manager: FileDownloadManager,
send_message_callback: Callable,
logger: logging.Logger,
message_logger: logging.Logger,
startup_timestamp: float = None,
enable_parallel_processing: bool = True):
self.command_registry = command_registry
self.file_download_manager = file_download_manager
self.send_message_callback = send_message_callback
self.logger = logger
self.message_logger = message_logger
self.startup_timestamp = startup_timestamp or 0
# Background task processing for parallel command execution
self.enable_parallel_processing = enable_parallel_processing
if self.enable_parallel_processing:
self.background_processor = BackgroundTaskProcessor(
send_message_callback=self._send_message_wrapper,
default_timeout=300, # 5 minutes default
max_concurrent_tasks=50
)
self.logger.info("π Parallel command processing enabled")
else:
self.background_processor = None
self.logger.info("βΈοΈ Sequential command processing (legacy mode)")
# Constants
self.MESSAGE_PREVIEW_LENGTH = 100
async def _send_message_wrapper(self, chat_id: str, message: str):
"""Wrapper for send_message_callback to work with background processor"""
try:
# Determine if this is a group message based on chat_id format
is_group = chat_id != chat_id.lower() and not chat_id.startswith('@')
await self.send_message_callback(chat_id, message, is_group=is_group)
except Exception as e:
self.logger.error(f"Error sending message via wrapper: {e}")
# Fallback to direct call
await self.send_message_callback(chat_id, message)
def _is_message_too_old(self, message_data: Dict[str, Any]) -> bool:
"""Check if message timestamp is older than bot startup time"""
if not self.startup_timestamp:
return False
try:
# Try to extract timestamp from different message structures
message_ts = None
# Check for newChatItems structure
if "chatItems" in message_data:
chat_items = message_data["chatItems"]
if chat_items and len(chat_items) > 0:
chat_item = chat_items[0]
if "chatItem" in chat_item:
item_data = chat_item["chatItem"]
# Look for timestamp fields - common SimpleX timestamp fields
message_ts = (item_data.get("ts") or
item_data.get("timestamp") or
item_data.get("sentAt") or
item_data.get("receivedAt"))
# Check for direct chatItem structure
elif "chatItem" in message_data:
chat_item = message_data["chatItem"]
message_ts = (chat_item.get("ts") or
chat_item.get("timestamp") or
chat_item.get("sentAt") or
chat_item.get("receivedAt"))
if message_ts:
# Convert timestamp to seconds if it's in milliseconds
if message_ts > 1e12: # Likely milliseconds
message_ts = message_ts / 1000
# Compare with startup timestamp
is_old = message_ts < self.startup_timestamp
if is_old:
self.logger.info(f"π OLD MESSAGE: Message ts={message_ts}, startup ts={self.startup_timestamp}")
return is_old
except Exception as e:
self.logger.debug(f"π TIMESTAMP: Error checking message timestamp: {e}")
# If we can't determine timestamp, process the message (fail-safe)
return False
def _get_message_context(self, message_data: Dict[str, Any]) -> MessageContext:
"""Get unified message context using MessageContext class"""
return MessageContext(message_data)
def _determine_chat_routing(self, message_data: Dict[str, Any], contact_name: str) -> str:
"""Determine the correct chat ID for routing messages based on message context"""
try:
# Handle different event structures
chat_info = None
# Check for regular message structure
if "chatInfo" in message_data:
chat_info = message_data["chatInfo"]
self.logger.debug(f"π ROUTING: Using regular message structure")
# Check for XFTP event structure
elif "chatItem" in message_data:
chat_item = message_data["chatItem"]
chat_info = chat_item.get("chatInfo", {})
self.logger.debug(f"π ROUTING: Using XFTP event structure")
if not chat_info:
self.logger.warning(f"π ROUTING: No chat info found in message data, using contact fallback")
return contact_name
# Check if this is a group message
if "groupInfo" in chat_info:
# Group message - route to group
group_info = chat_info.get("groupInfo", {})
group_name = group_info.get("localDisplayName", group_info.get("groupName", contact_name))
self.logger.debug(f"π ROUTING: Group message - routing to group: {group_name}")
return group_name
else:
# Direct message - route to contact
self.logger.debug(f"π ROUTING: Direct message - routing to contact: {contact_name}")
return contact_name
except Exception as e:
self.logger.error(f"π ROUTING: Error determining chat routing: {e}")
# Fallback to contact name
return contact_name
def _is_group_message(self, message_data: Dict[str, Any]) -> bool:
"""Determine if this is a group message based on message structure"""
try:
chat_info = message_data.get("chatInfo", {})
# Check XFTP event structure
if not chat_info and "chatItem" in message_data:
chat_item = message_data["chatItem"]
chat_info = chat_item.get("chatInfo", {})
# Check if this is a group message
return "groupInfo" in chat_info
except Exception as e:
self.logger.error(f"π ROUTING: Error determining if group message: {e}")
return False
async def send_routed_message(self, message_data: Dict[str, Any], contact_name: str, message: str) -> None:
"""Send a message using proper routing logic (group vs direct chat)"""
try:
chat_id = self._determine_chat_routing(message_data, contact_name)
is_group = self._is_group_message(message_data)
await self.send_message_callback(chat_id, message, is_group=is_group)
except Exception as e:
self.logger.error(f"π ROUTING: Error sending routed message: {e}")
# Fallback to direct contact
await self.send_message_callback(contact_name, message, is_group=False)
async def process_message(self, message_data: Dict[str, Any]) -> None:
"""Process an incoming message and handle commands"""
try:
# Check if message is older than bot startup time
if self._is_message_too_old(message_data):
self.logger.debug(f"π IGNORING: Message older than bot startup time")
return
# Use unified message context for all parsing
context = self._get_message_context(message_data)
self.logger.debug(f"Processing message in {context.get_chat_context_string()}")
# Get message content using context
msg_type = context.message_content.get("type", "unknown")
# Log message type for debugging
self.logger.debug(f"Processing message type: {msg_type}")
if msg_type == "text":
await self._handle_text_message(context, message_data)
elif msg_type == "link":
# Handle link messages (treat similar to text for commands)
await self._handle_text_message(context, message_data)
elif msg_type in ["file", "image", "video", "audio", "media", "attachment"]:
await self._handle_file_message(context, msg_type, message_data)
elif msg_type == "voice":
await self._handle_voice_message(context, message_data)
else:
# Log unhandled message types
if msg_type not in ["text", "link"]:
self.logger.warning(f"Unhandled message type: {msg_type}")
except Exception as e:
self.logger.error(f"Error processing message: {e}")
# Log the full message structure on error for debugging
self.logger.debug(f"Message data structure: {message_data}")
async def _handle_text_message(self, context: MessageContext, message_data: Dict[str, Any]) -> None:
"""Handle text messages and command processing"""
text = context.message_content.get("text", "")
# Log the message with context
self.message_logger.info(f"{context.get_chat_context_string()}: {text}")
self.logger.info(f"Received message in {context.get_chat_context_string()}: {text[:self.MESSAGE_PREVIEW_LENGTH]}...")
# Check if it's a command
if self.command_registry.is_command(text):
await self._process_command(context.contact_name, text, message_data)
else:
# Handle non-command messages by passing to plugin manager
await self._process_non_command_message(context.contact_name, text, message_data)
async def _process_command(self, contact_name: str, text: str, message_data: Dict[str, Any]) -> None:
"""Process a command message with optional parallel processing"""
try:
# Parse command for routing decision
if not self.command_registry.is_command(text):
return
command_text = text[1:] # Remove ! prefix
parts = command_text.split()
command_name = parts[0] if parts else ""
# Determine chat routing
chat_id = self._determine_chat_routing(message_data, contact_name)
# Check if parallel processing is enabled and if command should use it
if self.enable_parallel_processing and self.background_processor and self._should_use_parallel_processing(command_name):
await self._process_command_parallel(contact_name, text, message_data, command_name, chat_id)
else:
await self._process_command_sequential(contact_name, text, message_data, chat_id)
except Exception as e:
self.logger.error(f"Error in command processing: {e}")
error_msg = f"Error processing command"
await self.send_routed_message(message_data, contact_name, error_msg)
def _should_use_parallel_processing(self, command_name: str) -> bool:
"""Determine if a command should use parallel processing"""
# Commands that should stay sequential (for now)
sequential_commands = {
"help", # Quick response, no need for background
"ping", # Quick response, no need for background
"status", # May need to check background processor status
"reload", # Critical system operation
"enable", # Critical system operation
"disable", # Critical system operation
}
return command_name not in sequential_commands
async def _process_command_parallel(self, contact_name: str, text: str, message_data: Dict[str, Any], command_name: str, chat_id: str):
"""Process command using background task processor"""
try:
# Create CommandContext for the command
from plugins.universal_plugin_base import CommandContext, BotPlatform
# Parse command and arguments
command_text = text[1:] # Remove ! prefix
parts = command_text.split()
args = parts[1:] if len(parts) > 1 else []
args_raw = ' '.join(args) if args else ''
context = CommandContext(
command=command_name,
args=args,
args_raw=args_raw,
user_id=contact_name,
chat_id=chat_id,
user_display_name=contact_name,
platform=BotPlatform.SIMPLEX,
raw_message=message_data
)
# Create command handler function
async def command_handler(ctx: CommandContext) -> str:
# Get plugin manager from the bot instance
plugin_manager = None
if hasattr(self, '_bot_instance'):
plugin_manager = getattr(self._bot_instance, 'plugin_manager', None)
# Execute the command using the command registry
result = await self.command_registry.execute_command(text, contact_name, plugin_manager, message_data)
return result or "Command completed successfully."
# Submit to background processor
await self.background_processor.submit_command(
context=context,
plugin_name="unknown", # Will be determined by command registry
command_handler=command_handler
)
self.logger.info(f"π€ Command '{command_name}' submitted for parallel processing")
except Exception as e:
self.logger.error(f"Error in parallel command processing: {e}")
# Fallback to sequential processing
await self._process_command_sequential(contact_name, text, message_data, chat_id)
async def _process_command_sequential(self, contact_name: str, text: str, message_data: Dict[str, Any], chat_id: str):
"""Process command using original sequential method"""
try:
# Try to get plugin manager from the bot instance
plugin_manager = None
if hasattr(self, '_bot_instance'):
plugin_manager = getattr(self._bot_instance, 'plugin_manager', None)
result = await self.command_registry.execute_command(text, contact_name, plugin_manager, message_data)
if result:
is_group = self._is_group_message(message_data)
await self.send_message_callback(chat_id, result, is_group=is_group)
self.message_logger.info(f"TO {chat_id}: {result[:self.MESSAGE_PREVIEW_LENGTH]}...")
except Exception as e:
self.logger.error(f"Error executing command sequentially: {e}")
error_msg = f"Error processing command"
await self.send_routed_message(message_data, contact_name, error_msg)
async def _process_non_command_message(self, contact_name: str, text: str, message_data: Dict[str, Any]) -> None:
"""Process non-command messages by forwarding to plugin manager"""
try:
# Get plugin manager from the bot instance
if hasattr(self, '_bot_instance') and hasattr(self._bot_instance, 'plugin_manager'):
plugin_manager = self._bot_instance.plugin_manager
# Create CommandContext for the non-command message
from plugins.universal_plugin_base import CommandContext, BotPlatform
# Determine chat routing
chat_id = self._determine_chat_routing(message_data, contact_name)
context = CommandContext(
command="", # Empty for non-command messages
args=[],
args_raw=text, # Full message text
user_id=contact_name,
chat_id=chat_id,
user_display_name=contact_name,
platform=BotPlatform.SIMPLEX,
raw_message=message_data
)
# Let plugin manager handle the message (will broadcast to all plugins)
await plugin_manager.handle_message(context)
self.logger.debug(f"Non-command message forwarded to plugin manager: '{text[:50]}...'")
else:
self.logger.warning("Plugin manager not available for non-command message processing")
except Exception as e:
self.logger.error(f"Error processing non-command message: {e}")
async def _handle_voice_message(self, contact_name: str, content: Dict[str, Any], chat_context: str, message_data: Dict[str, Any]) -> None:
"""Handle voice messages with STT integration"""
self.logger.debug(f"π€ VOICE DEBUG: Voice message detected from {contact_name}")
try:
# Extract voice message info from message structure
chat_item = message_data.get("chatItem", {})
file_info = chat_item.get("file", {})
msg_content = content.get("msgContent", {})
self.logger.debug(f"π€ VOICE DEBUG: Chat item keys: {list(chat_item.keys())}")
self.logger.debug(f"π€ VOICE DEBUG: File info keys: {list(file_info.keys()) if file_info else 'None'}")
self.logger.debug(f"π€ VOICE DEBUG: Full file info: {file_info}")
if not file_info:
self.logger.warning("π€ VOICE DEBUG: Voice message has no file information")
return
# Extract file details
file_name = file_info.get("fileName", "unknown_voice.m4a")
file_size = file_info.get("fileSize", 0)
file_status = file_info.get("fileStatus", {})
file_protocol = file_info.get("fileProtocol", "unknown")
duration = msg_content.get("duration", 0)
self.logger.debug(f"π€ VOICE DEBUG: File name: {file_name}")
self.logger.debug(f"π€ VOICE DEBUG: File size: {file_size} bytes")
self.logger.debug(f"π€ VOICE DEBUG: File status: {file_status}")
self.logger.debug(f"π€ VOICE DEBUG: File protocol: {file_protocol}")
self.logger.debug(f"π€ VOICE DEBUG: Duration: {duration}s")
self.logger.info(f"Voice message: {file_name} ({file_size} bytes, {duration}s duration)")
self.message_logger.info(f"VOICE FROM {contact_name}: {file_name} ({duration}s)")
# Check media download status
self.logger.debug(f"π€ VOICE DEBUG: Media downloads enabled: {self.file_download_manager.media_enabled}")
if not self.file_download_manager.media_enabled:
self.logger.warning("π€ VOICE DEBUG: Media downloads disabled - voice files cannot be downloaded for STT")
# Voice message acknowledged - STT processing will happen after XFTP download completes
self.logger.info("Voice message acknowledged - STT will process after XFTP download")
# No need to acknowledge voice messages since STT will handle this after download
except Exception as e:
self.logger.error(f"Error handling voice message: {e}")
import traceback
self.logger.error(f"Voice message error traceback: {traceback.format_exc()}")
await self.send_routed_message(message_data, contact_name, "Error processing voice message")
async def _handle_file_message(self, contact_name: str, content: Dict[str, Any], msg_type: str, chat_context: str, message_data: Dict[str, Any]) -> None:
"""Handle file/media messages"""
self.logger.info(f"π DOWNLOAD DEBUG: File message detected: {msg_type}")
# Clean base64 data from content structure for logging
content_for_log = self.file_download_manager.clean_content_for_logging(content)
self.logger.info(f"π DOWNLOAD DEBUG: Content structure: {content_for_log}")
self.logger.info(f"π DOWNLOAD DEBUG: Media enabled: {self.file_download_manager.media_enabled}")
if not self.file_download_manager.media_enabled:
self.logger.warning("π DOWNLOAD DEBUG: Media downloads disabled, skipping file")
return
try:
file_info = content.get("msgContent", {})
inner_msg_type = file_info.get("type", "")
self.logger.info(f"π DOWNLOAD DEBUG: File info keys: {list(file_info.keys())}")
self.logger.info(f"π DOWNLOAD DEBUG: Inner message type: {inner_msg_type}")
# Extract file information
self.logger.info("π DOWNLOAD DEBUG: Extracting file information...")
file_name, file_size, file_type = self.file_download_manager.extract_file_info_from_content(
file_info, inner_msg_type, contact_name
)
self.logger.info(f"π DOWNLOAD DEBUG: Extracted - name: {file_name}, size: {file_size}, type: {file_type}")
# Validate file for download
self.logger.info("π DOWNLOAD DEBUG: Validating file for download...")
is_valid = self.file_download_manager.validate_file_for_download(file_name, file_size, file_type)
self.logger.info(f"π DOWNLOAD DEBUG: File validation result: {is_valid}")
if not is_valid:
from config_manager import parse_file_size
max_size = parse_file_size(self.file_download_manager.media_config.get('max_file_size', '100MB'))
self.logger.warning(f"π DOWNLOAD DEBUG: File validation failed - size: {file_size}, max: {max_size}")
if file_size > max_size:
await self.send_routed_message(message_data, contact_name, f"File {file_name} is too large to download")
return
# Log the file message
self.message_logger.info(f"FILE FROM {contact_name}: {file_name} ({file_size} bytes)")
self.logger.info(f"π DOWNLOAD DEBUG: File approved for download: {file_name}")
# Attempt to download the file using the restored download logic
self.logger.info("π DOWNLOAD DEBUG: Starting file download process...")
download_success = await self._download_file(contact_name, file_info, file_type, inner_msg_type)
if download_success == "acknowledged":
self.logger.info(f"π DOWNLOAD DEBUG: Video/audio message acknowledged - waiting for XFTP file description: {file_name}")
await self.send_routed_message(message_data, contact_name, f"πΉ Video received - downloading via XFTP...")
elif download_success == "thumbnail_skipped":
self.logger.info(f"π DOWNLOAD DEBUG: Thumbnail skipped for {file_name} - waiting for XFTP")
# No message sent - wait for XFTP download
elif download_success:
self.logger.info(f"π DOWNLOAD DEBUG: Successfully downloaded file: {file_name}")
await self.send_routed_message(message_data, contact_name, f"β Downloaded: {file_name}")
else:
self.logger.error(f"π DOWNLOAD DEBUG: Failed to download file: {file_name}")
await self.send_routed_message(message_data, contact_name, f"β Failed to download: {file_name}")
except Exception as e:
self.logger.error(f"π DOWNLOAD DEBUG: Error handling file message: {e}")
import traceback
self.logger.error(f"π DOWNLOAD DEBUG: Traceback: {traceback.format_exc()}")
await self.send_routed_message(message_data, contact_name, f"Error processing file: {str(e)}")
async def _download_file(self, contact_name: str, file_info: Dict, file_type: str, inner_msg_type: str = "file") -> bool:
"""Download file using available methods (direct data or XFTP)"""
try:
self.logger.info(f"π DOWNLOAD: Starting file download - contact: {contact_name}, type: {file_type}, inner_msg_type: {inner_msg_type}")
self.logger.info(f"π DOWNLOAD: File info keys: {list(file_info.keys())}")
# Clean base64 data for logging
file_info_for_log = dict(file_info)
if 'image' in file_info_for_log and isinstance(file_info_for_log['image'], str):
if file_info_for_log['image'].startswith('data:image/'):
header_part = file_info_for_log['image'].split(',')[0] if ',' in file_info_for_log['image'] else file_info_for_log['image']
file_info_for_log['image'] = f"{header_part},<base64_truncated>"
self.logger.info(f"π DOWNLOAD: Full file_info: {file_info_for_log}")
# Handle SimpleX image format - Skip thumbnails, wait for XFTP
if inner_msg_type == "image" and "image" in file_info:
image_data_url = file_info.get("image", "")
file_name = self.file_download_manager._generate_image_filename(contact_name, image_data_url)
file_size = self.file_download_manager._calculate_data_url_size(image_data_url)
self.logger.info(f"π DOWNLOAD: Processing SimpleX image - name: {file_name}, size: {file_size}")
self.logger.info(f"π DOWNLOAD: This is an embedded image (thumbnail), skipping - waiting for XFTP")
# Skip thumbnail download, return acknowledgment
return "thumbnail_skipped"
# For other file types, check for various download methods
# Generate filename and path
file_name = file_info.get("fileName", f"unknown_file_{int(time.time())}")
file_size = file_info.get("fileSize", 0)
# Generate safe filename and determine storage path
safe_filename = self.file_download_manager.generate_safe_filename(file_name, contact_name, file_type)
# Determine storage directory based on file type
if file_type == 'audio':
storage_dir = self.file_download_manager.media_path / 'audio'
else:
storage_dir = self.file_download_manager.media_path / f"{file_type}s"
storage_dir.mkdir(exist_ok=True)
file_path = storage_dir / safe_filename
self.logger.info(f"π DOWNLOAD: Target path: {file_path}")
# Method 1: Skip thumbnails - only use XFTP
if "fileData" in file_info:
self.logger.info(f"π DOWNLOAD: Direct file data (thumbnail) detected - skipping, waiting for XFTP")
return "thumbnail_skipped"
# Method 2: Try XFTP download using file ID/hash
elif "fileId" in file_info or "fileHash" in file_info:
self.logger.info(f"π DOWNLOAD: Using Method 2 - XFTP download (large file)")
if hasattr(self, '_bot_instance') and hasattr(self._bot_instance, 'xftp_client'):
xftp_client = self._bot_instance.xftp_client
self.logger.info(f"π DOWNLOAD: XFTP client available: {xftp_client is not None}")
if xftp_client:
xftp_success = await self._download_via_xftp(file_info, file_path, file_name, xftp_client)
if xftp_success:
self.logger.info(f"π DOWNLOAD: XFTP download successful")
return True
self.logger.warning(f"π DOWNLOAD: XFTP download failed for {file_name}")
return False
# Method 3: Handle video/audio messages without XFTP indicators
else:
# For video/audio messages, the initial message only contains thumbnail
# The actual file will arrive later via rcvFileDescrReady event
if inner_msg_type in ["video", "audio", "voice"]:
self.logger.info(f"πΉ Video/audio message received - waiting for XFTP file description")
# Don't attempt download - just acknowledge receipt
# The rcvFileDescrReady event will handle the actual download
return "acknowledged" # Special return value to indicate waiting for XFTP
else:
# For other file types without XFTP indicators, assume it's a thumbnail - skip
self.logger.info(f"π DOWNLOAD: File without XFTP indicators detected - likely thumbnail, skipping: {file_name} (type: {inner_msg_type})")
return "thumbnail_skipped"
except Exception as e:
self.logger.error(f"π DOWNLOAD: Error in file download: {e}")
# If it's likely a thumbnail parsing error, skip instead of failing
if "fileData" in file_info or ("image" in file_info and inner_msg_type == "image"):
self.logger.info(f"π DOWNLOAD: Error likely from thumbnail processing - skipping")
return "thumbnail_skipped"
return False
async def _download_via_xftp(self, file_info: Dict, file_path, original_name: str, xftp_client) -> bool:
"""Download file via XFTP using file ID or hash"""
try:
self.logger.info(f"π₯ XFTP: _download_via_xftp called for {original_name}")
# Check if XFTP client is available
if not xftp_client:
self.logger.warning(f"π₯ XFTP: XFTP client not initialized for {original_name}")
return False
# Check for new XFTP format (file description text)
file_descr_text = file_info.get("fileDescrText")
if file_descr_text:
self.logger.info(f"π₯ XFTP: Using XFTP file description format")
file_size = file_info.get("fileSize", 0)
self.logger.info(f"π₯ XFTP: File description length: {len(file_descr_text)} chars, Size: {file_size}")
self.logger.info(f"π₯ XFTP: Starting XFTP download for {original_name} (Size: {file_size})")
# Use XFTPClient with file description text
self.logger.info(f"π₯ XFTP: Calling xftp_client.download_file_with_description")
success = await xftp_client.download_file_with_description(
file_description=file_descr_text,
file_size=file_size,
file_name=original_name,
output_path=str(file_path)
)
else:
# Fallback to old format
file_id = file_info.get("fileId")
file_hash = file_info.get("fileHash")
file_size = file_info.get("fileSize", 0)
self.logger.info(f"π₯ XFTP: File parameters - ID: {file_id}, Hash: {file_hash}, Size: {file_size}")
if not file_id:
self.logger.warning(f"π₯ XFTP: No file ID available for XFTP download: {original_name}")
return False
self.logger.info(f"π₯ XFTP: Starting XFTP download for {original_name} (ID: {file_id}, Size: {file_size})")
# Use XFTPClient to download the file
self.logger.info(f"π₯ XFTP: Calling xftp_client.download_file with output_path: {file_path}")
success = await xftp_client.download_file(
file_id=file_id,
file_hash=file_hash,
file_size=file_size,
file_name=original_name,
output_path=str(file_path)
)
if success:
self.logger.info(f"π₯ XFTP: XFTP download completed successfully: {original_name}")
return True
else:
self.logger.warning(f"π₯ XFTP: XFTP download failed: {original_name}")
return False
except Exception as e:
self.logger.error(f"π₯ XFTP: Unexpected error in XFTP download for {original_name}: {e}")
return False
async def handle_file_descriptor_ready(self, data: Dict):
"""Handle rcvFileDescrReady event with XFTP file metadata"""
try:
self.logger.info(f"π― XFTP: Processing file descriptor ready event")
self.logger.info(f"π― XFTP: Event data keys: {list(data.keys())}")
# Extract file information from the event
file_info = data.get("rcvFileInfo", {})
if not file_info:
# Try alternative key names
file_info = data.get("rcvFileDescr", {})
if not file_info:
file_info = data.get("rcvFileTransfer", {})
if not file_info:
self.logger.warning(f"π― XFTP: No file info found in event data")
self.logger.warning(f"π― XFTP: Available keys: {list(data.keys())}")
return
else:
self.logger.info(f"π― XFTP: Found file info in rcvFileTransfer")
else:
self.logger.info(f"π― XFTP: Found file info in rcvFileDescr")
# Extract the XFTP file description text
file_descr_text = file_info.get("fileDescrText", "")
if not file_descr_text:
# Try alternative key names
file_descr_text = file_info.get("description", "")
if not file_descr_text:
file_descr_text = file_info.get("fileDescription", "")
if not file_descr_text:
self.logger.warning(f"π― XFTP: No file description text found")
self.logger.warning(f"π― XFTP: File info content: {file_info}")
return
# Extract contact information if available
contact_name = "unknown_contact"
# Log the full data structure to debug contact extraction
self.logger.debug(f"π― XFTP DEBUG: Full event data structure:")
for key, value in data.items():
if key not in ['rcvFileDescr', 'rcvFileTransfer']: # Skip large XFTP data
self.logger.debug(f"π― XFTP DEBUG: {key}: {value}")
# Try multiple sources for contact info
if "chatItem" in data:
chat_item = data["chatItem"]
self.logger.debug(f"π― XFTP DEBUG: chatItem keys: {list(chat_item.keys())}")
chat_info = chat_item.get("chatInfo", {})
self.logger.debug(f"π― XFTP DEBUG: chatInfo: {chat_info}")
# Check if it's a direct contact
if "contact" in chat_info:
contact_name = chat_info["contact"].get("localDisplayName", "unknown_contact")
self.logger.debug(f"π― XFTP: Found contact name from chatItem.chatInfo.contact: {contact_name}")
# Check if it's a group message
elif "groupInfo" in chat_info:
# For group messages, get the actual sender from chatItem.chatItem.chatDir.groupMember
chat_item_inner = chat_item.get("chatItem", {})
chat_dir = chat_item_inner.get("chatDir", {})
group_member = chat_dir.get("groupMember", {})
if group_member:
contact_name = group_member.get("localDisplayName", "unknown_contact")
self.logger.debug(f"π― XFTP: Found contact name from chatItem.chatItem.chatDir.groupMember: {contact_name}")
else:
self.logger.warning(f"π― XFTP: Group message but no groupMember found in chatDir: {chat_dir}")
# Fallback: try user field
if contact_name == "unknown_contact" and "user" in data:
user_info = data["user"]
self.logger.debug(f"π― XFTP DEBUG: user field: {user_info}")
contact_name = user_info.get("localDisplayName", user_info.get("displayName", "unknown_contact"))
self.logger.debug(f"π― XFTP: Found contact name from user field: {contact_name}")
self.logger.debug(f"π― XFTP: Final contact name: {contact_name}")
# Parse file information from description text
file_size = self._parse_xftp_file_size(file_descr_text)
# Use a temporary filename for initial download - actual filename will be determined after download
temp_file_name = f"xftp_download_{int(time.time())}"
self.logger.info(f"π― XFTP: Ready to download - temp name: {temp_file_name}, size: {file_size}")
self.logger.info(f"π― XFTP: XFTP description available: {len(file_descr_text)} chars")
# Create a file info dict compatible with our XFTP client
xftp_file_info = {
'fileName': temp_file_name,
'fileSize': file_size,
'fileDescrText': file_descr_text # Use the actual XFTP description
}
# Attempt XFTP download - this will return the actual filename and path
download_result = await self._download_via_xftp_with_filename_detection(xftp_file_info, contact_name)
if download_result:
actual_filename, actual_path = download_result
self.logger.info(f"π― XFTP: File download successful: {actual_filename} at {actual_path}")
#await self.send_routed_message(data, contact_name, f"β Downloaded via XFTP: {actual_filename}")
# Check if this is an audio file and trigger STT processing
await self._maybe_trigger_stt_processing(actual_filename, actual_path, contact_name, data)
else:
self.logger.error(f"π― XFTP: File download failed: {temp_file_name}")
except Exception as e:
self.logger.error(f"π― XFTP: Error handling file descriptor ready: {e}")
import traceback
self.logger.error(f"π― XFTP: Traceback: {traceback.format_exc()}")
async def _download_via_xftp_with_filename_detection(self, file_info: Dict, contact_name: str):
"""Download file via XFTP and return actual filename and path"""
try:
file_size = file_info.get('fileSize', 0)
file_descr_text = file_info.get('fileDescrText', '')
if not file_descr_text:
self.logger.error("π― XFTP: No XFTP file description provided")
return None
self.logger.info(f"π₯ XFTP: Starting XFTP download with filename detection")
# Get XFTP client from bot instance
if not (hasattr(self, '_bot_instance') and hasattr(self._bot_instance, 'xftp_client')):
self.logger.error("π― XFTP: XFTP client not available")
return None
xftp_client = self._bot_instance.xftp_client
if not xftp_client:
self.logger.error("π― XFTP: XFTP client not initialized")
return None
# Use the new XFTP client method that preserves filenames
import tempfile
with tempfile.TemporaryDirectory(prefix="xftp_download_") as temp_dir:
# Download using XFTP client with filename detection
success, actual_filename, file_path = await xftp_client.download_file_with_description_get_filename(
file_description=file_descr_text,
file_size=file_size,
temp_dir=temp_dir
)
if not success or not actual_filename:
self.logger.error("π― XFTP: XFTP download failed or no filename detected")
return None
self.logger.info(f"π₯ XFTP: Successfully detected actual filename: {actual_filename}")
# Determine file type and storage directory
file_type = self.file_download_manager._get_file_type(actual_filename)
safe_filename = self.file_download_manager.generate_safe_filename(actual_filename, contact_name, file_type)
# Determine storage directory
if file_type == 'audio':
storage_dir = self.file_download_manager.media_path / 'audio'
else:
storage_dir = self.file_download_manager.media_path / f"{file_type}s"
storage_dir.mkdir(exist_ok=True)
final_path = storage_dir / safe_filename
# Move file to final location
import shutil
shutil.move(file_path, str(final_path))
self.logger.info(f"π₯ XFTP: File moved to final location: {final_path}")
return (actual_filename, str(final_path))
except Exception as e:
self.logger.error(f"π― XFTP: Error in XFTP download with filename detection: {e}")
import traceback
self.logger.error(f"π― XFTP: Traceback: {traceback.format_exc()}")
return None
def _parse_xftp_file_size(self, file_descr_text: str) -> int:
"""Parse file size from XFTP file description text"""
try:
import re
# Look for "size: XXmb" pattern
size_match = re.search(r'size:\s*(\d+)([kmg]?b)', file_descr_text, re.IGNORECASE)
if size_match:
size_num = int(size_match.group(1))
size_unit = size_match.group(2).lower()
if size_unit == 'gb':
return size_num * 1024 * 1024 * 1024
elif size_unit == 'mb':
return size_num * 1024 * 1024
elif size_unit == 'kb':
return size_num * 1024
else: # bytes
return size_num
# Fallback - look for just numbers
size_match = re.search(r'(\d+)', file_descr_text)
if size_match:
return int(size_match.group(1))
return 0
except Exception as e:
self.logger.error(f"Error parsing XFTP file size: {e}")
return 0
async def _maybe_trigger_stt_processing(self, filename: str, file_path: str, contact_name: str, xftp_data: Dict):
"""Trigger STT processing if this is an audio file that was just downloaded"""
try:
# Check if this is an audio file
audio_extensions = {'.mp3', '.wav', '.m4a', '.ogg', '.flac', '.aac'}
file_ext = filename.lower().split('.')[-1] if '.' in filename else ''
if f'.{file_ext}' not in audio_extensions:
self.logger.debug(f"π€ STT: File {filename} is not an audio file, skipping STT")
return
self.logger.info(f"π€ STT: Audio file {filename} downloaded, triggering STT processing")
# Check if we have a plugin manager with audio processing service
if hasattr(self, '_bot_instance') and hasattr(self._bot_instance, 'plugin_manager'):
plugin_manager = self._bot_instance.plugin_manager
# Look for audio processing service first (preferred)
audio_service = None
if hasattr(plugin_manager, 'service_registry') and plugin_manager.service_registry:
audio_service = plugin_manager.service_registry.get_service('audio_processing')
# Fall back to direct plugin access for compatibility
stt_plugin = None
if not audio_service:
for plugin in plugin_manager.plugins.values():
if hasattr(plugin, 'handle_downloaded_audio') and plugin.enabled:
stt_plugin = plugin
break
if audio_service or stt_plugin:
self.logger.debug(f"π€ STT: Found STT plugin, processing downloaded audio file: {filename}")
# Create context for the STT plugin based on XFTP event data
# Use unified context for routing
context = self._get_message_context(xftp_data)
chat_id = context.chat_id
# Create file info for STT plugin
audio_info = {
'fileName': filename,
'fileSize': 0, # Size not critical for downloaded file
'filePath': file_path # Add the actual path
}
# Create command context for STT plugin
from plugins.universal_plugin_base import CommandContext, BotPlatform
context = CommandContext(
command="auto_transcribe_downloaded",
args=[],
args_raw="",
user_id=contact_name,
user_display_name=contact_name,
chat_id=chat_id,
platform=BotPlatform.SIMPLEX,
raw_message=xftp_data
)
# Process the downloaded audio file
result = None
if audio_service:
# Use audio processing service (preferred)
self.logger.debug(f"π€ STT: Using audio processing service")
process_context = {
'user_name': contact_name,
'chat_id': chat_id,
'filename': filename,
'message_data': xftp_data
}
result = await audio_service.process_audio_file(file_path, process_context)
else:
# Fall back to direct plugin access
self.logger.debug(f"π€ STT: Using direct plugin access")
result = await stt_plugin.handle_downloaded_audio(filename, file_path, contact_name, chat_id, xftp_data)
if result:
self.logger.info(f"π€ STT: Transcription completed for {filename}")
# Send the transcription result to the chat
try:
is_group = self._is_group_message(xftp_data)
await self.send_message_callback(chat_id, result, is_group=is_group)
self.logger.info(f"π€ STT: Transcription sent to chat: {chat_id}")
except Exception as e:
self.logger.error(f"π€ STT: Failed to send transcription to chat: {e}")
else:
self.logger.warning(f"π€ STT: Transcription failed for {filename}")
else:
self.logger.debug("π€ STT: No STT plugin found")
else:
self.logger.debug("π€ STT: No plugin manager available")
except Exception as e:
self.logger.error(f"π€ STT: Error triggering STT processing: {e}")
import traceback
self.logger.error(f"π€ STT: Traceback: {traceback.format_exc()}")