diff --git a/algorithm_catalog/gisat/record.json b/algorithm_catalog/gisat/record.json new file mode 100644 index 00000000..cdfc0b3e --- /dev/null +++ b/algorithm_catalog/gisat/record.json @@ -0,0 +1,63 @@ +{ + "id": "gisat", + "type": "Feature", + "conformsTo": [ + "http://www.opengis.net/spec/ogcapi-records-1/1.0/req/record-core" + ], + "properties": { + "created": "2025-01-01T13:00:00Z", + "updated": "2025-02-01T13:00:00Z", + "type": "algorithm_provider", + "title": "Gisat s.r.o.", + "description": "GISAT s.r.o. is a Czech Earth Observation and geospatial analytics company delivering advanced remote sensing solutions and satellite-based analytics.", + "keywords": [], + "language": { + "code": "en-US", + "name": "English (United States)" + }, + "languages": [ + { + "code": "en-US", + "name": "English (United States)" + } + ], + "contacts": [ + { + "name": "GISAT s.r.o.", + "organization": "GISAT s.r.o.", + "roles": ["provider"], + "emails": [ + { + "value": "gisat@gisat.cz" + } + ] + } + ], + "themes": [], + "license": "other", + "acl": { + "admin": ["@gisat.cz","@vito.be"] + } + }, + "linkTemplates": [], + "links": [ + { + "rel": "website", + "type": "text/html", + "title": "Gisat s.r.o.", + "href": "https://www.gisat.cz/" + }, + { + "rel": "logo-light", + "type": "image/svg+xml", + "title": "GISAT Logo", + "href": "https://www.gisat.cz/assets/img/symbol.svg" + }, + { + "rel": "logo-dark", + "type": "image/svg+xml", + "title": "GISAT Logo", + "href": "https://www.gisat.cz/assets/img/symbol.svg" + } + ] +} \ No newline at end of file diff --git a/algorithm_catalog/gisat/sentinel1_changedetection/benchmark_scenarios/sentinel1_changedetection.json b/algorithm_catalog/gisat/sentinel1_changedetection/benchmark_scenarios/sentinel1_changedetection.json new file mode 100644 index 00000000..b0732682 --- /dev/null +++ b/algorithm_catalog/gisat/sentinel1_changedetection/benchmark_scenarios/sentinel1_changedetection.json @@ -0,0 +1,28 @@ +[ + { + "id": 
"sentinel1_changedetection", + "type": "openeo", + "description": "Sentinel 1 change detection example", + "backend": "openeo.dataspace.copernicus.eu", + "process_graph": { + "s1stats1": { + "process_id": "sentinel1_changedetection", + "namespace": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json", + "arguments": { + "spatial_extent": { + "east": -54.59, + "north": -12.08, + "south": -12.26, + "west": -54.81 + }, + "temporal_extent":["2020-03-11", "2020-03-23"] + }, + "result": true + } + }, + "reference_data": { + "job-results.json": "https://s3.waw3-1.cloudferro.com/apex-benchmarks/gh-11743427213!tests_test_benchmarks.py__test_run_benchmark_sentinel1_stats_!actual/job-results.json", + "openEO.tif": "https://s3.waw3-1.cloudferro.com/apex-benchmarks/gh-11743427213!tests_test_benchmarks.py__test_run_benchmark_sentinel1_stats_!actual/openEO.tif" + } + } +] \ No newline at end of file diff --git a/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json b/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json new file mode 100644 index 00000000..9c4d3e92 --- /dev/null +++ b/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json @@ -0,0 +1,358 @@ +{ + "process_graph": { + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "bands": [ + "VH", + "VV" + ], + "id": "SENTINEL1_GRD", + "spatial_extent": null, + "temporal_extent": null + } + }, + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": { + "from_parameter": "temporal_extent" + }, + "runtime": "python", + "udf": "from openeo.udf import UdfData, StructuredData\nimport datetime\nfrom typing import List, Tuple\n\n# Phase boundaries as DATES\nPHASE1_END = datetime.date(2021, 12, 16)\nPHASE2_END = datetime.date(2025, 3, 30)\n\n# 
\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Step logic (your rule) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef step_forward(start_d: datetime.date, acq_frequency: int = 6) -> int:\n \"\"\"\n Decide interval length (N or 2N) from a given START date.\n\n Rule:\n - If start + N does NOT cross PHASE1_END => use N\n - Else, as long as start < PHASE2_END => use 2N\n - Once start >= PHASE2_END => use N\n \"\"\"\n N = acq_frequency\n\n if start_d + datetime.timedelta(days=N) <= PHASE1_END:\n # Still in Phase 1\n return N\n elif start_d < PHASE2_END:\n # Phase 2 (including the interval that crosses PHASE2_END)\n return 2 * N\n else:\n # Phase 3\n return N\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Backwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef prev_interval(cur_start: datetime.date, acq_frequency: int = 6) -> Tuple[datetime.date, datetime.date]:\n \"\"\"\n Given the START of the current interval, find the previous full interval [prev_start, cur_start],\n such that its length is either N or 2N and is consistent with step_forward(prev_start).\n\n No truncation: length is exactly N or 2N.\n \"\"\"\n N = acq_frequency\n\n # Candidate 1: previous interval length N\n cand1_start = cur_start - datetime.timedelta(days=N)\n cand1_len = N\n cand1_ok = (\n step_forward(cand1_start, N) == cand1_len and\n cand1_start + datetime.timedelta(days=cand1_len) == cur_start\n )\n\n # Candidate 2: previous interval length 2N\n cand2_start = cur_start - datetime.timedelta(days=2 * N)\n cand2_len = 2 * N\n cand2_ok = (\n 
step_forward(cand2_start, N) == cand2_len and\n cand2_start + datetime.timedelta(days=cand2_len) == cur_start\n )\n\n if not cand1_ok and not cand2_ok:\n raise RuntimeError(f\"No valid previous interval for start={cur_start}\")\n\n if cand1_ok and not cand2_ok:\n return cand1_start, cur_start\n if cand2_ok and not cand1_ok:\n return cand2_start, cur_start\n\n # Both valid (rare near boundaries) \u2013 prefer 2N by convention\n return cand2_start, cur_start\n\n\ndef back_chain(anchor_start: datetime.date, n_back: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_back intervals BEFORE anchor_start, going backwards, with no truncation.\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_back):\n prev_start, prev_end = prev_interval(cur_start, acq_frequency)\n intervals.append((prev_start, prev_end))\n cur_start = prev_start\n\n # Reverse to chronological order\n return list(reversed(intervals))\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Forwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef forward_chain(anchor_start: datetime.date, n_forw: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_forw intervals starting from anchor_start (first interval starts at anchor_start).\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_forw):\n length = step_forward(cur_start, acq_frequency)\n end = cur_start + datetime.timedelta(days=length)\n intervals.append((cur_start, end))\n cur_start = end\n\n return intervals\n\n# 
\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Main helper: 5 back + 4 forward \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef get_context_intervals(\n start_d: datetime.date,\n back: int = 5,\n forward: int = 4,\n acq_frequency: int = 6\n) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Returns:\n - `back` intervals before start_date\n - the interval starting at start_date\n - `forward` intervals after that\n\n Total = back + 1 + forward intervals.\n The 6th interval's START is exactly start_date if back=5.\n \"\"\"\n before = back_chain(start_d, back, acq_frequency) # 5 intervals before\n after = forward_chain(start_d, forward + 1, acq_frequency) # includes anchor as first\n return before + after\n\ndef get_overall_start_end(intervals: List[Tuple[datetime.date, datetime.date]]):\n \"\"\"\n Given a list of intervals [(start, end), ...],\n return (overall_start, overall_end).\n \"\"\"\n overall_start = min(s for s, e in intervals)\n overall_end = max(e for s, e in intervals)\n return overall_start, overall_end\n\ndef get_temporal_extends(udf_data: UdfData) -> UdfData:\n \"\"\"\n UDF that takes a temporal extent as input and returns temporal products needed for further processing.\n \"\"\"\n temporal_extent = udf_data.get_structured_data_list()[0].data\n start_date = temporal_extent[0]\n end_date = temporal_extent[1]\n\n start_datetime = datetime.date.fromisoformat(start_date)\n end_datetime = datetime.date.fromisoformat(end_date)\n\n delta_days = (end_datetime - start_datetime).days\n acq_freq = abs(delta_days)\n\n intervals = get_context_intervals(start_datetime, acq_frequency=acq_freq)\n overall_start, overall_end = get_overall_start_end(intervals)\n\n udf_data.set_structured_data_list([])\n udf_data.set_structured_data_list([\n 
StructuredData(\n description=\"Extended temporal interval\",\n data=[\n overall_start.isoformat(),\n overall_end.isoformat()\n ],\n type=\"list\"\n )\n ])\n return udf_data" + } + }, + "arrayelement1": { + "process_id": "array_element", + "arguments": { + "data": { + "from_node": "runudf1" + }, + "index": 0 + } + }, + "arrayelement2": { + "process_id": "array_element", + "arguments": { + "data": { + "from_node": "runudf1" + }, + "index": 1 + } + }, + "filtertemporal1": { + "process_id": "filter_temporal", + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "extent": [ + { + "from_node": "arrayelement1" + }, + { + "from_node": "arrayelement2" + } + ] + } + }, + "filterbbox1": { + "process_id": "filter_bbox", + "arguments": { + "data": { + "from_node": "filtertemporal1" + }, + "extent": { + "from_parameter": "spatial_extent" + } + } + }, + "resamplespatial1": { + "process_id": "resample_spatial", + "arguments": { + "align": "upper-left", + "data": { + "from_node": "filterbbox1" + }, + "method": "near", + "projection": null, + "resolution": 20 + } + }, + "sarbackscatter1": { + "process_id": "sar_backscatter", + "arguments": { + "coefficient": "sigma0-ellipsoid", + "contributing_area": false, + "data": { + "from_node": "resamplespatial1" + }, + "elevation_model": "COPERNICUS_30", + "ellipsoid_incidence_angle": false, + "local_incidence_angle": false, + "mask": false, + "noise_removal": true + } + }, + "applydimension1": { + "process_id": "apply_dimension", + "arguments": { + "data": { + "from_node": "sarbackscatter1" + }, + "dimension": "t", + "process": { + "process_graph": { + "runudf2": { + "process_id": "run_udf", + "arguments": { + "context": { + "spatial_extent": { + "from_parameter": "spatial_extent" + }, + "detection_extent": { + "from_parameter": "temporal_extent" + } + }, + "data": { + "from_parameter": "data" + }, + "runtime": "Python", + "udf": "import xarray as xr\nimport pandas as pd\nfrom osgeo import osr, ogr\nimport re\nfrom datetime 
import timedelta\nimport datetime\nfrom collections import defaultdict, Counter, OrderedDict\nfrom typing import Dict, List, Tuple, Optional\nimport requests\nfrom shapely.geometry import shape\nfrom scipy import ndimage as ndi\n\nimport time\nimport numpy as np\nfrom scipy.stats import ttest_ind_from_stats\nfrom copy import copy\nimport logging\n\nDEBUG = False\nAPPLY_SIEVE_FILTER = True\n\nlogger = logging.getLogger(__name__)\n\n# -------------------------\n# Time extents\n# -------------------------\n# Phase boundaries as DATES\n\nMASTER_START = datetime.datetime(2015, 4, 28)\nPHASE1_END = datetime.datetime(2021, 12, 16)\nPHASE2_END = datetime.datetime(2025, 3, 30)\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Step logic (your rule) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef step_forward(start_d: datetime.datetime, acq_frequency: int = 6) -> int:\n \"\"\"\n Decide interval length (N or 2N) from a given START date.\n\n Rule:\n - If start + N does NOT cross PHASE1_END => use N\n - Else, as long as start < PHASE2_END => use 2N\n - Once start >= PHASE2_END => use N\n \"\"\"\n N = acq_frequency\n\n if start_d + timedelta(days=N) <= PHASE1_END:\n # Still in Phase 1\n return N\n elif start_d < PHASE2_END:\n # Phase 2 (including the interval that crosses PHASE2_END)\n return 2 * N\n else:\n # Phase 3\n return N\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Backwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef prev_interval(cur_start: datetime.datetime, acq_frequency: int = 6) -> Tuple[datetime.date, 
datetime.date]:\n \"\"\"\n Given the START of the current interval, find the previous full interval [prev_start, cur_start],\n such that its length is either N or 2N and is consistent with step_forward(prev_start).\n\n No truncation: length is exactly N or 2N.\n \"\"\"\n N = acq_frequency\n\n # Candidate 1: previous interval length N\n cand1_start = cur_start - timedelta(days=N)\n cand1_len = N\n cand1_ok = (\n step_forward(cand1_start, N) == cand1_len and\n cand1_start + timedelta(days=cand1_len) == cur_start\n )\n\n # Candidate 2: previous interval length 2N\n cand2_start = cur_start - timedelta(days=2 * N)\n cand2_len = 2 * N\n cand2_ok = (\n step_forward(cand2_start, N) == cand2_len and\n cand2_start + timedelta(days=cand2_len) == cur_start\n )\n\n if not cand1_ok and not cand2_ok:\n raise RuntimeError(f\"No valid previous interval for start={cur_start}\")\n\n if cand1_ok and not cand2_ok:\n return cand1_start, cur_start\n if cand2_ok and not cand1_ok:\n return cand2_start, cur_start\n\n # Both valid (rare near boundaries) \u2013 prefer 2N by convention\n return cand2_start, cur_start\n\n\ndef back_chain(anchor_start: datetime.datetime, n_back: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_back intervals BEFORE anchor_start, going backwards, with no truncation.\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_back):\n prev_start, prev_end = prev_interval(cur_start, acq_frequency)\n intervals.append((prev_start, prev_end))\n cur_start = prev_start\n\n # Reverse to chronological order\n return list(reversed(intervals))\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Forwards (no truncation) 
\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef forward_chain(anchor_start: datetime.datetime, n_forw: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_forw intervals starting from anchor_start (first interval starts at anchor_start).\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_forw):\n length = step_forward(cur_start, acq_frequency)\n end = cur_start + timedelta(days=length)\n intervals.append((cur_start, end))\n cur_start = end\n\n return intervals\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Main helper: 5 back + 4 forward \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef get_context_intervals(\n start_str: str,\n back: int = 5,\n forward: int = 4,\n acq_frequency: int = 6\n) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Returns:\n - `back` intervals before start_date\n - the interval starting at start_date\n - `forward` intervals after that\n\n Total = back + 1 + forward intervals.\n The 6th interval's START is exactly start_date if back=5.\n \"\"\"\n start_d = datetime.datetime.strptime(start_str, \"%Y-%m-%d\")\n before = back_chain(start_d, back, acq_frequency) # 5 intervals before\n after = forward_chain(start_d, forward + 1, acq_frequency) # includes anchor as first\n return before + after\n\ndef get_overall_start_end(intervals: List[Tuple[datetime.date, datetime.date]]):\n \"\"\"\n Given a list of intervals [(start, end), ...],\n return (overall_start, overall_end).\n \"\"\"\n overall_start = min(s for s, e in intervals)\n overall_end = max(e for s, e in intervals)\n return overall_start, 
overall_end\n\n\n\n# -------------------------\n# Config / Constants\n# -------------------------\nS1_SEARCH_URL = \"https://catalogue.dataspace.copernicus.eu/resto/api/collections/Sentinel1/search.json\"\nDATE_RE = re.compile(r'_(\\d{8})T\\d{6}_') # e.g., ..._20211201T091308_...\n\n# -------------------------\n# Datacube utils\n# -------------------------\ndef get_spatial_extent(spatial_extent) -> Tuple[dict, List[float]]:\n\n \"\"\"Get spatial bounds in WGS84.\"\"\"\n # x_coord = arr.coords['x'].values\n # y_coord = arr.coords['y'].values\n #\n # west, east = float(x_coord.min()), float(x_coord.max())\n # south, north = float(y_coord.min()), float(y_coord.max())\n west, east, south, north = spatial_extent[\"west\"], spatial_extent[\"east\"], spatial_extent[\"south\"], spatial_extent[\"north\"]\n source_epsg = spatial_extent.get(\"crs\", \"EPSG:4326\").split(\":\")[-1]\n # ------------------------------------\n # Build polygon from bbox (in source CRS)\n # ------------------------------------\n if int(source_epsg) != 4326:\n ring = ogr.Geometry(ogr.wkbLinearRing)\n ring.AddPoint(west, south)\n ring.AddPoint(east, south)\n ring.AddPoint(east, north)\n ring.AddPoint(west, north)\n ring.AddPoint(west, south) # close ring\n\n geom = ogr.Geometry(ogr.wkbPolygon)\n geom.AddGeometry(ring)\n geom = geom.Clone() # 2D is fine here\n\n geom_wkt = geom.ExportToWkt()\n print(geom_wkt)\n\n # ------------------------------------\n # Define CRS and transformation\n # ------------------------------------\n source_epsg = int(source_epsg) # e.g. 
3857 or 32633; must be int\n CATALOG_EPSG = 4326\n\n src_srs = osr.SpatialReference()\n src_srs.ImportFromEPSG(source_epsg)\n\n dst_srs = osr.SpatialReference()\n dst_srs.ImportFromEPSG(CATALOG_EPSG)\n\n # Assign SRS to geometry (good practice)\n geom.AssignSpatialReference(src_srs)\n\n trans_to_catalog = osr.CoordinateTransformation(src_srs, dst_srs)\n\n # Sanity check (optional)\n if trans_to_catalog is None:\n raise RuntimeError(\"Failed to create CoordinateTransformation\")\n\n # ------------------------------------\n # Transform and get envelope\n # ------------------------------------\n catalog_aoi_geom = geom.Clone()\n catalog_aoi_geom.Transform(trans_to_catalog)\n\n west, east, south, north = catalog_aoi_geom.GetEnvelope()\n\n return {'west': west, 'east': east, 'south': south, 'north': north}, [south, west, north, east]\n\ndef get_temporal_extent(arr: xr.DataArray) -> dict:\n \"\"\"Get temporal extent from time dimension.\"\"\"\n time_dim = 't'\n if 't' in arr.dims:\n times = arr.coords[time_dim].values\n times = pd.to_datetime(times).to_pydatetime()\n start = pd.to_datetime(times.min())\n end = pd.to_datetime(times.max())\n log_string = '\\n'.join([f'{i} {v}' for i, v in enumerate(times)])\n return {'start': start, 'end': end, 'times': times}\n\n\n\n# -------------------------\n# Utilities\n# -------------------------\ndef remove_small_objects(mask, min_size=2, connectivity=1):\n \"\"\"\n Removes connected components of 1s smaller than min_size.\n Fastest possible implementation (SciPy C backend).\n \"\"\"\n mask = np.asarray(mask).astype(bool)\n\n if min_size <= 1:\n return mask\n\n # Label connected components in the foreground (1-pixels)\n structure = ndi.generate_binary_structure(mask.ndim, connectivity)\n labels, num = ndi.label(mask, structure=structure)\n\n if num == 0:\n return mask\n\n # Compute size of each component\n sizes = ndi.sum(mask, labels, index=np.arange(1, num + 1))\n\n # Select components to keep\n keep_labels = np.where(sizes >= 
min_size)[0] + 1\n\n # Build output\n out = np.isin(labels, keep_labels)\n\n return out\n\n\ndef parse_date_from_title(title: str) -> Optional[datetime.datetime]:\n \"\"\"Extract YYYYMMDD as datetime from a Sentinel-1 title. Return None if not found.\"\"\"\n m = DATE_RE.search(title)\n if not m:\n return None\n return datetime.datetime.strptime(m.group(1), \"%Y%m%d\")\n\n\ndef intersection_ratio_bbox2_in_bbox1(b1: List[float], b2: List[float]) -> float:\n \"\"\"\n Fraction of bbox2's area inside bbox1. bboxes are [minx, miny, maxx, maxy] in lon/lat.\n \"\"\"\n x1_min, y1_min, x1_max, y1_max = b1\n x2_min, y2_min, x2_max, y2_max = b2\n\n inter_min_x = max(x1_min, x2_min)\n inter_min_y = max(y1_min, y2_min)\n inter_max_x = min(x1_max, x2_max)\n inter_max_y = min(y1_max, y2_max)\n\n if inter_min_x >= inter_max_x or inter_min_y >= inter_max_y:\n return 0.0\n\n inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)\n area2 = (x2_max - x2_min) * (y2_max - y2_min)\n return 0.0 if area2 <= 0 else inter_area / area2\n\n\n\n\n\ndef create_timewindow_groups(intervals, window_size=10):\n out = {}\n for i in range(len(intervals) - window_size + 1):\n win = intervals[i:i + window_size]\n key = win[5][0] # start time of 6th interval\n out[key] = win\n return out\n\n\ndef fetch_s1_features(bbox: List[float], start_iso: str, end_iso: str) -> List[dict]:\n \"\"\"\n Query the Copernicus Dataspace Resto API for Sentinel-1 features within bbox and datetime range.\n Only returns features in JSON 'features' list.\n \"\"\"\n params = {\n \"box\": \",\".join(map(str, bbox)),\n \"page\": 1,\n \"maxRecords\": 1000,\n \"status\": \"ONLINE\",\n \"dataset\": \"ESA-DATASET\",\n \"processingLevel\": \"LEVEL1\",\n \"productType\": \"IW_GRDH_1S-COG\",\n \"startDate\": f\"{start_iso.strftime('%Y-%m-%dT00:00:00Z')}\",\n \"completionDate\": f\"{end_iso.strftime('%Y-%m-%dT23:59:59.999999999Z')}\",\n }\n r = requests.get(S1_SEARCH_URL, params=params, timeout=30)\n r.raise_for_status()\n 
# logger.info(f\"s1query {r.url}\")\n return r.json().get(\"features\", [])\n\n\ndef build_index_by_date_orbit(features: List[dict]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"\n Keep only GRDH (non-CARD_BS, non-COG) scenes and index them by (date, orbitDirection),\n ordered chronologically by date.\n \"\"\"\n temp = defaultdict(list)\n for ft in features:\n props = ft.get(\"properties\", {})\n title = props.get(\"title\", \"\")\n if \"_GRDH_\" in title and \"_CARD_BS\" not in title: # and \"_COG.\" not in title:\n dt = parse_date_from_title(title)\n if dt is None:\n continue\n orbit = props.get(\"orbitDirection\", \"UNKNOWN\")\n temp[(dt, orbit)].append(ft)\n\n # Sort keys by datetime\n ordered = OrderedDict(sorted(temp.items(), key=lambda kv: kv[0][0]))\n return ordered\n\ndef filter_index_to_dates(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n dt_list: List[datetime.datetime]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"Keep only entries whose date is in dt_list (exact date match).\"\"\"\n dates = set(dt_list)\n return {k: v for k, v in index_do.items() if k[0] in dates}\n\n\ndef filter_index_by_orbit(index_do, selected_orbit):\n \"\"\"\n Filter (date, orbit) \u2192 [features] dictionary to keep only the chosen orbit direction.\n Returns a new dict preserving chronological order.\n \"\"\"\n if not selected_orbit:\n return index_do # no orbit chosen, keep all\n\n return dict(\n (k, v) for k, v in index_do.items() if k[1] == selected_orbit\n )\n\n\ndef pick_orbit_direction(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n aoi_bbox: List[float]) -> Optional[str]:\n \"\"\"\n Choose orbit direction deterministically:\n 1) If only one orbit exists \u2192 return it.\n 2) Else compare counts per orbit \u2192 choose higher count.\n 3) If tie, compute each orbit's minimum (worst-case) fraction of scene-bbox inside AOI \u2192 prefer higher.\n 4) If still tied \u2192 return None.\n \"\"\"\n if not index_do:\n return 
None\n\n orbits = [k[1] for k in index_do.keys()]\n counts = Counter(orbits)\n if len(counts) == 1:\n return next(iter(counts))\n\n # Step 2: counts\n top_count = max(counts.values())\n leaders = [o for o, c in counts.items() if c == top_count]\n if len(leaders) == 1:\n return leaders[0]\n\n # Step 3: tie-break by worst-case overlap\n # For each orbit, compute the MIN overlap ratio across its scenes; pick the orbit with higher MIN\n min_overlap_by_orbit = {}\n for orbit in leaders:\n min_ratio = float(\"inf\")\n for (dt, ob), fts in index_do.items():\n if ob != orbit:\n continue\n for ft in fts:\n try:\n # Use feature geometry bounds as scene bbox\n scene_bbox = list(shape(ft[\"geometry\"]).bounds)\n except Exception:\n continue\n ratio = intersection_ratio_bbox2_in_bbox1(scene_bbox, aoi_bbox)\n if ratio < min_ratio:\n min_ratio = ratio\n if min_ratio == float(\"inf\"):\n min_ratio = 0.0\n min_overlap_by_orbit[orbit] = min_ratio\n\n # Compare worst-case overlap; if tie, return None\n max_min_overlap = max(min_overlap_by_orbit.values())\n overlap_leaders = [o for o, r in min_overlap_by_orbit.items() if r == max_min_overlap]\n return overlap_leaders[0] if len(overlap_leaders) == 1 else None\n\n\ndef get_scene_indices(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n feature_names: List[str]) -> List[Tuple[int, str]]:\n \"\"\"\n Returns indices & filenames from feature_names that match ANY date present in the index.\n \"\"\"\n dates_yymmdd = {k[0].strftime(\"%Y%m%d\") for k in index_do.keys()}\n matched = []\n for i, name in enumerate(feature_names):\n if any(d in name for d in dates_yymmdd):\n matched.append((i, name))\n return matched\n\n\n#################################################################################\n##################### BACKSCATTER CHANGE DETECTION ##############################\n#################################################################################\n\ndef _as_dtype(a, dtype):\n return a if (a.dtype == dtype and 
a.flags['C_CONTIGUOUS']) else np.ascontiguousarray(a, dtype=dtype)\n\n\ndef _nanmean_along(a, axis, mask=None):\n \"\"\"Numerically stable nanmean with float64 accumulation.\"\"\"\n if mask is None:\n mask = np.isfinite(a)\n sums = np.where(mask, a, 0).sum(axis=axis, dtype=np.float64)\n counts = mask.sum(axis=axis, dtype=np.int64).astype(np.float64)\n out = np.divide(sums, counts, out=np.zeros_like(sums, dtype=np.float64), where=counts > 0)\n return out, counts\n\n\ndef create_nan_mask(numpy_array, vv_vh_bandcount):\n \"\"\"Return combined invalid mask (VV or VH invalid). Expects (T,H,W).\"\"\"\n mask_vv = np.isnan(numpy_array[:vv_vh_bandcount]) | (numpy_array[:vv_vh_bandcount] < -99)\n mask_vh = np.isnan(numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount]) | (\n numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount] < -99\n )\n return mask_vv | mask_vh\n\n\ndef ttest_from_stats(\n past,\n future,\n min_valid_per_window=3,\n valid_mask=None,\n compute_dtype=np.float32,\n alternative=\"greater\"\n):\n \"\"\"\n Column-wise Welch t-test (one-sided: past > future) using summary stats.\n NaNs are ignored. 
Outputs:\n t_stat_full : (H, W) compute_dtype\n p_one_full : (H, W) compute_dtype\n insufficient_full : (H, W) bool (True where either group has < min_valid samples)\n\n Parameters\n ----------\n past, future : np.ndarray, shape (T, H, W)\n Time-series stacks for the two windows being compared.\n min_valid_per_window : int\n Minimum non-NaN samples required in each window per pixel.\n valid_mask : np.ndarray | None, shape (H, W)\n Optional boolean mask of pixels to evaluate.\n compute_dtype : np.dtype\n Dtype for returned t and p arrays (e.g., np.float32).\n alternative : {\"greater\", \"less\"}\n \"greater\" : H1 is mean(past) > mean(future) (decrease in future)\n \"less\" : H1 is mean(past) < mean(future) (increase in future)\n \"\"\"\n assert past.shape == future.shape, \"past and future must have same shape (T,H,W)\"\n if alternative not in (\"greater\", \"less\"):\n raise ValueError(f\"alternative must be 'greater' or 'less', got {alternative!r}\")\n\n T, H, W = past.shape\n start_time = time.time()\n\n # Prepare outputs\n t_full = np.zeros((H, W), dtype=compute_dtype)\n p_full = np.ones((H, W), dtype=compute_dtype)\n ins_full = np.ones((H, W), dtype=bool)\n\n # Valid pixel mask\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return t_full, p_full, ins_full\n\n # Views as (T, N) and select only valid pixels (to reduce work/memory)\n N = H * W\n idx = vm.ravel()\n past_v = past.reshape(T, N)[:, idx]\n future_v = future.reshape(T, N)[:, idx]\n\n # Counts (ignore NaNs)\n n1 = np.sum(np.isfinite(past_v), axis=0).astype(np.int32, copy=False)\n n2 = np.sum(np.isfinite(future_v), axis=0).astype(np.int32, copy=False)\n\n # Pixels that have enough samples in BOTH groups\n test_idx = (n1 >= min_valid_per_window) & (n2 >= min_valid_per_window)\n\n out_t = np.zeros(idx.sum(), dtype=np.float64)\n out_p = np.ones(idx.sum(), dtype=np.float64)\n insufficient_v = ~test_idx\n\n if np.any(test_idx):\n # Summary stats in float64 for 
stability\n # Sample mean and sample variance (ddof=1)\n m1 = np.nanmean(past_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n m2 = np.nanmean(future_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n v1 = np.nanvar(past_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n v2 = np.nanvar(future_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n\n # Convert variance -> std (ttest_ind_from_stats expects std)\n s1 = np.sqrt(np.maximum(v1, 0.0))\n s2 = np.sqrt(np.maximum(v2, 0.0))\n\n nn1 = n1[test_idx].astype(np.float64, copy=False)\n nn2 = n2[test_idx].astype(np.float64, copy=False)\n\n try:\n # SciPy >= 1.9: supports one-sided alternative\n t_v, p_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False,\n alternative=alternative\n )\n except TypeError:\n # Older SciPy fallback: compute two-sided, convert to one-sided \"greater\"\n t_v, p2_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False\n )\n # Ensure array types\n t_v = np.asarray(t_v, dtype=np.float64)\n p2_v = np.asarray(p2_v, dtype=np.float64)\n\n p_v = np.ones_like(p2_v, dtype=np.float64)\n good = np.isfinite(t_v) & np.isfinite(p2_v)\n\n if alternative == \"greater\":\n pos = good & (t_v > 0)\n # For t > 0, one-sided p = two-sided p / 2\n p_v[pos] = p2_v[pos] / 2.0\n # For t <= 0, one-sided p = 1 - (two-sided p / 2)\n p_v[good & ~pos] = 1.0 - (p2_v[good & ~pos] / 2.0)\n else:\n # H1: mean(past) < mean(future)\n neg = good & (t_v < 0)\n p_v[neg] = p2_v[neg] / 2.0\n p_v[good & ~neg] = 1.0 - (p2_v[good & ~neg] / 2.0)\n\n # Sanitize NaNs/Infs in t\n t_v = np.asarray(t_v, dtype=np.float64)\n p_v = np.asarray(p_v, dtype=np.float64)\n bad = ~np.isfinite(t_v)\n if np.any(bad):\n t_v[bad] = 0.0\n\n out_t[test_idx] = t_v\n out_p[test_idx] = p_v\n\n # Scatter back to (H, W)\n t_full.ravel()[idx] = out_t.astype(compute_dtype, copy=False)\n p_full.ravel()[idx] = out_p.astype(compute_dtype, copy=False)\n ins_full.ravel()[idx] = 
insufficient_v\n return t_full, p_full, ins_full\n\ndef apply_threshold(stat_array, pol_item, DEC_array_threshold,\n stat_item_name=None, previous_stat_array_bool=None):\n \"\"\"\n Convert a stat array into a binary mask and accumulate into DEC. Dead pixels should be masked upstream.\n \"\"\"\n\n stat_array_copy = copy(stat_array)\n\n if stat_item_name == 'std':\n pol_thr = 2.0 if pol_item == \"VH\" else 1.5\n stat_array = np.where(np.isnan(stat_array) | (stat_array < pol_thr), 0, 1)\n\n elif stat_item_name == 'mean_change':\n # Looking for decreases (future - past <= threshold)\n pol_thr = -1.75\n stat_array = np.where(np.isnan(stat_array) | (stat_array > pol_thr), 0, 1)\n\n elif stat_item_name == 'pval':\n t_stat = stat_array[0]\n p_val = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n pvalue_thr = 0.05\n\n is_significant = (p_val < pvalue_thr) & (~insufficient)\n stat_array = is_significant.astype(np.uint8)\n\n if previous_stat_array_bool is not None:\n stat_array[~previous_stat_array_bool.astype(bool)] = 0\n\n elif stat_item_name == 'ratio_slope':\n slope = stat_array[0]\n r2 = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n\n slope_mask = np.isfinite(slope) & (slope >= 0.025) & ~insufficient\n r2_mask = np.isfinite(r2) & (r2 >= 0.60) & ~insufficient\n stat_array = (slope_mask & r2_mask).astype(np.uint8)\n\n elif stat_item_name == 'ratio_mean_change':\n stat_thr = 2.0\n stat_array = np.where(np.isnan(stat_array) | (stat_array < stat_thr), 0, 1)\n\n DEC_array_threshold += stat_array.astype(int)\n if DEBUG:\n return DEC_array_threshold, stat_array_copy, stat_array.astype(int)\n else:\n return DEC_array_threshold, None, stat_array.astype(int)\n\n\ndef calculate_lsfit_r(vv_vh_r, min_valid=3, center_time=True, valid_mask=None, compute_dtype=np.float32):\n T, H, W = vv_vh_r.shape\n x = np.arange(T, dtype=np.float64) # keep time in float64\n if center_time:\n x = x - x.mean()\n\n slope_full = np.full((H, W), np.nan, dtype=compute_dtype)\n 
r2_full = np.full((H, W), np.nan, dtype=compute_dtype)\n insufficient_full = np.ones((H, W), dtype=bool)\n\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return slope_full, r2_full, insufficient_full\n\n vv_vh_r = _as_dtype(vv_vh_r, compute_dtype)\n\n idx = vm.ravel()\n y = vv_vh_r.reshape(T, -1)[:, idx] # (T,N) compute_dtype\n mask = np.isfinite(y)\n n = mask.sum(axis=0)\n insufficient = n < min_valid\n\n # means with float64 accumulation\n y_mean, _ = _nanmean_along(y.astype(np.float64, copy=False), axis=0, mask=mask)\n\n x2 = x[:, None] # (T,1), float64\n y_center = np.where(mask, y.astype(np.float64, copy=False) - y_mean[None, :], 0.0)\n\n Sxx = ((x2 ** 2) * mask).sum(axis=0, dtype=np.float64)\n Sxy = (x2 * y_center).sum(axis=0, dtype=np.float64)\n\n with np.errstate(invalid='ignore', divide='ignore'):\n slope64 = np.divide(Sxy, Sxx, out=np.zeros_like(Sxy), where=Sxx > 0)\n\n yhat = slope64[None, :] * x2 + y_mean[None, :]\n resid = np.where(mask, y.astype(np.float64, copy=False) - yhat, 0.0)\n\n SSE = (resid ** 2).sum(axis=0, dtype=np.float64)\n SST = (np.where(mask, (y.astype(np.float64, copy=False) - y_mean[None, :]) ** 2, 0.0)\n ).sum(axis=0, dtype=np.float64)\n\n r2_64 = np.zeros_like(SSE)\n good_sst = SST > 0\n r2_64[good_sst] = 1.0 - (SSE[good_sst] / SST[good_sst])\n\n slope64[insufficient] = np.nan\n r2_64[insufficient] = np.nan\n\n slope_full.reshape(-1)[idx] = slope64.astype(compute_dtype, copy=False)\n r2_full.reshape(-1)[idx] = r2_64.astype(compute_dtype, copy=False)\n insufficient_full.reshape(-1)[idx] = insufficient\n return slope_full, r2_full, insufficient_full\n\n\ndef _dead_mask_for_window(stack, past_len, future_len):\n \"\"\"Pixels with all-NaN in past or in future (axis=0). 
stack: (T,H,W).\"\"\"\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n past_dead = ~np.any(np.isfinite(past), axis=0)\n future_dead = ~np.any(np.isfinite(future), axis=0)\n return past_dead | future_dead\n\n\ndef apply_stat_datacube(numpy_stack_pol_dict, window_size=10, compute_dtype=np.float32):\n \"\"\"\n Multi-criteria change detection with early skipping of dead pixels\n (all-NaN in past or future window). Dead pixels => DEC outputs = 0.\n Heavy ops (ttest, lsfit) are computed only on valid pixels.\n \"\"\"\n start_time = time.time()\n VV = _as_dtype(numpy_stack_pol_dict[\"VV\"], compute_dtype)\n VH = _as_dtype(numpy_stack_pol_dict[\"VH\"], compute_dtype)\n\n bands, dim1, dim2 = numpy_stack_pol_dict[\"VV\"].shape\n assert window_size % 2 == 0, \"window_size must be even (split equally into past/future)\"\n assert bands >= window_size, f\"Need at least {window_size} time steps; got {bands}\"\n past_len = future_len = window_size // 2\n\n R = VV - VH\n\n # Build dead masks per signal, then combine\n dead_vv = _dead_mask_for_window(VV, past_len, future_len)\n dead_vh = _dead_mask_for_window(VH, past_len, future_len)\n dead_r = _dead_mask_for_window(R, past_len, future_len)\n dead_any = dead_vv | dead_vh | dead_r # (H,W)\n valid_mask = ~dead_any\n if not DEBUG: del dead_vv, dead_vh, dead_r\n\n # If everything is dead, return zeros fast\n if not np.any(valid_mask):\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n DEC_array_mask = np.zeros((dim1, dim2), dtype=np.uint8)\n return DEC_array_threshold, DEC_array_mask\n\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n\n # --- Per-pol statistics (VV and VH) ---\n for pol_item, stack in ((\"VV\", VV), (\"VH\", VH)):\n\n # Restrict to window once (cheap) \u2013 nanmean/nanstd will be fast on masked arrays anyway\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n used = stack[0:past_len + future_len, :, :]\n\n # 
means/std with float64 reductions\n Stack_p_MIN, _ = _nanmean_along(past.astype(np.float64, copy=False), axis=0)\n Stack_f_MIN, _ = _nanmean_along(future.astype(np.float64, copy=False), axis=0)\n POL_std = np.nanstd(used.astype(np.float64, copy=False), axis=0).astype(compute_dtype, copy=False)\n\n # Mask out dead pixels before thresholding so they never vote\n POL_std[dead_any] = np.nan\n DEC_array_threshold, _, _ = apply_threshold(\n POL_std, pol_item, DEC_array_threshold, stat_item_name=\"std\"\n )\n if not DEBUG:\n del POL_std\n\n # Mean change (future - past)\n POL_mean_change = (Stack_f_MIN - Stack_p_MIN).astype(compute_dtype, copy=False)\n POL_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, POL_mean_change_bool = apply_threshold(\n POL_mean_change, pol_item, DEC_array_threshold, stat_item_name=\"mean_change\"\n )\n\n # Paired t-test (compute only on valid pixels)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n past, future, min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype,\n alternative=\"greater\"\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item, DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=POL_mean_change_bool)\n\n # --- Ratio-based stats (VV - VH) ---\n ratio = R\n # Linear trend only on valid pixels\n ratio_slope, ratio_r2, mask_sufficient_ratio = calculate_lsfit_r(\n ratio, valid_mask=valid_mask, compute_dtype=compute_dtype\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [ratio_slope, ratio_r2, mask_sufficient_ratio],\n pol_item=\"RATIO\",\n DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_slope\"\n )\n if not DEBUG:\n del ratio_slope, ratio_r2\n\n # Mean change of ratio (future - past) with correct axis\n ratio_mean_change = (\n _nanmean_along(R[past_len:past_len + future_len].astype(np.float64, copy=False), axis=0)[0]\n - _nanmean_along(R[0:past_len].astype(np.float64, copy=False), axis=0)[0]\n ).astype(compute_dtype, 
copy=False)\n ratio_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, ratio_mean_change_bool = apply_threshold(\n ratio_mean_change, pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_mean_change\"\n )\n if not DEBUG: del ratio_mean_change\n\n # T-test on ratio (valid pixels only)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n ratio[0:past_len, :, :], ratio[past_len:past_len + future_len, :, :],\n min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype, alternative=\"less\")\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=ratio_mean_change_bool)\n\n # --- Final mask post-processing ---\n DEC_array_mask = np.zeros_like(DEC_array_threshold, dtype=np.uint8)\n DEC_array_mask[DEC_array_threshold > 4] = 1\n\n # Force dead pixels to zero in both outputs (per your requirement)\n DEC_array_threshold[dead_any] = 0\n DEC_array_mask[dead_any] = 0\n\n return DEC_array_threshold, DEC_array_mask\n\n\ndef apply_datacube(cube: xr.DataArray, context: Dict) -> xr.DataArray:\n \"\"\"\n Simple UDF: Check S1 observation frequency via STAC and aggregate temporally.\n \"\"\"\n\n arr = cube\n\n # Get temporal extent\n spatial_extent = context.get(\"spatial_extent\", 0)\n logger.info(f\"spatial context {spatial_extent}\")\n detection_start_time = context[\"detection_extent\"][0]\n detection_end_time = context[\"detection_extent\"][1]\n\n start_d = datetime.datetime.strptime(detection_start_time, \"%Y-%m-%d\")\n end_d = datetime.datetime.strptime(detection_end_time, \"%Y-%m-%d\")\n delta_days = (end_d - start_d).days\n acq_frequency = abs(delta_days)\n\n # temporal extent\n days_interval = get_context_intervals(detection_start_time, acq_frequency=acq_frequency)\n start_time, end_time = get_overall_start_end(days_interval)\n # logger.info(f\"Processingfromto: {start_time} to 
{end_time}\")\n\n group_days_interval = create_timewindow_groups(days_interval)\n\n # Get spatial extent\n spatial_extent_4326, bbox_4326 = get_spatial_extent(spatial_extent)\n # logger.info(f\"Spatial extent in EPSG:{epsg_code}: {spatial_extent_4326} {bbox_4326}\")\n\n temporal_extent = get_temporal_extent(arr)\n\n # Fetch & build index\n feats = fetch_s1_features(bbox_4326, temporal_extent[\"start\"], temporal_extent[\"end\"])\n index_do = build_index_by_date_orbit(feats)\n\n template_array = np.zeros_like(arr[0, 0, :, :])\n\n # 2) Filter to your dates of interest\n # logger.info(f\"featuresdateorb: {index_do.keys()}\")\n # logger.info(f\"filteringscenesusing: {temporal_extent['times']}\")\n index_do = filter_index_to_dates(index_do, temporal_extent[\"times\"])\n # logger.info(f\"AfterTimeFilter: {index_do.keys()}\")\n # 3) Decide the orbit direction (or None if tie after tie-break)\n # selected_orbit = pick_orbit_direction(index_do, bbox)\n\n arr = 10 * np.log10(arr)\n\n # 4).\n DEC_array_combined = None\n entered_wininterval_loop = False\n\n DEC_temporal_list = []\n win_list = []\n # logger.info(f\"Processingtimewindows {len(group_days_interval)}\")\n for win, win_days_interval in group_days_interval.items():\n DEC_array_list = []\n entered_wininterval_loop = True\n DEC_array_stack = []\n DEC_array_threshold_stack = []\n for orbit_dir in [\"ASCENDING\", \"DESCENDING\"]:\n index_orb_do = filter_index_by_orbit(index_do, orbit_dir)\n\n if len(index_orb_do) == 0:\n DEC_array_stack.append(template_array)\n DEC_array_threshold_stack.append(template_array)\n continue\n\n vv_list = []\n vh_list = []\n\n for interval_start, interval_end in win_days_interval:\n vh_window_stack = []\n vv_window_stack = []\n orbit_dir_period = None\n time_points_averaged_str = \"\"\n for (dt, ob), fts in index_orb_do.items():\n if interval_start <= dt < interval_end:\n idx = next((i for i, d in enumerate(temporal_extent[\"times\"]) if d == dt), None)\n scene_array = arr[idx, :, :, :]\n 
vh_band = scene_array[0, :, :]\n vv_band = scene_array[1, :, :]\n\n vh_window_stack.append(vh_band)\n vv_window_stack.append(vv_band)\n\n time_points_averaged_str += f\"{dt.date()}, {idx} --\"\n # Average over the scenes in the interval\n if len(vh_window_stack) == 0 or len(vv_window_stack) == 0:\n vh_avg = np.full_like(template_array, np.nan)\n vv_avg = np.full_like(template_array, np.nan)\n else:\n vh_avg = np.nanmean(vh_window_stack, axis=0)\n vv_avg = np.nanmean(vv_window_stack, axis=0)\n # logger.info(f\"AvgInfo: shapes {vh_avg.shape} {vv_avg.shape} {interval_start.date()} to {interval_end.date()}, win: {win}, {time_points_averaged_str} -- Orbit: {orbit_dir}, -- avg {len(vh_window_stack)} scenes.\")\n\n vh_list.append(vh_avg)\n vv_list.append(vv_avg)\n\n vh_array_stack = np.stack(vh_list, axis=0)\n vv_array_stack = np.stack(vv_list, axis=0)\n logger.info(f\"StackInfo: shapes {vh_array_stack.shape} {vv_array_stack.shape} -- Orbit: {orbit_dir}\")\n\n DEC_array_threshold, DEC_array_mask = apply_stat_datacube({\"VV\": vv_array_stack, \"VH\": vh_array_stack}, window_size=10)\n DEC_array_mask = remove_small_objects(DEC_array_mask, min_size=4)\n DEC_array_stack.append(DEC_array_mask)\n DEC_array_threshold_stack.append(DEC_array_threshold)\n\n\n DEC_array_combined = np.nanmax(np.stack(DEC_array_stack, axis=0), axis=0)\n DEC_array_list.append(DEC_array_combined)\n DEC_array_list.extend(DEC_array_stack)\n DEC_array_list.extend(DEC_array_threshold_stack)\n\n win_list.append(win)\n # logger.info(f\"DECArraylistlen {len(DEC_array_list)}\")\n DEC_temporal_list.append(np.stack(DEC_array_list, axis=0))\n # logger.info(f\"DECArrayliststackshape {np.stack(DEC_array_list, axis=0).shape}\")\n\n # logger.info(f\"DECtemporallistlen {len(DEC_temporal_list)}\")\n if not entered_wininterval_loop:\n DEC_array_list = [template_array]\n win_list = [datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n if len(DEC_array_list) == 0:\n DEC_array_list = [template_array]\n win_list = 
[datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n DEC_temporal_array = np.stack(DEC_temporal_list, axis=0)\n # logger.info(f\"DECtemporalarray {DEC_temporal_array.shape}\")\n\n # create xarray with single timestamp\n output_xarraycube = xr.DataArray(\n DEC_temporal_array, #DEC_array_combined[np.newaxis, np.newaxis, :, :], # add a time dimension\n dims=[\"t\", \"bands\", \"y\", \"x\"],\n coords={\n \"t\": win_list,\n \"bands\": [\"DEC\", \"DEC_asc\", \"DEC_asc_threshold\", \"DEC_des\", \"DEC_des_threshold\"],# win is your datetime.datetime object\n \"y\": arr.coords[\"y\"],\n \"x\": arr.coords[\"x\"],\n }\n )\n\n return output_xarraycube\n" + }, + "result": true + } + } + } + } + }, + "renamelabels1": { + "process_id": "rename_labels", + "arguments": { + "data": { + "from_node": "applydimension1" + }, + "dimension": "bands", + "target": [ + "DEC", + "DEC_asc", + "DEC_asc_threshold", + "DEC_des", + "DEC_des_threshold" + ] + } + }, + "applyneighborhood1": { + "process_id": "apply_neighborhood", + "arguments": { + "data": { + "from_node": "sarbackscatter1" + }, + "overlap": [ + { + "dimension": "x", + "value": 32, + "unit": "px" + }, + { + "dimension": "y", + "value": 32, + "unit": "px" + } + ], + "process": { + "process_graph": { + "runudf3": { + "process_id": "run_udf", + "arguments": { + "context": { + "spatial_extent": { + "from_parameter": "spatial_extent" + }, + "detection_extent": { + "from_parameter": "temporal_extent" + }, + "datacube_ai_time_window": 5 + }, + "data": { + "from_parameter": "data" + }, + "runtime": "Python", + "udf": "import functools\nimport sys\nimport xarray as xr\nfrom osgeo import osr, ogr\nimport re\nfrom datetime import timedelta\nfrom typing import Dict, List, Tuple, Optional\nimport requests\nimport datetime\nimport numpy as np\nimport pandas as pd\nfrom shapely.geometry import shape\nfrom scipy import ndimage as ndi\nimport logging\nfrom collections import defaultdict, Counter, OrderedDict\n\n# The onnx_deps folder contains the extracted 
contents of the dependencies archive provided in the job options\nsys.path.insert(0, \"onnx_deps\")\nimport onnxruntime as ort\n\nDEBUG = False\n\nLOWER_CUTOFF = -30\nlogger = logging.getLogger(__name__)\n\n\n\n\n# -------------------------\n# Time extents\n# -------------------------\n# Phase boundaries as DATES\n\nMASTER_START = datetime.datetime(2015, 4, 28)\nPHASE1_END = datetime.datetime(2021, 12, 16)\nPHASE2_END = datetime.datetime(2025, 3, 30)\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Step logic (your rule) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef step_forward(start_d: datetime.datetime, acq_frequency: int = 6) -> int:\n \"\"\"\n Decide interval length (N or 2N) from a given START date.\n\n Rule:\n - If start + N does NOT cross PHASE1_END => use N\n - Else, as long as start < PHASE2_END => use 2N\n - Once start >= PHASE2_END => use N\n \"\"\"\n N = acq_frequency\n\n if start_d + timedelta(days=N) <= PHASE1_END:\n # Still in Phase 1\n return N\n elif start_d < PHASE2_END:\n # Phase 2 (including the interval that crosses PHASE2_END)\n return 2 * N\n else:\n # Phase 3\n return N\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Backwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef prev_interval(cur_start: datetime.datetime, acq_frequency: int = 6) -> Tuple[datetime.date, datetime.date]:\n \"\"\"\n Given the START of the current interval, find the previous full interval [prev_start, cur_start],\n such that its length is either N or 2N and is consistent with step_forward(prev_start).\n\n No truncation: 
length is exactly N or 2N.\n \"\"\"\n N = acq_frequency\n\n # Candidate 1: previous interval length N\n cand1_start = cur_start - timedelta(days=N)\n cand1_len = N\n cand1_ok = (\n step_forward(cand1_start, N) == cand1_len and\n cand1_start + timedelta(days=cand1_len) == cur_start\n )\n\n # Candidate 2: previous interval length 2N\n cand2_start = cur_start - timedelta(days=2 * N)\n cand2_len = 2 * N\n cand2_ok = (\n step_forward(cand2_start, N) == cand2_len and\n cand2_start + timedelta(days=cand2_len) == cur_start\n )\n\n if not cand1_ok and not cand2_ok:\n raise RuntimeError(f\"No valid previous interval for start={cur_start}\")\n\n if cand1_ok and not cand2_ok:\n return cand1_start, cur_start\n if cand2_ok and not cand1_ok:\n return cand2_start, cur_start\n\n # Both valid (rare near boundaries) \u2013 prefer 2N by convention\n return cand2_start, cur_start\n\n\ndef back_chain(anchor_start: datetime.datetime, n_back: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_back intervals BEFORE anchor_start, going backwards, with no truncation.\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_back):\n prev_start, prev_end = prev_interval(cur_start, acq_frequency)\n intervals.append((prev_start, prev_end))\n cur_start = prev_start\n\n # Reverse to chronological order\n return list(reversed(intervals))\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Forwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef forward_chain(anchor_start: datetime.datetime, n_forw: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_forw intervals starting from anchor_start (first interval starts at 
anchor_start).\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_forw):\n length = step_forward(cur_start, acq_frequency)\n end = cur_start + timedelta(days=length)\n intervals.append((cur_start, end))\n cur_start = end\n\n return intervals\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Main helper: 5 back + 4 forward \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef get_context_intervals(\n start_str: str,\n back: int = 5,\n forward: int = 4,\n acq_frequency: int = 6\n) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Returns:\n - `back` intervals before start_date\n - the interval starting at start_date\n - `forward` intervals after that\n\n Total = back + 1 + forward intervals.\n The 6th interval's START is exactly start_date if back=5.\n \"\"\"\n start_d = datetime.datetime.strptime(start_str, \"%Y-%m-%d\")\n before = back_chain(start_d, back, acq_frequency) # 5 intervals before\n after = forward_chain(start_d, forward + 1, acq_frequency) # includes anchor as first\n return before + after\n\ndef get_overall_start_end(intervals: List[Tuple[datetime.date, datetime.date]]):\n \"\"\"\n Given a list of intervals [(start, end), ...],\n return (overall_start, overall_end).\n \"\"\"\n overall_start = min(s for s, e in intervals)\n overall_end = max(e for s, e in intervals)\n return overall_start, overall_end\n\n\n\n# -------------------------\n# Config / Constants\n# -------------------------\nS1_SEARCH_URL = \"https://catalogue.dataspace.copernicus.eu/resto/api/collections/Sentinel1/search.json\"\nDATE_RE = re.compile(r'_(\\d{8})T\\d{6}_') # e.g., ..._20211201T091308_...\n\n# -------------------------\n# Datacube utils\n# -------------------------\ndef 
get_spatial_extent(spatial_extent) -> Tuple[dict, List[float]]:\n\n \"\"\"Get spatial bounds in WGS84.\"\"\"\n # x_coord = arr.coords['x'].values\n # y_coord = arr.coords['y'].values\n #\n # west, east = float(x_coord.min()), float(x_coord.max())\n # south, north = float(y_coord.min()), float(y_coord.max())\n west, east, south, north = spatial_extent[\"west\"], spatial_extent[\"east\"], spatial_extent[\"south\"], spatial_extent[\"north\"]\n source_epsg = spatial_extent.get(\"crs\", \"EPSG:4326\").split(\":\")[-1]\n # ------------------------------------\n # Build polygon from bbox (in source CRS)\n # ------------------------------------\n if int(source_epsg) != 4326:\n ring = ogr.Geometry(ogr.wkbLinearRing)\n ring.AddPoint(west, south)\n ring.AddPoint(east, south)\n ring.AddPoint(east, north)\n ring.AddPoint(west, north)\n ring.AddPoint(west, south) # close ring\n\n geom = ogr.Geometry(ogr.wkbPolygon)\n geom.AddGeometry(ring)\n geom = geom.Clone() # 2D is fine here\n\n geom_wkt = geom.ExportToWkt()\n print(geom_wkt)\n\n # ------------------------------------\n # Define CRS and transformation\n # ------------------------------------\n source_epsg = int(source_epsg) # e.g. 
3857 or 32633; must be int\n CATALOG_EPSG = 4326\n\n src_srs = osr.SpatialReference()\n src_srs.ImportFromEPSG(source_epsg)\n\n dst_srs = osr.SpatialReference()\n dst_srs.ImportFromEPSG(CATALOG_EPSG)\n\n # Assign SRS to geometry (good practice)\n geom.AssignSpatialReference(src_srs)\n\n trans_to_catalog = osr.CoordinateTransformation(src_srs, dst_srs)\n\n # Sanity check (optional)\n if trans_to_catalog is None:\n raise RuntimeError(\"Failed to create CoordinateTransformation\")\n\n # ------------------------------------\n # Transform and get envelope\n # ------------------------------------\n catalog_aoi_geom = geom.Clone()\n catalog_aoi_geom.Transform(trans_to_catalog)\n\n west, east, south, north = catalog_aoi_geom.GetEnvelope()\n\n return {'west': west, 'east': east, 'south': south, 'north': north}, [south, west, north, east]\n\ndef get_temporal_extent(arr: xr.DataArray) -> dict:\n \"\"\"Get temporal extent from time dimension.\"\"\"\n time_dim = 't'\n if 't' in arr.dims:\n times = arr.coords[time_dim].values\n times = pd.to_datetime(times).to_pydatetime()\n start = pd.to_datetime(times.min())\n end = pd.to_datetime(times.max())\n log_string = '\\n'.join([f'{i} {v}' for i, v in enumerate(times)])\n return {'start': start, 'end': end, 'times': times}\n\n\n\n# -------------------------\n# Utilities\n# -------------------------\ndef remove_small_objects(mask, min_size=2, connectivity=1):\n \"\"\"\n Removes connected components of 1s smaller than min_size.\n Fastest possible implementation (SciPy C backend).\n \"\"\"\n mask = np.asarray(mask).astype(bool)\n\n if min_size <= 1:\n return mask\n\n # Label connected components in the foreground (1-pixels)\n structure = ndi.generate_binary_structure(mask.ndim, connectivity)\n labels, num = ndi.label(mask, structure=structure)\n\n if num == 0:\n return mask\n\n # Compute size of each component\n sizes = ndi.sum(mask, labels, index=np.arange(1, num + 1))\n\n # Select components to keep\n keep_labels = np.where(sizes >= 
min_size)[0] + 1\n\n # Build output\n out = np.isin(labels, keep_labels)\n\n return out\n\n\ndef parse_date_from_title(title: str) -> Optional[datetime.datetime]:\n \"\"\"Extract YYYYMMDD as datetime from a Sentinel-1 title. Return None if not found.\"\"\"\n m = DATE_RE.search(title)\n if not m:\n return None\n return datetime.datetime.strptime(m.group(1), \"%Y%m%d\")\n\n\ndef intersection_ratio_bbox2_in_bbox1(b1: List[float], b2: List[float]) -> float:\n \"\"\"\n Fraction of bbox2's area inside bbox1. bboxes are [minx, miny, maxx, maxy] in lon/lat.\n \"\"\"\n x1_min, y1_min, x1_max, y1_max = b1\n x2_min, y2_min, x2_max, y2_max = b2\n\n inter_min_x = max(x1_min, x2_min)\n inter_min_y = max(y1_min, y2_min)\n inter_max_x = min(x1_max, x2_max)\n inter_max_y = min(y1_max, y2_max)\n\n if inter_min_x >= inter_max_x or inter_min_y >= inter_max_y:\n return 0.0\n\n inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)\n area2 = (x2_max - x2_min) * (y2_max - y2_min)\n return 0.0 if area2 <= 0 else inter_area / area2\n\n\ndef create_timewindow_groups(intervals, window_size=10):\n out = {}\n for i in range(len(intervals) - window_size + 1):\n win = intervals[i:i + window_size]\n key = win[5][0] # start time of 6th interval\n out[key] = win\n return out\n\n\ndef fetch_s1_features(bbox: List[float], start_iso: str, end_iso: str) -> List[dict]:\n \"\"\"\n Query the Copernicus Dataspace Resto API for Sentinel-1 features within bbox and datetime range.\n Only returns features in JSON 'features' list.\n \"\"\"\n params = {\n \"box\": \",\".join(map(str, bbox)),\n \"page\": 1,\n \"maxRecords\": 1000,\n \"status\": \"ONLINE\",\n \"dataset\": \"ESA-DATASET\",\n \"processingLevel\": \"LEVEL1\",\n \"productType\": \"IW_GRDH_1S-COG\",\n \"startDate\": f\"{start_iso.strftime('%Y-%m-%dT00:00:00Z')}\",\n \"completionDate\": f\"{end_iso.strftime('%Y-%m-%dT23:59:59.999999999Z')}\",\n }\n r = requests.get(S1_SEARCH_URL, params=params, timeout=30)\n r.raise_for_status()\n # 
logger.info(f\"s1query {r.url}\")\n return r.json().get(\"features\", [])\n\n\ndef build_index_by_date_orbit(features: List[dict]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"\n Keep only GRDH (non-CARD_BS, non-COG) scenes and index them by (date, orbitDirection),\n ordered chronologically by date.\n \"\"\"\n temp = defaultdict(list)\n for ft in features:\n props = ft.get(\"properties\", {})\n title = props.get(\"title\", \"\")\n if \"_GRDH_\" in title and \"_CARD_BS\" not in title: # and \"_COG.\" not in title:\n dt = parse_date_from_title(title)\n if dt is None:\n continue\n orbit = props.get(\"orbitDirection\", \"UNKNOWN\")\n temp[(dt, orbit)].append(ft)\n\n # Sort keys by datetime\n ordered = OrderedDict(sorted(temp.items(), key=lambda kv: kv[0][0]))\n return ordered\n\ndef filter_index_to_dates(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n dt_list: List[datetime.datetime]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"Keep only entries whose date is in dt_list (exact date match).\"\"\"\n dates = set(dt_list)\n return {k: v for k, v in index_do.items() if k[0] in dates}\n\n\ndef filter_index_by_orbit(index_do, selected_orbit):\n \"\"\"\n Filter (date, orbit) \u2192 [features] dictionary to keep only the chosen orbit direction.\n Returns a new dict preserving chronological order.\n \"\"\"\n if not selected_orbit:\n return index_do # no orbit chosen, keep all\n\n return dict(\n (k, v) for k, v in index_do.items() if k[1] == selected_orbit\n )\n\n\ndef pick_orbit_direction(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n aoi_bbox: List[float]) -> Optional[str]:\n \"\"\"\n Choose orbit direction deterministically:\n 1) If only one orbit exists \u2192 return it.\n 2) Else compare counts per orbit \u2192 choose higher count.\n 3) If tie, compute each orbit's minimum (worst-case) fraction of scene-bbox inside AOI \u2192 prefer higher.\n 4) If still tied \u2192 return None.\n \"\"\"\n if not index_do:\n return 
None\n\n orbits = [k[1] for k in index_do.keys()]\n counts = Counter(orbits)\n if len(counts) == 1:\n return next(iter(counts))\n\n # Step 2: counts\n top_count = max(counts.values())\n leaders = [o for o, c in counts.items() if c == top_count]\n if len(leaders) == 1:\n return leaders[0]\n\n # Step 3: tie-break by worst-case overlap\n # For each orbit, compute the MIN overlap ratio across its scenes; pick the orbit with higher MIN\n min_overlap_by_orbit = {}\n for orbit in leaders:\n min_ratio = float(\"inf\")\n for (dt, ob), fts in index_do.items():\n if ob != orbit:\n continue\n for ft in fts:\n try:\n # Use feature geometry bounds as scene bbox\n scene_bbox = list(shape(ft[\"geometry\"]).bounds)\n except Exception:\n continue\n ratio = intersection_ratio_bbox2_in_bbox1(scene_bbox, aoi_bbox)\n if ratio < min_ratio:\n min_ratio = ratio\n if min_ratio == float(\"inf\"):\n min_ratio = 0.0\n min_overlap_by_orbit[orbit] = min_ratio\n\n # Compare worst-case overlap; if tie, return None\n max_min_overlap = max(min_overlap_by_orbit.values())\n overlap_leaders = [o for o, r in min_overlap_by_orbit.items() if r == max_min_overlap]\n return overlap_leaders[0] if len(overlap_leaders) == 1 else None\n\n\ndef get_scene_indices(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n feature_names: List[str]) -> List[Tuple[int, str]]:\n \"\"\"\n Returns indices & filenames from feature_names that match ANY date present in the index.\n \"\"\"\n dates_yymmdd = {k[0].strftime(\"%Y%m%d\") for k in index_do.keys()}\n matched = []\n for i, name in enumerate(feature_names):\n if any(d in name for d in dates_yymmdd):\n matched.append((i, name))\n return matched\n\n#################################################################################\n##################### BACKSCATTER CHANGE DETECTION ##############################\n#################################################################################\ndef has_enough_valid(vh_win, vv_win, min_valid_frac=0.01):\n # vh_win, 
vv_win: (T, Y, X)\n valid = np.isfinite(vh_win) & np.isfinite(vv_win)\n # valid anywhere across time for a pixel counts as valid pixel\n valid_pix = np.any(valid, axis=0) # (Y, X)\n frac = valid_pix.mean()\n return frac >= min_valid_frac\n\n@functools.lru_cache(maxsize=5)\ndef load_onnx_model(model_name: str) -> ort.InferenceSession:\n \"\"\"\n Loads an ONNX model from the onnx_models folder and returns an ONNX runtime session.\n\n Extracting the model loading code into a separate function allows us to cache the loaded model.\n This prevents the model from being loaded for every chunk of data that is processed, but only once per executor,\n which can save a lot of time, memory and ultimately processing costs.\n\n Should you have to download the model from a remote location, you can add the download code here, and cache the model.\n\n Make sure that the arguments of the method you add the @functools.lru_cache decorator to are hashable.\n Be careful with using this decorator for class methods, as the self argument is not hashable.\n In that case you can use a static method or make sure your class is hashable (more difficult): https://docs.python.org/3/faq/programming.html#faq-cache-method-calls.\n\n More information on this functool can be found here:\n https://docs.python.org/3/library/functools.html#functools.lru_cache\n \"\"\"\n # The onnx_models folder contains the content of the model archive provided in the job options\n if DEBUG:\n return ort.InferenceSession(f\"{model_name}\")\n else:\n return ort.InferenceSession(f\"onnx_models/{model_name}\")\n\ndef run_inference(input_np: np.ndarray, ort_session: ort.InferenceSession) -> tuple:\n \"\"\"\n Run inference using the ONNX runtime session and return predicted labels and probabilities.\n \"\"\"\n # Get the input name expected by the ONNX model\n input_name = ort_session._inputs_meta[0].name # Extract input name from metadata\n\n # Ensure input_np is a NumPy array and reshape to match model input shape\n input_np 
= input_np.astype(np.float32) # Ensure correct data type\n\n # Run inference\n outputs = ort_session.run(None, {input_name: input_np})\n\n predicted_labels = outputs[0]\n\n return predicted_labels\n\n\ndef postprocess_output(predicted_labels: np.ndarray, input_shape: tuple) -> tuple:\n \"\"\"\n Postprocess the output by reshaping the predicted labels and probabilities into the original spatial structure.\n \"\"\"\n\n # Reshape to match the (y, x) spatial structure\n predicted_labels = np.squeeze(predicted_labels, axis=-1) # Remove the last axis\n predicted_labels = np.squeeze(predicted_labels, axis=0) # Remove the last axis\n return predicted_labels\n\n\ndef create_output_xarray(predicted_labels: np.ndarray,\n input_xr: xr.DataArray) -> xr.DataArray:\n \"\"\"\n Create an xarray DataArray with predicted labels and probabilities stacked along the bands dimension.\n \"\"\"\n\n return xr.DataArray(\n predicted_labels,\n dims=[\"bands\", \"y\", \"x\"],\n coords={\n 'y': input_xr.coords['y'],\n 'x': input_xr.coords['x']\n }\n )\n\ndef apply_datacube(cube: xr.DataArray, context: Dict) -> xr.DataArray:\n \"\"\"\n Simple UDF: Check S1 observation frequency via STAC and aggregate temporally.\n \"\"\"\n\n ## Step 1: Load the ONNX model\n ort_session = load_onnx_model(\"ml_model.onnx\")\n\n arr = cube\n\n # Get temporal extent\n spatial_extent = context[\"spatial_extent\"]\n detection_start_time = context[\"detection_extent\"][0]\n detection_end_time = context[\"detection_extent\"][1]\n datacube_window = context[\"datacube_ai_time_window\"]\n\n start_d = datetime.datetime.strptime(detection_start_time, \"%Y-%m-%d\")\n end_d = datetime.datetime.strptime(detection_end_time, \"%Y-%m-%d\")\n delta_days = (end_d - start_d).days\n acq_frequency = abs(delta_days)\n\n # temporal extent\n days_interval = get_context_intervals(detection_start_time, acq_frequency=acq_frequency)\n start_time, end_time = get_overall_start_end(days_interval)\n # logger.info(f\"Processingfromto: {start_time} 
to {end_time}\")\n\n group_days_interval = create_timewindow_groups(days_interval)\n\n # Get spatial extent\n spatial_extent_4326, bbox_4326 = get_spatial_extent(spatial_extent)\n # logger.info(f\"Spatial extent in EPSG:{epsg_code}: {spatial_extent_4326} {bbox_4326}\")\n\n temporal_extent = get_temporal_extent(arr)\n\n # Fetch & build index\n feats = fetch_s1_features(bbox_4326, temporal_extent[\"start\"], temporal_extent[\"end\"])\n index_do = build_index_by_date_orbit(feats)\n\n template_array = np.zeros_like(arr[0, 0, :, :])\n\n # 2) Filter to your dates of interest\n # logger.info(f\"featuresdateorb: {index_do.keys()}\")\n # logger.info(f\"filteringscenesusing: {temporal_extent['times']}\")\n index_do = filter_index_to_dates(index_do, temporal_extent[\"times\"])\n # logger.info(f\"AfterTimeFilter: {index_do.keys()}\")\n # 3) Decide the orbit direction (or None if tie after tie-break)\n # selected_orbit = pick_orbit_direction(index_do, bbox)\n\n arr_cube = arr.astype(np.float32)\n arr = np.where(arr_cube >0, 10 * np.log10(arr_cube), np.nan) # 10 * np.log10(arr)\n\n # 4).\n DEC_array_combined = None\n entered_wininterval_loop = False\n\n DEC_temporal_list = []\n win_list = []\n # logger.info(f\"Processingtimewindows {len(group_days_interval)}\")\n for win, win_days_interval in group_days_interval.items():\n DEC_array_list = []\n entered_wininterval_loop = True\n DEC_array_stack = []\n DEC_array_threshold_stack = []\n for orbit_dir in [\"ASCENDING\", \"DESCENDING\"]:\n index_orb_do = filter_index_by_orbit(index_do, orbit_dir)\n\n if len(index_orb_do) == 0:\n DEC_array_stack.append(template_array)\n DEC_array_threshold_stack.append(template_array)\n continue\n\n vv_list = []\n vh_list = []\n\n for interval_start, interval_end in win_days_interval:\n vh_window_stack = []\n vv_window_stack = []\n orbit_dir_period = None\n time_points_averaged_str = \"\"\n for (dt, ob), fts in index_orb_do.items():\n if interval_start <= dt < interval_end:\n idx = next((i for i, d in 
enumerate(temporal_extent[\"times\"]) if d == dt), None)\n scene_array = arr[idx, :, :, :]\n vh_band = scene_array[0, :, :]\n vv_band = scene_array[1, :, :]\n\n vh_window_stack.append(vh_band)\n vv_window_stack.append(vv_band)\n\n time_points_averaged_str += f\"{dt.date()}, {idx} --\"\n # Average over the scenes in the interval\n if len(vh_window_stack) == 0 or len(vv_window_stack) == 0:\n vh_avg = np.full_like(template_array, np.nan)\n vv_avg = np.full_like(template_array, np.nan)\n else:\n vh_avg = np.nanmean(vh_window_stack, axis=0)\n vv_avg = np.nanmean(vv_window_stack, axis=0)\n # logger.info(f\"AvgInfo: shapes {vh_avg.shape} {vv_avg.shape} {interval_start.date()} to {interval_end.date()}, win: {win}, {time_points_averaged_str} -- Orbit: {orbit_dir}, -- avg {len(vh_window_stack)} scenes.\")\n\n vh_list.append(vh_avg)\n vv_list.append(vv_avg)\n\n vh_array_stack = np.stack(vh_list, axis=0)\n vv_array_stack = np.stack(vv_list, axis=0)\n\n vh_array_window = vh_array_stack[:datacube_window, :, :]\n vv_array_window = vv_array_stack[:datacube_window, :, :]\n\n if not has_enough_valid(vh_array_window, vv_array_window, min_valid_frac= 0.01):\n DEC_array_stack.append(template_array)\n DEC_array_threshold_stack.append(template_array)\n continue\n\n if np.nanstd(vh_array_window) < 1e-6 and np.nanstd(vv_array_window) < 1e-6:\n DEC_array_stack.append(template_array.astype(np.int16))\n DEC_array_threshold_stack.append(template_array)\n continue\n\n vh_vv_ratio = vh_array_window - vv_array_window\n\n vh = np.where(np.isfinite(vh_array_window), vh_array_window, LOWER_CUTOFF)\n vv = np.where(np.isfinite(vv_array_window), vv_array_window, LOWER_CUTOFF)\n vh_vv_ratio = np.where(np.isfinite(vh_vv_ratio), vh_vv_ratio, LOWER_CUTOFF)\n\n # Stack VH, VV, and VH/VV ratio\n result = np.stack((vh, vv, vh_vv_ratio), axis=-1)\n # logger.info(f\"InputNPShape {result.shape} for orbit {orbit_dir} in window starting {win}\")\n\n result = result[:, :256, :256, :]\n\n input_np = 
np.transpose(result, (1, 2, 0, 3))\n input_np = input_np.reshape(256, 256, datacube_window * 3)\n input_np = input_np[np.newaxis, ...]\n\n # Step 3: Perform inference\n predicted_labels = run_inference(input_np, ort_session)\n\n # Step 4: Postprocess the output\n predicted_labels = postprocess_output(predicted_labels, input_np.shape)\n\n DEC_array_mask = (predicted_labels < 0.05).astype(np.int16)\n DEC_array_stack.append(DEC_array_mask)\n\n\n DEC_array_combined = np.nanmax(np.stack(DEC_array_stack, axis=0), axis=0)\n DEC_array_list.append(DEC_array_combined)\n\n win_list.append(win)\n # logger.info(f\"DECArraylistlen {len(DEC_array_list)}\")\n DEC_temporal_list.append(np.stack(DEC_array_list, axis=0))\n # logger.info(f\"DECArrayliststackshape {np.stack(DEC_array_list, axis=0).shape}\")\n\n\n DEC_temporal_array = np.stack(DEC_temporal_list, axis=0)\n # logger.info(f\"DECtemporalarray {DEC_temporal_array.shape}\")\n\n # create xarray with single timestamp\n output_xarraycube = xr.DataArray(\n DEC_temporal_array, #DEC_array_combined[np.newaxis, np.newaxis, :, :], # add a time dimension\n dims=[\"t\", \"bands\", \"y\", \"x\"],\n coords={\n \"t\": win_list,\n \"bands\": [\"DEC\"],# win is your datetime.datetime object\n \"y\": arr_cube.coords[\"y\"],\n \"x\": arr_cube.coords[\"x\"],\n }\n )\n\n return output_xarraycube\n" + }, + "result": true + } + } + }, + "size": [ + { + "dimension": "x", + "value": 192, + "unit": "px" + }, + { + "dimension": "y", + "value": 192, + "unit": "px" + } + ] + } + }, + "renamelabels2": { + "process_id": "rename_labels", + "arguments": { + "data": { + "from_node": "applyneighborhood1" + }, + "dimension": "bands", + "target": [ + "MCD_AI" + ] + } + }, + "mergecubes1": { + "process_id": "merge_cubes", + "arguments": { + "cube1": { + "from_node": "renamelabels1" + }, + "cube2": { + "from_node": "renamelabels2" + } + }, + "result": true + } + }, + "id": "sentinel1_changedetection", + "summary": "Sentinel-1 change detection 20m resolution.", + 
"description": "Sentinel-1 SAR time-series change detection at 20 m resolution. For a given spatial and temporal extent, the process computes multi-temporal VH/VV backscatter series and applies an AI model to flag changed (e.g. forest-disturbance) pixels.", + "returns": { + "description": "A data cube with the newly computed values.\n\nAll dimensions stay the same, except for the dimensions specified in corresponding parameters. There are three cases how the dimensions can change:\n\n1. The source dimension is the target dimension:\n - The (number of) dimensions remain unchanged as the source dimension is the target dimension.\n - The source dimension properties name and type remain unchanged.\n - The dimension labels, the reference system and the resolution are preserved only if the number of values in the source dimension is equal to the number of values computed by the process. Otherwise, all other dimension properties change as defined in the list below.\n2. The source dimension is not the target dimension. The target dimension exists with a single label only:\n - The number of dimensions decreases by one as the source dimension is 'dropped' and the target dimension is filled with the processed data that originates from the source dimension.\n - The target dimension properties name and type remain unchanged. All other dimension properties change as defined in the list below.\n3. The source dimension is not the target dimension and the latter does not exist:\n - The number of dimensions remain unchanged, but the source dimension is replaced with the target dimension.\n - The target dimension has the specified name and the type other. 
All other dimension properties are set as defined in the list below.\n\nUnless otherwise stated above, for the given (target) dimension the following applies:\n\n- the number of dimension labels is equal to the number of values computed by the process,\n- the dimension labels are incrementing integers starting from zero,\n- the resolution changes, and\n- the reference system is undefined.", + "schema": { + "type": "object", + "subtype": "datacube" + } + }, + "categories": [ + "sentinel-1", + "change-detection", + "forestcover" + ], + "parameters": [ + { + "name": "spatial_extent", + "description": "Limits the data to process to the specified bounding box or polygons.\n\nFor raster data, the process loads the pixel into the data cube if the point\nat the pixel center intersects with the bounding box or any of the polygons\n(as defined in the Simple Features standard by the OGC).\n\nFor vector data, the process loads the geometry into the data cube if the geometry\nis fully within the bounding box or any of the polygons (as defined in the\nSimple Features standard by the OGC). 
Empty geometries may only be in the\ndata cube if no spatial extent has been provided.\n\nEmpty geometries are ignored.\n\nSet this parameter to null to set no limit for the spatial extent.", + "schema": [ + { + "title": "Bounding Box", + "type": "object", + "subtype": "bounding-box", + "required": [ + "west", + "south", + "east", + "north" + ], + "properties": { + "west": { + "description": "West (lower left corner, coordinate axis 1).", + "type": "number" + }, + "south": { + "description": "South (lower left corner, coordinate axis 2).", + "type": "number" + }, + "east": { + "description": "East (upper right corner, coordinate axis 1).", + "type": "number" + }, + "north": { + "description": "North (upper right corner, coordinate axis 2).", + "type": "number" + }, + "base": { + "description": "Base (optional, lower left corner, coordinate axis 3).", + "type": [ + "number", + "null" + ], + "default": null + }, + "height": { + "description": "Height (optional, upper right corner, coordinate axis 3).", + "type": [ + "number", + "null" + ], + "default": null + }, + "crs": { + "description": "Coordinate reference system of the extent, specified as as [EPSG code](http://www.epsg-registry.org/) or [WKT2 CRS string](http://docs.opengeospatial.org/is/18-010r7/18-010r7.html). Defaults to `4326` (EPSG code 4326) unless the client explicitly requests a different coordinate reference system.", + "anyOf": [ + { + "title": "EPSG Code", + "type": "integer", + "subtype": "epsg-code", + "minimum": 1000, + "examples": [ + 3857 + ] + }, + { + "title": "WKT2", + "type": "string", + "subtype": "wkt2-definition" + } + ], + "default": 4326 + } + } + }, + { + "title": "Vector data cube", + "description": "Limits the data cube to the bounding box of the given geometries in the vector data cube. For raster data, all pixels inside the bounding box that do not intersect with any of the polygons will be set to no data (`null`). 
Empty geometries are ignored.", + "type": "object", + "subtype": "datacube", + "dimensions": [ + { + "type": "geometry" + } + ] + }, + { + "title": "No filter", + "description": "Don't filter spatially. All data is included in the data cube.", + "type": "null" + } + ] + }, + { + "name": "temporal_extent", + "description": "Temporal extent specified as two-element array with start and end date/date-time.", + "schema": { + "type": "array", + "subtype": "temporal-interval", + "uniqueItems": true, + "minItems": 2, + "maxItems": 2, + "items": { + "anyOf": [ + { + "type": "string", + "subtype": "date-time", + "format": "date-time" + }, + { + "type": "string", + "subtype": "date", + "format": "date" + }, + { + "type": "null" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/algorithm_catalog/gisat/sentinel1_changedetection/records/S1_changedetection_20200311.png b/algorithm_catalog/gisat/sentinel1_changedetection/records/S1_changedetection_20200311.png new file mode 100644 index 00000000..33e4c82d Binary files /dev/null and b/algorithm_catalog/gisat/sentinel1_changedetection/records/S1_changedetection_20200311.png differ diff --git a/algorithm_catalog/gisat/sentinel1_changedetection/records/sentinel1_changedetection.json b/algorithm_catalog/gisat/sentinel1_changedetection/records/sentinel1_changedetection.json new file mode 100644 index 00000000..40758858 --- /dev/null +++ b/algorithm_catalog/gisat/sentinel1_changedetection/records/sentinel1_changedetection.json @@ -0,0 +1,135 @@ +{ + "id": "sentinel1_changedetection", + "type": "Feature", + "conformsTo": [ + "http://www.opengis.net/spec/ogcapi-records-1/1.0/req/record-core", + "https://apex.esa.int/core/openeo-udp" + ], + "geometry": null, + "properties": { + "created": "2025-09-09T00:00:00Z", + "updated": "2026-02-01T00:00:00Z", + "type": "service", + "title": "Change detection using Sentinel-1 data", + "description": "For a given geometry or set of geometries, detects changes in land cover using 
Sentinel-1 data. This algorithm implements a scalable workflow for Sentinel-1 SAR time-series change detection, based on the StatCubes design used in the Sentinel-1 for Science Amazonas project. It computes multi-temporal statistical descriptors on harmonised C-band backscatter series and applies robust change-detection logic to identify potential forest-disturbance events across large spatial domains.", + "cost_estimate": 0.01, + "cost_unit": "platform credits per km²", + "keywords": [ + "Change Detection Services", + "Sentinel-1", + "Land Use/Land Cover Change", + "Deforestation" + ], + "language": { + "code": "en-US", + "name": "English (United States)" + }, + "languages": [ + { + "code": "en-US", + "name": "English (United States)" + } + ], + "contacts": [ + { + "name": "Sivasankar Arul", + "position": "Data Scientist", + "organization": "Gisat s.r.o.", + "links": [ + { + "href": "https://www.gisat.cz/", + "title": "GISAT Website", + "rel": "about", + "type": "text/html" + }, + { + "href": "https://github.com/sivasanarul", + "title": "GitHub", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "Contact via Gisat", + "roles": [ + "principal investigator" + ] + }, + { + "name": "GISAT", + "links": [ + { + "href": "https://www.gisat.cz/", + "title": "GISAT Website", + "rel": "about", + "type": "text/html" + } + ], + "contactInstructions": "SEE WEBSITE", + "roles": [ + "processor" + ] + } + ], + "themes": [ + { + "concepts": [ + { + "id": "CHANGE DETECTION SERVICES" + }, + { + "id": "SENTINEL-1" + }, + { + "id": "LAND COVER CHANGE" + } + ], + "scheme": "https://gcmd.earthdata.nasa.gov/kms/concept/concept_scheme/science_keywords" + } + ], + "formats": [ + { + "name": "GeoTIFF" + } + ], + "license": "CC-BY-4.0" + }, + "linkTemplates": [], + "links": [ + { + "rel": "platform", + "type": "application/json", + "title": "CDSE openEO federation", + "href": "../../../../platform_catalog/cdse_openeo_federation.json" + }, + { + "rel": "provider", + 
"type": "application/json", + "title": "GISAT", + "href": "../../record.json" + }, + { + "rel": "application", + "type": "application/vnd.openeo+json;type=process", + "title": "openEO Process Definition", + "href": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json" + }, + { + "rel": "service", + "type": "application/json", + "title": "openEO platform", + "href": "https://openeo.dataspace.copernicus.eu" + }, + { + "rel": "webapp", + "type": "text/html", + "title": "OpenEO Web Editor", + "href": "https://editor.openeo.org/?wizard=UDP&wizard~process=sentinel1_changedetection&wizard~processUrl=https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/gisat/sentinel1_changedetection/openeo_udp/sentinel1_changedetection.json&server=https://openeofed.dataspace.copernicus.eu" + }, + { + "rel": "thumbnail", + "type": "image/png", + "title": "Thumbnail image", + "href": "https://raw.githubusercontent.com/ESA-APEx/apex_algorithms/refs/heads/main/algorithm_catalog/gisat/sentinel1_changedetection/records/S1_changedetection_20200311.png" + } + ] +} diff --git a/schemas/record.json b/schemas/record.json index a13237c6..3ad373df 100644 --- a/schemas/record.json +++ b/schemas/record.json @@ -106,6 +106,8 @@ "enum": [ "Agriculture", "Land Use/Land Cover Classification", + "Land Use/Land Cover Change", + "Deforestation", "Vegetation", "Normalized Difference Vegetation Index (NDVI)", "Leaf Area Index (LAI)", @@ -125,7 +127,8 @@ "Digital Elevation/Terrain Model (DEM)", "ECMWF ERA5", "Data Analysis and Visualization", - "Statistical Applications" + "Statistical Applications", + "Change Detection Services" ] }, "minItems": 1,