-
Notifications
You must be signed in to change notification settings - Fork 672
feat: make ConcurrentLimitLayer accept Semaphore #6618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
bonsairobo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to be able to pass the Semaphore to ConcurrentLimitLayer::new so users can have full control over its initialization.
Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
tisonkun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since now the Semaphore is in the public API, I wonder if we can use mea's Semaphore that fixes several issues where tokio's Semaphore has.
It's tested in ScopeDB's production environment and should be solid for use.
See https://docs.rs/mea/latest/mea/semaphore/struct.Semaphore.html and https://github.com/fast/mea?tab=readme-ov-file#history
|
Ummm I can see the original issue is about using user's tokio Semaphore now. Let me think a bit ... |
|
@bonsairobo do you have a public example how a Semaphore would be shared in this case? |
|
@tisonkun I don't |
tisonkun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your feedback. Then I'd block this PR until we find a proper use case. Otherwise it does no benefits to tight our interfaces with a certain sync utils impls.
|
@tisonkun You asked if I have a public example. I don't, but my company has a private repository that would make use of this feature.
That's not true. While I agree that there is tight coupling, that doesn't imply that no one uses this type of semaphore. |
|
@bonsairobo Thanks for your clarification. Could you elaborate a bit how the semaphore share between different resources and how it benefits your use case? |
@tisonkun OpenDAL is not the only code in our system that opens files. Processes have an open file limit, which is especially low on MacOS (256). We use a global semaphore to limit open file handles. It only works if all code shares that semaphore, so we need to create it ourselves and feed it into the concurrent limit layer. |
|
I'm thinking of provide a trait like we do for other layers like |
Signed-off-by: Chojan Shang <psiace@apache.org>
Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
Signed-off-by: Chojan Shang <chojan.shang@vesoft.com>
| ) -> jlong { | ||
| let op = unsafe { &*op }; | ||
| let concurrent_limit = ConcurrentLimitLayer::new(permits as usize); | ||
| let concurrent_limit = ConcurrentLimitLayer::with_permits(permits as usize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to this change this API.
| /// ConcurrencySemaphore abstracts a semaphore-like concurrency primitive | ||
| /// that yields an owned permit released on drop. It mirrors RetryLayer's | ||
| /// interceptor pattern by serving as a generic extension point for the layer. | ||
| pub trait ConcurrencySemaphore: Send + Sync + Any + Clone + 'static { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about naming it after the layer like ConcurrentLimitSemaphore?
| pub fn new(permits: usize) -> Self { | ||
| /// The provided semaphore will be shared by every operator wrapped by this | ||
| /// layer, giving callers full control over its configuration. | ||
| pub fn new(operation_semaphore: Arc<Semaphore>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't change public API unless we have to.
| /// Create a new `ConcurrentLimitLayer` with the specified number of | ||
| /// permits. | ||
| pub fn with_permits(permits: usize) -> Self { | ||
| Self::new(Arc::new(Semaphore::new(permits))) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this API.
| /// Provide a custom HTTP concurrency semaphore instance. | ||
| pub fn with_http_concurrency(mut self, semaphore: Arc<S>) -> Self { | ||
| self.http_semaphore = Some(semaphore); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to always follow the same API naming pattern.
Which issue does this PR close?
Closes #6591 . also cc @bonsairobo for review
Rationale for this change
Expose the internal semaphore used by
ConcurrentLimitLayerso external components can coordinate concurrencywith OpenDAL without reinventing their own limiters.
What changes are included in this PR?
ConcurrentLimitLayer::newto take anArc<Semaphore>and added the with_permits helper to preserve the prior ergonomic constructor.with_http_semaphoreplus a documentedwith_http_concurrent_limitconvenience to allow sharing HTTP semaphores as well.Are there any user-facing changes?
Yes. Applications can now reuse ConcurrentLimitLayer’s semaphore directly to align their own concurrency
controls with OpenDAL.