Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cd4a893
add FileAPI to gemini.py
getchannel May 9, 2025
949971d
Create file_api
getchannel May 9, 2025
59c7744
add FileData class events.py
getchannel May 9, 2025
d86502e
add file_api __init__.py
getchannel May 9, 2025
e27da96
Rename file_api to file_api.py
getchannel May 14, 2025
40c7e3c
Update gemini.py
getchannel May 30, 2025
f2d5b9a
Create 26f-gemini-multimodal-live-files-api.py
getchannel May 30, 2025
7263d11
update correct upload endpoint file_api.py
getchannel May 30, 2025
f53f544
Create 26g-gemini-multimodal-live-groundingMetadata.py
getchannel May 30, 2025
43c6f1f
Add groundingMetadata and logging gemini.py
getchannel May 30, 2025
8070e15
Add groundingMetadata events.py
getchannel May 30, 2025
4d977fe
Merge branch 'main' into main
getchannel Jun 10, 2025
737e8e7
Merge branch 'main' into groundingMetadata
getchannel Jun 10, 2025
8d55e13
remove audio_transcriber from gemini.py
getchannel Jun 10, 2025
7360f79
Merge branch 'pipecat-ai:main' into main
getchannel Jun 11, 2025
2ed1ed6
Merge branch 'pipecat-ai:main' into main
getchannel Jun 14, 2025
4106f0d
Merge branch 'pipecat-ai:main' into main
getchannel Jun 21, 2025
77378d2
Merge branch 'pipecat-ai:main' into groundingMetadata
getchannel Jun 21, 2025
ae5e3e2
Merge branch 'main' into groundingMetadata
getchannel Jun 21, 2025
e3fe040
Update gemini.py
getchannel Jun 21, 2025
1cf0b35
Merge branch 'main' into groundingMetadata
getchannel Jun 25, 2025
a297e42
Merge branch 'main' into groundingMetadata
getchannel Jun 30, 2025
9b38f3e
Delete examples/foundational/26f-gemini-multimodal-live-files-api.py
getchannel Jul 3, 2025
4951c97
Clean up verbose logging in grounding metadata implementation
getchannel Jul 3, 2025
d565e9a
Update grounding metadata example with final refinements
getchannel Jul 3, 2025
14c2223
Fix parameter name consistency in parse_server_event function
getchannel Jul 3, 2025
c7e758f
Merge branch 'main' into groundingMetadata
getchannel Jul 3, 2025
6f66ec1
Update gemini.py
getchannel Jul 3, 2025
7ed4fe5
Update gemini.py
getchannel Jul 3, 2025
8ba340a
remove debug logging
getchannel Jul 20, 2025
e165d38
remove truncated logging from debug
getchannel Jul 20, 2025
b1a5cdd
Refactor whitespace and formatting in multiple files
getchannel Jul 20, 2025
ec361df
Fix final ruff linting issues
getchannel Jul 20, 2025
b54d1fb
Resolve merge conflict and remove duplicate File API initialization
getchannel Jul 20, 2025
948257c
Merge branch 'main' into groundingMetadata
getchannel Jul 20, 2025
8467d87
small main-merge fixes - gemini.py
vipyne Jul 21, 2025
cfea560
small merge-main nit fixes - gemini_multimodal_live events.py
vipyne Jul 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions examples/foundational/26g-gemini-multimodal-live-groundingMetadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import argparse
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import Frame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.network.fastapi_websocket import FastAPIWebsocketParams
from pipecat.transports.services.daily import DailyParams

load_dotenv(override=True)


# Transport configurations are wrapped in zero-argument factories so that
# heavy objects (e.g. SileroVADAnalyzer) are only instantiated once the
# desired transport has actually been selected.
def _make_vad_analyzer():
    """Build the VAD analyzer shared by every transport configuration."""
    return SileroVADAnalyzer(params=VADParams(stop_secs=0.5))


transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=_make_vad_analyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=_make_vad_analyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_in_enabled=False,
        vad_analyzer=_make_vad_analyzer(),
    ),
}

