coro-poll.cpp (3380B)
1 #include "coroutine_owner.hpp" 2 3 #include <unistd.h> 4 #include <poll.h> 5 #include <sys/timerfd.h> 6 7 #include <algorithm> 8 #include <cstdint> 9 #include <iostream> 10 #include <map> 11 12 static void check_err(char const* msg) 13 { 14 if (errno) 15 throw std::system_error(errno, std::generic_category(), msg); 16 } 17 18 struct file_descriptor 19 { 20 explicit file_descriptor(int f) noexcept 21 : fd{f} 22 {} 23 ~file_descriptor() 24 { 25 std::cout << "close fd[" << fd << "]\n"; 26 ::close(fd); 27 check_err("close"); 28 } 29 30 file_descriptor(file_descriptor const&) = delete; 31 file_descriptor& operator =(file_descriptor const&) = delete; 32 33 operator int() noexcept { return fd; }; 34 35 int fd; 36 }; 37 38 struct coroutine_task 39 { 40 struct promise_type 41 { 42 coroutine_task get_return_object(void) noexcept { return {make_coroutine_owner(*this)}; } 43 std::suspend_never initial_suspend(void) const noexcept { return {}; } 44 std::suspend_always final_suspend() const noexcept { return {}; } 45 void unhandled_exception() const noexcept { std::terminate(); } 46 void return_void(void) const noexcept {} 47 }; 48 49 coroutine_owner<promise_type> c; 50 }; 51 52 struct poll_scheduler 53 { 54 poll_scheduler() = default; 55 poll_scheduler(poll_scheduler const&) = delete; 56 poll_scheduler& operator =(poll_scheduler const&) = delete; 57 58 struct awaitable : std::suspend_always 59 { 60 poll_scheduler& scheduler; 61 int fd; 62 63 void await_suspend(std::coroutine_handle<> h) const 64 { 65 if (auto [it, success] = scheduler.waiting.emplace(fd, h); not success) 66 throw std::runtime_error("File descriptor polled twice"); 67 }; 68 }; 69 70 awaitable ready(int fd) 71 { 72 return awaitable({}, *this, fd); 73 } 74 75 void poll(void) 76 { 77 if (not *this) 78 return; 79 80 pollfd pfds[waiting.size()]; 81 82 std::ranges::transform(waiting, pfds, [](auto const& x) { 83 return pollfd{x.first, POLLIN, 0}; 84 }); 85 86 ::poll(pfds, waiting.size(), -1); 87 check_err("poll"); 88 89 for (auto& [fd, _, revents] : pfds) 90 if (revents & POLLIN) 91 { 92 auto handle = waiting.at(fd); 93 waiting.erase(fd); 94 handle(); 95 } 96 } 97 98 void flush(void) 99 { 100 while (*this) 101 poll(); 102 } 103 104 explicit operator bool() const 105 { 106 return not waiting.empty(); 107 } 108 private: 109 std::map<int, std::coroutine_handle<>> waiting; 110 }; 111 112 /// TASKS 113 114 coroutine_task echo_task(int fd, poll_scheduler& scheduler) 115 { 116 file_descriptor stream{fd}; 117 for (;;) 118 { 119 co_await scheduler.ready(stream); 120 char buf[256]; 121 int n = ::read(stream, buf, sizeof buf); 122 check_err("read"); 123 if (not n) 124 co_return; 125 126 std::cout << "Stream [" << stream << "]: " << std::string_view(buf, n); 127 } 128 } 129 130 coroutine_task timer_task(unsigned interval, poll_scheduler& scheduler) 131 { 132 itimerspec spec{ 133 .it_interval = {interval, 0}, /* Interval for periodic timer */ 134 .it_value = {interval, 0}, /* Initial expiration */ 135 }; 136 file_descriptor timer{::timerfd_create(CLOCK_MONOTONIC, 0)}; 137 check_err("timerfd_create"); 138 ::timerfd_settime(timer, 0, &spec, NULL); 139 check_err("timerfd_settime"); 140 141 uint64_t ticks = 0; 142 143 while (ticks < 16) 144 { 145 co_await scheduler.ready(timer); 146 147 uint64_t new_ticks; 148 ::read(timer, &new_ticks, sizeof new_ticks); 149 check_err("read"); 150 new_ticks *= spec.it_interval.tv_sec; 151 ticks += new_ticks; 152 std::cout << "Timer [" << timer << "]: +" << new_ticks << ' ' << ticks << '\n'; 153 } 154 co_return; 155 } 156 157 int main() 158 { 159 poll_scheduler s; 160 161 auto task1 = echo_task(0, s); 162 auto task2 = timer_task(4, s); 163 auto task3 = timer_task(8, s); 164 165 s.flush(); 166 }