Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions e2e-test/integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ under the License.
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models-azureai</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.integration.test;

import org.apache.flink.agents.api.Agent;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.annotation.*;
import org.apache.flink.agents.api.chat.messages.ChatMessage;
import org.apache.flink.agents.api.chat.messages.MessageRole;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.event.ChatRequestEvent;
import org.apache.flink.agents.api.event.ChatResponseEvent;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelConnection;
import org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelSetup;

import java.util.Collections;

public class AgentWithAzureAI extends Agent {

private static final String AZURE_ENDPOINT = "";
private static final String AZURE_API_KEY = "";

public static boolean callingRealMode() {
if (AZURE_ENDPOINT != null
&& !AZURE_ENDPOINT.isEmpty()
&& AZURE_API_KEY != null
&& !AZURE_API_KEY.isEmpty()) {
return true;
} else {
return false;
}
}

@ChatModelConnection
public static ResourceDescriptor azureAIChatModelConnection() {
return ResourceDescriptor.Builder.newBuilder(AzureAIChatModelConnection.class.getName())
.addInitialArgument("endpoint", AZURE_ENDPOINT)
.addInitialArgument("apiKey", AZURE_API_KEY)
.build();
}

@ChatModelSetup
public static ResourceDescriptor azureAIChatModel() {
System.out.println(
"Calling real Azure AI service. Make sure the endpoint and apiKey are correct.");
return ResourceDescriptor.Builder.newBuilder(AzureAIChatModelSetup.class.getName())
.addInitialArgument("connection", "azureAIChatModelConnection")
.addInitialArgument("model", "gpt-4o")
.build();
}

@Tool(description = "Converts temperature between Celsius and Fahrenheit")
public static double convertTemperature(
@ToolParam(name = "value", description = "Temperature value to convert") Double value,
@ToolParam(
name = "fromUnit",
description = "Source unit ('C' for Celsius or 'F' for Fahrenheit)")
String fromUnit,
@ToolParam(
name = "toUnit",
description = "Target unit ('C' for Celsius or 'F' for Fahrenheit)")
String toUnit) {

fromUnit = fromUnit.toUpperCase();
toUnit = toUnit.toUpperCase();

if (fromUnit.equals(toUnit)) {
return value;
}

if (fromUnit.equals("C") && toUnit.equals("F")) {
return (value * 9 / 5) + 32;
} else if (fromUnit.equals("F") && toUnit.equals("C")) {
return (value - 32) * 5 / 9;
} else {
throw new IllegalArgumentException("Invalid temperature units. Use 'C' or 'F'");
}
}

@Tool(description = "Calculates Body Mass Index (BMI)")
public static double calculateBMI(
@ToolParam(name = "weightKg", description = "Weight in kilograms") Double weightKg,
@ToolParam(name = "heightM", description = "Height in meters") Double heightM) {

if (weightKg <= 0 || heightM <= 0) {
throw new IllegalArgumentException("Weight and height must be positive values");
}
return weightKg / (heightM * heightM);
}

@Tool(description = "Create a random number")
public static double createRandomNumber() {
return Math.random();
}

@Action(listenEvents = {InputEvent.class})
public static void process(InputEvent event, RunnerContext ctx) throws Exception {
ctx.sendEvent(
new ChatRequestEvent(
"azureAIChatModel",
Collections.singletonList(
new ChatMessage(MessageRole.USER, (String) event.getInput()))));
}

@Action(listenEvents = {ChatResponseEvent.class})
public static void processChatResponse(ChatResponseEvent event, RunnerContext ctx) {
ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.integration.test;

import org.apache.flink.agents.api.AgentsExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AgentWithAzureAIExample {
/** Runs the example pipeline. */
public static void main(String[] args) throws Exception {
if (!AgentWithAzureAI.callingRealMode()) {
// print warning information
System.err.println(
"Please set the AZURE_ENDPOINT and AZURE_API_KEY in the AgentWithAzureAI class to run this example in real mode.");
System.err.println("Falling back to mock mode.");
AgentWithResourceExample.main(args);
return;
}
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Use prompts that trigger different tool calls in the agent
DataStream<String> inputStream =
env.fromData(
"Convert 25 degrees Celsius to Fahrenheit",
"What is 98.6 Fahrenheit in Celsius?",
"Change 32 degrees Celsius to Fahrenheit",
"If it's 75 degrees Fahrenheit, what would that be in Celsius?",
"Convert room temperature of 20C to F",
"Calculate BMI for someone who is 1.75 meters tall and weighs 70 kg",
"What's the BMI for a person weighing 85 kg with height 1.80 meters?",
"Can you tell me the BMI if I'm 1.65m tall and weigh 60kg?",
"Find BMI for 75kg weight and 1.78m height",
"Create me a random number please");

// Create agents execution environment
AgentsExecutionEnvironment agentsEnv =
AgentsExecutionEnvironment.getExecutionEnvironment(env);

// Apply agent to the DataStream and use the prompt itself as the key
DataStream<Object> outputStream =
agentsEnv
.fromDataStream(inputStream, (KeySelector<String, String>) value -> value)
.apply(new AgentWithAzureAI())
.toDataStream();

// Print the results
outputStream.print();

// Execute the pipeline
agentsEnv.execute();
}
}
59 changes: 59 additions & 0 deletions integrations/chat-models/azureai/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models</artifactId>
<version>0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>flink-agents-integrations-chat-models-azureai</artifactId>
<name>Flink Agents : Integrations: Chat Models: Azure AI</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-plan</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-ai-inference</artifactId>
<version>1.0.0-beta.5</version>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.13.3</version>
</dependency>
</dependencies>

</project>
Loading