diff --git a/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py new file mode 100644 index 0000000000..b96e3ab26f --- /dev/null +++ b/examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py @@ -0,0 +1,165 @@ +import argparse +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import Frame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor +from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService +from pipecat.services.google.frames import LLMSearchResponseFrame +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams +from pipecat.transports.services.daily import DailyParams + +load_dotenv(override=True) + + +# We store functions so objects (e.g. SileroVADAnalyzer) don't get +# instantiated. The function will be called when the desired transport gets +# selected. 
# Transport configurations are wrapped in zero-argument callables so that
# heavyweight objects (e.g. the Silero VAD analyzer) are only instantiated
# once a transport has actually been selected.


def _make_daily_params():
    # Daily.co transport: audio in/out, no camera, Silero VAD with 0.5s stop.
    return DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
    )


def _make_twilio_params():
    # Twilio media streams over a FastAPI websocket; same audio/VAD settings.
    return FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
    )


def _make_webrtc_params():
    # Plain WebRTC transport; same audio/VAD settings.
    return TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
    )


transport_params = {
    "daily": _make_daily_params,
    "twilio": _make_twilio_params,
    "webrtc": _make_webrtc_params,
}

SYSTEM_INSTRUCTION = """
You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information.

IMPORTANT: For ANY question about current events, news, recent developments, real-time information, or anything that might have changed recently, you MUST use the google_search tool to get the latest information.

You should use Google Search for:
- Current news and events
- Recent developments in any field
- Today's weather, stock prices, or other real-time data
- Any question that starts with "what's happening", "latest", "recent", "current", "today", etc.
- When you're not certain about recent information

Always be proactive about using search when the user asks about anything that could benefit from real-time information.

Your output will be converted to audio so don't include special characters in your answers.

Respond to what the user said in a creative and helpful way, always using search for current information.
"""
+""" + + +class GroundingMetadataProcessor(FrameProcessor): + """Processor to capture and display grounding metadata from Gemini Live API.""" + + def __init__(self): + super().__init__() + self._grounding_count = 0 + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + + if isinstance(frame, LLMSearchResponseFrame): + self._grounding_count += 1 + logger.info(f"\n\nšŸ” GROUNDING METADATA RECEIVED #{self._grounding_count}\n") + logger.info(f"šŸ“ Search Result Text: {frame.search_result[:200]}...") + + if frame.rendered_content: + logger.info(f"šŸ”— Rendered Content: {frame.rendered_content}") + + if frame.origins: + logger.info(f"šŸ“ Number of Origins: {len(frame.origins)}") + for i, origin in enumerate(frame.origins): + logger.info(f" Origin {i + 1}: {origin.site_title} - {origin.site_uri}") + if origin.results: + logger.info(f" Results: {len(origin.results)} items") + + # Always push the frame downstream + await self.push_frame(frame, direction) + + +async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool): + logger.info(f"Starting Gemini Live Grounding Metadata Test Bot") + + # Create tools using ToolsSchema with custom tools for Gemini + tools = ToolsSchema( + standard_tools=[], # No standard function declarations needed + custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]}, + ) + + llm = GeminiMultimodalLiveLLMService( + api_key=os.getenv("GOOGLE_API_KEY"), + system_instruction=SYSTEM_INSTRUCTION, + voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck + transcribe_user_audio=True, + tools=tools, + ) + + # Create a processor to capture grounding metadata + grounding_processor = GroundingMetadataProcessor() + + messages = [ + { + "role": "user", + "content": "Please introduce yourself and let me know that you can help with current information by searching the web. 
Ask me what current information I'd like to know about.", + }, + ] + + # Set up conversation context and management + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + pipeline = Pipeline( + [ + transport.input(), + context_aggregator.user(), + llm, + grounding_processor, # Add our grounding processor here + transport.output(), + context_aggregator.assistant(), + ] + ) + + task = PipelineTask(pipeline) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info(f"Client connected") + # Kick off the conversation. + await task.queue_frames([context_aggregator.user().get_context_frame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info(f"Client disconnected") + + @transport.event_handler("on_client_closed") + async def on_client_closed(transport, client): + logger.info(f"Client closed connection") + await task.cancel() + + runner = PipelineRunner(handle_sigint=False) + + await runner.run(task) + + +if __name__ == "__main__": + from pipecat.examples.run import main + + main(run_example, transport_params=transport_params) diff --git a/src/pipecat/services/gemini_multimodal_live/events.py b/src/pipecat/services/gemini_multimodal_live/events.py index 8fea916668..0e229c22f2 100644 --- a/src/pipecat/services/gemini_multimodal_live/events.py +++ b/src/pipecat/services/gemini_multimodal_live/events.py @@ -248,6 +248,55 @@ class Config(BaseModel): setup: Setup +# +# Grounding metadata models +# + + +class SearchEntryPoint(BaseModel): + """Represents the search entry point with rendered content for search suggestions.""" + + renderedContent: Optional[str] = None + + +class WebSource(BaseModel): + """Represents a web source from grounding chunks.""" + + uri: Optional[str] = None + title: Optional[str] = None + + +class GroundingChunk(BaseModel): + """Represents a grounding chunk containing web 
#
# Grounding metadata models.
#
# NOTE: field names intentionally mirror the camelCase JSON wire format of the
# Gemini Live API (hence the non-PEP8 names) so pydantic can parse the server
# payload directly.
#


