examples

Toy examples in single C files.
git clone git://henryandlizzy.uk/examples
Log | Files | Refs

coro-poll.cpp (3380B)


#include "coroutine_owner.hpp"

#include <poll.h>
#include <sys/timerfd.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <iostream>
#include <map>
#include <system_error>
#include <vector>
     11 
     12 static void check_err(char const* msg)
     13 {
     14 	if (errno)
     15 		throw std::system_error(errno, std::generic_category(), msg);
     16 }
     17 
     18 struct file_descriptor
     19 {
     20 	explicit file_descriptor(int f) noexcept
     21 	:	fd{f}
     22 	{}
     23 	~file_descriptor()
     24 	{
     25 		std::cout << "close fd[" << fd << "]\n";
     26 		::close(fd);
     27 		check_err("close");
     28 	}
     29 
     30 	file_descriptor(file_descriptor const&) = delete;
     31 	file_descriptor& operator =(file_descriptor const&) = delete;
     32 
     33 	operator int() noexcept { return fd; };
     34 
     35 	int fd;
     36 };
     37 
     38 struct coroutine_task
     39 {
     40 	struct promise_type
     41 	{
     42 		coroutine_task get_return_object(void) noexcept { return {make_coroutine_owner(*this)}; }
     43 		std::suspend_never initial_suspend(void) const noexcept { return {}; }
     44 		std::suspend_always final_suspend() const noexcept { return {}; }
     45 		void unhandled_exception() const noexcept { std::terminate(); }
     46 		void return_void(void) const noexcept {}
     47 	};
     48 
     49 	coroutine_owner<promise_type> c;
     50 };
     51 
     52 struct poll_scheduler
     53 {
     54 	poll_scheduler() = default;
     55 	poll_scheduler(poll_scheduler const&) = delete;
     56 	poll_scheduler& operator =(poll_scheduler const&) = delete;
     57 
     58 	struct awaitable : std::suspend_always
     59 	{
     60 		poll_scheduler& scheduler;
     61 		int fd;
     62 
     63 		void await_suspend(std::coroutine_handle<> h) const
     64 		{
     65 			if (auto [it, success] = scheduler.waiting.emplace(fd, h); not success)
     66 				throw std::runtime_error("File descriptor polled twice");
     67 		};
     68 	};
     69 
     70 	awaitable ready(int fd)
     71 	{
     72 		return awaitable({}, *this, fd);
     73 	}
     74 
     75 	void poll(void)
     76 	{
     77 		if (not *this)
     78 			return;
     79 
     80 		pollfd pfds[waiting.size()];
     81 
     82 		std::ranges::transform(waiting, pfds, [](auto const& x) {
     83 			return pollfd{x.first, POLLIN, 0};
     84 		});
     85 
     86 		::poll(pfds, waiting.size(), -1);
     87 		check_err("poll");
     88 
     89 		for (auto& [fd, _, revents] : pfds)
     90 			if (revents & POLLIN)
     91 			{
     92 				auto handle = waiting.at(fd);
     93 				waiting.erase(fd);
     94 				handle();
     95 			}
     96 	}
     97 
     98 	void flush(void)
     99 	{
    100 		while (*this)
    101 			poll();
    102 	}
    103 
    104 	explicit operator bool() const
    105 	{
    106 		return not waiting.empty();
    107 	}
    108 private:
    109 	std::map<int, std::coroutine_handle<>> waiting;
    110 };
    111 
    112 /// TASKS
    113 
    114 coroutine_task echo_task(int fd, poll_scheduler& scheduler)
    115 {
    116 	file_descriptor stream{fd};
    117 	for (;;)
    118 	{
    119 		co_await scheduler.ready(stream);
    120 		char buf[256];
    121 		int n = ::read(stream, buf, sizeof buf);
    122 		check_err("read");
    123 		if (not n)
    124 			co_return;
    125 
    126 		std::cout << "Stream [" << stream << "]: " << std::string_view(buf, n);
    127 	}
    128 }
    129 
    130 coroutine_task timer_task(unsigned interval, poll_scheduler& scheduler)
    131 {
    132 	itimerspec spec{
    133 		.it_interval = {interval, 0},  /* Interval for periodic timer */
    134 		.it_value = {interval, 0},     /* Initial expiration */
    135 	};
    136 	file_descriptor timer{::timerfd_create(CLOCK_MONOTONIC, 0)};
    137 	check_err("timerfd_create");
    138 	::timerfd_settime(timer, 0, &spec, NULL);
    139 	check_err("timerfd_settime");
    140 
    141 	uint64_t ticks = 0;
    142 
    143 	while (ticks < 16)
    144 	{
    145 		co_await scheduler.ready(timer);
    146 
    147 		uint64_t new_ticks;
    148 		::read(timer, &new_ticks, sizeof new_ticks);
    149 		check_err("read");
    150 		new_ticks *= spec.it_interval.tv_sec;
    151 		ticks += new_ticks;
    152 		std::cout << "Timer  [" << timer << "]: +" << new_ticks << ' ' << ticks << '\n';
    153 	}
    154 	co_return;
    155 }
    156 
    157 int main()
    158 {
    159 	poll_scheduler s;
    160 
    161 	auto task1 = echo_task(0, s);
    162 	auto task2 = timer_task(4, s);
    163 	auto task3 = timer_task(8, s);
    164 
    165 	s.flush();
    166 }