-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathopenai_chat_client_basic.py
More file actions
77 lines (57 loc) · 2.29 KB
/
openai_chat_client_basic.py
File metadata and controls
77 lines (57 loc) · 2.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework import tool
from agent_framework.openai import OpenAIChatClient
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
"""
OpenAI Chat Client Basic Example
This sample demonstrates basic usage of OpenAIChatClient for direct chat-based
interactions, showing both streaming and non-streaming responses.
"""
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def get_weather(
location: Annotated[str, "The location to get the weather for."],
) -> str:
"""Get the weather for a given location."""
conditions = ["sunny", "cloudy", "rainy", "stormy"]
return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."
async def non_streaming_example() -> None:
"""Example of non-streaming response (get the complete result at once)."""
print("=== Non-streaming Response Example ===")
agent = OpenAIChatClient().as_agent(
name="WeatherAgent",
instructions="You are a helpful weather agent.",
tools=get_weather,
)
query = "What's the weather like in Seattle?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Result: {result}\n")
async def streaming_example() -> None:
"""Example of streaming response (get results as they are generated)."""
print("=== Streaming Response Example ===")
agent = OpenAIChatClient().as_agent(
name="WeatherAgent",
instructions="You are a helpful weather agent.",
tools=get_weather,
)
query = "What's the weather like in Portland?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
async for chunk in agent.run(query, stream=True):
if chunk.text:
print(chunk.text, end="", flush=True)
print("\n")
async def main() -> None:
print("=== Basic OpenAI Chat Client Agent Example ===")
await non_streaming_example()
await streaming_example()
if __name__ == "__main__":
asyncio.run(main())