Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object Connection:
allowPublicKeyRetrieval: Boolean = false,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
databaseTerm: Option[DatabaseMetaData.DatabaseTerm] = Some(DatabaseMetaData.DatabaseTerm.CATALOG),
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]] = None,
plugins: List[AuthenticationPlugin[F]] = List.empty[AuthenticationPlugin[F]]
Expand All @@ -93,6 +94,7 @@ object Connection:
allowPublicKeyRetrieval,
useCursorFetch,
useServerPrepStmts,
maxAllowedPacket,
databaseTerm,
defaultAuthenticationPlugin,
plugins,
Expand All @@ -115,6 +117,7 @@ object Connection:
allowPublicKeyRetrieval: Boolean = false,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
databaseTerm: Option[DatabaseMetaData.DatabaseTerm] = Some(DatabaseMetaData.DatabaseTerm.CATALOG),
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]] = None,
plugins: List[AuthenticationPlugin[F]] = List.empty[AuthenticationPlugin[F]]
Expand All @@ -131,6 +134,7 @@ object Connection:
allowPublicKeyRetrieval,
useCursorFetch,
useServerPrepStmts,
maxAllowedPacket,
databaseTerm,
defaultAuthenticationPlugin,
plugins,
Expand All @@ -151,6 +155,7 @@ object Connection:
allowPublicKeyRetrieval: Boolean = false,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
databaseTerm: Option[DatabaseMetaData.DatabaseTerm] = Some(DatabaseMetaData.DatabaseTerm.CATALOG),
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]] = None,
plugins: List[AuthenticationPlugin[F]] = List.empty[AuthenticationPlugin[F]],
Expand All @@ -176,6 +181,7 @@ object Connection:
allowPublicKeyRetrieval,
useCursorFetch,
useServerPrepStmts,
maxAllowedPacket,
databaseTerm,
defaultAuthenticationPlugin,
plugins,
Expand All @@ -197,6 +203,7 @@ object Connection:
allowPublicKeyRetrieval: Boolean = false,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
databaseTerm: Option[DatabaseMetaData.DatabaseTerm] = None,
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]],
plugins: List[AuthenticationPlugin[F]],
Expand All @@ -219,6 +226,7 @@ object Connection:
allowPublicKeyRetrieval,
readTimeout,
capabilityFlags,
maxAllowedPacket,
defaultAuthenticationPlugin,
pluginMap
)
Expand Down Expand Up @@ -260,6 +268,7 @@ object Connection:
allowPublicKeyRetrieval: Boolean = false,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
databaseTerm: Option[DatabaseMetaData.DatabaseTerm] = None,
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]],
plugins: List[AuthenticationPlugin[F]],
Expand Down Expand Up @@ -290,6 +299,7 @@ object Connection:
allowPublicKeyRetrieval,
useCursorFetch,
useServerPrepStmts,
maxAllowedPacket,
databaseTerm,
defaultAuthenticationPlugin,
plugins,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,63 @@ trait MySQLConfig:
*/
def setPoolName(name: String): MySQLConfig

/**
* Gets the maximum allowed packet size for network communication with MySQL server.
*
* This setting controls the maximum size of packets that can be sent to or received from
* the MySQL server. It helps prevent memory exhaustion attacks and ensures compatibility
* with the MySQL protocol limits.
*
* The value corresponds to the MySQL server's `max_allowed_packet` system variable.
*
* @return the maximum packet size in bytes
*/
def maxAllowedPacket: Int

/**
* Sets the maximum allowed packet size for network communication.
*
* This setting provides protection against:
* - Memory exhaustion attacks through oversized packets
* - Denial of Service (DoS) attacks via large data payloads
* - Accidental transmission of extremely large data sets
*
* @param maxAllowedPacket the maximum packet size in bytes
* @return a new MySQLConfig with the updated setting
* @throws IllegalArgumentException if the value is outside the valid range (1024 to 16,777,215)
*
* @example {{{
* // Set conservative 64KB limit (default)
* config.setMaxAllowedPacket(65535)
*
* // Set practical 1MB limit for applications with moderate BLOB usage
* config.setMaxAllowedPacket(1048576)
*
* // Set maximum protocol limit for applications requiring large data transfers
* config.setMaxAllowedPacket(16777215)
* }}}
*
* @note The default value of 65,535 bytes (64KB) is compatible with MySQL JDBC Driver defaults
* and provides good security against packet-based attacks while accommodating most use cases.
* @note Valid range: 1,024 bytes (1KB) minimum to 16,777,215 bytes (16MB) maximum (MySQL protocol limit)
* @see [[https://dev.mysql.com/doc/refman/en/packet-too-large.html MySQL Protocol Packet Limits]]
*/
def setMaxAllowedPacket(maxAllowedPacket: Int): MySQLConfig

