Skip to content

code algo

filipdulic edited this page Dec 14, 2020 · 13 revisions
code algoritham
    pub fn broadcast(&self, object: T) -> Result<(), SendError<T>> {
        if self.sub_count.get() == 0 {
            return Err(SendError(object));
        }
        self.buffer[self.wi.get() % self.size].store(object);
        self.wi.inc();
        Ok(())
    }
    pub fn try_recv(&self, ri: &AtomicCounter, skip_items: usize) -> Result<Arc<T>, TryRecvError> {
        if ri.get() == self.wi.get() {
            if self.is_available() {
                return Err(TryRecvError::Empty);
            } else {
                return Err(TryRecvError::Disconnected);
            }
        }

        // Reader has not read enough to keep up with (writer - buffer size) so
        // set the reader pointer to be (writer - buffer size)
        loop {
            let local_ri = ri.get();

            let val = self.buffer[local_ri % self.size].load();
            if self.wi.get().wrapping_sub(local_ri) >= self.size {
                ri.set(
                    self.wi
                        .get()
                        .wrapping_sub(self.size)
                        .wrapping_add(1 + skip_items),
                );
            } else {
                ri.inc();
                // NOTE: unwrap is safe to use, because the reader would never read a slot that
                // hasn't been written to.
                return Ok(val.unwrap());
            }
        }
    }

Clone this wiki locally