diff --git a/python/grass/app/cli.py b/python/grass/app/cli.py index 227c3a53168..2b388e088db 100644 --- a/python/grass/app/cli.py +++ b/python/grass/app/cli.py @@ -20,10 +20,47 @@ import tempfile import os import sys +import subprocess from pathlib import Path + import grass.script as gs from grass.app.data import lock_mapset, unlock_mapset, MapsetLockingException +from grass.experimental.tools import Tools + +# Special flags supported besides help and --json which does not need special handling: +SPECIAL_FLAGS = [ + "--interface-description", + "--md-description", + "--wps-process-description", + "--script", +] +# To make this list shorter, we don't support outdated special flags: +# --help-text --html-description --rst-description + + +def subcommand_run_tool(args, tool_args: list, help: bool): + command = [args.tool, *tool_args] + with tempfile.TemporaryDirectory() as tmp_dir_name: + project_name = "project" + project_path = Path(tmp_dir_name) / project_name + gs.create_project(project_path) + with gs.setup.init(project_path) as session: + tools = Tools(session=session, capture_output=False) + try: + if help: + # We consumed the help flag, so we need to add it explicitly. + tools.no_nonsense_run_from_list([*command, "--help"]) + elif any(item in command for item in SPECIAL_FLAGS): + # This is here basically because of how --json behaves, + # two JSON flags are accepted, but --json currently overridden by + # other special flags, so later use of --json in tools will fail + # with the other flags active. + tools.no_nonsense_run_from_list(command) + else: + tools.run_from_list(command) + except subprocess.CalledProcessError as error: + return error.returncode def subcommand_lock_mapset(args): @@ -73,10 +110,22 @@ def main(args=None, program=None): description="Experimental low-level CLI interface to GRASS. Consult developers before using it.", prog=program, ) - subparsers = parser.add_subparsers(title="subcommands", required=True) + subparsers = parser.add_subparsers( + title="subcommands", dest="subcommand", required=True + ) # Subcommand parsers + run_subparser = subparsers.add_parser( + "run", + help="run a tool", + add_help=False, + epilog="Tool name is followed by it parameters.", + ) + run_subparser.add_argument("tool", type=str, nargs="?", help="name of a tool") + run_subparser.add_argument("--help", action="store_true") + run_subparser.set_defaults(func=subcommand_run_tool) + subparser = subparsers.add_parser("lock", help="lock a mapset") subparser.add_argument("mapset_path", type=str) subparser.add_argument( @@ -120,5 +169,12 @@ def main(args=None, program=None): subparser.add_argument("page", type=str) subparser.set_defaults(func=subcommand_show_man) - parsed_args = parser.parse_args(args) + # Parsing + parsed_args, other_args = parser.parse_known_args(args) + # Standard help already exited, but we need to handle tools separately. + if parsed_args.subcommand == "run": + if parsed_args.tool is None and parsed_args.help: + run_subparser.print_help() + return 0 + return parsed_args.func(parsed_args, other_args, help=parsed_args.help) return parsed_args.func(parsed_args) diff --git a/python/grass/experimental/Makefile b/python/grass/experimental/Makefile index 46525e3e91a..7b2c0e7d0b1 100644 --- a/python/grass/experimental/Makefile +++ b/python/grass/experimental/Makefile @@ -7,7 +7,8 @@ DSTDIR = $(ETC)/python/grass/experimental MODULES = \ create \ - mapset + mapset \ + tools PYFILES := $(patsubst %,$(DSTDIR)/%.py,$(MODULES) __init__) PYCFILES := $(patsubst %,$(DSTDIR)/%.pyc,$(MODULES) __init__) diff --git a/python/grass/experimental/tests/conftest.py b/python/grass/experimental/tests/conftest.py index 1dbbc95fcd7..910036b3717 100644 --- a/python/grass/experimental/tests/conftest.py +++ b/python/grass/experimental/tests/conftest.py @@ -33,6 +33,14 @@ def xy_session_for_module(tmp_path_factory): yield session +@pytest.fixture +def xy_dataset_session(tmp_path): + """Creates a session with a mapset which has vector with a float column""" + gs.core._create_location_xy(tmp_path, "test") # pylint: disable=protected-access + with gs.setup.init(tmp_path / "test") as session: + yield session + + @pytest.fixture def unique_id(): """A unique alphanumeric identifier""" @@ -69,3 +77,41 @@ def xy_mapset_non_permament(xy_session): # pylint: disable=redefined-outer-name "test1", create=True, env=xy_session.env ) as session: yield session + + +@pytest.fixture +def rows_raster_file3x3(tmp_path): + project = tmp_path / "xy_test3x3" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=3, cols=3, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows3x3.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file + + +@pytest.fixture +def rows_raster_file4x5(tmp_path): + project = tmp_path / "xy_test4x5" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=4, cols=5, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows4x5.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file diff --git a/python/grass/experimental/tests/grass_tools_test.py b/python/grass/experimental/tests/grass_tools_test.py new file mode 100644 index 00000000000..fa6e25c5346 --- /dev/null +++ b/python/grass/experimental/tests/grass_tools_test.py @@ -0,0 +1,423 @@ +"""Test grass.experimental.Tools class""" + +import os +import io + +import pytest + +import grass.script as gs +from grass.experimental.mapset import TemporaryMapsetSession +from grass.experimental.tools import Tools +from grass.exceptions import CalledModuleError + + +def test_key_value_parser_number(xy_dataset_session): + """Check that numbers are parsed as numbers""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="g").keyval["nsres"] == 1 + + +def test_key_value_parser_multiple_values(xy_dataset_session): + """Check that strings and floats are parsed""" + tools = Tools(session=xy_dataset_session) + name = "surface" + tools.r_surf_gauss(output=name) # needs seed + result = tools.r_info(map=name, flags="g").keyval + assert result["datatype"] == "DCELL" + assert result["nsres"] == 1 + result = tools.r_univar(map=name, flags="g").keyval + # We don't worry about the specific output value, and just test the type. + assert isinstance(result["mean"], float) + + +def test_json_parser(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_region(flags="p", format="json").json["cols"] == 1 + + +def test_json_with_name_and_parameter_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.run("g.region", flags="p", format="json").json["cols"] == 1 + + +def test_json_with_direct_name_and_parameter_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.no_nonsense_run("g.region", flags="p", format="json").json["cols"] == 1 + + +def test_json_with_subprocess_run_like_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert tools.run_from_list(["g.region", "format=json", "-p"]).json["cols"] == 1 + + +def test_json_with_direct_subprocess_run_like_call(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert ( + tools.no_nonsense_run_from_list(["g.region", "format=json", "-p"]).json["cols"] + == 1 + ) + + +def test_help_call_no_parameters(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert ( + "r.slope.aspect" + in tools.no_nonsense_run_from_list(["r.slope.aspect", "--help"]).stderr + ) + + +def test_help_call_with_parameters(xy_dataset_session): + """Check that JSON is parsed with a name-and-parameters style call""" + tools = Tools(session=xy_dataset_session) + assert ( + "r.slope.aspect" + in tools.no_nonsense_run_from_list( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--help"] + ).stderr + ) + + +def test_json_call_with_low_level_call(xy_dataset_session): + """Check that --json call works including JSON data parsing""" + tools = Tools(session=xy_dataset_session) + data = tools.no_nonsense_run_from_list( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--json"] + ).json + assert "inputs" in data + assert data["inputs"][0]["value"] == "dem" + + +def test_json_call_with_high_level_call(xy_dataset_session): + """Check that --json call works including JSON data parsing""" + tools = Tools(session=xy_dataset_session) + data = tools.run_from_list( + ["r.slope.aspect", "elevation=dem", "slope=slope", "--json"] + ).json + assert "inputs" in data + assert data["inputs"][0]["value"] == "dem" + + +def test_json_direct_access(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_search_modules(keyword="random", flags="j")[0]["name"] == "r.random" + + +def test_json_direct_access_bad_key_type(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + with pytest.raises(TypeError): + tools.g_search_modules(keyword="random", flags="j")["name"] + + +def test_json_direct_access_bad_key_value(xy_dataset_session): + """Check that JSON is parsed""" + tools = Tools(session=xy_dataset_session) + high_number = 100_000_000 + with pytest.raises(IndexError): + tools.g_search_modules(keyword="random", flags="j")[high_number] + + +def test_json_direct_access_not_json(xy_dataset_session): + """Check that JSON parsing creates an ValueError + + Specifically, this tests the case when format="json" is not set. + """ + tools = Tools(session=xy_dataset_session) + with pytest.raises(ValueError, match=r"format.*json"): + tools.g_search_modules(keyword="random")[0]["name"] + + +def test_stdout_as_text(xy_dataset_session): + """Check that simple text is parsed and has no whitespace""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="p").text == "PERMANENT" + + +def test_stdout_as_space_items(xy_dataset_session): + """Check that whitespace-separated items are parsed""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").space_items == ["PERMANENT"] + + +def test_stdout_split_whitespace(xy_dataset_session): + """Check that whitespace-based split function works""" + tools = Tools(session=xy_dataset_session) + assert tools.g_mapset(flags="l").text_split() == ["PERMANENT"] + + +def test_stdout_split_space(xy_dataset_session): + """Check that the split function works with space""" + tools = Tools(session=xy_dataset_session) + # Not a good example usage, but it tests the functionality. + assert tools.g_mapset(flags="l").text_split(" ") == ["PERMANENT"] + + +def test_stdout_without_capturing(xy_dataset_session): + """Check that text is not present when not capturing it""" + tools = Tools(session=xy_dataset_session, capture_output=False) + result = tools.g_mapset(flags="p") + assert not result.stdout + assert result.stdout is None + assert not result.text + assert result.text is None + + +def test_capturing_stderr(xy_dataset_session): + """Check that text is not present when not capturing it""" + tools = Tools(session=xy_dataset_session, errors="ignore") + result = tools.g_mapset(mapset="does_not_exist") + assert result.stderr + assert "does_not_exist" in result.stderr + + +def test_stderr_without_capturing(xy_dataset_session): + """Check that text is not present when not capturing it""" + tools = Tools(session=xy_dataset_session, capture_output=False, errors="ignore") + result = tools.g_mapset(mapset="does_not_exist") + assert not result.stderr + assert result.stderr is None + + +def test_direct_overwrite(xy_dataset_session): + """Check overwrite as a parameter""" + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42, overwrite=True) + + +def test_object_overwrite(xy_dataset_session): + """Check overwrite as parameter of the tools object""" + tools = Tools(session=xy_dataset_session, overwrite=True) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_no_overwrite(xy_dataset_session): + """Check that it fails without overwrite""" + tools = Tools(session=xy_dataset_session) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + + +def test_env_overwrite(xy_dataset_session): + """Check that overwrite from env parameter is used""" + # env = xy_dataset_session.env.copy() # ideally + env = os.environ.copy() # for now + env["GRASS_OVERWRITE"] = "1" + tools = Tools(session=xy_dataset_session, env=env) + tools.r_random_surface(output="surface", seed=42) + tools.r_random_surface(output="surface", seed=42) + + +def test_global_overwrite_vs_env(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + # env = xy_dataset_session.env.copy() # ideally + env = os.environ.copy() # for now + os.environ["GRASS_OVERWRITE"] = "1" # change to xy_dataset_session.env + tools = Tools(session=xy_dataset_session, env=env) + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + del os.environ["GRASS_OVERWRITE"] # check or ideally remove this + + +def test_global_overwrite_vs_init(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools = Tools(session=xy_dataset_session) + os.environ["GRASS_OVERWRITE"] = "1" # change to xy_dataset_session.env + tools.r_random_surface(output="surface", seed=42) + with pytest.raises(CalledModuleError, match="overwrite"): + tools.r_random_surface(output="surface", seed=42) + del os.environ["GRASS_OVERWRITE"] # check or ideally remove this + + +def test_stdin(xy_dataset_session): + """Test that stdin is accepted""" + tools = Tools(session=xy_dataset_session) + tools.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", output="point", separator="," + ) + + +def test_raises(xy_dataset_session): + """Test that exception is raised for wrong parameter value""" + tools = Tools(session=xy_dataset_session) + wrong_name = "wrong_standard" + with pytest.raises(CalledModuleError, match=wrong_name): + tools.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", + output="point", + format=wrong_name, + ) + + +def test_run_command(xy_dataset_session): + """Check run_command and its overwrite parameter""" + tools = Tools(session=xy_dataset_session) + tools.run_command("r.random.surface", output="surface", seed=42) + tools.run_command("r.random.surface", output="surface", seed=42, overwrite=True) + + +def test_parse_command_key_value(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + assert tools.parse_command("g.region", flags="g")["nsres"] == "1" + + +def test_parse_command_json(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + assert ( + tools.parse_command("g.region", flags="g", format="json")["region"]["ns-res"] + == 1 + ) + + +def test_with_context_managers(tmpdir): + project = tmpdir / "project" + gs.create_project(project) + with gs.setup.init(project) as session: + tools = Tools(session=session) + tools.r_random_surface(output="surface", seed=42) + with TemporaryMapsetSession(env=tools.env) as mapset: + tools.r_random_surface(output="surface", seed=42, env=mapset.env) + with gs.MaskManager(env=mapset.env) as mask: + # TODO: Do actual test + tools.r_univar(map="surface", env=mask.env, format="json")["mean"] + + +def test_misspelling(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r_slope_aspect"): + tools.r_sloppy_respect() + + +def test_multiple_suggestions(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"v_db_univar|db_univar"): + tools.db_v_uni_var() + + +def test_tool_group_vs_model_name(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match=r"r_sim_water"): + tools.rSIMWEwater() + + +def test_wrong_attribute(xy_dataset_session): + tools = Tools(session=xy_dataset_session) + with pytest.raises(AttributeError, match="execute_big_command"): + tools.execute_big_command() + + +def test_stdin_as_stringio_object(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools = Tools(session=xy_dataset_session) + tools.v_edit(map="points", type="point", tool="create") + tools.v_edit( + map="points", + type="point", + tool="add", + input=io.StringIO("P 1 0\n 10 20"), + flags="n", + ) + assert tools.v_info(map="points", format="json")["points"] == 1 + + +def test_tool_attribute_access_c_tools(): + tools = Tools() + assert "g_region" in dir(tools) + assert "r_slope_aspect" in dir(tools) + + +def test_tool_attribute_access_python_tools(): + tools = Tools() + assert "g_search_modules" in dir(tools) + assert "r_mask" in dir(tools) + + +def test_tool_doc_access_c_tools(): + tools = Tools() + assert tools.g_region.__doc__ + assert tools.r_slope_aspect.__doc__ + + +def test_tool_doc_access_python_tools(): + tools = Tools() + assert tools.g_search_modules.__doc__ + assert tools.r_mask.__doc__ + + +# Prefixes + + +def test_tool_attribute_access_with_prefix(): + raster = Tools(prefix="r") + assert "slope_aspect" in dir(raster) + assert "mask" in dir(raster) + + +def test_tool_attribute_access_with_long_prefix(): + simwe = Tools(prefix="r.sim") + assert "water" in dir(simwe) + assert "sediment" in dir(simwe) + + +def test_tool_doc_access_with_prefix(): + raster = Tools(prefix="r") + assert raster.slope_aspect.__doc__ + assert raster.mask.__doc__ + + +def test_tool_which_is_keyword_with_prefix_applied(): + """Check that when prefix is applied we support trailing underscore""" + vector = Tools(prefix="v") + raster = Tools(prefix="r") + assert vector.import_.__doc__ + assert raster.import_.__doc__ + + +def test_tool_by_prefix_shows_full_tool_name_in_error(): + """Check that when prefix is applied we support trailing underscore""" + raster = Tools(prefix="r") + with pytest.raises(AttributeError, match=r"r\.sloppy\.respect"): + assert raster.sloppy_respect(map="surface") + + +def test_tool_prefix_raster(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + raster = Tools(session=xy_dataset_session, prefix="r") + raster.mapcalc(expression="streams = if(row() > 1, 1, null())") + raster.buffer(input="streams", output="buffer", distance=1) + assert raster.info(map="streams", format="json")["datatype"] == "CELL" + + +def test_tool_prefix_vector(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + vector = Tools(prefix="v") + vector.edit(map="points", type="point", tool="create", env=xy_dataset_session.env) + # Here, the feed_input_to style does not make sense, but we are not using StringIO + # here to test the feed_input_to functionality and avoid dependence on the StringIO + # functionality. + # The ASCII format is for one point with no categories. + vector.feed_input_to("P 1 0\n 10 20").edit( + map="points", + type="point", + tool="add", + input="-", + flags="n", + env=xy_dataset_session.env, + ) + vector.buffer( + input="points", output="buffer", distance=1, env=xy_dataset_session.env + ) + assert ( + vector.info(map="buffer", format="json", env=xy_dataset_session.env)["areas"] + == 1 + ) diff --git a/python/grass/experimental/tools.py b/python/grass/experimental/tools.py new file mode 100644 index 00000000000..54aba3867a7 --- /dev/null +++ b/python/grass/experimental/tools.py @@ -0,0 +1,540 @@ +#!/usr/bin/env python + +############################################################################## +# AUTHOR(S): Vaclav Petras +# +# PURPOSE: API to call GRASS tools (modules) as Python functions +# +# COPYRIGHT: (C) 2023-2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +"""API to call GRASS tools (modules) as Python functions""" + +from __future__ import annotations + +import json +import os +import shutil +from io import StringIO + + +import grass.script as gs +from grass.exceptions import CalledModuleError + + +class ObjectParameterHandler: + def __init__(self): + self._numpy_inputs = {} + self._numpy_outputs = {} + self._numpy_inputs_ordered = [] + self.stdin = None + + def process_parameters(self, kwargs): + for key, value in kwargs.items(): + if isinstance(value, StringIO): + kwargs[key] = "-" + self.stdin = value.getvalue() + + +class ToolFunctionNameHelper: + def __init__(self, *, run_function, env, prefix=None): + self._run_function = run_function + self._env = env + self._prefix = prefix + self._names = None + + # def __getattr__(self, name): + # self.get_function(name, exception_type=AttributeError) + + def get_function(self, name, exception_type): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + # Convert snake case attribute name to dotted tool name, + # and apply prefix is provided. + dotted_name = name.replace("_", ".") + if self._prefix: + # Allow trailing underscore (now dot) with prefix usage to avoid conflict + # with Python keywords. + dotted_name = dotted_name.removesuffix(".") + search_name = name.removesuffix("_") + tool_name = f"{self._prefix}.{dotted_name}" + else: + tool_name = dotted_name + search_name = name + # We first try to find the tool on path which is much faster than getting + # and checking the names, but if the tool is not found, likely because runtime + # is not set up, we check the names. + if ( + not shutil.which(tool_name, path=self._env["PATH"]) + and search_name not in self.names() + ): + suggestions = self.suggest_tools(tool_name) + if suggestions: + # While Python may automatically suggest the closest match, + # we show more matches. We also show single match more often + # (this may change in the future). + msg = ( + f"Tool {name} ({tool_name}) not found" + f" (but found {', '.join(suggestions)})" + ) + raise AttributeError(msg) + msg = ( + f"Tool or attribute {name} ({tool_name}) not found" + " (check session setup and documentation for tool and attribute names)" + ) + raise AttributeError(msg) + + def wrapper(**kwargs): + return self._run_function(tool_name, **kwargs) + + wrapper.__doc__ = f"Run {tool_name} as function" + + return wrapper + + @staticmethod + def levenshtein_distance(text1: str, text2: str) -> int: + if len(text1) < len(text2): + return ToolFunctionNameHelper.levenshtein_distance(text2, text1) + + if len(text2) == 0: + return len(text1) + + previous_row = list(range(len(text2) + 1)) + for i, char1 in enumerate(text1): + current_row = [i + 1] + for j, char2 in enumerate(text2): + insertions = previous_row[j + 1] + 1 + deletions = current_row[j] + 1 + substitutions = previous_row[j] + (char1 != char2) + current_row.append(min(insertions, deletions, substitutions)) + previous_row = current_row + + return previous_row[-1] + + def suggest_tools(self, tool): + # TODO: cache commands also for dir + result = [] + max_suggestions = 5 + for name in self.names(): + if ToolFunctionNameHelper.levenshtein_distance(tool, name) < len(tool) / 2: + result.append(name) + if len(result) >= max_suggestions: + break + return result + + def names(self): + if self._names: + return self._names + if self._prefix: + self._names = [ + name.replace(f"{self._prefix}.", "").replace(".", "_") + for name in gs.get_commands()[0] + if name.startswith(f"{self._prefix}.") + ] + else: + self._names = [name.replace(".", "_") for name in gs.get_commands()[0]] + return self._names + + +class ExecutedTool: + """Result returned after executing a tool""" + + def __init__(self, *, name, kwargs, returncode, stdout, stderr): + self._name = name + self._kwargs = kwargs + self.returncode = returncode + self._stdout = stdout + self._stderr = stderr + self._text = None + self._cached_json = None + + @property + def text(self) -> str | None: + """Text output as decoded string""" + if self._text is not None: + return self._text + if self._stdout is None: + return None + if isinstance(self._stdout, bytes): + decoded_stdout = gs.decode(self._stdout) + else: + decoded_stdout = self._stdout + self._text = decoded_stdout.strip() + return self._text + + @property + def json(self) -> dict: + """Text output read as JSON + + This returns the nested structure of dictionaries and lists or fails when + the output is not JSON. + """ + if self._cached_json is None: + self._cached_json = json.loads(self._stdout) + return self._cached_json + + @property + def keyval(self) -> dict: + """Text output read as key-value pairs separated by equal signs""" + + def conversion(value): + """Convert text to int or float if possible, otherwise return it as is""" + try: + return int(value) + except ValueError: + pass + try: + return float(value) + except ValueError: + pass + return value + + return gs.parse_key_val(self._stdout, val_type=conversion) + + @property + def comma_items(self) -> list: + """Text output read as comma-separated list""" + return self.text_split(",") + + @property + def space_items(self) -> list: + """Text output read as whitespace-separated list""" + return self.text_split(None) + + def text_split(self, separator=None) -> list: + """Parse text output read as list separated by separators + + Any leading or trailing newlines are removed prior to parsing. + """ + # The use of strip is assuming that the output is one line which + # ends with a newline character which is for display only. + return self.text.split(separator) + + @property + def stdout(self) -> str | bytes: + return self._stdout + + @property + def stderr(self) -> str | bytes: + return self._stderr + + def __getitem__(self, name): + if self._stdout: + # We are testing just std out and letting rest to the parse and the user. + # This makes no assumption about how JSON is produced by the tool. + try: + return self.json[name] + except json.JSONDecodeError as error: + if self._kwargs.get("format") == "json": + raise + msg = ( + f"Output of {self._name} cannot be parsed as JSON. " + 'Did you use format="json"?' + ) + raise ValueError(msg) from error + msg = f"No text output for {self._name} to be parsed as JSON" + raise ValueError(msg) + + +class Tools: + """Call GRASS tools as methods + + GRASS tools (modules) can be executed as methods of this class. + """ + + def __init__( + self, + *, + session=None, + env=None, + overwrite=False, + quiet=False, + verbose=False, + superquiet=False, + freeze_region=False, + stdin=None, + errors=None, + capture_output=True, + prefix=None, + ): + if env: + self._env = env.copy() + elif session and hasattr(session, "env"): + self._env = session.env.copy() + else: + self._env = os.environ.copy() + self._region_is_frozen = False + if freeze_region: + self._freeze_region() + if overwrite: + self._overwrite() + # This hopefully sets the numbers directly. An alternative implementation would + # be to pass the parameter every time. + # Does not check for multiple set at the same time, but the most verbose wins + # for safety. + if superquiet: + self._env["GRASS_VERBOSE"] = "0" + if quiet: + self._env["GRASS_VERBOSE"] = "1" + if verbose: + self._env["GRASS_VERBOSE"] = "3" + self._set_stdin(stdin) + self._errors = errors + self._capture_output = capture_output + self._prefix = prefix + self._name_helper = None + + # These could be public, not protected. + def _freeze_region(self): + self._env["GRASS_REGION"] = gs.region_env(env=self._env) + self._region_is_frozen = True + + def _overwrite(self): + self._env["GRASS_OVERWRITE"] = "1" + + def _set_stdin(self, stdin, /): + self._stdin = stdin + + @property + def env(self): + """Internally used environment (reference to it, not a copy)""" + return self._env + + def _process_parameters(self, command, **popen_options): + popen_options["stdin"] = None + popen_options["stdout"] = gs.PIPE + # We respect whatever is in the stderr option because that's what the user + # asked for and will expect to get in case of error (we pretend that it was + # the intended run, not our special run before the actual run). + return self.no_nonsense_run_from_list([*command, "--json"], **popen_options) + + def run(self, name, /, **kwargs): + """Run modules from the GRASS display family (modules starting with "d."). + + This function passes arguments directly to grass.script.run_command() + so the syntax is the same. + + :param str module: name of GRASS module + :param `**kwargs`: named arguments passed to run_command()""" + + object_parameter_handler = ObjectParameterHandler() + object_parameter_handler.process_parameters(kwargs) + + args, popen_options = gs.popen_args_command(name, **kwargs) + + interface_result = self._process_parameters(args, **popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[name], + kwargs=kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + parameters = json.loads(interface_result.stdout) + + # We approximate tool_kwargs as original kwargs. + return self.run_from_list( + args, + tool_kwargs=kwargs, + processed_parameters=parameters, + stdin=object_parameter_handler.stdin, + **popen_options, + ) + + def run_from_list( + self, + command, + tool_kwargs=None, + stdin=None, + processed_parameters=None, + **popen_options, + ): + if not processed_parameters: + interface_result = self._process_parameters(command, **popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[command], + kwargs=tool_kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + processed_parameters = json.loads(interface_result.stdout) + + # We approximate tool_kwargs as original kwargs. + return self.no_nonsense_run_from_list( + command, + tool_kwargs=tool_kwargs, + stdin=stdin, + **popen_options, + ) + + def run_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.run_command(name, **kwargs, env=self._env) + + def parse_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.parse_command(name, **kwargs, env=self._env) + + def no_nonsense_run(self, name, /, *, tool_kwargs=None, stdin=None, **kwargs): + args, popen_options = gs.popen_args_command(name, **kwargs) + return self.no_nonsense_run_from_list( + args, tool_kwargs=tool_kwargs, stdin=stdin, **popen_options + ) + + # Make this an overload of run. + def no_nonsense_run_from_list( + self, command, tool_kwargs=None, stdin=None, **popen_options + ): + if self._capture_output: + if "stdout" not in popen_options: + popen_options["stdout"] = gs.PIPE + if "stderr" not in popen_options: + popen_options["stderr"] = gs.PIPE + if self._stdin: + stdin_pipe = gs.PIPE + stdin = self._stdin + elif stdin: + stdin_pipe = gs.PIPE + else: + stdin_pipe = None + stdin = None + # Use text mode by default + if "text" not in popen_options and "universal_newlines" not in popen_options: + popen_options["text"] = True + # Allowing to overwrite env, but that's just to have maximum flexibility when + # the session is actually set up, but it may be confusing. + if "env" not in popen_options: + popen_options["env"] = self._env + process = gs.Popen( + command, + stdin=stdin_pipe, + **popen_options, + ) + stdout, stderr = process.communicate(input=stdin) + if stderr: + stderr = gs.utils.decode(stderr) + returncode = process.poll() + if returncode and self._errors != "ignore": + raise CalledModuleError( + command[0], + code=" ".join(command), + returncode=returncode, + errors=stderr, + ) + # TODO: solve tool_kwargs is None + # We don't have the keyword arguments to pass to the resulting object. + return ExecutedTool( + name=command[0], + kwargs=tool_kwargs, + returncode=returncode, + stdout=stdout, + stderr=stderr, + ) + + def feed_input_to(self, stdin, /): + """Get a new object which will feed text input to a tool or tools""" + return Tools( + env=self._env, + stdin=stdin, + freeze_region=self._region_is_frozen, + errors=self._errors, + capture_output=self._capture_output, + prefix=self._prefix, + ) + + def ignore_errors_of(self): + """Get a new object which will ignore errors of the called tools""" + return Tools(env=self._env, errors="ignore") + + def __getattr__(self, name): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if not self._name_helper: + self._name_helper = ToolFunctionNameHelper( + run_function=self.run, + env=self.env, + prefix=self._prefix, + ) + return self._name_helper.get_function(name, exception_type=AttributeError) + + def __dir__(self): + if not self._name_helper: + self._name_helper = ToolFunctionNameHelper( + run_function=self.run, + env=self.env, + prefix=self._prefix, + ) + # Collect instance and class attributes + static_attrs = set(dir(type(self))) | set(self.__dict__.keys()) + return list(static_attrs) + self._name_helper.names() + + +def _test(): + """Ad-hoc tests and examples of the Tools class""" + session = gs.setup.init("~/grassdata/nc_spm_08_grass7/user1") + + tools = Tools() + tools.g_region(raster="elevation") + tools.r_slope_aspect(elevation="elevation", slope="slope", overwrite=True) + print(tools.r_univar(map="slope", flags="g").keyval) + + print(tools.v_info(map="bridges", flags="c").text) + print( + tools.v_db_univar(map="bridges", column="YEAR_BUILT", format="json").json[ + "statistics" + ]["mean"] + ) + + print(tools.g_mapset(flags="p").text) + print(tools.g_mapsets(flags="l").text_split()) + print(tools.g_mapsets(flags="l").space_items) + print(tools.g_gisenv(get="GISDBASE,LOCATION_NAME,MAPSET", sep="comma").comma_items) + + print(tools.g_region(flags="g").keyval) + + env = os.environ.copy() + env["GRASS_REGION"] = gs.region_env(res=250) + coarse_computation = Tools(env=env) + current_region = coarse_computation.g_region(flags="g").keyval + print(current_region["ewres"], current_region["nsres"]) + coarse_computation.r_slope_aspect( + elevation="elevation", slope="slope", flags="a", overwrite=True + ) + print(coarse_computation.r_info(map="slope", flags="g").keyval) + + independent_computation = Tools(session=session, freeze_region=True) + tools.g_region(res=500) # we would do this for another computation elsewhere + print(independent_computation.g_region(flags="g").keyval["ewres"]) + + tools_pro = Tools( + session=session, freeze_region=True, overwrite=True, superquiet=True + ) + tools_pro.r_slope_aspect(elevation="elevation", slope="slope") + tools_pro.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", output="point", separator="," + ) + print(tools_pro.v_info(map="point", flags="t").keyval["points"]) + + print(tools_pro.ignore_errors_of().g_version(flags="rge").keyval) + + elevation = "elevation" + exaggerated = "exaggerated" + tools_pro.r_mapcalc(expression=f"{exaggerated} = 5 * {elevation}") + tools_pro.feed_input_to(f"{exaggerated} = 5 * {elevation}").r_mapcalc(file="-") + + +if __name__ == "__main__": + _test() diff --git a/python/grass/script/core.py b/python/grass/script/core.py index 915fbeccaab..452031e37b2 100644 --- a/python/grass/script/core.py +++ b/python/grass/script/core.py @@ -315,7 +315,7 @@ def make_command( return args -def handle_errors(returncode, result, args, kwargs): +def handle_errors(returncode, result, args, kwargs, handler=None, stderr=None): """Error handler for :func:`run_command()` and similar functions The functions which are using this function to handle errors, @@ -365,7 +365,8 @@ def get_module_and_code(args, kwargs): code = " ".join(args) return module, code - handler = kwargs.get("errors", "raise") + if handler is None: + handler = kwargs.get("errors", "raise") if handler.lower() == "status": return returncode if returncode == 0: @@ -383,7 +384,9 @@ def get_module_and_code(args, kwargs): sys.exit(returncode) else: module, code = get_module_and_code(args, kwargs) - raise CalledModuleError(module=module, code=code, returncode=returncode) + raise CalledModuleError( + module=module, code=code, returncode=returncode, errors=stderr + ) def popen_args_command(