/**
* Companion object for MySQLConfig providing factory methods.
*/
object MySQLConfig:

/** Minimum allowed packet size in bytes (1KB) */
val MIN_PACKET_SIZE: Int = 1024

/** Maximum allowed packet size in bytes (16MB - MySQL protocol limit) */
val MAX_PACKET_SIZE: Int = 16777215

/** Default packet size in bytes (64KB - MySQL JDBC Driver compatible) */
val DEFAULT_PACKET_SIZE: Int = 65535

/** Default socket options applied to all connections. */
private[ldbc] val defaultSocketOptions: List[SocketOption] =
List(SocketOption.noDelay(true))
Expand Down Expand Up @@ -455,7 +507,8 @@ object MySQLConfig:
connectionTestQuery: Option[String] = None,
logPoolState: Boolean = false,
poolStateLogInterval: FiniteDuration = 30.seconds,
poolName: String = "ldbc-pool"
poolName: String = "ldbc-pool",
maxAllowedPacket: Int = DEFAULT_PACKET_SIZE
) extends MySQLConfig:

override def setHost(host: String): MySQLConfig = copy(host = host)
Expand Down Expand Up @@ -491,6 +544,17 @@ object MySQLConfig:
override def setLogPoolState(enabled: Boolean): MySQLConfig = copy(logPoolState = enabled)
override def setPoolStateLogInterval(interval: FiniteDuration): MySQLConfig = copy(poolStateLogInterval = interval)
override def setPoolName(name: String): MySQLConfig = copy(poolName = name)
override def setMaxAllowedPacket(maxAllowedPacket: Int): MySQLConfig = {
require(
maxAllowedPacket >= MIN_PACKET_SIZE,
s"maxAllowedPacket must be at least $MIN_PACKET_SIZE bytes, but got $maxAllowedPacket"
)
require(
maxAllowedPacket <= MAX_PACKET_SIZE,
s"maxAllowedPacket must not exceed $MAX_PACKET_SIZE bytes (MySQL protocol limit), but got $maxAllowedPacket"
)
copy(maxAllowedPacket = maxAllowedPacket)
}

