coro-poll.cpp (3041B)
1 #include "coroutine_owner.hpp" 2 #include "fd.hpp" 3 4 #include <unistd.h> 5 #include <poll.h> 6 #include <sys/timerfd.h> 7 8 #include <algorithm> 9 #include <cstdint> 10 #include <iostream> 11 #include <map> 12 13 static void check_err(char const* msg) 14 { 15 if (errno) 16 throw std::system_error(errno, std::generic_category(), msg); 17 } 18 19 struct coroutine_task 20 { 21 struct promise_type 22 { 23 coroutine_task get_return_object(void) noexcept { return {make_coroutine_owner(*this)}; } 24 std::suspend_never initial_suspend(void) const noexcept { return {}; } 25 std::suspend_always final_suspend() const noexcept { return {}; } 26 void unhandled_exception() const noexcept { std::terminate(); } 27 void return_void(void) const noexcept {} 28 }; 29 30 coroutine_owner<promise_type> c; 31 }; 32 33 struct poll_scheduler 34 { 35 poll_scheduler() = default; 36 poll_scheduler(poll_scheduler const&) = delete; 37 poll_scheduler& operator =(poll_scheduler const&) = delete; 38 39 struct awaitable : std::suspend_always 40 { 41 poll_scheduler& scheduler; 42 int fd; 43 44 void await_suspend(std::coroutine_handle<> h) const 45 { 46 if (auto [it, success] = scheduler.waiting.emplace(fd, h); not success) 47 throw std::runtime_error("File descriptor polled twice"); 48 }; 49 }; 50 51 awaitable ready(int fd) 52 { 53 return awaitable({}, *this, fd); 54 } 55 56 void poll(void) 57 { 58 if (not *this) 59 return; 60 61 pollfd pfds[waiting.size()]; 62 63 std::ranges::transform(waiting, pfds, [](auto const& x) { 64 return pollfd{x.first, POLLIN, 0}; 65 }); 66 67 ::poll(pfds, waiting.size(), -1); 68 check_err("poll"); 69 70 for (auto& [fd, _, revents] : pfds) 71 if (revents & POLLIN) 72 { 73 auto handle = waiting.at(fd); 74 waiting.erase(fd); 75 handle(); 76 } 77 } 78 79 void flush(void) 80 { 81 while (*this) 82 poll(); 83 } 84 85 explicit operator bool() const 86 { 87 return not waiting.empty(); 88 } 89 private: 90 std::map<int, std::coroutine_handle<>> waiting; 91 }; 92 93 /// TASKS 94 95 coroutine_task echo_task(int fd, poll_scheduler& scheduler) 96 { 97 file_descriptor stream{fd}; 98 for (;;) 99 { 100 co_await scheduler.ready(stream); 101 char buf[256]; 102 int n = ::read(stream, buf, sizeof buf); 103 check_err("read"); 104 if (not n) 105 co_return; 106 107 std::cout << "Stream [" << stream << "]: " << std::string_view(buf, n); 108 } 109 } 110 111 coroutine_task timer_task(unsigned interval, poll_scheduler& scheduler) 112 { 113 itimerspec spec{ 114 .it_interval = {interval, 0}, /* Interval for periodic timer */ 115 .it_value = {interval, 0}, /* Initial expiration */ 116 }; 117 file_descriptor timer{::timerfd_create(CLOCK_MONOTONIC, 0)}; 118 check_err("timerfd_create"); 119 ::timerfd_settime(timer, 0, &spec, NULL); 120 check_err("timerfd_settime"); 121 122 uint64_t ticks = 0; 123 124 while (ticks < 16) 125 { 126 co_await scheduler.ready(timer); 127 128 uint64_t new_ticks; 129 ::read(timer, &new_ticks, sizeof new_ticks); 130 check_err("read"); 131 new_ticks *= spec.it_interval.tv_sec; 132 ticks += new_ticks; 133 std::cout << "Timer [" << timer << "]: +" << new_ticks << ' ' << ticks << '\n'; 134 } 135 co_return; 136 } 137 138 int main() 139 { 140 poll_scheduler s; 141 142 auto task1 = echo_task(0, s); 143 auto task2 = timer_task(4, s); 144 auto task3 = timer_task(8, s); 145 146 s.flush(); 147 }