diff --git a/apex_generate.py b/apex_generate.py index 1739e7a..20a2dc3 100644 --- a/apex_generate.py +++ b/apex_generate.py @@ -48,10 +48,9 @@ def generate() -> dict: #################### # PART 3: Apply statcube processing ####################: - context_udf = {"spatial_extent": {"from_parameter": "spatial_extent"}, "detection_start_time": {"from_parameter": "temporal_extent"}, - "detection_end_time": {"from_parameter": "temporal_extent"}} - udf = UDF.from_file("udf_apex_S1backscatter_changedetection.py", context=context_udf) - output_statmcd = s1_backcatter.apply_dimension(process=udf, dimension="t") + context_udf = {"spatial_extent": {"from_parameter": "spatial_extent"}, "detection_extent": {"from_parameter": "temporal_extent"}} + udf = UDF.from_file("udf_apex_S1backscatter_changedetection.py", context={"from_parameter": "context"}) + output_statmcd = s1_backcatter.apply_dimension(process=udf, dimension="t", context=context_udf) output_statmcd = output_statmcd.rename_labels(dimension="bands", target=["DEC", "DEC_asc", "DEC_asc_threshold", "DEC_des", "DEC_des_threshold"]) diff --git a/sentinel1_mcd.json b/sentinel1_mcd.json index 9dbc87e..c34e841 100644 --- a/sentinel1_mcd.json +++ b/sentinel1_mcd.json @@ -97,6 +97,14 @@ "applydimension1": { "process_id": "apply_dimension", "arguments": { + "context": { + "spatial_extent": { + "from_parameter": "spatial_extent" + }, + "detection_extent": { + "from_parameter": "temporal_extent" + } + }, "data": { "from_node": "sarbackscatter1" }, @@ -107,21 +115,13 @@ "process_id": "run_udf", "arguments": { "context": { - "spatial_extent": { - "from_parameter": "spatial_extent" - }, - "detection_start_time": { - "from_parameter": "temporal_extent" - }, - "detection_end_time": { - "from_parameter": "temporal_extent" - } + "from_parameter": "context" }, "data": { "from_parameter": "data" }, "runtime": "Python", - "udf": "import xarray as xr\nimport pandas as pd\nfrom osgeo import osr, ogr\nimport re\nfrom datetime import timedelta\nimport datetime\nfrom collections import defaultdict, Counter, OrderedDict\nfrom typing import Dict, List, Tuple, Optional\nimport requests\nfrom shapely.geometry import shape\nfrom scipy import ndimage as ndi\n\nimport time\nimport numpy as np\nfrom scipy.stats import ttest_ind_from_stats\nfrom copy import copy\nimport logging\n\nDEBUG = False\nAPPLY_SIEVE_FILTER = True\n\nlogger = logging.getLogger(__name__)\n\n# -------------------------\n# Time extents\n# -------------------------\n# Phase boundaries as DATES\n\nMASTER_START = datetime.datetime(2015, 4, 28)\nPHASE1_END = datetime.datetime(2021, 12, 16)\nPHASE2_END = datetime.datetime(2025, 3, 30)\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Step logic (your rule) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef step_forward(start_d: datetime.datetime, acq_frequency: int = 6) -> int:\n \"\"\"\n Decide interval length (N or 2N) from a given START date.\n\n Rule:\n - If start + N does NOT cross PHASE1_END => use N\n - Else, as long as start < PHASE2_END => use 2N\n - Once start >= PHASE2_END => use N\n \"\"\"\n N = acq_frequency\n\n if start_d + timedelta(days=N) <= PHASE1_END:\n # Still in Phase 1\n return N\n elif start_d < PHASE2_END:\n # Phase 2 (including the interval that crosses PHASE2_END)\n return 2 * N\n else:\n # Phase 3\n return N\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Backwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef prev_interval(cur_start: datetime.datetime, acq_frequency: int = 6) -> Tuple[datetime.date, datetime.date]:\n \"\"\"\n Given the START of the current interval, find the previous full interval [prev_start, cur_start],\n such that its length is either N or 2N and is consistent with step_forward(prev_start).\n\n No truncation: length is exactly N or 2N.\n \"\"\"\n N = acq_frequency\n\n # Candidate 1: previous interval length N\n cand1_start = cur_start - timedelta(days=N)\n cand1_len = N\n cand1_ok = (\n step_forward(cand1_start, N) == cand1_len and\n cand1_start + timedelta(days=cand1_len) == cur_start\n )\n\n # Candidate 2: previous interval length 2N\n cand2_start = cur_start - timedelta(days=2 * N)\n cand2_len = 2 * N\n cand2_ok = (\n step_forward(cand2_start, N) == cand2_len and\n cand2_start + timedelta(days=cand2_len) == cur_start\n )\n\n if not cand1_ok and not cand2_ok:\n raise RuntimeError(f\"No valid previous interval for start={cur_start}\")\n\n if cand1_ok and not cand2_ok:\n return cand1_start, cur_start\n if cand2_ok and not cand1_ok:\n return cand2_start, cur_start\n\n # Both valid (rare near boundaries) \u2013 prefer 2N by convention\n return cand2_start, cur_start\n\n\ndef back_chain(anchor_start: datetime.datetime, n_back: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_back intervals BEFORE anchor_start, going backwards, with no truncation.\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_back):\n prev_start, prev_end = prev_interval(cur_start, acq_frequency)\n intervals.append((prev_start, prev_end))\n cur_start = prev_start\n\n # Reverse to chronological order\n return list(reversed(intervals))\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Forwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef forward_chain(anchor_start: datetime.datetime, n_forw: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_forw intervals starting from anchor_start (first interval starts at anchor_start).\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_forw):\n length = step_forward(cur_start, acq_frequency)\n end = cur_start + timedelta(days=length)\n intervals.append((cur_start, end))\n cur_start = end\n\n return intervals\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Main helper: 5 back + 4 forward \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef get_context_intervals(\n start_str: str,\n back: int = 5,\n forward: int = 4,\n acq_frequency: int = 6\n) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Returns:\n - `back` intervals before start_date\n - the interval starting at start_date\n - `forward` intervals after that\n\n Total = back + 1 + forward intervals.\n The 6th interval's START is exactly start_date if back=5.\n \"\"\"\n start_d = datetime.datetime.strptime(start_str, \"%Y-%m-%d\")\n before = back_chain(start_d, back, acq_frequency) # 5 intervals before\n after = forward_chain(start_d, forward + 1, acq_frequency) # includes anchor as first\n return before + after\n\ndef get_overall_start_end(intervals: List[Tuple[datetime.date, datetime.date]]):\n \"\"\"\n Given a list of intervals [(start, end), ...],\n return (overall_start, overall_end).\n \"\"\"\n overall_start = min(s for s, e in intervals)\n overall_end = max(e for s, e in intervals)\n return overall_start, overall_end\n\n\n\n# -------------------------\n# Config / Constants\n# -------------------------\nS1_SEARCH_URL = \"https://catalogue.dataspace.copernicus.eu/resto/api/collections/Sentinel1/search.json\"\nDATE_RE = re.compile(r'_(\\d{8})T\\d{6}_') # e.g., ..._20211201T091308_...\n\n# -------------------------\n# Datacube utils\n# -------------------------\ndef get_spatial_extent(spatial_extent) -> Tuple[dict, List[float]]:\n\n \"\"\"Get spatial bounds in WGS84.\"\"\"\n # x_coord = arr.coords['x'].values\n # y_coord = arr.coords['y'].values\n #\n # west, east = float(x_coord.min()), float(x_coord.max())\n # south, north = float(y_coord.min()), float(y_coord.max())\n west, east, south, north = spatial_extent[\"west\"], spatial_extent[\"east\"], spatial_extent[\"south\"], spatial_extent[\"north\"]\n source_epsg = spatial_extent.get(\"crs\", \"EPSG:4326\").split(\":\")[-1]\n # ------------------------------------\n # Build polygon from bbox (in source CRS)\n # ------------------------------------\n if int(source_epsg) != 4326:\n ring = ogr.Geometry(ogr.wkbLinearRing)\n ring.AddPoint(west, south)\n ring.AddPoint(east, south)\n ring.AddPoint(east, north)\n ring.AddPoint(west, north)\n ring.AddPoint(west, south) # close ring\n\n geom = ogr.Geometry(ogr.wkbPolygon)\n geom.AddGeometry(ring)\n geom = geom.Clone() # 2D is fine here\n\n geom_wkt = geom.ExportToWkt()\n print(geom_wkt)\n\n # ------------------------------------\n # Define CRS and transformation\n # ------------------------------------\n source_epsg = int(source_epsg) # e.g. 3857 or 32633; must be int\n CATALOG_EPSG = 4326\n\n src_srs = osr.SpatialReference()\n src_srs.ImportFromEPSG(source_epsg)\n\n dst_srs = osr.SpatialReference()\n dst_srs.ImportFromEPSG(CATALOG_EPSG)\n\n # Assign SRS to geometry (good practice)\n geom.AssignSpatialReference(src_srs)\n\n trans_to_catalog = osr.CoordinateTransformation(src_srs, dst_srs)\n\n # Sanity check (optional)\n if trans_to_catalog is None:\n raise RuntimeError(\"Failed to create CoordinateTransformation\")\n\n # ------------------------------------\n # Transform and get envelope\n # ------------------------------------\n catalog_aoi_geom = geom.Clone()\n catalog_aoi_geom.Transform(trans_to_catalog)\n\n west, east, south, north = catalog_aoi_geom.GetEnvelope()\n\n return {'west': west, 'east': east, 'south': south, 'north': north}, [south, west, north, east]\n\ndef get_temporal_extent(arr: xr.DataArray) -> dict:\n \"\"\"Get temporal extent from time dimension.\"\"\"\n time_dim = 't'\n if 't' in arr.dims:\n times = arr.coords[time_dim].values\n times = pd.to_datetime(times).to_pydatetime()\n start = pd.to_datetime(times.min())\n end = pd.to_datetime(times.max())\n log_string = '\\n'.join([f'{i} {v}' for i, v in enumerate(times)])\n return {'start': start, 'end': end, 'times': times}\n\n\n\n# -------------------------\n# Utilities\n# -------------------------\ndef remove_small_objects(mask, min_size=2, connectivity=1):\n \"\"\"\n Removes connected components of 1s smaller than min_size.\n Fastest possible implementation (SciPy C backend).\n \"\"\"\n mask = np.asarray(mask).astype(bool)\n\n if min_size <= 1:\n return mask\n\n # Label connected components in the foreground (1-pixels)\n structure = ndi.generate_binary_structure(mask.ndim, connectivity)\n labels, num = ndi.label(mask, structure=structure)\n\n if num == 0:\n return mask\n\n # Compute size of each component\n sizes = ndi.sum(mask, labels, index=np.arange(1, num + 1))\n\n # Select components to keep\n keep_labels = np.where(sizes >= min_size)[0] + 1\n\n # Build output\n out = np.isin(labels, keep_labels)\n\n return out\n\n\ndef parse_date_from_title(title: str) -> Optional[datetime.datetime]:\n \"\"\"Extract YYYYMMDD as datetime from a Sentinel-1 title. Return None if not found.\"\"\"\n m = DATE_RE.search(title)\n if not m:\n return None\n return datetime.datetime.strptime(m.group(1), \"%Y%m%d\")\n\n\ndef intersection_ratio_bbox2_in_bbox1(b1: List[float], b2: List[float]) -> float:\n \"\"\"\n Fraction of bbox2's area inside bbox1. bboxes are [minx, miny, maxx, maxy] in lon/lat.\n \"\"\"\n x1_min, y1_min, x1_max, y1_max = b1\n x2_min, y2_min, x2_max, y2_max = b2\n\n inter_min_x = max(x1_min, x2_min)\n inter_min_y = max(y1_min, y2_min)\n inter_max_x = min(x1_max, x2_max)\n inter_max_y = min(y1_max, y2_max)\n\n if inter_min_x >= inter_max_x or inter_min_y >= inter_max_y:\n return 0.0\n\n inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)\n area2 = (x2_max - x2_min) * (y2_max - y2_min)\n return 0.0 if area2 <= 0 else inter_area / area2\n\n\n\n\n\ndef create_timewindow_groups(intervals, window_size=10):\n out = {}\n for i in range(len(intervals) - window_size + 1):\n win = intervals[i:i + window_size]\n key = win[5][0] # start time of 6th interval\n out[key] = win\n return out\n\n\ndef fetch_s1_features(bbox: List[float], start_iso: str, end_iso: str) -> List[dict]:\n \"\"\"\n Query the Copernicus Dataspace Resto API for Sentinel-1 features within bbox and datetime range.\n Only returns features in JSON 'features' list.\n \"\"\"\n params = {\n \"box\": \",\".join(map(str, bbox)),\n \"page\": 1,\n \"maxRecords\": 1000,\n \"status\": \"ONLINE\",\n \"dataset\": \"ESA-DATASET\",\n \"processingLevel\": \"LEVEL1\",\n \"productType\": \"IW_GRDH_1S-COG\",\n \"startDate\": f\"{start_iso.strftime('%Y-%m-%dT00:00:00Z')}\",\n \"completionDate\": f\"{end_iso.strftime('%Y-%m-%dT23:59:59.999999999Z')}\",\n }\n r = requests.get(S1_SEARCH_URL, params=params, timeout=30)\n r.raise_for_status()\n # logger.info(f\"s1query {r.url}\")\n return r.json().get(\"features\", [])\n\n\ndef build_index_by_date_orbit(features: List[dict]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"\n Keep only GRDH (non-CARD_BS, non-COG) scenes and index them by (date, orbitDirection),\n ordered chronologically by date.\n \"\"\"\n temp = defaultdict(list)\n for ft in features:\n props = ft.get(\"properties\", {})\n title = props.get(\"title\", \"\")\n if \"_GRDH_\" in title and \"_CARD_BS\" not in title: # and \"_COG.\" not in title:\n dt = parse_date_from_title(title)\n if dt is None:\n continue\n orbit = props.get(\"orbitDirection\", \"UNKNOWN\")\n temp[(dt, orbit)].append(ft)\n\n # Sort keys by datetime\n ordered = OrderedDict(sorted(temp.items(), key=lambda kv: kv[0][0]))\n return ordered\n\ndef filter_index_to_dates(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n dt_list: List[datetime.datetime]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"Keep only entries whose date is in dt_list (exact date match).\"\"\"\n dates = set(dt_list)\n return {k: v for k, v in index_do.items() if k[0] in dates}\n\n\ndef filter_index_by_orbit(index_do, selected_orbit):\n \"\"\"\n Filter (date, orbit) \u2192 [features] dictionary to keep only the chosen orbit direction.\n Returns a new dict preserving chronological order.\n \"\"\"\n if not selected_orbit:\n return index_do # no orbit chosen, keep all\n\n return dict(\n (k, v) for k, v in index_do.items() if k[1] == selected_orbit\n )\n\n\ndef pick_orbit_direction(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n aoi_bbox: List[float]) -> Optional[str]:\n \"\"\"\n Choose orbit direction deterministically:\n 1) If only one orbit exists \u2192 return it.\n 2) Else compare counts per orbit \u2192 choose higher count.\n 3) If tie, compute each orbit's minimum (worst-case) fraction of scene-bbox inside AOI \u2192 prefer higher.\n 4) If still tied \u2192 return None.\n \"\"\"\n if not index_do:\n return None\n\n orbits = [k[1] for k in index_do.keys()]\n counts = Counter(orbits)\n if len(counts) == 1:\n return next(iter(counts))\n\n # Step 2: counts\n top_count = max(counts.values())\n leaders = [o for o, c in counts.items() if c == top_count]\n if len(leaders) == 1:\n return leaders[0]\n\n # Step 3: tie-break by worst-case overlap\n # For each orbit, compute the MIN overlap ratio across its scenes; pick the orbit with higher MIN\n min_overlap_by_orbit = {}\n for orbit in leaders:\n min_ratio = float(\"inf\")\n for (dt, ob), fts in index_do.items():\n if ob != orbit:\n continue\n for ft in fts:\n try:\n # Use feature geometry bounds as scene bbox\n scene_bbox = list(shape(ft[\"geometry\"]).bounds)\n except Exception:\n continue\n ratio = intersection_ratio_bbox2_in_bbox1(scene_bbox, aoi_bbox)\n if ratio < min_ratio:\n min_ratio = ratio\n if min_ratio == float(\"inf\"):\n min_ratio = 0.0\n min_overlap_by_orbit[orbit] = min_ratio\n\n # Compare worst-case overlap; if tie, return None\n max_min_overlap = max(min_overlap_by_orbit.values())\n overlap_leaders = [o for o, r in min_overlap_by_orbit.items() if r == max_min_overlap]\n return overlap_leaders[0] if len(overlap_leaders) == 1 else None\n\n\ndef get_scene_indices(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n feature_names: List[str]) -> List[Tuple[int, str]]:\n \"\"\"\n Returns indices & filenames from feature_names that match ANY date present in the index.\n \"\"\"\n dates_yymmdd = {k[0].strftime(\"%Y%m%d\") for k in index_do.keys()}\n matched = []\n for i, name in enumerate(feature_names):\n if any(d in name for d in dates_yymmdd):\n matched.append((i, name))\n return matched\n\n\n#################################################################################\n##################### BACKSCATTER CHANGE DETECTION ##############################\n#################################################################################\n\ndef _as_dtype(a, dtype):\n return a if (a.dtype == dtype and a.flags['C_CONTIGUOUS']) else np.ascontiguousarray(a, dtype=dtype)\n\n\ndef _nanmean_along(a, axis, mask=None):\n \"\"\"Numerically stable nanmean with float64 accumulation.\"\"\"\n if mask is None:\n mask = np.isfinite(a)\n sums = np.where(mask, a, 0).sum(axis=axis, dtype=np.float64)\n counts = mask.sum(axis=axis, dtype=np.int64).astype(np.float64)\n out = np.divide(sums, counts, out=np.zeros_like(sums, dtype=np.float64), where=counts > 0)\n return out, counts\n\n\ndef create_nan_mask(numpy_array, vv_vh_bandcount):\n \"\"\"Return combined invalid mask (VV or VH invalid). Expects (T,H,W).\"\"\"\n mask_vv = np.isnan(numpy_array[:vv_vh_bandcount]) | (numpy_array[:vv_vh_bandcount] < -99)\n mask_vh = np.isnan(numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount]) | (\n numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount] < -99\n )\n return mask_vv | mask_vh\n\n\ndef ttest_from_stats(\n past,\n future,\n min_valid_per_window=3,\n valid_mask=None,\n compute_dtype=np.float32,\n alternative=\"greater\"\n):\n \"\"\"\n Column-wise Welch t-test (one-sided: past > future) using summary stats.\n NaNs are ignored. Outputs:\n t_stat_full : (H, W) compute_dtype\n p_one_full : (H, W) compute_dtype\n insufficient_full : (H, W) bool (True where either group has < min_valid samples)\n\n Parameters\n ----------\n past, future : np.ndarray, shape (T, H, W)\n Time-series stacks for the two windows being compared.\n min_valid_per_window : int\n Minimum non-NaN samples required in each window per pixel.\n valid_mask : np.ndarray | None, shape (H, W)\n Optional boolean mask of pixels to evaluate.\n compute_dtype : np.dtype\n Dtype for returned t and p arrays (e.g., np.float32).\n alternative : {\"greater\", \"less\"}\n \"greater\" : H1 is mean(past) > mean(future) (decrease in future)\n \"less\" : H1 is mean(past) < mean(future) (increase in future)\n \"\"\"\n assert past.shape == future.shape, \"past and future must have same shape (T,H,W)\"\n if alternative not in (\"greater\", \"less\"):\n raise ValueError(f\"alternative must be 'greater' or 'less', got {alternative!r}\")\n\n T, H, W = past.shape\n start_time = time.time()\n\n # Prepare outputs\n t_full = np.zeros((H, W), dtype=compute_dtype)\n p_full = np.ones((H, W), dtype=compute_dtype)\n ins_full = np.ones((H, W), dtype=bool)\n\n # Valid pixel mask\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return t_full, p_full, ins_full\n\n # Views as (T, N) and select only valid pixels (to reduce work/memory)\n N = H * W\n idx = vm.ravel()\n past_v = past.reshape(T, N)[:, idx]\n future_v = future.reshape(T, N)[:, idx]\n\n # Counts (ignore NaNs)\n n1 = np.sum(np.isfinite(past_v), axis=0).astype(np.int32, copy=False)\n n2 = np.sum(np.isfinite(future_v), axis=0).astype(np.int32, copy=False)\n\n # Pixels that have enough samples in BOTH groups\n test_idx = (n1 >= min_valid_per_window) & (n2 >= min_valid_per_window)\n\n out_t = np.zeros(idx.sum(), dtype=np.float64)\n out_p = np.ones(idx.sum(), dtype=np.float64)\n insufficient_v = ~test_idx\n\n if np.any(test_idx):\n # Summary stats in float64 for stability\n # Sample mean and sample variance (ddof=1)\n m1 = np.nanmean(past_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n m2 = np.nanmean(future_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n v1 = np.nanvar(past_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n v2 = np.nanvar(future_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n\n # Convert variance -> std (ttest_ind_from_stats expects std)\n s1 = np.sqrt(np.maximum(v1, 0.0))\n s2 = np.sqrt(np.maximum(v2, 0.0))\n\n nn1 = n1[test_idx].astype(np.float64, copy=False)\n nn2 = n2[test_idx].astype(np.float64, copy=False)\n\n try:\n # SciPy >= 1.9: supports one-sided alternative\n t_v, p_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False,\n alternative=alternative\n )\n except TypeError:\n # Older SciPy fallback: compute two-sided, convert to one-sided \"greater\"\n t_v, p2_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False\n )\n # Ensure array types\n t_v = np.asarray(t_v, dtype=np.float64)\n p2_v = np.asarray(p2_v, dtype=np.float64)\n\n p_v = np.ones_like(p2_v, dtype=np.float64)\n good = np.isfinite(t_v) & np.isfinite(p2_v)\n\n if alternative == \"greater\":\n pos = good & (t_v > 0)\n # For t > 0, one-sided p = two-sided p / 2\n p_v[pos] = p2_v[pos] / 2.0\n # For t <= 0, one-sided p = 1 - (two-sided p / 2)\n p_v[good & ~pos] = 1.0 - (p2_v[good & ~pos] / 2.0)\n else:\n # H1: mean(past) < mean(future)\n neg = good & (t_v < 0)\n p_v[neg] = p2_v[neg] / 2.0\n p_v[good & ~neg] = 1.0 - (p2_v[good & ~neg] / 2.0)\n\n # Sanitize NaNs/Infs in t\n t_v = np.asarray(t_v, dtype=np.float64)\n p_v = np.asarray(p_v, dtype=np.float64)\n bad = ~np.isfinite(t_v)\n if np.any(bad):\n t_v[bad] = 0.0\n\n out_t[test_idx] = t_v\n out_p[test_idx] = p_v\n\n # Scatter back to (H, W)\n t_full.ravel()[idx] = out_t.astype(compute_dtype, copy=False)\n p_full.ravel()[idx] = out_p.astype(compute_dtype, copy=False)\n ins_full.ravel()[idx] = insufficient_v\n return t_full, p_full, ins_full\n\ndef apply_threshold(stat_array, pol_item, DEC_array_threshold,\n stat_item_name=None, previous_stat_array_bool=None):\n \"\"\"\n Convert a stat array into a binary mask and accumulate into DEC. Dead pixels should be masked upstream.\n \"\"\"\n\n stat_array_copy = copy(stat_array)\n\n if stat_item_name == 'std':\n pol_thr = 2.0 if pol_item == \"VH\" else 1.5\n stat_array = np.where(np.isnan(stat_array) | (stat_array < pol_thr), 0, 1)\n\n elif stat_item_name == 'mean_change':\n # Looking for decreases (future - past <= threshold)\n pol_thr = -1.75\n stat_array = np.where(np.isnan(stat_array) | (stat_array > pol_thr), 0, 1)\n\n elif stat_item_name == 'pval':\n t_stat = stat_array[0]\n p_val = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n pvalue_thr = 0.05\n\n is_significant = (p_val < pvalue_thr) & (~insufficient)\n stat_array = is_significant.astype(np.uint8)\n\n if previous_stat_array_bool is not None:\n stat_array[~previous_stat_array_bool.astype(bool)] = 0\n\n elif stat_item_name == 'ratio_slope':\n slope = stat_array[0]\n r2 = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n\n slope_mask = np.isfinite(slope) & (slope >= 0.025) & ~insufficient\n r2_mask = np.isfinite(r2) & (r2 >= 0.60) & ~insufficient\n stat_array = (slope_mask & r2_mask).astype(np.uint8)\n\n elif stat_item_name == 'ratio_mean_change':\n stat_thr = 2.0\n stat_array = np.where(np.isnan(stat_array) | (stat_array < stat_thr), 0, 1)\n\n DEC_array_threshold += stat_array.astype(int)\n if DEBUG:\n return DEC_array_threshold, stat_array_copy, stat_array.astype(int)\n else:\n return DEC_array_threshold, None, stat_array.astype(int)\n\n\ndef calculate_lsfit_r(vv_vh_r, min_valid=3, center_time=True, valid_mask=None, compute_dtype=np.float32):\n T, H, W = vv_vh_r.shape\n x = np.arange(T, dtype=np.float64) # keep time in float64\n if center_time:\n x = x - x.mean()\n\n slope_full = np.full((H, W), np.nan, dtype=compute_dtype)\n r2_full = np.full((H, W), np.nan, dtype=compute_dtype)\n insufficient_full = np.ones((H, W), dtype=bool)\n\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return slope_full, r2_full, insufficient_full\n\n vv_vh_r = _as_dtype(vv_vh_r, compute_dtype)\n\n idx = vm.ravel()\n y = vv_vh_r.reshape(T, -1)[:, idx] # (T,N) compute_dtype\n mask = np.isfinite(y)\n n = mask.sum(axis=0)\n insufficient = n < min_valid\n\n # means with float64 accumulation\n y_mean, _ = _nanmean_along(y.astype(np.float64, copy=False), axis=0, mask=mask)\n\n x2 = x[:, None] # (T,1), float64\n y_center = np.where(mask, y.astype(np.float64, copy=False) - y_mean[None, :], 0.0)\n\n Sxx = ((x2 ** 2) * mask).sum(axis=0, dtype=np.float64)\n Sxy = (x2 * y_center).sum(axis=0, dtype=np.float64)\n\n with np.errstate(invalid='ignore', divide='ignore'):\n slope64 = np.divide(Sxy, Sxx, out=np.zeros_like(Sxy), where=Sxx > 0)\n\n yhat = slope64[None, :] * x2 + y_mean[None, :]\n resid = np.where(mask, y.astype(np.float64, copy=False) - yhat, 0.0)\n\n SSE = (resid ** 2).sum(axis=0, dtype=np.float64)\n SST = (np.where(mask, (y.astype(np.float64, copy=False) - y_mean[None, :]) ** 2, 0.0)\n ).sum(axis=0, dtype=np.float64)\n\n r2_64 = np.zeros_like(SSE)\n good_sst = SST > 0\n r2_64[good_sst] = 1.0 - (SSE[good_sst] / SST[good_sst])\n\n slope64[insufficient] = np.nan\n r2_64[insufficient] = np.nan\n\n slope_full.reshape(-1)[idx] = slope64.astype(compute_dtype, copy=False)\n r2_full.reshape(-1)[idx] = r2_64.astype(compute_dtype, copy=False)\n insufficient_full.reshape(-1)[idx] = insufficient\n return slope_full, r2_full, insufficient_full\n\n\ndef _dead_mask_for_window(stack, past_len, future_len):\n \"\"\"Pixels with all-NaN in past or in future (axis=0). stack: (T,H,W).\"\"\"\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n past_dead = ~np.any(np.isfinite(past), axis=0)\n future_dead = ~np.any(np.isfinite(future), axis=0)\n return past_dead | future_dead\n\n\ndef apply_stat_datacube(numpy_stack_pol_dict, window_size=10, compute_dtype=np.float32):\n \"\"\"\n Multi-criteria change detection with early skipping of dead pixels\n (all-NaN in past or future window). Dead pixels => DEC outputs = 0.\n Heavy ops (ttest, lsfit) are computed only on valid pixels.\n \"\"\"\n start_time = time.time()\n VV = _as_dtype(numpy_stack_pol_dict[\"VV\"], compute_dtype)\n VH = _as_dtype(numpy_stack_pol_dict[\"VH\"], compute_dtype)\n\n bands, dim1, dim2 = numpy_stack_pol_dict[\"VV\"].shape\n assert window_size % 2 == 0, \"window_size must be even (split equally into past/future)\"\n assert bands >= window_size, f\"Need at least {window_size} time steps; got {bands}\"\n past_len = future_len = window_size // 2\n\n R = VV - VH\n\n # Build dead masks per signal, then combine\n dead_vv = _dead_mask_for_window(VV, past_len, future_len)\n dead_vh = _dead_mask_for_window(VH, past_len, future_len)\n dead_r = _dead_mask_for_window(R, past_len, future_len)\n dead_any = dead_vv | dead_vh | dead_r # (H,W)\n valid_mask = ~dead_any\n if not DEBUG: del dead_vv, dead_vh, dead_r\n\n # If everything is dead, return zeros fast\n if not np.any(valid_mask):\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n DEC_array_mask = np.zeros((dim1, dim2), dtype=np.uint8)\n return DEC_array_threshold, DEC_array_mask\n\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n\n # --- Per-pol statistics (VV and VH) ---\n for pol_item, stack in ((\"VV\", VV), (\"VH\", VH)):\n\n # Restrict to window once (cheap) \u2013 nanmean/nanstd will be fast on masked arrays anyway\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n used = stack[0:past_len + future_len, :, :]\n\n # means/std with float64 reductions\n Stack_p_MIN, _ = _nanmean_along(past.astype(np.float64, copy=False), axis=0)\n Stack_f_MIN, _ = _nanmean_along(future.astype(np.float64, copy=False), axis=0)\n POL_std = np.nanstd(used.astype(np.float64, copy=False), axis=0).astype(compute_dtype, copy=False)\n\n # Mask out dead pixels before thresholding so they never vote\n POL_std[dead_any] = np.nan\n DEC_array_threshold, _, _ = apply_threshold(\n POL_std, pol_item, DEC_array_threshold, stat_item_name=\"std\"\n )\n if not DEBUG:\n del POL_std\n\n # Mean change (future - past)\n POL_mean_change = (Stack_f_MIN - Stack_p_MIN).astype(compute_dtype, copy=False)\n POL_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, POL_mean_change_bool = apply_threshold(\n POL_mean_change, pol_item, DEC_array_threshold, stat_item_name=\"mean_change\"\n )\n\n # Paired t-test (compute only on valid pixels)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n past, future, min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype,\n alternative=\"greater\"\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item, DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=POL_mean_change_bool)\n\n # --- Ratio-based stats (VV - VH) ---\n ratio = R\n # Linear trend only on valid pixels\n ratio_slope, ratio_r2, mask_sufficient_ratio = calculate_lsfit_r(\n ratio, valid_mask=valid_mask, compute_dtype=compute_dtype\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [ratio_slope, ratio_r2, mask_sufficient_ratio],\n pol_item=\"RATIO\",\n DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_slope\"\n )\n if not DEBUG:\n del ratio_slope, ratio_r2\n\n # Mean change of ratio (future - past) with correct axis\n ratio_mean_change = (\n _nanmean_along(R[past_len:past_len + future_len].astype(np.float64, copy=False), axis=0)[0]\n - _nanmean_along(R[0:past_len].astype(np.float64, copy=False), axis=0)[0]\n ).astype(compute_dtype, copy=False)\n ratio_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, ratio_mean_change_bool = apply_threshold(\n ratio_mean_change, pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_mean_change\"\n )\n if not DEBUG: del ratio_mean_change\n\n # T-test on ratio (valid pixels only)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n ratio[0:past_len, :, :], ratio[past_len:past_len + future_len, :, :],\n min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype, alternative=\"less\")\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=ratio_mean_change_bool)\n\n # --- Final mask post-processing ---\n DEC_array_mask = np.zeros_like(DEC_array_threshold, dtype=np.uint8)\n DEC_array_mask[DEC_array_threshold > 4] = 1\n\n # Force dead pixels to zero in both outputs (per your requirement)\n DEC_array_threshold[dead_any] = 0\n DEC_array_mask[dead_any] = 0\n\n return DEC_array_threshold, DEC_array_mask\n\n\ndef apply_datacube(cube: xr.DataArray, context: Dict) -> xr.DataArray:\n \"\"\"\n Simple UDF: Check S1 observation frequency via STAC and aggregate temporally.\n \"\"\"\n\n arr = cube\n\n # Get temporal extent\n spatial_extent = context[\"spatial_extent\"]\n datection_time = context[\"detection_start_time\"]\n detection_end_time = context[\"detection_end_time\"]\n\n start_d = datetime.datetime.strptime(datection_time, \"%Y-%m-%d\")\n end_d = datetime.datetime.strptime(detection_end_time, \"%Y-%m-%d\")\n delta_days = (end_d - start_d).days\n acq_frequency = abs(delta_days)\n\n # temporal extent\n days_interval = get_context_intervals(datection_time, acq_frequency=acq_frequency)\n start_time, end_time = get_overall_start_end(days_interval)\n # logger.info(f\"Processingfromto: {start_time} to {end_time}\")\n\n group_days_interval = create_timewindow_groups(days_interval)\n\n # Get spatial extent\n spatial_extent_4326, bbox_4326 = get_spatial_extent(spatial_extent)\n # logger.info(f\"Spatial extent in EPSG:{epsg_code}: {spatial_extent_4326} {bbox_4326}\")\n\n temporal_extent = get_temporal_extent(arr)\n\n # Fetch & build index\n feats = fetch_s1_features(bbox_4326, temporal_extent[\"start\"], temporal_extent[\"end\"])\n index_do = build_index_by_date_orbit(feats)\n\n template_array = np.zeros_like(arr[0, 0, :, :])\n\n # 2) Filter to your dates of interest\n # logger.info(f\"featuresdateorb: {index_do.keys()}\")\n # logger.info(f\"filteringscenesusing: {temporal_extent['times']}\")\n index_do = filter_index_to_dates(index_do, temporal_extent[\"times\"])\n # logger.info(f\"AfterTimeFilter: {index_do.keys()}\")\n # 3) Decide the orbit direction (or None if tie after tie-break)\n # selected_orbit = pick_orbit_direction(index_do, bbox)\n\n arr = 10 * np.log10(arr)\n\n # 4).\n DEC_array_combined = None\n entered_wininterval_loop = False\n\n DEC_temporal_list = []\n win_list = []\n # logger.info(f\"Processingtimewindows {len(group_days_interval)}\")\n for win, win_days_interval in group_days_interval.items():\n DEC_array_list = []\n entered_wininterval_loop = True\n DEC_array_stack = []\n DEC_array_threshold_stack = []\n for orbit_dir in [\"ASCENDING\", \"DESCENDING\"]:\n index_orb_do = filter_index_by_orbit(index_do, orbit_dir)\n\n if len(index_orb_do) == 0:\n DEC_array_stack.append(template_array)\n DEC_array_threshold_stack.append(template_array)\n continue\n\n vv_list = []\n vh_list = []\n\n for interval_start, interval_end in win_days_interval:\n vh_window_stack = []\n vv_window_stack = []\n orbit_dir_period = None\n time_points_averaged_str = \"\"\n for (dt, ob), fts in index_orb_do.items():\n if interval_start <= dt < interval_end:\n idx = next((i for i, d in enumerate(temporal_extent[\"times\"]) if d == dt), None)\n scene_array = arr[idx, :, :, :]\n vh_band = scene_array[0, :, :]\n vv_band = scene_array[1, :, :]\n\n vh_window_stack.append(vh_band)\n vv_window_stack.append(vv_band)\n\n time_points_averaged_str += f\"{dt.date()}, {idx} --\"\n # Average over the scenes in the interval\n if len(vh_window_stack) == 0 or len(vv_window_stack) == 0:\n vh_avg = np.full_like(template_array, np.nan)\n vv_avg = np.full_like(template_array, np.nan)\n else:\n vh_avg = np.nanmean(vh_window_stack, axis=0)\n vv_avg = np.nanmean(vv_window_stack, axis=0)\n # logger.info(f\"AvgInfo: shapes {vh_avg.shape} {vv_avg.shape} {interval_start.date()} to {interval_end.date()}, win: {win}, {time_points_averaged_str} -- Orbit: {orbit_dir}, -- avg {len(vh_window_stack)} scenes.\")\n\n vh_list.append(vh_avg)\n vv_list.append(vv_avg)\n\n vh_array_stack = np.stack(vh_list, axis=0)\n vv_array_stack = np.stack(vv_list, axis=0)\n logger.info(f\"StackInfo: shapes {vh_array_stack.shape} {vv_array_stack.shape} -- Orbit: {orbit_dir}\")\n\n DEC_array_threshold, DEC_array_mask = apply_stat_datacube({\"VV\": vv_array_stack, \"VH\": vh_array_stack}, window_size=10)\n DEC_array_mask = remove_small_objects(DEC_array_mask, min_size=4)\n DEC_array_stack.append(DEC_array_mask)\n DEC_array_threshold_stack.append(DEC_array_threshold)\n\n\n DEC_array_combined = np.nanmax(np.stack(DEC_array_stack, axis=0), axis=0)\n DEC_array_list.append(DEC_array_combined)\n DEC_array_list.extend(DEC_array_stack)\n DEC_array_list.extend(DEC_array_threshold_stack)\n\n win_list.append(win)\n # logger.info(f\"DECArraylistlen {len(DEC_array_list)}\")\n DEC_temporal_list.append(np.stack(DEC_array_list, axis=0))\n # logger.info(f\"DECArrayliststackshape {np.stack(DEC_array_list, axis=0).shape}\")\n\n # logger.info(f\"DECtemporallistlen {len(DEC_temporal_list)}\")\n if not entered_wininterval_loop:\n DEC_array_list = [template_array]\n win_list = [datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n if len(DEC_array_list) == 0:\n DEC_array_list = [template_array]\n win_list = [datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n DEC_temporal_array = np.stack(DEC_temporal_list, axis=0)\n # logger.info(f\"DECtemporalarray {DEC_temporal_array.shape}\")\n\n # create xarray with single timestamp\n output_xarraycube = xr.DataArray(\n DEC_temporal_array, #DEC_array_combined[np.newaxis, np.newaxis, :, :], # add a time dimension\n dims=[\"t\", \"bands\", \"y\", \"x\"],\n coords={\n \"t\": win_list,\n \"bands\": [\"DEC\", \"DEC_asc\", \"DEC_asc_threshold\", \"DEC_des\", \"DEC_des_threshold\"],# win is your datetime.datetime object\n \"y\": arr.coords[\"y\"],\n \"x\": arr.coords[\"x\"],\n }\n )\n\n return output_xarraycube\n" + "udf": "import xarray as xr\nimport pandas as pd\nfrom osgeo import osr, ogr\nimport re\nfrom datetime import timedelta\nimport datetime\nfrom collections import defaultdict, Counter, OrderedDict\nfrom typing import Dict, List, Tuple, Optional\nimport requests\nfrom shapely.geometry import shape\nfrom scipy import ndimage as ndi\n\nimport time\nimport numpy as np\nfrom scipy.stats import ttest_ind_from_stats\nfrom copy import copy\nimport logging\n\nDEBUG = False\nAPPLY_SIEVE_FILTER = True\n\nlogger = logging.getLogger(__name__)\n\n# -------------------------\n# Time extents\n# -------------------------\n# Phase boundaries as DATES\n\nMASTER_START = datetime.datetime(2015, 4, 28)\nPHASE1_END = datetime.datetime(2021, 12, 16)\nPHASE2_END = datetime.datetime(2025, 3, 30)\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Step logic (your rule) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef step_forward(start_d: datetime.datetime, acq_frequency: int = 6) -> int:\n \"\"\"\n Decide interval length (N or 2N) from a given START date.\n\n Rule:\n - If start + N does NOT cross PHASE1_END => use N\n - Else, as long as start < PHASE2_END => use 2N\n - Once start >= PHASE2_END => use N\n \"\"\"\n N = acq_frequency\n\n if start_d + timedelta(days=N) <= PHASE1_END:\n # Still in Phase 1\n return N\n elif start_d < PHASE2_END:\n # Phase 2 (including the interval that crosses PHASE2_END)\n return 2 * N\n else:\n # Phase 3\n return N\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Backwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef prev_interval(cur_start: datetime.datetime, acq_frequency: int = 6) -> Tuple[datetime.date, datetime.date]:\n \"\"\"\n Given the START of the current interval, find the previous full interval [prev_start, cur_start],\n such that its length is either N or 2N and is consistent with step_forward(prev_start).\n\n No truncation: length is exactly N or 2N.\n \"\"\"\n N = acq_frequency\n\n # Candidate 1: previous interval length N\n cand1_start = cur_start - timedelta(days=N)\n cand1_len = N\n cand1_ok = (\n step_forward(cand1_start, N) == cand1_len and\n cand1_start + timedelta(days=cand1_len) == cur_start\n )\n\n # Candidate 2: previous interval length 2N\n cand2_start = cur_start - timedelta(days=2 * N)\n cand2_len = 2 * N\n cand2_ok = (\n step_forward(cand2_start, N) == cand2_len and\n cand2_start + timedelta(days=cand2_len) == cur_start\n )\n\n if not cand1_ok and not cand2_ok:\n raise RuntimeError(f\"No valid previous interval for start={cur_start}\")\n\n if cand1_ok and not cand2_ok:\n return cand1_start, cur_start\n if cand2_ok and not cand1_ok:\n return cand2_start, cur_start\n\n # Both valid (rare near boundaries) \u2013 prefer 2N by convention\n return cand2_start, cur_start\n\n\ndef back_chain(anchor_start: datetime.datetime, n_back: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_back intervals BEFORE anchor_start, going backwards, with no truncation.\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_back):\n prev_start, prev_end = prev_interval(cur_start, acq_frequency)\n intervals.append((prev_start, prev_end))\n cur_start = prev_start\n\n # Reverse to chronological order\n return list(reversed(intervals))\n\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Forwards (no truncation) \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef forward_chain(anchor_start: datetime.datetime, n_forw: int, acq_frequency: int = 6) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Build n_forw intervals starting from anchor_start (first interval starts at anchor_start).\n \"\"\"\n intervals: List[Tuple[datetime.date, datetime.date]] = []\n cur_start = anchor_start\n\n for _ in range(n_forw):\n length = step_forward(cur_start, acq_frequency)\n end = cur_start + timedelta(days=length)\n intervals.append((cur_start, end))\n cur_start = end\n\n return intervals\n\n# \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500 Main helper: 5 back + 4 forward \u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\n\ndef get_context_intervals(\n start_str: str,\n back: int = 5,\n forward: int = 4,\n acq_frequency: int = 6\n) -> List[Tuple[datetime.date, datetime.date]]:\n \"\"\"\n Returns:\n - `back` intervals before start_date\n - the interval starting at start_date\n - `forward` intervals after that\n\n Total = back + 1 + forward intervals.\n The 6th interval's START is exactly start_date if back=5.\n \"\"\"\n start_d = datetime.datetime.strptime(start_str, \"%Y-%m-%d\")\n before = back_chain(start_d, back, acq_frequency) # 5 intervals before\n after = forward_chain(start_d, forward + 1, acq_frequency) # includes anchor as first\n return before + after\n\ndef get_overall_start_end(intervals: List[Tuple[datetime.date, datetime.date]]):\n \"\"\"\n Given a list of intervals [(start, end), ...],\n return (overall_start, overall_end).\n \"\"\"\n overall_start = min(s for s, e in intervals)\n overall_end = max(e for s, e in intervals)\n return overall_start, overall_end\n\n\n\n# -------------------------\n# Config / Constants\n# -------------------------\nS1_SEARCH_URL = \"https://catalogue.dataspace.copernicus.eu/resto/api/collections/Sentinel1/search.json\"\nDATE_RE = re.compile(r'_(\\d{8})T\\d{6}_') # e.g., ..._20211201T091308_...\n\n# -------------------------\n# Datacube utils\n# -------------------------\ndef get_spatial_extent(spatial_extent) -> Tuple[dict, List[float]]:\n\n \"\"\"Get spatial bounds in WGS84.\"\"\"\n # x_coord = arr.coords['x'].values\n # y_coord = arr.coords['y'].values\n #\n # west, east = float(x_coord.min()), float(x_coord.max())\n # south, north = float(y_coord.min()), float(y_coord.max())\n west, east, south, north = spatial_extent[\"west\"], spatial_extent[\"east\"], spatial_extent[\"south\"], spatial_extent[\"north\"]\n source_epsg = spatial_extent.get(\"crs\", \"EPSG:4326\").split(\":\")[-1]\n # ------------------------------------\n # Build polygon from bbox (in source CRS)\n # ------------------------------------\n if int(source_epsg) != 4326:\n ring = ogr.Geometry(ogr.wkbLinearRing)\n ring.AddPoint(west, south)\n ring.AddPoint(east, south)\n ring.AddPoint(east, north)\n ring.AddPoint(west, north)\n ring.AddPoint(west, south) # close ring\n\n geom = ogr.Geometry(ogr.wkbPolygon)\n geom.AddGeometry(ring)\n geom = geom.Clone() # 2D is fine here\n\n geom_wkt = geom.ExportToWkt()\n print(geom_wkt)\n\n # ------------------------------------\n # Define CRS and transformation\n # ------------------------------------\n source_epsg = int(source_epsg) # e.g. 3857 or 32633; must be int\n CATALOG_EPSG = 4326\n\n src_srs = osr.SpatialReference()\n src_srs.ImportFromEPSG(source_epsg)\n\n dst_srs = osr.SpatialReference()\n dst_srs.ImportFromEPSG(CATALOG_EPSG)\n\n # Assign SRS to geometry (good practice)\n geom.AssignSpatialReference(src_srs)\n\n trans_to_catalog = osr.CoordinateTransformation(src_srs, dst_srs)\n\n # Sanity check (optional)\n if trans_to_catalog is None:\n raise RuntimeError(\"Failed to create CoordinateTransformation\")\n\n # ------------------------------------\n # Transform and get envelope\n # ------------------------------------\n catalog_aoi_geom = geom.Clone()\n catalog_aoi_geom.Transform(trans_to_catalog)\n\n west, east, south, north = catalog_aoi_geom.GetEnvelope()\n\n return {'west': west, 'east': east, 'south': south, 'north': north}, [south, west, north, east]\n\ndef get_temporal_extent(arr: xr.DataArray) -> dict:\n \"\"\"Get temporal extent from time dimension.\"\"\"\n time_dim = 't'\n if 't' in arr.dims:\n times = arr.coords[time_dim].values\n times = pd.to_datetime(times).to_pydatetime()\n start = pd.to_datetime(times.min())\n end = pd.to_datetime(times.max())\n log_string = '\\n'.join([f'{i} {v}' for i, v in enumerate(times)])\n return {'start': start, 'end': end, 'times': times}\n\n\n\n# -------------------------\n# Utilities\n# -------------------------\ndef remove_small_objects(mask, min_size=2, connectivity=1):\n \"\"\"\n Removes connected components of 1s smaller than min_size.\n Fastest possible implementation (SciPy C backend).\n \"\"\"\n mask = np.asarray(mask).astype(bool)\n\n if min_size <= 1:\n return mask\n\n # Label connected components in the foreground (1-pixels)\n structure = ndi.generate_binary_structure(mask.ndim, connectivity)\n labels, num = ndi.label(mask, structure=structure)\n\n if num == 0:\n return mask\n\n # Compute size of each component\n sizes = ndi.sum(mask, labels, index=np.arange(1, num + 1))\n\n # Select components to keep\n keep_labels = np.where(sizes >= min_size)[0] + 1\n\n # Build output\n out = np.isin(labels, keep_labels)\n\n return out\n\n\ndef parse_date_from_title(title: str) -> Optional[datetime.datetime]:\n \"\"\"Extract YYYYMMDD as datetime from a Sentinel-1 title. Return None if not found.\"\"\"\n m = DATE_RE.search(title)\n if not m:\n return None\n return datetime.datetime.strptime(m.group(1), \"%Y%m%d\")\n\n\ndef intersection_ratio_bbox2_in_bbox1(b1: List[float], b2: List[float]) -> float:\n \"\"\"\n Fraction of bbox2's area inside bbox1. bboxes are [minx, miny, maxx, maxy] in lon/lat.\n \"\"\"\n x1_min, y1_min, x1_max, y1_max = b1\n x2_min, y2_min, x2_max, y2_max = b2\n\n inter_min_x = max(x1_min, x2_min)\n inter_min_y = max(y1_min, y2_min)\n inter_max_x = min(x1_max, x2_max)\n inter_max_y = min(y1_max, y2_max)\n\n if inter_min_x >= inter_max_x or inter_min_y >= inter_max_y:\n return 0.0\n\n inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)\n area2 = (x2_max - x2_min) * (y2_max - y2_min)\n return 0.0 if area2 <= 0 else inter_area / area2\n\n\n\n\n\ndef create_timewindow_groups(intervals, window_size=10):\n out = {}\n for i in range(len(intervals) - window_size + 1):\n win = intervals[i:i + window_size]\n key = win[5][0] # start time of 6th interval\n out[key] = win\n return out\n\n\ndef fetch_s1_features(bbox: List[float], start_iso: str, end_iso: str) -> List[dict]:\n \"\"\"\n Query the Copernicus Dataspace Resto API for Sentinel-1 features within bbox and datetime range.\n Only returns features in JSON 'features' list.\n \"\"\"\n params = {\n \"box\": \",\".join(map(str, bbox)),\n \"page\": 1,\n \"maxRecords\": 1000,\n \"status\": \"ONLINE\",\n \"dataset\": \"ESA-DATASET\",\n \"processingLevel\": \"LEVEL1\",\n \"productType\": \"IW_GRDH_1S-COG\",\n \"startDate\": f\"{start_iso.strftime('%Y-%m-%dT00:00:00Z')}\",\n \"completionDate\": f\"{end_iso.strftime('%Y-%m-%dT23:59:59.999999999Z')}\",\n }\n r = requests.get(S1_SEARCH_URL, params=params, timeout=30)\n r.raise_for_status()\n # logger.info(f\"s1query {r.url}\")\n return r.json().get(\"features\", [])\n\n\ndef build_index_by_date_orbit(features: List[dict]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"\n Keep only GRDH (non-CARD_BS, non-COG) scenes and index them by (date, orbitDirection),\n ordered chronologically by date.\n \"\"\"\n temp = defaultdict(list)\n for ft in features:\n props = ft.get(\"properties\", {})\n title = props.get(\"title\", \"\")\n if \"_GRDH_\" in title and \"_CARD_BS\" not in title: # and \"_COG.\" not in title:\n dt = parse_date_from_title(title)\n if dt is None:\n continue\n orbit = props.get(\"orbitDirection\", \"UNKNOWN\")\n temp[(dt, orbit)].append(ft)\n\n # Sort keys by datetime\n ordered = OrderedDict(sorted(temp.items(), key=lambda kv: kv[0][0]))\n return ordered\n\ndef filter_index_to_dates(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n dt_list: List[datetime.datetime]) -> Dict[Tuple[datetime.datetime, str], List[dict]]:\n \"\"\"Keep only entries whose date is in dt_list (exact date match).\"\"\"\n dates = set(dt_list)\n return {k: v for k, v in index_do.items() if k[0] in dates}\n\n\ndef filter_index_by_orbit(index_do, selected_orbit):\n \"\"\"\n Filter (date, orbit) \u2192 [features] dictionary to keep only the chosen orbit direction.\n Returns a new dict preserving chronological order.\n \"\"\"\n if not selected_orbit:\n return index_do # no orbit chosen, keep all\n\n return dict(\n (k, v) for k, v in index_do.items() if k[1] == selected_orbit\n )\n\n\ndef pick_orbit_direction(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n aoi_bbox: List[float]) -> Optional[str]:\n \"\"\"\n Choose orbit direction deterministically:\n 1) If only one orbit exists \u2192 return it.\n 2) Else compare counts per orbit \u2192 choose higher count.\n 3) If tie, compute each orbit's minimum (worst-case) fraction of scene-bbox inside AOI \u2192 prefer higher.\n 4) If still tied \u2192 return None.\n \"\"\"\n if not index_do:\n return None\n\n orbits = [k[1] for k in index_do.keys()]\n counts = Counter(orbits)\n if len(counts) == 1:\n return next(iter(counts))\n\n # Step 2: counts\n top_count = max(counts.values())\n leaders = [o for o, c in counts.items() if c == top_count]\n if len(leaders) == 1:\n return leaders[0]\n\n # Step 3: tie-break by worst-case overlap\n # For each orbit, compute the MIN overlap ratio across its scenes; pick the orbit with higher MIN\n min_overlap_by_orbit = {}\n for orbit in leaders:\n min_ratio = float(\"inf\")\n for (dt, ob), fts in index_do.items():\n if ob != orbit:\n continue\n for ft in fts:\n try:\n # Use feature geometry bounds as scene bbox\n scene_bbox = list(shape(ft[\"geometry\"]).bounds)\n except Exception:\n continue\n ratio = intersection_ratio_bbox2_in_bbox1(scene_bbox, aoi_bbox)\n if ratio < min_ratio:\n min_ratio = ratio\n if min_ratio == float(\"inf\"):\n min_ratio = 0.0\n min_overlap_by_orbit[orbit] = min_ratio\n\n # Compare worst-case overlap; if tie, return None\n max_min_overlap = max(min_overlap_by_orbit.values())\n overlap_leaders = [o for o, r in min_overlap_by_orbit.items() if r == max_min_overlap]\n return overlap_leaders[0] if len(overlap_leaders) == 1 else None\n\n\ndef get_scene_indices(index_do: Dict[Tuple[datetime.datetime, str], List[dict]],\n feature_names: List[str]) -> List[Tuple[int, str]]:\n \"\"\"\n Returns indices & filenames from feature_names that match ANY date present in the index.\n \"\"\"\n dates_yymmdd = {k[0].strftime(\"%Y%m%d\") for k in index_do.keys()}\n matched = []\n for i, name in enumerate(feature_names):\n if any(d in name for d in dates_yymmdd):\n matched.append((i, name))\n return matched\n\n\n#################################################################################\n##################### BACKSCATTER CHANGE DETECTION ##############################\n#################################################################################\n\ndef _as_dtype(a, dtype):\n return a if (a.dtype == dtype and a.flags['C_CONTIGUOUS']) else np.ascontiguousarray(a, dtype=dtype)\n\n\ndef _nanmean_along(a, axis, mask=None):\n \"\"\"Numerically stable nanmean with float64 accumulation.\"\"\"\n if mask is None:\n mask = np.isfinite(a)\n sums = np.where(mask, a, 0).sum(axis=axis, dtype=np.float64)\n counts = mask.sum(axis=axis, dtype=np.int64).astype(np.float64)\n out = np.divide(sums, counts, out=np.zeros_like(sums, dtype=np.float64), where=counts > 0)\n return out, counts\n\n\ndef create_nan_mask(numpy_array, vv_vh_bandcount):\n \"\"\"Return combined invalid mask (VV or VH invalid). Expects (T,H,W).\"\"\"\n mask_vv = np.isnan(numpy_array[:vv_vh_bandcount]) | (numpy_array[:vv_vh_bandcount] < -99)\n mask_vh = np.isnan(numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount]) | (\n numpy_array[vv_vh_bandcount:2 * vv_vh_bandcount] < -99\n )\n return mask_vv | mask_vh\n\n\ndef ttest_from_stats(\n past,\n future,\n min_valid_per_window=3,\n valid_mask=None,\n compute_dtype=np.float32,\n alternative=\"greater\"\n):\n \"\"\"\n Column-wise Welch t-test (one-sided: past > future) using summary stats.\n NaNs are ignored. Outputs:\n t_stat_full : (H, W) compute_dtype\n p_one_full : (H, W) compute_dtype\n insufficient_full : (H, W) bool (True where either group has < min_valid samples)\n\n Parameters\n ----------\n past, future : np.ndarray, shape (T, H, W)\n Time-series stacks for the two windows being compared.\n min_valid_per_window : int\n Minimum non-NaN samples required in each window per pixel.\n valid_mask : np.ndarray | None, shape (H, W)\n Optional boolean mask of pixels to evaluate.\n compute_dtype : np.dtype\n Dtype for returned t and p arrays (e.g., np.float32).\n alternative : {\"greater\", \"less\"}\n \"greater\" : H1 is mean(past) > mean(future) (decrease in future)\n \"less\" : H1 is mean(past) < mean(future) (increase in future)\n \"\"\"\n assert past.shape == future.shape, \"past and future must have same shape (T,H,W)\"\n if alternative not in (\"greater\", \"less\"):\n raise ValueError(f\"alternative must be 'greater' or 'less', got {alternative!r}\")\n\n T, H, W = past.shape\n start_time = time.time()\n\n # Prepare outputs\n t_full = np.zeros((H, W), dtype=compute_dtype)\n p_full = np.ones((H, W), dtype=compute_dtype)\n ins_full = np.ones((H, W), dtype=bool)\n\n # Valid pixel mask\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return t_full, p_full, ins_full\n\n # Views as (T, N) and select only valid pixels (to reduce work/memory)\n N = H * W\n idx = vm.ravel()\n past_v = past.reshape(T, N)[:, idx]\n future_v = future.reshape(T, N)[:, idx]\n\n # Counts (ignore NaNs)\n n1 = np.sum(np.isfinite(past_v), axis=0).astype(np.int32, copy=False)\n n2 = np.sum(np.isfinite(future_v), axis=0).astype(np.int32, copy=False)\n\n # Pixels that have enough samples in BOTH groups\n test_idx = (n1 >= min_valid_per_window) & (n2 >= min_valid_per_window)\n\n out_t = np.zeros(idx.sum(), dtype=np.float64)\n out_p = np.ones(idx.sum(), dtype=np.float64)\n insufficient_v = ~test_idx\n\n if np.any(test_idx):\n # Summary stats in float64 for stability\n # Sample mean and sample variance (ddof=1)\n m1 = np.nanmean(past_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n m2 = np.nanmean(future_v[:, test_idx], axis=0).astype(np.float64, copy=False)\n v1 = np.nanvar(past_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n v2 = np.nanvar(future_v[:, test_idx], axis=0, ddof=1).astype(np.float64, copy=False)\n\n # Convert variance -> std (ttest_ind_from_stats expects std)\n s1 = np.sqrt(np.maximum(v1, 0.0))\n s2 = np.sqrt(np.maximum(v2, 0.0))\n\n nn1 = n1[test_idx].astype(np.float64, copy=False)\n nn2 = n2[test_idx].astype(np.float64, copy=False)\n\n try:\n # SciPy >= 1.9: supports one-sided alternative\n t_v, p_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False,\n alternative=alternative\n )\n except TypeError:\n # Older SciPy fallback: compute two-sided, convert to one-sided \"greater\"\n t_v, p2_v = ttest_ind_from_stats(\n m1, s1, nn1,\n m2, s2, nn2,\n equal_var=False\n )\n # Ensure array types\n t_v = np.asarray(t_v, dtype=np.float64)\n p2_v = np.asarray(p2_v, dtype=np.float64)\n\n p_v = np.ones_like(p2_v, dtype=np.float64)\n good = np.isfinite(t_v) & np.isfinite(p2_v)\n\n if alternative == \"greater\":\n pos = good & (t_v > 0)\n # For t > 0, one-sided p = two-sided p / 2\n p_v[pos] = p2_v[pos] / 2.0\n # For t <= 0, one-sided p = 1 - (two-sided p / 2)\n p_v[good & ~pos] = 1.0 - (p2_v[good & ~pos] / 2.0)\n else:\n # H1: mean(past) < mean(future)\n neg = good & (t_v < 0)\n p_v[neg] = p2_v[neg] / 2.0\n p_v[good & ~neg] = 1.0 - (p2_v[good & ~neg] / 2.0)\n\n # Sanitize NaNs/Infs in t\n t_v = np.asarray(t_v, dtype=np.float64)\n p_v = np.asarray(p_v, dtype=np.float64)\n bad = ~np.isfinite(t_v)\n if np.any(bad):\n t_v[bad] = 0.0\n\n out_t[test_idx] = t_v\n out_p[test_idx] = p_v\n\n # Scatter back to (H, W)\n t_full.ravel()[idx] = out_t.astype(compute_dtype, copy=False)\n p_full.ravel()[idx] = out_p.astype(compute_dtype, copy=False)\n ins_full.ravel()[idx] = insufficient_v\n return t_full, p_full, ins_full\n\ndef apply_threshold(stat_array, pol_item, DEC_array_threshold,\n stat_item_name=None, previous_stat_array_bool=None):\n \"\"\"\n Convert a stat array into a binary mask and accumulate into DEC. Dead pixels should be masked upstream.\n \"\"\"\n\n stat_array_copy = copy(stat_array)\n\n if stat_item_name == 'std':\n pol_thr = 2.0 if pol_item == \"VH\" else 1.5\n stat_array = np.where(np.isnan(stat_array) | (stat_array < pol_thr), 0, 1)\n\n elif stat_item_name == 'mean_change':\n # Looking for decreases (future - past <= threshold)\n pol_thr = -1.75\n stat_array = np.where(np.isnan(stat_array) | (stat_array > pol_thr), 0, 1)\n\n elif stat_item_name == 'pval':\n t_stat = stat_array[0]\n p_val = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n pvalue_thr = 0.05\n\n is_significant = (p_val < pvalue_thr) & (~insufficient)\n stat_array = is_significant.astype(np.uint8)\n\n if previous_stat_array_bool is not None:\n stat_array[~previous_stat_array_bool.astype(bool)] = 0\n\n elif stat_item_name == 'ratio_slope':\n slope = stat_array[0]\n r2 = stat_array[1]\n insufficient = stat_array[2].astype(bool)\n\n slope_mask = np.isfinite(slope) & (slope >= 0.025) & ~insufficient\n r2_mask = np.isfinite(r2) & (r2 >= 0.60) & ~insufficient\n stat_array = (slope_mask & r2_mask).astype(np.uint8)\n\n elif stat_item_name == 'ratio_mean_change':\n stat_thr = 2.0\n stat_array = np.where(np.isnan(stat_array) | (stat_array < stat_thr), 0, 1)\n\n DEC_array_threshold += stat_array.astype(int)\n if DEBUG:\n return DEC_array_threshold, stat_array_copy, stat_array.astype(int)\n else:\n return DEC_array_threshold, None, stat_array.astype(int)\n\n\ndef calculate_lsfit_r(vv_vh_r, min_valid=3, center_time=True, valid_mask=None, compute_dtype=np.float32):\n T, H, W = vv_vh_r.shape\n x = np.arange(T, dtype=np.float64) # keep time in float64\n if center_time:\n x = x - x.mean()\n\n slope_full = np.full((H, W), np.nan, dtype=compute_dtype)\n r2_full = np.full((H, W), np.nan, dtype=compute_dtype)\n insufficient_full = np.ones((H, W), dtype=bool)\n\n vm = np.ones((H, W), dtype=bool) if valid_mask is None else valid_mask\n if not np.any(vm):\n return slope_full, r2_full, insufficient_full\n\n vv_vh_r = _as_dtype(vv_vh_r, compute_dtype)\n\n idx = vm.ravel()\n y = vv_vh_r.reshape(T, -1)[:, idx] # (T,N) compute_dtype\n mask = np.isfinite(y)\n n = mask.sum(axis=0)\n insufficient = n < min_valid\n\n # means with float64 accumulation\n y_mean, _ = _nanmean_along(y.astype(np.float64, copy=False), axis=0, mask=mask)\n\n x2 = x[:, None] # (T,1), float64\n y_center = np.where(mask, y.astype(np.float64, copy=False) - y_mean[None, :], 0.0)\n\n Sxx = ((x2 ** 2) * mask).sum(axis=0, dtype=np.float64)\n Sxy = (x2 * y_center).sum(axis=0, dtype=np.float64)\n\n with np.errstate(invalid='ignore', divide='ignore'):\n slope64 = np.divide(Sxy, Sxx, out=np.zeros_like(Sxy), where=Sxx > 0)\n\n yhat = slope64[None, :] * x2 + y_mean[None, :]\n resid = np.where(mask, y.astype(np.float64, copy=False) - yhat, 0.0)\n\n SSE = (resid ** 2).sum(axis=0, dtype=np.float64)\n SST = (np.where(mask, (y.astype(np.float64, copy=False) - y_mean[None, :]) ** 2, 0.0)\n ).sum(axis=0, dtype=np.float64)\n\n r2_64 = np.zeros_like(SSE)\n good_sst = SST > 0\n r2_64[good_sst] = 1.0 - (SSE[good_sst] / SST[good_sst])\n\n slope64[insufficient] = np.nan\n r2_64[insufficient] = np.nan\n\n slope_full.reshape(-1)[idx] = slope64.astype(compute_dtype, copy=False)\n r2_full.reshape(-1)[idx] = r2_64.astype(compute_dtype, copy=False)\n insufficient_full.reshape(-1)[idx] = insufficient\n return slope_full, r2_full, insufficient_full\n\n\ndef _dead_mask_for_window(stack, past_len, future_len):\n \"\"\"Pixels with all-NaN in past or in future (axis=0). stack: (T,H,W).\"\"\"\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n past_dead = ~np.any(np.isfinite(past), axis=0)\n future_dead = ~np.any(np.isfinite(future), axis=0)\n return past_dead | future_dead\n\n\ndef apply_stat_datacube(numpy_stack_pol_dict, window_size=10, compute_dtype=np.float32):\n \"\"\"\n Multi-criteria change detection with early skipping of dead pixels\n (all-NaN in past or future window). Dead pixels => DEC outputs = 0.\n Heavy ops (ttest, lsfit) are computed only on valid pixels.\n \"\"\"\n start_time = time.time()\n VV = _as_dtype(numpy_stack_pol_dict[\"VV\"], compute_dtype)\n VH = _as_dtype(numpy_stack_pol_dict[\"VH\"], compute_dtype)\n\n bands, dim1, dim2 = numpy_stack_pol_dict[\"VV\"].shape\n assert window_size % 2 == 0, \"window_size must be even (split equally into past/future)\"\n assert bands >= window_size, f\"Need at least {window_size} time steps; got {bands}\"\n past_len = future_len = window_size // 2\n\n R = VV - VH\n\n # Build dead masks per signal, then combine\n dead_vv = _dead_mask_for_window(VV, past_len, future_len)\n dead_vh = _dead_mask_for_window(VH, past_len, future_len)\n dead_r = _dead_mask_for_window(R, past_len, future_len)\n dead_any = dead_vv | dead_vh | dead_r # (H,W)\n valid_mask = ~dead_any\n if not DEBUG: del dead_vv, dead_vh, dead_r\n\n # If everything is dead, return zeros fast\n if not np.any(valid_mask):\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n DEC_array_mask = np.zeros((dim1, dim2), dtype=np.uint8)\n return DEC_array_threshold, DEC_array_mask\n\n DEC_array_threshold = np.zeros((dim1, dim2), dtype=np.int32)\n\n # --- Per-pol statistics (VV and VH) ---\n for pol_item, stack in ((\"VV\", VV), (\"VH\", VH)):\n\n # Restrict to window once (cheap) \u2013 nanmean/nanstd will be fast on masked arrays anyway\n past = stack[0:past_len, :, :]\n future = stack[past_len:past_len + future_len, :, :]\n used = stack[0:past_len + future_len, :, :]\n\n # means/std with float64 reductions\n Stack_p_MIN, _ = _nanmean_along(past.astype(np.float64, copy=False), axis=0)\n Stack_f_MIN, _ = _nanmean_along(future.astype(np.float64, copy=False), axis=0)\n POL_std = np.nanstd(used.astype(np.float64, copy=False), axis=0).astype(compute_dtype, copy=False)\n\n # Mask out dead pixels before thresholding so they never vote\n POL_std[dead_any] = np.nan\n DEC_array_threshold, _, _ = apply_threshold(\n POL_std, pol_item, DEC_array_threshold, stat_item_name=\"std\"\n )\n if not DEBUG:\n del POL_std\n\n # Mean change (future - past)\n POL_mean_change = (Stack_f_MIN - Stack_p_MIN).astype(compute_dtype, copy=False)\n POL_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, POL_mean_change_bool = apply_threshold(\n POL_mean_change, pol_item, DEC_array_threshold, stat_item_name=\"mean_change\"\n )\n\n # Paired t-test (compute only on valid pixels)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n past, future, min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype,\n alternative=\"greater\"\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item, DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=POL_mean_change_bool)\n\n # --- Ratio-based stats (VV - VH) ---\n ratio = R\n # Linear trend only on valid pixels\n ratio_slope, ratio_r2, mask_sufficient_ratio = calculate_lsfit_r(\n ratio, valid_mask=valid_mask, compute_dtype=compute_dtype\n )\n DEC_array_threshold, _, _ = apply_threshold(\n [ratio_slope, ratio_r2, mask_sufficient_ratio],\n pol_item=\"RATIO\",\n DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_slope\"\n )\n if not DEBUG:\n del ratio_slope, ratio_r2\n\n # Mean change of ratio (future - past) with correct axis\n ratio_mean_change = (\n _nanmean_along(R[past_len:past_len + future_len].astype(np.float64, copy=False), axis=0)[0]\n - _nanmean_along(R[0:past_len].astype(np.float64, copy=False), axis=0)[0]\n ).astype(compute_dtype, copy=False)\n ratio_mean_change[dead_any] = np.nan\n DEC_array_threshold, _, ratio_mean_change_bool = apply_threshold(\n ratio_mean_change, pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"ratio_mean_change\"\n )\n if not DEBUG: del ratio_mean_change\n\n # T-test on ratio (valid pixels only)\n t_t, t_p, mask_sufficient = ttest_from_stats(\n ratio[0:past_len, :, :], ratio[past_len:past_len + future_len, :, :],\n min_valid_per_window=3, valid_mask=valid_mask, compute_dtype=compute_dtype, alternative=\"less\")\n DEC_array_threshold, _, _ = apply_threshold(\n [t_t, t_p, mask_sufficient], pol_item=\"RATIO\", DEC_array_threshold=DEC_array_threshold,\n stat_item_name=\"pval\", previous_stat_array_bool=ratio_mean_change_bool)\n\n # --- Final mask post-processing ---\n DEC_array_mask = np.zeros_like(DEC_array_threshold, dtype=np.uint8)\n DEC_array_mask[DEC_array_threshold > 4] = 1\n\n # Force dead pixels to zero in both outputs (per your requirement)\n DEC_array_threshold[dead_any] = 0\n DEC_array_mask[dead_any] = 0\n\n return DEC_array_threshold, DEC_array_mask\n\n\ndef apply_datacube(cube: xr.DataArray, context: Dict) -> xr.DataArray:\n \"\"\"\n Simple UDF: Check S1 observation frequency via STAC and aggregate temporally.\n \"\"\"\n\n arr = cube\n\n # Get temporal extent\n spatial_extent = context[\"spatial_extent\"]\n detection_start_time = context[\"detection_extent\"][0]\n detection_end_time = context[\"detection_extent\"][1]\n\n start_d = datetime.datetime.strptime(detection_start_time, \"%Y-%m-%d\")\n end_d = datetime.datetime.strptime(detection_end_time, \"%Y-%m-%d\")\n delta_days = (end_d - start_d).days\n acq_frequency = abs(delta_days)\n\n # temporal extent\n days_interval = get_context_intervals(detection_start_time, acq_frequency=acq_frequency)\n start_time, end_time = get_overall_start_end(days_interval)\n # logger.info(f\"Processingfromto: {start_time} to {end_time}\")\n\n group_days_interval = create_timewindow_groups(days_interval)\n\n # Get spatial extent\n spatial_extent_4326, bbox_4326 = get_spatial_extent(spatial_extent)\n # logger.info(f\"Spatial extent in EPSG:{epsg_code}: {spatial_extent_4326} {bbox_4326}\")\n\n temporal_extent = get_temporal_extent(arr)\n\n # Fetch & build index\n feats = fetch_s1_features(bbox_4326, temporal_extent[\"start\"], temporal_extent[\"end\"])\n index_do = build_index_by_date_orbit(feats)\n\n template_array = np.zeros_like(arr[0, 0, :, :])\n\n # 2) Filter to your dates of interest\n # logger.info(f\"featuresdateorb: {index_do.keys()}\")\n # logger.info(f\"filteringscenesusing: {temporal_extent['times']}\")\n index_do = filter_index_to_dates(index_do, temporal_extent[\"times\"])\n # logger.info(f\"AfterTimeFilter: {index_do.keys()}\")\n # 3) Decide the orbit direction (or None if tie after tie-break)\n # selected_orbit = pick_orbit_direction(index_do, bbox)\n\n arr = 10 * np.log10(arr)\n\n # 4).\n DEC_array_combined = None\n entered_wininterval_loop = False\n\n DEC_temporal_list = []\n win_list = []\n # logger.info(f\"Processingtimewindows {len(group_days_interval)}\")\n for win, win_days_interval in group_days_interval.items():\n DEC_array_list = []\n entered_wininterval_loop = True\n DEC_array_stack = []\n DEC_array_threshold_stack = []\n for orbit_dir in [\"ASCENDING\", \"DESCENDING\"]:\n index_orb_do = filter_index_by_orbit(index_do, orbit_dir)\n\n if len(index_orb_do) == 0:\n DEC_array_stack.append(template_array)\n DEC_array_threshold_stack.append(template_array)\n continue\n\n vv_list = []\n vh_list = []\n\n for interval_start, interval_end in win_days_interval:\n vh_window_stack = []\n vv_window_stack = []\n orbit_dir_period = None\n time_points_averaged_str = \"\"\n for (dt, ob), fts in index_orb_do.items():\n if interval_start <= dt < interval_end:\n idx = next((i for i, d in enumerate(temporal_extent[\"times\"]) if d == dt), None)\n scene_array = arr[idx, :, :, :]\n vh_band = scene_array[0, :, :]\n vv_band = scene_array[1, :, :]\n\n vh_window_stack.append(vh_band)\n vv_window_stack.append(vv_band)\n\n time_points_averaged_str += f\"{dt.date()}, {idx} --\"\n # Average over the scenes in the interval\n if len(vh_window_stack) == 0 or len(vv_window_stack) == 0:\n vh_avg = np.full_like(template_array, np.nan)\n vv_avg = np.full_like(template_array, np.nan)\n else:\n vh_avg = np.nanmean(vh_window_stack, axis=0)\n vv_avg = np.nanmean(vv_window_stack, axis=0)\n # logger.info(f\"AvgInfo: shapes {vh_avg.shape} {vv_avg.shape} {interval_start.date()} to {interval_end.date()}, win: {win}, {time_points_averaged_str} -- Orbit: {orbit_dir}, -- avg {len(vh_window_stack)} scenes.\")\n\n vh_list.append(vh_avg)\n vv_list.append(vv_avg)\n\n vh_array_stack = np.stack(vh_list, axis=0)\n vv_array_stack = np.stack(vv_list, axis=0)\n logger.info(f\"StackInfo: shapes {vh_array_stack.shape} {vv_array_stack.shape} -- Orbit: {orbit_dir}\")\n\n DEC_array_threshold, DEC_array_mask = apply_stat_datacube({\"VV\": vv_array_stack, \"VH\": vh_array_stack}, window_size=10)\n DEC_array_mask = remove_small_objects(DEC_array_mask, min_size=4)\n DEC_array_stack.append(DEC_array_mask)\n DEC_array_threshold_stack.append(DEC_array_threshold)\n\n\n DEC_array_combined = np.nanmax(np.stack(DEC_array_stack, axis=0), axis=0)\n DEC_array_list.append(DEC_array_combined)\n DEC_array_list.extend(DEC_array_stack)\n DEC_array_list.extend(DEC_array_threshold_stack)\n\n win_list.append(win)\n # logger.info(f\"DECArraylistlen {len(DEC_array_list)}\")\n DEC_temporal_list.append(np.stack(DEC_array_list, axis=0))\n # logger.info(f\"DECArrayliststackshape {np.stack(DEC_array_list, axis=0).shape}\")\n\n # logger.info(f\"DECtemporallistlen {len(DEC_temporal_list)}\")\n if not entered_wininterval_loop:\n DEC_array_list = [template_array]\n win_list = [datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n if len(DEC_array_list) == 0:\n DEC_array_list = [template_array]\n win_list = [datetime.datetime(1, 1, 1, 0, 0, 0, 0)]\n\n DEC_temporal_array = np.stack(DEC_temporal_list, axis=0)\n # logger.info(f\"DECtemporalarray {DEC_temporal_array.shape}\")\n\n # create xarray with single timestamp\n output_xarraycube = xr.DataArray(\n DEC_temporal_array, #DEC_array_combined[np.newaxis, np.newaxis, :, :], # add a time dimension\n dims=[\"t\", \"bands\", \"y\", \"x\"],\n coords={\n \"t\": win_list,\n \"bands\": [\"DEC\", \"DEC_asc\", \"DEC_asc_threshold\", \"DEC_des\", \"DEC_des_threshold\"],# win is your datetime.datetime object\n \"y\": arr.coords[\"y\"],\n \"x\": arr.coords[\"x\"],\n }\n )\n\n return output_xarraycube\n" }, "result": true } diff --git a/udf_apex_S1backscatter_changedetection.py b/udf_apex_S1backscatter_changedetection.py index e6bc259..5673fc9 100644 --- a/udf_apex_S1backscatter_changedetection.py +++ b/udf_apex_S1backscatter_changedetection.py @@ -829,16 +829,16 @@ def apply_datacube(cube: xr.DataArray, context: Dict) -> xr.DataArray: # Get temporal extent spatial_extent = context["spatial_extent"] - datection_time = context["detection_start_time"] - detection_end_time = context["detection_end_time"] + detection_start_time = context["detection_extent"][0] + detection_end_time = context["detection_extent"][1] - start_d = datetime.datetime.strptime(datection_time, "%Y-%m-%d") + start_d = datetime.datetime.strptime(detection_start_time, "%Y-%m-%d") end_d = datetime.datetime.strptime(detection_end_time, "%Y-%m-%d") delta_days = (end_d - start_d).days acq_frequency = abs(delta_days) # temporal extent - days_interval = get_context_intervals(datection_time, acq_frequency=acq_frequency) + days_interval = get_context_intervals(detection_start_time, acq_frequency=acq_frequency) start_time, end_time = get_overall_start_end(days_interval) # logger.info(f"Processingfromto: {start_time} to {end_time}")