commit b4c5eed237ea98f08093090a8fb1d7e8a4f3db0e
parent b2ce67c0b5d5c2e1d336858b6c18cdffa746045b
Author: Henry Wilson <henry@henryandlizzy.uk>
Date: Mon, 5 Dec 2022 22:15:16 +0000
coro-poll: Add coroutine scheduling with poll(2)
Diffstat:
2 files changed, 167 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -11,6 +11,7 @@
/cie-1931
/clock-test
/cobs
+/coro-poll
/coro-round-robin
/coro-timer-dispatch
/coro-unconditional-dispatch
diff --git a/src/coro-poll.cpp b/src/coro-poll.cpp
@@ -0,0 +1,166 @@
+#include <unistd.h>
+#include <poll.h>
+#include <sys/timerfd.h>
+
+#include <coroutine>
+#include <iostream>
+#include <map>
+
+static void check_err(char const* msg)
+{
+ if (errno)
+ throw std::system_error(errno, std::generic_category(), msg);
+}
+
+struct file_descriptor
+{
+ explicit file_descriptor(int f) noexcept
+ : fd{f}
+ {}
+ ~file_descriptor()
+ {
+ std::cout << "close fd[" << fd << "]\n";
+ ::close(fd);
+ check_err("close");
+ }
+
+ file_descriptor(file_descriptor const&) = delete;
+ file_descriptor& operator =(file_descriptor const&) = delete;
+
+ operator int() noexcept { return fd; };
+
+ int fd;
+};
+
+struct coroutine_task
+{
+ struct promise_type
+ {
+ void get_return_object(void) const noexcept {}
+ std::suspend_never initial_suspend(void) const noexcept { return {}; }
+ std::suspend_never final_suspend() const noexcept { return {}; }
+ void unhandled_exception() const noexcept {}
+ void return_void(void) const noexcept {}
+ };
+};
+
+struct poll_scheduler
+{
+ poll_scheduler() = default;
+ poll_scheduler(poll_scheduler const&) = delete;
+ poll_scheduler& operator =(poll_scheduler const&) = delete;
+
+ ~poll_scheduler()
+ {
+ for (auto& [_, h] : waiting)
+ h.destroy();
+ }
+
+ struct awaitable : std::suspend_always
+ {
+ poll_scheduler& scheduler;
+ int fd;
+
+ void await_suspend(std::coroutine_handle<> h) const
+ {
+ if (auto [it, success] = scheduler.waiting.emplace(fd, h); not success)
+ throw std::runtime_error("File descriptor polled twice");
+ };
+ };
+
+ awaitable ready(int fd)
+ {
+ return awaitable({}, *this, fd);
+ }
+
+ void poll(void)
+ {
+ if (not *this)
+ return;
+
+ pollfd pfds[waiting.size()], *p = pfds;
+
+ for (auto const& [fd, _] : waiting)
+ *p++ = {fd, POLLIN, 0};
+
+ ::poll(pfds, waiting.size(), -1);
+ check_err("poll");
+
+ for (auto& [fd, _, revents] : pfds)
+ if (revents & POLLIN)
+ {
+ auto handle = waiting.at(fd);
+ waiting.erase(fd);
+ handle();
+ }
+ }
+
+ void flush(void)
+ {
+ while (*this)
+ poll();
+ }
+
+ explicit operator bool() const
+ {
+ return not waiting.empty();
+ }
+private:
+ std::map<int, std::coroutine_handle<>> waiting;
+};
+
+/// TASKS
+
+coroutine_task echo_task(int fd, poll_scheduler& scheduler)
+{
+ file_descriptor stream{fd};
+ for (;;)
+ {
+ co_await scheduler.ready(stream);
+ char buf[256];
+ int n = ::read(stream, buf, sizeof buf);
+ check_err("read");
+ if (not n)
+ co_return;
+
+ std::cout << "Stream [" << stream << "]: " << std::string_view(buf, n);
+ }
+}
+
+coroutine_task timer_task(unsigned interval, poll_scheduler& scheduler)
+{
+ itimerspec spec{
+ .it_interval = {interval, 0}, /* Interval for periodic timer */
+ .it_value = {interval, 0}, /* Initial expiration */
+ };
+ file_descriptor timer{::timerfd_create(CLOCK_MONOTONIC, 0)};
+ check_err("timerfd_create");
+ ::timerfd_settime(timer, 0, &spec, NULL);
+ check_err("timerfd_settime");
+
+ uint64_t ticks = 0;
+
+ while (ticks < 16)
+ {
+ co_await scheduler.ready(timer);
+
+ uint64_t new_ticks;
+ ::read(timer, &new_ticks, sizeof new_ticks);
+ check_err("read");
+ new_ticks *= spec.it_interval.tv_sec;
+ ticks += new_ticks;
+ std::cout << "Timer [" << timer << "]: +" << new_ticks << ' ' << ticks << '\n';
+ }
+ co_return;
+}
+
+int main()
+{
+ poll_scheduler s;
+
+ echo_task(0, s);
+ timer_task(4, s);
+ timer_task(8, s);
+
+ s.flush();
+}