Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions examples/chat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
## 背景

基于 trpc-a2a-go 和 langchaingo 实现一个带 AI 机器人的简单聊天室。

## 环境

- OS: Ubuntu 24.04
- Go: 1.24 及以上
- 依赖:trpc-a2a-go、langchaingo、OpenAI API Key(如需 AI 机器人)

## 功能细节

1. 每个聊天室有独立 ID,每个用户有独立 ID,一个用户不能重复进入同一个聊天室。
2. 有人发消息会广播至所有人(包括自己)。
3. 消息以 "@ai" 开头,则触发机器人回复。
4. AI 机器人回复基于 langchaingo(OpenAI)。

## 目录结构

```
examples/chat/
├── client/
│ └── main.go # 聊天室命令行客户端
└── server/
└── main.go # 聊天室服务端(含 AI 机器人)
```

## 运行方式

### 1. 安装依赖

```shell
cd examples/chat
# 安装 trpc-a2a-go 及 langchaingo 依赖
# (如未安装)
go mod tidy
```

### 2. 配置 OpenAI API Key(如需 AI 机器人)

```shell
export OPENAI_API_KEY=sk-xxxxxx
```

### 3. 启动服务端

```shell
cd server
# 默认监听 8080 端口
go run .
```

### 4. 启动客户端

```shell
cd ../client
# -room 指定聊天室ID,-user 指定用户名
go run . -room=room1 -user=张三
# 可多开终端模拟多用户
go run . -room=room1 -user=李四
```

### 5. 聊天体验

- 输入内容直接发送消息。
- 输入 `@ai 你好` 可触发 AI 机器人回复。
- 所有用户实时收到群聊消息。

## 备注

- 支持流式消息推送。
- 支持多聊天室并发。
- 如需自定义端口、服务地址等,可用 `-host`、`-port` 参数。
- 代码结构清晰,便于二次开发。

---

如遇问题可参考 `examples/chat/server/main.go` 和 `examples/chat/client/main.go`,或 issue 反馈。
142 changes: 142 additions & 0 deletions examples/chat/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"time"

"github.com/google/uuid"
"trpc.group/trpc-go/trpc-a2a-go/client"
"trpc.group/trpc-go/trpc-a2a-go/log"
"trpc.group/trpc-go/trpc-a2a-go/protocol"
)

func main() {
agentURL := flag.String("agent", "http://localhost:8080/", "Target A2A agent URL")
roomID := flag.String("room", "default", "Chat room ID")
userName := flag.String("user", "user", "Your name")
streaming := flag.Bool("streaming", true, "Use streaming mode")
flag.Parse()

userID := uuid.New().String()

a2aClient, err := client.NewA2AClient(*agentURL, client.WithTimeout(60*time.Second))
if err != nil {
log.Fatalf("Failed to create A2A client: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 捕获 Ctrl+C 退出
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
fmt.Println("\nExiting...")
cancel()
os.Exit(0)
}()

fmt.Printf("Welcome %s! Room: %s\n", *userName, *roomID)

// 启动流式接收
go receiveStreaming(ctx, a2aClient, *roomID, userID, *userName)

// 主循环发送消息
for {
var input string
fmt.Print("> ")
if _, err := fmt.Scanln(&input); err != nil {
continue
}
sendMessage(ctx, a2aClient, *roomID, userID, *userName, input, *streaming)
}
}

func sendMessage(ctx context.Context, a2aClient *client.A2AClient, roomID, userID, userName, msg string, streaming bool) {
userMsg := protocol.NewMessage(
protocol.MessageRoleUser,
[]protocol.Part{protocol.NewTextPart(msg)},
)
userMsg.ContextID = &roomID // 作为room_id
userMsg.Metadata = map[string]interface{}{
"user_id": userID,
"user_name": userName,
}

params := protocol.SendMessageParams{
Message: userMsg,
Configuration: &protocol.SendMessageConfiguration{
Blocking: boolPtr(false),
},
}
if !streaming {
result, err := a2aClient.SendMessage(ctx, params)
if err != nil {
log.Errorf("Send failed: %v", err)
return
}
if m, ok := result.Result.(*protocol.Message); ok {
printMessage(*m)
}
return
}
// 流式模式只需发送,接收由 receiveStreaming 负责
_, err := a2aClient.SendMessage(ctx, params)
if err != nil {
log.Errorf("Send failed: %v", err)
}
}

func receiveStreaming(ctx context.Context, a2aClient *client.A2AClient, roomID, userID, userName string) {
// 启动流式订阅
userMsg := protocol.NewMessage(
protocol.MessageRoleUser,
[]protocol.Part{protocol.NewTextPart(fmt.Sprintf("%s joined the chat.", userName))},
)
userMsg.ContextID = &roomID
userMsg.Metadata = map[string]interface{}{
"user_id": userID,
"user_name": userName,
}

params := protocol.SendMessageParams{
Message: userMsg,
Configuration: &protocol.SendMessageConfiguration{
Blocking: boolPtr(false),
},
}
eventChan, err := a2aClient.StreamMessage(ctx, params)
if err != nil {
log.Fatalf("Failed to start streaming: %v", err)
}
for {
select {
case event, ok := <-eventChan:
if !ok {
log.Infof("Stream closed by server.")
return
}
if m, ok := event.Result.(*protocol.Message); ok {
printMessage(*m)
}
case <-ctx.Done():
return
}
}
}

func printMessage(message protocol.Message) {
for _, part := range message.Parts {
if textPart, ok := part.(*protocol.TextPart); ok {
fmt.Printf("\n%s\n> ", strings.TrimSpace(textPart.Text))
}
}
}

func boolPtr(b bool) *bool { return &b }
Loading