Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ import com.typesafe.config.{ Config, ConfigFactory }
override def terminate(): Unit = terminationPromise.trySuccess(Done)
override def whenTerminated: Future[Done] = terminationPromise.future
override def getWhenTerminated: CompletionStage[Done] = whenTerminated.asJava
override def close(): Unit = {
terminate()
Await.result(whenTerminated, scala.concurrent.duration.Duration.Inf)
}
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = System.currentTimeMillis() - startTime
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CompletionStage;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
Expand Down Expand Up @@ -47,4 +48,14 @@ public void testGetWhenTerminated() throws Exception {
public void testGetWhenTerminatedWithoutTermination() {
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}

@Test
public void testTryWithResources() throws Exception {
ActorSystem system = null;
try (ActorSystem actorSystem = ActorSystem.create()) {
system = actorSystem;
}
final CompletionStage<Terminated> cs = system.getWhenTerminated();
assertTrue(cs.toCompletableFuture().isDone());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Using

import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

Expand Down Expand Up @@ -263,7 +264,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen

"throw RejectedExecutionException when shutdown" in {
val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf)
Await.ready(system2.terminate(), 10.seconds)
system2.close()

intercept[RejectedExecutionException] {
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
Expand Down Expand Up @@ -334,6 +335,20 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
}
}

"close method terminates ActorSystem" in {
val system = ActorSystem()
system.close()
system.whenTerminated.isCompleted should ===(true)
}

"Scala's Using automatically terminates ActorSystem" in {
var currentSystem: ActorSystem = null
Using(ActorSystem()) { system =>
currentSystem = system
}
currentSystem.whenTerminated.isCompleted should ===(true)
}

"allow configuration of guardian supervisor strategy" in {
implicit val system =
ActorSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers {
val system = ActorSystem("DispatcherShutdownSpec")
threadCount should be > 0

Await.ready(system.terminate(), 1.second)
system.close()
Await.ready(Future(pekko.Done)(system.dispatcher), 1.second)

TestKit.awaitCond(threadCount == 0, 3.second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CompletionStage;
import org.apache.pekko.Done;
Expand All @@ -39,4 +40,15 @@ public void testGetWhenTerminatedWithoutTermination() {
ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedWithoutTermination");
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
}

@Test
public void testTryWithResources() throws Exception {
ActorSystem<Void> system = null;
try (ActorSystem<Void> actorSystem =
ActorSystem.create(Behaviors.empty(), "TryWithResourcesSystem")) {
system = actorSystem;
}
final CompletionStage<Done> cs = system.getWhenTerminated();
assertTrue(cs.toCompletableFuture().isDone());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.pekko.actor.typed

import scala.annotation.nowarn
import scala.util.Using

import org.apache.pekko
import pekko.actor.typed.scaladsl.Behaviors
Expand Down Expand Up @@ -45,5 +46,19 @@ class ActorSystemSpec extends PekkoSpec {
system.terminate()
}
}

"close method terminates ActorSystem" in {
val system = ActorSystem(Behaviors.empty[String], "close-terminates-system")
system.close()
system.whenTerminated.isCompleted should ===(true)
}

"Scala's Using automatically terminates ActorSystem" in {
var currentSystem: ActorSystem[Nothing] = null
Using(ActorSystem(Behaviors.empty[String], "using-terminates-system")) { system =>
currentSystem = system
}
currentSystem.whenTerminated.isCompleted should ===(true)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add close to ActorSystem
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.apache.pekko.actor.typed

import java.util.concurrent.{ CompletionStage, ThreadFactory }
import java.util.concurrent.{ CompletionStage, ThreadFactory, TimeoutException }

import scala.concurrent.{ ExecutionContextExecutor, Future }

Expand Down Expand Up @@ -42,7 +42,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
* Not for user extension.
*/
@DoNotInherit
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider {
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider with AutoCloseable {
this: InternalRecipientRef[T] =>

/**
Expand Down Expand Up @@ -147,6 +147,26 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA
*/
def getWhenTerminated: CompletionStage[Done]

/**
* Terminates this actor system by running [[pekko.actor.CoordinatedShutdown]] with reason
* [[pekko.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
* until either the actor system is terminated or
* `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is
* passed, in which case a [[TimeoutException]] is thrown.
*
* If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
* will still be terminated.
*
* This will stop the guardian actor, which in turn
* will recursively stop all its child actors, and finally the system guardian
* (below which the logging actors reside) and then execute all registered
* termination handlers (see [[pekko.actor.ActorSystem.registerOnTermination]]).
* @since 1.3.0
*/
@throws(classOf[TimeoutException])
override def close(): Unit

/**
* The deadLetter address is a destination that will accept (and discard)
* every message sent to it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ import org.slf4j.{ Logger, LoggerFactory }
override lazy val getWhenTerminated: CompletionStage[pekko.Done] =
whenTerminated.asJava

override def close(): Unit = system.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@throws(classOf[TimeoutException])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #2486 (comment), this is the implementation and end users won't see it (they will see the abstract method)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @since 1.3.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #2486 (comment), this is not part of public API


override def systemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
val ref = system.systemActorOf(
PropsAdapter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Add close to ActorSystem
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ExtendedActorSystem.close")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
6 changes: 6 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,12 @@ pekko {
# Terminate the ActorSystem in the last phase actor-system-terminate.
terminate-actor-system = on

# The timeout that will be used when calling .close on an ActorSystem.
# This timeout will also be used when ActorSystem's are automatically
# terminated by using Java's try-with-resources or Scala's
# scala.util.Using
close-actor-system-timeout = 60 s
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ForkJoinPool's default is 1Day

Copy link
Contributor Author

@mdedetrich mdedetrich Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1Day is way too much, people calling close() will expect it to do so in a reasonable time especially if they are doing it before a pod/server/instance is shutting down.

1 minute is a good default in my view


# Exit the JVM (System.exit(0)) in the last phase actor-system-terminate
# if this is set to 'on'. It is done after termination of the
# ActorSystem if terminate-actor-system=on, otherwise it is done
Expand Down
32 changes: 30 additions & 2 deletions actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.concurrent.{ Await, ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.concurrent.blocking
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -524,7 +524,7 @@ object ActorSystem {
* extending [[pekko.actor.ExtendedActorSystem]] instead, but beware that you
* are completely on your own in that case!
*/
abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider {
abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider with AutoCloseable {
import ActorSystem._

/**
Expand Down Expand Up @@ -677,6 +677,26 @@ abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvid
*/
def terminate(): Future[Terminated]

/**
* Terminates this actor system by running [[CoordinatedShutdown]] with reason
* [[CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
* until either the actor system is terminated or
* `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is
* passed, in which case a [[TimeoutException]] is thrown.
*
* If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
* will still be terminated.
*
* This will stop the guardian actor, which in turn
* will recursively stop all its child actors, and finally the system guardian
* (below which the logging actors reside) and then execute all registered
* termination handlers (see [[ActorSystem#registerOnTermination]]).
* @since 1.3.0
*/
@throws(classOf[TimeoutException])
override def close(): Unit

/**
* Returns a Future which will be completed after the ActorSystem has been terminated
* and termination hooks have been executed. If you registered any callback with
Expand Down Expand Up @@ -1080,6 +1100,14 @@ private[pekko] class ActorSystemImpl(
whenTerminated
}

override def close(): Unit = {
Copy link
Member

@He-Pin He-Pin Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@throws(classOf[TimeoutException])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the implementation, the parent which defines the abstract method (ActorSystem) has @throws(classOf[TimeoutException]) which is what matters for users, I think thats enough?

terminate()
val duration =
Duration(settings.config.getDuration("pekko.coordinated-shutdown.close-actor-system-timeout").toMillis,
TimeUnit.MILLISECONDS)
Await.result(whenTerminated, duration)
}

override private[pekko] def finalTerminate(): Unit = {
terminating = true
// these actions are idempotent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import BenchmarkActors._
import org.openjdk.jmh.annotations._

Expand Down Expand Up @@ -100,10 +97,8 @@ class ActorBenchmark {
}

@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
def shutdown(): Unit =
system.close()

@Benchmark
@OperationsPerInvocation(totalMessages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._

/*
Expand Down Expand Up @@ -46,10 +43,8 @@ class ActorCreationBenchmark {
}

@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
def shutdown(): Unit =
system.close()

@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package org.apache.pekko.actor
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._

import BenchmarkActors._
import org.openjdk.jmh.annotations._
Expand Down Expand Up @@ -78,10 +76,8 @@ class ForkJoinActorBenchmark {
}

@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
def shutdown(): Unit =
system.close()

// @Benchmark
// @OperationsPerInvocation(totalMessagesTwoActors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package org.apache.pekko.actor

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration._

import org.openjdk.jmh.annotations._
Expand All @@ -40,10 +39,8 @@ class RouterPoolCreationBenchmark {
var size = 0

@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
def shutdown(): Unit =
system.close()

@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand Down
Loading