class SearchEntryPoint(BaseModel):
    """Search entry point carrying pre-rendered content for search suggestions."""

    renderedContent: Optional[str] = None


class WebSource(BaseModel):
    """A single web source (URI + title) referenced by a grounding chunk."""

    uri: Optional[str] = None
    title: Optional[str] = None


class GroundingChunk(BaseModel):
    """One grounding chunk; wraps the web source it was derived from."""

    web: Optional[WebSource] = None


class GroundingSegment(BaseModel):
    """A span of the generated text that is grounded in search results."""

    startIndex: Optional[int] = None
    endIndex: Optional[int] = None
    text: Optional[str] = None


class GroundingSupport(BaseModel):
    """Links a grounded text segment to the chunks (by index) that support it."""

    segment: Optional[GroundingSegment] = None
    groundingChunkIndices: Optional[List[int]] = None
    confidenceScores: Optional[List[float]] = None


class GroundingMetadata(BaseModel):
    """Top-level grounding metadata attached to a Google Search grounded turn."""

    searchEntryPoint: Optional[SearchEntryPoint] = None
    groundingChunks: Optional[List[GroundingChunk]] = None
    groundingSupports: Optional[List[GroundingSupport]] = None
    webSearchQueries: Optional[List[str]] = None
usage metrics. @@ -936,6 +941,8 @@ async def _receive_task_handler(self): await self._handle_evt_input_transcription(evt) elif evt.serverContent and evt.serverContent.outputTranscription: await self._handle_evt_output_transcription(evt) + elif evt.serverContent and evt.serverContent.groundingMetadata: + await self._handle_evt_grounding_metadata(evt) elif evt.toolCall: await self._handle_evt_tool_call(evt) elif False: # !!! todo: error events? @@ -1027,6 +1034,7 @@ async def _create_single_response(self, messages_list): parts.append({"text": part.get("text")}) elif part.get("type") == "file_data": file_data = part.get("file_data", {}) + parts.append( { "fileData": { @@ -1107,8 +1115,13 @@ async def _handle_evt_model_turn(self, evt): await self.push_frame(LLMFullResponseStartFrame()) self._bot_text_buffer += text + self._search_result_buffer += text # Also accumulate for grounding await self.push_frame(LLMTextFrame(text=text)) + # Check for grounding metadata in server content + if evt.serverContent and evt.serverContent.groundingMetadata: + self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata + inline_data = part.inlineData if not inline_data: return @@ -1176,6 +1189,16 @@ async def _handle_evt_turn_complete(self, evt): self._bot_text_buffer = "" self._llm_output_buffer = "" + # Process grounding metadata if we have accumulated any + if self._accumulated_grounding_metadata: + await self._process_grounding_metadata( + self._accumulated_grounding_metadata, self._search_result_buffer + ) + + # Reset grounding tracking for next response + self._search_result_buffer = "" + self._accumulated_grounding_metadata = None + # Only push the TTSStoppedFrame if the bot is outputting audio # when text is found, modalities is set to TEXT and no audio # is produced. 
@@ -1252,12 +1275,74 @@ async def _handle_evt_output_transcription(self, evt): if not text: return + # Accumulate text for grounding as well + self._search_result_buffer += text + + # Check for grounding metadata in server content + if evt.serverContent and evt.serverContent.groundingMetadata: + self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata # Collect text for tracing self._llm_output_buffer += text await self.push_frame(LLMTextFrame(text=text)) await self.push_frame(TTSTextFrame(text=text)) + async def _handle_evt_grounding_metadata(self, evt): + """Handle dedicated grounding metadata events.""" + if evt.serverContent and evt.serverContent.groundingMetadata: + grounding_metadata = evt.serverContent.groundingMetadata + # Process the grounding metadata immediately + await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer) + + async def _process_grounding_metadata( + self, grounding_metadata: events.GroundingMetadata, search_result: str = "" + ): + """Process grounding metadata and emit LLMSearchResponseFrame.""" + if not grounding_metadata: + return + + # Extract rendered content for search suggestions + rendered_content = None + if ( + grounding_metadata.searchEntryPoint + and grounding_metadata.searchEntryPoint.renderedContent + ): + rendered_content = grounding_metadata.searchEntryPoint.renderedContent + + # Convert grounding chunks and supports to LLMSearchOrigin format + origins = [] + + if grounding_metadata.groundingChunks and grounding_metadata.groundingSupports: + # Create a mapping of chunk indices to origins + chunk_to_origin = {} + + for index, chunk in enumerate(grounding_metadata.groundingChunks): + if chunk.web: + origin = LLMSearchOrigin( + site_uri=chunk.web.uri, site_title=chunk.web.title, results=[] + ) + chunk_to_origin[index] = origin + origins.append(origin) + + # Add grounding support results to the appropriate origins + for support in grounding_metadata.groundingSupports: + if 
support.segment and support.groundingChunkIndices: + text = support.segment.text or "" + confidence_scores = support.confidenceScores or [] + + # Add this result to all origins referenced by this support + for chunk_index in support.groundingChunkIndices: + if chunk_index in chunk_to_origin: + result = LLMSearchResult(text=text, confidence=confidence_scores) + chunk_to_origin[chunk_index].results.append(result) + + # Create and push the search response frame + search_frame = LLMSearchResponseFrame( + search_result=search_result, origins=origins, rendered_content=rendered_content + ) + + await self.push_frame(search_frame) + async def _handle_evt_usage_metadata(self, evt): """Handle the usage metadata event.""" if not evt.usageMetadata: