Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/io_awaitable.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/task.hpp>
20 :
21 : #include <array>
22 : #include <atomic>
23 : #include <exception>
24 : #include <optional>
25 : #include <stdexcept>
26 : #include <stop_token>
27 : #include <tuple>
28 : #include <type_traits>
29 : #include <utility>
30 : #include <variant>
31 : #include <vector>
32 :
33 : /*
34 : * when_any - Race multiple tasks, return first completion
35 : * ========================================================
36 : *
37 : * OVERVIEW:
38 : * ---------
39 : * when_any launches N tasks concurrently and completes when the FIRST task
40 : * finishes (success or failure). It then requests stop for all siblings and
41 : * waits for them to acknowledge before returning.
42 : *
43 : * ARCHITECTURE:
44 : * -------------
45 : * The design mirrors when_all but with inverted completion semantics:
46 : *
47 : * when_all: complete when remaining_count reaches 0 (all done)
48 : * when_any: complete when has_winner becomes true (first done)
49 : * BUT still wait for remaining_count to reach 0 for cleanup
50 : *
51 : * Key components:
52 : * - when_any_state: Shared state tracking winner and completion
53 : * - when_any_runner: Wrapper coroutine for each child task
54 : * - when_any_launcher: Awaitable that starts all runners concurrently
55 : *
56 : * CRITICAL INVARIANTS:
57 : * --------------------
58 : * 1. Exactly one task becomes the winner (via atomic compare_exchange)
59 : * 2. All tasks must complete before parent resumes (cleanup safety)
60 : * 3. Stop is requested immediately when winner is determined
61 : * 4. Only the winner's result/exception is stored
62 : *
63 : * TYPE DEDUPLICATION:
64 : * -------------------
65 : * std::variant requires unique alternative types. Since when_any can race
66 : * tasks with identical return types (e.g., three task<int>), we must
67 : * deduplicate types before constructing the variant.
68 : *
69 : * Example: when_any(task<int>, task<string>, task<int>)
70 : * - Raw types after void->monostate: int, string, int
71 : * - Deduplicated variant: std::variant<int, string>
72 : * - Return: pair<size_t, variant<int, string>>
73 : *
74 : * The winner_index tells you which task won (0, 1, or 2), while the variant
75 : * holds the result. Use the index to determine how to interpret the variant.
76 : *
77 : * VOID HANDLING:
78 : * --------------
79 : * void tasks contribute std::monostate to the variant (then deduplicated).
80 : * All-void tasks result in: pair<size_t, variant<monostate>>
81 : *
82 : * MEMORY MODEL:
83 : * -------------
84 : * Synchronization chain from winner's write to parent's read:
85 : *
86 : * 1. Winner thread writes result_/winner_exception_ (non-atomic)
87 : * 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
88 : * 3. Last task thread (may be winner or non-winner) calls signal_completion()
89 : * → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
90 : * 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
91 : * 5. Parent coroutine resumes and reads result_/winner_exception_
92 : *
93 : * Synchronization analysis:
94 : * - All fetch_sub operations on remaining_count_ form a release sequence
95 : * - Winner's fetch_sub releases; subsequent fetch_sub operations participate
96 : * in the modification order of remaining_count_
97 : * - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
98 : * modification order, establishing happens-before from winner's writes
99 : * - Executor dispatch() is expected to provide queue-based synchronization
100 : * (release-on-post, acquire-on-execute) completing the chain to parent
101 : * - Even inline executors work (same thread = sequenced-before)
102 : *
103 : * Alternative considered: Adding winner_ready_ atomic (set with release after
104 : * storing winner data, acquired before reading) would make synchronization
105 : * self-contained and not rely on executor implementation details. Current
106 : * approach is correct but requires careful reasoning about release sequences
107 : * and executor behavior.
108 : *
109 : * EXCEPTION SEMANTICS:
110 : * --------------------
111 : * Unlike when_all (which captures first exception, discards others), when_any
112 : * treats exceptions as valid completions. If the winning task threw, that
113 : * exception is rethrown. Exceptions from non-winners are silently discarded.
114 : */
115 :
116 : namespace boost {
117 : namespace capy {
118 :
119 : namespace detail {
120 :
/** Map `void` to `std::monostate`; pass every other type through.

    `std::variant<void, ...>` is ill-formed, so a task whose result type
    is `void` contributes `std::monostate` to the result variant instead.
    The check uses `std::is_void_v`, so cv-qualified `void` is mapped too.

    @tparam T The type to convert (only `void` is replaced).
*/
template<typename T>
using void_to_monostate_t =
    std::conditional_t<std::is_void_v<T>, std::monostate, T>;
131 :
/*
    Type deduplication for variant construction.

    std::variant requires unique alternative types. The metafunctions below
    remove duplicates from a type list while preserving the order of first
    occurrence.

    Algorithm: fold left over the type list, appending each type to the
    accumulated variant only if it is not already an alternative. Each step
    compares the candidate against every accumulated alternative, so the
    whole fold is O(N^2) in the number of types.
*/

/** Append a type to a variant unless it is already an alternative.

    Only the partial specialization for `std::variant<Vs...>` is defined;
    using any other type as `Variant` is a compile-time error.

    @tparam Variant The accumulated variant type.
    @tparam T       The type to potentially append.
*/
template<typename Variant, typename T>
struct variant_append_if_unique;

/** Specialization that performs the uniqueness check.

    @tparam Vs Alternatives already present in the variant.
    @tparam T  The type to potentially append.
*/
template<typename... Vs, typename T>
struct variant_append_if_unique<std::variant<Vs...>, T>
{
    /** `std::variant<Vs...>` when T is one of Vs, otherwise
        `std::variant<Vs..., T>`.
    */
    using type = std::conditional_t<
        (std::is_same_v<T, Vs> || ...),
        std::variant<Vs...>,
        std::variant<Vs..., T>>;
};

/** Fold a type list into a variant with unique alternatives.

    @tparam Accumulated The variant accumulating unique types.
    @tparam Remaining   Types still to be processed.
*/
template<typename Accumulated, typename... Remaining>
struct deduplicate_impl;

/** Base case: nothing left to process.

    @tparam Accumulated The final deduplicated variant type.
*/
template<typename Accumulated>
struct deduplicate_impl<Accumulated>
{
    /** The final deduplicated variant type. */
    using type = Accumulated;
};

/** Recursive case: append T if unique, then process the rest.

    @tparam Accumulated The variant accumulated so far.
    @tparam T           The current type to potentially add.
    @tparam Rest        Remaining types to process.
*/
template<typename Accumulated, typename T, typename... Rest>
struct deduplicate_impl<Accumulated, T, Rest...>
{
    /** Accumulator after T has been considered. */
    using next = typename variant_append_if_unique<Accumulated, T>::type;

    /** Result after all remaining types have been considered. */
    using type = typename deduplicate_impl<next, Rest...>::type;
};
199 :
200 : /** Deduplicated variant from a list of types.
201 :
202 : Constructs a std::variant containing unique types from the input list.
203 : Void types are converted to std::monostate before deduplication.
204 : The first type T0 seeds the accumulator, ensuring the variant is well-formed.
205 :
206 : @tparam T0 First result type (required, seeds the deduplication).
207 : @tparam Ts Remaining result types (void is converted to monostate).
208 : */
209 : template<typename T0, typename... Ts>
210 : using unique_variant_t = typename deduplicate_impl<
211 : std::variant<void_to_monostate_t<T0>>,
212 : void_to_monostate_t<Ts>...>::type;
213 :
214 : /** Result type for when_any: (winner_index, deduplicated_variant).
215 :
216 : The first element is the zero-based index of the winning task in the
217 : original argument order. The second element is a variant holding the
218 : winner's result by type. When multiple tasks share the same return type,
219 : use the index to determine which task actually won.
220 :
221 : @tparam T0 First task's result type.
222 : @tparam Ts Remaining tasks' result types.
223 : */
224 : template<typename T0, typename... Ts>
225 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
226 :
227 : /** Shared state for when_any operation.
228 :
229 : Coordinates winner selection, result storage, and completion tracking
230 : for all child tasks in a when_any operation.
231 :
232 : @par Lifetime
233 : Allocated on the parent coroutine's frame, outlives all runners.
234 :
235 : @par Thread Safety
236 : Atomic operations protect winner selection and completion count.
237 : Result storage is written only by the winner before any concurrent access.
238 :
239 : @tparam T0 First task's result type.
240 : @tparam Ts Remaining tasks' result types.
241 : */
242 : template<typename T0, typename... Ts>
243 : struct when_any_state
244 : {
245 : /** Total number of tasks being raced. */
246 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
247 :
248 : /** Deduplicated variant type for storing the winner's result. */
249 : using variant_type = unique_variant_t<T0, Ts...>;
250 :
251 : /** Counter for tasks still running.
252 :
253 : Must wait for ALL tasks to finish before parent resumes; this ensures
254 : runner coroutine frames are valid until their final_suspend completes.
255 : */
256 : std::atomic<std::size_t> remaining_count_;
257 :
258 : /** Flag indicating whether a winner has been determined.
259 :
260 : Winner selection: exactly one task wins via atomic CAS on has_winner_.
261 : winner_index_ is written only by the winner, read after all complete.
262 : */
263 : std::atomic<bool> has_winner_{false};
264 :
265 : /** Index of the winning task in the original argument list. */
266 : std::size_t winner_index_{0};
267 :
268 : /** Storage for the winner's result value.
269 :
270 : Result storage: deduplicated variant. Stored by type, not task index,
271 : because multiple tasks may share the same return type.
272 : */
273 : std::optional< variant_type > result_;
274 :
275 : /** Exception thrown by the winner, if any.
276 :
277 : Non-null if winner threw (rethrown to caller after all tasks complete).
278 : */
279 : std::exception_ptr winner_exception_;
280 :
281 : /** Handles to runner coroutines for cleanup.
282 :
283 : Runner coroutine handles; destroyed in destructor after all complete.
284 : */
285 : std::array<coro, task_count> runner_handles_{};
286 :
287 : /** Stop source for cancelling sibling tasks.
288 :
289 : Owned stop_source: request_stop() called when winner determined.
290 : */
291 : std::stop_source stop_source_;
292 :
293 : /** Callback functor that forwards stop requests.
294 :
295 : Forwards parent's stop requests to our stop_source, enabling
296 : cancellation to propagate from caller through when_any to children.
297 : */
298 : struct stop_callback_fn
299 : {
300 : /** Pointer to the stop source to signal. */
301 : std::stop_source* source_;
302 :
303 : /** Invoke the stop request on the source. */
304 4 : void operator()() const noexcept { source_->request_stop(); }
305 : };
306 :
307 : /** Type alias for the stop callback registration. */
308 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
309 :
310 : /** Optional callback linking parent's stop token to our stop source. */
311 : std::optional<stop_callback_t> parent_stop_callback_;
312 :
313 : /** Parent coroutine handle to resume when all children complete. */
314 : coro continuation_;
315 :
316 : /** Executor reference for dispatching the parent resumption. */
317 : executor_ref caller_ex_;
318 :
319 : /** Construct state for racing task_count tasks.
320 :
321 : Initializes remaining_count_ to task_count so all tasks must complete
322 : before the parent coroutine resumes.
323 : */
324 37 : when_any_state()
325 37 : : remaining_count_(task_count)
326 : {
327 37 : }
328 :
329 : /** Destroy state and clean up runner coroutine handles.
330 :
331 : All runners must have completed before destruction (guaranteed by
332 : waiting for remaining_count_ to reach zero).
333 : */
334 37 : ~when_any_state()
335 : {
336 130 : for(auto h : runner_handles_)
337 93 : if(h)
338 93 : h.destroy();
339 37 : }
340 :
341 : /** Attempt to become the winner.
342 :
343 : Atomically claims winner status. Exactly one task succeeds; all others
344 : see false. The winner must store its result before returning.
345 :
346 : @param index The task's index in the original argument list.
347 : @return true if this task is now the winner, false if another won first.
348 : */
349 93 : bool try_win(std::size_t index) noexcept
350 : {
351 93 : bool expected = false;
352 93 : if(has_winner_.compare_exchange_strong(
353 : expected, true, std::memory_order_acq_rel))
354 : {
355 37 : winner_index_ = index;
356 : // Signal siblings to exit early if they support cancellation
357 37 : stop_source_.request_stop();
358 37 : return true;
359 : }
360 56 : return false;
361 : }
362 :
363 : /** Store the winner's result.
364 :
365 : @pre Only called by the winner (try_win returned true).
366 : @note Uses type-based emplacement because the variant is deduplicated;
367 : task index may differ from variant alternative index.
368 : */
369 : template<typename T>
370 30 : void set_winner_result(T value)
371 : noexcept(std::is_nothrow_move_constructible_v<T>)
372 : {
373 30 : result_.emplace(std::in_place_type<T>, std::move(value));
374 30 : }
375 :
376 : /** Store the winner's void completion as monostate.
377 :
378 : @pre Only called by the winner of a void-returning task.
379 : */
380 2 : void set_winner_void() noexcept
381 : {
382 2 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
383 2 : }
384 :
385 : /** Store the winner's exception.
386 :
387 : @pre Only called by the winner (try_win returned true).
388 : */
389 5 : void set_winner_exception(std::exception_ptr ep) noexcept
390 : {
391 5 : winner_exception_ = ep;
392 5 : }
393 :
394 : /** Signal that a task has completed (success, failure, or cancelled).
395 :
396 : Called by every runner at final_suspend. The last one to complete
397 : resumes the parent coroutine. This ensures all child coroutine
398 : frames are destroyed before the parent continues.
399 :
400 : @return Coroutine to resume (parent if last, noop otherwise).
401 : */
402 93 : coro signal_completion() noexcept
403 : {
404 93 : auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
405 93 : if(remaining == 1)
406 37 : return caller_ex_.dispatch(continuation_);
407 56 : return std::noop_coroutine();
408 : }
409 : };
410 :
411 : /** Wrapper coroutine that runs a single child task for when_any.
412 :
413 : Each child task is wrapped in a runner that:
414 : 1. Propagates executor and stop_token to the child
415 : 2. Attempts to claim winner status on completion
416 : 3. Stores result only if this runner won
417 : 4. Signals completion regardless of win/loss (for cleanup)
418 :
419 : @tparam T The result type of the wrapped task.
420 : @tparam Ts All task result types (for when_any_state compatibility).
421 : */
422 : template<typename T, typename... Ts>
423 : struct when_any_runner
424 : {
425 : /** Promise type for the runner coroutine.
426 :
427 : Manages executor propagation, stop token forwarding, and completion
428 : signaling for the wrapped child task.
429 : */
430 : struct promise_type : frame_allocating_base
431 : {
432 : /** Pointer to shared state for winner coordination. */
433 : when_any_state<Ts...>* state_ = nullptr;
434 :
435 : /** Index of this task in the original argument list. */
436 : std::size_t index_ = 0;
437 :
438 : /** Executor reference inherited from the parent coroutine. */
439 : executor_ref ex_;
440 :
441 : /** Stop token for cooperative cancellation. */
442 : std::stop_token stop_token_;
443 :
444 : /** Create the runner coroutine object from this promise.
445 :
446 : @return Runner coroutine wrapping this promise's coroutine handle.
447 : */
448 93 : when_any_runner get_return_object()
449 : {
450 93 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
451 : }
452 :
453 : /** Suspend immediately on creation.
454 :
455 : Runner coroutines start suspended; the launcher resumes them
456 : after setting up state_, index_, ex_, and stop_token_.
457 :
458 : @return Always suspends.
459 : */
460 93 : std::suspend_always initial_suspend() noexcept
461 : {
462 93 : return {};
463 : }
464 :
465 : /** Final suspend awaiter that signals completion to shared state.
466 :
467 : @return Custom awaiter that calls signal_completion().
468 : */
469 93 : auto final_suspend() noexcept
470 : {
471 : /** Awaiter that signals task completion and potentially resumes parent. */
472 : struct awaiter
473 : {
474 : /** Pointer to the promise for accessing shared state. */
475 : promise_type* p_;
476 :
477 : /** Never ready; always suspend to signal completion.
478 :
479 : @return Always false.
480 : */
481 93 : bool await_ready() const noexcept
482 : {
483 93 : return false;
484 : }
485 :
486 : /** Signal completion and return next coroutine to resume.
487 :
488 : @return Parent coroutine if this was the last task, noop otherwise.
489 : */
490 93 : coro await_suspend(coro) noexcept
491 : {
492 93 : return p_->state_->signal_completion();
493 : }
494 :
495 : /** No-op resume; coroutine is destroyed after final suspend. */
496 0 : void await_resume() const noexcept
497 : {
498 0 : }
499 : };
500 93 : return awaiter{this};
501 : }
502 :
503 : /** Called when runner coroutine body completes normally.
504 :
505 : The actual result handling is done in make_when_any_runner;
506 : this just satisfies the coroutine return requirement.
507 : */
508 84 : void return_void()
509 : {
510 84 : }
511 :
512 : /** Handle exceptions thrown by the child task.
513 :
514 : Exceptions are valid completions in when_any (unlike when_all).
515 : If this exception wins, it will be rethrown to the caller.
516 : If another task already won, this exception is discarded.
517 : */
518 9 : void unhandled_exception()
519 : {
520 9 : if(state_->try_win(index_))
521 5 : state_->set_winner_exception(std::current_exception());
522 9 : }
523 :
524 : /** Awaiter wrapper that injects executor and stop token into child awaitables.
525 :
526 : @tparam Awaitable The underlying awaitable type being wrapped.
527 : */
528 : template<class Awaitable>
529 : struct transform_awaiter
530 : {
531 : /** The wrapped awaitable instance. */
532 : std::decay_t<Awaitable> a_;
533 :
534 : /** Pointer to promise for accessing executor and stop token. */
535 : promise_type* p_;
536 :
537 : /** Check if the underlying awaitable is ready.
538 :
539 : @return True if awaitable can complete synchronously.
540 : */
541 93 : bool await_ready()
542 : {
543 93 : return a_.await_ready();
544 : }
545 :
546 : /** Get the result from the underlying awaitable.
547 :
548 : @return The awaitable's result value.
549 : */
550 93 : auto await_resume()
551 : {
552 93 : return a_.await_resume();
553 : }
554 :
555 : /** Suspend with executor and stop token injection.
556 :
557 : @tparam Promise The suspending coroutine's promise type.
558 : @param h Handle to the suspending coroutine.
559 : @return Coroutine to resume or void.
560 : */
561 : template<class Promise>
562 93 : auto await_suspend(std::coroutine_handle<Promise> h)
563 : {
564 93 : return a_.await_suspend(h, p_->ex_, p_->stop_token_);
565 : }
566 : };
567 :
568 : /** Transform awaitables to inject executor and stop token.
569 :
570 : @tparam Awaitable The awaitable type being co_awaited.
571 : @param a The awaitable instance.
572 : @return Transformed awaiter with executor/stop_token injection.
573 : */
574 : template<class Awaitable>
575 93 : auto await_transform(Awaitable&& a)
576 : {
577 : using A = std::decay_t<Awaitable>;
578 : if constexpr (IoAwaitable<A>)
579 : {
580 : return transform_awaiter<Awaitable>{
581 186 : std::forward<Awaitable>(a), this};
582 : }
583 : else
584 : {
585 : return make_affine(std::forward<Awaitable>(a), ex_);
586 : }
587 93 : }
588 : };
589 :
590 : /** Handle to the underlying coroutine frame. */
591 : std::coroutine_handle<promise_type> h_;
592 :
593 : /** Construct runner from a coroutine handle.
594 :
595 : @param h Handle to the runner coroutine frame.
596 : */
597 93 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
598 93 : : h_(h)
599 : {
600 93 : }
601 :
602 : /** Move constructor (Clang 14 workaround).
603 :
604 : Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit
605 : move constructor; other compilers work correctly with deleted move.
606 : */
607 : #if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__)
608 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
609 : #endif
610 :
611 : /** Copy construction is not allowed. */
612 : when_any_runner(when_any_runner const&) = delete;
613 :
614 : /** Copy assignment is not allowed. */
615 : when_any_runner& operator=(when_any_runner const&) = delete;
616 :
617 : /** Move construction is deleted (except on Clang 14). */
618 : #if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__)
619 : when_any_runner(when_any_runner&&) = delete;
620 : #endif
621 :
622 : /** Move assignment is not allowed. */
623 : when_any_runner& operator=(when_any_runner&&) = delete;
624 :
625 : /** Release ownership of the coroutine handle.
626 :
627 : @return The coroutine handle; this object becomes empty.
628 : */
629 93 : auto release() noexcept
630 : {
631 93 : return std::exchange(h_, nullptr);
632 : }
633 : };
634 :
635 : /** Create a runner coroutine for a single task in when_any.
636 :
637 : Factory function that creates a wrapper coroutine for a child task.
638 : The runner handles executor/stop_token propagation and winner selection.
639 :
640 : @tparam Index Compile-time index of this task in the argument list.
641 : @tparam T The result type of the task being wrapped.
642 : @tparam Ts All task result types (for when_any_state compatibility).
643 : @param inner The task to run (will be moved from).
644 : @param state Shared state for winner coordination.
645 : @return Runner coroutine (must be started via resume()).
646 : */
647 : template<std::size_t Index, typename T, typename... Ts>
648 : when_any_runner<T, Ts...>
649 93 : make_when_any_runner(task<T> inner, when_any_state<Ts...>* state)
650 : {
651 : if constexpr (std::is_void_v<T>)
652 : {
653 : co_await std::move(inner);
654 : if(state->try_win(Index))
655 : state->set_winner_void(); // noexcept
656 : }
657 : else
658 : {
659 : auto result = co_await std::move(inner);
660 : if(state->try_win(Index))
661 : {
662 : try
663 : {
664 : state->set_winner_result(std::move(result));
665 : }
666 : catch(...)
667 : {
668 : state->set_winner_exception(std::current_exception());
669 : }
670 : }
671 : }
672 186 : }
673 :
674 : /** Awaitable that launches all runner coroutines concurrently.
675 :
676 : Handles the tricky lifetime issue where tasks may complete synchronously
677 : during launch, potentially destroying this awaitable's frame before
678 : all tasks are extracted from the tuple. See await_suspend for details.
679 :
680 : @tparam Ts The result types of the tasks being launched.
681 : */
682 : template<typename... Ts>
683 : class when_any_launcher
684 : {
685 : /** Pointer to tuple of tasks to launch. */
686 : std::tuple<task<Ts>...>* tasks_;
687 :
688 : /** Pointer to shared state for coordination. */
689 : when_any_state<Ts...>* state_;
690 :
691 : public:
692 : /** Construct launcher with task tuple and shared state.
693 :
694 : @param tasks Pointer to tuple of tasks (must outlive the await).
695 : @param state Pointer to shared state for winner coordination.
696 : */
697 37 : when_any_launcher(
698 : std::tuple<task<Ts>...>* tasks,
699 : when_any_state<Ts...>* state)
700 37 : : tasks_(tasks)
701 37 : , state_(state)
702 : {
703 37 : }
704 :
705 : /** Check if the launcher can complete synchronously.
706 :
707 : @return True only if there are no tasks (degenerate case).
708 : */
709 37 : bool await_ready() const noexcept
710 : {
711 37 : return sizeof...(Ts) == 0;
712 : }
713 :
714 : /** Launch all runner coroutines and suspend the parent.
715 :
716 : Sets up stop propagation from parent to children, then launches
717 : each task in a runner coroutine. Returns noop_coroutine because
718 : runners resume the parent via signal_completion().
719 :
720 : CRITICAL: If the last task finishes synchronously then the parent
721 : coroutine resumes, destroying its frame, and destroying this object
722 : prior to the completion of await_suspend. Therefore, await_suspend
723 : must ensure `this` cannot be referenced after calling `launch_one`
724 : for the last time.
725 :
726 : @tparam Ex The executor type.
727 : @param continuation Handle to the parent coroutine to resume later.
728 : @param caller_ex Executor for dispatching child coroutines.
729 : @param parent_token Stop token from the parent for cancellation propagation.
730 : @return noop_coroutine; parent is resumed by the last completing task.
731 : */
732 : template<typename Ex>
733 37 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
734 : {
735 37 : state_->continuation_ = continuation;
736 37 : state_->caller_ex_ = caller_ex;
737 :
738 : // Forward parent's stop requests to children
739 37 : if(parent_token.stop_possible())
740 : {
741 16 : state_->parent_stop_callback_.emplace(
742 : parent_token,
743 8 : typename when_any_state<Ts...>::stop_callback_fn{&state_->stop_source_});
744 :
745 8 : if(parent_token.stop_requested())
746 2 : state_->stop_source_.request_stop();
747 : }
748 :
749 37 : auto token = state_->stop_source_.get_token();
750 74 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
751 37 : (..., launch_one<Is>(caller_ex, token));
752 37 : }(std::index_sequence_for<Ts...>{});
753 :
754 74 : return std::noop_coroutine();
755 37 : }
756 :
757 : /** Resume after all tasks complete.
758 :
759 : No return value; results are accessed via the shared state.
760 : */
761 37 : void await_resume() const noexcept
762 : {
763 37 : }
764 :
765 : private:
766 : /** Launch a single runner coroutine for task at index I.
767 :
768 : Creates the runner, configures its promise with state and executor,
769 : stores its handle for cleanup, and dispatches it for execution.
770 :
771 : @tparam I Compile-time index of the task in the tuple.
772 : @tparam Ex The executor type.
773 : @param caller_ex Executor for dispatching the runner.
774 : @param token Stop token for cooperative cancellation.
775 :
776 : @pre Ex::dispatch() and coro::resume() must not throw. If they do,
777 : the coroutine handle may leak.
778 : */
779 : template<std::size_t I, typename Ex>
780 93 : void launch_one(Ex const& caller_ex, std::stop_token token)
781 : {
782 93 : auto runner = make_when_any_runner<I>(
783 93 : std::move(std::get<I>(*tasks_)), state_);
784 :
785 93 : auto h = runner.release();
786 93 : h.promise().state_ = state_;
787 93 : h.promise().index_ = I;
788 93 : h.promise().ex_ = caller_ex;
789 93 : h.promise().stop_token_ = token;
790 :
791 93 : coro ch{h};
792 93 : state_->runner_handles_[I] = ch;
793 93 : caller_ex.dispatch(ch).resume();
794 93 : }
795 : };
796 :
797 : } // namespace detail
798 :
799 : /** Wait for the first task to complete.
800 :
801 : Races multiple heterogeneous tasks concurrently and returns when the
802 : first one completes. The result includes the winner's index and a
803 : deduplicated variant containing the result value.
804 :
805 : @par Example
806 : @code
807 : task<void> example() {
808 : auto [index, result] = co_await when_any(
809 : fetch_from_primary(), // task<Response>
810 : fetch_from_backup() // task<Response>
811 : );
812 : // index is 0 or 1, result holds the winner's Response
813 : auto response = std::get<Response>(result);
814 : }
815 : @endcode
816 :
817 : @tparam T0 First task's result type.
818 : @tparam Ts Remaining tasks' result types.
819 : @param task0 The first task to race.
820 : @param tasks Additional tasks to race concurrently.
821 : @return A task yielding a pair of (winner_index, result_variant).
822 :
823 : @par Key Features
824 : @li All tasks are launched concurrently
825 : @li Returns when first task completes (success or failure)
826 : @li Stop is requested for all siblings
827 : @li Waits for all siblings to complete before returning
828 : @li If winner threw, that exception is rethrown
829 : @li Void tasks contribute std::monostate to the variant
830 : */
831 : template<typename T0, typename... Ts>
832 : [[nodiscard]] task<detail::when_any_result_t<T0, Ts...>>
833 37 : when_any(task<T0> task0, task<Ts>... tasks)
834 : {
835 : using result_type = detail::when_any_result_t<T0, Ts...>;
836 :
837 : detail::when_any_state<T0, Ts...> state;
838 : std::tuple<task<T0>, task<Ts>...> task_tuple(std::move(task0), std::move(tasks)...);
839 :
840 : co_await detail::when_any_launcher<T0, Ts...>(&task_tuple, &state);
841 :
842 : if(state.winner_exception_)
843 : std::rethrow_exception(state.winner_exception_);
844 :
845 : co_return result_type{state.winner_index_, std::move(*state.result_)};
846 74 : }
847 :
848 : /** Alias for when_any result type, useful for declaring callback signatures.
849 :
850 : Provides a convenient public alias for the internal result type.
851 : The result is a pair containing the winner's index and a deduplicated
852 : variant holding the result value.
853 :
854 : @par Example
855 : @code
856 : void on_complete(when_any_result_type<int, std::string> result);
857 : @endcode
858 :
859 : @tparam T0 First task's result type.
860 : @tparam Ts Remaining tasks' result types.
861 : */
862 : template<typename T0, typename... Ts>
863 : using when_any_result_type = detail::when_any_result_t<T0, Ts...>;
864 :
865 : namespace detail {
866 :
867 : /** Shared state for homogeneous when_any (vector overload).
868 :
869 : Simpler than the heterogeneous version: uses std::optional<T> instead
870 : of variant, and std::vector instead of std::array for runner handles.
871 :
872 : @tparam T The common result type of all tasks.
873 : */
874 : template<typename T>
875 : struct when_any_homogeneous_state
876 : {
877 : /** Counter for tasks still running.
878 :
879 : Completion tracking - must wait for ALL tasks for proper cleanup.
880 : */
881 : std::atomic<std::size_t> remaining_count_;
882 :
883 : /** Total number of tasks being raced. */
884 : std::size_t task_count_;
885 :
886 : /** Flag indicating whether a winner has been determined.
887 :
888 : Winner tracking - first task to complete claims this.
889 : */
890 : std::atomic<bool> has_winner_{false};
891 :
892 : /** Index of the winning task in the vector. */
893 : std::size_t winner_index_{0};
894 :
895 : /** Storage for the winner's result value.
896 :
897 : Result storage - simple value, no variant needed.
898 : */
899 : std::optional<T> result_;
900 :
901 : /** Exception thrown by the winner, if any. */
902 : std::exception_ptr winner_exception_;
903 :
904 : /** Handles to runner coroutines for cleanup.
905 :
906 : Runner handles - destroyed in destructor.
907 : */
908 : std::vector<coro> runner_handles_;
909 :
910 : /** Stop source for cancelling sibling tasks.
911 :
912 : Stop propagation - requested when winner is found.
913 : */
914 : std::stop_source stop_source_;
915 :
916 : /** Callback functor that forwards stop requests.
917 :
918 : Connects parent's stop_token to our stop_source.
919 : */
920 : struct stop_callback_fn
921 : {
922 : /** Pointer to the stop source to signal. */
923 : std::stop_source* source_;
924 :
925 : /** Invoke the stop request on the source. */
926 2 : void operator()() const noexcept { source_->request_stop(); }
927 : };
928 :
929 : /** Type alias for the stop callback registration. */
930 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
931 :
932 : /** Optional callback linking parent's stop token to our stop source. */
933 : std::optional<stop_callback_t> parent_stop_callback_;
934 :
935 : /** Parent coroutine handle to resume when all children complete. */
936 : coro continuation_;
937 :
938 : /** Executor reference for dispatching the parent resumption. */
939 : executor_ref caller_ex_;
940 :
941 : /** Construct state for racing the given number of tasks.
942 :
943 : @param count Number of tasks to race.
944 : */
945 13 : explicit when_any_homogeneous_state(std::size_t count)
946 13 : : remaining_count_(count)
947 13 : , task_count_(count)
948 39 : , runner_handles_(count)
949 : {
950 13 : }
951 :
952 : /** Destroy state and clean up runner coroutine handles.
953 :
954 : All runners must have completed before destruction.
955 : */
956 13 : ~when_any_homogeneous_state()
957 : {
958 60 : for(auto h : runner_handles_)
959 47 : if(h)
960 47 : h.destroy();
961 13 : }
962 :
963 : /** Attempt to become the winner.
964 :
965 : @param index The task's index in the vector.
966 : @return true if this task is now the winner, false if another won first.
967 : */
968 47 : bool try_win(std::size_t index) noexcept
969 : {
970 47 : bool expected = false;
971 47 : if(has_winner_.compare_exchange_strong(
972 : expected, true, std::memory_order_acq_rel))
973 : {
974 13 : winner_index_ = index;
975 13 : stop_source_.request_stop();
976 13 : return true;
977 : }
978 34 : return false;
979 : }
980 :
981 : /** Store the winner's result.
982 :
983 : @pre Only called by the winner (try_win returned true).
984 : */
985 11 : void set_winner_result(T value)
986 : noexcept(std::is_nothrow_move_constructible_v<T>)
987 : {
988 11 : result_.emplace(std::move(value));
989 11 : }
990 :
991 : /** Store the winner's exception.
992 :
993 : @pre Only called by the winner (try_win returned true).
994 : */
995 2 : void set_winner_exception(std::exception_ptr ep) noexcept
996 : {
997 2 : winner_exception_ = ep;
998 2 : }
999 :
1000 : /** Signal task completion; last one resumes the parent.
1001 :
1002 : @return Coroutine to resume (parent if last, noop otherwise).
1003 : */
1004 47 : coro signal_completion() noexcept
1005 : {
1006 47 : auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
1007 47 : if(remaining == 1)
1008 13 : return caller_ex_.dispatch(continuation_);
1009 34 : return std::noop_coroutine();
1010 : }
1011 : };
1012 :
1013 : /** Specialization for void tasks (no result storage needed).
1014 :
1015 : When racing void-returning tasks, there is no result value to store.
1016 : Only the winner's index and any exception are tracked.
1017 : */
1018 : template<>
1019 : struct when_any_homogeneous_state<void>
1020 : {
1021 : /** Counter for tasks still running. */
1022 : std::atomic<std::size_t> remaining_count_;
1023 :
1024 : /** Total number of tasks being raced. */
1025 : std::size_t task_count_;
1026 :
1027 : /** Flag indicating whether a winner has been determined. */
1028 : std::atomic<bool> has_winner_{false};
1029 :
1030 : /** Index of the winning task in the vector. */
1031 : std::size_t winner_index_{0};
1032 :
1033 : /** Exception thrown by the winner, if any. */
1034 : std::exception_ptr winner_exception_;
1035 :
1036 : /** Handles to runner coroutines for cleanup. */
1037 : std::vector<coro> runner_handles_;
1038 :
1039 : /** Stop source for cancelling sibling tasks. */
1040 : std::stop_source stop_source_;
1041 :
1042 : /** Callback functor that forwards stop requests. */
1043 : struct stop_callback_fn
1044 : {
1045 : /** Pointer to the stop source to signal. */
1046 : std::stop_source* source_;
1047 :
1048 : /** Invoke the stop request on the source. */
1049 0 : void operator()() const noexcept { source_->request_stop(); }
1050 : };
1051 :
1052 : /** Type alias for the stop callback registration. */
1053 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
1054 :
1055 : /** Optional callback linking parent's stop token to our stop source. */
1056 : std::optional<stop_callback_t> parent_stop_callback_;
1057 :
1058 : /** Parent coroutine handle to resume when all children complete. */
1059 : coro continuation_;
1060 :
1061 : /** Executor reference for dispatching the parent resumption. */
1062 : executor_ref caller_ex_;
1063 :
1064 : /** Construct state for racing the given number of void tasks.
1065 :
1066 : @param count Number of tasks to race.
1067 : */
1068 2 : explicit when_any_homogeneous_state(std::size_t count)
1069 2 : : remaining_count_(count)
1070 2 : , task_count_(count)
1071 6 : , runner_handles_(count)
1072 : {
1073 2 : }
1074 :
1075 : /** Destroy state and clean up runner coroutine handles. */
1076 2 : ~when_any_homogeneous_state()
1077 : {
1078 7 : for(auto h : runner_handles_)
1079 5 : if(h)
1080 5 : h.destroy();
1081 2 : }
1082 :
1083 : /** Attempt to become the winner.
1084 :
1085 : @param index The task's index in the vector.
1086 : @return true if this task is now the winner, false if another won first.
1087 : */
1088 5 : bool try_win(std::size_t index) noexcept
1089 : {
1090 5 : bool expected = false;
1091 5 : if(has_winner_.compare_exchange_strong(
1092 : expected, true, std::memory_order_acq_rel))
1093 : {
1094 2 : winner_index_ = index;
1095 2 : stop_source_.request_stop();
1096 2 : return true;
1097 : }
1098 3 : return false;
1099 : }
1100 :
1101 : /** Store the winner's exception.
1102 :
1103 : @pre Only called by the winner (try_win returned true).
1104 : */
1105 1 : void set_winner_exception(std::exception_ptr ep) noexcept
1106 : {
1107 1 : winner_exception_ = ep;
1108 1 : }
1109 :
1110 : /** Signal task completion; last one resumes the parent.
1111 :
1112 : @return Coroutine to resume (parent if last, noop otherwise).
1113 : */
1114 5 : coro signal_completion() noexcept
1115 : {
1116 5 : auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
1117 5 : if(remaining == 1)
1118 2 : return caller_ex_.dispatch(continuation_);
1119 3 : return std::noop_coroutine();
1120 : }
1121 : };
1122 :
1123 : /** Wrapper coroutine for homogeneous when_any tasks (vector overload).
1124 :
1125 : Same role as when_any_runner but uses a runtime index instead of
1126 : a compile-time index, allowing it to work with vectors of tasks.
1127 :
1128 : @tparam T The common result type of all tasks.
1129 : */
1130 : template<typename T>
1131 : struct when_any_homogeneous_runner
1132 : {
1133 : /** Promise type for the homogeneous runner coroutine.
1134 :
1135 : Manages executor propagation, stop token forwarding, and completion
1136 : signaling for the wrapped child task.
1137 : */
1138 : struct promise_type : frame_allocating_base
1139 : {
1140 : /** Pointer to shared state for winner coordination. */
1141 : when_any_homogeneous_state<T>* state_ = nullptr;
1142 :
1143 : /** Runtime index of this task in the vector. */
1144 : std::size_t index_ = 0;
1145 :
1146 : /** Executor reference inherited from the parent coroutine. */
1147 : executor_ref ex_;
1148 :
1149 : /** Stop token for cooperative cancellation. */
1150 : std::stop_token stop_token_;
1151 :
1152 : /** Create the runner coroutine object from this promise.
1153 :
1154 : @return Runner coroutine wrapping this promise's coroutine handle.
1155 : */
1156 52 : when_any_homogeneous_runner get_return_object()
1157 : {
1158 : return when_any_homogeneous_runner(
1159 52 : std::coroutine_handle<promise_type>::from_promise(*this));
1160 : }
1161 :
1162 : /** Suspend immediately on creation.
1163 :
1164 : Runner coroutines start suspended; the launcher resumes them
1165 : after setting up state_, index_, ex_, and stop_token_.
1166 :
1167 : @return Always suspends.
1168 : */
1169 52 : std::suspend_always initial_suspend() noexcept
1170 : {
1171 52 : return {};
1172 : }
1173 :
1174 : /** Final suspend awaiter that signals completion to shared state.
1175 :
1176 : @return Custom awaiter that calls signal_completion().
1177 : */
1178 52 : auto final_suspend() noexcept
1179 : {
1180 : /** Awaiter that signals task completion and potentially resumes parent. */
1181 : struct awaiter
1182 : {
1183 : /** Pointer to the promise for accessing shared state. */
1184 : promise_type* p_;
1185 :
1186 : /** Never ready; always suspend to signal completion.
1187 :
1188 : @return Always false.
1189 : */
1190 52 : bool await_ready() const noexcept
1191 : {
1192 52 : return false;
1193 : }
1194 :
1195 : /** Signal completion and return next coroutine to resume.
1196 :
1197 : @return Parent coroutine if this was the last task, noop otherwise.
1198 : */
1199 52 : coro await_suspend(coro) noexcept
1200 : {
1201 52 : return p_->state_->signal_completion();
1202 : }
1203 :
1204 : /** No-op resume; coroutine is destroyed after final suspend. */
1205 0 : void await_resume() const noexcept
1206 : {
1207 0 : }
1208 : };
1209 52 : return awaiter{this};
1210 : }
1211 :
1212 : /** Called when runner coroutine body completes normally.
1213 :
1214 : The actual result handling is done in make_when_any_homogeneous_runner;
1215 : this just satisfies the coroutine return requirement.
1216 : */
1217 49 : void return_void()
1218 : {
1219 49 : }
1220 :
1221 : /** Handle exceptions thrown by the child task.
1222 :
1223 : Exceptions are valid completions in when_any. If this exception wins,
1224 : it will be rethrown to the caller. If another task already won,
1225 : this exception is discarded.
1226 : */
1227 3 : void unhandled_exception()
1228 : {
1229 3 : if(state_->try_win(index_))
1230 3 : state_->set_winner_exception(std::current_exception());
1231 3 : }
1232 :
1233 : /** Awaiter wrapper that injects executor and stop token into child awaitables.
1234 :
1235 : @tparam Awaitable The underlying awaitable type being wrapped.
1236 : */
1237 : template<class Awaitable>
1238 : struct transform_awaiter
1239 : {
1240 : /** The wrapped awaitable instance. */
1241 : std::decay_t<Awaitable> a_;
1242 :
1243 : /** Pointer to promise for accessing executor and stop token. */
1244 : promise_type* p_;
1245 :
1246 : /** Check if the underlying awaitable is ready.
1247 :
1248 : @return True if awaitable can complete synchronously.
1249 : */
1250 52 : bool await_ready()
1251 : {
1252 52 : return a_.await_ready();
1253 : }
1254 :
1255 : /** Get the result from the underlying awaitable.
1256 :
1257 : @return The awaitable's result value.
1258 : */
1259 52 : auto await_resume()
1260 : {
1261 52 : return a_.await_resume();
1262 : }
1263 :
1264 : /** Suspend with executor and stop token injection.
1265 :
1266 : @tparam Promise The suspending coroutine's promise type.
1267 : @param h Handle to the suspending coroutine.
1268 : @return Coroutine to resume or void.
1269 : */
1270 : template<class Promise>
1271 52 : auto await_suspend(std::coroutine_handle<Promise> h)
1272 : {
1273 52 : return a_.await_suspend(h, p_->ex_, p_->stop_token_);
1274 : }
1275 : };
1276 :
1277 : /** Transform awaitables to inject executor and stop token.
1278 :
1279 : @tparam Awaitable The awaitable type being co_awaited.
1280 : @param a The awaitable instance.
1281 : @return Transformed awaiter with executor/stop_token injection.
1282 : */
1283 : template<class Awaitable>
1284 52 : auto await_transform(Awaitable&& a)
1285 : {
1286 : using A = std::decay_t<Awaitable>;
1287 : if constexpr (IoAwaitable<A>)
1288 : {
1289 : return transform_awaiter<Awaitable>{
1290 104 : std::forward<Awaitable>(a), this};
1291 : }
1292 : else
1293 : {
1294 : return make_affine(std::forward<Awaitable>(a), ex_);
1295 : }
1296 52 : }
1297 : };
1298 :
1299 : /** Handle to the underlying coroutine frame. */
1300 : std::coroutine_handle<promise_type> h_;
1301 :
1302 : /** Construct runner from a coroutine handle.
1303 :
1304 : @param h Handle to the runner coroutine frame.
1305 : */
1306 52 : explicit when_any_homogeneous_runner(std::coroutine_handle<promise_type> h) noexcept
1307 52 : : h_(h)
1308 : {
1309 52 : }
1310 :
1311 : /** Move constructor (Clang 14 workaround).
1312 :
1313 : Clang 14 (non-Apple) has a coroutine codegen bug requiring explicit
1314 : move constructor; other compilers work correctly with deleted move.
1315 : */
1316 : #if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__)
1317 : when_any_homogeneous_runner(when_any_homogeneous_runner&& other) noexcept
1318 : : h_(std::exchange(other.h_, nullptr)) {}
1319 : #endif
1320 :
1321 : /** Copy construction is not allowed. */
1322 : when_any_homogeneous_runner(when_any_homogeneous_runner const&) = delete;
1323 :
1324 : /** Copy assignment is not allowed. */
1325 : when_any_homogeneous_runner& operator=(when_any_homogeneous_runner const&) = delete;
1326 :
1327 : /** Move construction is deleted (except on Clang 14). */
1328 : #if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__)
1329 : when_any_homogeneous_runner(when_any_homogeneous_runner&&) = delete;
1330 : #endif
1331 :
1332 : /** Move assignment is not allowed. */
1333 : when_any_homogeneous_runner& operator=(when_any_homogeneous_runner&&) = delete;
1334 :
1335 : /** Release ownership of the coroutine handle.
1336 :
1337 : @return The coroutine handle; this object becomes empty.
1338 : */
1339 52 : auto release() noexcept
1340 : {
1341 52 : return std::exchange(h_, nullptr);
1342 : }
1343 : };
1344 :
1345 : /** Create a runner coroutine for a homogeneous when_any task.
1346 :
1347 : Factory function that creates a wrapper coroutine for a child task
1348 : in the vector overload. Uses a runtime index instead of compile-time.
1349 :
1350 : @tparam T The result type of the task being wrapped.
1351 : @param inner The task to run (will be moved from).
1352 : @param state Shared state for winner coordination.
1353 : @param index Runtime index of this task in the vector.
1354 : @return Runner coroutine (must be started via resume()).
1355 : */
1356 : template<typename T>
1357 : when_any_homogeneous_runner<T>
1358 52 : make_when_any_homogeneous_runner(task<T> inner, when_any_homogeneous_state<T>* state, std::size_t index)
1359 : {
1360 : if constexpr (std::is_void_v<T>)
1361 : {
1362 : co_await std::move(inner);
1363 : state->try_win(index); // void tasks have no result to store
1364 : }
1365 : else
1366 : {
1367 : auto result = co_await std::move(inner);
1368 : if(state->try_win(index))
1369 : {
1370 : try
1371 : {
1372 : state->set_winner_result(std::move(result));
1373 : }
1374 : catch(...)
1375 : {
1376 : state->set_winner_exception(std::current_exception());
1377 : }
1378 : }
1379 : }
1380 104 : }
1381 :
1382 : /** Awaitable that launches all runners for homogeneous when_any.
1383 :
1384 : Same lifetime concerns as when_any_launcher; see its documentation.
1385 : Uses runtime iteration over the task vector instead of compile-time
1386 : expansion over a tuple.
1387 :
1388 : @tparam T The common result type of all tasks in the vector.
1389 : */
1390 : template<typename T>
1391 : class when_any_homogeneous_launcher
1392 : {
1393 : /** Pointer to vector of tasks to launch. */
1394 : std::vector<task<T>>* tasks_;
1395 :
1396 : /** Pointer to shared state for coordination. */
1397 : when_any_homogeneous_state<T>* state_;
1398 :
1399 : public:
1400 : /** Construct launcher with task vector and shared state.
1401 :
1402 : @param tasks Pointer to vector of tasks (must outlive the await).
1403 : @param state Pointer to shared state for winner coordination.
1404 : */
1405 15 : when_any_homogeneous_launcher(
1406 : std::vector<task<T>>* tasks,
1407 : when_any_homogeneous_state<T>* state)
1408 15 : : tasks_(tasks)
1409 15 : , state_(state)
1410 : {
1411 15 : }
1412 :
1413 : /** Check if the launcher can complete synchronously.
1414 :
1415 : @return True only if there are no tasks (degenerate case).
1416 : */
1417 15 : bool await_ready() const noexcept
1418 : {
1419 15 : return tasks_->empty();
1420 : }
1421 :
1422 : /** Launch all runner coroutines and suspend the parent.
1423 :
1424 : Sets up stop propagation from parent to children, then launches
1425 : each task in a runner coroutine. Returns noop_coroutine because
1426 : runners resume the parent via signal_completion().
1427 :
1428 : CRITICAL: If the last task finishes synchronously then the parent
1429 : coroutine resumes, destroying its frame, and destroying this object
1430 : prior to the completion of await_suspend. Therefore, await_suspend
1431 : must ensure `this` cannot be referenced after calling `launch_one`
1432 : for the last time.
1433 :
1434 : @tparam Ex The executor type.
1435 : @param continuation Handle to the parent coroutine to resume later.
1436 : @param caller_ex Executor for dispatching child coroutines.
1437 : @param parent_token Stop token from the parent for cancellation propagation.
1438 : @return noop_coroutine; parent is resumed by the last completing task.
1439 : */
1440 : template<typename Ex>
1441 15 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
1442 : {
1443 15 : state_->continuation_ = continuation;
1444 15 : state_->caller_ex_ = caller_ex;
1445 :
1446 : // Forward parent's stop requests to children
1447 15 : if(parent_token.stop_possible())
1448 : {
1449 10 : state_->parent_stop_callback_.emplace(
1450 : parent_token,
1451 5 : typename when_any_homogeneous_state<T>::stop_callback_fn{&state_->stop_source_});
1452 :
1453 5 : if(parent_token.stop_requested())
1454 2 : state_->stop_source_.request_stop();
1455 : }
1456 :
1457 15 : auto num_tasks = tasks_->size();
1458 15 : auto token = state_->stop_source_.get_token();
1459 67 : for(std::size_t i = 0; i < num_tasks; ++i)
1460 52 : launch_one( i, caller_ex, token);
1461 :
1462 30 : return std::noop_coroutine();
1463 15 : }
1464 :
1465 : /** Resume after all tasks complete.
1466 :
1467 : No return value; results are accessed via the shared state.
1468 : */
1469 15 : void await_resume() const noexcept
1470 : {
1471 15 : }
1472 :
1473 : private:
1474 : /** Launch a single runner coroutine for task at the given index.
1475 :
1476 : Creates the runner, configures its promise with state and executor,
1477 : stores its handle for cleanup, and dispatches it for execution.
1478 :
1479 : @tparam Ex The executor type.
1480 : @param index Runtime index of the task in the vector.
1481 : @param caller_ex Executor for dispatching the runner.
1482 : @param token Stop token for cooperative cancellation.
1483 :
1484 : @pre Ex::dispatch() and coro::resume() must not throw. If they do,
1485 : the coroutine handle may leak.
1486 : */
1487 : template<typename Ex>
1488 52 : void launch_one(std::size_t index, Ex const& caller_ex, std::stop_token token)
1489 : {
1490 52 : auto runner = make_when_any_homogeneous_runner(
1491 52 : std::move((*tasks_)[index]), state_, index);
1492 :
1493 52 : auto h = runner.release();
1494 52 : h.promise().state_ = state_;
1495 52 : h.promise().index_ = index;
1496 52 : h.promise().ex_ = caller_ex;
1497 52 : h.promise().stop_token_ = token;
1498 :
1499 52 : coro ch{h};
1500 52 : state_->runner_handles_[index] = ch;
1501 52 : caller_ex.dispatch(ch).resume();
1502 52 : }
1503 : };
1504 :
1505 : } // namespace detail
1506 :
1507 : /** Wait for the first task to complete (homogeneous overload).
1508 :
1509 : Races a vector of tasks with the same result type. Simpler than the
1510 : heterogeneous overload: returns a direct pair instead of a variant
1511 : since all tasks share the same type.
1512 :
1513 : @par Example
1514 : @code
1515 : task<void> example() {
1516 : std::vector<task<Response>> requests;
1517 : requests.push_back(fetch_from_server(0));
1518 : requests.push_back(fetch_from_server(1));
1519 : requests.push_back(fetch_from_server(2));
1520 :
1521 : auto [index, response] = co_await when_any(std::move(requests));
1522 : // index is 0, 1, or 2; response is the winner's Response
1523 : }
1524 : @endcode
1525 :
1526 : @tparam T The common result type of all tasks (must not be void).
1527 : @param tasks Vector of tasks to race concurrently (must not be empty).
1528 : @return A task yielding a pair of (winner_index, result).
1529 : @throws std::invalid_argument if tasks is empty.
1530 :
1531 : @par Key Features
1532 : @li All tasks are launched concurrently
1533 : @li Returns when first task completes (success or failure)
1534 : @li Stop is requested for all siblings
1535 : @li Waits for all siblings to complete before returning
1536 : @li If winner threw, that exception is rethrown
1537 : @li Returns simple pair (no variant needed for homogeneous types)
1538 : */
1539 : template<typename T>
1540 : requires (!std::is_void_v<T>)
1541 : [[nodiscard]] task<std::pair<std::size_t, T>>
1542 14 : when_any(std::vector<task<T>> tasks)
1543 : {
1544 : if(tasks.empty())
1545 : throw std::invalid_argument("when_any requires at least one task");
1546 :
1547 : using result_type = std::pair<std::size_t, T>;
1548 :
1549 : detail::when_any_homogeneous_state<T> state(tasks.size());
1550 :
1551 : co_await detail::when_any_homogeneous_launcher<T>(&tasks, &state);
1552 :
1553 : if(state.winner_exception_)
1554 : std::rethrow_exception(state.winner_exception_);
1555 :
1556 : co_return result_type{state.winner_index_, std::move(*state.result_)};
1557 28 : }
1558 :
1559 : /** Wait for the first task to complete (homogeneous void overload).
1560 :
1561 : Races a vector of void-returning tasks. Since void tasks have no
1562 : result value, only the winner's index is returned.
1563 :
1564 : @param tasks Vector of void tasks to race concurrently (must not be empty).
1565 : @return A task yielding the winner's index (zero-based).
1566 : @throws std::invalid_argument if tasks is empty.
1567 :
1568 : @par Key Features
1569 : @li All tasks are launched concurrently
1570 : @li Returns when first task completes (success or failure)
1571 : @li Stop is requested for all siblings
1572 : @li Waits for all siblings to complete before returning
1573 : @li If winner threw, that exception is rethrown
1574 : */
1575 : [[nodiscard]] inline task<std::size_t>
1576 2 : when_any(std::vector<task<void>> tasks)
1577 : {
1578 : if(tasks.empty())
1579 : throw std::invalid_argument("when_any requires at least one task");
1580 :
1581 : detail::when_any_homogeneous_state<void> state(tasks.size());
1582 :
1583 : co_await detail::when_any_homogeneous_launcher<void>(&tasks, &state);
1584 :
1585 : if(state.winner_exception_)
1586 : std::rethrow_exception(state.winner_exception_);
1587 :
1588 : co_return state.winner_index_;
1589 4 : }
1590 :
/** Alias for vector when_any result type.

    Names the value produced by the homogeneous (vector) overloads of
    when_any:

    @li for non-void `T`, a pair of the winner's index and the result
        value (no variant needed since all tasks share the same type);
    @li for `T = void`, just the winner's index (`std::size_t`), matching
        the `std::vector<task<void>>` overload.

    @par Example
    @code
    void on_complete(when_any_vector_result_type<Response> result);
    void on_done(when_any_vector_result_type<void> winner_index);
    @endcode

    @tparam T The common result type of all tasks in the vector.
*/
template<typename T>
using when_any_vector_result_type = std::conditional_t<
    std::is_void_v<T>,
    std::size_t,
    // Only named, never instantiated, when T is void.
    std::pair<std::size_t, T>>;
1606 :
1607 : } // namespace capy
1608 : } // namespace boost
1609 :
1610 : #endif
|