41 changes: 20 additions & 21 deletions .github/workflows/lint.yml
@@ -24,11 +24,11 @@ jobs:
- name: Setup Python version
uses: actions/setup-python@v5
with:
python-version: '3.13'
python-version: '3.12'

- name: Install dependencies
run: |
pip install -r scripts/github_actions/requirements.txt
pip install black==24.10.0

- name: Run black on Python files
run: |
@@ -38,22 +38,21 @@ jobs:
run: |
black --include '\.pyi$' --check --verbose .

# typing:
# runs-on: ubuntu-latest
# steps:
# - name: Check out code
# uses: actions/checkout@v4

# - name: Setup Python version
# uses: actions/setup-python@v5
# with:
# python-version: '3.13'

# - name: Install dependencies
# run: |
# pip install -r scripts/github_actions/requirements.txt
# pip install -r dev-requirements.txt

# - name: Run mypy
# run: |
# mypy --check quasardb
typing:
runs-on: ubuntu-22.04
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Setup Python version
uses: actions/setup-python@v5
with:
python-version: '3.7'

- name: Install dependencies
run: |
pip install -r dev-requirements.txt

- name: Run mypy
run: |
mypy --check quasardb
6 changes: 3 additions & 3 deletions .gitignore
@@ -110,9 +110,9 @@ celerybeat-schedule
*.sage.py

# Environments
.env
.venv
venv
.env*/
.venv*/
venv*/

# Spyder project settings
.spyderproject
3 changes: 2 additions & 1 deletion dev-requirements.txt
@@ -37,8 +37,9 @@ setuptools-git == 1.2

# Linting
black==24.10.0; python_version >= '3.9'
black == 23.3.0; python_version < '3.9'
black==23.3.0; python_version < '3.9'

# Stubs
mypy
pybind11-stubgen
pandas-stubs
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -25,5 +25,6 @@ xfail_strict = true
filterwarnings = []
testpaths = ["tests"]

# [tool.mypy]
# python_version = "3.9"
[tool.mypy]
python_version = "3.7"
disallow_untyped_defs = true
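
The newly enabled [tool.mypy] section pins the analysis to Python 3.7 and turns on disallow_untyped_defs, which is why every helper in this PR gains explicit parameter and return annotations. A minimal sketch of what that flag accepts and rejects (a hypothetical example.py, not part of this change):

def scale(x: float, factor: float = 2.0) -> float:
    # Fully annotated: passes under disallow_untyped_defs.
    return x * factor

def shift(x, offset=1):
    # Unannotated: mypy reports "Function is missing a type annotation".
    return x + offset

Running mypy on a file like this with the configuration above flags shift while scale passes.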
9 changes: 5 additions & 4 deletions quasardb/extensions/__init__.py
@@ -1,8 +1,9 @@
from .writer import extend_writer
from typing import Any, List

from .writer import extend_writer

__all__ = []
__all__: List[Any] = []


def extend_module(m):
m.Writer = extend_writer(m.Writer)
def extend_module(m: Any) -> None:
extend_writer(m.Writer)
40 changes: 22 additions & 18 deletions quasardb/extensions/writer.py
@@ -1,12 +1,15 @@
import copy
import quasardb
from typing import Any, Callable, Dict, List, Optional

import numpy as np
import numpy.ma as ma

__all__ = []
import quasardb

__all__: List[Any] = []


def _ensure_ctype(self, idx, ctype):
def _ensure_ctype(self: Any, idx: int, ctype: quasardb.ColumnType) -> None:
assert "table" in self._legacy_state
infos = self._legacy_state["table"].list_columns()
cinfo = infos[idx]
@@ -24,7 +27,7 @@ def _ensure_ctype(self, idx, ctype):
raise quasardb.IncompatibleTypeError()


def _legacy_next_row(self, table):
def _legacy_next_row(self: Any, table: Any) -> Dict[str, Any]:
if "pending" not in self._legacy_state:
self._legacy_state["pending"] = []

@@ -37,56 +40,56 @@ def _legacy_next_row(self, table):
return self._legacy_state["pending"][-1]


def _legacy_current_row(self):
def _legacy_current_row(self: Any) -> Dict[str, Any]:
return self._legacy_state["pending"][-1]


def _legacy_start_row(self, table, x):
def _legacy_start_row(self: Any, table: Any, x: np.datetime64) -> None:
row = _legacy_next_row(self, table)
assert "$timestamp" not in row
row["$timestamp"] = x


def _legacy_set_double(self, idx, x):
def _legacy_set_double(self: Any, idx: int, x: float) -> None:
_ensure_ctype(self, idx, quasardb.ColumnType.Double)
assert isinstance(x, float)
assert idx not in _legacy_current_row(self)["by_index"]
_legacy_current_row(self)["by_index"][idx] = x


def _legacy_set_int64(self, idx, x):
def _legacy_set_int64(self: Any, idx: int, x: int) -> None:
_ensure_ctype(self, idx, quasardb.ColumnType.Int64)
assert isinstance(x, int)
assert idx not in _legacy_current_row(self)["by_index"]
_legacy_current_row(self)["by_index"][idx] = x


def _legacy_set_timestamp(self, idx, x):
def _legacy_set_timestamp(self: Any, idx: int, x: np.datetime64) -> None:
_ensure_ctype(self, idx, quasardb.ColumnType.Timestamp)
assert idx not in _legacy_current_row(self)["by_index"]
_legacy_current_row(self)["by_index"][idx] = x


def _legacy_set_string(self, idx, x):
def _legacy_set_string(self: Any, idx: int, x: str) -> None:
_ensure_ctype(self, idx, quasardb.ColumnType.String)
assert isinstance(x, str)
assert idx not in _legacy_current_row(self)["by_index"]

_legacy_current_row(self)["by_index"][idx] = x


def _legacy_set_blob(self, idx, x):
def _legacy_set_blob(self: Any, idx: int, x: bytes) -> None:
_ensure_ctype(self, idx, quasardb.ColumnType.Blob)
assert isinstance(x, bytes)
assert idx not in _legacy_current_row(self)["by_index"]

_legacy_current_row(self)["by_index"][idx] = x


def _legacy_push(self):
def _legacy_push(self: Any) -> Optional[quasardb.WriterData]:
if "pending" not in self._legacy_state:
# Extremely likely default case, no "old" rows
return
return None

assert "table" in self._legacy_state
table = self._legacy_state["table"]
@@ -109,7 +112,7 @@ def _legacy_push(self):
all_idx = set(ctype_by_idx.keys())

# Prepare data structure
pivoted = {"$timestamp": [], "by_index": {}}
pivoted: Dict[str, Any] = {"$timestamp": [], "by_index": {}}
for i in all_idx:
pivoted["by_index"][i] = []

@@ -140,7 +143,6 @@ def _legacy_push(self):

mask = [x is None for x in xs]

xs_ = []
if all(mask):
xs_ = ma.masked_all(len(xs), dtype=dtype)
else:
@@ -159,9 +161,11 @@ def _legacy_push(self):
return push_data


def _wrap_fn(old_fn, replace_fn):
def _wrap_fn(
old_fn: Callable[..., Any], replace_fn: Callable[..., Optional[quasardb.WriterData]]
) -> Callable[..., Any]:

def wrapped(self, *args, **kwargs):
def wrapped(self: Any, *args: Any, **kwargs: Any) -> Any:
data = replace_fn(self)
if data:
return old_fn(self, data, *args, **kwargs)
@@ -171,7 +175,7 @@ def wrapped(self, *args, **kwargs):
return wrapped


def extend_writer(x):
def extend_writer(x: Any) -> None:
"""
Extends the writer with the "old", batch inserter API. This is purely
a backwards compatibility layer, and we want to avoid having to maintain that
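
_wrap_fn is a small adapter around the native writer methods: replace_fn pivots any rows accumulated through the legacy row-oriented API into a WriterData payload, and the wrapper passes that payload to the original bound method ahead of the normal arguments. A self-contained sketch of the same pattern follows; the class and attribute names are hypothetical, and the behaviour when nothing is pending is an assumption, since that branch of wrapped is collapsed in the diff above.

from typing import Any, Callable, List, Optional

def wrap_push(old_fn: Callable[..., Any],
              build_payload: Callable[[Any], Optional[List[Any]]]) -> Callable[..., Any]:
    def wrapped(self: Any, *args: Any, **kwargs: Any) -> Any:
        payload = build_payload(self)  # e.g. pivot pending legacy rows into push data
        if payload:
            return old_fn(self, payload, *args, **kwargs)
        return old_fn(self, *args, **kwargs)  # assumed fall-through when nothing is pending
    return wrapped

class ToyWriter:
    def __init__(self) -> None:
        self.pending: List[Any] = []

    def push(self, data: Any = None) -> None:
        # Stands in for the native writer's push(data).
        print("pushing", data)

# Monkeypatch the method, presumably mirroring how extend_writer attaches the wrapper.
ToyWriter.push = wrap_push(ToyWriter.push, lambda self: self.pending or None)

w = ToyWriter()
w.pending.append({"$timestamp": "2024-01-01", "by_index": {0: 1.0}})
w.push()  # flushes the pending legacy row through the wrapped method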
25 changes: 17 additions & 8 deletions quasardb/firehose.py
@@ -1,22 +1,27 @@
import time
import quasardb
import logging
import time
from typing import Any, Dict, Iterator, List, Optional, Tuple

import numpy as np

from quasardb import Cluster

FIREHOSE_TABLE = "$qdb.firehose"
POLL_INTERVAL = 0.1

logger = logging.getLogger("quasardb.firehose")


def _init():
def _init() -> Dict[str, Any]:
"""
Initialize our internal state.
"""
return {"last": None, "seen": set()}


def _get_transactions_since(conn, table_name, last):
def _get_transactions_since(
conn: Cluster, table_name: str, last: Optional[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Retrieve all transactions since a certain timestamp. `last` is expected to be a dict
firehose row with at least a $timestamp attached.
@@ -33,15 +38,19 @@ def _get_transactions_since(conn, table_name, last):
return conn.query(q)


def _get_transaction_data(conn, table_name, begin, end):
def _get_transaction_data(
conn: Cluster, table_name: str, begin: str, end: str
) -> List[Dict[str, Any]]:
"""
Gets all data from a certain table.
"""
q = 'SELECT * FROM "{}" IN RANGE ({}, {}) '.format(table_name, begin, end)
return conn.query(q)


def _get_next(conn, table_name, state):
def _get_next(
conn: Cluster, table_name: str, state: Dict[str, Any]
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:

# Our flow to retrieve new data is as follows:
# 1. Based on the state's last processed transaction, retrieve all transactions
@@ -52,7 +61,7 @@ def _get_next(conn, table_name, state):

txs = _get_transactions_since(conn, table_name, state["last"])

xs = list()
xs: List[Dict[str, Any]] = []
for tx in txs:
txid = tx["transaction_id"]

@@ -83,7 +92,7 @@ def _get_next(conn, table_name, state):
return (state, xs)


def subscribe(conn, table_name):
def subscribe(conn: Cluster, table_name: str) -> Iterator[Dict[str, Any]]:
state = _init()

while True:
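
subscribe is a generator that polls the $qdb.firehose system table every POLL_INTERVAL seconds and yields each new row once; _get_next and the state's seen set handle the de-duplication (the loop body is collapsed above). A hedged usage sketch, assuming a reachable cluster at qdb://127.0.0.1:2836 and an existing table named sensors:

import quasardb
from quasardb.firehose import subscribe

# Stream rows as they are committed. The generator blocks between polls,
# so run it in a dedicated thread or process if the caller must stay responsive.
with quasardb.Cluster("qdb://127.0.0.1:2836") as conn:
    for row in subscribe(conn, "sensors"):
        print(row["$timestamp"], row)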