Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ object ConcurrentBag:
/**
* Create a new ConcurrentBag instance.
*
* @param maxFiberLocalSize maximum number of items to store in fiber-local storage
* @tparam F the effect type
* @tparam T the type of items stored in the bag
* @return a new ConcurrentBag instance
Expand Down Expand Up @@ -248,36 +247,26 @@ object ConcurrentBag:
item.getState.flatMap { state =>
if state == BagEntry.STATE_REMOVED then Temporal[F].unit
else
// Reset state to not in use
item.setState(BagEntry.STATE_NOT_IN_USE) >>
// Check if item is in the shared list
sharedList.get.flatMap { list =>
if list.exists(_ eq item) then
// Item is already in the list, handle waiters
waiters.get.flatMap { waiting =>
if waiting > 0 then
// Try to hand off directly to a waiter
handoffQueue.tryOffer(item).void
else
// No waiters, item stays in shared list
// Attempt atomic transition from IN_USE to NOT_IN_USE
item.compareAndSet(BagEntry.STATE_IN_USE, BagEntry.STATE_NOT_IN_USE).flatMap {
case true => // Successfully transitioned from IN_USE
continueRequiteProcess(item)
case false =>
// Failed transition - could be NOT_IN_USE already or other state
item.getState.flatMap { currentState =>
currentState match {
case BagEntry.STATE_NOT_IN_USE =>
// Item already in correct state - continue with requite
continueRequiteProcess(item)
case BagEntry.STATE_REMOVED =>
// Item was removed, nothing to do
Temporal[F].unit
case _ =>
// Handle other state conflicts
handleStateConflict(item)
}
else
// Item not in list, add it with distribution
sharedList.modify { list =>
val newList = distributeItem(item, list)
(newList, ())
} >>
waiters.get.flatMap { waiting =>
if waiting > 0 then
// Try to hand off directly to a waiter
handoffQueue.tryOffer(item).flatMap {
case true => Temporal[F].unit
case false => Temporal[F].unit
}
else Temporal[F].unit
}
}
}
}
}
}

Expand Down Expand Up @@ -334,3 +323,47 @@ object ConcurrentBag:
else
val (front, back) = list.splitAt(idx - 1)
front ++ (item :: back)

private def handleStateConflict(item: T): F[Unit] =
item.getState.flatMap {
case BagEntry.STATE_REMOVED =>
// Item deleted - No action required (normal termination)
Temporal[F].unit

case BagEntry.STATE_NOT_IN_USE =>
// Other fibers have already completed requite - Avoid duplicate processing
Temporal[F].unit

case BagEntry.STATE_RESERVED =>
// Temporary reservation status - Please wait a short while and try again
Temporal[F].sleep(1.milli) >>
item.compareAndSet(BagEntry.STATE_RESERVED, BagEntry.STATE_NOT_IN_USE).flatMap {
case true => continueRequiteProcess(item)
case false => handleStateConflict(item)
}

case BagEntry.STATE_IN_USE =>
// Anomaly: Still in IN_USE state - Detected state inconsistency
item.setState(BagEntry.STATE_NOT_IN_USE) >>
continueRequiteProcess(item)

case unknownState =>
Temporal[F].raiseError(
new IllegalStateException(s"Unknown bag entry state: $unknownState")
)
}

private def continueRequiteProcess(item: T): F[Unit] =
// Continue with the normal requite process after state conflict resolution
sharedList.get.flatMap { list =>
if list.exists(_ eq item) then
waiters.get.flatMap { waiting =>
if waiting > 0 then handoffQueue.tryOffer(item).void
else Temporal[F].unit
}
else
sharedList.modify { list =>
val newList = distributeItem(item, list)
(newList, ())
}
}