diff --git a/pjmedia/include/pjmedia.h b/pjmedia/include/pjmedia.h index 534eb21257..5e40458328 100644 --- a/pjmedia/include/pjmedia.h +++ b/pjmedia/include/pjmedia.h @@ -23,6 +23,7 @@ * @file pjmedia.h * @brief PJMEDIA main header file. */ +#include #include #include #include diff --git a/pjmedia/include/pjmedia/ai_port.h b/pjmedia/include/pjmedia/ai_port.h index 8001d8ce08..576b8c1a51 100644 --- a/pjmedia/include/pjmedia/ai_port.h +++ b/pjmedia/include/pjmedia/ai_port.h @@ -378,6 +378,18 @@ PJ_DECL(pj_status_t) pjmedia_ai_port_disconnect(pjmedia_ai_port *ai_port); */ PJ_DECL(void*) pjmedia_ai_port_get_user_data(pjmedia_ai_port *ai_port); +/** + * Set the user data associated with the AI port. If callbacks may + * be running concurrently, the caller should hold the port's + * grp_lock (via pjmedia_ai_port_get_port()->grp_lock) when calling + * this function. + * + * @param ai_port The AI port instance. + * @param user_data The user data pointer. + */ +PJ_DECL(void) pjmedia_ai_port_set_user_data(pjmedia_ai_port *ai_port, + void *user_data); + /** * Create an OpenAI Realtime API backend. * diff --git a/pjmedia/src/pjmedia/ai_port.c b/pjmedia/src/pjmedia/ai_port.c index e49f07305f..507f67156e 100644 --- a/pjmedia/src/pjmedia/ai_port.c +++ b/pjmedia/src/pjmedia/ai_port.c @@ -801,3 +801,10 @@ PJ_DEF(void*) pjmedia_ai_port_get_user_data(pjmedia_ai_port *ai_port) PJ_ASSERT_RETURN(ai_port, NULL); return ai_port->user_data; } + +PJ_DEF(void) pjmedia_ai_port_set_user_data(pjmedia_ai_port *ai_port, + void *user_data) +{ + PJ_ASSERT_ON_FAIL(ai_port, return); + ai_port->user_data = user_data; +} diff --git a/pjsip-apps/src/swig/pjsua2.i b/pjsip-apps/src/swig/pjsua2.i index 408fde5082..f5547fc090 100644 --- a/pjsip-apps/src/swig/pjsua2.i +++ b/pjsip-apps/src/swig/pjsua2.i @@ -179,6 +179,7 @@ using namespace pj; %feature("director") FindBuddyMatch; %feature("director") AudioMediaPlayer; %feature("director") AudioMediaPort; +%feature("director") AudioMediaAiPort; %feature("director") VideoRecorder; // PendingJob is only used on Python diff --git a/pjsip-apps/src/swig/python/test.py b/pjsip-apps/src/swig/python/test.py index da1a40c1c3..1aee9f6c0c 100644 --- a/pjsip-apps/src/swig/python/test.py +++ b/pjsip-apps/src/swig/python/test.py @@ -1,5 +1,6 @@ import pjsua2 as pj import sys +import os import time from collections import deque import struct @@ -233,6 +234,95 @@ def ua_tonegen_test(): ep.libDestroy() +# +# AI Media Port test +# +class MyAiPort(pj.AudioMediaAiPort): + """Custom AI port that collects events.""" + def __init__(self): + super().__init__() + self.events = [] + self.connected = False + self.transcripts = [] + + def onEvent(self, event): + self.events.append(event.type) + if event.type == pj.PJMEDIA_AI_EVENT_CONNECTED: + self.connected = True + write(" [AI] Connected\r\n") + elif event.type == pj.PJMEDIA_AI_EVENT_DISCONNECTED: + self.connected = False + write(" [AI] Disconnected\r\n") + elif event.type == pj.PJMEDIA_AI_EVENT_TRANSCRIPT: + self.transcripts.append(event.text) + write(" [AI] " + event.text + "\r\n") + elif event.type == pj.PJMEDIA_AI_EVENT_RESPONSE_DONE: + write(" [AI] Response done\r\n") + +def ua_ai_port_test(): + write("AI media port test.." + "\r\n") + ep_cfg = pj.EpConfig() + + ep = pj.Endpoint() + ep.libCreate() + ep.libInit(ep_cfg) + ep.libStart() + + # Create AI port with default params + ai = MyAiPort() + prm = pj.AiMediaPortParam() + assert prm.vadEnabled == False + assert prm.ptimeMsec == 20 + + ai.createPort(prm) + write(" AI port created and registered to conf bridge\r\n") + + # Route audio: mic -> AI -> speaker + ai.startTransmit(ep.audDevManager().getPlaybackDevMedia()) + ep.audDevManager().getCaptureDevMedia().startTransmit(ai) + write(" Audio routing established\r\n") + + # Connect to OpenAI if API key is available + api_key = os.environ.get("OPENAI_API_KEY", "") + if api_key: + url = ("wss://api.openai.com/v1/realtime" + "?model=gpt-4o-mini-realtime-preview") + write(" Connecting to OpenAI..\r\n") + ai.connect(url, api_key) + + # Wait for connection + for i in range(150): + ep.libHandleEvents(100) + if ai.connected: + break + + if ai.connected: + write(" Connected! Speak into your mic.\r\n") + write(" Press ENTER to stop.\r\n") + input() + + write(" Disconnecting..\r\n") + ai.disconnect() + # Let close handshake complete + for i in range(20): + ep.libHandleEvents(100) + else: + write(" Connection timeout (non-fatal)\r\n") + else: + write(" OPENAI_API_KEY not set, skipping AI connection test\r\n") + + # Disconnect routing + ai.stopTransmit(ep.audDevManager().getPlaybackDevMedia()) + ep.audDevManager().getCaptureDevMedia().stopTransmit(ai) + write(" Audio routing disconnected\r\n") + + del ai + write(" AI port destroyed\r\n") + + ep.libDestroy() + write(" AI media port test OK\r\n") + + class RandomIntVal(): def __init__(self): self.value = randint(0, 100000) @@ -304,6 +394,7 @@ def ua_pending_job_test(): ua_run_test_exception() ua_run_log_test() ua_run_ua_test() + ua_ai_port_test() ua_tonegen_test() ua_pending_job_test() sys.exit(0) diff --git a/pjsip-apps/src/swig/symbols.i b/pjsip-apps/src/swig/symbols.i index 5aa66d2a2b..5213132054 100644 --- a/pjsip-apps/src/swig/symbols.i +++ b/pjsip-apps/src/swig/symbols.i @@ -462,6 +462,17 @@ typedef enum pjmedia_vid_stream_rc_method PJMEDIA_VID_STREAM_RC_SEND_THREAD = 2 } pjmedia_vid_stream_rc_method; +typedef enum pjmedia_ai_event_type +{ + PJMEDIA_AI_EVENT_CONNECTED, + PJMEDIA_AI_EVENT_DISCONNECTED, + PJMEDIA_AI_EVENT_TRANSCRIPT, + PJMEDIA_AI_EVENT_RESPONSE_START, + PJMEDIA_AI_EVENT_RESPONSE_DONE, + PJMEDIA_AI_EVENT_SPEECH_STARTED, + PJMEDIA_AI_EVENT_SPEECH_STOPPED +} pjmedia_ai_event_type; + enum pjmedia_file_writer_option { PJMEDIA_FILE_WRITE_PCM = 0, diff --git a/pjsip-apps/src/swig/symbols.lst b/pjsip-apps/src/swig/symbols.lst index 01fcb7df05..5274efea0c 100644 --- a/pjsip-apps/src/swig/symbols.lst +++ b/pjsip-apps/src/swig/symbols.lst @@ -21,6 +21,7 @@ pjmedia/videodev.h pjmedia_vid_dev_hwnd_type pjmedia/vid_codec.h pjmedia_vid_packing pjmedia/vid_conf.h pjmedia_vid_conf_op_type pjmedia/vid_stream.h pjmedia_vid_stream_rc_method +pjmedia/ai_port.h pjmedia_ai_event_type pjmedia/wav_port.h pjmedia_file_writer_option pjmedia_file_player_option pjmedia-audiodev/audiodev.h pjmedia_aud_dev_route pjmedia_aud_dev_cap diff --git a/pjsip/include/pjsua2/media.hpp b/pjsip/include/pjsua2/media.hpp index b6749d1aa7..9d3179870d 100644 --- a/pjsip/include/pjsua2/media.hpp +++ b/pjsip/include/pjsua2/media.hpp @@ -556,6 +556,128 @@ class AudioMediaPort : public AudioMedia pjmedia_port *port; }; + +/** + * AI Media Port event data. + */ +struct AiMediaEvent +{ + /** Event type. */ + pjmedia_ai_event_type type; + + /** Status code. PJ_SUCCESS for informational events. */ + pj_status_t status; + + /** Text payload (transcript). Only valid for TRANSCRIPT events. */ + string text; +}; + + +/** + * AI Media Port creation parameters. + */ +struct AiMediaPortParam +{ + /** + * Enable voice activity detection (VAD) on the TX path. + * When enabled, silence frames are not sent to the AI service. + * + * Default: false + */ + bool vadEnabled; + + /** + * Ptime in milliseconds. + * + * Default: 20 + */ + unsigned ptimeMsec; + +public: + /** Default constructor */ + AiMediaPortParam() : vadEnabled(false), ptimeMsec(20) {} +}; + + +/** + * AI Media Port. + * + * This wraps pjmedia_ai_port as an AudioMedia object that can be + * connected to the conference bridge. It bridges audio to/from + * real-time AI services (e.g. OpenAI Realtime API) over WebSocket. + * + * Basic usage: + * 1. Create with createPort() (uses the OpenAI Realtime API backend). + * 2. Connect to the AI service with connect(). + * 3. Use startTransmit()/stopTransmit() to route audio from/to + * other conference bridge ports (e.g. AudioMedia from a call). + * 4. Disconnect with disconnect() when done. + * + * Events (connected, transcript, etc.) are delivered via the + * virtual onEvent() callback. + */ +class AudioMediaAiPort : public AudioMedia +{ +public: + /** + * Constructor. + */ + AudioMediaAiPort(); + + /** + * Destructor. Disconnects from the AI service (if connected) and + * unregisters the port from the conference bridge. + */ + virtual ~AudioMediaAiPort(); + + /** + * Create an AI media port with the OpenAI Realtime API backend and + * register it to the conference bridge. The port operates at the + * backend's native clock rate (e.g. 24kHz for OpenAI); the + * conference bridge handles resampling. + * + * @param prm Creation parameters. + */ + void createPort(const AiMediaPortParam &prm = AiMediaPortParam()) + PJSUA2_THROW(Error); + + /** + * Connect to the AI service asynchronously. The onEvent() callback + * will be called with PJMEDIA_AI_EVENT_CONNECTED or + * PJMEDIA_AI_EVENT_DISCONNECTED when complete. + * + * @param url WebSocket URL (e.g. + * "wss://api.openai.com/v1/realtime?model=...") + * @param authToken Authentication token (e.g. OpenAI API key). + */ + void connect(const string &url, const string &authToken) + PJSUA2_THROW(Error); + + /** + * Disconnect from the AI service gracefully. + */ + void disconnect() PJSUA2_THROW(Error); + + /* + * Callbacks + */ + + /** + * Called when an AI event occurs (connected, transcript, etc.). + * This may be called from the ioqueue worker thread. + * + * @param event The event data. + */ + virtual void onEvent(const AiMediaEvent &event) + { PJ_UNUSED_ARG(event); } + +private: + pj_pool_t *pool; + pjmedia_ai_port *aiPort; + pjmedia_ai_backend *backend; +}; + + /** * This structure contains additional info about AudioMediaPlayer. */ diff --git a/pjsip/src/pjsua2/media.cpp b/pjsip/src/pjsua2/media.cpp index 0710ed2678..6a6671b67f 100644 --- a/pjsip/src/pjsua2/media.cpp +++ b/pjsip/src/pjsua2/media.cpp @@ -425,6 +425,173 @@ void AudioMediaPort::createPort(const string &name, MediaFormatAudio &fmt) /////////////////////////////////////////////////////////////////////////////// +static void ai_on_event_cb(pjmedia_ai_port *ai_port, + const pjmedia_ai_event *event) +{ + pjmedia_port *port = pjmedia_ai_port_get_port(ai_port); + AudioMediaAiPort *amp; + + /* Protect against use-after-free: check the back-pointer under + * grp_lock so the callback sees NULL if destructor has run. + */ + if (!port || !port->grp_lock) + return; + + pj_grp_lock_acquire(port->grp_lock); + amp = static_cast( + pjmedia_ai_port_get_user_data(ai_port)); + if (!amp) { + pj_grp_lock_release(port->grp_lock); + return; + } + + { + AiMediaEvent ev; + ev.type = event->type; + ev.status = event->status; + if (event->text.slen > 0) + ev.text.assign(event->text.ptr, event->text.slen); + + pj_grp_lock_release(port->grp_lock); + amp->onEvent(ev); + } +} + + +AudioMediaAiPort::AudioMediaAiPort() +: pool(NULL), aiPort(NULL), backend(NULL) +{ +} + +static void ai_wrapper_on_destroy(void *member) +{ + pj_pool_t *pool = static_cast(member); + if (pool) + pj_pool_release(pool); +} + +AudioMediaAiPort::~AudioMediaAiPort() +{ + if (aiPort) { + /* Null the back-pointer under grp_lock so ai_on_event_cb + * sees NULL and bails out, preventing use-after-free. + */ + pjmedia_port *port = pjmedia_ai_port_get_port(aiPort); + if (port && port->grp_lock) { + pj_grp_lock_acquire(port->grp_lock); + pjmedia_ai_port_set_user_data(aiPort, NULL); + pj_grp_lock_release(port->grp_lock); + } + + PJSUA2_CATCH_IGNORE( disconnect() ); + } + + PJSUA2_CATCH_IGNORE( unregisterMediaPort() ); + + if (aiPort) { + pjmedia_port_destroy(pjmedia_ai_port_get_port(aiPort)); + aiPort = NULL; + } + + /* Note: pool is released by ai_wrapper_on_destroy() via + * grp_lock destroy handler, not here. This ensures the pool + * (which holds the backend struct) outlives the port. + */ +} + +void AudioMediaAiPort::createPort(const AiMediaPortParam &prm) + PJSUA2_THROW(Error) +{ + pjmedia_ai_port_param ai_param; + pjmedia_port *media_port; + pjsip_endpoint *endpt; + pj_status_t status; + + if (pool) { + PJSUA2_RAISE_ERROR(PJ_EEXISTS); + } + + pool = pjsua_pool_create("aiamp%p", 4096, 4096); + if (!pool) { + PJSUA2_RAISE_ERROR(PJ_ENOMEM); + } + + /* Use pjsua's SIP endpoint ioqueue and timer heap so that + * WebSocket I/O is polled by pjsua's existing worker threads. + */ + endpt = pjsua_get_pjsip_endpt(); + + /* Create OpenAI backend */ + status = pjmedia_ai_openai_backend_create(pool, &backend); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + pool = NULL; + PJSUA2_RAISE_ERROR(status); + } + + /* Create AI port */ + pjmedia_ai_port_param_default(&ai_param); + ai_param.ioqueue = pjsip_endpt_get_ioqueue(endpt); + ai_param.timer_heap = pjsip_endpt_get_timer_heap(endpt); + ai_param.cb.on_event = &ai_on_event_cb; + ai_param.user_data = this; + ai_param.backend = backend; + ai_param.vad_enabled = prm.vadEnabled ? PJ_TRUE : PJ_FALSE; + ai_param.ptime_msec = prm.ptimeMsec; + + status = pjmedia_ai_port_create(pool, &ai_param, &aiPort); + if (status != PJ_SUCCESS) { + pj_pool_release(pool); + pool = NULL; + PJSUA2_RAISE_ERROR(status); + } + + media_port = pjmedia_ai_port_get_port(aiPort); + + /* Register a grp_lock destroy handler so the wrapper pool (which + * holds the backend struct) is released only after the port's + * grp_lock ref-count reaches zero and all callbacks have completed. + */ + pj_grp_lock_add_handler(media_port->grp_lock, pool, + pool, &ai_wrapper_on_destroy); + + registerMediaPort2(media_port, pool); +} + +void AudioMediaAiPort::connect(const string &url, const string &authToken) + PJSUA2_THROW(Error) +{ + pj_str_t url_str, token_str; + pj_status_t status; + + if (!aiPort) { + PJSUA2_RAISE_ERROR(PJ_EINVALIDOP); + } + + url_str = pj_str((char*)url.c_str()); + token_str = pj_str((char*)authToken.c_str()); + + status = pjmedia_ai_port_connect(aiPort, &url_str, &token_str); + PJSUA2_CHECK_RAISE_ERROR(status); +} + +void AudioMediaAiPort::disconnect() PJSUA2_THROW(Error) +{ + pj_status_t status; + + if (!aiPort) { + return; + } + + status = pjmedia_ai_port_disconnect(aiPort); + /* Ignore PJ_EINVALIDOP (not connected) */ + if (status != PJ_SUCCESS && status != PJ_EINVALIDOP) { + PJSUA2_CHECK_RAISE_ERROR(status); + } +} + +/////////////////////////////////////////////////////////////////////////////// + AudioMediaPlayer::AudioMediaPlayer() : playerId(PJSUA_INVALID_ID) {