|
1 | 1 | package net.imglib2.algorithm.convolution; |
2 | 2 |
|
3 | | -import java.util.ArrayList; |
4 | | -import java.util.List; |
5 | | -import java.util.concurrent.Callable; |
6 | | -import java.util.concurrent.ExecutionException; |
7 | | -import java.util.concurrent.ExecutorService; |
8 | | -import java.util.concurrent.Future; |
9 | | -import java.util.function.Consumer; |
10 | | -import java.util.function.Supplier; |
11 | | - |
12 | 3 | import net.imglib2.FinalInterval; |
13 | 4 | import net.imglib2.Interval; |
14 | 5 | import net.imglib2.Localizable; |
15 | | -import net.imglib2.Point; |
16 | 6 | import net.imglib2.RandomAccess; |
17 | 7 | import net.imglib2.RandomAccessible; |
18 | 8 | import net.imglib2.RandomAccessibleInterval; |
19 | | -import net.imglib2.util.IntervalIndexer; |
| 9 | +import net.imglib2.loops.LoopBuilder; |
20 | 10 | import net.imglib2.util.Intervals; |
| 11 | +import net.imglib2.util.Localizables; |
21 | 12 | import net.imglib2.view.Views; |
22 | 13 |
|
23 | 14 | /** |
|
26 | 17 | * |
27 | 18 | * @author Matthias Arzt |
28 | 19 | */ |
29 | | -public class LineConvolution< T > extends AbstractMultiThreadedConvolution< T > |
| 20 | +public class LineConvolution< T > implements Convolution<T> |
30 | 21 | { |
31 | 22 | private final LineConvolverFactory< ? super T > factory; |
32 | 23 |
|
@@ -55,100 +46,33 @@ public T preferredSourceType( final T targetType ) |
55 | 46 | } |
56 | 47 |
|
57 | 48 | @Override |
58 | | - protected void process( final RandomAccessible< ? extends T > source, final RandomAccessibleInterval< ? extends T > target, final ExecutorService executorService, final int numThreads ) |
| 49 | + public void process( RandomAccessible< ? extends T > source, RandomAccessibleInterval< ? extends T > target ) |
59 | 50 | { |
60 | 51 | final RandomAccessibleInterval< ? extends T > sourceInterval = Views.interval( source, requiredSourceInterval( target ) ); |
61 | 52 | final long[] sourceMin = Intervals.minAsLongArray( sourceInterval ); |
62 | 53 | final long[] targetMin = Intervals.minAsLongArray( target ); |
63 | 54 |
|
64 | | - final Supplier< Consumer< Localizable > > actionFactory = () -> { |
65 | | - |
66 | | - final RandomAccess< ? extends T > in = sourceInterval.randomAccess(); |
67 | | - final RandomAccess< ? extends T > out = target.randomAccess(); |
68 | | - final Runnable convolver = factory.getConvolver( in, out, direction, target.dimension( direction ) ); |
69 | | - |
70 | | - return position -> { |
71 | | - in.setPosition( sourceMin ); |
72 | | - out.setPosition( targetMin ); |
73 | | - in.move( position ); |
74 | | - out.move( position ); |
75 | | - convolver.run(); |
76 | | - }; |
77 | | - }; |
78 | | - |
79 | 55 | final long[] dim = Intervals.dimensionsAsLongArray( target ); |
80 | 56 | dim[ direction ] = 1; |
81 | 57 |
|
82 | | - final int numTasks = numThreads > 1 ? timesFourAvoidOverflow(numThreads) : 1; |
83 | | - LineConvolution.forEachIntervalElementInParallel( executorService, numTasks, new FinalInterval( dim ), actionFactory ); |
84 | | - } |
| 58 | + RandomAccessibleInterval< Localizable > positions = Localizables.randomAccessibleInterval( new FinalInterval( dim ) ); |
| 59 | + LoopBuilder.setImages( positions ).multiThreaded().forEachChunk( |
| 60 | + chunk -> { |
85 | 61 |
|
86 | | - private int timesFourAvoidOverflow( int x ) |
87 | | - { |
88 | | - return (int) Math.min((long) x * 4, Integer.MAX_VALUE); |
89 | | - } |
| 62 | + final RandomAccess< ? extends T > in = sourceInterval.randomAccess(); |
| 63 | + final RandomAccess< ? extends T > out = target.randomAccess(); |
| 64 | + final Runnable convolver = factory.getConvolver( in, out, direction, target.dimension( direction ) ); |
90 | 65 |
|
91 | | - /** |
92 | | - * {@link #forEachIntervalElementInParallel(ExecutorService, int, Interval, Supplier)} |
93 | | - * executes a given action for each position in a given interval. Therefor |
94 | | - * it starts the specified number of tasks. Each tasks calls the action |
95 | | - * factory once, to get an instance of the action that should be executed. |
96 | | - * The action is then called multiple times by the task. |
97 | | - * |
98 | | - * @param service |
99 | | - * {@link ExecutorService} used to create the tasks. |
100 | | - * @param numTasks |
101 | | - * number of tasks to use. |
102 | | - * @param interval |
103 | | - * interval to iterate over. |
104 | | - * @param actionFactory |
105 | | - * factory that returns the action to be executed. |
106 | | - */ |
107 | | - // TODO: move to a better place |
108 | | - public static void forEachIntervalElementInParallel( final ExecutorService service, final int numTasks, final Interval interval, |
109 | | - final Supplier< Consumer< Localizable > > actionFactory ) |
110 | | - { |
111 | | - final long[] min = Intervals.minAsLongArray( interval ); |
112 | | - final long[] dim = Intervals.dimensionsAsLongArray( interval ); |
113 | | - final long size = Intervals.numElements( dim ); |
114 | | - final int boundedNumTasks = (int) Math.max( 1, Math.min(size, numTasks )); |
115 | | - final long taskSize = ( size - 1 ) / boundedNumTasks + 1; // taskSize = roundUp(size / boundedNumTasks); |
116 | | - final ArrayList< Callable< Void > > callables = new ArrayList<>(); |
| 66 | + chunk.forEachPixel( position -> { |
| 67 | + in.setPosition( sourceMin ); |
| 68 | + out.setPosition( targetMin ); |
| 69 | + in.move( position ); |
| 70 | + out.move( position ); |
| 71 | + convolver.run(); |
| 72 | + } ); |
117 | 73 |
|
118 | | - for ( int taskNum = 0; taskNum < boundedNumTasks; ++taskNum ) |
119 | | - { |
120 | | - final long myStartIndex = taskNum * taskSize; |
121 | | - final long myEndIndex = Math.min( size, myStartIndex + taskSize ); |
122 | | - final Callable< Void > r = () -> { |
123 | | - final Consumer< Localizable > action = actionFactory.get(); |
124 | | - final long[] position = new long[ dim.length ]; |
125 | | - final Localizable localizable = Point.wrap( position ); |
126 | | - for ( long index = myStartIndex; index < myEndIndex; ++index ) |
127 | | - { |
128 | | - IntervalIndexer.indexToPositionWithOffset( index, dim, min, position ); |
129 | | - action.accept( localizable ); |
| 74 | + return null; |
130 | 75 | } |
131 | | - return null; |
132 | | - }; |
133 | | - callables.add( r ); |
134 | | - } |
135 | | - execute( service, callables ); |
136 | | - } |
137 | | - |
138 | | - private static void execute( final ExecutorService service, final ArrayList< Callable< Void > > callables ) |
139 | | - { |
140 | | - try |
141 | | - { |
142 | | - final List< Future< Void > > futures = service.invokeAll( callables ); |
143 | | - for ( final Future< Void > future : futures ) |
144 | | - future.get(); |
145 | | - } |
146 | | - catch ( final InterruptedException | ExecutionException e ) |
147 | | - { |
148 | | - final Throwable cause = e.getCause(); |
149 | | - if ( cause instanceof RuntimeException ) |
150 | | - throw ( RuntimeException ) cause; |
151 | | - throw new RuntimeException( e ); |
152 | | - } |
| 76 | + ); |
153 | 77 | } |
154 | 78 | } |
0 commit comments