diff --git a/litellm/llms/custom_httpx/http_handler.py b/litellm/llms/custom_httpx/http_handler.py
index accdddbc4ddf..cc451e5fa951 100644
--- a/litellm/llms/custom_httpx/http_handler.py
+++ b/litellm/llms/custom_httpx/http_handler.py
@@ -3,7 +3,7 @@
 import ssl
 import sys
 import time
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
 
 import certifi
 import httpx
@@ -47,6 +47,46 @@
 _DEFAULT_TIMEOUT = httpx.Timeout(timeout=5.0, connect=5.0)
 
 
+def _prepare_request_data_and_content(
+    data: Optional[Union[dict, str, bytes]] = None,
+    content: Any = None,
+) -> Tuple[Optional[Union[dict, Mapping]], Any]:
+    """
+    Helper function to route data/content parameters correctly for httpx requests.
+
+    This prevents httpx DeprecationWarnings that cause memory leaks.
+
+    Background:
+    - httpx shows a DeprecationWarning when you pass bytes/str to `data=`
+    - It wants you to use `content=` instead for bytes/str
+    - The warning itself leaks memory when triggered repeatedly
+
+    Solution:
+    - Move bytes/str from `data=` to `content=` before calling build_request
+    - Keep dicts in `data=` (that's still the correct parameter for dicts)
+
+    Args:
+        data: Request data (can be dict, str, or bytes)
+        content: Request content (raw bytes/str)
+
+    Returns:
+        Tuple of (request_data, request_content) properly routed for httpx
+    """
+    request_data = None
+    request_content = content
+
+    if data is not None:
+        if isinstance(data, (bytes, str)):
+            # Bytes/strings belong in content= (only if not already provided)
+            if content is None:
+                request_content = data
+        else:
+            # dict/Mapping stays in data= parameter
+            request_data = data
+
+    return request_data, request_content
+
+
 def get_ssl_configuration(
     ssl_verify: Optional[VerifyTypes] = None,
 ) -> Union[bool, str, ssl.SSLContext]:
@@ -301,17 +341,20 @@ async def post(
             if timeout is None:
                 timeout = self.timeout
 
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             req = self.client.build_request(
                 "POST",
                 url,
-                data=data,  # type: ignore
+                data=request_data,
                 json=json,
                 params=params,
                 headers=headers,
                 timeout=timeout,
                 files=files,
-                content=content,
-            )
+                content=request_content,
+            )
             response = await self.client.send(req, stream=stream)
             response.raise_for_status()
             return response
@@ -364,19 +407,23 @@ async def post(
     async def put(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,  # type: ignore
+        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
         json: Optional[dict] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
         stream: bool = False,
+        content: Any = None,
     ):
         try:
             if timeout is None:
                 timeout = self.timeout
 
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             req = self.client.build_request(
-                "PUT", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                "PUT", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
             )
             response = await self.client.send(req)
             response.raise_for_status()
@@ -424,19 +471,23 @@ async def put(
     async def patch(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,  # type: ignore
+        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
         json: Optional[dict] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
         stream: bool = False,
+        content: Any = None,
     ):
         try:
             if timeout is None:
                 timeout = self.timeout
 
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             req = self.client.build_request(
-                "PATCH", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                "PATCH", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
             )
             response = await self.client.send(req)
             response.raise_for_status()
@@ -484,18 +535,23 @@ async def patch(
     async def delete(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,  # type: ignore
+        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
         json: Optional[dict] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
         stream: bool = False,
+        content: Any = None,
     ):
         try:
             if timeout is None:
                 timeout = self.timeout
+
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             req = self.client.build_request(
-                "DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                "DELETE", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
             )
             response = await self.client.send(req, stream=stream)
             response.raise_for_status()
@@ -543,8 +599,11 @@ async def single_connection_post_request(
 
     Used for retrying connection client errors.
     """
+    # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+    request_data, request_content = _prepare_request_data_and_content(data, content)
+
     req = client.build_request(
-        "POST", url, data=data, json=json, params=params, headers=headers, content=content  # type: ignore
+        "POST", url, data=request_data, json=json, params=params, headers=headers, content=request_content  # type: ignore
     )
     response = await client.send(req, stream=stream)
     response.raise_for_status()
@@ -798,21 +857,24 @@ def post(
         logging_obj: Optional[LiteLLMLoggingObject] = None,
     ):
         try:
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             if timeout is not None:
                 req = self.client.build_request(
                     "POST",
                     url,
-                    data=data,  # type: ignore
+                    data=request_data,  # type: ignore
                     json=json,
                     params=params,
                     headers=headers,
                     timeout=timeout,
                     files=files,
-                    content=content,  # type: ignore
+                    content=request_content,  # type: ignore
                 )
             else:
                 req = self.client.build_request(
-                    "POST", url, data=data, json=json, params=params, headers=headers, files=files, content=content  # type: ignore
+                    "POST", url, data=request_data, json=json, params=params, headers=headers, files=files, content=request_content  # type: ignore
                 )
             response = self.client.send(req, stream=stream)
             response.raise_for_status()
@@ -840,21 +902,25 @@ def post(
     def patch(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,
+        data: Optional[Union[dict, str, bytes]] = None,
         json: Optional[Union[dict, str]] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         stream: bool = False,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
+        content: Any = None,
     ):
         try:
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             if timeout is not None:
                 req = self.client.build_request(
-                    "PATCH", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                    "PATCH", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
                 )
             else:
                 req = self.client.build_request(
-                    "PATCH", url, data=data, json=json, params=params, headers=headers  # type: ignore
+                    "PATCH", url, data=request_data, json=json, params=params, headers=headers, content=request_content  # type: ignore
                 )
             response = self.client.send(req, stream=stream)
             response.raise_for_status()
@@ -883,21 +949,25 @@ def patch(
     def put(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,
+        data: Optional[Union[dict, str, bytes]] = None,
         json: Optional[Union[dict, str]] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         stream: bool = False,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
+        content: Any = None,
    ):
        try:
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
            if timeout is not None:
                req = self.client.build_request(
-                    "PUT", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                    "PUT", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
                )
            else:
                req = self.client.build_request(
-                    "PUT", url, data=data, json=json, params=params, headers=headers  # type: ignore
+                    "PUT", url, data=request_data, json=json, params=params, headers=headers, content=request_content  # type: ignore
                )
            response = self.client.send(req, stream=stream)
            return response
@@ -913,21 +983,25 @@ def put(
     def delete(
         self,
         url: str,
-        data: Optional[Union[dict, str]] = None,  # type: ignore
+        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
         json: Optional[dict] = None,
         params: Optional[dict] = None,
         headers: Optional[dict] = None,
         timeout: Optional[Union[float, httpx.Timeout]] = None,
         stream: bool = False,
+        content: Any = None,
     ):
         try:
+            # Prepare data/content parameters to prevent httpx DeprecationWarning (memory leak fix)
+            request_data, request_content = _prepare_request_data_and_content(data, content)
+
             if timeout is not None:
                 req = self.client.build_request(
-                    "DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
+                    "DELETE", url, data=request_data, json=json, params=params, headers=headers, timeout=timeout, content=request_content  # type: ignore
                 )
             else:
                 req = self.client.build_request(
-                    "DELETE", url, data=data, json=json, params=params, headers=headers  # type: ignore
+                    "DELETE", url, data=request_data, json=json, params=params, headers=headers, content=request_content  # type: ignore
                 )
             response = self.client.send(req, stream=stream)
             response.raise_for_status()
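For reference, a minimal sketch of how the new helper routes arguments. This is illustrative only and not part of the patch; it assumes `_prepare_request_data_and_content` is importable from the patched module exactly as added above.

# Illustrative only -- exercises the helper added in this diff
# (litellm/llms/custom_httpx/http_handler.py).
from litellm.llms.custom_httpx.http_handler import _prepare_request_data_and_content

# dict payloads stay on data= (still the correct httpx parameter for form data)
assert _prepare_request_data_and_content(data={"a": 1}) == ({"a": 1}, None)

# str/bytes payloads move to content=, which avoids the httpx DeprecationWarning
assert _prepare_request_data_and_content(data=b'{"a": 1}') == (None, b'{"a": 1}')
assert _prepare_request_data_and_content(data='{"a": 1}') == (None, '{"a": 1}')

# an explicitly supplied content= takes precedence; str/bytes data is not duplicated onto it
assert _prepare_request_data_and_content(data=b"ignored", content=b"body") == (None, b"body")

# nothing passed -> nothing routed
assert _prepare_request_data_and_content() == (None, None)

Each caller in the diff then passes the two returned values straight through to build_request(data=..., content=...).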