Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/executors/garf_executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ def setup_executor(
'ApiExecutionContext',
]

__version__ = '0.0.9'
__version__ = '0.1.0'
138 changes: 85 additions & 53 deletions libs/executors/garf_executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,122 @@

from __future__ import annotations

import argparse
import sys
from typing import Optional

import typer
from garf_io import reader
from typing_extensions import Annotated

import garf_executors
from garf_executors import config, exceptions
from garf_executors import exceptions
from garf_executors.config import Config
from garf_executors.entrypoints import utils

typer_app = typer.Typer()

def main():
parser = argparse.ArgumentParser()
parser.add_argument('query', nargs='*')
parser.add_argument('-c', '--config', dest='config', default=None)
parser.add_argument('--source', dest='source', default=None)
parser.add_argument('--output', dest='output', default='console')
parser.add_argument('--input', dest='input', default='file')
parser.add_argument('--log', '--loglevel', dest='loglevel', default='info')
parser.add_argument('--logger', dest='logger', default='local')
parser.add_argument(
'--parallel-queries', dest='parallel_queries', action='store_true'
)
parser.add_argument(
'--no-parallel-queries', dest='parallel_queries', action='store_false'
)
parser.add_argument('--dry-run', dest='dry_run', action='store_true')
parser.add_argument('-v', '--version', dest='version', action='store_true')
parser.add_argument(
'--parallel-threshold', dest='parallel_threshold', default=None, type=int
)
parser.set_defaults(parallel_queries=True)
parser.set_defaults(dry_run=False)
args, kwargs = parser.parse_known_args()

if args.version:
print(garf_executors.__version__)
sys.exit()
logger = utils.init_logging(
loglevel=args.loglevel.upper(), logger_type=args.logger
def _version_callback(show_version: bool) -> None:
if show_version:
print(f'Garf version: {garf_executors.__version__}')
raise typer.Exit()


@typer_app.command(
context_settings={'allow_extra_args': True, 'ignore_unknown_options': True}
)
def main(
source: Annotated[str, typer.Option(help='API alias')],
queries: Annotated[
list[str], typer.Argument(help='One or several query files')
],
config: Annotated[
Optional[str],
typer.Option('--config', '-c', help='Yaml file with parameters for garf'),
] = None,
input: Annotated[
str,
typer.Option(help='Where to get queries from'),
] = 'file',
output: Annotated[
str,
typer.Option(help='Where to write data'),
] = 'console',
parallel_queries: Annotated[
bool, typer.Option(help='Whether to run queries in parallel')
] = True,
version: Annotated[
bool,
typer.Option(
help='Display version of garf',
callback=_version_callback,
is_eager=True,
expose_value=False,
),
] = False,
loglevel: Annotated[
str,
typer.Option(help='Level of logging'),
] = 'INFO',
logger: Annotated[
str,
typer.Option(help='Type of logging'),
] = 'rich',
) -> None:
garf_logger = utils.init_logging(
loglevel=loglevel.upper(), logger_type=logger
)
if not args.query:
logger.error('Please provide one or more queries to run')
found_queries = []
parameters = []
for query in queries:
if query.startswith('--'):
parameters.append(query)
else:
found_queries.append(query)
if not found_queries:
garf_logger.error('Please provide one or more queries to run')
raise exceptions.GarfExecutorError(
'Please provide one or more queries to run'
)
reader_client = reader.create_reader(args.input)
if config_file := args.config:
execution_config = config.Config.from_file(config_file)
if not (context := execution_config.sources.get(args.source)):
reader_client = reader.create_reader(input)
if config:
garf_logger.info('Running queries with config: %s', config)
execution_config = Config.from_file(config)
if not (context := execution_config.sources.get(source)):
raise exceptions.GarfExecutorError(
f'No execution context found for source {args.source} in {config_file}'
f'No execution context found for source {source} in {config}'
)
query_executor = garf_executors.setup_executor(
args.source, context.fetcher_parameters
source, context.fetcher_parameters
)
batch = {query: reader_client.read(query) for query in args.query}
query_executor.execute_batch(batch, context, args.parallel_queries)
batch = {query: reader_client.read(query) for query in found_queries}
query_executor.execute_batch(batch, context, parallel_queries)
else:
extra_parameters = utils.ParamsParser(
['source', args.output, 'macro', 'template']
).parse(kwargs)
['source', output, 'macro', 'template']
).parse(parameters)
source_parameters = extra_parameters.get('source', {})

context = garf_executors.api_executor.ApiExecutionContext(
query_parameters={
'macro': extra_parameters.get('macro'),
'template': extra_parameters.get('template'),
},
writer=args.output,
writer_parameters=extra_parameters.get(args.output),
writer=output,
writer_parameters=extra_parameters.get(output),
fetcher_parameters=source_parameters,
)
query_executor = garf_executors.setup_executor(
args.source, context.fetcher_parameters
source, context.fetcher_parameters
)
if args.parallel_queries:
logger.info('Running queries in parallel')
batch = {query: reader_client.read(query) for query in args.query}
query_executor.execute_batch(batch, context, args.parallel_queries)
if parallel_queries:
garf_logger.info('Running queries in parallel')
batch = {query: reader_client.read(query) for query in found_queries}
query_executor.execute_batch(batch, context, parallel_queries)
else:
logger.info('Running queries sequentially')
for query in args.query:
garf_logger.info('Running queries sequentially')
for query in found_queries:
query_executor.execute(reader_client.read(query), query, context)


if __name__ == '__main__':
main()
typer_app()
3 changes: 2 additions & 1 deletion libs/executors/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies = [
"garf-io",
"pyyaml",
"pydantic",
"typer",
]
authors = [
{name = "Google Inc. (gTech gPS CSE team)", email = "no-reply@google.com"},
Expand Down Expand Up @@ -52,4 +53,4 @@ all = [
"garf-executors[bq,sql,server]"
]
[project.scripts]
garf="garf_executors.entrypoints.cli:main"
garf="garf_executors.entrypoints.cli:typer_app"
Loading