Skip to content

Commit 67be3b5

Browse files
authored
Add support for RedisClient in "batch" mode to RedisClientPool (#287)
* Version 3.42-SNAPSHOT * Add support for RedisClient in batch mode to RedisPool, and some documentation clarifying pipeline/multi/batch modes. * add .bsp to .gitignore
1 parent 9321ce9 commit 67be3b5

File tree

5 files changed

+31
-9
lines changed

5 files changed

+31
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ src_managed/
1010
project/boot/
1111
project/plugins/project/
1212
project/activator-sbt*
13+
.bsp
1314

1415
.env
1516

.sbtopts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
-J-Xms512m
12
-J-Xmx3g
23
-J-XX:MaxMetaspaceSize=512m
3-
#-Dsbt.task.timings=true
4+
#-Dsbt.task.timings=true

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ lazy val redisClient = (project in file(".")).settings(coreSettings : _*)
44

55
lazy val commonSettings: Seq[Setting[_]] = Seq(
66
organization := "net.debasishg",
7-
version := "3.41",
7+
version := "3.42-SNAPSHOT",
88
scalaVersion := "2.13.6",
99
crossScalaVersions := Seq("2.12.14", "2.11.12", "2.10.7"),
1010

src/main/scala/com/redis/Pool.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import javax.net.ssl.SSLContext
66
import org.apache.commons.pool2._
77
import org.apache.commons.pool2.impl._
88

9-
private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None, val timeout : Int = 0, val sslContext: Option[SSLContext] = None)
9+
private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None, val timeout : Int = 0, val sslContext: Option[SSLContext] = None, batch: RedisClient.Mode = RedisClient.SINGLE)
1010
extends PooledObjectFactory[RedisClient] {
1111

1212
// when we make an object it's already connected
1313
override def makeObject: PooledObject[RedisClient] = {
14-
new DefaultPooledObject[RedisClient](new RedisClient(host, port, database, secret, timeout, sslContext))
14+
new DefaultPooledObject[RedisClient](new RedisClient(host, port, database, secret, timeout, sslContext, batch))
1515
}
1616

1717
// quit & disconnect
@@ -42,7 +42,8 @@ class RedisClientPool(
4242
val timeout: Int = 0,
4343
val maxConnections: Int = RedisClientPool.UNLIMITED_CONNECTIONS,
4444
val poolWaitTimeout: Long = 3000,
45-
val sslContext: Option[SSLContext] = None
45+
val sslContext: Option[SSLContext] = None,
46+
val batch: RedisClient.Mode = RedisClient.SINGLE
4647
) {
4748

4849
val objectPoolConfig = new GenericObjectPoolConfig[RedisClient]
@@ -54,7 +55,7 @@ class RedisClientPool(
5455

5556
val abandonedConfig = new AbandonedConfig
5657
abandonedConfig.setRemoveAbandonedTimeout(TimeUnit.MILLISECONDS.toSeconds(poolWaitTimeout).toInt)
57-
val pool = new GenericObjectPool(new RedisClientFactory(host, port, database, secret, timeout, sslContext), objectPoolConfig,abandonedConfig)
58+
val pool = new GenericObjectPool(new RedisClientFactory(host, port, database, secret, timeout, sslContext, batch), objectPoolConfig,abandonedConfig)
5859
override def toString: String = host + ":" + String.valueOf(port)
5960

6061
def withClient[T](body: RedisClient => T): T = {

src/main/scala/com/redis/RedisClient.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,13 @@ class RedisClient(override val host: String, override val port: Int,
135135
)
136136
override def toString: String = host + ":" + String.valueOf(port) + "/" + database
137137

138-
// with MULTI/EXEC
138+
/**
139+
* Execute operations in the context of a MULTI command.
140+
* If you want to send the commands in batch mode, use the `batchedPipeline` method.
141+
*
142+
* @throws NullPointerException if code attempts to access the results of any Redis command within the block.
143+
* @see https://redis.io/commands/multi
144+
*/
139145
def pipeline(f: PipelineClient => Any): Option[List[Any]] = {
140146
send("MULTI", false)(asString) // flush reply stream
141147
try {
@@ -186,6 +192,8 @@ class RedisClient(override val host: String, override val port: Int,
186192
* </pre>
187193
*
188194
* Or the client may wish to track and get the promises as soon as the underlying <tt>Future</tt> is completed.
195+
*
196+
* @throws NullPointerException if code attempts to access the results of any Redis command within the block.
189197
*/
190198
def pipelineNoMulti(commands: Seq[() => Any]) = {
191199
val ps = List.fill(commands.size)(Promise[Any]())
@@ -206,9 +214,15 @@ class RedisClient(override val host: String, override val port: Int,
206214
ps
207215
}
208216

209-
// batched pipelines : all commands submitted in batch
217+
/**
218+
* Executes all the provided commands a single communication, returning a list with all the results.
219+
*
220+
* @throws IllegalStateException if this client was not intialized for batch (pipelined) messaging
221+
* @see https://redis.io/topics/pipelining
222+
*/
210223
def batchedPipeline(commands: List[() => Any]): Option[List[Any]] = {
211-
assert(batch == BATCH)
224+
if (batch != BATCH) throw new IllegalStateException("Cannot use batch operations for non-batch mode client")
225+
212226
commands.foreach { command =>
213227
command()
214228
}
@@ -218,6 +232,11 @@ class RedisClient(override val host: String, override val port: Int,
218232
r
219233
}
220234

235+
/**
236+
* Redis client which sends all messages in the context of a MULTI command, providing transaction semantics.
237+
*
238+
* @see https://redis.io/commands/multi
239+
*/
221240
class PipelineClient(parent: RedisClient) extends RedisCommand(parent.batch) with PubOperations {
222241
import com.redis.serialization.Parse
223242

0 commit comments

Comments
 (0)