# System prompt for the Gemini Live session. It deliberately steers the model
# toward calling the google_search tool so the grounding-metadata code path is
# exercised. NOTE: the output is spoken via TTS, hence the instruction to
# avoid special characters.
SYSTEM_INSTRUCTION = """
You are a helpful AI assistant that actively uses Google Search to provide up-to-date, accurate information.

IMPORTANT: For ANY question about current events, news, recent developments, real-time information, or anything that might have changed recently, you MUST use the google_search tool to get the latest information.

You should use Google Search for:
- Current news and events
- Recent developments in any field
- Today's weather, stock prices, or other real-time data
- Any question that starts with "what's happening", "latest", "recent", "current", "today", etc.
- When you're not certain about recent information

Always be proactive about using search when the user asks about anything that could benefit from real-time information.

Your output will be converted to audio so don't include special characters in your answers.

Respond to what the user said in a creative and helpful way, always using search for current information.
"""


class GroundingMetadataProcessor(FrameProcessor):
    """Capture and log grounding metadata from the Gemini Live API.

    Watches for ``LLMSearchResponseFrame`` instances flowing through the
    pipeline, logs a summary of the search result text and its origins,
    and always passes every frame through unchanged.
    """

    def __init__(self):
        super().__init__()
        # Number of grounding frames seen so far, used only to number log lines.
        self._grounding_count = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMSearchResponseFrame):
            self._grounding_count += 1
            logger.info(f"\n\n🔍 GROUNDING METADATA RECEIVED #{self._grounding_count}\n")

            # BUG FIX: the original logged `frame.search_result[:200]...`,
            # which appended a misleading ellipsis even for short results and
            # raised TypeError when search_result was None. Only truncate (and
            # mark the truncation) when text was actually dropped.
            text = frame.search_result or ""
            preview = f"{text[:200]}..." if len(text) > 200 else text
            logger.info(f"📝 Search Result Text: {preview}")

            if frame.rendered_content:
                logger.info(f"🔗 Rendered Content: {frame.rendered_content}")

            if frame.origins:
                logger.info(f"📍 Number of Origins: {len(frame.origins)}")
                for i, origin in enumerate(frame.origins):
                    logger.info(f"  Origin {i + 1}: {origin.site_title} - {origin.site_uri}")
                    if origin.results:
                        logger.info(f"    Results: {len(origin.results)} items")

        # Always push the frame downstream so the pipeline is unaffected.
        await self.push_frame(frame, direction)


async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_sigint: bool):
    """Run the Gemini Live grounding-metadata example bot.

    Args:
        transport: The input/output transport (Daily, Twilio, or WebRTC).
        _: Parsed CLI arguments (unused).
        handle_sigint: Whether the pipeline runner should install its own
            SIGINT handler.
    """
    logger.info("Starting Gemini Live Grounding Metadata Test Bot")

    # Gemini built-in tools: Google Search grounding plus code execution.
    # No standard function declarations are needed.
    tools = ToolsSchema(
        standard_tools=[],
        custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]},
    )

    llm = GeminiMultimodalLiveLLMService(
        api_key=os.getenv("GOOGLE_API_KEY"),
        system_instruction=SYSTEM_INSTRUCTION,
        voice_id="Charon",  # Aoede, Charon, Fenrir, Kore, Puck
        transcribe_user_audio=True,
        tools=tools,
    )

    # Processor that logs grounding metadata as it flows downstream.
    grounding_processor = GroundingMetadataProcessor()

    messages = [
        {
            "role": "user",
            "content": "Please introduce yourself and let me know that you can help with current information by searching the web. Ask me what current information I'd like to know about.",
        },
    ]

    # Set up conversation context and management.
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),
            context_aggregator.user(),
            llm,
            grounding_processor,  # Captures LLMSearchResponseFrame output from the LLM
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(pipeline)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Kick off the conversation.
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")

    @transport.event_handler("on_client_closed")
    async def on_client_closed(transport, client):
        logger.info("Client closed connection")
        await task.cancel()

    # BUG FIX: honor the caller-supplied handle_sigint flag instead of
    # hardcoding False (the parameter was previously accepted but ignored).
    runner = PipelineRunner(handle_sigint=handle_sigint)

    await runner.run(task)


