diff --git a/examples/full-trickle-ice-client.py b/examples/full-trickle-ice-client.py new file mode 100644 index 0000000..00f193e --- /dev/null +++ b/examples/full-trickle-ice-client.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python + +import argparse +import asyncio +import json +import logging +import time +from typing import Optional + +import websockets + +import aioice + +STUN_SERVER = ("stun.l.google.com", 19302) +WEBSOCKET_URI = "ws://127.0.0.1:8765" + + +async def offer(options): + websocket = await websockets.connect(WEBSOCKET_URI) + + connected = False + start_time = None + + async def signal_candidate(candidate: Optional[aioice.Candidate]): + message = { + "type": "candidate", + "candidate": candidate.to_sdp() if candidate else None, + } + await websocket.send(json.dumps(message)) + print("Sent candidate:", message["candidate"]) + + connection = aioice.Connection( + ice_controlling=True, + components=options.components, + stun_server=STUN_SERVER, + signal_candidate=signal_candidate, + ) + + start_time = time.time() + await websocket.send( + json.dumps( + { + "type": "offer", + "username": connection.local_username, + "password": connection.local_password, + } + ) + ) + + await connection.gather_candidates() + + async for raw in websocket: + message = json.loads(raw) + + if message["type"] == "answer": + connection.remote_username = message["username"] + connection.remote_password = message["password"] + print("Received answer.") + elif message["type"] == "candidate": + candidate = message["candidate"] + await connection.add_remote_candidate( + aioice.Candidate.from_sdp(candidate) if candidate else None + ) + print("Received remote candidate:", candidate) + else: + print("Unknown message type:", message) + + if not connected and connection.remote_username and connection.remote_password: + try: + await connection.connect() + connected = True + elapsed = time.time() - start_time + print(f"connected in {elapsed:.2f} seconds") + + data = b"hello" + await connection.sendto(data, 1) + data, component = await connection.recvfrom() + print("Received:", data) + + await asyncio.sleep(2) + await connection.close() + await websocket.close() + break + except Exception as e: + print("Connection error:", e) + + +async def answer(options): + websocket = await websockets.connect(WEBSOCKET_URI) + + connected = False + + async def signal_candidate(candidate: Optional[aioice.Candidate]): + message = { + "type": "candidate", + "candidate": candidate.to_sdp() if candidate else None, + } + await websocket.send(json.dumps(message)) + print("Sent candidate:", message["candidate"]) + + connection = aioice.Connection( + ice_controlling=False, + components=options.components, + stun_server=STUN_SERVER, + signal_candidate=signal_candidate, + ) + + async for raw in websocket: + message = json.loads(raw) + + if message["type"] == "offer": + connection.remote_username = message["username"] + connection.remote_password = message["password"] + + await websocket.send( + json.dumps( + { + "type": "answer", + "username": connection.local_username, + "password": connection.local_password, + } + ) + ) + + await connection.gather_candidates() + + elif message["type"] == "candidate": + candidate = message["candidate"] + await connection.add_remote_candidate( + aioice.Candidate.from_sdp(candidate) if candidate else None + ) + print("Received remote candidate:", candidate) + + if not connected and candidate is None and connection.remote_username: + try: + await connection.connect() + connected = True + print("Connected via ICE.") + + data, component = await connection.recvfrom() + print("Echoing:", data) + await connection.sendto(data, component) + + await asyncio.sleep(2) + await connection.close() + await websocket.close() + break + except Exception as e: + print("Connection error:", e) + + +parser = argparse.ArgumentParser(description="ICE trickle demo") +parser.add_argument("action", choices=["offer", "answer"]) +parser.add_argument("--components", type=int, default=1) +options = parser.parse_args() + +logging.basicConfig(level=logging.DEBUG) + +if options.action == "offer": + asyncio.run(offer(options)) +else: + asyncio.run(answer(options)) diff --git a/examples/ice-client.py b/examples/ice-client.py index 6e40135..bd34ab6 100644 --- a/examples/ice-client.py +++ b/examples/ice-client.py @@ -4,6 +4,7 @@ import asyncio import json import logging +import time import websockets @@ -21,6 +22,7 @@ async def offer(components: int) -> None: websocket = await websockets.connect(WEBSOCKET_URI) + start_time = time.time() # send offer await websocket.send( json.dumps( @@ -44,7 +46,8 @@ async def offer(components: int) -> None: await websocket.close() await connection.connect() - print("connected") + elapsed = time.time() - start_time + print(f"connected in {elapsed:.2f} seconds") # send data data = b"hello" diff --git a/examples/signaling-server.py b/examples/signaling-server.py index b3c1dc4..0357703 100644 --- a/examples/signaling-server.py +++ b/examples/signaling-server.py @@ -7,13 +7,14 @@ import binascii import os -from websockets.asyncio.server import ServerConnection, serve +import websockets +from websockets.asyncio.server import ServerConnection clients: dict[bytes, ServerConnection] = {} -async def echo(websocket: ServerConnection) -> None: - client_id = binascii.hexlify(os.urandom(8)) +async def echo(websocket): + client_id = binascii.hexlify(os.urandom(8)).decode() clients[client_id] = websocket try: @@ -25,9 +26,9 @@ async def echo(websocket: ServerConnection) -> None: clients.pop(client_id) -async def main() -> None: - async with serve(echo, "0.0.0.0", 8765) as server: - await server.serve_forever() +async def main(): + async with websockets.serve(echo, "0.0.0.0", 8765): + await asyncio.Future() if __name__ == "__main__": diff --git a/src/aioice/ice.py b/src/aioice/ice.py index 3316b47..c6a54eb 100644 --- a/src/aioice/ice.py +++ b/src/aioice/ice.py @@ -9,7 +9,7 @@ import secrets import socket import threading -from typing import Optional, Union, cast +from typing import Callable, Optional, Union, cast import ifaddr @@ -320,6 +320,7 @@ class Connection: will be generated. :param local_password: An optional local password, otherwise a random one will be generated. + :param signal_candidate: Callback to signal a candidate. """ def __init__( @@ -337,6 +338,7 @@ def __init__( transport_policy: TransportPolicy = TransportPolicy.ALL, local_username: Optional[str] = None, local_password: Optional[str] = None, + signal_candidate: Optional[Callable[[Optional[Candidate]], None]] = None, ) -> None: self.ice_controlling = ice_controlling @@ -363,6 +365,7 @@ def __init__( self.turn_password = turn_password self.turn_ssl = turn_ssl self.turn_transport = turn_transport + self.signal_candidate = signal_candidate # private self._closed = False @@ -499,13 +502,21 @@ async def gather_candidates(self) -> None: addresses = get_host_addresses( use_ipv4=self._use_ipv4, use_ipv6=self._use_ipv6 ) - coros = [ - self.get_component_candidates(component=component, addresses=addresses) - for component in self._components - ] - for candidates in await asyncio.gather(*coros): - self._local_candidates += candidates + + async def gather_and_signal(component): + candidates = await self.get_component_candidates( + component=component, addresses=addresses + ) + for candidate in candidates: + self._local_candidates.append(candidate) + if self.signal_candidate: + await self.signal_candidate(candidate) + + coros = [gather_and_signal(component) for component in self._components] + await asyncio.gather(*coros) self._local_candidates_end = True + if self.signal_candidate: + await self.signal_candidate(None) def get_default_candidate(self, component: int) -> Optional[Candidate]: """