1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
//! A blocking channel for Inter-Task Communication (ITC) with an internal queue for buffering messages.
//! 
//! This crate offers a blocking channel that allows multiple tasks to exchange messages through the
//! use of a bounded-capacity intermediate buffer. Unlike the `rendezvous` channel, the sender and
//! receiver do not need to rendezvous to send or receive data.
//! 
//! Only `Send` types can be sent or received through the channel.
//! 
//! This is not a zero-copy channel; to avoid copying large messages,
//! use a reference type like `Box` or another layer of indirection.

#![no_std]

extern crate alloc;
#[cfg(trace_channel)] #[macro_use] extern crate log;
#[cfg(trace_channel)] #[macro_use] extern crate debugit;
extern crate wait_queue;
extern crate mpmc;
extern crate crossbeam_utils;
extern crate core2;
extern crate sync;
extern crate sync_spin;

use alloc::sync::Arc;
use mpmc::Queue as MpmcQueue;
use wait_queue::WaitQueue;
use crossbeam_utils::atomic::AtomicCell;
use core::sync::atomic::{AtomicUsize, Ordering};
use sync::DeadlockPrevention;
use sync_spin::Spin;

/// Creates a new channel that allows senders and receivers to
/// asynchronously exchange messages via an internal intermediary buffer.
///
/// The channel's buffer has a bounded capacity. According to the restrictions of
/// the MPMC queue type currently used, that capacity must be a power of 2:
/// the given `minimum_capacity` is rounded up to the next power of 2,
/// with a minimum value of 2 messages.
///
/// When the buffer cannot accept another message, the channel is considered full.
/// Depending on whether a non-blocking or blocking send function is invoked,
/// further attempts to send a message will either return [`Error::WouldBlock`]
/// or block until a receiver drains the buffer and space becomes available.
///
/// For the vast majority of use cases, this function is the recommended way to create
/// a new channel, because there is no need to specify a deadlock prevention method;
/// the returned endpoints use the default one ([`Spin`]).
/// To create a channel with different deadlock prevention, see [`new_channel_with()`].
pub fn new_channel<T: Send>(minimum_capacity: usize) -> (Sender<T>, Receiver<T>) {
    new_channel_with(minimum_capacity)
}

/// Builds a blocking channel whose wait queues use the deadlock prevention method `P`,
/// returning its initial `Sender`/`Receiver` pair.
///
/// See [`new_channel()`] for details on how `minimum_capacity` is interpreted.
///
/// The channel relies on wait queues internally, which is why the deadlock
/// prevention type parameter `P` is exposed; the endpoint types default it to [`Spin`].
/// See [`WaitQueue`]'s documentation for more info on choosing this type parameter.
pub fn new_channel_with<T: Send, P: DeadlockPrevention>(
    minimum_capacity: usize,
) -> (Sender<T, P>, Receiver<T, P>) {
    // The channel starts out connected, with exactly one endpoint on each side.
    let shared = Arc::new(Channel {
        queue: MpmcQueue::with_capacity(minimum_capacity),
        waiting_senders: WaitQueue::new(),
        waiting_receivers: WaitQueue::new(),
        channel_status: AtomicCell::new(ChannelStatus::Connected),
        sender_count: AtomicUsize::new(1),
        receiver_count: AtomicUsize::new(1),
    });

    // Both endpoints hold a reference to the same shared channel state.
    let sender = Sender { channel: Arc::clone(&shared) };
    let receiver = Receiver { channel: shared };
    (sender, receiver)
}

/// The connection state of a channel, as tracked by its endpoints.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ChannelStatus {
    /// Both sides are present; this is the status a channel is created with.
    Connected,
    /// The sending side went away: the last remaining `Sender` was dropped.
    SenderDisconnected,
    /// The receiving side went away: the last remaining `Receiver` was dropped.
    ReceiverDisconnected,
}

/// The errors that channel send and receive operations can report.
#[derive(PartialEq, Debug)]
pub enum Error {
    /// A "try" operation could not complete without blocking.
    ///
    /// For example, `try_send` was called on a full channel,
    /// or `try_receive` was called on an empty channel.
    WouldBlock,
    /// The other end of the channel has been dropped.
    ChannelDisconnected,
}

/// Maps channel errors onto the closest I/O error kinds:
/// `WouldBlock` stays `WouldBlock`, and a disconnected channel becomes `BrokenPipe`.
impl From<Error> for core2::io::Error {
    fn from(e: Error) -> Self {
        let kind = match e {
            Error::WouldBlock => core2::io::ErrorKind::WouldBlock,
            Error::ChannelDisconnected => core2::io::ErrorKind::BrokenPipe,
        };
        kind.into()
    }
}

