1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
#![allow(clippy::new_without_default)]
#![no_std]
use mpmc_queue::Queue;
use preemption::hold_preemption;
use sync::DeadlockPrevention;
use sync_spin::Spin;
use task::{get_my_current_task, TaskRef};
/// A queue of tasks waiting for an event to occur.
///
/// When tasks are popped off the front of the queue via [`notify_one()`]
/// or [`notify_all()`], they are each unblocked.
///
/// The wait queue uses a mutex internally and hence exposes a deadlock prevention
/// type parameter `P`, which is [`Spin`] by default.
/// This parameter should only be set to another deadlock prevention method
/// if a spin-based mutex could lead to deadlock, e.g., in an interrupt context.
/// You do not need to use the `DisablePreemption` deadlock prevention method
/// here to avoid scheduler race conditions -- that is already accounted for
/// in this wait queue's implementation, even when using the [`Spin`] default.
///
/// [`notify_one()`]: Self::notify_one
/// [`notify_all()`]: Self::notify_all
pub struct WaitQueue<P = Spin>
where
P: DeadlockPrevention,
{
inner: Queue<TaskRef, P>,
}
impl<P> WaitQueue<P>
where
P: DeadlockPrevention,
{
/// Creates a new empty wait queue.
pub const fn new() -> Self {
Self {
inner: Queue::new(),
}
}
/// Blocks the current task until the given condition succeeds.
pub fn wait_until<F, T>(&self, mut condition: F) -> T
where
F: FnMut() -> Option<T>,
{
let task = get_my_current_task().unwrap();
loop {
let wrapped_condition = || {
if let Some(value) = condition() {
Ok(value)
} else {
// Ensure that we don't get preempted after blocking ourselves
// before we get a chance to release the internal lock of the queue.
let preemption_guard = hold_preemption();
task.block().unwrap();
Err(preemption_guard)
}
};
match self.inner.push_if_fail(task.clone(), wrapped_condition) {
Ok(value) => return value,
Err(preemption_guard) => {
drop(preemption_guard);
scheduler::schedule();
}
}
}
}
/// Notifies the first task in the wait queue.
///
/// If it fails to unblock the first task, it will continue unblocking
/// subsequent tasks until a task is successfully unblocked.
pub fn notify_one(&self) -> bool {
loop {
let task = match self.inner.pop() {
Some(task) => task,
None => return false,
};
if task.unblock().is_ok() {
return true;
}
}
}
/// Notifies all the tasks in the wait queue.
pub fn notify_all(&self) {
while self.notify_one() {}
}
}