if __name__ == "__main__":
    # Deferred import: the example runner is only needed when this file is
    # executed as a script.
    from pipecat.examples.run import main

    main(run_example, transport_params=transport_params)
50 changes: 50 additions & 0 deletions src/pipecat/services/gemini_multimodal_live/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,55 @@ class Config(BaseModel):
setup: Setup


#
# Grounding metadata models
#


class SearchEntryPoint(BaseModel):
    """Represents the search entry point with rendered content for search suggestions."""

    # HTML/CSS snippet supplied by the API for rendering Google Search
    # suggestions. Field name matches the camelCase wire format.
    renderedContent: Optional[str] = None


class WebSource(BaseModel):
    """Represents a web source from grounding chunks."""

    # Source page URI and human-readable title; camelCase-free names here
    # match the API wire format directly.
    uri: Optional[str] = None
    title: Optional[str] = None


class GroundingChunk(BaseModel):
    """Represents a grounding chunk containing web source information."""

    # Web source backing this chunk; None when the chunk is not web-based.
    web: Optional[WebSource] = None


class GroundingSegment(BaseModel):
    """Represents a segment of text that is grounded."""

    # Character offsets of the grounded span within the response text, plus
    # the span's text itself. Field names match the camelCase wire format.
    startIndex: Optional[int] = None
    endIndex: Optional[int] = None
    text: Optional[str] = None


class GroundingSupport(BaseModel):
    """Represents support information for grounded text segments."""

    # The grounded text span this support applies to.
    segment: Optional[GroundingSegment] = None
    # Indices into GroundingMetadata.groundingChunks identifying the sources.
    groundingChunkIndices: Optional[List[int]] = None
    # Per-chunk confidence scores; presumably parallel to
    # groundingChunkIndices — TODO confirm against the API reference.
    confidenceScores: Optional[List[float]] = None


class GroundingMetadata(BaseModel):
    """Represents grounding metadata from Google Search."""

    # Rendered search-suggestion content for display to the user.
    searchEntryPoint: Optional[SearchEntryPoint] = None
    # Source documents the response was grounded on.
    groundingChunks: Optional[List[GroundingChunk]] = None
    # Mapping from response text segments to the chunks that support them.
    groundingSupports: Optional[List[GroundingSupport]] = None
    # The search queries the model issued.
    webSearchQueries: Optional[List[str]] = None


#
# Server events
#
Expand Down Expand Up @@ -339,6 +388,7 @@ class ServerContent(BaseModel):
turnComplete: Optional[bool] = None
inputTranscription: Optional[BidiGenerateContentTranscription] = None
outputTranscription: Optional[BidiGenerateContentTranscription] = None
groundingMetadata: Optional[GroundingMetadata] = None


class FunctionCall(BaseModel):
Expand Down
85 changes: 85 additions & 0 deletions src/pipecat/services/gemini_multimodal_live/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def get_messages_for_initializing_history(self):
parts.append({"text": part.get("text")})
elif part.get("type") == "file_data":
file_data = part.get("file_data", {})