/// The inner channel for asynchronous communication between `Sender`s and `Receiver`s.
///
/// This struct is effectively a wrapper around an MPMC queue
/// with waitqueues for senders (producers) and receivers (consumers).
///
/// The channel is not cloneable itself; every `Sender` and `Receiver` endpoint
/// holds an `Arc` to the same `Channel` in order to share it.
struct Channel<T: Send, P: DeadlockPrevention = Spin> {
    /// The bounded buffer holding messages that were sent but not yet received.
    queue: MpmcQueue<T>,
    /// Senders blocked in `send()` because the buffer was full.
    waiting_senders: WaitQueue<P>,
    /// Receivers blocked in `receive()` because the buffer was empty.
    waiting_receivers: WaitQueue<P>,
    /// Whether the channel is connected, or which side disconnected.
    channel_status: AtomicCell<ChannelStatus>,
    /// The number of live `Sender` endpoints; starts at 1.
    sender_count: AtomicUsize,
    /// The number of live `Receiver` endpoints; starts at 1.
    receiver_count: AtomicUsize,
}

// Compile-time check that `AtomicCell<ChannelStatus>` is actually a lock-free atomic:
// the build fails if `is_lock_free()` evaluates to `false` for this type.
const _: () = assert!(AtomicCell::<ChannelStatus>::is_lock_free());

impl<T: Send, P: DeadlockPrevention> Channel<T, P> {
    /// Reports whether the channel's status is anything other than `Connected`.
    #[inline(always)]
    fn is_disconnected(&self) -> bool {
        match self.get_channel_status() {
            ChannelStatus::Connected => false,
            ChannelStatus::SenderDisconnected | ChannelStatus::ReceiverDisconnected => true,
        }
    }

    /// Atomically reads the channel's current status.
    #[inline(always)]
    fn get_channel_status(&self) -> ChannelStatus {
        self.channel_status.load()
    }

    /// Creates an additional `Sender` endpoint for the given channel.
    ///
    /// The channel's sender count is incremented first; if that count was zero
    /// beforehand, the channel status is set back to `Connected`.
    fn add_sender(channel: &Arc<Self>) -> Sender<T, P> {
        let previous_senders = channel.sender_count.fetch_add(1, Ordering::SeqCst);
        if previous_senders == 0 {
            channel.channel_status.store(ChannelStatus::Connected);
        }
        Sender { channel: Arc::clone(channel) }
    }

    /// Creates an additional `Receiver` endpoint for the given channel.
    ///
    /// The channel's receiver count is incremented first; if that count was zero
    /// beforehand, the channel status is set back to `Connected`.
    fn add_receiver(channel: &Arc<Self>) -> Receiver<T, P> {
        let previous_receivers = channel.receiver_count.fetch_add(1, Ordering::SeqCst);
        if previous_receivers == 0 {
            channel.channel_status.store(ChannelStatus::Connected);
        }
        Receiver { channel: Arc::clone(channel) }
    }
}

/// The sender (transmit) side of a channel.
///
/// Each `Sender` counts towards the channel's sender count; when the last one
/// is dropped, the channel is marked as `SenderDisconnected`.
pub struct Sender<T: Send, P: DeadlockPrevention = Spin> {
    /// The channel state shared with all other endpoints of this channel.
    channel: Arc<Channel<T, P>>,
}

impl<T:Send, P: DeadlockPrevention> Clone for Sender<T, P> {
    /// Clones this `Sender`, returning another `Sender` connected to the same channel.
    ///
    /// This increments the channel's sender count.
    /// If there were previously no senders, the channel status is updated to `Connected`.
    fn clone(&self) -> Self {
        // Go through `add_sender` (rather than cloning the `Arc` directly)
        // so that the sender count stays in sync with the number of `Sender`s.
        Channel::add_sender( &self.channel )
    }
}

