Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions BQ_MCP_Server_Manual.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# How to Run BQ MCP Server Locally

## 1. Install MCP Toolbox
Install the [MCP Toolbox](https://googleapis.github.io/genai-toolbox/) using Homebrew:
```bash
brew install mcp-toolbox
```

## 2. Install Google Cloud CLI
Download and install the Google Cloud CLI by following the instructions at:
[Google Cloud CLI Installation](https://cloud.google.com/sdk/docs/install)

## 3. Authenticate with Google Account for Local CLI Usage
Set up Application Default Credentials (ADC) for local development:
[Set Up ADC for Local Development](https://cloud.google.com/docs/authentication/set-up-adc-local-dev-environment)

Run the following command to log in:
```bash
gcloud auth application-default login
```

## 4. Create Tools Configuration File
Create a [`tools.yaml`](./tools.yaml) file for the MCP Server. Refer to the documentation for details:
[BigQuery Tools Configuration](https://googleapis.github.io/genai-toolbox/resources/tools/bigquery/)

## 5. Run MCP Server Locally
Start the MCP Server with the configuration file:
```bash
toolbox --tools-file "tools.yaml"
```

### 5.1 Run MCP Server with UI
To enable the UI, include the `--ui` flag:
```bash
toolbox --tools-file "tools.yaml" --ui
```

## 6. Install MCP Inspector
Install the MCP Inspector tool:
[MCP Inspector GitHub](https://github.com/modelcontextprotocol/inspector)

Run the following command to install and run:
```bash
npx @modelcontextprotocol/inspector
```

## 7. Connect MCP Inspector to Your MCP Server
1. After running the inspector, a console output will provide a link with a token. Open this link in your browser to access the UI.
2. In the web interface, select **Streamable HTTP** as the Transport Type.
3. Enter `http://127.0.0.1:5000/mcp` as the URL.
4. Click the **Connect** button.
5. Verify the connection by pulling the list of tables to ensure everything is functioning correctly.
6. PROFIT!
15 changes: 13 additions & 2 deletions ai_clients/open_ai/master_agent.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
from agents import Agent, WebSearchTool, Runner

from ai_clients.open_ai.sub_agents.query_optimization_agent import QueryOptimizationClient
from common.classes import AIClient
from prompts.master_agent_prompt import master_agent_prompt


class OpenAIClient(AIClient):
def __init__(self):
self.client = None
self.optimization_agent = QueryOptimizationClient()

def setup(self):
async def setup(self):
await self.optimization_agent.setup()
self.client = Agent(
name="master_agent",
model="gpt-4.1",
instructions=master_agent_prompt,
tools=[
WebSearchTool()
WebSearchTool(),
self.optimization_agent.client.as_tool(
tool_name="bigquery_query_optimizer_ai_agent",
tool_description="This AI agent optimizes Google BigQuery SQL queries for performance and cost. It analyzes queries, identifies inefficiencies, and provides rewritten versions with clear explanations.",
)
]
)

async def destroy(self):
await self.optimization_agent.destroy()

async def chat(self, user_request: str):
if self.client is None:
raise Exception("Client is not initialized, run setup method first")
Expand Down
50 changes: 50 additions & 0 deletions ai_clients/open_ai/sub_agents/query_optimization_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from agents import Agent, WebSearchTool, Runner
from agents.mcp import MCPServerStreamableHttp

from common.classes import AIClient
from prompts.bigquery.query_optimization_agent_prompt import query_optimization_agent_prompt

query_optimization_agent = Agent(
name='query_optimization_agent',
instructions=query_optimization_agent_prompt,
tools=[
WebSearchTool(),
]
)


class QueryOptimizationClient(AIClient):
def __init__(self):
self.client = None
self.bq_mcp_server = MCPServerStreamableHttp(
name="mcp_bigquery_server",
params={
"url": "http://127.0.0.1:5000/mcp",
}
)

async def setup(self):
try:
await self.bq_mcp_server.connect()
except Exception as e:
raise Exception(f"Failed to connect to MCP BigQuery server: {e}")

self.client = Agent(
name='query_optimization_agent',
model="gpt-5",
instructions=query_optimization_agent_prompt,
tools=[
WebSearchTool(),
],
mcp_servers=[self.bq_mcp_server]
)

async def destroy(self):
await self.bq_mcp_server.cleanup()

async def chat(self, user_request: str):
if self.client is None:
raise Exception("Client is not initialized, run setup method first")

result = await Runner.run(self.client, user_request)
return result.final_output
13 changes: 5 additions & 8 deletions assistant_agent_module/routes.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from fastapi import APIRouter
from pydantic import BaseModel
from fastapi import APIRouter, Body

from ai_clients.open_ai.master_agent import OpenAIClient
# from ai_clients.open_ai.sub_agents.query_optimization_agent import QueryOptimizationClient
from common.classes import AIAssistant

router = APIRouter(prefix="/agent", tags=["AI Agent"])


class AssistRequest(BaseModel):
message: str


@router.post("/assist")
async def query_handler(request: AssistRequest):
user_request = request.message
async def query_handler(user_request: str = Body(..., media_type="text/plain")):
ai_assistant = AIAssistant(client=OpenAIClient)
await ai_assistant.initialize()
response = await ai_assistant.chat(user_request)
await ai_assistant.cleanup()
return response
14 changes: 12 additions & 2 deletions common/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,25 @@ async def chat(self, user_request: str):
pass

@abstractmethod
def setup(self):
async def setup(self):
"""Initialization agent with sub-agents and internal tools like MCP servers"""
pass

@abstractmethod
def destroy(self):
"""Destroying agent with sub-agents and disconnect from MCP servers"""
pass


class AIAssistant:
def __init__(self, client: Type[AIClient]):
self.client = client()
self.client.setup()

async def initialize(self):
await self.client.setup()

async def cleanup(self):
await self.client.destroy()

async def chat(self, user_request: str):
return await self.client.chat(user_request)
56 changes: 56 additions & 0 deletions prompts/bigquery/query_optimization_agent_prompt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
query_optimization_agent_prompt = """
You are a BigQuery specialist, and your primary goal is to optimize provided SQL queries for Google BigQuery.
Focus on improving efficiency, reducing execution time, and minimizing costs by applying best practices.

You must not use the following statements under any circumstances, as the query should only retrieve or analyze data, not modify it or alter schemas:
- DELETE
- DROP
- INSERT
- UPDATE
- ALTER
- MERGE
- TRUNCATE
- CREATE
- CREATE TABLE
- CREATE SCHEMA
- CREATE VIEW
- CREATE MATERIALIZED VIEW
- CREATE OR REPLACE TABLE
- CREATE OR REPLACE VIEW
- DROP SCHEMA
- DROP VIEW
- CALL

To achieve this, use web search and any available tools to research optimizations. And official BigQuery documentation, including but not limited to:

- https://cloud.google.com/bigquery/docs/best-practices-performance-compute
- https://cloud.google.com/bigquery/docs/best-practices-performance-functions

Always analyze the query step-by-step: first, identify potential issues or inefficiencies; then, apply optimizations; and finally, verify improvements using reasoning or tools if needed.

Key optimization guidelines:
- Avoid common anti-SQL patterns, such as unnecessary subqueries, correlated subqueries, or overuse of wildcard SELECT statements (e.g., prefer explicit column selection over SELECT *).
- For partitioned tables, ensure the WHERE clause includes a filter on the partitioning column (e.g., _PARTITIONTIME or _PARTITIONDATE) to enable partition pruning and reduce scanned data.
- Check if JOINs can be avoided entirely; for example, if the data is already denormalized or if a single table contains the required joined columns, refactor accordingly to simplify the query.
- When JOINs are necessary, place the larger table first (on the left side) to optimize join order and performance.
- Apply filters in the WHERE clause as early as possible. For JOINs, include filters on both sides to eliminate unnecessary data before the join operation.
- Order filters in the WHERE clause so that the most selective ones (those eliminating the most rows) come first.
- Remember that LIMIT does not reduce the amount of data scanned in BigQuery; advise using WHERE clauses for effective filtering instead.

Output format rules (must follow):
- Return only the final optimized SQL query.
- Place inline comments (`-- comment`) directly at the location of each change inside the query.
- Each comment must explain what was changed and why (e.g., `-- replaced SELECT * with explicit columns to reduce scanned data`).
- No explanations above or below the query—only inline.
- Don't add comments to parts you haven't changed.
- If no optimizations are possible, return the original query with inline comments like `-- no change needed here`.
- Never output text outside the SQL code block.

Example of required output style:

```sql
SELECT user_id, created_at -- replaced SELECT * with explicit columns for efficiency
FROM my_table
WHERE _PARTITIONDATE >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) -- added partition filter to reduce scanned data
```
"""
11 changes: 10 additions & 1 deletion prompts/master_agent_prompt.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
master_agent_prompt = """
You are the Assistant AI, a central orchestrator designed to manage and fulfill all types of user requests
You are the Assistant AI, a central orchestrator designed to manage and fulfill all types of user requests.

Your primary responsibility:
- Always delegate the user’s request without changes to the most appropriate specialized subagent.
- Do NOT attempt to optimize, rewrite, or otherwise modify queries or text that subagents return.
- The subagent’s output must be delivered to the user verbatim, without editing, formatting, or explanation.

Fallback behavior:
- Only if no subagent exists for the request, you may generate a direct response.
- In such cases, you must clearly state: "⚠️ This is a generic response, as no specialized subagent was available. It may not fully fit your request."
"""
76 changes: 76 additions & 0 deletions tools.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Define data sources
sources:
my-bigquery-source:
kind: bigquery
project: fiverr-bigquery-dev
# authentication defaults to ADC (Application Default Credentials)
# If running locally, ensure `gcloud auth application-default login` is done.

# Expose tools from that source
tools:
ask_data_insights:
kind: bigquery-conversational-analytics
source: my-bigquery-source
description: |
Use this tool to perform data analysis, get insights, or answer complex
questions about the contents of specific BigQuery tables.

execute_sql_tool:
kind: bigquery-execute-sql
source: my-bigquery-source
description: Use this tool to execute sql statement.

forecast_tool:
kind: bigquery-forecast
source: my-bigquery-source
description: Use this tool to forecast time series data in BigQuery.

bigquery_get_dataset_info:
kind: bigquery-get-dataset-info
source: my-bigquery-source
description: Use this tool to get dataset metadata.

bigquery_get_table_info:
kind: bigquery-get-table-info
source: my-bigquery-source
description: Use this tool to get table metadata.

bigquery_list_dataset_ids:
kind: bigquery-list-dataset-ids
source: my-bigquery-source
description: Use this tool to get dataset metadata.

bigquery_list_table_ids:
kind: bigquery-list-table-ids
source: my-bigquery-source
description: Use this tool to get table metadata.

# # Example: Querying a user table in BigQuery
# search_users_bq:
# kind: bigquery-sql
# source: my-bigquery-source
# statement: |
# SELECT
# id,
# name,
# email
# FROM
# `my-project.my-dataset.users`
# WHERE
# id = @id OR email = @email;
# description: |
# Use this tool to get information for a specific user.
# Takes an id number or a name and returns info on the user.
#
# Example:
# {{
# "id": 123,
# "name": "Alice",
# }}
# parameters:
# - name: id
# type: integer
# description: User ID
# - name: email
# type: string
# description: Email address of the user