commit c7505e33169e91a9749d182c0a808f6178d83569
parent 22802b32fb1c96c3c785b5c43eb3fed162bdff2d
Author: Henry Wilson <henry@henryandlizzy.uk>
Date: Wed, 4 Dec 2024 23:10:13 +0000
io_uring: Handle cancellations which do not fail
Diffstat:
1 file changed, 22 insertions(+), 15 deletions(-)
diff --git a/src/io_uring.cpp b/src/io_uring.cpp
@@ -64,6 +64,7 @@ public:
uring(const uring&) = delete;
uring& operator =(const uring&) = delete;
+ uint64_t wait_and_complete(void);
bool process_completion(void);
awaitable read(int, std::span<char>, off_t);
awaitable write(int, std::span<char const>, off_t);
@@ -83,6 +84,7 @@ private:
if (n < 0)
throw std::system_error(-n, std::system_category());
assert(n == 1);
+ assert(not inflight.contains(last_id));
inflight[last_id] = std::move(r);
}
@@ -128,7 +130,8 @@ public:
assert(false);
break;
case -EALREADY: // not sure what to do here, can't cancel, can't finish destructor until complete?
- std::println("cancel: EALREADY");
+ while (ring.wait_and_complete() != id)
+ {}
break;
default:
std::println("cancel: {}", result);
@@ -170,29 +173,36 @@ private:
};
-bool uring::process_completion(void)
+uint64_t uring::wait_and_complete(void)
{
- if (inflight.empty())
- return false;
+ assert(not inflight.empty());
io_uring_cqe* cqe;
if (io_uring_wait_cqe(&ring, &cqe))
throw std::system_error(errno, std::system_category());
- //std::println("Job {} completed", io_uring_cqe_get_data64(cqe));
- auto it = inflight.find(io_uring_cqe_get_data64(cqe));
+ uint64_t id = io_uring_cqe_get_data64(cqe);
+ auto it = inflight.find(id);
assert(it != inflight.end());
- auto& s = *it->second;
+ auto s = std::move(it->second);
+ inflight.erase(it);
- assert(not s.result);
- s.result = cqe->res;
+ assert(not s->result);
+ s->result = cqe->res;
io_uring_cq_advance(&ring, 1);
- if (s.continuation)
- s.continuation();
+ if (s->continuation)
+ s->continuation();
- inflight.erase(it);
+ return id;
+}
+
+bool uring::process_completion(void)
+{
+ if (inflight.empty())
+ return false;
+ wait_and_complete();
return true;
}
@@ -262,7 +272,6 @@ task timer_routine(uring& u)
i += timeouts;
auto x = std::format("Timeout {}\n", i);
- //std::println("{}", x);
int n = co_await u.write(1, std::span<char>{x.data(), x.size()});
if (n <= 0)
throw std::system_error(-n, std::system_category());
@@ -275,8 +284,6 @@ int main()
task rr = read_routine(r);
task tr = timer_routine(r);
- //task tr2 = timer_routine(r);
- //task tr3 = timer_routine(r);
while (not rr.done())
r.process_completion();