@@ -10,6 +10,10 @@ import dev.openfeature.kotlin.sdk.Value
1010import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents
1111import dev.openfeature.kotlin.sdk.events.toOpenFeatureStatusError
1212import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError
13+ import kotlinx.coroutines.CancellationException
14+ import kotlinx.coroutines.CoroutineScope
15+ import kotlinx.coroutines.Job
16+ import kotlinx.coroutines.SupervisorJob
1317import kotlinx.coroutines.async
1418import kotlinx.coroutines.awaitAll
1519import kotlinx.coroutines.coroutineScope
@@ -21,6 +25,7 @@ import kotlinx.coroutines.flow.asStateFlow
2125import kotlinx.coroutines.flow.launchIn
2226import kotlinx.coroutines.flow.onEach
2327import kotlinx.coroutines.flow.update
28+ import kotlinx.coroutines.launch
2429
2530/* *
2631 * Type alias for a function that evaluates a feature flag using a FeatureProvider.
@@ -154,6 +159,8 @@ class MultiProvider(
154159 }
155160 }
156161
162+ private var observeProviderEventsJob: Job ? = null
163+
157164 /* *
158165 * @return Number of unique providers
159166 */
@@ -169,20 +176,27 @@ class MultiProvider(
169176 */
170177 override suspend fun initialize (initialContext : EvaluationContext ? ) {
171178 coroutineScope {
172- // Listen to events emitted by providers to emit our own set of events
173- // according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling
174- childFeatureProviders.forEach { provider ->
175- provider.observe()
176- .onEach { event ->
177- handleProviderEvent(provider, event)
178- }
179- .launchIn(this )
179+ observeProviderEventsJob?.cancel(
180+ cause = CancellationException (" Observe provider events job cancelled due to new initialize call" )
181+ )
182+ observeProviderEventsJob = CoroutineScope (this .coroutineContext + SupervisorJob ()).launch {
183+ // Listen to events emitted by providers to emit our own set of events
184+ // according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling
185+ childFeatureProviders.forEach { provider ->
186+ provider.observe()
187+ .onEach { event ->
188+ handleProviderEvent(provider, event)
189+ }
190+ .launchIn(this )
191+ }
180192 }
181193
182- // State updates captured by observing individual Feature Flag providers
183- childFeatureProviders
184- .map { async { it.initialize(initialContext) } }
185- .awaitAll()
194+ launch {
195+ // State updates captured by observing individual Feature Flag providers
196+ childFeatureProviders
197+ .map { async { it.initialize(initialContext) } }
198+ .awaitAll()
199+ }
186200 }
187201 }
188202
@@ -221,6 +235,10 @@ class MultiProvider(
221235 * This allows providers to clean up resources and complete any pending operations.
222236 */
223237 override fun shutdown () {
238+ observeProviderEventsJob?.cancel(
239+ cause = CancellationException (" Observe provider events job cancelled due to shutdown" )
240+ )
241+
224242 val shutdownErrors = mutableListOf<Pair <String , Throwable >>()
225243 childFeatureProviders.forEach { provider ->
226244 try {
0 commit comments