diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala index 325cfb89c46..b1b77a7c040 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala @@ -113,6 +113,10 @@ import com.typesafe.config.{ Config, ConfigFactory } override def terminate(): Unit = terminationPromise.trySuccess(Done) override def whenTerminated: Future[Done] = terminationPromise.future override def getWhenTerminated: CompletionStage[Done] = whenTerminated.asJava + override def close(): Unit = { + terminate() + Await.result(whenTerminated, scala.concurrent.duration.Duration.Inf) + } override val startTime: Long = System.currentTimeMillis() override def uptime: Long = System.currentTimeMillis() - startTime override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory { diff --git a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java index 1aff57e5c50..15f421d1f4b 100644 --- a/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java @@ -15,6 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.concurrent.CompletionStage; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; @@ -47,4 +48,14 @@ public void testGetWhenTerminated() throws Exception { public void testGetWhenTerminatedWithoutTermination() { assertFalse(system.getWhenTerminated().toCompletableFuture().isDone()); } + + @Test + public void testTryWithResources() throws Exception { + ActorSystem system = null; + try (ActorSystem actorSystem = ActorSystem.create()) { + system = actorSystem; + } + final CompletionStage cs = system.getWhenTerminated(); + assertTrue(cs.toCompletableFuture().isDone()); + } } diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala index 022c600eba6..d791552dc50 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +import scala.util.Using import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks @@ -263,7 +264,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen "throw RejectedExecutionException when shutdown" in { val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf) - Await.ready(system2.terminate(), 10.seconds) + system2.close() intercept[RejectedExecutionException] { system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") } @@ -334,6 +335,20 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen } } + "close method terminates ActorSystem" in { + val system = ActorSystem() + system.close() + system.whenTerminated.isCompleted should ===(true) + } + + "Scala's Using automatically terminates ActorSystem" in { + var currentSystem: ActorSystem = null + Using(ActorSystem()) { system => + currentSystem = system + } + currentSystem.whenTerminated.isCompleted should ===(true) + } + "allow configuration of guardian supervisor strategy" in { implicit val system = ActorSystem( diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala index 2e00f0195bf..a05f3cd5a13 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala @@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers { val system = ActorSystem("DispatcherShutdownSpec") threadCount should be > 0 - Await.ready(system.terminate(), 1.second) + system.close() Await.ready(Future(pekko.Done)(system.dispatcher), 1.second) TestKit.awaitCond(threadCount == 0, 3.second) diff --git a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java index 84608858816..99dbd65ad5e 100644 --- a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java +++ b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java @@ -15,6 +15,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.concurrent.CompletionStage; import org.apache.pekko.Done; @@ -39,4 +40,15 @@ public void testGetWhenTerminatedWithoutTermination() { ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedWithoutTermination"); assertFalse(system.getWhenTerminated().toCompletableFuture().isDone()); } + + @Test + public void testTryWithResources() throws Exception { + ActorSystem system = null; + try (ActorSystem actorSystem = + ActorSystem.create(Behaviors.empty(), "TryWithResourcesSystem")) { + system = actorSystem; + } + final CompletionStage cs = system.getWhenTerminated(); + assertTrue(cs.toCompletableFuture().isDone()); + } } diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala index 2cab64f2d8e..2fb49c5ffb0 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala @@ -18,6 +18,7 @@ package org.apache.pekko.actor.typed import scala.annotation.nowarn +import scala.util.Using import org.apache.pekko import pekko.actor.typed.scaladsl.Behaviors @@ -45,5 +46,19 @@ class ActorSystemSpec extends PekkoSpec { system.terminate() } } + + "close method terminates ActorSystem" in { + val system = ActorSystem(Behaviors.empty[String], "close-terminates-system") + system.close() + system.whenTerminated.isCompleted should ===(true) + } + + "Scala's Using automatically terminates ActorSystem" in { + var currentSystem: ActorSystem[Nothing] = null + Using(ActorSystem(Behaviors.empty[String], "using-terminates-system")) { system => + currentSystem = system + } + currentSystem.whenTerminated.isCompleted should ===(true) + } } } diff --git a/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes b/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes new file mode 100644 index 00000000000..c41998e1bd0 --- /dev/null +++ b/actor-typed/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add close to ActorSystem +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close") diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala index 4610e563c0e..d8a3b74913d 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala @@ -13,7 +13,7 @@ package org.apache.pekko.actor.typed -import java.util.concurrent.{ CompletionStage, ThreadFactory } +import java.util.concurrent.{ CompletionStage, ThreadFactory, TimeoutException } import scala.concurrent.{ ExecutionContextExecutor, Future } @@ -42,7 +42,7 @@ import com.typesafe.config.{ Config, ConfigFactory } * Not for user extension. */ @DoNotInherit -abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider { +abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider with AutoCloseable { this: InternalRecipientRef[T] => /** @@ -147,6 +147,26 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA */ def getWhenTerminated: CompletionStage[Done] + /** + * Terminates this actor system by running [[pekko.actor.CoordinatedShutdown]] with reason + * [[pekko.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block + * until either the actor system is terminated or + * `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is + * passed, in which case a [[TimeoutException]] is thrown. + * + * If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off` + * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors + * will still be terminated. + * + * This will stop the guardian actor, which in turn + * will recursively stop all its child actors, and finally the system guardian + * (below which the logging actors reside) and then execute all registered + * termination handlers (see [[pekko.actor.ActorSystem.registerOnTermination]]). + * @since 1.3.0 + */ + @throws(classOf[TimeoutException]) + override def close(): Unit + /** * The deadLetter address is a destination that will accept (and discard) * every message sent to it. diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala index 8a1c8965aad..2cf322c8787 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -122,6 +122,8 @@ import org.slf4j.{ Logger, LoggerFactory } override lazy val getWhenTerminated: CompletionStage[pekko.Done] = whenTerminated.asJava + override def close(): Unit = system.close() + override def systemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = { val ref = system.systemActorOf( PropsAdapter( diff --git a/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes b/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes new file mode 100644 index 00000000000..1721d250acc --- /dev/null +++ b/actor/src/main/mima-filters/1.3.x.backwards.excludes/actorsystem-close.excludes @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add close to ActorSystem +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ActorSystem.close") +ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ExtendedActorSystem.close") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.close") diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 9b12fb106b4..aa435557aee 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -1244,6 +1244,12 @@ pekko { # Terminate the ActorSystem in the last phase actor-system-terminate. terminate-actor-system = on + # The timeout that will be used when calling .close on an ActorSystem. + # This timeout will also be used when ActorSystem's are automatically + # terminated by using Java's try-with-resources or Scala's + # scala.util.Using + close-actor-system-timeout = 60 s + # Exit the JVM (System.exit(0)) in the last phase actor-system-terminate # if this is set to 'on'. It is done after termination of the # ActorSystem if terminate-actor-system=on, otherwise it is done diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala index d1149817321..dd4cdfb885b 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } +import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future, Promise } import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.jdk.CollectionConverters._ @@ -524,7 +524,7 @@ object ActorSystem { * extending [[pekko.actor.ExtendedActorSystem]] instead, but beware that you * are completely on your own in that case! */ -abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider { +abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider with AutoCloseable { import ActorSystem._ /** @@ -677,6 +677,26 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid */ def terminate(): Future[Terminated] + /** + * Terminates this actor system by running [[CoordinatedShutdown]] with reason + * [[CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block + * until either the actor system is terminated or + * `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is + * passed, in which case a [[TimeoutException]] is thrown. + * + * If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off` + * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors + * will still be terminated. + * + * This will stop the guardian actor, which in turn + * will recursively stop all its child actors, and finally the system guardian + * (below which the logging actors reside) and then execute all registered + * termination handlers (see [[ActorSystem#registerOnTermination]]). + * @since 1.3.0 + */ + @throws(classOf[TimeoutException]) + override def close(): Unit + /** * Returns a Future which will be completed after the ActorSystem has been terminated * and termination hooks have been executed. If you registered any callback with @@ -1080,6 +1100,14 @@ private[pekko] class ActorSystemImpl( whenTerminated } + override def close(): Unit = { + terminate() + val duration = + Duration(settings.config.getDuration("pekko.coordinated-shutdown.close-actor-system-timeout").toMillis, + TimeUnit.MILLISECONDS) + Await.result(whenTerminated, duration) + } + override private[pekko] def finalTerminate(): Unit = { terminating = true // these actions are idempotent diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala index 8d3e9a74d50..360b450433f 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorBenchmark.scala @@ -15,9 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration._ - import BenchmarkActors._ import org.openjdk.jmh.annotations._ @@ -100,10 +97,8 @@ class ActorBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OperationsPerInvocation(totalMessages) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala index 05317241e97..ddefd5f775a 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ActorCreationBenchmark.scala @@ -15,9 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.openjdk.jmh.annotations._ /* @@ -46,10 +43,8 @@ class ActorCreationBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OutputTimeUnit(TimeUnit.MICROSECONDS) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala index 1d0d398c05b..04e846406ac 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ForkJoinActorBenchmark.scala @@ -16,8 +16,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit import scala.annotation.tailrec -import scala.concurrent.Await -import scala.concurrent.duration._ import BenchmarkActors._ import org.openjdk.jmh.annotations._ @@ -78,10 +76,8 @@ class ForkJoinActorBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() // @Benchmark // @OperationsPerInvocation(totalMessagesTwoActors) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala index 097f333b1c3..bec9bb0a1bb 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/RouterPoolCreationBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ @@ -40,10 +39,8 @@ class RouterPoolCreationBenchmark { var size = 0 @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OutputTimeUnit(TimeUnit.MICROSECONDS) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala index 3cdba681937..97ce680f5ad 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/ScheduleBenchmark.scala @@ -52,10 +52,8 @@ class ScheduleBenchmark { } @TearDown - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala index ee477beb047..09d5b4c733b 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/StashCreationBenchmark.scala @@ -15,9 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.openjdk.jmh.annotations._ import org.apache.pekko.testkit.TestProbe @@ -49,10 +46,8 @@ class StashCreationBenchmark { val probe = TestProbe() @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OutputTimeUnit(TimeUnit.MICROSECONDS) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala index 722fcbf8dee..9933c0b26c6 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/TellOnlyBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.actor import java.util.concurrent.TimeUnit -import scala.concurrent.Await import scala.concurrent.duration._ import org.openjdk.jmh.annotations._ @@ -65,10 +64,8 @@ class TellOnlyBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() var actor: ActorRef = _ var probe: TestProbe = _ diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala index 1760a013e5d..d09e70c44fb 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedActorBenchmark.scala @@ -100,10 +100,8 @@ class TypedActorBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OperationsPerInvocation(totalMessages) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala index b986a355ea0..8ac10da4c35 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/TypedForkJoinActorBenchmark.scala @@ -117,10 +117,8 @@ class TypedForkJoinActorBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() } object TypedForkJoinActorBenchmark { diff --git a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala index d8704548d70..654bbaf6b42 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/actor/typed/delivery/ReliableDeliveryBenchmark.scala @@ -237,10 +237,8 @@ class ReliableDeliveryBenchmark { } @TearDown(Level.Trial) - def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) - } + def shutdown(): Unit = + system.close() @Benchmark @OperationsPerInvocation(messagesPerOperation) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala index e129d0f5a18..d133f43ed90 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/LevelDbBatchingBenchmark.scala @@ -17,8 +17,6 @@ import java.io.File import java.util.concurrent.TimeUnit import scala.annotation.nowarn -import scala.concurrent.Await -import scala.concurrent.duration._ import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations._ @@ -75,8 +73,7 @@ class LevelDbBatchingBenchmark { store ! PoisonPill Thread.sleep(500) - sys.terminate() - Await.ready(sys.whenTerminated, 10.seconds) + sys.close() } @Benchmark diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala index 31ee2d4fc53..73282ae8cd8 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistenceActorDeferBenchmark.scala @@ -15,9 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations.Scope @@ -71,8 +68,7 @@ class PersistentActorDeferBenchmark { @TearDown def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.close() storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala index b69696fc050..1dbea3f4e6a 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorBenchmark.scala @@ -15,9 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.apache.commons.io.FileUtils import org.openjdk.jmh.annotations._ import org.openjdk.jmh.annotations.Scope @@ -69,8 +66,7 @@ class PersistentActorThroughputBenchmark { @TearDown def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.close() storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala index 4a8860c5386..bc67effab19 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/persistence/PersistentActorWithAtLeastOnceDeliveryBenchmark.scala @@ -15,7 +15,6 @@ package org.apache.pekko.persistence import java.io.File -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.commons.io.FileUtils @@ -72,8 +71,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark { @TearDown def shutdown(): Unit = { - system.terminate() - Await.ready(system.whenTerminated, 15.seconds) + system.close() storageLocations.foreach(FileUtils.deleteDirectory) } diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala index abe10c610a0..49edf1ccbf7 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -12,9 +12,9 @@ */ package org.apache.pekko.cluster.sharding + import java.util.UUID -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko @@ -184,7 +184,7 @@ class PersistentShardingMigrationSpec extends PekkoSpec(PersistentShardingMigrat extractShardId(rememberedEntitiesProbe.ref)) f(system, region, rememberedEntitiesProbe) } finally { - Await.ready(system.terminate(), 20.seconds) + system.close() } } diff --git a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala index f73a0e1a4ac..2419ae053d2 100644 --- a/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala +++ b/cluster-sharding/src/test/scala/org/apache/pekko/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala @@ -15,9 +15,6 @@ package org.apache.pekko.cluster.sharding import java.util.UUID -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.apache.pekko import pekko.actor.ActorRef import pekko.actor.ActorSystem @@ -145,7 +142,7 @@ class RememberEntitiesShardIdExtractorChangeSpec val region = ClusterSharding(system).start(TypeName, Props(new PA()), extractEntityId, extractShardId) f(system, region) } finally { - Await.ready(system.terminate(), 20.seconds) + system.close() } } diff --git a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala index ccba3d8763e..548a8e797e1 100644 --- a/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala +++ b/cluster-tools/src/multi-jvm/scala/org/apache/pekko/cluster/client/ClusterClientStopSpec.scala @@ -14,7 +14,6 @@ package org.apache.pekko.cluster.client import scala.annotation.nowarn -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko @@ -117,7 +116,7 @@ class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with ST runOn(first, second) { enterBarrier("was-in-contact") - Await.ready(system.terminate(), 10.seconds) + system.close() } diff --git a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 15408b56ee6..88ce7f1a0eb 100644 --- a/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/cluster-typed/src/test/scala/org/apache/pekko/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -205,8 +205,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin if (down) { // abrupt termination - system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.close() clusterNode1.manager ! Down(clusterNode2.selfMember.address) } else { clusterNode1.manager ! Leave(clusterNode2.selfMember.address) @@ -368,8 +367,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin // abrupt termination but then a node with the same host:port comes online quickly system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) - system2.terminate() - Await.ready(system2.whenTerminated, 10.seconds) + system2.close() val testKit3 = ActorTestKit( system1.name, diff --git a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala index a06e7a22dbe..e051f785ed8 100644 --- a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala +++ b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurableDataSpec.scala @@ -290,7 +290,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) expectTerminated(r) } } finally { - Await.ready(sys1.terminate(), 10.seconds) + sys1.close() } val sys2 = ActorSystem( diff --git a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala index 7e66be81425..8a3da0184e3 100644 --- a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala +++ b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/DurablePruningSpec.scala @@ -13,7 +13,6 @@ package org.apache.pekko.cluster.ddata -import scala.concurrent.Await import scala.concurrent.duration._ import org.apache.pekko @@ -147,7 +146,7 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN } enterBarrier("removed") runOn(first) { - Await.ready(sys2.terminate(), 5.seconds) + sys2.close() } within(15.seconds) { diff --git a/docs/src/test/scala/docs/io/ReadBackPressure.scala b/docs/src/test/scala/docs/io/ReadBackPressure.scala index cab26fb9ac1..9ce9fae5b95 100644 --- a/docs/src/test/scala/docs/io/ReadBackPressure.scala +++ b/docs/src/test/scala/docs/io/ReadBackPressure.scala @@ -90,6 +90,6 @@ class PullReadingSpec extends PekkoSpec with ImplicitSender { client.send(connection, ResumeReading) client.expectMsg(Received(ByteString("hello"))) - Await.ready(system.terminate(), Duration.Inf) + system.close() } } diff --git a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala index 44e3323ef23..6e04f54236d 100644 --- a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala +++ b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/PersistenceQuerySpec.scala @@ -15,9 +15,6 @@ package org.apache.pekko.persistence.query import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.Await -import scala.concurrent.duration._ - import org.apache.pekko import pekko.actor.ActorSystem import pekko.persistence.journal.{ EventSeq, ReadEventAdapter } @@ -102,7 +99,7 @@ class PersistenceQuerySpec extends AnyWordSpecLike with Matchers with BeforeAndA val sys = ActorSystem(s"sys-${systemCounter.incrementAndGet()}", config) try block(sys) - finally Await.ready(sys.terminate(), 10.seconds) + finally sys.close() } } diff --git a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala index ae49b5dbfba..834fd124be1 100644 --- a/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala +++ b/persistence-shared/src/test/scala/org/apache/pekko/persistence/serialization/SerializerSpec.scala @@ -16,9 +16,6 @@ package org.apache.pekko.persistence.serialization import java.io.NotSerializableException import java.util.UUID -import scala.concurrent.Await -import scala.concurrent.duration.Duration - import org.apache.commons.codec.binary.Hex.{ decodeHex, encodeHex } import org.apache.pekko @@ -348,9 +345,8 @@ class MessageSerializerRemotingSpec extends PekkoSpec(remote.withFallback(custom remoteSystem.actorOf(Props[RemoteActor](), "remote") } - override def afterTermination(): Unit = { - Await.ready(remoteSystem.terminate(), Duration.Inf) - } + override def afterTermination(): Unit = + remoteSystem.close() "A message serializer" must { "custom-serialize PersistentRepr messages during remoting" in { diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala index 9620edc3ad4..81f169d1072 100644 --- a/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala +++ b/persistence/src/test/scala/org/apache/pekko/persistence/EndToEndEventAdapterSpec.scala @@ -16,8 +16,6 @@ package org.apache.pekko.persistence import java.io.File import scala.annotation.nowarn -import scala.concurrent.Await -import scala.concurrent.duration._ import org.apache.commons.io.FileUtils @@ -187,7 +185,7 @@ class EndToEndEventAdapterSpec extends AnyWordSpecLike with Matchers with Before def withActorSystem[T](name: String, config: Config)(block: ActorSystem => T): T = { val system = ActorSystem(name, journalConfig.withFallback(config)) try block(system) - finally Await.ready(system.terminate(), 3.seconds) + finally system.close() } "EventAdapters in end-to-end scenarios" must { diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala index f13ea78e8f0..73dff9928d6 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/SystemMessageDeliverySpec.scala @@ -184,7 +184,7 @@ class SystemMessageDeliverySpec extends AbstractSystemMessageDeliverySpec(System watch(remoteRef) remoteRef ! "hello" expectMsg("hello") - Await.ready(systemC.terminate(), 10.seconds) + systemC.close() system.log.debug("systemC terminated") // DeathWatchNotification is sent from systemC, failure detection takes longer than 3 seconds expectTerminated(remoteRef, 10.seconds)