/**
* Creates a default MySQLConfig with standard connection parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import ldbc.DataSource
* @param tracer optional OpenTelemetry tracer for distributed tracing
* @param useCursorFetch whether to use cursor-based fetching for result sets
* @param useServerPrepStmts whether to use server-side prepared statements
* @param maxAllowedPacket Maximum allowed packet size for network communication in bytes.
* @param defaultAuthenticationPlugin The authentication plugin used first for communication with the server
* @param plugins Additional authentication plugins used for communication with the server
* @param before optional hook to execute before a connection is acquired
Expand Down Expand Up @@ -84,6 +85,7 @@ final case class MySQLDataSource[F[_]: Async: Network: Console: Hashing: UUIDGen
tracer: Option[Tracer[F]] = None,
useCursorFetch: Boolean = false,
useServerPrepStmts: Boolean = false,
maxAllowedPacket: Int = MySQLConfig.DEFAULT_PACKET_SIZE,
defaultAuthenticationPlugin: Option[AuthenticationPlugin[F]] = None,
plugins: List[AuthenticationPlugin[F]] = List.empty[AuthenticationPlugin[F]],
before: Option[Connection[F] => F[A]] = None,
Expand Down Expand Up @@ -118,6 +120,7 @@ final case class MySQLDataSource[F[_]: Async: Network: Console: Hashing: UUIDGen
allowPublicKeyRetrieval = allowPublicKeyRetrieval,
useCursorFetch = useCursorFetch,
useServerPrepStmts = useServerPrepStmts,
maxAllowedPacket = maxAllowedPacket,
databaseTerm = databaseTerm,
defaultAuthenticationPlugin = defaultAuthenticationPlugin,
plugins = plugins
Expand All @@ -138,6 +141,7 @@ final case class MySQLDataSource[F[_]: Async: Network: Console: Hashing: UUIDGen
allowPublicKeyRetrieval = allowPublicKeyRetrieval,
useCursorFetch = useCursorFetch,
useServerPrepStmts = useServerPrepStmts,
maxAllowedPacket = maxAllowedPacket,
databaseTerm = databaseTerm,
defaultAuthenticationPlugin = defaultAuthenticationPlugin,
plugins = plugins
Expand All @@ -156,6 +160,7 @@ final case class MySQLDataSource[F[_]: Async: Network: Console: Hashing: UUIDGen
allowPublicKeyRetrieval = allowPublicKeyRetrieval,
useCursorFetch = useCursorFetch,
useServerPrepStmts = useServerPrepStmts,
maxAllowedPacket = maxAllowedPacket,
databaseTerm = databaseTerm,
defaultAuthenticationPlugin = defaultAuthenticationPlugin,
plugins = plugins
Expand Down Expand Up @@ -256,6 +261,24 @@ final case class MySQLDataSource[F[_]: Async: Network: Console: Hashing: UUIDGen
def setUseServerPrepStmts(newUseServerPrepStmts: Boolean): MySQLDataSource[F, A] =
copy(useServerPrepStmts = newUseServerPrepStmts)

/** Sets the maximum allowed packet size for network communication.
*
* @param maxAllowedPacket the maximum packet size in bytes (1,024 to 16,777,215)
* @return a new MySQLDataSource with the updated packet size limit
* @throws IllegalArgumentException if the value is outside the valid range
*/
def setMaxAllowedPacket(maxAllowedPacket: Int): MySQLDataSource[F, A] = {
require(
maxAllowedPacket >= MySQLConfig.MIN_PACKET_SIZE,
s"maxAllowedPacket must be at least ${ MySQLConfig.MIN_PACKET_SIZE } bytes, but got $maxAllowedPacket"
)
require(
maxAllowedPacket <= MySQLConfig.MAX_PACKET_SIZE,
s"maxAllowedPacket must not exceed ${ MySQLConfig.MAX_PACKET_SIZE } bytes (MySQL protocol limit), but got $maxAllowedPacket"
)
copy(maxAllowedPacket = maxAllowedPacket)
}

/** Sets whether to authentication plugin to be used first for communication with the server.
* @param defaultAuthenticationPlugin
* The authentication plugin used first for communication with the server
Expand Down Expand Up @@ -389,7 +412,8 @@ object MySQLDataSource:
allowPublicKeyRetrieval = config.allowPublicKeyRetrieval,
databaseTerm = config.databaseTerm,
useCursorFetch = config.useCursorFetch,
useServerPrepStmts = config.useServerPrepStmts
useServerPrepStmts = config.useServerPrepStmts,
maxAllowedPacket = config.maxAllowedPacket
)

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright (c) 2023-2025 by Takahiko Tominaga
* This software is licensed under the MIT License (MIT).
* For more information see LICENSE or https://opensource.org/licenses/MIT
*/

package ldbc.connector.exception

