Conversation

@anuj-tambwekar commented Dec 29, 2025

Issue overview

The current implementation of _stream uses the sync Gradient client, and no async streaming implementation is provided, so async streams still block under the hood.

This also causes a bug when creating a streaming agent with the DigitalOcean Gradient ADK: agents built with langchain_gradient buffer their entire response instead of streaming it.

For a minimal reproducible example, follow the Gradient ADK example with the following code:

Agent:

import os
import json
from langchain_gradient import ChatGradient
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from typing import Dict
from gradient_adk import entrypoint

load_dotenv()

@entrypoint
async def main(input: Dict, context: Dict):
    """Entrypoint"""

    input_request = input.get("prompt")

    # model = ChatOpenAI(
    #     model="openai-gpt-4.1",
    #     base_url="https://inference.do-ai.run/v1",
    #     api_key=os.getenv("DIGITALOCEAN_INFERENCE_KEY"),
    #     streaming=True,
    # ) # This streams fine

    model = ChatGradient(
        model="openai-gpt-4.1",
        api_key=os.getenv("DIGITALOCEAN_INFERENCE_KEY"),
        streaming=True,
    ) # Outputs are not streamed out

    async for chunk in model.astream(input_request["messages"]):
        response_text = chunk.content
        yield json.dumps({"response": response_text}) + "\n"

Client Side:

import requests
import os
import json
from dotenv import load_dotenv

load_dotenv()


def stream_endpoint(url: str, body: dict, headers: dict | None = None, chunk_size: int = 1024):
    payload = json.dumps(body)
    with requests.post(url, data=payload, headers=headers, stream=True) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_content(chunk_size=chunk_size):
            if chunk:  # filter keep-alive chunks
                yield chunk

url = "http://localhost:8080/run" 

headers = {"Authorization": f"Bearer {os.getenv('DIGITALOCEAN_API_TOKEN')}"}

body = {"prompt": {"messages": "Tell me a joke involving computers, taxidermied mice, and cheese."}}

buffer = ""
for chunk in stream_endpoint(url, body=body, headers=headers):
    buffer += chunk.decode("utf-8")
    while "\n" in buffer:
        line, buffer = buffer.split("\n", 1)
        if not line.strip():
            continue
        response = json.loads(line)
        print(response["response"], end="", flush=True)
print()

When using the ChatGradient client, the entire response is generated before being streamed out; with the ChatOpenAI client, streaming works as expected.

Fix Details

This PR fixes the incompatibility by using the async Gradient client within _astream, making async streaming non-blocking.
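
For reference, here is a minimal sketch of the kind of _astream override involved. It assumes the Gradient SDK exposes an async client whose chat.completions.create(..., stream=True) call returns an async iterator of OpenAI-style chunks; self.async_client and self._convert_messages are hypothetical names used for illustration, not the actual attributes in langchain_gradient:

from typing import Any, AsyncIterator, List, Optional

from langchain_core.callbacks import AsyncCallbackManagerForLLMRun
from langchain_core.messages import AIMessageChunk, BaseMessage
from langchain_core.outputs import ChatGenerationChunk


async def _astream(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
    # Open a streaming completion on the async client (hypothetical attribute name).
    stream = await self.async_client.chat.completions.create(
        model=self.model,
        messages=self._convert_messages(messages),  # hypothetical message converter
        stream=True,
        **kwargs,
    )
    # Yield each delta as it arrives instead of blocking the event loop.
    async for part in stream:
        delta = part.choices[0].delta if part.choices else None
        text = getattr(delta, "content", None) or ""
        chunk = ChatGenerationChunk(message=AIMessageChunk(content=text))
        if run_manager:
            await run_manager.on_llm_new_token(text, chunk=chunk)
        yield chunk

With a change along these lines, re-running the ADK example above should show chunks arriving incrementally with ChatGradient, matching the ChatOpenAI behavior.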

@dillonledoux left a comment

This looks reasonable to me; however, we'll probably need to wait until the code freeze lifts before merging it.
