1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// So the goal of the mailbox is basically a Request/Response server,
// with some additional messages sent unsolicited
//
// In an ideal form, it looks like this:
//
// 1. The userspace submits a message to be sent
// 2. Once there is room in the ring, it is serialized
// 3. The userspace waits on the response to come back
// 4. The response is deserialized, and the caller is given the response
// 5. The user reacts to the response
//
// So, we have a finite amount of resources, and there will need to be
// SOME kind of backpressure mechanism somewhere.
//
// This could be:
//
// ## Submission backpressure
//
// * The mailbox gives back a future when the user asks to submit a message
// * The mailbox readies the future when it has room in the "response map"
//   AND there is room in the ring to serialize the message
//     * TODO: How to "wake" the pending slots? Do we do a "jailbreak"
//       wake all? Or just wake the next N items based on available slots?
// * The mailbox exchanges the "send" future with a "receive" future
// * Once the response comes in, the task/future is retrieved from the
//     "response map", and awoken
// * The task "picks up" its message, and frees the space in the response map
//
// Downsides:
//
// A lot of small, slow responses could cause large and/or fast responses to be
// blocked on a pending response slot. Ideally, you could spam messages into
// the outgoing queue immediately (allowing them to be processed), but you'd need
// SOME way to process the response messages, and if we get back a response before
// the request has made it into the "response map", it'll be a problem.

use core::{
    cell::UnsafeCell,
    mem::MaybeUninit,
    sync::atomic::{AtomicBool, AtomicU32, Ordering},
};

use abi::{
    bbqueue_ipc::framed::{FrameConsumer, FrameProducer},
    syscall::{
        KernelMsg, KernelResponse, KernelResponseBody, UserRequest, UserRequestBody,
        UserRequestHeader,
    },
};
use futures_util::pin_mut;
use maitake::sync::{
    wait_map::{self, WaitMap},
    WaitQueue,
};

pub static MAILBOX: MailBox = MailBox::new();

// TODO: There's a bit of mutexing going on here. `send_wait` and `recv_wait` BOTH have
pub struct MailBox {
    nonce: AtomicU32,
    inhibit_send: AtomicBool,
    send_wait: WaitQueue,
    recv_wait: WaitMap<u32, KernelResponseBody>,
    rings: OnceRings,
}

impl MailBox {
    pub const fn new() -> Self {
        Self {
            nonce: AtomicU32::new(0),
            inhibit_send: AtomicBool::new(false),
            send_wait: WaitQueue::new(),
            recv_wait: WaitMap::new(),
            rings: OnceRings::new(),
        }
    }

    pub fn set_rings(&self, rings: Rings) {
        self.rings.set(rings);
    }

    pub fn poll(&self) {
        let rings = self.rings.get();

        while let Some(msg) = rings.k2u.read() {
            match postcard::from_bytes::<KernelMsg>(&msg) {
                Ok(KernelMsg::Response(KernelResponse { header, body })) => {
                    // Attempt to wake a relevant waiting task, OR drop the response
                    self.recv_wait.wake(&header.nonce, body);
                }
                Ok(_) => todo!(),
                Err(_) => {
                    // todo: print something? Relax this panic later with a graceful
                    // warning
                    panic!("Decoded bad message from kernel?");
                }
            }

            msg.release();
        }

        if self.inhibit_send.load(Ordering::Acquire) && rings.u2k.grant(128).is_ok() {
            self.inhibit_send.store(false, Ordering::Release);
            self.send_wait.wake_all();
        }
    }

    async fn send_inner(&'static self, nonce: u32, msg: UserRequestBody) -> Result<(), ()> {
        let rings = self.rings.get();
        let outgoing = UserRequest {
            header: UserRequestHeader { nonce },
            body: msg,
        };

        // Wait for a successful send
        loop {
            if !self.inhibit_send.load(Ordering::Acquire) {
                // TODO: Max Size
                if let Ok(mut wgr) = rings.u2k.grant(128) {
                    let used = postcard::to_slice(&outgoing, &mut wgr).map_err(drop)?.len();
                    wgr.commit(used);
                    break;
                } else {
                    // Inhibit further sending until there is room, in order to prevent
                    // starving waiters
                    self.inhibit_send.store(true, Ordering::Release);
                }
            }
            self.send_wait.wait().await.map_err(drop)?;
        }

        Ok(())
    }

    /// Send a message to the kernel without waiting for a response
    pub async fn send(&'static self, msg: UserRequestBody) -> Result<(), ()> {
        let nonce = self.nonce.fetch_add(1, Ordering::AcqRel);
        self.send_inner(nonce, msg).await
    }

    /// Send a message to the kernel, waiting for a response
    pub async fn request(&'static self, msg: UserRequestBody) -> Result<KernelResponseBody, ()> {
        let nonce = self.nonce.fetch_add(1, Ordering::AcqRel);

        // Start listening for the response BEFORE we send the request
        let rx: wait_map::Wait<u32, KernelResponseBody> = MAILBOX.recv_wait.wait(nonce);
        pin_mut!(rx);
        rx.as_mut().enqueue().await.map_err(drop)?;
        self.send_inner(nonce, msg).await?;

        rx.await.map_err(drop)
    }
}

unsafe impl Sync for OnceRings {}

struct OnceRings {
    set: AtomicBool,
    queues: UnsafeCell<MaybeUninit<Rings>>,
}

impl OnceRings {
    const fn new() -> Self {
        Self {
            set: AtomicBool::new(false),
            queues: UnsafeCell::new(MaybeUninit::uninit()),
        }
    }

    fn set(&self, rings: Rings) {
        unsafe {
            self.queues.get().cast::<Rings>().write(rings);
            let old = self.set.swap(true, Ordering::SeqCst);
            assert!(!old);
        }
    }

    fn get(&self) -> &Rings {
        assert!(self.set.load(Ordering::Relaxed));
        unsafe { &*self.queues.get().cast::<Rings>() }
    }
}

pub struct Rings {
    pub u2k: FrameProducer<'static>,
    pub k2u: FrameConsumer<'static>,
}

// impl Ma