Commit fb7b3f9

lebuni and Adrian Immer authored
feat: Add Dedicated DataFrame Methods for Writing and Querying (#177)
* feat: add dedicated functions for dataframes

  DataFrames were previously handled by the general write command, but that path was neither documented nor type-checked. Separate functions for writing and querying DataFrames are therefore implemented.

* fix: write_api TypeError in kwargs

  PR #158 introduced a bug when writing DataFrames. The kwargs are now filtered before being forwarded to lower levels.

---------

Co-authored-by: Adrian Immer <a.immer@munichelectrification.com>
1 parent 538fac9 commit fb7b3f9

7 files changed: +422 -7 lines changed

.vscode/settings.json

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+{
+    "python.testing.pytestEnabled": true,
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestArgs": [
+        "tests"
+    ]
+}

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
@@ -2,6 +2,17 @@
 
 # 0.17.0 [unreleased]
 
+### Features
+
+1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Add dedicated DataFrame methods for improved usability and type safety:
+   - `write_dataframe()`: New method for writing pandas and polars DataFrames with explicit parameters (`measurement`, `timestamp_column`, `tags`, `timestamp_timezone`).
+   - `query_dataframe()`: New method for querying data directly to a pandas or polars DataFrame via the `frame_type` parameter.
+   - Updated README with clear examples for DataFrame operations.
+
+### Bug Fixes
+
+1. [#177](https://github.com/InfluxCommunity/influxdb3-python/pull/177): Fix `TypeError` when writing DataFrames. Serializer-specific kwargs (e.g., `data_frame_measurement_name`) are now filtered before being passed to the HTTP layer.
+
 ### CI
 
 1. [#164](https://github.com/InfluxCommunity/influxdb3-python/pull/164): Fix pipelines not downloading the correct python images.

README.md

Lines changed: 45 additions & 4 deletions
@@ -134,14 +134,46 @@ print(f'DONE writing from csv in {callback.write_count} batch(es)')
 
 ```
 
-### Pandas DF
+### Pandas DataFrame
 ```python
-client._write_api.write(bucket="pokemon-codex", record=pd_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
+import pandas as pd
+
+# Create a DataFrame with a timestamp column
+df = pd.DataFrame({
+    'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
+    'trainer': ['Ash', 'Misty', 'Brock'],
+    'pokemon_id': [25, 120, 74],
+    'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
+})
+
+# Write the DataFrame - timestamp_column is required for consistency
+client.write_dataframe(
+    df,
+    measurement='caught',
+    timestamp_column='time',
+    tags=['trainer', 'pokemon_id']
+)
 ```
 
-### Polars DF
+### Polars DataFrame
 ```python
-client._write_api.write(bucket="pokemon-codex", record=pl_df, data_frame_measurement_name='caught', data_frame_tag_columns=['trainer', 'id', 'num'], data_frame_timestamp_column='timestamp')
+import polars as pl
+
+# Create a DataFrame with a timestamp column
+df = pl.DataFrame({
+    'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
+    'trainer': ['Ash', 'Misty'],
+    'pokemon_id': [25, 120],
+    'pokemon_name': ['Pikachu', 'Staryu']
+})
+
+# Write the DataFrame - same API works for both pandas and polars
+client.write_dataframe(
+    df,
+    measurement='caught',
+    timestamp_column='time',
+    tags=['trainer', 'pokemon_id']
+)
 ```
 
 ## Querying
@@ -154,6 +186,15 @@ table = reader.read_all()
 print(table.to_pandas().to_markdown())
 ```
 
+### Querying to DataFrame
+```python
+# Query directly to a pandas DataFrame (default)
+df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")
+
+# Query to a polars DataFrame
+df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")
+```
+
 ### Querying with influxql
 ```python
 query = "select * from measurement"

influxdb_client_3/__init__.py

Lines changed: 121 additions & 1 deletion
@@ -1,9 +1,13 @@
 import importlib.util
 import os
 import urllib.parse
-from typing import Any
+from typing import Any, List, Literal, Optional, TYPE_CHECKING
 
 import pyarrow as pa
+
+if TYPE_CHECKING:
+    import pandas as pd
+    import polars as pl
 from pyarrow import ArrowException
 
 from influxdb_client_3.exceptions import InfluxDB3ClientQueryError
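The `TYPE_CHECKING` imports in this hunk let the new methods carry pandas and polars annotations without importing either package at runtime, while the `polars is False` check in `query_dataframe` relies on a module-level availability flag. The following is a minimal sketch of that pattern, not the library's code: `POLARS_AVAILABLE`, `require_backend`, and `describe` are hypothetical names, and computing the flag with `importlib.util.find_spec` is an assumption based on the `importlib.util` import shown above.

```python
import importlib.util
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers; skipped at runtime,
    # so polars does not need to be installed to import this module.
    import polars as pl

# Assumed availability flag: True only if the package can be located.
POLARS_AVAILABLE = importlib.util.find_spec("polars") is not None


def require_backend(frame_type: str, available: bool = POLARS_AVAILABLE) -> str:
    """Return frame_type, or raise ImportError if polars is requested but missing."""
    if frame_type == "polars" and not available:
        raise ImportError(
            "Polars is not installed. Please install it with `pip install polars`."
        )
    return frame_type


def describe(df: "pl.DataFrame") -> str:
    """The quoted annotation is stored as a string and never evaluated here."""
    return type(df).__name__
```

Because the annotation is a string, the function can be defined and called even when polars is absent; only a type checker resolves `pl.DataFrame`.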
@@ -385,6 +389,77 @@ def write(self, record=None, database=None, **kwargs):
         except InfluxDBError as e:
             raise e
 
+    def write_dataframe(
+            self,
+            df: "pd.DataFrame | pl.DataFrame",
+            measurement: str,
+            timestamp_column: str,
+            tags: Optional[List[str]] = None,
+            timestamp_timezone: Optional[str] = None,
+            database: Optional[str] = None,
+            **kwargs
+    ):
+        """
+        Write a DataFrame to InfluxDB.
+
+        This method supports both pandas and polars DataFrames, automatically detecting
+        the DataFrame type and using the appropriate serializer.
+
+        :param df: The DataFrame to write. Can be a pandas or polars DataFrame.
+        :type df: pandas.DataFrame or polars.DataFrame
+        :param measurement: The name of the measurement to write to.
+        :type measurement: str
+        :param timestamp_column: The name of the column containing timestamps.
+            This parameter is required for consistency between pandas and polars.
+        :type timestamp_column: str
+        :param tags: List of column names to use as tags. Remaining columns will be fields.
+        :type tags: list[str], optional
+        :param timestamp_timezone: Timezone for the timestamp column (e.g., 'UTC', 'America/New_York').
+        :type timestamp_timezone: str, optional
+        :param database: The database to write to. If not provided, uses the database from initialization.
+        :type database: str, optional
+        :param kwargs: Additional arguments to pass to the write API.
+        :raises TypeError: If df is not a pandas or polars DataFrame.
+        :raises InfluxDBError: If there is an error writing to the database.
+
+        Example:
+            >>> import pandas as pd
+            >>> df = pd.DataFrame({
+            ...     'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
+            ...     'city': ['London', 'Paris'],
+            ...     'temperature': [15.0, 18.0]
+            ... })
+            >>> client.write_dataframe(
+            ...     df,
+            ...     measurement='weather',
+            ...     timestamp_column='time',
+            ...     tags=['city']
+            ... )
+        """
+        if database is None:
+            database = self._database
+
+        # Detect DataFrame type
+        df_type = str(type(df))
+        if 'pandas' not in df_type and 'polars' not in df_type:
+            raise TypeError(
+                f"Expected a pandas or polars DataFrame, but got {type(df).__name__}. "
+                "Please pass a valid DataFrame object."
+            )
+
+        try:
+            return self._write_api.write(
+                bucket=database,
+                record=df,
+                data_frame_measurement_name=measurement,
+                data_frame_tag_columns=tags or [],
+                data_frame_timestamp_column=timestamp_column,
+                data_frame_timestamp_timezone=timestamp_timezone,
+                **kwargs
+            )
+        except InfluxDBError as e:
+            raise e
+
     def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None,
                    file_parser_options=None, **kwargs):
         """
@@ -467,6 +542,51 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database:
         except ArrowException as e:
             raise InfluxDB3ClientQueryError(f"Error while executing query: {e}")
 
+    def query_dataframe(
+            self,
+            query: str,
+            language: str = "sql",
+            database: Optional[str] = None,
+            frame_type: Literal["pandas", "polars"] = "pandas",
+            **kwargs
+    ) -> "pd.DataFrame | pl.DataFrame":
+        """
+        Query data from InfluxDB and return as a DataFrame.
+
+        This is a convenience method that wraps query() and returns the result
+        directly as a pandas or polars DataFrame.
+
+        :param query: The query to execute on the database.
+        :type query: str
+        :param language: The query language to use. Should be "sql" or "influxql". Defaults to "sql".
+        :type language: str
+        :param database: The database to query from. If not provided, uses the database from initialization.
+        :type database: str, optional
+        :param frame_type: The type of DataFrame to return. Either "pandas" or "polars". Defaults to "pandas".
+        :type frame_type: Literal["pandas", "polars"]
+        :param kwargs: Additional arguments to pass to the query API.
+        :keyword query_parameters: Query parameters as a dictionary of key-value pairs.
+        :return: Query result as a pandas or polars DataFrame.
+        :rtype: pandas.DataFrame or polars.DataFrame
+        :raises ImportError: If polars is requested but not installed.
+
+        Example:
+            >>> # Query and get a pandas DataFrame
+            >>> df = client.query_dataframe("SELECT * FROM weather WHERE city = 'London'")
+            >>>
+            >>> # Query and get a polars DataFrame
+            >>> df = client.query_dataframe(
+            ...     "SELECT * FROM weather",
+            ...     frame_type="polars"
+            ... )
+        """
+        if frame_type == "polars" and polars is False:
+            raise ImportError(
+                "Polars is not installed. Please install it with `pip install polars`."
+            )
+
+        return self.query(query=query, language=language, mode=frame_type, database=database, **kwargs)
+
     async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
         """Query data from InfluxDB asynchronously.
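
The type check in `write_dataframe` inspects `str(type(df))` rather than calling `isinstance`, which avoids importing pandas or polars just to validate the argument. Below is a small standalone sketch of that idea. The helper `detect_frame_library` and the two stand-in classes are hypothetical and exist only so the example runs without either package installed.

```python
def detect_frame_library(obj) -> str:
    """Classify obj as 'pandas' or 'polars' from the text of its type."""
    type_name = str(type(obj))  # e.g. "<class 'pandas.core.frame.DataFrame'>"
    if 'pandas' in type_name:
        return 'pandas'
    if 'polars' in type_name:
        return 'polars'
    raise TypeError(
        f"Expected a pandas or polars DataFrame, but got {type(obj).__name__}."
    )


# Hypothetical stand-ins: classes whose module path mimics the real libraries.
FakePandasFrame = type('DataFrame', (), {'__module__': 'pandas.core.frame'})
FakePolarsFrame = type('DataFrame', (), {'__module__': 'polars.dataframe.frame'})

pandas_kind = detect_frame_library(FakePandasFrame())
polars_kind = detect_frame_library(FakePolarsFrame())
```

One consequence of matching on the module path is that any object defined inside those packages passes the check, for example a pandas `Series`, so a string test like this is looser than an `isinstance` check against `DataFrame`.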

influxdb_client_3/write_client/client/write_api.py

Lines changed: 19 additions & 1 deletion
@@ -27,6 +27,21 @@
 DEFAULT_WRITE_NO_SYNC = False
 DEFAULT_WRITE_TIMEOUT = 10_000
 
+# Kwargs consumed during serialization that should not be passed to _post_write
+SERIALIZER_KWARGS = {
+    # DataFrame-specific kwargs
+    'data_frame_measurement_name',
+    'data_frame_tag_columns',
+    'data_frame_timestamp_column',
+    'data_frame_timestamp_timezone',
+    # Record-specific kwargs (dict, NamedTuple, dataclass)
+    'record_measurement_key',
+    'record_measurement_name',
+    'record_time_key',
+    'record_tag_keys',
+    'record_field_keys',
+}
+
 logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
 
 if _HAS_DATACLASS:
@@ -397,9 +412,12 @@ def write(self, bucket: str, org: str = None,
 
         _async_req = True if self._write_options.write_type == WriteType.asynchronous else False
 
+        # Filter out serializer-specific kwargs before passing to _post_write
+        http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
+
         def write_payload(payload):
             final_string = b'\n'.join(payload[1])
-            return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs)
+            return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs)
 
         results = list(map(write_payload, payloads.items()))
         if not _async_req:
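
The fix in this file can be illustrated in isolation. The sketch below is a simplified model, not the client's code: `post_write` is a hypothetical stand-in for a lower layer that accepts only the keyword arguments it knows, and the set is shortened to the DataFrame-related names. It shows why forwarding a serializer kwarg unfiltered raises `TypeError`, and how filtering against a known set avoids that.

```python
# Shortened copy of the set from the diff; used here for illustration only.
SERIALIZER_KWARGS = {
    'data_frame_measurement_name',
    'data_frame_tag_columns',
    'data_frame_timestamp_column',
    'data_frame_timestamp_timezone',
}


def post_write(body: bytes, *, timeout: int = 10_000) -> dict:
    """Hypothetical lower layer: rejects any keyword it does not declare."""
    return {'body': body, 'timeout': timeout}


def write(body: bytes, **kwargs) -> dict:
    """Drop serializer-only kwargs, then forward the remainder."""
    http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
    return post_write(body, **http_kwargs)


# Unfiltered: the serializer kwarg reaches post_write and is rejected.
try:
    post_write(b'caught,trainer=Ash id=25i', data_frame_measurement_name='caught')
    unfiltered_error = None
except TypeError as exc:
    unfiltered_error = exc

# Filtered: the serializer kwarg is removed, the timeout is passed through.
result = write(
    b'caught,trainer=Ash id=25i',
    data_frame_measurement_name='caught',
    timeout=5_000,
)
```

Filtering by a deny-list keeps unknown kwargs flowing to the lower layer, so a misspelled serializer kwarg would still surface as a `TypeError` there rather than being silently dropped.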
