Commit 3d81a9e

Merge pull request #93 from SwissBorg/improve-read-efficiency

Improve events by tag query

2 parents: 350a548 + 18d72b3

File tree: 8 files changed, +138 −43 lines


core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala (2 additions, 3 deletions)

@@ -20,7 +20,7 @@ import slick.jdbc.JdbcBackend._
 
 import scala.collection.immutable._
 import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success, Try }
+import scala.util.Try
 
 trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWithReadMessages {
   def db: Database
@@ -40,8 +40,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
       maxOffset: Long,
       max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
     val publisher: Int => DatabasePublisher[JournalRow] = tagId =>
-      db.stream(queries.eventsByTag(List(tagId), offset, maxOffset, max).result)
-    // applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
+      db.stream(queries.eventsByTag(List(tagId), offset, maxOffset).result)
     Source
       .future(tagIdResolver.lookupIdFor(tag))
       .flatMapConcat(_.fold(Source.empty[JournalRow])(tagId => Source.fromPublisher(publisher(tagId))))
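
Dropping the `max` parameter removes the query's `LIMIT` clause, and with it the workaround for akka/akka-persistence-jdbc#168; callers now bound the result by shrinking `maxOffset` instead. For readers unfamiliar with the Slick-to-Akka bridge used here, a minimal sketch of streaming such a window, with a hypothetical table standing in for this project's `JournalRow` wiring:

import akka.NotUsed
import akka.stream.scaladsl.Source
import slick.basic.DatabasePublisher
import slick.jdbc.PostgresProfile.api._

// Hypothetical stand-in for the journal table; only `ordering` matters here.
class Rows(t: Tag) extends Table[Long](t, "rows") {
  def ordering = column[Long]("ordering")
  def * = ordering
}
val rows = TableQuery[Rows]

// Stream every row in the half-open window (offset, maxOffset] with no LIMIT;
// backpressure flows from the Akka stream down into the JDBC result set.
def windowSource(db: Database, offset: Long, maxOffset: Long): Source[Long, NotUsed] = {
  val publisher: DatabasePublisher[Long] =
    db.stream(rows.filter(r => r.ordering > offset && r.ordering <= maxOffset).sortBy(_.ordering).result)
  Source.fromPublisher(publisher)
}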

core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala (1 addition, 3 deletions)

@@ -40,13 +40,11 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
   protected def _eventsByTag(
       tag: Rep[List[Int]],
       offset: ConstColumn[Long],
-      maxOffset: ConstColumn[Long],
-      max: ConstColumn[Long]) = {
+      maxOffset: ConstColumn[Long]): Query[JournalTable, JournalRow, Seq] = {
     baseTableQuery()
       .filter(_.tags @> tag)
       .sortBy(_.ordering.asc)
       .filter(row => row.ordering > offset && row.ordering <= maxOffset)
-      .take(max)
   }
 
   val eventsByTag = Compiled(_eventsByTag _)
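
With `max` gone, the compiled query's SQL is fixed apart from its bind parameters. A reduced sketch of Slick's `Compiled` pattern as used above; the `Counters` table is hypothetical, not this project's schema:

import slick.jdbc.PostgresProfile.api._

// Hypothetical table used only to illustrate the Compiled pattern.
class Counters(t: Tag) extends Table[(Long, Int)](t, "counters") {
  def ordering = column[Long]("ordering")
  def value = column[Int]("value")
  def * = (ordering, value)
}
val counters = TableQuery[Counters]

// Like _eventsByTag above: parameters are ConstColumns, the body is a plain Query.
def _inWindow(offset: ConstColumn[Long], maxOffset: ConstColumn[Long]) =
  counters
    .sortBy(_.ordering.asc)
    .filter(row => row.ordering > offset && row.ordering <= maxOffset)

// Compiled caches the generated SQL, so each call only substitutes bind values.
val inWindow = Compiled(_inWindow _)
// inWindow(23L, 25L).result is a reusable prepared-statement action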

core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala (38 additions, 15 deletions)

@@ -22,6 +22,7 @@ import akka.stream.scaladsl.{ Sink, Source }
 import akka.stream.{ Materializer, SystemMaterializer }
 import akka.util.Timeout
 import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
 import slick.jdbc.JdbcBackend._
 
 import scala.collection.immutable._
@@ -42,6 +43,8 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
     with CurrentEventsByTagQuery
     with EventsByTagQuery {
 
+  private val log = LoggerFactory.getLogger(this.getClass)
+
   implicit val ec: ExecutionContext = system.dispatcher
   implicit val mat: Materializer = SystemMaterializer(system).materializer
   val readJournalConfig = new ReadJournalConfig(config)
@@ -203,11 +206,14 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
       latestOrdering: MaxOrderingId): Source[EventEnvelope, NotUsed] = {
     if (latestOrdering.maxOrdering < offset) Source.empty
     else {
-      readJournalDao.eventsByTag(tag, offset, latestOrdering.maxOrdering, max).mapAsync(1)(Future.fromTry).mapConcat {
-        case (repr, ordering) =>
-          adaptEvents(repr).map(r =>
-            EventEnvelope(Sequence(ordering), r.persistenceId, r.sequenceNr, r.payload, r.timestamp))
-      }
+      readJournalDao
+        .eventsByTag(tag, offset, math.min(offset + max, latestOrdering.maxOrdering), max)
+        .mapAsync(1)(Future.fromTry)
+        .mapConcat {
+          case (repr, ordering) =>
+            adaptEvents(repr).map(r =>
+              EventEnvelope(Sequence(ordering), r.persistenceId, r.sequenceNr, r.payload, r.timestamp))
+        }
     }
   }
 
@@ -227,14 +233,21 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
     val batchSize = readJournalConfig.maxBufferSize
 
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((math.max(0L, offset), Continue)) {
         case (from, control) =>
          def retrieveNextBatch() = {
            for {
              queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
              xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq)
            } yield {
-             val hasMoreEvents = xs.size == batchSize
+             val to = from + batchSize
+             val highestOffset = xs.map(_.offset.value) match {
+               case Nil     => to
+               case offsets => offsets.max
+             }
+             val hasMoreEvents = {
+               highestOffset < queryUntil.maxOrdering
+             }
              val nextControl: FlowControl =
                terminateAfterOffset match {
                  // we may stop if target is behind queryUntil and we don't have more events to fetch
@@ -247,15 +260,25 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
                  if (hasMoreEvents) Continue else ContinueDelayed
                }
 
-             val nextStartingOffset = if (xs.isEmpty) {
-               /* If no events matched the tag between `from` and `maxOrdering` then there is no need to execute the exact
-                * same query again. We can continue querying from `maxOrdering`, which will save some load on the db.
-                * (Note: we may never return a value smaller than `from`, otherwise we might return duplicate events) */
-               math.max(from, queryUntil.maxOrdering)
-             } else {
-               // Continue querying from the largest offset
-               xs.map(_.offset.value).max
+             val nextStartingOffset = (xs, queryUntil.maxOrdering) match {
+               case (Nil, 0L) =>
+                 /* If `maxOrdering` is not known yet or journal is empty (min value for Postgres' bigserial is 1)
+                  * then we should not move the query window forward. Otherwise we might miss (skip) some events.
+                  * By setting nextStartingOffset to `from` we wait for either maxOrdering to be discovered or first
+                  * event to be persisted in the journal. */
+                 from
+               case (Nil, maxOrdering) =>
+                 /* If no events matched the tag between `from` and `to` (`from + batchSize`) and `maxOrdering` then
+                  * there is no need to execute the exact same query again. We can continue querying from `to`,
+                  * which will save some load on the db. */
+                 math.min(to, maxOrdering)
+               case _ =>
+                 // Continue querying from the largest offset
+                 highestOffset
             }
+
+            log.trace(
+              s"tag = $tag => ($nextStartingOffset, $nextControl), [highestOffset = $highestOffset, maxOrdering = ${queryUntil.maxOrdering}, hasMoreEvents = $hasMoreEvents, results = ${xs.size}, from = $from]")
             Some((nextStartingOffset, nextControl), xs)
           }
         }
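
The substance of the change: the DAO is asked only for the window `(offset, min(offset + max, maxOrdering)]`, and `hasMoreEvents` compares the highest offset actually seen against `maxOrdering` instead of counting returned rows, which undercounts whenever `ordering` has gaps. A self-contained sketch of the window arithmetic (illustrative names, not the project's API):

// A minimal, hypothetical model of the next-window computation above.
// `from` is the last processed offset, `batchSize` the window width,
// `offsets` the orderings returned for this window, and `maxOrdering`
// the highest known journal ordering (0 while still undiscovered).
def nextWindow(from: Long, batchSize: Long, offsets: List[Long], maxOrdering: Long): (Long, Boolean) = {
  val to = from + batchSize
  val highestOffset = if (offsets.isEmpty) to else offsets.max
  val hasMoreEvents = highestOffset < maxOrdering
  val nextStartingOffset = (offsets, maxOrdering) match {
    case (Nil, 0L)  => from              // journal empty or max unknown: stay put, or events could be skipped
    case (Nil, max) => math.min(to, max) // empty window: jump ahead, but never past maxOrdering
    case _          => highestOffset     // resume from the largest offset actually seen
  }
  (nextStartingOffset, hasMoreEvents)
}

// Example: a window with gaps. Only offsets 3, 7 and 42 carry the tag, yet the
// journal is known to reach 500, so the query must continue immediately:
// nextWindow(0L, 100L, List(3L, 7L, 42L), 500L) == (42L, true)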

core/src/test/scala/akka/persistence/postgres/SingleActorSystemPerTestSpec.scala (8 additions, 2 deletions)

@@ -7,9 +7,9 @@ package akka.persistence.postgres
 
 import akka.actor.ActorSystem
 import akka.persistence.postgres.config.{ JournalConfig, ReadJournalConfig, SlickConfiguration }
+import akka.persistence.postgres.db.SlickDatabase
 import akka.persistence.postgres.query.javadsl.PostgresReadJournal
 import akka.persistence.postgres.util.DropCreate
-import akka.persistence.postgres.db.SlickDatabase
 import akka.util.Timeout
 import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
 import org.scalatest.BeforeAndAfterEach
@@ -29,7 +29,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
   implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
   implicit val timeout: Timeout = Timeout(1.minute)
 
-  val cfg = config.getConfig("postgres-journal")
+  val cfg: Config = config.getConfig("postgres-journal")
   val journalConfig = new JournalConfig(cfg)
   val readJournalConfig = new ReadJournalConfig(config.getConfig(PostgresReadJournal.Identifier))
 
@@ -70,4 +70,10 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
     f(system)
     system.terminate().futureValue
   }
+
+  def withActorSystem(config: Config = config)(f: ActorSystem => Unit): Unit = {
+    implicit val system: ActorSystem = ActorSystem("test", config)
+    f(system)
+    system.terminate().futureValue
+  }
 }
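
The new overload lets an individual test boot its `ActorSystem` from a tweaked `Config`, while the parameter's default keeps the old zero-argument call sites working. A hypothetical call site, borrowing the `withMaxBufferSize` helper introduced in the tag tests below:

// Hypothetical usage inside a spec extending SingleActorSystemPerTestSpec:
it should "replay a large tagged stream" in withActorSystem(withMaxBufferSize(1000)) { implicit system =>
  // `system` was built from `config` with postgres-read-journal.max-buffer-size = 1000
  // and is terminated (and awaited) when this block returns
}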

core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagTest.scala (61 additions, 4 deletions)

@@ -11,22 +11,26 @@ import akka.persistence.postgres.query.CurrentEventsByTagTest._
 import akka.persistence.postgres.query.EventAdapterTest.{ Event, TaggedAsyncEvent }
 import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType }
 import akka.persistence.query.{ EventEnvelope, NoOffset, Sequence }
-import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import com.typesafe.config.{ Config, ConfigFactory, ConfigValue, ConfigValueFactory }
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
 object CurrentEventsByTagTest {
   val maxBufferSize = 20
-  val refreshInterval = 500.milliseconds
+  val refreshInterval: FiniteDuration = 500.milliseconds
 
   val configOverrides: Map[String, ConfigValue] = Map(
     "postgres-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
     "postgres-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString()))
+
+  case class TestEvent(greetings: String)
+
 }
 
 abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
     extends QueryTestSpec(s"${schemaType.resourceNamePrefix}-shared-db-application.conf", configOverrides) {
+
   it should "not find an event by tag for unknown tag" in withActorSystem { implicit system =>
     val journalOps = new ScalaPostgresReadJournalOperations(system)
     withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
@@ -164,8 +168,15 @@ abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
     }
   }
 
-  it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem {
-    implicit system =>
+  {
+    val numOfActors = 3
+    val batch1Size = 200
+    val batch2Size = 10000
+
+    import scala.collection.JavaConverters._
+
+    it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem(
+      withMaxBufferSize(1000)) { implicit system =>
      val journalOps = new JavaDslPostgresReadJournalOperations(system)
      import system.dispatcher
      withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
@@ -201,7 +212,53 @@ abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
      }
      batch2.futureValue
    }
+    }
+
+    it should "complete without omitting any events in case events are being persisted when the query is executed" in withActorSystem(
+      withMaxBufferSize((batch1Size + batch2Size) * (numOfActors + 1))) { implicit system =>
+      val journalOps = new JavaDslPostgresReadJournalOperations(system)
+      import system.dispatcher
+      withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+        def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = {
+          val futures = for {
+            actor <- Seq(actor1, actor2, actor3)
+            i <- 1 to numberOfMessagesPerActor
+          } yield {
+            actor ? TaggedAsyncEvent(Event(i.toString), tag)
+          }
+          Future.sequence(futures).map(_ => Done)
+        }
+
+        val tag = "someTag"
+        // send a batch of 3 * 200
+        val batch1 = sendMessagesWithTag(tag, 200)
+        // Try to persist a large batch of events per actor. Some of these may be returned, but not all!
+        val batch2 = sendMessagesWithTag(tag, 10000)
+
+        // wait for acknowledgement of the first batch only
+        batch1.futureValue
+        // Sanity check, all events in the first batch must be in the journal
+        journalOps.countJournal.futureValue should be >= 600L
+
+        // start the query before the last batch completes
+        journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp =>
+          // The stream must complete within the given amount of time
+          // This may take a while in case the journal sequence actor detects gaps
+          val allEvents = tp.toStrict(atMost = 30.seconds)
+          allEvents.size should be >= 600
+        }
+        batch2.futureValue
+        journalOps.withCurrentEventsByTag()(tag, Sequence(600)) { tp =>
+          val allEvents = tp.toStrict(atMost = 3.minutes)
+          allEvents.size should equal(3 * 10000)
+        }
+      }
+    }
+
+    def withMaxBufferSize(size: Long): Config =
+      ConfigFactory.parseMap(Map("postgres-read-journal.max-buffer-size" -> size.toString).asJava).withFallback(config)
   }
+
 }
 
 // Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config
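
The buffer override in the second test is not arbitrary: with `numOfActors = 3`, `batch1Size = 200` and `batch2Size = 10000` it comes to (200 + 10000) * (3 + 1) = 40800, comfortably above the 3 * 10200 = 30600 events the three actors persist, so a single currentEventsByTag pass can cover every event without the query window falling short.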

core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagWithGapsTest.scala (14 additions, 2 deletions)

@@ -9,11 +9,23 @@ import akka.persistence.postgres.util.Schema.SchemaType
 import akka.persistence.query.NoOffset
 import akka.serialization.SerializationExtension
 import akka.stream.scaladsl.{ Sink, Source }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
 
 import scala.concurrent.duration._
 
+object CurrentEventsByTagWithGapsTest {
+  private val maxBufferSize = 10000
+  private val refreshInterval = 500.milliseconds
+
+  val configOverrides: Map[String, ConfigValue] = Map(
+    "postgres-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+    "postgres-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString()))
+}
+
 class CurrentEventsByTagWithGapsTest
-    extends QueryTestSpec(s"${Schema.Partitioned.resourceNamePrefix}-shared-db-application.conf") {
+    extends QueryTestSpec(
+      s"${Schema.Partitioned.resourceNamePrefix}-shared-db-application.conf",
+      CurrentEventsByTagWithGapsTest.configOverrides) {
 
   // We are using Partitioned variant because it does not override values for an `ordering` field
   override val schemaType: SchemaType = Schema.Partitioned
@@ -63,7 +75,7 @@ class CurrentEventsByTagWithGapsTest
       .futureValue
 
     journalOps.withCurrentEventsByTag(5.minutes)(tag, NoOffset) { tp =>
-      val allEvents = tp.toStrict(atMost = 3.minutes)
+      val allEvents = tp.toStrict(atMost = 5.minutes)
       allEvents.size should equal(expectedTotalNumElements)
     }

core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala (13 additions, 13 deletions)

@@ -7,35 +7,35 @@ package akka.persistence.postgres.query
 
 import java.util.concurrent.atomic.AtomicLong
 
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.{ ActorRef, ActorSystem }
 import akka.pattern.ask
 import akka.persistence.postgres.config.JournalSequenceRetrievalConfig
 import akka.persistence.postgres.db.ExtendedPostgresProfile
-import akka.persistence.postgres.query.JournalSequenceActor.{GetMaxOrderingId, MaxOrderingId}
-import akka.persistence.postgres.query.dao.{ByteArrayReadJournalDao, TestProbeReadJournalDao}
-import akka.persistence.postgres.tag.{CachedTagIdResolver, SimpleTagDao}
-import akka.persistence.postgres.util.Schema.{NestedPartitions, Partitioned, Plain, SchemaType}
-import akka.persistence.postgres.{JournalRow, SharedActorSystemTestSpec}
+import akka.persistence.postgres.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId }
+import akka.persistence.postgres.query.dao.{ ByteArrayReadJournalDao, TestProbeReadJournalDao }
+import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao }
+import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType }
+import akka.persistence.postgres.{ JournalRow, SharedActorSystemTestSpec }
 import akka.serialization.SerializationExtension
-import akka.stream.scaladsl.{Sink, Source}
-import akka.stream.{Materializer, SystemMaterializer}
+import akka.stream.scaladsl.{ Sink, Source }
+import akka.stream.{ Materializer, SystemMaterializer }
 import akka.testkit.TestProbe
 import org.scalatest.time.Span
 import org.slf4j.LoggerFactory
-import slick.jdbc.{JdbcBackend, JdbcCapabilities}
+import slick.jdbc.{ JdbcBackend, JdbcCapabilities }
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
 abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends QueryTestSpec(schemaType.configName) {
   private val log = LoggerFactory.getLogger(classOf[JournalSequenceActorTest])
 
-  val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
-  val journalTable = schemaType.table(journalConfig.journalTableConfiguration)
+  private val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
+  private val journalTable = schemaType.table(journalConfig.journalTableConfiguration)
 
   import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
 
-  implicit val askTimeout = 50.millis
+  implicit val askTimeout: FiniteDuration = 50.millis
 
   private val orderingSeq = new AtomicLong(0L)
   def generateId: Long = orderingSeq.incrementAndGet()
@@ -192,7 +192,7 @@ class MockDaoJournalSequenceActorTest extends SharedActorSystemTestSpec {
 
   val almostQueryDelay = queryDelay - 50.millis
   val almostImmediately = 50.millis
-  withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, actor) =>
+  withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, _) =>
     daoProbe.expectMsg(almostImmediately, TestProbeReadJournalDao.JournalSequence(0, batchSize))
     val firstBatch = (1L to 40L) ++ (51L to 110L)
     daoProbe.reply(firstBatch)

core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala (1 addition, 1 deletion)

@@ -13,7 +13,7 @@ class ReadJournalQueriesTest extends BaseQueryTest {
   }
 
   it should "create SQL query for eventsByTag" in withReadJournalQueries { queries =>
-    queries.eventsByTag(List(11), 23L, 2L, 3L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags" from "journal" where ("tags" @> ?) and (("ordering" > ?) and ("ordering" <= ?)) order by "ordering" limit ?"""
+    queries.eventsByTag(List(11), 23L, 25L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags" from "journal" where ("tags" @> ?) and (("ordering" > ?) and ("ordering" <= ?)) order by "ordering""""
   }
 
   it should "create SQL query for journalSequenceQuery" in withReadJournalQueries { queries =>
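
The updated expectation captures the intent of the whole change: the generated SQL no longer ends in `limit ?`, because the result set is bounded solely by the `"ordering" <= ?` window predicate, and the third argument (`25L`) is now the window's upper bound rather than a row count.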
