diff --git a/src/pygeoprocessing/extensions.pxd b/src/pygeoprocessing/extensions.pxd index 564621ce..8b5ff86b 100644 --- a/src/pygeoprocessing/extensions.pxd +++ b/src/pygeoprocessing/extensions.pxd @@ -39,6 +39,7 @@ cdef extern from "extensions/ManagedRaster.h": int block_ybits long raster_x_size long raster_y_size + long n_pixels int block_nx int block_ny int write_mode @@ -53,6 +54,9 @@ cdef extern from "extensions/ManagedRaster.h": double get(long xi, long yi) void _load_block(int block_index) except * void close() + bint is_out_of_bounds(long x, long y) + bint is_nodata(double val) + bint is_nodata(long x, long y) cdef cppclass ManagedFlowDirRaster[T]: LRUCache[int, double*]* lru_cache @@ -80,6 +84,9 @@ cdef extern from "extensions/ManagedRaster.h": void set(long xi, long yi, double value) double get(long xi, long yi) void close() + bint is_out_of_bounds(long x, long y) + bint is_nodata(double val) + bint is_nodata(long x, long y) cdef cppclass D8 cdef cppclass MFD diff --git a/src/pygeoprocessing/extensions/ManagedRaster.h b/src/pygeoprocessing/extensions/ManagedRaster.h index 17382847..54593cea 100644 --- a/src/pygeoprocessing/extensions/ManagedRaster.h +++ b/src/pygeoprocessing/extensions/ManagedRaster.h @@ -73,6 +73,17 @@ static void log_msg(LogLevel level, string msg) Py_DECREF(pyString); } +// Note: I was concerned that checking each value for nan would be too slow, but +// I compared its performance against another implementation where we first reclassify +// nans to a regular float, and then skip the nan check, and that was much slower: +// https://github.com/natcap/invest/issues/1714#issuecomment-2762134419 +inline bool is_close(double x, double y) { + if (isnan(x) and isnan(y)) { + return true; + } + return abs(x - y) <= (pow(10, -8) + pow(10, -05) * abs(y)); +} + class NeighborTuple { public: int direction, x, y; @@ -104,6 +115,7 @@ class ManagedRaster { long raster_y_size; int block_nx; int block_ny; + long n_pixels; char* raster_path; int band_id; GDALDataset* dataset; @@ -138,6 +150,7 @@ class ManagedRaster { raster_x_size = dataset->GetRasterXSize(); raster_y_size = dataset->GetRasterYSize(); + n_pixels = raster_x_size * raster_y_size; if (band_id < 1 or band_id > dataset->GetRasterCount()) { throw std::invalid_argument( @@ -380,6 +393,21 @@ class ManagedRaster { delete lru_cache; free(actualBlockWidths); } + + bool is_out_of_bounds(long x, long y) { + if (x < 0 or x >= raster_x_size or y < 0 or y >= raster_y_size) { + return true; + } + return false; + } + + bool is_nodata(double val) { + return is_close(val, nodata); + } + + bool is_nodata(long x, long y) { + return is_close(get(x, y), nodata); + } }; // Represents a flow direction raster, which may be of type MFD or D8 @@ -851,15 +879,4 @@ class UpslopeNeighborsNoDivide: public Neighbors { UpslopeNeighborNoDivideIterator end() { return UpslopeNeighborNoDivideIterator(&endVal); } }; -// Note: I was concerned that checking each value for nan would be too slow, but -// I compared its performance against another implementation where we first reclassify -// nans to a regular float, and then skip the nan check, and that was much slower: -// https://github.com/natcap/invest/issues/1714#issuecomment-2762134419 -inline bool is_close(double x, double y) { - if (isnan(x) and isnan(y)) { - return true; - } - return abs(x - y) <= (pow(10, -8) + pow(10, -05) * abs(y)); -} - #endif // NATCAP_INVEST_MANAGEDRASTER_H_ diff --git a/src/pygeoprocessing/routing/routing.pyx b/src/pygeoprocessing/routing/routing.pyx index 4f12528f..a62a36cc 100644 --- a/src/pygeoprocessing/routing/routing.pyx +++ b/src/pygeoprocessing/routing/routing.pyx @@ -3591,7 +3591,49 @@ def extract_strahler_streams_d8( def _build_discovery_finish_rasters( flow_dir_d8_raster_path_band, target_discovery_raster_path, target_finish_raster_path): - """Generates a discovery and finish time raster for a given d8 flow path. + """Generate discovery and finish time rasters from a D8 flow dir raster. + + This algorithm traverses the flow path starting from drain points and + working upslope using a depth-first search. The terms "discovery time" and + "finish time" come from graph theory and refer to the order in which a + pixel is first visited vs. marked completed by the algorithm. + + The discovery raster records the order in which each pixel is visited, from + 0 (visited first) to the total number of pixels minus 1 (visited last). + + The finish raster records when the "branch" of the watershed originating + from each pixel is complete. A pixel's finish time is equal to the largest + discovery time of any pixel upslope of it. + + Considering an example watershed, where A is the drain point: + + A + / \ + B C + / / \ + E F G + / \ | / \ + J K L N O + + the discovery order would be + + 0 + / \ + 7 1 + / / \ + 8 5 2 + / \ | / \ + 10 9 6 4 3 + + and the finish order would be + + 10 + / \ + 10 6 + / / \ + 10 6 4 + / \ | / \ + 10 9 6 4 3 Args: flow_dir_d8_raster_path_band (tuple): a D8 flow raster path band tuple. @@ -3641,6 +3683,22 @@ def _build_discovery_finish_rasters( cdef int i, j, xoff, yoff, win_xsize, win_ysize, x_l, y_l, x_n, y_n cdef int n_dir, test_dir + # this essentially does a depth-first traversal of the watershed, + # starting from drain points and working upslope. + # + # iterate over each pixel in the flow dir raster, block by block + # if a pixel is a drain point, push it onto the stack + # while the stack isn't empty, + # pop a pixel from the stack + # check each neighbor to see if it drains to the pixel + # for each neighbor pixel, if it drains to i, push it to the stack + # + # after checking all neighbors, push an item to the finish stack + # that consists of (x_i, y_i, number of neighbors pushed) + # + # if no neighbors were pushed, (no pixels drain to i, so i is a local high point), + # + # for offset_dict in pygeoprocessing.iterblocks( flow_dir_d8_raster_path_band, offset_only=True): # search raster block by raster block @@ -3664,10 +3722,12 @@ def _build_discovery_finish_rasters( if d_n == flow_dir_nodata: continue - # check if downstream neighbor runs off raster or is nodata + # find the downstream neighbor (the pixel that this pixel drains to) x_n = x_l + COL_OFFSETS[d_n] y_n = y_l + ROW_OFFSETS[d_n] + # if the downstream neighbor runs off raster or is nodata, then + # this pixel is a drain point, so we can push it to the stack if (x_n < 0 or y_n < 0 or x_n >= n_cols or y_n >= n_rows or flow_dir_managed_raster.get( x_n, y_n) == flow_dir_nodata): @@ -3689,9 +3749,14 @@ def _build_discovery_finish_rasters( for test_dir in range(8): x_n = raster_coord.xi + COL_OFFSETS[test_dir % 8] y_n = raster_coord.yi + ROW_OFFSETS[test_dir % 8] + + # skip neighbors that are outside of the raster bounds, + # or that have nodata if x_n < 0 or y_n < 0 or \ x_n >= n_cols or y_n >= n_rows: continue + + # if the neighbor drains to this pixel, push it to the stack n_dir = flow_dir_managed_raster.get(x_n, y_n) if n_dir == flow_dir_nodata: continue @@ -3705,7 +3770,13 @@ def _build_discovery_finish_rasters( FinishType( raster_coord.xi, raster_coord.yi, n_pushed)) - # pop the finish stack until n_pushed > 1 + # if n_pushed is 0, that means no neighbors drain to pixel i, + # and so i is a local high point. + # + # pop the finish stack until n_pushed > 1. + # the top item will be (xi, yi, 0) + # set the finish order raster at (xi, yi) to discovery order - 1 (why?) + # if n_pushed == 0: while (not finish_stack.empty() and finish_stack.top().n_pushed <= 1): @@ -3780,6 +3851,13 @@ def calculate_subwatershed_boundary( os.path.dirname(target_watershed_boundary_vector_path))) discovery_time_raster_path = os.path.join(workspace_dir, 'discovery.tif') finish_time_raster_path = os.path.join(workspace_dir, 'finish.tif') + boundary_raster_path = os.path.join(workspace_dir, 'boundary.tif') + + pygeoprocessing.new_raster_from_base( + d8_flow_dir_raster_path_band[0], boundary_raster_path, + gdal.GDT_Float64, [-1]) + boundary_managed_raster = ManagedRaster( + boundary_raster_path.encode('utf-8'), 1, True) # construct the discovery/finish time rasters for fast individual cell # watershed detection @@ -3848,6 +3926,18 @@ def calculate_subwatershed_boundary( stream_layer = stream_vector.GetLayer() # construct linkage data structure for upstream streams + # + # each stream feature starts and ends at a confluence point. + # there are no confluence points midway along a stream. + # a stream may have multiple tributaries only if they share a confluence point. + # + # therefore if stream B is a tributary of stream A, then + # stream A's upstream coordinate (its starting point) equals + # stream B's downstream coordinate (its end point) + # + # map each stream's downstream endpoint to its FID + # we will use this to look up a stream's tributaries + # given its upstream coordinate. upstream_fid_map = collections.defaultdict(list) for stream_feature in stream_layer: ds_x = int(stream_feature.GetField('ds_x')) @@ -3864,6 +3954,11 @@ def calculate_subwatershed_boundary( # first stream_layer.SetAttributeFilter(f'"outlet"=1') # these are done last + # + # outlet streams are the lowest/final streams, they are not a tributary + # to any other stream. + # + # visit outlet streams in strahler stream order from largest to smallest for _, outlet_fid in sorted([ (x.GetField('order'), x.GetFID()) for x in stream_layer], reverse=True): @@ -3879,6 +3974,10 @@ def calculate_subwatershed_boundary( ds_x_1 = int(working_feature.GetField('ds_x_1')) ds_y_1 = int(working_feature.GetField('ds_y_1')) + # if this stream has any tributaries that have not yet been + # processed, add them to the stack. + # otherwise, if all tributaries have been processed, we can + # proceed to calculate the subwatershed. upstream_coord = (us_x, us_y) upstream_fids = [ fid for fid in upstream_fid_map[upstream_coord] @@ -3890,26 +3989,26 @@ def calculate_subwatershed_boundary( # the `not outlet_at_confluence` bit allows us to seed # even if the order is 1, otherwise confluences fill # the order 1 streams - if (working_feature.GetField('order') > 1 or - not outlet_at_confluence): - if outlet_at_confluence: - # seed the upstream point - visit_order_stack.append((working_fid, us_x, us_y)) - else: - # seed the downstream but +1 step point - visit_order_stack.append( - (working_fid, ds_x_1, ds_y_1)) + # + # if order == 1: is a headwater stream (has no tributaries) + # if order > 1: has tributaries + + # if this stream has tributaries, and we are using the confluence + # point as the outlet, then we can seed the upstream point + # (which is the downstream point of the tributaries) + if working_feature.GetField('order') > 1 and outlet_at_confluence: + visit_order_stack.append((working_fid, us_x, us_y)) + # if this stream is an outlet, we need to push its downstream point + # onto the stack, because there are no further streams to do so if working_feature.GetField('outlet') == 1: # an outlet is a special case where the outlet itself # should be a subwatershed done last. - ds_x = int(working_feature.GetField('ds_x')) - ds_y = int(working_feature.GetField('ds_y')) - if not outlet_at_confluence: - # undo the previous visit because it will be at - # one pixel up and we want the pixel right at - # the outlet - visit_order_stack.pop() visit_order_stack.append((working_fid, ds_x, ds_y)) + # if not an outlet and we are using 1 pixel up from the confluence + # point, seed the downstream+1 point + # TODO: why does this add DS+1 point of this stream and not of its tributaries? + elif not outlet_at_confluence: + visit_order_stack.append((working_fid, ds_x_1, ds_y_1)) cdef int edge_side, edge_dir, cell_to_test, out_dir_increase=-1 cdef int left, right, n_steps, terminated_early @@ -3917,14 +4016,20 @@ def calculate_subwatershed_boundary( cdef int _int_max_steps_per_watershed = max_steps_per_watershed for index, (stream_fid, x_l, y_l) in enumerate(visit_order_stack): + print('FID', stream_fid) if ctime(NULL) - last_log_time > _LOGGING_PERIOD: LOGGER.info( f'(calculate_subwatershed_boundary): watershed building ' f'{(index/len(visit_order_stack))*100:.1f}% complete') last_log_time = ctime(NULL) discovery = discovery_managed_raster.get(x_l, y_l) + # We reach this case if the current pixel is a boundary pixel of + # a previously calculated watershed. if discovery == -1: continue + + # keep a list of pixels that are within the watershed along the boundary + # these will later be filled in with -1 in the discovery raster boundary_list = [(x_l, y_l)] finish = finish_managed_raster.get(x_l, y_l) outlet_x = x_l @@ -3972,79 +4077,148 @@ def calculate_subwatershed_boundary( x_l += COL_OFFSETS[edge_dir] y_l += ROW_OFFSETS[edge_dir] # note the pixel moved + print(f'adding current pixel {x_l}, {y_l} to boundary') boundary_list.append((x_l, y_l)) + print(f'starting from pixel {x_l}, {y_l}, vertex {x_f}, {y_f}, edge side {edge_side}, edge dir {edge_dir}') + n_steps = 0 terminated_early = 0 - while True: - # step the edge then determine the projected coordinates - x_f += COL_OFFSETS[edge_dir] - y_f += ROW_OFFSETS[edge_dir] - delta_x += COL_OFFSETS[edge_dir] - delta_y += ROW_OFFSETS[edge_dir] - # equivalent to gdal.ApplyGeoTransform(geotransform, x_f, y_f) - # to eliminate python function call overhead - x_p = g0 + g1*x_f + g2*y_f - y_p = g3 + g4*x_f + g5*y_f - watershed_boundary.AddPoint(x_p, y_p) - n_steps += 1 - if n_steps > _int_max_steps_per_watershed: - LOGGER.warning('quitting, too many steps') - terminated_early = 1 - break - if x_l < 0 or y_l < 0 or x_l >= n_cols or y_l >= n_rows: - # This is unexpected but worth checking since missing this - # error would be very difficult to debug. - raise RuntimeError( - f'{x_l}, {y_l} out of bounds for ' - f'{n_cols}x{n_rows} raster.') - if edge_side - ((edge_dir-2) % 8) == 0: - # counterclockwise configuration - left = edge_dir - right = (left-1) % 8 - out_dir_increase = 2 - else: - # clockwise configuration (swapping "left" and "right") - right = edge_dir - left = (edge_side+1) - out_dir_increase = -2 - left_in = _in_watershed( - x_l, y_l, left, discovery, finish, n_cols, n_rows, - discovery_managed_raster, discovery_nodata) - right_in = _in_watershed( - x_l, y_l, right, discovery, finish, n_cols, n_rows, - discovery_managed_raster, discovery_nodata) - if right_in: - # turn right - out_dir = edge_side - edge_side = (edge_side-out_dir_increase) % 8 - edge_dir = out_dir - # pixel moves to be the right cell - x_l += COL_OFFSETS[right] - y_l += ROW_OFFSETS[right] - _diagonal_fill_step( - x_l, y_l, right, - discovery, finish, discovery_managed_raster, - discovery_nodata, - boundary_list) - elif left_in: - # step forward - x_l += COL_OFFSETS[edge_dir] - y_l += ROW_OFFSETS[edge_dir] - # the pixel moves forward - boundary_list.append((x_l, y_l)) - else: - # turn left - edge_side = edge_dir - edge_dir = (edge_side + out_dir_increase) % 8 + geoms = [] + + starting_points = [((x_l, y_l), (x_f, y_f), (x_p, y_p), edge_side, edge_dir, 'left')] + all_starting_points = set((x_f, y_f)) + + while starting_points: + (x_l, y_l), (x_f, y_f), (x_p, y_p), edge_side, edge_dir, orientation = starting_points.pop() + starting_point = (x_f, y_f) + starting_dir = edge_dir + vertices = [(x_p, y_p)] + print('starting from', (x_l, y_l), (x_f, y_f), edge_side, edge_dir, orientation) + + while True: + # step the edge then determine the projected coordinates + x_f += COL_OFFSETS[edge_dir] + y_f += ROW_OFFSETS[edge_dir] + + delta_x += COL_OFFSETS[edge_dir] + delta_y += ROW_OFFSETS[edge_dir] + + # equivalent to gdal.ApplyGeoTransform(geotransform, x_f, y_f) + # to eliminate python function call overhead + x_p = g0 + g1*x_f + g2*y_f + y_p = g3 + g4*x_f + g5*y_f + watershed_boundary.AddPoint(x_p, y_p) + vertices.append((x_p, y_p)) + + # boundary_list.append((x_l, y_l)) + + n_steps += 1 + if n_steps > _int_max_steps_per_watershed: + LOGGER.warning('quitting, too many steps') + terminated_early = 1 + break + if x_l < 0 or y_l < 0 or x_l >= n_cols or y_l >= n_rows: + # This is unexpected but worth checking since missing this + # error would be very difficult to debug. + raise RuntimeError( + f'{x_l}, {y_l} out of bounds for ' + f'{n_cols}x{n_rows} raster.') + + + if edge_side - ((edge_dir-2) % 8) == 0: + # counterclockwise configuration + left = edge_dir + right = (left - 1) % 8 + out_dir_increase = 2 + else: + # clockwise configuration (swapping "left" and "right") + right = edge_dir + left = (right + 1) % 8 + out_dir_increase = -2 + left_in = _in_watershed( + x_l, y_l, left, discovery, finish, n_cols, n_rows, + discovery_managed_raster, discovery_nodata) + right_in = _in_watershed( + x_l, y_l, right, discovery, finish, n_cols, n_rows, + discovery_managed_raster, discovery_nodata) + print('left', left, left_in) + print('right', right, right_in) + + if orientation == 'left': + if right_in and left_in: + print(f'turn right, move pixel to {right}') + # turn right + out_dir = edge_side + edge_side = (edge_side - out_dir_increase) % 8 + edge_dir = out_dir + + # add both left and right to the boundary + # so that the boundary is a continuous border of pixels that share a side + boundary_list.append((x_l + COL_OFFSETS[left], y_l + ROW_OFFSETS[left])) + boundary_list.append((x_l + COL_OFFSETS[right], y_l + ROW_OFFSETS[right])) + + # pixel moves to be the right cell + x_l += COL_OFFSETS[right] + y_l += ROW_OFFSETS[right] + + elif left_in and not right_in: + print(f'go straight, move pixel to {edge_dir}') + # step forward + x_l += COL_OFFSETS[edge_dir] + y_l += ROW_OFFSETS[edge_dir] + # the pixel moves forward + boundary_list.append((x_l, y_l)) + + elif right_in and not left_in: # continue straight and swap orientation + print(f'turn left and push point to stack') + # turn left + edge_side = edge_dir + edge_dir = (edge_side + out_dir_increase) % 8 + + point = ( + (x_l + COL_OFFSETS[right], y_l + ROW_OFFSETS[right]), + (x_f, y_f), + (x_p, y_p), + (edge_side + 4) % 8, + (edge_dir + 4) % 8, + 'left' + ) + print('pushing', point) + + if (x_l + COL_OFFSETS[right], y_l + ROW_OFFSETS[right]) not in all_starting_points: + starting_points.append(point) + all_starting_points.add((x_l + COL_OFFSETS[right], y_l + ROW_OFFSETS[right])) + boundary_list.append((x_l + COL_OFFSETS[right], y_l + ROW_OFFSETS[right])) - if delta_x == 0 and delta_y == 0: - # met the start point so we completed the watershed loop - break + else: + print('turn left') + # turn left + edge_side = edge_dir + edge_dir = (edge_side + out_dir_increase) % 8 + + + if (x_f, y_f) == starting_point and edge_dir == starting_dir: + geoms.append(shapely.Polygon(vertices)) + break + + if delta_x == 0 and delta_y == 0: + break + + + print(f'pixel: ({x_l}, {y_l}), vertex: ({x_f}, {y_f}), edge side: {edge_side}, edge dir: {edge_dir}') + + + + print(stream_fid, 'geoms:', len(geoms)) + + watershed_polygon = shapely.Polygon() + for geom in geoms: + watershed_polygon = shapely.union(watershed_polygon, geom) watershed_feature = ogr.Feature(watershed_layer.GetLayerDefn()) - watershed_polygon = ogr.Geometry(ogr.wkbPolygon) - watershed_polygon.AddGeometry(watershed_boundary) + watershed_polygon = ogr.CreateGeometryFromWkb(watershed_polygon.wkb) + print(watershed_polygon) watershed_feature.SetGeometry(watershed_polygon) watershed_feature.SetField('stream_fid', stream_fid) watershed_feature.SetField('terminated_early', terminated_early) @@ -4056,6 +4230,7 @@ def calculate_subwatershed_boundary( # doesn't interfere with the loop return to think the cells are no # longer in the watershed for boundary_x, boundary_y in boundary_list: + boundary_managed_raster.set(boundary_x, boundary_y, stream_fid) discovery_managed_raster.set(boundary_x, boundary_y, -1) watershed_layer.CommitTransaction() watershed_layer = None @@ -4063,7 +4238,8 @@ def calculate_subwatershed_boundary( discovery_managed_raster.close() finish_managed_raster.close() d8_flow_dir_managed_raster.close() - shutil.rmtree(workspace_dir) + boundary_managed_raster.close() + # shutil.rmtree(workspace_dir) LOGGER.info( '(calculate_subwatershed_boundary): watershed building 100% complete') @@ -4456,24 +4632,19 @@ cdef void _diagonal_fill_step( # this section determines which back diagonal was in the watershed and # fills it. if none are we pick one so there's no degenerate case - cdef int xdelta = COL_OFFSETS[edge_dir] - cdef int ydelta = ROW_OFFSETS[edge_dir] - test_list = [ - (x_l - xdelta, y_l), - (x_l, y_l - ydelta)] - for x_t, y_t in test_list: - point_discovery = discovery_managed_raster.get( - x_t, y_t) + for _xt, _yt in [(x_l - COL_OFFSETS[edge_dir], y_l), (x_l, y_l - ROW_OFFSETS[edge_dir])]: + point_discovery = discovery_managed_raster.get(_xt, _yt) if (point_discovery != discovery_nodata and point_discovery >= discovery and point_discovery <= finish): - boundary_list.append((int(x_t), int(y_t))) + boundary_list.append((_xt, _yt)) # there's only one diagonal to fill in so it's done here - return + break - # if there's a degenerate case then just add the xdelta, - # it doesn't matter - boundary_list.append(test_list[0]) + # # if there's a degenerate case then just add the xdelta, + # # it doesn't matter + # print('wrongly marking', test_list[0], 'as boundary') + # boundary_list.append(test_list[0]) cdef int _in_watershed( @@ -4504,6 +4675,7 @@ cdef int _in_watershed( if x_n < 0 or y_n < 0 or x_n >= n_cols or y_n >= n_rows: return 0 cdef long point_discovery = discovery_managed_raster.get(x_n, y_n) + print('in watershed?', point_discovery, discovery, finish) return (point_discovery != discovery_nodata and point_discovery >= discovery and point_discovery <= finish)