V3IO Frames ("Frames") is a multi-model open-source data-access library that provides a unified high-performance DataFrame API for working with different types of data sources (backends). The library was developed by Iguazio to simplify working with data in the Iguazio MLOps Platform ("the platform"), but it can be extended to support additional backend types.
- Overview
- User Authentication
- Client Constructor
- Common Client Method Parameters
- create Method
- write Method
- read Method
- delete Method
- execute Method
- history Method
The current version of Frames supports Python 3.7 and 3.9.
To use Frames, you first need to import the v3io_frames Python library. For example:
import v3io_frames as v3f
Then, you need to create and initialize an instance of the Client class; see Client Constructor.
You can then use the client methods to perform different data operations on the supported backend types.
All Frames client methods receive a backend parameter for setting the Frames backend type.
Frames currently supports the following backend types:
- nosql | kv: a platform NoSQL (key/value) table.
  Note: The documentation uses the "nosql" alias to the "kv" type, which was added in Frames v0.6.10-v0.9.13; "kv" is still supported for backwards compatibility with earlier releases.
- stream: a platform data stream [Tech Preview].
- tsdb: a time-series database (TSDB).
The Client class features the following methods for supporting operations on a data collection, such as a NoSQL or TSDB table or a data stream:
- create: creates a new collection.
- delete: deletes a collection or specific items of the collection.
- read: reads data from a collection into pandas DataFrames.
- write: writes data from pandas DataFrames to a collection.
- execute: executes a backend-specific command on a collection. Each backend may support multiple commands.
- history: returns information about requests made to the service.
Note: Some methods or method parameters are backend-specific, as detailed in this reference.
When creating a Frames client, you must provide valid credentials for accessing the backend data, which Frames uses to identify the user. You can provide the credentials in either of the following ways (listed in order of precedence):
1. Provide the authentication credentials in the call to the Client constructor: either set the token parameter to a valid authentication token (access key), or set the user and password parameters to a username and password. Note that you cannot set the token parameter together with the user and password parameters.
2. Provide the authentication credentials in environment variables: either set the V3IO_ACCESS_KEY variable to an authentication token, or set the V3IO_USERNAME and V3IO_PASSWORD variables to a username and password.
   Note:
   - When V3IO_ACCESS_KEY is defined, V3IO_USERNAME and V3IO_PASSWORD are ignored.
   - When the client constructor is called with authentication parameters (option #1), the authentication-credentials environment variables (if defined) are ignored.
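The precedence rules above can be sketched as a small, self-contained function. resolve_credentials is a hypothetical helper for illustration only, not part of v3io_frames; the real client performs this resolution internally, and details such as how it treats a user without a password may differ from this sketch.

```python
import os


def resolve_credentials(token=None, user=None, password=None, env=None):
    """Hypothetical helper: pick credentials using the documented precedence.

    Returns {"token": ...} or {"user": ..., "password": ...}.
    """
    env = os.environ if env is None else env

    # Option 1: constructor arguments override any environment variables.
    if token and (user or password):
        raise ValueError("token cannot be combined with user/password")
    if token:
        return {"token": token}
    if user or password:
        return {"user": user, "password": password}

    # Option 2: environment variables; V3IO_ACCESS_KEY overrides username/password.
    access_key = env.get("V3IO_ACCESS_KEY")
    if access_key:
        return {"token": access_key}
    env_user = env.get("V3IO_USERNAME")
    env_password = env.get("V3IO_PASSWORD")
    if env_user or env_password:
        return {"user": env_user, "password": env_password}

    raise ValueError("no credentials provided")
```

For example, with V3IO_ACCESS_KEY, V3IO_USERNAME, and V3IO_PASSWORD all defined and no constructor credentials, the sketch returns the access key, because it takes precedence over the username and password variables.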
All Frames operations are executed via an object of the Client class.
Client(address=""[, data_url=""], container=""[, user="", password="", token=""])
- address: The address of the Frames service (framesd). Use the grpc:// prefix for gRPC (default; recommended) or the http:// prefix for HTTP. When running locally on the platform, set this parameter to framesd:8081 to use gRPC (recommended) or to framesd:8080 to use HTTP.
  - Type: str
  - Requirement: Required
- data_url: A web-API base URL for accessing the backend data. By default, the client uses the data URL that's configured for the Frames service; for the platform backends, this is typically the HTTPS URL of the web-APIs service of the parent tenant.
  - Type: str
  - Requirement: Optional
- container: The name of the data container that contains the backend data. For example, "bigdata" or "users".
  - Type: str
  - Requirement: Required
- user: The username of a user with permissions to access the backend data. See User Authentication.
- password: A valid password for the user configured in the user parameter. See User Authentication.
  - Type: str
  - Requirement: Required when the user parameter is set.
- token: A valid token that allows access to the backend data, such as a platform access key for the platform backends. See User Authentication.
Returns a new Frames Client data object.
The following examples, for local platform execution, both create a Frames client for accessing data in the "users" container by using the authentication credentials of user "iguazio"; the first example uses token (access-key) authentication while the second example uses username and password authentication (see User Authentication):
import v3io_frames as v3f
client = v3f.Client("framesd:8081", token="e8bd4ca2-537b-4175-bf01-8c74963e90bf", container="users")

import v3io_frames as v3f
client = v3f.Client("framesd:8081", user="iguazio", password="mypass", container="users")

All client methods receive the following common parameters; additional, method-specific parameters are described for each method.
- backend: The backend data type for the operation. See Backend Types.
  - Type: str
  - Requirement: Required
  - Valid Values: "nosql" | "stream" | "tsdb"
- table: The relative path to a data collection of the specified backend type in the target data container (as configured for the client object). For example, "mytable" or "/examples/tsdb/my_metrics".
  - Type: str
  - Requirement: Required unless otherwise specified in the method-specific documentation
Creates a new data collection in the configured client data container, according to the specified backend type.
Note: The create method isn't applicable to the nosql backend, because NoSQL tables in the platform don't need to be created prior to ingestion; when ingesting data into a table that doesn't exist, the table is automatically created.
create(backend, table, schema=None, if_exists=FAIL, **kw)
All Frames backends that support the create method support the following common parameters:
- if_exists: Determines whether to raise an error when the specified collection (table) already exists.
  - Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:
    from v3io_frames import frames_pb2 as fpb
  - Requirement: Optional
  - Valid Values: FAIL to raise an error when the specified collection already exists; IGNORE to suppress the error
  - Default Value: FAIL
- schema: A schema for describing unstructured collection data. This parameter is intended to be used only for testing purposes with the csv backend.
  - Type: Backend-specific or None
  - Requirement: Optional
  - Default Value: None
- kw: This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.
  - Type: ** (variable-length keyword arguments list)
  - Requirement: Optional
The following create parameters are specific to the tsdb backend and are passed as keyword arguments via the kw parameter:
- rate: The metric-samples ingestion rate.
  - Type: str
  - Requirement: Required
  - Valid Values: A string of the format "[0-9]+/[smh]", where 's' = seconds, 'm' = minutes, and 'h' = hours. For example, "1/s" (one sample per second), "20/m" (20 samples per minute), or "50/h" (50 samples per hour).
- aggregates: A list of aggregation functions for real-time aggregation during the samples ingestion ("pre-aggregation").
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing a comma-separated list of supported aggregation functions: avg | count | last | max | min | rate | stddev | stdvar | sum. For example, "count,avg,min,max".
- aggregation_granularity: Aggregation granularity; applicable when the aggregates parameter is set.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string of the format "[0-9]+[mhd]", where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
  - Default Value: "1h" (1 hour)
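Before calling client.create, the tsdb formats above can be checked on the client side. The helpers below (rate_per_second, granularity_seconds, check_aggregates) are hypothetical and not part of v3io_frames; they only encode the format rules listed above, and the Frames service remains the authority on what it accepts.

```python
import re

# Format rules copied from the tsdb create parameter descriptions.
RATE_RE = re.compile(r"([0-9]+)/([smh])")
GRANULARITY_RE = re.compile(r"([0-9]+)([mhd])")
AGGREGATES = {"avg", "count", "last", "max", "min", "rate", "stddev", "stdvar", "sum"}
SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def rate_per_second(rate):
    """Convert a rate string such as "20/m" to samples per second."""
    match = RATE_RE.fullmatch(rate)
    if match is None:
        raise ValueError(f"invalid rate: {rate!r}")
    count, unit = match.groups()
    return int(count) / SECONDS_PER_UNIT[unit]


def granularity_seconds(granularity):
    """Convert an aggregation granularity such as "30m" to seconds."""
    match = GRANULARITY_RE.fullmatch(granularity)
    if match is None:
        raise ValueError(f"invalid aggregation granularity: {granularity!r}")
    amount, unit = match.groups()
    return int(amount) * SECONDS_PER_UNIT[unit]


def check_aggregates(aggregates):
    """Split a comma-separated aggregates string, rejecting unsupported functions."""
    names = aggregates.split(",")
    unsupported = [name for name in names if name not in AGGREGATES]
    if unsupported:
        raise ValueError(f"unsupported aggregates: {unsupported}")
    return names
```

For instance, rate_per_second("1/s") returns 1.0 and granularity_seconds("1h") returns 3600, while a granularity such as "10s" is rejected because seconds aren't a valid granularity unit.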
The following create parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter:
- shards: The number of stream shards to create.
  - Type: int
  - Requirement: Optional
  - Default Value: 1
  - Valid Values: A positive integer (>= 1). For example, 100.
- retention_hours: The stream's retention period, in hours.
  - Type: int
  - Requirement: Optional
  - Default Value: 24
  - Valid Values: A positive integer (>= 1). For example, 2 (2 hours).
client.create("tsdb", table="mytsdb", rate="10/m")
client.create("tsdb", table="/tsdb/my_metrics", rate="1/s", aggregates="count,avg,min,max", aggregation_granularity="1h")
client.create("stream", table="/mystream", shards=3)
client.create("stream", table="/my_streams/stream1", retention_hours=2)

Writes data from a DataFrame to a data collection, according to the specified backend type.
write(backend, table, dfs, expression='', condition='', labels=None,
      max_rows_in_msg=0, index_cols=None, save_mode='createNewItemsOnly',
      partition_keys=None)
Note: The expression parameter isn't supported in the current release.
All Frames backends that support the write method support the following common parameters:
- dfs: One or more DataFrames containing the data to write.
  - Type: A single DataFrame, a list of DataFrames, or a DataFrames iterator
  - Requirement: Required
- index_cols: A list of column (attribute) names to be used as index columns for the write operation, regardless of any index-column definitions in the DataFrame. By default, the DataFrame's index columns are used.
  Note: The significance and supported number of index columns are backend specific. For example, the nosql backend supports only a single index column for the primary-key item attribute, while the tsdb backend supports additional index columns for metric labels.
  - Type: []str
  - Requirement: Optional
  - Default Value: None
- labels: This parameter is currently applicable only to the tsdb backend (although it's available for all backends) and is therefore documented as part of the write method's tsdb backend parameters.
  - Type: dict
  - Requirement: Optional
- save_mode: This parameter is currently applicable only to the nosql backend, and is therefore documented as part of the write method's nosql backend parameters.
  - Type: str
  - Requirement: Optional
- max_rows_in_msg: Maximum number of rows to write in each message (write chunk size).
  - Type: int
  - Requirement: Optional
  - Default Value: 0
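To illustrate what a write chunk size means, the following sketch splits rows into messages of at most max_rows_in_msg rows. chunk_rows is hypothetical, and treating 0 as "send everything in one message" is an assumption made for this sketch; the description above doesn't state how Frames interprets the default value of 0.

```python
def chunk_rows(rows, max_rows_in_msg=0):
    """Yield consecutive slices of rows, each with at most max_rows_in_msg rows.

    Assumption (not confirmed by the Frames docs): 0 means "no limit".
    """
    if max_rows_in_msg < 0:
        raise ValueError("max_rows_in_msg must be >= 0")
    if max_rows_in_msg == 0:
        yield rows
        return
    for start in range(0, len(rows), max_rows_in_msg):
        yield rows[start:start + max_rows_in_msg]
```

With five rows and max_rows_in_msg=2, the sketch produces three messages of 2, 2, and 1 rows.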
The following write parameters are specific to the nosql backend:
- condition: A platform condition expression that defines conditions for performing the write operation.
  - Type: str
  - Requirement: Optional
- save_mode: Save mode, which determines how the written items are saved, in particular how existing table items and existing tables are handled.
  - Type: str
  - Requirement: Optional
  - Valid Values:
    - "createNewItemsOnly": write only new items; don't replace or update any existing table item with the same name (primary-key attribute value) as a written item.
    - "updateItem": update items; add new items and update the attributes of existing table items.
    - "overwriteItem": overwrite items; add new items and replace any existing table item with the same name as a written item.
    - "errorIfTableExists": create a new table only; write items only if the target table doesn't already exist.
    - "overwriteTable": overwrite the table; replace all existing table items (if any) with the written items.
  - Default Value: "createNewItemsOnly"
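The save modes can be compared by simulating them on an in-memory table. apply_save_mode is a hypothetical helper that models a NoSQL table as a dictionary keyed by primary-key value; it only mirrors the documented semantics and says nothing about how the platform implements them (for example, it ignores the condition parameter).

```python
SAVE_MODES = {"createNewItemsOnly", "updateItem", "overwriteItem",
              "errorIfTableExists", "overwriteTable"}


def apply_save_mode(table, items, save_mode="createNewItemsOnly"):
    """Return the table contents after writing items with the given save mode.

    table: dict of primary-key value -> attribute dict, or None if the table doesn't exist.
    items: dict of primary-key value -> attribute dict to write.
    The input dictionaries are not modified.
    """
    if save_mode not in SAVE_MODES:
        raise ValueError(f"unknown save_mode: {save_mode!r}")
    written = {key: dict(attrs) for key, attrs in items.items()}

    if save_mode == "errorIfTableExists":
        if table is not None:
            raise RuntimeError("the target table already exists")
        return written
    if save_mode == "overwriteTable":
        return written  # all existing items are replaced by the written items

    result = {key: dict(attrs) for key, attrs in (table or {}).items()}
    for key, attrs in written.items():
        if key not in result or save_mode == "overwriteItem":
            result[key] = attrs  # new item, or full replacement of an existing one
        elif save_mode == "updateItem":
            result[key].update(attrs)  # merge the written attributes into the item
        # createNewItemsOnly: leave the existing item untouched
    return result
```

Writing {"tom": {"age": 11}} over an existing "tom" item keeps the old item under "createNewItemsOnly", merges the new age into it under "updateItem", and drops its other attributes under "overwriteItem".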
The following write parameters are specific to the tsdb backend:
- labels: A dictionary of metric labels of the format {<label>: <value>[, <label>: <value>, ...]} to apply to all the DataFrame rows. For example, {"os": "linux", "arch": "x86"}.
  - Type: dict
  - Requirement: Optional
  - Default Value: None
import pandas as pd

data = [["tom", 10, "TLV"], ["nick", 15, "Berlin"], ["juli", 14, "NY"]]
df = pd.DataFrame(data, columns=["name", "age", "city"])
df.set_index("name", inplace=True)
client.write(backend="nosql", table="mytable", dfs=df, condition="age>14")

from datetime import datetime
import pandas as pd

df = pd.DataFrame(data=[[30.1, 12.7]], index=[[datetime.now()], ["1"]],
                  columns=["cpu", "disk"])
df.index.names = ["time", "node"]
client.write(backend="tsdb", table="mytsdb", dfs=df)

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(9, 3) * 100,
                  columns=["cpu", "mem", "disk"])
client.write("stream", table="mystream", dfs=df)

Reads data from a data collection to a DataFrame, according to the specified backend type.
read(backend, table='', query='', columns=None, filter='', group_by='',
     limit=0, data_format='', row_layout=False, max_rows_in_msg=0, marker='',
     iterator=False, get_raw=False, **kw)
Note: The limit, data_format, row_layout, and marker parameters aren't supported in the current release, and get_raw is for internal use only.
All Frames backends that support the read method support the following common parameters:
- iterator: Set to True to return an iterator of pandas DataFrames; False (default) returns a single DataFrame.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
- filter: A query filter. For example, filter="col1=='my_value'".
  This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.
  - Type: str
  - Requirement: Optional
- columns: A list of attributes (columns) to return.
  This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.
  - Type: []str
  - Requirement: Optional
- kw: This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.
  - Type: ** (variable-length keyword arguments list)
  - Requirement: Optional
The following read parameters are specific to the nosql backend and are passed as keyword arguments via the kw parameter:
- reset_index: Set to True to reset the index column of the returned DataFrame and use the auto-generated pandas range-index column; False (default) sets the index column to the table's primary-key attribute.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
- sharding_keys [Tech Preview]: A list of specific sharding keys to query, for range-scan formatted tables only.
  - Type: []str
  - Requirement: Optional
The following read parameters are specific to the tsdb backend:
- group_by [Tech Preview]: A group-by query string.
  This parameter cannot be used concurrently with the query parameter.
  - Type: str
  - Requirement: Optional
- query [Tech Preview]: A query string in SQL format.
  Note:
  - When setting the query parameter, you must provide the path to the TSDB table as part of the FROM clause in the query string and not in the read method's table parameter.
  - This parameter cannot be set concurrently with the aggregators, columns, filter, or group_by parameters.
  - Type: str
  - Requirement: Optional
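The mutual-exclusion rules for the query parameter can be checked before calling client.read. tsdb_read_conflicts is a hypothetical helper, not part of v3io_frames; it assumes that table should be left empty when query is set (as the note above requires) and that table is otherwise needed, per the common table parameter.

```python
def tsdb_read_conflicts(table="", query="", columns=None, filter="",
                        group_by="", aggregators=""):
    """Return a list of rule violations for a planned tsdb read (empty if none).

    Parameter names mirror client.read; filter shadows the built-in on purpose.
    """
    problems = []
    if query:
        exclusive = (("aggregators", aggregators), ("columns", columns),
                     ("filter", filter), ("group_by", group_by))
        for name, value in exclusive:
            if value:
                problems.append(f"{name} cannot be used together with query")
        if table:
            problems.append("set the table path in the query's FROM clause, not in table")
    elif not table:
        problems.append("table is required when query isn't set")
    return problems
```

An empty list means the planned arguments respect the documented rules; for example, combining query with filter yields a single violation that names filter.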
The following tsdb read parameters are passed as keyword arguments via the kw parameter:
- start: Start (minimum) time for the read operation.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
  - Default Value: <end time> - 1h
- end: End (maximum) time for the read operation.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
  - Default Value: now
- step: The query aggregation or downsampling step. The default step is the query's time range, which can be configured via the start and end parameters.
  - Type: str
  - Requirement: Optional
- aggregators: Aggregation information to return, as a comma-separated list of supported aggregation functions ("aggregators").
  This parameter cannot be used concurrently with the query parameter.
  - Type: str
  - Requirement: Optional
  - Valid Values: The following aggregation functions are supported for over-time aggregation (across each unique label set); for cross-series aggregation (across all metric labels), add "_all" to the end of the function name:
    avg | count | last | max | min | rate | stddev | stdvar | sum
- aggregation_window [Tech Preview]: Aggregation interval for applying over-time aggregation functions, if set in the aggregators or query parameters.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string of the format "[0-9]+[mhd]", where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
  - Default Value: The query's aggregation step
- multi_index: Set to True to display labels as index columns in the read results; False (default) displays only the metric's sample time as an index column.
  - Type: bool
  - Requirement: Optional
  - Default Value: False
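The time formats accepted by start and end can be resolved to concrete timestamps as follows, which also shows the default one-hour window. parse_time and resolve_range are hypothetical helpers, not part of v3io_frames; mapping "0" to the Unix epoch is an assumption standing in for "the earliest time".

```python
import re
from datetime import datetime, timedelta, timezone

RELATIVE_RE = re.compile(r"now(?:-([0-9]+)([mhd]))?")
UNIT_NAMES = {"m": "minutes", "h": "hours", "d": "days"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_time(value, now=None):
    """Resolve a Frames-style time string to a timezone-aware UTC datetime."""
    if now is None:
        now = datetime.now(timezone.utc)
    if value == "0":
        return EPOCH  # assumption: "0" = earliest possible time
    match = RELATIVE_RE.fullmatch(value)
    if match:
        amount, unit = match.groups()
        if amount is None:
            return now  # plain "now"
        return now - timedelta(**{UNIT_NAMES[unit]: int(amount)})
    if value.isdigit():
        # Unix timestamp in milliseconds.
        return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
    # RFC 3339; datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def resolve_range(start="", end="", now=None):
    """Apply the documented defaults: end = now, start = end - 1h."""
    if now is None:
        now = datetime.now(timezone.utc)
    end_time = parse_time(end or "now", now)
    start_time = parse_time(start, now) if start else end_time - timedelta(hours=1)
    return start_time, end_time
```

Passing a fixed now makes the relative forms reproducible; for example, "now-90m" resolves to 90 minutes before the supplied reference time.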
The following read parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter:
- seek: Seek type.
  When the "seq" or "sequence" seek type is set, you must set the sequence parameter to the desired record sequence number.
  When the "time" seek type is set, you must set the start parameter to the desired seek start time.
  - Type: str
  - Requirement: Required
  - Valid Values: "time" | "seq" | "sequence" | "latest" | "earliest"
- shard_id: The ID of the stream shard from which to read.
  - Type: str
  - Requirement: Required
  - Valid Values: "0" ... "<stream shard count> - 1"
- sequence: The sequence number of the record from which to start reading.
  - Type: int64
  - Requirement: Required when seek is "seq" or "sequence"
- start: The earliest record ingestion time from which to start reading.
  - Type: str
  - Requirement: Required when seek="time"
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
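The seek rules above determine which extra keyword arguments a stream read needs. stream_read_kwargs is a hypothetical helper, not part of v3io_frames, and its shard_count argument is an assumption: the caller must know how many shards the stream was created with (see the shards create parameter).

```python
SEEK_TYPES = {"time", "seq", "sequence", "latest", "earliest"}


def stream_read_kwargs(seek, shard_id, shard_count, sequence=None, start=None):
    """Build the stream-specific kw arguments for client.read, checking the documented rules."""
    if seek not in SEEK_TYPES:
        raise ValueError(f"unknown seek type: {seek!r}")
    if not 0 <= int(shard_id) < shard_count:
        raise ValueError(f"shard_id must be between 0 and {shard_count - 1}")
    kwargs = {"seek": seek, "shard_id": str(shard_id)}  # shard_id is passed as a string
    if seek in ("seq", "sequence"):
        if sequence is None:
            raise ValueError("sequence is required for seq/sequence seeks")
        kwargs["sequence"] = int(sequence)
    elif seek == "time":
        if not start:
            raise ValueError('start is required when seek="time"')
        kwargs["start"] = start
    return kwargs
```

The result can be unpacked into a read call, for example client.read(backend="stream", table="mystream", **stream_read_kwargs("latest", 5, shard_count=8)).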
The read method returns the following:
- When the value of the iterator parameter is False (default): a single DataFrame.
- When the value of the iterator parameter is True: a DataFrames iterator.

df = client.read(backend="nosql", table="mytable", filter="col1>666")
df = client.read("tsdb", table="mytsdb", start="0", multi_index=True)
df = client.read(backend="tsdb", query="select avg(cpu) as cpu, avg(disk) from 'mytsdb' where node='1'", start="now-1d", end="now", step="2h")
df = client.read(backend="stream", table="mystream", seek="latest", shard_id="5")

Deletes a data collection or specific collection items, according to the specified backend type.
delete(backend, table, filter='', start='', end='', if_missing=FAIL)
- if_missing: Determines whether to raise an error when the specified collection (table) doesn't exist.
  - Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:
    from v3io_frames import frames_pb2 as fpb
  - Requirement: Optional
  - Valid Values: FAIL to raise an error when the specified collection doesn't exist; IGNORE to suppress the error
  - Default Value: FAIL
The following delete parameters are specific to the nosql backend:
- filter: A filter expression that identifies specific items to delete.
  - Type: str
  - Requirement: Optional
  - Default Value: "" (delete the entire table and its schema file)
The following delete parameters are specific to the tsdb backend:
- start: Start (minimum) time for the delete operation; that is, delete only items whose data sample time is at or after (>=) the specified start time.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
  - Default Value: "" when neither start nor end is set, which deletes the entire table and its schema file (.schema); 0 when end is set
- end: End (maximum) time for the delete operation; that is, delete only items whose data sample time is before or at (<=) the specified end time.
  - Type: str
  - Requirement: Optional
  - Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
  - Default Value: "" when neither start nor end is set, which deletes the entire table and its schema file (.schema); 0 when start is set
Note:
- When neither the start nor end parameters are set, the entire TSDB table and its schema file are deleted.
- Only full table partitions within the specified time frame (as determined by the start and end parameters) are deleted. Items within the specified time frame that reside within partitions that begin before the delete start time or end after the delete end time aren't deleted. The partition interval is calculated automatically based on the table's ingestion rate and is stored in the TSDB's partitionerInterval schema field (see the .schema file).
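The partition rule above can be made concrete with integer time units. deletable_partitions and partitions_for are hypothetical helpers; real TSDB partitions are defined by the partitionerInterval value in the .schema file, and this sketch only models the documented rule that a partition is deleted only if it lies entirely within the delete time range.

```python
def partitions_for(first, last, interval):
    """Split [first, last) into consecutive (start, end) partitions of equal length."""
    return [(t, t + interval) for t in range(first, last, interval)]


def deletable_partitions(partitions, start, end):
    """Keep only partitions that begin at or after start and end at or before end.

    Assumption: a partition's end is exclusive, so a partition ending exactly
    at the delete end time counts as fully inside the range.
    """
    return [(p_start, p_end) for p_start, p_end in partitions
            if p_start >= start and p_end <= end]
```

With a 10-hour partition interval over hours 0 to 50, a delete from hour 5 to hour 35 removes only the partitions covering hours 10 to 30; the samples in hours 5 to 10 and 30 to 35 remain.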
client.delete(backend="nosql", table="mytable", filter="age > 40")
client.delete(backend="tsdb", table="mytsdb", start="now-1d", end="now-5h")

from v3io_frames import frames_pb2 as fpb
client.delete(backend="stream", table="mystream", if_missing=fpb.IGNORE)

Extends the basic CRUD functionality of the other client methods via backend-specific commands for performing operations on a data collection.
Note: Currently, no execute commands are available for the tsdb backend.
execute(backend, table, command="", args=None)
All Frames backends that support the execute method support the following common parameters:
- command: The command to execute.
  - Type: str
  - Requirement: Required
  - Valid Values: Backend-specific
- args: A dictionary of <argument name>: <value> pairs for passing command-specific parameters (arguments).
  - Type: dict
  - Requirement and Valid Values: Backend-specific
  - Default Value: None
The following execute commands are specific to the nosql backend:
- infer | infer_schema: Infers the data schema of a given NoSQL table and creates a schema file for the table.
  Example:
  client.execute(backend="nosql", table="mytable", command="infer")
The following execute commands are specific to the stream backend:
- put: Adds records to a stream shard.
  Example:
  client.execute('stream', table="mystream", command='put', args={'data': '{"cpu": 12.4, "mem": 31.1, "disk": 12.7}', "client_info": "my custom info", "partition_key": "PK1"})
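In the put example above, the record is passed in args["data"] as a JSON-encoded string. put_args is a hypothetical helper that builds that dictionary with json.dumps instead of a hand-written string; it assumes the JSON-string convention shown in the example and adds client_info and partition_key only when they're provided.

```python
import json


def put_args(record, client_info=None, partition_key=None):
    """Build the args dict for the stream backend's put command."""
    args = {"data": json.dumps(record)}  # record serialized as a JSON string
    if client_info is not None:
        args["client_info"] = client_info
    if partition_key is not None:
        args["partition_key"] = partition_key
    return args
```

For the record in the example, put_args({"cpu": 12.4, "mem": 31.1, "disk": 12.7}, "my custom info", "PK1") produces the same args dictionary as the hand-written one, so it can be passed directly as client.execute("stream", table="mystream", command="put", args=...).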
By default, every command run with Frames is logged.
The history method returns information about requests made to the service as a pandas DataFrame.
history(backend='', container='', table='', user='', action='', min_start_time='', max_start_time='',
        min_duration=0, max_duration=0)
backend — filter logs by backend.
- Type:
str - Requirement: Optional
- Type:
-
container — filter logs by container.
- Type:
str - Requirement: Optional
- Type:
-
- Type:
str - Requirement: Optional
- Type:
-
user — filter logs by the user that executed the command.
- Type:
str - Requirement: Optional
- Type:
-
action — filter logs by frames action.
- Type:
str - Requirement: Optional
- Valid Values:
"create"|"delete"|"execute"|"read"|"write"
- Type:
-
min_start_time — specify start time of the desired logs.
- Type:
str - Requirement: Optional
- Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format
"now"or"now-[0-9]+[mhd]"(wherem= minutes,h= hours, and'd'= days), or 0 for the earliest time. For example:"2016-01-02T15:34:26Z";"1451748866";"now-90m";"0".
- Type:
-
max_start_time — specify end time of the desired logs.
- Type:
str - Requirement: Optional
- Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format
"now"or"now-[0-9]+[mhd]"(wherem= minutes,h= hours, and'd'= days), or 0 for the earliest time. For example:"2016-01-02T15:34:26Z";"1451748866";"now-90m";"0".
- Type:
-
min_duration — specify minimum duration in milliseconds for the desired logs.
- Type:
int - Requirement: Optional
- Type:
-
max_duration — specify maximum duration in milliseconds for the desired logs.
- Type:
int - Requirement: Optional
- Type:
Returns a single DataFrame.
df = client.history()
df = client.history(backend='tsdb', action='read')
df = client.history(container='test-0', min_start_time='now-1d', min_duration=50)

To contribute to V3IO Frames, you need to be aware of the following:
The following components are required for building Frames code:
- Go server with support for both the gRPC and HTTP protocols
- Go client
- Python client
The core is written in Go.
Development is done on the development branch, and changes are then released to the master branch.
Before submitting changes, test the code:
- To execute the Go tests, run make test.
- To execute the Python tests, run make test-python.
When adding dependencies:
- If you add Go dependencies, run make update-go-deps.
- If you add Python dependencies, update clients/py/Pipfile and run make update-py-deps.
Integration tests are run on Travis CI. See .travis.yml for details.
The following environment variables are defined in the Travis settings:
- Docker Container Registry (Quay.io)
  - DOCKER_PASSWORD: a password for pushing images to Quay.io.
  - DOCKER_USERNAME: a username for pushing images to Quay.io.
- Python Package Index (PyPI)
  - V3IO_PYPI_PASSWORD: a password for pushing a new release to PyPI.
  - V3IO_PYPI_USER: a username for pushing a new release to PyPI.
- Iguazio Data Science Platform
  - V3IO_SESSION: a JSON-encoded map with session information for running tests. For example:
    '{"url":"45.39.128.5:8081","container":"mitzi","user":"daffy","password":"rabbit season"}'
    Note: Make sure to embed the JSON object within single quotes ('{...}').
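Hand-writing the V3IO_SESSION value is error-prone, so a sketch like the following can generate it with compact JSON wrapped in shell-safe single quotes. session_env_value is a hypothetical helper; the keys (url, container, user, password) are taken from the example above.

```python
import json
import shlex


def session_env_value(url, container, user, password):
    """Return a single-quoted JSON string suitable for V3IO_SESSION."""
    session = {"url": url, "container": container, "user": user, "password": password}
    encoded = json.dumps(session, separators=(",", ":"))  # compact, like the example
    # shlex.quote wraps the value in single quotes because it contains shell metacharacters.
    return shlex.quote(encoded)


if __name__ == "__main__":
    print("V3IO_SESSION=" + session_env_value("45.39.128.5:8081", "mitzi", "daffy", "rabbit season"))
```

shlex.quote also escapes any single quote inside a password, which a manually quoted value would not handle.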
Use the following command to build the Docker image:
make build-docker
Use the following command to run the Docker image:
docker run \
  -v /path/to/config.yaml:/etc/framesd.yaml \
  quay.io/v3io/frames:unstable