From cd4a893c65605300012636a4fa52e5d221e3c6e9 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 9 May 2025 10:50:27 -0400 Subject: [PATCH 01/26] add FileAPI to gemini.py --- .../services/gemini_multimodal_live/gemini.py | 102 +++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index a89db20407..6bc9f0616b 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -63,6 +63,7 @@ from . import events from .audio_transcriber import AudioTranscriber +from .file_api import GeminiFileAPI try: import websockets @@ -187,6 +188,29 @@ def extract_system_instructions(self): system_instruction += str(content) return system_instruction + def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None): + """Add a file reference to the context. + + This adds a user message with a file reference that will be sent during context initialization. 
+ + Args: + file_uri: URI of the uploaded file + mime_type: MIME type of the file + text: Optional text prompt to accompany the file + """ + # Create parts list with file reference + parts = [] + if text: + parts.append({"type": "text", "text": text}) + + # Add file reference part + parts.append({"type": "file_data", "file_data": {"mime_type": mime_type, "file_uri": file_uri}}) + + # Add to messages + message = {"role": "user", "content": parts} + self.messages.append(message) + logger.info(f"Added file reference to context: {file_uri}") + def get_messages_for_initializing_history(self): messages = [] for item in self.messages: @@ -206,6 +230,14 @@ def get_messages_for_initializing_history(self): for part in content: if part.get("type") == "text": parts.append({"text": part.get("text")}) + elif part.get("type") == "file_data": + file_data = part.get("file_data", {}) + parts.append({ + "fileData": { + "mimeType": file_data.get("mime_type"), + "fileUri": file_data.get("file_uri") + } + }) else: logger.warning(f"Unsupported content type: {str(part)[:80]}") else: @@ -305,6 +337,62 @@ class GeminiMultimodalLiveLLMService(LLMService): # Overriding the default adapter to use the Gemini one. adapter_class = GeminiLLMAdapter + """Gemini Live LLM Service with multimodal capabilities including File API support. 
+ + This service implements the Gemini Multimodal Live API with support for: + - Audio input and output + - Image/video input + - File API (upload, reference, and management) + - Tools/function calling + + Example usage of File API: + ```python + # Initialize the service + gemini_service = GeminiMultimodalLiveLLMService(api_key="YOUR_API_KEY") + + # Upload a file from the client + file_path = "/path/to/user_uploaded_file.pdf" + file_info = await gemini_service.file_api.upload_file(file_path) + + # Get file URI and mime type from response + file_uri = file_info["file"]["uri"] + mime_type = "application/pdf" # Set appropriate MIME type + + # When starting a new bot session: + # 1. Initialize the context + context = GeminiMultimodalLiveContext() + + # 2. Add file reference to context BEFORE starting the conversation + context.add_file_reference( + file_uri=file_uri, + mime_type=mime_type, + text="Please analyze this document" + ) + + # 3. Now set the context to start the conversation with file reference included + await gemini_service.set_context(context) + + # Gemini now has access to the file reference in its context window + # The file URI remains valid for 48 hours before Google deletes it + + # Optional: List all files for this user + files = await gemini_service.file_api.list_files() + + # Optional: Get metadata for a specific file + file_metadata = await gemini_service.file_api.get_file(file_info["file"]["name"]) + + # Optional: Delete a file when no longer needed + await gemini_service.file_api.delete_file(file_info["file"]["name"]) + ``` + + Notes: + - Files are stored for 48 hours on Google's servers + - Maximum file size is 2GB + - Total storage per project is 20GB + - File references should be added to the context BEFORE starting the conversation + - The same file reference can be reused for multiple sessions within the 48-hour window + """ + def __init__( self, *, @@ -319,6 +407,7 @@ def __init__( transcribe_user_audio: bool = False, params: InputParams = 
InputParams(), inference_on_context_initialization: bool = True, + file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files", **kwargs, ): super().__init__(base_url=base_url, **kwargs) @@ -378,6 +467,9 @@ def __init__( else {}, "extra": params.extra if isinstance(params.extra, dict) else {}, } + + # Initialize the File API client + self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) def can_generate_metrics(self) -> bool: return True @@ -776,7 +868,7 @@ async def _create_initial_response(self): self._needs_turn_complete_message = True async def _create_single_response(self, messages_list): - # refactor to combine this logic with same logic in GeminiMultimodalLiveContext + # Refactor to combine this logic with same logic in GeminiMultimodalLiveContext messages = [] for item in messages_list: role = item.get("role") @@ -795,6 +887,14 @@ async def _create_single_response(self, messages_list): for part in content: if part.get("type") == "text": parts.append({"text": part.get("text")}) + elif part.get("type") == "file_data": + file_data = part.get("file_data", {}) + parts.append({ + "fileData": { + "mimeType": file_data.get("mime_type"), + "fileUri": file_data.get("file_uri") + } + }) else: logger.warning(f"Unsupported content type: {str(part)[:80]}") else: From 949971dea9e5db14102312fe40410450ddafc8d8 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 9 May 2025 10:51:24 -0400 Subject: [PATCH 02/26] Create file_api --- .../services/gemini_multimodal_live/file_api | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 src/pipecat/services/gemini_multimodal_live/file_api diff --git a/src/pipecat/services/gemini_multimodal_live/file_api b/src/pipecat/services/gemini_multimodal_live/file_api new file mode 100644 index 0000000000..10c8a44db4 --- /dev/null +++ b/src/pipecat/services/gemini_multimodal_live/file_api @@ -0,0 +1,179 @@ +import aiohttp +import 
mimetypes +from typing import Dict, Any, Optional + +from loguru import logger + +class GeminiFileAPI: + """Client for the Gemini File API. + + This class provides methods for uploading, fetching, listing, and deleting files + through Google's Gemini File API. + + Files uploaded through this API remain available for 48 hours and can be referenced + in calls to the Gemini generative models. Maximum file size is 2GB, with total + project storage limited to 20GB. + """ + + def __init__(self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files"): + """Initialize the Gemini File API client. + + Args: + api_key: Google AI API key + base_url: Base URL for the Gemini File API (default is the v1beta endpoint) + """ + self.api_key = api_key + self.base_url = base_url + + async def upload_file(self, file_path: str, display_name: Optional[str] = None) -> Dict[str, Any]: + """Upload a file to the Gemini File API. + + Args: + file_path: Path to the file to upload + display_name: Optional display name for the file + + Returns: + File metadata including uri, name, and display_name + """ + logger.info(f"Uploading file: {file_path}") + + async with aiohttp.ClientSession() as session: + # Determine the file's MIME type + mime_type, _ = mimetypes.guess_type(file_path) + if not mime_type: + mime_type = "application/octet-stream" + + # Read the file + with open(file_path, "rb") as f: + file_data = f.read() + + # First request to initiate the upload + headers = { + "X-Goog-Upload-Protocol": "resumable", + "X-Goog-Upload-Command": "start", + "X-Goog-Upload-Header-Content-Length": str(len(file_data)), + "X-Goog-Upload-Header-Content-Type": mime_type, + "Content-Type": "application/json" + } + + # Create the metadata payload + metadata = {} + if display_name: + metadata = {"file": {"display_name": display_name}} + + # Initial request to get the upload URL + async with session.post( + f"{self.base_url}?key={self.api_key}", + headers=headers, + json=metadata 
+ ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error initiating file upload: {error_text}") + raise Exception(f"Failed to initiate upload: {response.status}") + + # Get the upload URL from the response header + upload_url = response.headers.get("X-Goog-Upload-URL") + if not upload_url: + raise Exception("No upload URL in response") + + # Upload the actual file + headers = { + "Content-Length": str(len(file_data)), + "X-Goog-Upload-Offset": "0", + "X-Goog-Upload-Command": "upload, finalize" + } + + async with session.post( + upload_url, + headers=headers, + data=file_data + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error uploading file: {error_text}") + raise Exception(f"Failed to upload file: {response.status}") + + file_info = await response.json() + logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}") + return file_info + + async def get_file(self, name: str) -> Dict[str, Any]: + """Get metadata for a file. + + Args: + name: File name (or full path) + + Returns: + File metadata + """ + # Extract just the name part if a full path is provided + if '/' in name: + name = name.split('/')[-1] + + async with aiohttp.ClientSession() as session: + async with session.get( + f"{self.base_url}/{name}?key={self.api_key}" + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error getting file metadata: {error_text}") + raise Exception(f"Failed to get file metadata: {response.status}") + + file_info = await response.json() + return file_info + + async def list_files(self, page_size: int = 10, page_token: Optional[str] = None) -> Dict[str, Any]: + """List uploaded files. 
+ + Args: + page_size: Number of files to return per page + page_token: Token for pagination + + Returns: + List of files and next page token if available + """ + params = { + "key": self.api_key, + "pageSize": page_size + } + + if page_token: + params["pageToken"] = page_token + + async with aiohttp.ClientSession() as session: + async with session.get( + self.base_url, + params=params + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error listing files: {error_text}") + raise Exception(f"Failed to list files: {response.status}") + + result = await response.json() + return result + + async def delete_file(self, name: str) -> bool: + """Delete a file. + + Args: + name: File name (or full path) + + Returns: + True if deleted successfully + """ + # Extract just the name part if a full path is provided + if '/' in name: + name = name.split('/')[-1] + + async with aiohttp.ClientSession() as session: + async with session.delete( + f"{self.base_url}/{name}?key={self.api_key}" + ) as response: + if response.status != 200: + error_text = await response.text() + logger.error(f"Error deleting file: {error_text}") + raise Exception(f"Failed to delete file: {response.status}") + + return True From 59c7744590a3e4c5e6bcab6655f98127c3d77520 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 9 May 2025 10:52:04 -0400 Subject: [PATCH 03/26] add FileData class events.py --- src/pipecat/services/gemini_multimodal_live/events.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index f621b41fd6..df1579f738 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -29,6 +29,16 @@ class MediaChunk(BaseModel): class ContentPart(BaseModel): text: Optional[str] = Field(default=None, 
validate_default=False) inlineData: Optional[MediaChunk] = Field(default=None, validate_default=False) + fileData: Optional['FileData'] = Field(default=None, validate_default=False) + + +class FileData(BaseModel): + """Represents a file reference in the Gemini File API.""" + mimeType: str + fileUri: str + + +ContentPart.model_rebuild() # Rebuild model to resolve forward reference class Turn(BaseModel): From d86502e79ac6b0c515fc4225fc96ee45770c15b6 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 9 May 2025 10:53:31 -0400 Subject: [PATCH 04/26] add file_api __init__.py --- src/pipecat/services/gemini_multimodal_live/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pipecat/services/gemini_multimodal_live/__init__.py b/src/pipecat/services/gemini_multimodal_live/__init__.py index 61bdf58dd7..60a226e64f 100644 --- a/src/pipecat/services/gemini_multimodal_live/__init__.py +++ b/src/pipecat/services/gemini_multimodal_live/__init__.py @@ -1 +1,2 @@ from .gemini import GeminiMultimodalLiveLLMService +from .file_api import GeminiFileAPI From e27da96cdce4123d7117d92116945a8cf013595d Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Tue, 13 May 2025 22:01:02 -0400 Subject: [PATCH 05/26] Rename file_api to file_api.py added proper .py to file name. 
--- .../services/gemini_multimodal_live/{file_api => file_api.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/pipecat/services/gemini_multimodal_live/{file_api => file_api.py} (100%) diff --git a/src/pipecat/services/gemini_multimodal_live/file_api b/src/pipecat/services/gemini_multimodal_live/file_api.py similarity index 100% rename from src/pipecat/services/gemini_multimodal_live/file_api rename to src/pipecat/services/gemini_multimodal_live/file_api.py From 40c7e3c52cbce7731edef2ccd238e886eeaa01a7 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 12:19:40 -0400 Subject: [PATCH 06/26] Update gemini.py --- .../services/gemini_multimodal_live/gemini.py | 56 ------------------- 1 file changed, 56 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 6bc9f0616b..9e580cbfb2 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -336,63 +336,7 @@ class InputParams(BaseModel): class GeminiMultimodalLiveLLMService(LLMService): # Overriding the default adapter to use the Gemini one. adapter_class = GeminiLLMAdapter - - """Gemini Live LLM Service with multimodal capabilities including File API support. 
- - This service implements the Gemini Multimodal Live API with support for: - - Audio input and output - - Image/video input - - File API (upload, reference, and management) - - Tools/function calling - - Example usage of File API: - ```python - # Initialize the service - gemini_service = GeminiMultimodalLiveLLMService(api_key="YOUR_API_KEY") - - # Upload a file from the client - file_path = "/path/to/user_uploaded_file.pdf" - file_info = await gemini_service.file_api.upload_file(file_path) - - # Get file URI and mime type from response - file_uri = file_info["file"]["uri"] - mime_type = "application/pdf" # Set appropriate MIME type - - # When starting a new bot session: - # 1. Initialize the context - context = GeminiMultimodalLiveContext() - - # 2. Add file reference to context BEFORE starting the conversation - context.add_file_reference( - file_uri=file_uri, - mime_type=mime_type, - text="Please analyze this document" - ) - # 3. Now set the context to start the conversation with file reference included - await gemini_service.set_context(context) - - # Gemini now has access to the file reference in its context window - # The file URI remains valid for 48 hours before Google deletes it - - # Optional: List all files for this user - files = await gemini_service.file_api.list_files() - - # Optional: Get metadata for a specific file - file_metadata = await gemini_service.file_api.get_file(file_info["file"]["name"]) - - # Optional: Delete a file when no longer needed - await gemini_service.file_api.delete_file(file_info["file"]["name"]) - ``` - - Notes: - - Files are stored for 48 hours on Google's servers - - Maximum file size is 2GB - - Total storage per project is 20GB - - File references should be added to the context BEFORE starting the conversation - - The same file reference can be reused for multiple sessions within the 48-hour window - """ - def __init__( self, *, From f2d5b9ad697aa7c5c0476a6ab7844c805c95613b Mon Sep 17 00:00:00 2001 From: getchannel 
<78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 13:04:52 -0400 Subject: [PATCH 07/26] Create 26f-gemini-multimodal-live-files-api.py This is an example to test usage of the Files API integration. Specifically with the Gemini Multimodal Live Service. --- .../26f-gemini-multimodal-live-files-api.py | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 examples/foundational/26f-gemini-multimodal-live-files-api.py diff --git a/examples/foundational/26f-gemini-multimodal-live-files-api.py b/examples/foundational/26f-gemini-multimodal-live-files-api.py new file mode 100644 index 0000000000..413830cf9b --- /dev/null +++ b/examples/foundational/26f-gemini-multimodal-live-files-api.py @@ -0,0 +1,210 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import argparse +import os +import tempfile + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.gemini_multimodal_live.gemini import ( + GeminiMultimodalLiveLLMService, + GeminiMultimodalLiveContext, +) +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + + +async def create_sample_file(): + """Create a sample text file for testing the File API.""" + content = """# Sample Document for Gemini File API Test + +This is a test document to demonstrate the Gemini File API functionality. 
+ +## Key Information: +- This document was created for testing purposes +- It contains information about AI assistants +- The document should be analyzed by Gemini +- The secret phrase for the test is "Pineapple Pizza" + +## AI Assistant Capabilities: +1. Natural language processing +2. File analysis and understanding +3. Context-aware conversations +4. Multi-modal interactions + +## Conclusion: +This document serves as a test case for the Gemini File API integration with Pipecat. +The AI should be able to reference and discuss the contents of this file. +""" + + # Create a temporary file + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write(content) + return f.name + + +async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): + logger.info(f"Starting File API bot") + + # Create a sample file to upload + sample_file_path = await create_sample_file() + logger.info(f"Created sample file: {sample_file_path}") + + # Initialize the SmallWebRTCTransport with the connection + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + ) + + system_instruction = """ + You are a helpful AI assistant with access to a document that has been uploaded for analysis. + + The document contains test information including a secret phrase. You should be able to: + - Reference and discuss the contents of the uploaded document + - Answer questions about what's in the document + - Use the information from the document in our conversation + + Your output will be converted to audio so don't include special characters in your answers. + Be friendly and demonstrate your ability to work with the uploaded file. 
+ """ + + # Initialize Gemini service with File API support + llm = GeminiMultimodalLiveLLMService( + api_key=os.getenv("GOOGLE_API_KEY"), + system_instruction=system_instruction, + voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck + transcribe_user_audio=True, + ) + + # Upload the sample file to Gemini File API + logger.info("Uploading file to Gemini File API...") + file_info = None + try: + file_info = await llm.file_api.upload_file( + sample_file_path, + display_name="Sample Test Document" + ) + logger.info(f"File uploaded successfully: {file_info['file']['name']}") + + # Get file URI and mime type + file_uri = file_info["file"]["uri"] + mime_type = "text/plain" + + # Create context with file reference + context = OpenAILLMContext( + [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": "Greet the user and let them know you have access to a document they can ask you about. Mention that you can discuss its contents." + }, + { + "type": "file_data", + "file_data": { + "mime_type": mime_type, + "file_uri": file_uri + } + } + ] + } + ] + ) + + logger.info("File reference added to conversation context") + + except Exception as e: + logger.error(f"Error uploading file: {e}") + # Continue with a basic context if file upload fails + context = OpenAILLMContext( + [ + { + "role": "user", + "content": "Greet the user and explain that there was an issue with file upload, but you're ready to help with other tasks." 
+ } + ] + ) + + # Create context aggregator + context_aggregator = llm.create_context_aggregator(context) + + # Build the pipeline + pipeline = Pipeline([ + transport.input(), + context_aggregator.user(), + llm, + transport.output(), + context_aggregator.assistant(), + ]) + + # Configure the pipeline task + task = PipelineTask( + pipeline, + params=PipelineParams( + allow_interruptions=True, + enable_metrics=True, + enable_usage_metrics=True, + ), + ) + + # Handle client connection event + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation using standard context frame + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + # Handle client disconnection events + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + # Run the pipeline + runner = PipelineRunner(handle_sigint=False) + await runner.run(task) + + # Clean up: delete the uploaded file and temporary file + if file_info: + try: + await llm.file_api.delete_file(file_info["file"]["name"]) + logger.info("Cleaned up uploaded file from Gemini") + except Exception as e: + logger.error(f"Error cleaning up file: {e}") + + # Remove temporary file + try: + os.unlink(sample_file_path) + logger.info("Cleaned up temporary file") + except Exception as e: + logger.error(f"Error removing temporary file: {e}") + + +if __name__ == "__main__": + from run import main + + main() From 7263d11ee41ac379e27c6299efefe972ffa8d26f Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 13:41:55 -0400 Subject: [PATCH 08/26] update correct upload endpoint file_api.py --- 
.../gemini_multimodal_live/file_api.py | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/file_api.py b/src/pipecat/services/gemini_multimodal_live/file_api.py index 10c8a44db4..39b7d9df93 100644 --- a/src/pipecat/services/gemini_multimodal_live/file_api.py +++ b/src/pipecat/services/gemini_multimodal_live/file_api.py @@ -1,3 +1,9 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + import aiohttp import mimetypes from typing import Dict, Any, Optional @@ -24,9 +30,11 @@ def __init__(self, api_key: str, base_url: str = "https://generativelanguage.goo """ self.api_key = api_key self.base_url = base_url + # Upload URL uses the /upload/ path + self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files" async def upload_file(self, file_path: str, display_name: Optional[str] = None) -> Dict[str, Any]: - """Upload a file to the Gemini File API. + """Upload a file to the Gemini File API using the correct resumable upload protocol. 
Args: file_path: Path to the file to upload @@ -47,7 +55,12 @@ async def upload_file(self, file_path: str, display_name: Optional[str] = None) with open(file_path, "rb") as f: file_data = f.read() - # First request to initiate the upload + # Create the metadata payload + metadata = {} + if display_name: + metadata = {"file": {"display_name": display_name}} + + # Step 1: Initial resumable request to get upload URL headers = { "X-Goog-Upload-Protocol": "resumable", "X-Goog-Upload-Command": "start", @@ -56,43 +69,42 @@ async def upload_file(self, file_path: str, display_name: Optional[str] = None) "Content-Type": "application/json" } - # Create the metadata payload - metadata = {} - if display_name: - metadata = {"file": {"display_name": display_name}} - - # Initial request to get the upload URL + logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}") async with session.post( - f"{self.base_url}?key={self.api_key}", + f"{self.upload_base_url}?key={self.api_key}", headers=headers, json=metadata ) as response: if response.status != 200: error_text = await response.text() logger.error(f"Error initiating file upload: {error_text}") - raise Exception(f"Failed to initiate upload: {response.status}") + raise Exception(f"Failed to initiate upload: {response.status} - {error_text}") # Get the upload URL from the response header upload_url = response.headers.get("X-Goog-Upload-URL") if not upload_url: - raise Exception("No upload URL in response") + logger.error(f"Response headers: {dict(response.headers)}") + raise Exception("No upload URL in response headers") + + logger.debug(f"Got upload URL: {upload_url}") - # Upload the actual file - headers = { + # Step 2: Upload the actual file data + upload_headers = { "Content-Length": str(len(file_data)), "X-Goog-Upload-Offset": "0", "X-Goog-Upload-Command": "upload, finalize" } + logger.debug(f"Step 2: Uploading file data to {upload_url}") async with session.post( upload_url, - headers=headers, + 
headers=upload_headers, data=file_data ) as response: if response.status != 200: error_text = await response.text() - logger.error(f"Error uploading file: {error_text}") - raise Exception(f"Failed to upload file: {response.status}") + logger.error(f"Error uploading file data: {error_text}") + raise Exception(f"Failed to upload file: {response.status} - {error_text}") file_info = await response.json() logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}") From f53f5445ba5838410e4a780a99be6d711abe4ed2 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 17:36:36 -0400 Subject: [PATCH 09/26] Create 26g-gemini-multimodal-live-groundingMetadata.py --- ...emini-multimodal-live-groundingMetadata.py | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py new file mode 100644 index 0000000000..0c6d35ff0a --- /dev/null +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -0,0 +1,164 @@ + +import argparse +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema +from pipecat.frames.frames import Frame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from 
pipecat.services.google.frames import LLMSearchResponseFrame +from pipecat.transports.base_transport import TransportParams +from pipecat.transports.network.small_webrtc import SmallWebRTCTransport +from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection + +load_dotenv(override=True) + +SYSTEM_INSTRUCTION = """ +You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information. + +IMPORTANT: For ANY question about current events, news, recent developments, real-time information, or anything that might have changed recently, you MUST use the google_search tool to get the latest information. + +You should use Google Search for: +- Current news and events +- Recent developments in any field +- Today's weather, stock prices, or other real-time data +- Any question that starts with "what's happening", "latest", "recent", "current", "today", etc. +- When you're not certain about recent information + +Always be proactive about using search when the user asks about anything that could benefit from real-time information. + +Your output will be converted to audio so don't include special characters in your answers. + +Respond to what the user said in a creative and helpful way, always using search for current information. 
+""" + + +class GroundingMetadataProcessor(FrameProcessor): + """Processor to capture and display grounding metadata from Gemini Live API.""" + + def __init__(self): + super().__init__() + self._grounding_count = 0 + + async def process_frame(self, frame: Frame, direction: FrameDirection): + # Always call super().process_frame first + await super().process_frame(frame, direction) + + # Only log important frame types, not every audio frame + if hasattr(frame, '__class__'): + frame_type = frame.__class__.__name__ + if frame_type in ['LLMTextFrame', 'TTSTextFrame', 'LLMFullResponseStartFrame', 'LLMFullResponseEndFrame']: + logger.debug(f"GroundingProcessor received: {frame_type}") + + if isinstance(frame, LLMSearchResponseFrame): + self._grounding_count += 1 + logger.info(f"\nšŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}") + logger.info(f"šŸ“ Search Result Text: {frame.search_result[:200]}...") + + if frame.rendered_content: + logger.info(f"šŸ”— Rendered Content: {frame.rendered_content}") + + if frame.origins: + logger.info(f"šŸ“ Number of Origins: {len(frame.origins)}") + for i, origin in enumerate(frame.origins): + logger.info(f" Origin {i+1}: {origin.site_title} - {origin.site_uri}") + if origin.results: + logger.info(f" Results: {len(origin.results)} items") + + # Always push the frame downstream + await self.push_frame(frame, direction) + + +async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): + logger.info(f"Starting Gemini Live Grounding Test Bot") + + # Initialize the SmallWebRTCTransport with the connection + transport = SmallWebRTCTransport( + webrtc_connection=webrtc_connection, + params=TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + ) + + # Create tools using ToolsSchema with custom tools for Gemini + tools = ToolsSchema( + standard_tools=[], # No standard function declarations needed + 
custom_tools={ + AdapterType.GEMINI: [ + {"google_search": {}}, + {"code_execution": {}} + ] + } + ) + + llm = GeminiMultimodalLiveLLMService( + api_key=os.getenv("GOOGLE_API_KEY"), + system_instruction=SYSTEM_INSTRUCTION, + voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck + transcribe_user_audio=True, + tools=tools, + ) + + # Create a processor to capture grounding metadata + grounding_processor = GroundingMetadataProcessor() + + messages = [ + { + "role": "user", + "content": 'Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I\'d like to know about.', + }, + ] + + # Set up conversation context and management + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), + context_aggregator.user(), + llm, + grounding_processor, # Add our grounding processor here + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. 
+ await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from run import main + + main() From 43c6f1f5cdf02a53921b878a974591c280c2b228 Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 18:01:15 -0400 Subject: [PATCH 10/26] Add groundingMetadata and logging gemini.py --- .../services/gemini_multimodal_live/gemini.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 9e580cbfb2..a9198893f2 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -53,6 +53,7 @@ OpenAILLMContextFrame, ) from pipecat.processors.frame_processor import FrameDirection +from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult from pipecat.services.llm_service import LLMService from pipecat.services.openai.llm import ( OpenAIAssistantContextAggregator, @@ -415,6 +416,10 @@ def __init__( # Initialize the File API client self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) + # Grounding metadata tracking + self._search_result_buffer = "" + self._accumulated_grounding_metadata = None + def can_generate_metrics(self) -> bool: return True @@ -741,6 +746,8 @@ async def _receive_task_handler(self): await self._handle_evt_turn_complete(evt) elif evt.serverContent and evt.serverContent.outputTranscription: await 
self._handle_evt_output_transcription(evt) + elif evt.serverContent and evt.serverContent.groundingMetadata: + await self._handle_evt_grounding_metadata(evt) elif evt.toolCall: await self._handle_evt_tool_call(evt) elif False: # !!! todo: error events? @@ -748,6 +755,8 @@ async def _receive_task_handler(self): # errors are fatal, so exit the receive loop return else: + # Log unhandled events that might contain grounding metadata + logger.warning(f"Received unhandled server event type: {evt}") pass async def _transcribe_audio_handler(self): @@ -902,8 +911,14 @@ async def _handle_evt_model_turn(self, evt): await self.push_frame(LLMFullResponseStartFrame()) self._bot_text_buffer += text + self._search_result_buffer += text # Also accumulate for grounding await self.push_frame(LLMTextFrame(text=text)) + # Check for grounding metadata in server content + if evt.serverContent and evt.serverContent.groundingMetadata: + self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata + logger.debug("Grounding metadata detected in model turn.") + inline_data = part.inlineData if not inline_data: return @@ -947,6 +962,17 @@ async def _handle_evt_turn_complete(self, evt): text = self._bot_text_buffer self._bot_text_buffer = "" + # Process grounding metadata if we have accumulated any + if self._accumulated_grounding_metadata: + logger.debug("Processing grounding metadata...") + await self._process_grounding_metadata(self._accumulated_grounding_metadata, self._search_result_buffer) + else: + logger.debug("No grounding metadata to process") + + # Reset grounding tracking for next response + self._search_result_buffer = "" + self._accumulated_grounding_metadata = None + # Only push the TTSStoppedFrame the bot is outputting audio # when text is found, modalities is set to TEXT and no audio # is produced. 
@@ -967,9 +993,83 @@ async def _handle_evt_output_transcription(self, evt): if not text: return + # Accumulate text for grounding as well + self._search_result_buffer += text + + # Check for grounding metadata in server content + if evt.serverContent and evt.serverContent.groundingMetadata: + self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata + await self.push_frame(LLMTextFrame(text=text)) await self.push_frame(TTSTextFrame(text=text)) + async def _handle_evt_grounding_metadata(self, evt): + """Handle dedicated grounding metadata events.""" + logger.debug("Received dedicated grounding metadata event.") + + if evt.serverContent and evt.serverContent.groundingMetadata: + grounding_metadata = evt.serverContent.groundingMetadata + logger.debug(f"Grounding data: {len(grounding_metadata.groundingChunks or [])} chunks, {len(grounding_metadata.groundingSupports or [])} supports") + + # Process the grounding metadata immediately + await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) + + async def _process_grounding_metadata(self, grounding_metadata: events.GroundingMetadata, search_result: str = ""): + """Process grounding metadata and emit LLMSearchResponseFrame.""" + logger.debug(f"Processing grounding metadata. 
Search result text length: {len(search_result)}") + if not grounding_metadata: + logger.warning("No grounding metadata provided to _process_grounding_metadata") + return + + # logger.debug(f"Processing grounding metadata: {grounding_metadata}") # Too verbose for PR + + # Extract rendered content for search suggestions + rendered_content = None + if grounding_metadata.searchEntryPoint and grounding_metadata.searchEntryPoint.renderedContent: + rendered_content = grounding_metadata.searchEntryPoint.renderedContent + + # Convert grounding chunks and supports to LLMSearchOrigin format + origins = [] + + if grounding_metadata.groundingChunks and grounding_metadata.groundingSupports: + # Create a mapping of chunk indices to origins + chunk_to_origin = {} + + for index, chunk in enumerate(grounding_metadata.groundingChunks): + if chunk.web: + origin = LLMSearchOrigin( + site_uri=chunk.web.uri, + site_title=chunk.web.title, + results=[] + ) + chunk_to_origin[index] = origin + origins.append(origin) + + # Add grounding support results to the appropriate origins + for support in grounding_metadata.groundingSupports: + if support.segment and support.groundingChunkIndices: + text = support.segment.text or "" + confidence_scores = support.confidenceScores or [] + + # Add this result to all origins referenced by this support + for chunk_index in support.groundingChunkIndices: + if chunk_index in chunk_to_origin: + result = LLMSearchResult( + text=text, + confidence=confidence_scores + ) + chunk_to_origin[chunk_index].results.append(result) + + # Create and push the search response frame + search_frame = LLMSearchResponseFrame( + search_result=search_result, + origins=origins, + rendered_content=rendered_content + ) + + logger.debug(f"Emitting LLMSearchResponseFrame with {len(origins)} origins, rendered_content available: {rendered_content is not None}") + await self.push_frame(search_frame) + def create_context_aggregator( self, context: OpenAILLMContext, From 
8070e156d8aa468134b8d9f9fb4bf77ca05e9daa Mon Sep 17 00:00:00 2001 From: getchannel <78183014+getchannel@users.noreply.github.com> Date: Fri, 30 May 2025 18:07:09 -0400 Subject: [PATCH 11/26] Add groundingMetadata events.py --- .../services/gemini_multimodal_live/events.py | 68 +++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index df1579f738..d88a7368c3 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -11,6 +11,7 @@ from enum import Enum from typing import List, Literal, Optional +from loguru import logger from PIL import Image from pydantic import BaseModel, Field @@ -138,6 +139,49 @@ class Config(BaseModel): setup: Setup +# +# Grounding metadata models +# + + +class SearchEntryPoint(BaseModel): + """Represents the search entry point with rendered content for search suggestions.""" + renderedContent: Optional[str] = None + + +class WebSource(BaseModel): + """Represents a web source from grounding chunks.""" + uri: Optional[str] = None + title: Optional[str] = None + + +class GroundingChunk(BaseModel): + """Represents a grounding chunk containing web source information.""" + web: Optional[WebSource] = None + + +class GroundingSegment(BaseModel): + """Represents a segment of text that is grounded.""" + startIndex: Optional[int] = None + endIndex: Optional[int] = None + text: Optional[str] = None + + +class GroundingSupport(BaseModel): + """Represents support information for grounded text segments.""" + segment: Optional[GroundingSegment] = None + groundingChunkIndices: Optional[List[int]] = None + confidenceScores: Optional[List[float]] = None + + +class GroundingMetadata(BaseModel): + """Represents grounding metadata from Google Search.""" + searchEntryPoint: Optional[SearchEntryPoint] = None + groundingChunks: Optional[List[GroundingChunk]] = 
None
+    groundingSupports: Optional[List[GroundingSupport]] = None
+    webSearchQueries: Optional[List[str]] = None
+
+
 #
 # Server events
 #
@@ -178,6 +222,7 @@ class ServerContent(BaseModel):
     interrupted: Optional[bool] = None
     turnComplete: Optional[bool] = None
     outputTranscription: Optional[BidiGenerateContentTranscription] = None
+    groundingMetadata: Optional[GroundingMetadata] = None


 class FunctionCall(BaseModel):
@@ -196,12 +241,27 @@ class ServerEvent(BaseModel):
     toolCall: Optional[ToolCall] = None


-def parse_server_event(str):
+def parse_server_event(message_str):
+    from loguru import logger  # Import logger locally to avoid scoping issues
+
     try:
-        evt = json.loads(str)
-        return ServerEvent.model_validate(evt)
+        evt_dict = json.loads(message_str)
+
+        # Only log grounding metadata detection if truly needed for debugging
+        # In production, this could be removed entirely or moved to TRACE level
+        if 'serverContent' in evt_dict:
+            server_content = evt_dict['serverContent']
+            if 'groundingMetadata' in server_content:
+                # Consider removing this log entirely for production
+                pass
+
+        evt = ServerEvent.model_validate(evt_dict)
+        return evt
     except Exception as e:
-        print(f"Error parsing server event: {e}")
+        logger.error(f"Error parsing server event: {e}")
+        # Truncate raw message to avoid logging potentially sensitive or overly long data
+        truncated_message = message_str[:200] + "..." if len(message_str) > 200 else message_str
+        logger.error(f"Raw message (truncated): {truncated_message}")
         return None


From 8d55e13750ab118ff2adffe7f7d80f76e6c2c26b Mon Sep 17 00:00:00 2001
From: Pete <78183014+getchannel@users.noreply.github.com>
Date: Tue, 10 Jun 2025 11:22:16 -0400
Subject: [PATCH 12/26] remove audio_transcriber from gemini.py

unnecessary import removed.
--- src/pipecat/services/gemini_multimodal_live/gemini.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 47eada34eb..cd4b8fc8cb 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -64,7 +64,6 @@ from . import events -from .audio_transcriber import AudioTranscriber from .file_api import GeminiFileAPI try: From e3fe040017cbf7566235a69dc17f7cdf75944cfe Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Sat, 21 Jun 2025 14:43:15 -0400 Subject: [PATCH 13/26] Update gemini.py --- src/pipecat/services/gemini_multimodal_live/gemini.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 1180cd78bd..139a561b17 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -55,7 +55,6 @@ from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult from pipecat.services.llm_service import LLMService -======= from pipecat.services.llm_service import FunctionCallFromLLM, LLMService from pipecat.services.openai.llm import ( From 9b38f3e2fa1e56085f3ce39ef6ebce38ea8ba06d Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Thu, 3 Jul 2025 17:15:18 -0400 Subject: [PATCH 14/26] Delete examples/foundational/26f-gemini-multimodal-live-files-api.py --- .../26f-gemini-multimodal-live-files-api.py | 210 ------------------ 1 file changed, 210 deletions(-) delete mode 100644 examples/foundational/26f-gemini-multimodal-live-files-api.py diff --git a/examples/foundational/26f-gemini-multimodal-live-files-api.py b/examples/foundational/26f-gemini-multimodal-live-files-api.py deleted file mode 100644 index 
413830cf9b..0000000000 --- a/examples/foundational/26f-gemini-multimodal-live-files-api.py +++ /dev/null @@ -1,210 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import argparse -import os -import tempfile - -from dotenv import load_dotenv -from loguru import logger - -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.services.gemini_multimodal_live.gemini import ( - GeminiMultimodalLiveLLMService, - GeminiMultimodalLiveContext, -) -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection - -load_dotenv(override=True) - - -async def create_sample_file(): - """Create a sample text file for testing the File API.""" - content = """# Sample Document for Gemini File API Test - -This is a test document to demonstrate the Gemini File API functionality. - -## Key Information: -- This document was created for testing purposes -- It contains information about AI assistants -- The document should be analyzed by Gemini -- The secret phrase for the test is "Pineapple Pizza" - -## AI Assistant Capabilities: -1. Natural language processing -2. File analysis and understanding -3. Context-aware conversations -4. Multi-modal interactions - -## Conclusion: -This document serves as a test case for the Gemini File API integration with Pipecat. -The AI should be able to reference and discuss the contents of this file. 
-""" - - # Create a temporary file - with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: - f.write(content) - return f.name - - -async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): - logger.info(f"Starting File API bot") - - # Create a sample file to upload - sample_file_path = await create_sample_file() - logger.info(f"Created sample file: {sample_file_path}") - - # Initialize the SmallWebRTCTransport with the connection - transport = SmallWebRTCTransport( - webrtc_connection=webrtc_connection, - params=TransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - video_in_enabled=False, - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), - ), - ) - - system_instruction = """ - You are a helpful AI assistant with access to a document that has been uploaded for analysis. - - The document contains test information including a secret phrase. You should be able to: - - Reference and discuss the contents of the uploaded document - - Answer questions about what's in the document - - Use the information from the document in our conversation - - Your output will be converted to audio so don't include special characters in your answers. - Be friendly and demonstrate your ability to work with the uploaded file. 
- """ - - # Initialize Gemini service with File API support - llm = GeminiMultimodalLiveLLMService( - api_key=os.getenv("GOOGLE_API_KEY"), - system_instruction=system_instruction, - voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck - transcribe_user_audio=True, - ) - - # Upload the sample file to Gemini File API - logger.info("Uploading file to Gemini File API...") - file_info = None - try: - file_info = await llm.file_api.upload_file( - sample_file_path, - display_name="Sample Test Document" - ) - logger.info(f"File uploaded successfully: {file_info['file']['name']}") - - # Get file URI and mime type - file_uri = file_info["file"]["uri"] - mime_type = "text/plain" - - # Create context with file reference - context = OpenAILLMContext( - [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": "Greet the user and let them know you have access to a document they can ask you about. Mention that you can discuss its contents." - }, - { - "type": "file_data", - "file_data": { - "mime_type": mime_type, - "file_uri": file_uri - } - } - ] - } - ] - ) - - logger.info("File reference added to conversation context") - - except Exception as e: - logger.error(f"Error uploading file: {e}") - # Continue with a basic context if file upload fails - context = OpenAILLMContext( - [ - { - "role": "user", - "content": "Greet the user and explain that there was an issue with file upload, but you're ready to help with other tasks." 
- } - ] - ) - - # Create context aggregator - context_aggregator = llm.create_context_aggregator(context) - - # Build the pipeline - pipeline = Pipeline([ - transport.input(), - context_aggregator.user(), - llm, - transport.output(), - context_aggregator.assistant(), - ]) - - # Configure the pipeline task - task = PipelineTask( - pipeline, - params=PipelineParams( - allow_interruptions=True, - enable_metrics=True, - enable_usage_metrics=True, - ), - ) - - # Handle client connection event - @transport.event_handler("on_client_connected") - async def on_client_connected(transport, client): - logger.info(f"Client connected") - # Kick off the conversation using standard context frame - await task.queue_frames([context_aggregator.user().get_context_frame()]) - - # Handle client disconnection events - @transport.event_handler("on_client_disconnected") - async def on_client_disconnected(transport, client): - logger.info(f"Client disconnected") - - @transport.event_handler("on_client_closed") - async def on_client_closed(transport, client): - logger.info(f"Client closed connection") - await task.cancel() - - # Run the pipeline - runner = PipelineRunner(handle_sigint=False) - await runner.run(task) - - # Clean up: delete the uploaded file and temporary file - if file_info: - try: - await llm.file_api.delete_file(file_info["file"]["name"]) - logger.info("Cleaned up uploaded file from Gemini") - except Exception as e: - logger.error(f"Error cleaning up file: {e}") - - # Remove temporary file - try: - os.unlink(sample_file_path) - logger.info("Cleaned up temporary file") - except Exception as e: - logger.error(f"Error removing temporary file: {e}") - - -if __name__ == "__main__": - from run import main - - main() From 4951c97eabd3289fbc5d6df61c25f0ce2cefae31 Mon Sep 17 00:00:00 2001 From: Pete Date: Thu, 3 Jul 2025 17:49:27 -0400 Subject: [PATCH 15/26] Clean up verbose logging in grounding metadata implementation - Remove debug logging from grounding metadata event handlers - 
Simplify logging in _process_grounding_metadata method - Clean up example file logging for better readability - Remove verbose event parsing comments Based on suggestions from draft PR #2121 --- ...emini-multimodal-live-groundingMetadata.py | 23 ++++++------------- .../services/gemini_multimodal_live/events.py | 9 -------- .../services/gemini_multimodal_live/gemini.py | 13 ----------- 3 files changed, 7 insertions(+), 38 deletions(-) diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py index 0c6d35ff0a..b5b2a18f6e 100644 --- a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -53,33 +53,24 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): # Always call super().process_frame first await super().process_frame(frame, direction) - # Only log important frame types, not every audio frame - if hasattr(frame, '__class__'): - frame_type = frame.__class__.__name__ - if frame_type in ['LLMTextFrame', 'TTSTextFrame', 'LLMFullResponseStartFrame', 'LLMFullResponseEndFrame']: - logger.debug(f"GroundingProcessor received: {frame_type}") - if isinstance(frame, LLMSearchResponseFrame): self._grounding_count += 1 - logger.info(f"\nšŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}") - logger.info(f"šŸ“ Search Result Text: {frame.search_result[:200]}...") - - if frame.rendered_content: - logger.info(f"šŸ”— Rendered Content: {frame.rendered_content}") + logger.info(f"šŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}") + logger.info(f"šŸ“ Search Result: {frame.search_result[:200]}...") if frame.origins: - logger.info(f"šŸ“ Number of Origins: {len(frame.origins)}") + logger.info(f"šŸ“ Origins: {len(frame.origins)} sources") for i, origin in enumerate(frame.origins): - logger.info(f" Origin {i+1}: {origin.site_title} - {origin.site_uri}") - if 
origin.results: - logger.info(f" Results: {len(origin.results)} items") + logger.info(f" {i+1}. {origin.site_title} - {origin.site_uri}") + + if frame.rendered_content: + logger.info(f"šŸ”— Rendered Content Available: {len(frame.rendered_content)} chars") # Always push the frame downstream await self.push_frame(frame, direction) async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): - logger.info(f"Starting Gemini Live Grounding Test Bot") # Initialize the SmallWebRTCTransport with the connection transport = SmallWebRTCTransport( diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index dced95d6c5..c8530b7ee3 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -488,15 +488,6 @@ def parse_server_event(str): """ try: evt_dict = json.loads(message_str) - - # Only log grounding metadata detection if truly needed for debugging - # In production, this could be removed entirely or moved to TRACE level - if 'serverContent' in evt_dict: - server_content = evt_dict['serverContent'] - if 'groundingMetadata' in server_content: - # Consider removing this log entirely for production - pass - evt = ServerEvent.model_validate(evt_dict) return evt except Exception as e: diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 5303a3c6fe..633bf6de26 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -1096,7 +1096,6 @@ async def _handle_evt_model_turn(self, evt): # Check for grounding metadata in server content if evt.serverContent and evt.serverContent.groundingMetadata: self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata - logger.debug("Grounding metadata detected in model turn.") inline_data = part.inlineData if not inline_data: @@ 
-1166,10 +1165,7 @@ async def _handle_evt_turn_complete(self, evt): # Process grounding metadata if we have accumulated any if self._accumulated_grounding_metadata: - logger.debug("Processing grounding metadata...") await self._process_grounding_metadata(self._accumulated_grounding_metadata, self._search_result_buffer) - else: - logger.debug("No grounding metadata to process") # Reset grounding tracking for next response self._search_result_buffer = "" @@ -1262,24 +1258,16 @@ async def _handle_evt_output_transcription(self, evt): async def _handle_evt_grounding_metadata(self, evt): """Handle dedicated grounding metadata events.""" - logger.debug("Received dedicated grounding metadata event.") - if evt.serverContent and evt.serverContent.groundingMetadata: grounding_metadata = evt.serverContent.groundingMetadata - logger.debug(f"Grounding data: {len(grounding_metadata.groundingChunks or [])} chunks, {len(grounding_metadata.groundingSupports or [])} supports") - # Process the grounding metadata immediately await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) async def _process_grounding_metadata(self, grounding_metadata: events.GroundingMetadata, search_result: str = ""): """Process grounding metadata and emit LLMSearchResponseFrame.""" - logger.debug(f"Processing grounding metadata. 
Search result text length: {len(search_result)}") if not grounding_metadata: - logger.warning("No grounding metadata provided to _process_grounding_metadata") return - # logger.debug(f"Processing grounding metadata: {grounding_metadata}") # Too verbose for PR - # Extract rendered content for search suggestions rendered_content = None if grounding_metadata.searchEntryPoint and grounding_metadata.searchEntryPoint.renderedContent: @@ -1324,7 +1312,6 @@ async def _process_grounding_metadata(self, grounding_metadata: events.Grounding rendered_content=rendered_content ) - logger.debug(f"Emitting LLMSearchResponseFrame with {len(origins)} origins, rendered_content available: {rendered_content is not None}") await self.push_frame(search_frame) async def _handle_evt_usage_metadata(self, evt): if not evt.usageMetadata: From d565e9ae5322afe052bcfd545ae7bd2c4441b39e Mon Sep 17 00:00:00 2001 From: Pete Date: Thu, 3 Jul 2025 17:53:55 -0400 Subject: [PATCH 16/26] Update grounding metadata example with final refinements - Reorganize imports and transport_params structure - Remove copyright header for consistency - Enhance grounding metadata logging with better formatting - Remove unnecessary PipelineParams configuration - Update message content formatting Completes incorporation of draft PR #2121 changes --- ...emini-multimodal-live-groundingMetadata.py | 80 +++++++++++-------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py index b5b2a18f6e..e26fbed515 100644 --- a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -1,13 +1,12 @@ - import argparse import os from dotenv import load_dotenv from loguru import logger +from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema from pipecat.audio.vad.silero import 
SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema from pipecat.frames.frames import Frame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner @@ -16,12 +15,37 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService from pipecat.services.google.frames import LLMSearchResponseFrame -from pipecat.transports.base_transport import TransportParams -from pipecat.transports.network.small_webrtc import SmallWebRTCTransport -from pipecat.transports.network.webrtc_connection import SmallWebRTCConnection +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams load_dotenv(override=True) + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + video_in_enabled=False, + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), + ), +} + SYSTEM_INSTRUCTION = """ You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information. 
@@ -50,48 +74,34 @@ def __init__(self): self._grounding_count = 0 async def process_frame(self, frame: Frame, direction: FrameDirection): - # Always call super().process_frame first await super().process_frame(frame, direction) if isinstance(frame, LLMSearchResponseFrame): self._grounding_count += 1 - logger.info(f"šŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}") - logger.info(f"šŸ“ Search Result: {frame.search_result[:200]}...") - + logger.info(f"\n\nšŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}\n") + logger.info(f"šŸ“ Search Result Text: {frame.search_result[:200]}...") + + if frame.rendered_content: + logger.info(f"šŸ”— Rendered Content: {frame.rendered_content}") + if frame.origins: - logger.info(f"šŸ“ Origins: {len(frame.origins)} sources") + logger.info(f"šŸ“ Number of Origins: {len(frame.origins)}") for i, origin in enumerate(frame.origins): - logger.info(f" {i+1}. {origin.site_title} - {origin.site_uri}") - - if frame.rendered_content: - logger.info(f"šŸ”— Rendered Content Available: {len(frame.rendered_content)} chars") + logger.info(f" Origin {i + 1}: {origin.site_title} - {origin.site_uri}") + if origin.results: + logger.info(f" Results: {len(origin.results)} items") # Always push the frame downstream await self.push_frame(frame, direction) -async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespace): - - # Initialize the SmallWebRTCTransport with the connection - transport = SmallWebRTCTransport( - webrtc_connection=webrtc_connection, - params=TransportParams( - audio_in_enabled=True, - audio_out_enabled=True, - video_in_enabled=False, - vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)), - ), - ) +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting Gemini Live Grounding Metadata Test Bot") # Create tools using ToolsSchema with custom tools for Gemini tools = ToolsSchema( standard_tools=[], # No standard function declarations 
needed - custom_tools={ - AdapterType.GEMINI: [ - {"google_search": {}}, - {"code_execution": {}} - ] - } + custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]}, ) llm = GeminiMultimodalLiveLLMService( @@ -108,7 +118,7 @@ async def run_bot(webrtc_connection: SmallWebRTCConnection, _: argparse.Namespac messages = [ { "role": "user", - "content": 'Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I\'d like to know about.', + "content": "Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I'd like to know about.", }, ] @@ -150,6 +160,6 @@ async def on_client_closed(transport, client): if __name__ == "__main__": - from run import main + from pipecat.examples.run import main - main() + main(run_example, transport_params=transport_params) From 14c22234bb57a58692ea7d4d3a00b51dbed2cc51 Mon Sep 17 00:00:00 2001 From: Pete Date: Thu, 3 Jul 2025 18:02:24 -0400 Subject: [PATCH 17/26] Fix parameter name consistency in parse_server_event function - Change function body to use 'str' parameter consistently - Matches pattern used in OpenAI Realtime Beta service - Fixes bug where parameter was named 'str' but body used 'message_str' - Maintains consistency with existing codebase patterns --- src/pipecat/services/gemini_multimodal_live/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index c8530b7ee3..0b40598305 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -487,13 +487,13 @@ def parse_server_event(str): ServerEvent instance if parsing succeeds, None otherwise. 
""" try: - evt_dict = json.loads(message_str) + evt_dict = json.loads(str) evt = ServerEvent.model_validate(evt_dict) return evt except Exception as e: logger.error(f"Error parsing server event: {e}") # Truncate raw message to avoid logging potentially sensitive or overly long data - truncated_message = message_str[:200] + "..." if len(message_str) > 200 else message_str + truncated_message = str[:200] + "..." if len(str) > 200 else str logger.error(f"Raw message (truncated): {truncated_message}") return None From 6f66ec1727f08b7c302eafb5e8e744dfaf6956e3 Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Thu, 3 Jul 2025 18:55:21 -0400 Subject: [PATCH 18/26] Update gemini.py tab indentation fix --- src/pipecat/services/gemini_multimodal_live/gemini.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index d8a848f667..d7217b00b1 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -253,7 +253,7 @@ def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] self.messages.append(message) logger.info(f"Added file reference to context: {file_uri}") - def get_messages_for_initializing_history(self): + def get_messages_for_initializing_history(self): """Get messages formatted for Gemini history initialization. 
Returns: From 7ed4fe50d43310069950d2febc5df66ed7f961fa Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Thu, 3 Jul 2025 19:39:44 -0400 Subject: [PATCH 19/26] Update gemini.py -FunctionCallFromLLM -Delete duplicate Gemini imports --- src/pipecat/services/gemini_multimodal_live/gemini.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index d7217b00b1..792f904f57 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -60,7 +60,7 @@ ) from pipecat.processors.frame_processor import FrameDirection from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult -from pipecat.services.llm_service import LLMService +from pipecat.services.llm_service import FunctionCallFromLLM, LLMService from pipecat.services.openai.llm import ( @@ -74,14 +74,11 @@ from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt from . 
import events -from .file_api import GeminiFileAPI from .audio_transcriber import AudioTranscriber from .file_api import GeminiFileAPI -from .file_api import GeminiFileAPI - try: import websockets except ModuleNotFoundError as e: From 8ba340a8a58e96ba292fae7988f1a9408a8fc2d4 Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Sun, 20 Jul 2025 18:21:42 -0400 Subject: [PATCH 20/26] remove debug logging --- src/pipecat/services/gemini_multimodal_live/gemini.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 792f904f57..f6cc51a5f8 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -961,12 +961,7 @@ async def _receive_task_handler(self): await self._handle_evt_error(evt) # errors are fatal, so exit the receive loop return - else: - # Log unhandled events that might contain grounding metadata - logger.warning(f"Received unhandled server event type: {evt}") - pass - - + # # # From e165d38277e501ede0f2163e6ae0532ebca5f28a Mon Sep 17 00:00:00 2001 From: Pete <78183014+getchannel@users.noreply.github.com> Date: Sun, 20 Jul 2025 18:27:21 -0400 Subject: [PATCH 21/26] remove truncated logging from debug --- .../services/gemini_multimodal_live/events.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index d5bec0ff2f..63efdc31af 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -488,16 +488,10 @@ def parse_server_event(str): ServerEvent instance if parsing succeeds, None otherwise. 
""" try: - evt_dict = json.loads(str) - evt = ServerEvent.model_validate(evt_dict) - return evt + evt = json.loads(str) + return ServerEvent.model_validate(evt) except Exception as e: - logger.error(f"Error parsing server event: {e}") - # Truncate raw message to avoid logging potentially sensitive or overly long data - truncated_message = str[:200] + "..." if len(str) > 200 else str - logger.error(f"Raw message (truncated): {truncated_message}") - return None - + print(f"Error parsing server event: {e}") class ContextWindowCompressionConfig(BaseModel): """Configuration for context window compression. From b1a5cddde48367d476cc96104c9baa4e5d2a0226 Mon Sep 17 00:00:00 2001 From: Pete Date: Sun, 20 Jul 2025 18:38:59 -0400 Subject: [PATCH 22/26] Refactor whitespace and formatting in multiple files - Clean up unnecessary whitespace in `gemini.py`, `events.py`, and `file_api.py` - Ensure consistent formatting in `26g-gemini-multimodal-live-groundingMetadata.py` - Improve readability by aligning code and removing trailing spaces --- ...emini-multimodal-live-groundingMetadata.py | 2 +- .../services/gemini_multimodal_live/events.py | 8 +++- .../gemini_multimodal_live/file_api.py | 2 +- .../services/gemini_multimodal_live/gemini.py | 44 +++++++++---------- 4 files changed, 30 insertions(+), 26 deletions(-) diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py index e26fbed515..b96e3ab26f 100644 --- a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -162,4 +162,4 @@ async def on_client_closed(transport, client): if __name__ == "__main__": from pipecat.examples.run import main - main(run_example, transport_params=transport_params) + main(run_example, transport_params=transport_params) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py 
b/src/pipecat/services/gemini_multimodal_live/events.py index 63efdc31af..ddac795e79 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -256,22 +256,26 @@ class Config(BaseModel): class SearchEntryPoint(BaseModel): """Represents the search entry point with rendered content for search suggestions.""" + renderedContent: Optional[str] = None class WebSource(BaseModel): """Represents a web source from grounding chunks.""" + uri: Optional[str] = None title: Optional[str] = None class GroundingChunk(BaseModel): """Represents a grounding chunk containing web source information.""" + web: Optional[WebSource] = None class GroundingSegment(BaseModel): """Represents a segment of text that is grounded.""" + startIndex: Optional[int] = None endIndex: Optional[int] = None text: Optional[str] = None @@ -279,6 +283,7 @@ class GroundingSegment(BaseModel): class GroundingSupport(BaseModel): """Represents support information for grounded text segments.""" + segment: Optional[GroundingSegment] = None groundingChunkIndices: Optional[List[int]] = None confidenceScores: Optional[List[float]] = None @@ -286,6 +291,7 @@ class GroundingSupport(BaseModel): class GroundingMetadata(BaseModel): """Represents grounding metadata from Google Search.""" + searchEntryPoint: Optional[SearchEntryPoint] = None groundingChunks: Optional[List[GroundingChunk]] = None groundingSupports: Optional[List[GroundingSupport]] = None @@ -476,8 +482,6 @@ class ServerEvent(BaseModel): usageMetadata: Optional[UsageMetadata] = None - - def parse_server_event(str): """Parse a server event from JSON string. 
diff --git a/src/pipecat/services/gemini_multimodal_live/file_api.py b/src/pipecat/services/gemini_multimodal_live/file_api.py index 67871a7140..5ae7fdbb75 100644 --- a/src/pipecat/services/gemini_multimodal_live/file_api.py +++ b/src/pipecat/services/gemini_multimodal_live/file_api.py @@ -186,4 +186,4 @@ async def delete_file(self, name: str) -> bool: logger.error(f"Error deleting file: {error_text}") raise Exception(f"Failed to delete file: {response.status}") - return True \ No newline at end of file + return True diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index f6cc51a5f8..89fe3b2578 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -227,9 +227,9 @@ def extract_system_instructions(self): def add_file_reference(self, file_uri: str, mime_type: str, text: Optional[str] = None): """Add a file reference to the context. - + This adds a user message with a file reference that will be sent during context initialization. - + Args: file_uri: URI of the uploaded file mime_type: MIME type of the file @@ -482,7 +482,7 @@ class GeminiMultimodalLiveLLMService(LLMService): # Overriding the default adapter to use the Gemini one. 
adapter_class = GeminiLLMAdapter - + def __init__( self, *, @@ -1024,10 +1024,9 @@ async def _create_initial_response(self): self._needs_turn_complete_message = True async def _create_single_response(self, messages_list): - """Create a single response from a list of messages.""" - - # Refactor to combine this logic with same logic in GeminiMultimodalLiveContext + + # Refactor to combine this logic with same logic in GeminiMultimodalLiveContext messages = [] for item in messages_list: role = item.get("role") @@ -1206,7 +1205,9 @@ async def _handle_evt_turn_complete(self, evt): # Process grounding metadata if we have accumulated any if self._accumulated_grounding_metadata: - await self._process_grounding_metadata(self._accumulated_grounding_metadata, self._search_result_buffer) + await self._process_grounding_metadata( + self._accumulated_grounding_metadata, self._search_result_buffer + ) # Reset grounding tracking for next response self._search_result_buffer = "" @@ -1305,29 +1306,32 @@ async def _handle_evt_grounding_metadata(self, evt): # Process the grounding metadata immediately await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) - async def _process_grounding_metadata(self, grounding_metadata: events.GroundingMetadata, search_result: str = ""): + async def _process_grounding_metadata( + self, grounding_metadata: events.GroundingMetadata, search_result: str = "" + ): """Process grounding metadata and emit LLMSearchResponseFrame.""" if not grounding_metadata: return # Extract rendered content for search suggestions rendered_content = None - if grounding_metadata.searchEntryPoint and grounding_metadata.searchEntryPoint.renderedContent: + if ( + grounding_metadata.searchEntryPoint + and grounding_metadata.searchEntryPoint.renderedContent + ): rendered_content = grounding_metadata.searchEntryPoint.renderedContent # Convert grounding chunks and supports to LLMSearchOrigin format origins = [] - + if grounding_metadata.groundingChunks 
and grounding_metadata.groundingSupports: # Create a mapping of chunk indices to origins chunk_to_origin = {} - + for index, chunk in enumerate(grounding_metadata.groundingChunks): if chunk.web: origin = LLMSearchOrigin( - site_uri=chunk.web.uri, - site_title=chunk.web.title, - results=[] + site_uri=chunk.web.uri, site_title=chunk.web.title, results=[] ) chunk_to_origin[index] = origin origins.append(origin) @@ -1341,20 +1345,16 @@ async def _process_grounding_metadata(self, grounding_metadata: events.Grounding # Add this result to all origins referenced by this support for chunk_index in support.groundingChunkIndices: if chunk_index in chunk_to_origin: - result = LLMSearchResult( - text=text, - confidence=confidence_scores - ) + result = LLMSearchResult(text=text, confidence=confidence_scores) chunk_to_origin[chunk_index].results.append(result) # Create and push the search response frame search_frame = LLMSearchResponseFrame( - search_result=search_result, - origins=origins, - rendered_content=rendered_content + search_result=search_result, origins=origins, rendered_content=rendered_content ) - + await self.push_frame(search_frame) + async def _handle_evt_usage_metadata(self, evt): """Handle the usage metadata event.""" if not evt.usageMetadata: From ec361df0d1d804ee3e25ac305440707d468b32c1 Mon Sep 17 00:00:00 2001 From: Pete Date: Sun, 20 Jul 2025 18:58:54 -0400 Subject: [PATCH 23/26] Fix final ruff linting issues - Remove duplicate import in __init__.py - Clean up extra blank lines in gemini.py - Remove extra blank line in _create_single_response method --- .../services/gemini_multimodal_live/__init__.py | 1 - .../services/gemini_multimodal_live/events.py | 1 + .../services/gemini_multimodal_live/gemini.py | 15 ++++----------- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/__init__.py b/src/pipecat/services/gemini_multimodal_live/__init__.py index f51c15ff75..513d9fd66f 100644 --- 
a/src/pipecat/services/gemini_multimodal_live/__init__.py +++ b/src/pipecat/services/gemini_multimodal_live/__init__.py @@ -1,3 +1,2 @@ from .file_api import GeminiFileAPI from .gemini import GeminiMultimodalLiveLLMService -from .file_api import GeminiFileAPI diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index ddac795e79..1fdca9ec03 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -497,6 +497,7 @@ def parse_server_event(str): except Exception as e: print(f"Error parsing server event: {e}") + class ContextWindowCompressionConfig(BaseModel): """Configuration for context window compression. diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 89fe3b2578..3db150cce6 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -61,8 +61,6 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.google.frames import LLMSearchOrigin, LLMSearchResponseFrame, LLMSearchResult from pipecat.services.llm_service import FunctionCallFromLLM, LLMService - - from pipecat.services.openai.llm import ( OpenAIAssistantContextAggregator, OpenAIUserContextAggregator, @@ -74,11 +72,9 @@ from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt from . 
import events - from .audio_transcriber import AudioTranscriber from .file_api import GeminiFileAPI - try: import websockets except ModuleNotFoundError as e: @@ -575,7 +571,7 @@ def __init__( else {}, "extra": params.extra if isinstance(params.extra, dict) else {}, } - + # Initialize the File API client self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) @@ -961,7 +957,7 @@ async def _receive_task_handler(self): await self._handle_evt_error(evt) # errors are fatal, so exit the receive loop return - + # # # @@ -1025,7 +1021,6 @@ async def _create_initial_response(self): async def _create_single_response(self, messages_list): """Create a single response from a list of messages.""" - # Refactor to combine this logic with same logic in GeminiMultimodalLiveContext messages = [] for item in messages_list: @@ -1202,7 +1197,6 @@ async def _handle_evt_turn_complete(self, evt): self._bot_text_buffer = "" self._llm_output_buffer = "" - # Process grounding metadata if we have accumulated any if self._accumulated_grounding_metadata: await self._process_grounding_metadata( @@ -1295,7 +1289,6 @@ async def _handle_evt_output_transcription(self, evt): # Collect text for tracing self._llm_output_buffer += text - await self.push_frame(LLMTextFrame(text=text)) await self.push_frame(TTSTextFrame(text=text)) @@ -1335,13 +1328,13 @@ async def _process_grounding_metadata( ) chunk_to_origin[index] = origin origins.append(origin) - + # Add grounding support results to the appropriate origins for support in grounding_metadata.groundingSupports: if support.segment and support.groundingChunkIndices: text = support.segment.text or "" confidence_scores = support.confidenceScores or [] - + # Add this result to all origins referenced by this support for chunk_index in support.groundingChunkIndices: if chunk_index in chunk_to_origin: From b54d1fb7fd55adda3a08ebeede4bb45b41d30d45 Mon Sep 17 00:00:00 2001 From: Pete Date: Sun, 20 Jul 2025 19:15:40 -0400 Subject: [PATCH 24/26] 
Resolve merge conflict and remove duplicate File API initialization - Remove duplicate file_api initialization lines - Keep grounding metadata tracking functionality - Maintain clean code structure --- src/pipecat/services/gemini_multimodal_live/gemini.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index 3db150cce6..b427e752f8 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -579,12 +579,6 @@ def __init__( self._search_result_buffer = "" self._accumulated_grounding_metadata = None - # Initialize the File API client - self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) - - # Initialize the File API client - self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url) - def can_generate_metrics(self) -> bool: """Check if the service can generate usage metrics. From 8467d87cfca329a3de632d48e2ca105c73c38adc Mon Sep 17 00:00:00 2001 From: Vanessa Pyne Date: Mon, 21 Jul 2025 09:52:32 -0500 Subject: [PATCH 25/26] small main-merge fixes - gemini.py --- src/pipecat/services/gemini_multimodal_live/gemini.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pipecat/services/gemini_multimodal_live/gemini.py b/src/pipecat/services/gemini_multimodal_live/gemini.py index b427e752f8..5f8747da09 100644 --- a/src/pipecat/services/gemini_multimodal_live/gemini.py +++ b/src/pipecat/services/gemini_multimodal_live/gemini.py @@ -72,7 +72,6 @@ from pipecat.utils.tracing.service_decorators import traced_gemini_live, traced_stt from . 
import events -from .audio_transcriber import AudioTranscriber from .file_api import GeminiFileAPI try: @@ -281,7 +280,6 @@ def get_messages_for_initializing_history(self): } } ) - else: logger.warning(f"Unsupported content type: {str(part)[:80]}") else: @@ -1201,6 +1199,9 @@ async def _handle_evt_turn_complete(self, evt): self._search_result_buffer = "" self._accumulated_grounding_metadata = None + # Only push the TTSStoppedFrame if the bot is outputting audio + # when text is found, modalities is set to TEXT and no audio + # is produced. if not text: await self.push_frame(TTSStoppedFrame()) From cfea56064dc1a5bdfc454a8f11a0b0bc80f2c599 Mon Sep 17 00:00:00 2001 From: Vanessa Pyne Date: Mon, 21 Jul 2025 09:54:15 -0500 Subject: [PATCH 26/26] small merge-main nit fixes - gemini_multimodal_live events.py --- src/pipecat/services/gemini_multimodal_live/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index 1fdca9ec03..0e229c22f2 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -12,7 +12,6 @@ from enum import Enum from typing import List, Literal, Optional -from loguru import logger from PIL import Image from pydantic import BaseModel, Field @@ -496,6 +495,7 @@ def parse_server_event(str): return ServerEvent.model_validate(evt) except Exception as e: print(f"Error parsing server event: {e}") + return None class ContextWindowCompressionConfig(BaseModel):