Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 97 additions & 23 deletions litellm/llms/custom_httpx/http_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import ssl
import sys
import time
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Tuple, Union

import certifi
import httpx
Expand Down Expand Up @@ -47,6 +47,46 @@
_DEFAULT_TIMEOUT = httpx.Timeout(timeout=5.0, connect=5.0)


def _prepare_request_data_and_content(
data: Optional[Union[dict, str, bytes]] = None,
content: Any = None,
) -> Tuple[Optional[Union[dict, Mapping]], Any]:
"""
Helper function to route data/content parameters correctly for httpx requests

This prevents httpx DeprecationWarnings that cause memory leaks.

Background:
- httpx shows a DeprecationWarning when you pass bytes/str to `data=`
- It wants you to use `content=` instead for bytes/str
- The warning itself leaks memory when triggered repeatedly

Solution:
- Move bytes/str from `data=` to `content=` before calling build_request
- Keep dicts in `data=` (that's still the correct parameter for dicts)

Args:
data: Request data (can be dict, str, or bytes)
content: Request content (raw bytes/str)

Returns:
Tuple of (request_data, request_content) properly routed for httpx
"""
request_data = None
request_content = content

if data is not None:
if isinstance(data, (bytes, str)):
# Bytes/strings belong in content= (only if not already provided)
if content is None:
request_content = data
else:
# dict/Mapping stays in data= parameter
request_data = data

return request_data, request_content


def get_ssl_configuration(
ssl_verify: Optional[VerifyTypes] = None,
) -> Union[bool, str, ssl.SSLContext]:
Expand Down Expand Up @@ -301,17 +341,20 @@ async def post(
if timeout is None:
timeout = self.timeout

# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

req = self.client.build_request(
"POST",
url,
data=data, # type: ignore
data=request_data,
json=json,
params=params,
headers=headers,
timeout=timeout,
files=files,
content=content,
)
content=request_content,
)
response = await self.client.send(req, stream=stream)
response.raise_for_status()
return response
Expand Down Expand Up @@ -364,19 +407,23 @@ async def post(
async def put(
self,
url: str,
data: Optional[Union[dict, str]] = None, # type: ignore
data: Optional[Union[dict, str, bytes]] = None, # type: ignore
json: Optional[dict] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
stream: bool = False,
content: Any = None,
):
try:
if timeout is None:
timeout = self.timeout

# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

req = self.client.build_request(
"PUT", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"PUT", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
response = await self.client.send(req)
response.raise_for_status()
Expand Down Expand Up @@ -424,19 +471,23 @@ async def put(
async def patch(
self,
url: str,
data: Optional[Union[dict, str]] = None, # type: ignore
data: Optional[Union[dict, str, bytes]] = None, # type: ignore
json: Optional[dict] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
stream: bool = False,
content: Any = None,
):
try:
if timeout is None:
timeout = self.timeout

# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

req = self.client.build_request(
"PATCH", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"PATCH", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
response = await self.client.send(req)
response.raise_for_status()
Expand Down Expand Up @@ -484,18 +535,23 @@ async def patch(
async def delete(
self,
url: str,
data: Optional[Union[dict, str]] = None, # type: ignore
data: Optional[Union[dict, str, bytes]] = None, # type: ignore
json: Optional[dict] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
stream: bool = False,
content: Any = None,
):
try:
if timeout is None:
timeout = self.timeout

# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

req = self.client.build_request(
"DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"DELETE", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
response = await self.client.send(req, stream=stream)
response.raise_for_status()
Expand Down Expand Up @@ -543,8 +599,11 @@ async def single_connection_post_request(

Used for retrying connection client errors.
"""
# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

req = client.build_request(
"POST", url, data=data, json=json, params=params, headers=headers, content=content # type: ignore
"POST", url, data=request_data, json=json, params=params, headers=headers, content=request_content # type: ignore
)
response = await client.send(req, stream=stream)
response.raise_for_status()
Expand Down Expand Up @@ -798,21 +857,24 @@ def post(
logging_obj: Optional[LiteLLMLoggingObject] = None,
):
try:
# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

if timeout is not None:
req = self.client.build_request(
"POST",
url,
data=data, # type: ignore
data=request_data, # type: ignore
json=json,
params=params,
headers=headers,
timeout=timeout,
files=files,
content=content, # type: ignore
content=request_content, # type: ignore
)
else:
req = self.client.build_request(
"POST", url, data=data, json=json, params=params, headers=headers, files=files, content=content # type: ignore
"POST", url, data=request_data, json=json, params=params, headers=headers, files=files, content=request_content # type: ignore
)
response = self.client.send(req, stream=stream)
response.raise_for_status()
Expand Down Expand Up @@ -840,21 +902,25 @@ def post(
def patch(
self,
url: str,
data: Optional[Union[dict, str]] = None,
data: Optional[Union[dict, str, bytes]] = None,
json: Optional[Union[dict, str]] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
stream: bool = False,
timeout: Optional[Union[float, httpx.Timeout]] = None,
content: Any = None,
):
try:
# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

if timeout is not None:
req = self.client.build_request(
"PATCH", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"PATCH", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
else:
req = self.client.build_request(
"PATCH", url, data=data, json=json, params=params, headers=headers # type: ignore
"PATCH", url, data=request_data, json=json, params=params, headers=headers, content=request_content # type: ignore
)
response = self.client.send(req, stream=stream)
response.raise_for_status()
Expand Down Expand Up @@ -883,21 +949,25 @@ def patch(
def put(
self,
url: str,
data: Optional[Union[dict, str]] = None,
data: Optional[Union[dict, str, bytes]] = None,
json: Optional[Union[dict, str]] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
stream: bool = False,
timeout: Optional[Union[float, httpx.Timeout]] = None,
content: Any = None,
):
try:
# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

if timeout is not None:
req = self.client.build_request(
"PUT", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"PUT", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
else:
req = self.client.build_request(
"PUT", url, data=data, json=json, params=params, headers=headers # type: ignore
"PUT", url, data=request_data, json=json, params=params, headers=headers, content=request_content # type: ignore
)
response = self.client.send(req, stream=stream)
return response
Expand All @@ -913,21 +983,25 @@ def put(
def delete(
self,
url: str,
data: Optional[Union[dict, str]] = None, # type: ignore
data: Optional[Union[dict, str, bytes]] = None, # type: ignore
json: Optional[dict] = None,
params: Optional[dict] = None,
headers: Optional[dict] = None,
timeout: Optional[Union[float, httpx.Timeout]] = None,
stream: bool = False,
content: Any = None,
):
try:
# Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
request_data, request_content = _prepare_request_data_and_content(data, content)

if timeout is not None:
req = self.client.build_request(
"DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout # type: ignore
"DELETE", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content # type: ignore
)
else:
req = self.client.build_request(
"DELETE", url, data=data, json=json, params=params, headers=headers # type: ignore
"DELETE", url, data=request_data, json=json, params=params, headers=headers, content=request_content # type: ignore
)
response = self.client.send(req, stream=stream)
response.raise_for_status()
Expand Down
Loading