impl <T: Send, P: DeadlockPrevention> Sender<T, P> {
    /// Send a message, blocking until space in the channel's buffer is available.
    ///
    /// Returns `Ok(())` if the message was sent successfully,
    /// otherwise returns an [`Error`].
    ///
    /// # Errors
    /// Returns [`Error::ChannelDisconnected`] if the channel is found to be disconnected,
    /// either on the initial non-blocking attempt or while waiting for buffer space.
    /// In that case the message is dropped rather than handed back to the caller.
    pub fn send(&self, msg: T) -> Result<(), Error> {
        #[cfg(trace_channel)]
        trace!("sync_channel: sending msg: {:?}", debugit!(msg));
        // Fast path: attempt to send the message, assuming the buffer isn't full.
        let msg = match self.try_send(msg) {
            // The message was sent; `try_send` has already notified a receiver.
            Ok(()) => return Ok(()),
            // If unsuccessful, bail out unless the only problem is that the channel is full.
            Err((returned_msg, channel_error)) => {
                if channel_error != Error::WouldBlock {
                    return Err(channel_error);
                }
                returned_msg
            },
        };

        // Slow path: the buffer was full, so now we need to block until space becomes available.
        // The code can reach this point only if the fast path failed due to the channel being full.

        // Here we use an option to store the un-sent message outside of the `closure`
        // so that we can repeatedly try to re-send it upon the next invocation of the `closure`
        // (which happens when this sender task is notified in the future).
        let mut msg = Some(msg);

        // This closure is invoked from within a locked context, so we cannot just call `try_send()` here
        // because it will notify the receivers, which can cause deadlock.
        // Therefore, we need to perform the notify action outside of this closure after it returns.
        let mut closure = || {
            // Move the message out of its slot; it is put back below if the push fails.
            let owned_msg = msg.take();
            let result = owned_msg.and_then(|m| match self.channel.queue.push(m) {
                Ok(()) => {
                    // We wrap the result in Some() since `wait_until` progresses only when `Some` is returned.
                    Some(Ok(()))
                },
                Err(returned_msg) => {
                    // Here: we (the sender) woke up and failed to send,
                    // so we save the returned message outside of the closure to retry later.
                    msg = Some(returned_msg);
                    None
                }
            });

            // NOTE(review): this check takes precedence over `result`, so a push that
            // succeeded just above is still reported as `ChannelDisconnected` if the
            // channel status is not `Connected` at this point — confirm this is intended.
            if self.channel.is_disconnected() {
                // The channel is no longer connected (e.g., the receiver end was dropped),
                // so we stop waiting in the waitqueue.
                Some(Err(Error::ChannelDisconnected))
            } else {
                result
            }
            
        };

        // `wait_until` keeps waiting while the closure returns `None`; the value obtained
        // here is what the closure eventually wrapped in `Some`: either `Ok(())` for a
        // successful send or `Err(Error::ChannelDisconnected)` for a disconnected channel.
        let res = self.channel.waiting_senders.wait_until(&mut closure);

        // If we successfully sent a message, we need to notify any waiting receivers.
        // As stated above, to avoid deadlock, this must be done here rather than in the above closure.
        if res.is_ok() {
            self.channel.waiting_receivers.notify_one();
        }
        res
    }

    /// Sends a slice of objects through the channel, returning how many objects were sent.
    ///
    /// This method only blocks on the first object being sent.
    pub fn send_buf(&self, buf: &[T]) -> Result<usize, Error>
    where
        T: Copy
    {
        for (idx, item) in buf.iter().enumerate() {
            if idx == 0 {
                self.send(*item)?;
            } else {
                match self.try_send(*item) {
                    Ok(_) => {},
                    Err((_, Error::WouldBlock)) => return Ok(idx),
                    Err((_, e)) => return Err(e),
                }
            }
        }
        Ok(buf.len())
    }

    /// Sends every object in `buf` through the channel, in order,
    /// blocking on each one as needed.
    ///
    /// # Errors
    /// Stops at and returns the first error reported by [`send`](Self::send);
    /// objects preceding the failed one have already been sent at that point.
    pub fn send_all(&self, buf: &[T]) -> Result<(), Error>
    where
        T: Copy
    {
        buf.iter().copied().try_for_each(|item| self.send(item))
    }

    /// Tries to send the message, only succeeding if buffer space is available.
    /// 
    /// If no buffer space is available, it returns the `msg`  with `Error` back to the caller without blocking. 
    pub fn try_send(&self, msg: T) -> Result<(), (T, Error)> {
        // first we'll check whether the channel is active
        match self.channel.get_channel_status() {
            ChannelStatus::SenderDisconnected => {
                self.channel.channel_status.store(ChannelStatus::Connected);
            },
            ChannelStatus::ReceiverDisconnected  => {
                return Err((msg, Error::ChannelDisconnected));
            },
            _ => {},
        }

        match self.channel.queue.push(msg) {
            // successfully sent
            Ok(()) => {
                // trace!("successful try_send() is notifying receivers.");
                self.channel.waiting_receivers.notify_one();
                Ok(())
            }
            // queue was full, return message back to caller
            Err(returned_msg) => Err((returned_msg, Error::WouldBlock)),
        }
    }