/**
* Exception thrown when a MySQL protocol packet exceeds the configured maximum packet size limit.
*
* This exception serves as a security mechanism to prevent memory exhaustion attacks and ensures
* compatibility with MySQL protocol constraints. It is thrown when the connector attempts to
* send or receive a packet that exceeds the `maxAllowedPacket` configuration setting.
*
* The `maxAllowedPacket` setting corresponds to MySQL's `max_allowed_packet` system variable
* and provides protection against:
*
* - **Memory exhaustion attacks**: Prevents attackers from sending oversized packets to consume server memory
* - **Denial of Service (DoS)**: Blocks attempts to overwhelm the system with large data payloads
* - **Accidental large data transfers**: Catches unintentional transmission of extremely large datasets
* - **Protocol violations**: Enforces MySQL protocol packet size constraints (max 16MB)
*
* @param packetLength the actual size of the packet that exceeded the limit, in bytes
* @param maxAllowed the configured maximum packet size limit, in bytes
*
* @example {{{
* // Typical usage when packet size validation fails
* try {
* // Some operation that sends a large packet
* connection.executeUpdate(queryWithLargeData)
* } catch {
* case ex: PacketTooBigException =>
* println(s"Packet too large: ${ex.getMessage}")
* // Consider increasing maxAllowedPacket or reducing data size
* }
* }}}
*
* @example {{{
* // Configure larger packet size to handle bigger data
* val config = MySQLConfig.default
* .setMaxAllowedPacket(1048576) // 1MB limit
*
* // Or use MySQL protocol maximum
* val config = MySQLConfig.default
* .setMaxAllowedPacket(16777215) // 16MB - protocol maximum
* }}}
*
* @see [[ldbc.connector.MySQLConfig.maxAllowedPacket]] for configuration details
* @see [[ldbc.connector.MySQLConfig.setMaxAllowedPacket]] for setting the packet size limit
* @see [[https://dev.mysql.com/doc/refman/en/packet-too-large.html MySQL Protocol Packet Limits]]
*
* @note The default packet size limit is 65,535 bytes (64KB), which matches MySQL JDBC Driver defaults
* and provides good security against packet-based attacks while accommodating most use cases.
*
* @note This exception extends SQLException to maintain compatibility with JDBC error handling patterns.
* The error message includes both the actual packet size and the configured limit for debugging.
*/
class PacketTooBigException(
packetLength: Int,
maxAllowed: Int
) extends SQLException(
s"Packet for query is too large ($packetLength > $maxAllowed). " +
s"You can change the value by setting the 'maxAllowedPacket' configuration."
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import fs2.io.net.Socket
import fs2.Chunk

import ldbc.connector.data.CapabilitiesFlags
import ldbc.connector.exception.PacketTooBigException
import ldbc.connector.net.packet.*
import ldbc.connector.net.packet.response.InitialPacket
import ldbc.connector.net.protocol.parseHeader
Expand All @@ -41,10 +42,15 @@ trait PacketSocket[F[_]]:

object PacketSocket:

val DEFAULT_MAX_PACKET_SIZE = 65535 // 64KB (JDBC Driver default)
val PROTOCOL_MAX_PACKET_SIZE = 16777215 // 16MB (MySQL protocol limit)
val MIN_PACKET_SIZE = 0

def fromBitVectorSocket[F[_]: Concurrent: Console](
bvs: BitVectorSocket[F],
debugEnabled: Boolean,
sequenceIdRef: Ref[F, Byte]
bvs: BitVectorSocket[F],
debugEnabled: Boolean,
sequenceIdRef: Ref[F, Byte],
maxAllowedPacket: Int
): PacketSocket[F] = new PacketSocket[F]:

private def debug(msg: => String): F[Unit] =
Expand All @@ -56,6 +62,7 @@ object PacketSocket:
(for
header <- bvs.read(4)
payloadSize = parseHeader(header.toByteArray)
_ <- validatePacketSize(payloadSize)
payload <- bvs.read(payloadSize)
response = decoder.decodeValue(payload).require
_ <-
Expand Down Expand Up @@ -94,15 +101,27 @@ object PacketSocket:
_ <- sequenceIdRef.update(sequenceId => ((sequenceId + 1) % 256).toByte)
yield ()

private def validatePacketSize(size: Int): F[Unit] =
if size < MIN_PACKET_SIZE then
Concurrent[F].raiseError(
PacketTooBigException(size, maxAllowedPacket)
)
else if size > maxAllowedPacket then
Concurrent[F].raiseError(
PacketTooBigException(size, maxAllowedPacket)
)
else Concurrent[F].unit

def apply[F[_]: Console: Temporal](
debug: Boolean,
sockets: Resource[F, Socket[F]],
sslOptions: Option[SSLNegotiation.Options[F]],
sequenceIdRef: Ref[F, Byte],
initialPacketRef: Ref[F, Option[InitialPacket]],
readTimeout: Duration,
capabilitiesFlags: Set[CapabilitiesFlags]
capabilitiesFlags: Set[CapabilitiesFlags],
maxAllowedPacket: Int
): Resource[F, PacketSocket[F]] =
BitVectorSocket(sockets, sequenceIdRef, initialPacketRef, sslOptions, readTimeout, capabilitiesFlags).map(
fromBitVectorSocket(_, debug, sequenceIdRef)
fromBitVectorSocket(_, debug, sequenceIdRef, maxAllowedPacket)
)
Loading