parts.append(
{
"fileData": {
Expand Down Expand Up @@ -572,6 +573,10 @@ def __init__(
# Initialize the File API client
self.file_api = GeminiFileAPI(api_key=api_key, base_url=file_api_base_url)

# Grounding metadata tracking
self._search_result_buffer = ""
self._accumulated_grounding_metadata = None

def can_generate_metrics(self) -> bool:
"""Check if the service can generate usage metrics.

Expand Down Expand Up @@ -936,6 +941,8 @@ async def _receive_task_handler(self):
await self._handle_evt_input_transcription(evt)
elif evt.serverContent and evt.serverContent.outputTranscription:
await self._handle_evt_output_transcription(evt)
elif evt.serverContent and evt.serverContent.groundingMetadata:
await self._handle_evt_grounding_metadata(evt)
elif evt.toolCall:
await self._handle_evt_tool_call(evt)
elif False: # !!! todo: error events?
Expand Down Expand Up @@ -1027,6 +1034,7 @@ async def _create_single_response(self, messages_list):
parts.append({"text": part.get("text")})
elif part.get("type") == "file_data":
file_data = part.get("file_data", {})

parts.append(
{
"fileData": {
Expand Down Expand Up @@ -1107,8 +1115,13 @@ async def _handle_evt_model_turn(self, evt):
await self.push_frame(LLMFullResponseStartFrame())

self._bot_text_buffer += text
self._search_result_buffer += text # Also accumulate for grounding
await self.push_frame(LLMTextFrame(text=text))

# Check for grounding metadata in server content
if evt.serverContent and evt.serverContent.groundingMetadata:
self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata

inline_data = part.inlineData
if not inline_data:
return
Expand Down Expand Up @@ -1176,6 +1189,16 @@ async def _handle_evt_turn_complete(self, evt):
self._bot_text_buffer = ""
self._llm_output_buffer = ""

# Process grounding metadata if we have accumulated any
if self._accumulated_grounding_metadata:
await self._process_grounding_metadata(
self._accumulated_grounding_metadata, self._search_result_buffer
)

# Reset grounding tracking for next response
self._search_result_buffer = ""
self._accumulated_grounding_metadata = None

# Only push the TTSStoppedFrame if the bot is outputting audio
# when text is found, modalities is set to TEXT and no audio
# is produced.
Expand Down Expand Up @@ -1252,12 +1275,74 @@ async def _handle_evt_output_transcription(self, evt):
if not text:
return

# Accumulate text for grounding as well
self._search_result_buffer += text

# Check for grounding metadata in server content
if evt.serverContent and evt.serverContent.groundingMetadata:
self._accumulated_grounding_metadata = evt.serverContent.groundingMetadata
# Collect text for tracing
self._llm_output_buffer += text

await self.push_frame(LLMTextFrame(text=text))
await self.push_frame(TTSTextFrame(text=text))

async def _handle_evt_grounding_metadata(self, evt):
"""Handle dedicated grounding metadata events."""
if evt.serverContent and evt.serverContent.groundingMetadata:
grounding_metadata = evt.serverContent.groundingMetadata
# Process the grounding metadata immediately
await self._process_grounding_metadata(grounding_metadata, self._search_result_buffer)

async def _process_grounding_metadata(
    self, grounding_metadata: events.GroundingMetadata, search_result: str = ""
):
    """Convert grounding metadata into an LLMSearchResponseFrame and push it.

    Args:
        grounding_metadata: Parsed grounding metadata from the server event.
        search_result: The response text accumulated while the metadata was
            being gathered.
    """
    if not grounding_metadata:
        return

    # Rendered HTML for Google Search suggestions, when the API provides it.
    entry_point = grounding_metadata.searchEntryPoint
    rendered_content = (entry_point.renderedContent or None) if entry_point else None

    origins = []
    chunks = grounding_metadata.groundingChunks
    supports = grounding_metadata.groundingSupports

    if chunks and supports:
        # Map chunk index -> origin so supports can attach their results.
        origin_by_index = {}
        for idx, chunk in enumerate(chunks):
            if not chunk.web:
                continue
            origin = LLMSearchOrigin(
                site_uri=chunk.web.uri, site_title=chunk.web.title, results=[]
            )
            origin_by_index[idx] = origin
            origins.append(origin)

        for support in supports:
            if not (support.segment and support.groundingChunkIndices):
                continue
            segment_text = support.segment.text or ""
            scores = support.confidenceScores or []

            # Attach one result to every origin this support references.
            for idx in support.groundingChunkIndices:
                origin = origin_by_index.get(idx)
                if origin is not None:
                    origin.results.append(
                        LLMSearchResult(text=segment_text, confidence=scores)
                    )

    await self.push_frame(
        LLMSearchResponseFrame(
            search_result=search_result, origins=origins, rendered_content=rendered_content
        )
    )

async def _handle_evt_usage_metadata(self, evt):
"""Handle the usage metadata event."""
if not evt.usageMetadata:
Expand Down