    /// Sends a slice of objects through the channel, returning how many objects were sent.
    ///
    /// This method does not block.
    pub fn try_send_buf(&self, buf: &[T]) -> Result<usize, Error>
    where
        T: Copy
    {
        for (idx, item) in buf.iter().enumerate() {
            if idx == 0 {
                self.try_send(*item).map_err(|(_, e)| e)?;
            } else {
                match self.try_send(*item) {
                    Ok(_) => {},
                    Err((_, Error::WouldBlock)) => return Ok(idx),
                    Err((_, e)) => return Err(e),
                }
            }
        }
        Ok(buf.len())
    }

    /// Returns `true` if the channel is disconnected, i.e., if its status is
    /// anything other than [`ChannelStatus::Connected`].
    pub fn is_disconnected(&self) -> bool {
        self.channel.is_disconnected()
    }

    /// Obtain a `Receiver` endpoint connected to the same channel as this `Sender`.
    ///
    /// This increments the channel's receiver count.
    /// If there were previously no receivers, the channel status is updated to `Connected`.
    pub fn receiver(&self) -> Receiver<T, P> {
        Channel::add_receiver( &self.channel )
    }
}

/// The receiver side of a channel.
///
/// Each `Receiver` counts towards the channel's receiver count; when the last one
/// is dropped, the channel is marked as `ReceiverDisconnected`.
pub struct Receiver<T: Send, P: DeadlockPrevention = Spin> {
    /// The channel state shared with all other endpoints of this channel.
    channel: Arc<Channel<T, P>>,
}

impl<T: Send, P: DeadlockPrevention> Clone for Receiver<T, P> {
    /// Clones this `Receiver`, returning another `Receiver` connected to the same channel.
    ///
    /// This increments the channel's receiver count.
    /// If there were previously no receivers, the channel status is updated to `Connected`.
    fn clone(&self) -> Self {
        // Go through `add_receiver` (rather than cloning the `Arc` directly)
        // so that the receiver count stays in sync with the number of `Receiver`s.
        Channel::add_receiver( &self.channel )
    }
}

impl <T: Send, P: DeadlockPrevention> Receiver<T, P> {
    /// Receive a message, blocking until a message is available in the buffer.
    ///
    /// Returns the message if it was received properly, otherwise returns an [`Error`].
    ///
    /// # Errors
    /// Returns [`Error::ChannelDisconnected`] if the buffer is empty and the channel
    /// is found to be disconnected, either on the initial non-blocking attempt
    /// or while waiting for a message.
    pub fn receive(&self) -> Result<T, Error> {
        // Fast path: attempt to receive a message, assuming the buffer isn't empty.
        // The code progresses beyond this match only if `try_receive` fails due to
        // an empty channel.
        match self.try_receive() {
            Err(Error::WouldBlock) => {},
            x => {
                #[cfg(trace_channel)]
                trace!("sync_channel: received msg: {:?}", debugit!(x));
                return x;
            }
        };

        // Slow path: the buffer was empty, so we need to block until a message is sent.
        
        // This closure is invoked from within a locked context, so we cannot just call `try_receive()` here
        // because it will notify the senders, which can cause deadlock.
        // Therefore, we need to perform the notify action outside of this closure after it returns.
        // The closure yields the message if one was received, or an error if the channel is disconnected.
        // It yields `None` if neither happens, which results in waiting in the queue.
        //
        // NOTE(review): unlike `try_receive()`, an empty queue combined with *any*
        // non-`Connected` status (including `ReceiverDisconnected`) ends the wait
        // with `ChannelDisconnected` here — confirm this difference is intended.
        let closure = || {
            match self.channel.queue.pop() {
                Some(msg) => Some(Ok(msg)),
                _ => {
                    if self.channel.is_disconnected() {
                        Some(Err(Error::ChannelDisconnected))
                    } else {
                        None
                    }
                },
            }
        };

        // `wait_until` keeps waiting while the closure returns `None`; the value obtained
        // here is what the closure eventually wrapped in `Some`: either `Ok(msg)` for a
        // received message or `Err(Error::ChannelDisconnected)` for a disconnected channel.
        let res =  self.channel.waiting_receivers.wait_until(&closure);

        // If we successfully received a message, we need to notify any waiting senders.
        // As stated above, to avoid deadlock, this must be done here rather than in the above closure.
        if let Ok(ref _msg) = res {
            self.channel.waiting_senders.notify_one();
        }

        #[cfg(trace_channel)]
        trace!("sync_channel: received msg: {:?}", debugit!(res));
        
        res
    }

