-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbackground_executor.py
More file actions
60 lines (48 loc) · 1.99 KB
/
background_executor.py
File metadata and controls
60 lines (48 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import network
import socket
class Server:
def __init__(self, server_conn: network.SocketManager) -> None:
self.server_conn = server_conn
self.server_conn.init_sever()
self.status: network.ConnectionStatus = network.ConnectionStatus.CONNECTED
def handle_client_logic(self, client: network.Client):
data = client.connection.recv(1024)
if not data:
client.status = network.ConnectionStatus.CLIENT_DISCONNECTED
return
print(f"{client.address}: {data}")
client.connection.send(f"Response to {client.address}: {data}".encode())
def handle_client(self, client: network.Client) -> int:
print(f"[Server] connection started with: {client.address}")
while client.status == network.ConnectionStatus.CONNECTED:
try:
self.handle_client_logic(client)
except Exception as e:
print(f"[Server] Error handling client {client.address}: {e}")
return network.StatusCodes.ERROR
def handle_connection(self):
try:
connection, address = self.server_conn.socket.accept()
client = network.Client(connection, address, network.ConnectionStatus.CONNECTED)
with client.connection:
exit_status = self.handle_client(client)
print(f"[Server] Client got disconnected: {exit_status}")
except socket.timeout:
pass
except Exception as e:
print(f"[Server] An error occurred during connection: {e}")
def run(self):
while self.status == network.ConnectionStatus.CONNECTED:
print("[Server] waiting for new connection...")
self.handle_connection()
print("[Server] shutdown.")
class Worker:
pass
def main():
host = network.get_local_ip()
port = 12345
with network.SocketManager(host=host, port=port) as server_conn:
server = Server(server_conn)
server.run()
if __name__ == "__main__":
main()