diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala index 20ca1ad09..1b5806088 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/pool/ConcurrentBag.scala @@ -127,7 +127,6 @@ object ConcurrentBag: /** * Create a new ConcurrentBag instance. * - * @param maxFiberLocalSize maximum number of items to store in fiber-local storage * @tparam F the effect type * @tparam T the type of items stored in the bag * @return a new ConcurrentBag instance @@ -248,36 +247,26 @@ object ConcurrentBag: item.getState.flatMap { state => if state == BagEntry.STATE_REMOVED then Temporal[F].unit else - // Reset state to not in use - item.setState(BagEntry.STATE_NOT_IN_USE) >> - // Check if item is in the shared list - sharedList.get.flatMap { list => - if list.exists(_ eq item) then - // Item is already in the list, handle waiters - waiters.get.flatMap { waiting => - if waiting > 0 then - // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).void - else - // No waiters, item stays in shared list + // Attempt atomic transition from IN_USE to NOT_IN_USE + item.compareAndSet(BagEntry.STATE_IN_USE, BagEntry.STATE_NOT_IN_USE).flatMap { + case true => // Successfully transitioned from IN_USE + continueRequiteProcess(item) + case false => + // Failed transition - could be NOT_IN_USE already or other state + item.getState.flatMap { currentState => + currentState match { + case BagEntry.STATE_NOT_IN_USE => + // Item already in correct state - continue with requite + continueRequiteProcess(item) + case BagEntry.STATE_REMOVED => + // Item was removed, nothing to do Temporal[F].unit + case _ => + // Handle other state conflicts + handleStateConflict(item) } - else - // Item not in list, add it with distribution - sharedList.modify { list => - val newList = distributeItem(item, list) - (newList, ()) - } >> - waiters.get.flatMap { waiting => - if waiting > 0 then - // Try to hand off directly to a waiter - handoffQueue.tryOffer(item).flatMap { - case true => Temporal[F].unit - case false => Temporal[F].unit - } - else Temporal[F].unit - } - } + } + } } } @@ -334,3 +323,47 @@ object ConcurrentBag: else val (front, back) = list.splitAt(idx - 1) front ++ (item :: back) + + private def handleStateConflict(item: T): F[Unit] = + item.getState.flatMap { + case BagEntry.STATE_REMOVED => + // Item deleted - No action required (normal termination) + Temporal[F].unit + + case BagEntry.STATE_NOT_IN_USE => + // Other fibers have already completed requite - Avoid duplicate processing + Temporal[F].unit + + case BagEntry.STATE_RESERVED => + // Temporary reservation status - Please wait a short while and try again + Temporal[F].sleep(1.milli) >> + item.compareAndSet(BagEntry.STATE_RESERVED, BagEntry.STATE_NOT_IN_USE).flatMap { + case true => continueRequiteProcess(item) + case false => handleStateConflict(item) + } + + case BagEntry.STATE_IN_USE => + // Anomaly: Still in IN_USE state - Detected state inconsistency + item.setState(BagEntry.STATE_NOT_IN_USE) >> + continueRequiteProcess(item) + + case unknownState => + Temporal[F].raiseError( + new IllegalStateException(s"Unknown bag entry state: $unknownState") + ) + } + + private def continueRequiteProcess(item: T): F[Unit] = + // Continue with the normal requite process after state conflict resolution + sharedList.get.flatMap { list => + if list.exists(_ eq item) then + waiters.get.flatMap { waiting => + if waiting > 0 then handoffQueue.tryOffer(item).void + else Temporal[F].unit + } + else + sharedList.modify { list => + val newList = distributeItem(item, list) + (newList, ()) + } + }