-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
113 lines (89 loc) · 3.8 KB
/
client.py
File metadata and controls
113 lines (89 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# client.py
import logging
from typing import Any, Dict, List
import requests
import socketio
from socketio import exceptions
# Configure logging for debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
BASE_URL = "http://localhost:5000"
# noinspection PyMethodMayBeStatic
class PubSubClient:
def __init__(self, consumer_name: str, topics: List[str]):
"""
Initialize the PubSub client.
:param consumer_name: Name of the consumer (e.g., 'alice')
:param topics: List of topics to subscribe to
"""
self.sio = socketio.Client(reconnection=True)
self.consumer_name = consumer_name
self.topics = topics
# Register event handlers
self.sio.on("message", self.on_message)
self.sio.on("new_client", self.on_new_client)
self.sio.on("client_disconnected", self.on_client_disconnected)
self.sio.on("new_consumption", self.on_new_consumption)
self.sio.on("new_message", self.on_new_message)
def connect(self) -> None:
logger.info(f"Attempting to connect as {self.consumer_name} to {BASE_URL}")
try:
self.sio.connect(BASE_URL)
self.sio.emit("subscribe", {"consumer": self.consumer_name, "topics": self.topics})
logger.info(f"Connected as {self.consumer_name}, subscribed to {self.topics}")
except exceptions.ConnectionError as e:
logger.error(f"Failed to connect to server: {e}")
# Optionally, implement retry logic or exit
except Exception as e:
logger.error(f"An unexpected error occurred during connection: {e}")
def on_message(self, data: Dict[str, Any]) -> None:
"""Handle incoming messages."""
logger.info(f"[MESSAGE] [{data['topic']}] {data['message']}")
def on_new_client(self, data: Dict[str, Any]) -> None:
"""Handle new client connections."""
logger.info(f"[NEW CLIENT] {data}")
def on_client_disconnected(self, data: Dict[str, Any]) -> None:
"""Handle client disconnections."""
logger.info(f"[CLIENT DISCONNECTED] {data}")
def on_new_consumption(self, data: Dict[str, Any]) -> None:
"""Handle new consumption events."""
logger.info(f"[NEW CONSUMPTION] {data}")
def on_new_message(self, data: Dict[str, Any]) -> None:
"""Handle new message events."""
logger.info(f"[NEW MESSAGE] {data}")
def publish(self, topic: str, message: Any, message_id: str) -> Dict[str, Any]:
"""
Publish a message to a topic via HTTP POST.
:param topic: Topic to publish to
:param message: Message content
:param message_id: Unique message ID
:return: Server response
"""
logger.info(f"Publishing to topic {topic}: {message} with ID {message_id}")
resp = requests.post(
f"{BASE_URL}/publish",
json={"topic": topic, "message": message, "producer": self.consumer_name,
"message_id": message_id},
timeout=10,
)
logger.info(f"Publish response: {resp.json()}")
return resp.json() # type: ignore[no-any-return]
def run_forever(self) -> None:
"""Keep the client running indefinitely."""
self.sio.wait()
def disconnect(self) -> None:
"""Disconnect from the Socket.IO server."""
if self.sio.connected:
self.sio.disconnect()
logger.info(f"Disconnected {self.consumer_name} from server.")
def main() -> None:
"""Entry point for the pubsub client demo."""
client = PubSubClient(consumer_name="demo-client", topics=["test"])
client.connect()
try:
client.run_forever()
except KeyboardInterrupt:
client.disconnect()
logger.info("Client disconnected.")
if __name__ == "__main__":
main()