From 470325f7b197ada65f26c59eba6827fbab3265b6 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 21 Dec 2025 23:10:34 +0900 Subject: [PATCH 1/4] Added atomic status check --- .../ldbc/connector/pool/ConcurrentBag.scala | 99 ++++++++++++++----- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala index 20ca1ad09..9fc6d9799 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala @@ -127,7 +127,6 @@ object ConcurrentBag: /** * Create a new ConcurrentBag instance. * - * @param maxFiberLocalSize maximum number of items to store in fiber-local storage * @tparam F the effect type * @tparam T the type of items stored in the bag * @return a new ConcurrentBag instance @@ -249,35 +248,39 @@ object ConcurrentBag: if state == BagEntry.STATE_REMOVED then Temporal[F].unit else // Reset state to not in use - item.setState(BagEntry.STATE_NOT_IN_USE) >> - // Check if item is in the shared list - sharedList.get.flatMap { list => - if list.exists(_ eq item) then - // Item is already in the list, handle waiters - waiters.get.flatMap { waiting => - if waiting > 0 then - // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).void - else - // No waiters, item stays in shared list - Temporal[F].unit - } - else - // Item not in list, add it with distribution - sharedList.modify { list => - val newList = distributeItem(item, list) - (newList, ()) - } >> + item.compareAndSet(BagEntry.STATE_IN_USE, BagEntry.STATE_NOT_IN_USE).flatMap { + case true => // 成功時のみ処理継続 + // Check if item is in the shared list + sharedList.get.flatMap { list => + if list.exists(_ eq item) then + // Item is already in the list, handle waiters waiters.get.flatMap { waiting => if waiting > 0 then // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).flatMap { - case true => Temporal[F].unit - case false => Temporal[F].unit - } - else Temporal[F].unit + handoffQueue.tryOffer(item).void + else + // No waiters, item stays in shared list + Temporal[F].unit } - } + else + // Item not in list, add it with distribution + sharedList.modify { list => + val newList = distributeItem(item, list) + (newList, ()) + } >> + waiters.get.flatMap { waiting => + if waiting > 0 then + // Try to hand off directly to a waiter + handoffQueue.tryOffer(item).flatMap { + case true => Temporal[F].unit + case false => Temporal[F].unit + } + else Temporal[F].unit + } + } + case false => // 他のファイバーが状態を変更済み + handleStateConflict(item) + } } } @@ -334,3 +337,47 @@ object ConcurrentBag: else val (front, back) = list.splitAt(idx - 1) front ++ (item :: back) + + private def handleStateConflict(item: T): F[Unit] = + item.getState.flatMap { + case BagEntry.STATE_REMOVED => + // Item deleted - No action required (normal termination) + Temporal[F].unit + + case BagEntry.STATE_NOT_IN_USE => + // Other fibers have already completed requite - Avoid duplicate processing + Temporal[F].unit + + case BagEntry.STATE_RESERVED => + // Temporary reservation status - Please wait a short while and try again + Temporal[F].sleep(1.milli) >> + item.compareAndSet(BagEntry.STATE_RESERVED, BagEntry.STATE_NOT_IN_USE).flatMap { + case true => continueRequiteProcess(item) + case false => handleStateConflict(item) + } + + case BagEntry.STATE_IN_USE => + // Anomaly: Still in IN_USE state - Detected state inconsistency + item.setState(BagEntry.STATE_NOT_IN_USE) >> + continueRequiteProcess(item) + + case unknownState => + Temporal[F].raiseError( + new IllegalStateException(s"Unknown bag entry state: $unknownState") + ) + } + + private def continueRequiteProcess(item: T): F[Unit] = + // Continue with the normal requite process after state conflict resolution + sharedList.get.flatMap { list => + if list.exists(_ eq item) then + waiters.get.flatMap { waiting => + if waiting > 0 then handoffQueue.tryOffer(item).void + else Temporal[F].unit + } + else + sharedList.modify { list => + val newList = distributeItem(item, list) + (newList, ()) + } + } From 1ab7761495d201339ffbb7461b6588b821d036b0 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 21 Dec 2025 23:10:49 +0900 Subject: [PATCH 2/4] Action sbt scalafmtAll --- .../src/main/scala/ldbc/connector/pool/ConcurrentBag.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala index 9fc6d9799..aa603ad40 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala @@ -352,7 +352,7 @@ object ConcurrentBag: // Temporary reservation status - Please wait a short while and try again Temporal[F].sleep(1.milli) >> item.compareAndSet(BagEntry.STATE_RESERVED, BagEntry.STATE_NOT_IN_USE).flatMap { - case true => continueRequiteProcess(item) + case true => continueRequiteProcess(item) case false => handleStateConflict(item) } From ec77327e2c31d337f189da76089067b0e49233ba Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 21 Dec 2025 23:43:20 +0900 Subject: [PATCH 3/4] Fixed concurrentBag statecheck bug --- .../ldbc/connector/pool/ConcurrentBag.scala | 48 +++++++------------ 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala index aa603ad40..157d07cca 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala @@ -247,39 +247,25 @@ object ConcurrentBag: item.getState.flatMap { state => if state == BagEntry.STATE_REMOVED then Temporal[F].unit else - // Reset state to not in use + // Attempt atomic transition from IN_USE to NOT_IN_USE item.compareAndSet(BagEntry.STATE_IN_USE, BagEntry.STATE_NOT_IN_USE).flatMap { - case true => // 成功時のみ処理継続 - // Check if item is in the shared list - sharedList.get.flatMap { list => - if list.exists(_ eq item) then - // Item is already in the list, handle waiters - waiters.get.flatMap { waiting => - if waiting > 0 then - // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).void - else - // No waiters, item stays in shared list - Temporal[F].unit - } - else - // Item not in list, add it with distribution - sharedList.modify { list => - val newList = distributeItem(item, list) - (newList, ()) - } >> - waiters.get.flatMap { waiting => - if waiting > 0 then - // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).flatMap { - case true => Temporal[F].unit - case false => Temporal[F].unit - } - else Temporal[F].unit - } + case true => // Successfully transitioned from IN_USE + continueRequiteProcess(item) + case false => + // Failed transition - could be NOT_IN_USE already or other state + item.getState.flatMap { currentState => + currentState match { + case BagEntry.STATE_NOT_IN_USE => + // Item already in correct state - continue with requite + continueRequiteProcess(item) + case BagEntry.STATE_REMOVED => + // Item was removed, nothing to do + Temporal[F].unit + case _ => + // Handle other state conflicts + handleStateConflict(item) + } } - case false => // 他のファイバーが状態を変更済み - handleStateConflict(item) } } } From aa1fa2d24022de5e53dae9f07293f071db3b3394 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 21 Dec 2025 23:43:30 +0900 Subject: [PATCH 4/4] Action sbt scalafmtAll --- .../src/main/scala/ldbc/connector/pool/ConcurrentBag.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala index 157d07cca..1b5806088 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala @@ -251,7 +251,7 @@ object ConcurrentBag: item.compareAndSet(BagEntry.STATE_IN_USE, BagEntry.STATE_NOT_IN_USE).flatMap { case true => // Successfully transitioned from IN_USE continueRequiteProcess(item) - case false => + case false => // Failed transition - could be NOT_IN_USE already or other state item.getState.flatMap { currentState => currentState match {