    /// Receives objects placing them in a buffer and returning the number of objects received.
    ///
    /// This method only blocks on the first object being received.
    pub fn receive_buf(&self, buf: &mut [T]) -> Result<usize, Error> {
        if buf.is_empty() {
            return Ok(0);
        }

        let mut byte = self.receive()?;
        let mut read = 0;

        loop {
            buf[read] = byte;
            read += 1;

            if read == buf.len() {
                return Ok(read);
            }

            byte = match self.try_receive() {
                Ok(b) => b,
                Err(Error::WouldBlock) => return Ok(read),
                Err(e) => return Err(e),
            };
        }
    }

    /// Tries to receive a message, only succeeding if a message is already available in the buffer.
    /// 
    /// If receive succeeds returns `Some(Ok(T))`. 
    /// If an endpoint is disconnected returns `Some(Err(ChannelStatus::Disconnected))`. 
    /// If no such message exists, it returns `None` without blocking
    pub fn try_receive(&self) -> Result<T, Error> {
        if let Some(msg) = self.channel.queue.pop() {
            // trace!("successful try_receive() is notifying senders.");
            self.channel.waiting_senders.notify_one();
            Ok(msg)
        } else {
            // We check whether the channel is disconnected
            match self.channel.get_channel_status() {
                ChannelStatus::ReceiverDisconnected => {
                    self.channel.channel_status.store(ChannelStatus::Connected);
                    Err(Error::WouldBlock)
                },
                ChannelStatus::SenderDisconnected  => {
                    Err(Error::ChannelDisconnected)
                },
                _ => {
                    Err(Error::WouldBlock)
                },
            }
        }
    }

    /// Receives objects placing them in a buffer and returning the number of objects received.
    ///
    /// This method does not block.
    pub fn try_receive_buf(&self, buf: &mut [T]) -> Result<usize, Error> {
        for (idx, item) in buf.iter_mut().enumerate() {
            *item = match self.try_receive() {
                Ok(byte) => byte,
                Err(Error::WouldBlock) => return Ok(idx + 1),
                Err(e) => return Err(e),
            };
        }
        Ok(buf.len())
    }

    /// Returns `true` if the channel is disconnected, i.e., if its status is
    /// anything other than [`ChannelStatus::Connected`].
    pub fn is_disconnected(&self) -> bool {
        self.channel.is_disconnected()
    }

    /// Obtain a `Sender` endpoint connected to the same channel as this `Receiver`.
    ///
    /// This increments the channel's sender count.
    /// If there were previously no senders, the channel status is updated to `Connected`.
    pub fn sender(&self) -> Sender<T, P> {
        Channel::add_sender( &self.channel )
    }
}


/// Dropping a `Receiver` decrements the channel's receiver count.
/// If it was the last remaining `Receiver`, the channel is marked as
/// `ReceiverDisconnected` and all waiting `Sender`s are notified.
impl<T: Send, P: DeadlockPrevention> Drop for Receiver<T, P> {
    fn drop(&mut self) {
        let receivers_before = self.channel.receiver_count.fetch_sub(1, Ordering::SeqCst);
        if receivers_before != 1 {
            // Other receivers still exist, so the channel stays as it is.
            return;
        }

        // Publish the new status before waking the senders, so that they observe it.
        self.channel.channel_status.store(ChannelStatus::ReceiverDisconnected);
        self.channel.waiting_senders.notify_all();
    }
}

/// Dropping a `Sender` decrements the channel's sender count.
/// If it was the last remaining `Sender`, the channel is marked as
/// `SenderDisconnected` and all waiting `Receiver`s are notified.
impl<T: Send, P: DeadlockPrevention> Drop for Sender<T, P> {
    fn drop(&mut self) {
        let senders_before = self.channel.sender_count.fetch_sub(1, Ordering::SeqCst);
        if senders_before != 1 {
            // Other senders still exist, so the channel stays as it is.
            return;
        }

        // Publish the new status before waking the receivers, so that they observe it.
        self.channel.channel_status.store(ChannelStatus::SenderDisconnected);
        self.channel.waiting_receivers.notify_all();
    }
}