17#include "AUI/Traits/concepts.h"
18#include "AUI/Util/ABitField.h"
26#include "AConditionVariable.h"
28#include <AUI/Common/SharedPtrTypes.h>
29#include <AUI/Common/AString.h>
30#include <AUI/Common/AException.h>
31#include <AUI/Logging/ALogger.h>
32#include <AUI/Reflect/AReflect.h>
54 ALLOW_STACKFUL_COROUTINES = 0b10,
59 ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP = 0b01,
60 DEFAULT = ALLOW_STACKFUL_COROUTINES | ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP,
64namespace aui::impl::future {
69 template<
typename Inner>
76 wrapped->waitForTask();
80 const _unique<Inner>&
ptr()
noexcept {
84 Inner* operator->()
const noexcept {
89 _unique<Inner> wrapped;
94 using type = std::function<void(
const T& value)>;
110 using type = std::function<void()>;
112 template<
typename Value =
void>
116 static constexpr bool isVoid = std::is_same_v<void, Value>;
117 using TaskCallback = std::function<Value()>;
119 using OnSuccessCallback =
typename OnSuccessCallback<Value>::type;
122 bool interrupted =
false;
126 std::conditional_t<isVoid, bool, AOptional<Value>>
value;
135 OnSuccessCallback onSuccess;
136 std::function<void(
const AException& exception)> onError;
138 bool cancelled =
false;
140 explicit Inner(std::function<Value()> task)
noexcept: task(std::move(task)) {
141 if constexpr(isVoid) {
146 void waitForTask() noexcept {
147 std::unique_lock lock(
mutex);
148 bool rethrowInterrupted =
false;
149 while ((thread) && !hasResult() && !cancelled) {
153 rethrowInterrupted =
true;
156 if (rethrowInterrupted) {
162 bool isWaitNeeded() noexcept {
163 return (thread || !cancelled) && !hasResult();
167 bool hasResult() const noexcept {
168 return value || exception || interrupted;
172 bool hasValue() const noexcept {
177 std::unique_lock lock(
mutex);
178 if (cancelled)
return true;
179 if (thread)
return true;
180 thread = std::move(thr);
186 void cancel() noexcept {
187 std::unique_lock lock(
mutex);
190 if (thread && !hasResult()) {
208 if (
auto innerCancellation = innerWeak.lock()) {
209 if (
value)
return false;
210 auto& inner = innerCancellation->ptr();
213 if (task ==
nullptr) {
216 std::unique_lock lock(
mutex);
217 if (task ==
nullptr) {
220 auto func = std::move(task);
223 innerCancellation =
nullptr;
224 if constexpr(isVoid) {
226 if (
auto sharedPtrLock = innerWeak.lock()) {
233 if (lock.owns_lock()) lock.unlock();
236 auto result = func();
237 if (
auto sharedPtrLock = innerWeak.lock()) {
239 value = std::move(result);
244 if (lock.owns_lock()) lock.unlock();
248 if (
auto sharedPtrLock = innerWeak.lock()) {
249 inner->reportInterrupted();
253 if (
auto sharedPtrLock = innerWeak.lock()) {
254 inner->reportException();
262 void reportInterrupted() noexcept {
263 std::unique_lock lock(
mutex);
268 void reportException(std::exception_ptr causedBy = std::current_exception()) noexcept {
272 std::unique_lock lock(
mutex);
273 exception.emplace(
"exception reported", std::move(causedBy));
278 auto localOnError = std::move(onError);
280 localOnError(*exception);
298 if (
value && onSuccess) {
299 auto localOnSuccess = std::move(onSuccess);
302 invokeOnSuccessCallback(localOnSuccess);
307 void invokeOnSuccessCallback(F&& f) {
309 if constexpr (isVoid) {
315 ALogger::err(
"AFuture") <<
"AFuture onSuccess thrown an exception: " << e;
319 template<
typename Callback>
320 void addOnSuccessCallback(Callback&& callback) {
321 if constexpr (isVoid) {
323 onSuccess = [prev = std::move(onSuccess),
324 callback = std::forward<Callback>(callback)]()
mutable {
329 onSuccess = [callback = std::forward<Callback>(callback)]()
mutable { callback(); };
333 onSuccess = [prev = std::move(onSuccess),
334 callback = std::forward<Callback>(callback)](
const Value& v)
mutable {
339 onSuccess = [callback = std::forward<Callback>(callback)](
const Value& v)
mutable {
346 template<
typename Callback>
347 void addOnErrorCallback(Callback&& callback) {
349 onError = [prev = std::move(onError),
350 callback = std::forward<Callback>(callback)](
const AException& v) {
355 onError = [callback = std::forward<Callback>(callback)](
const AException& v) { callback(v); };
364 struct CoPromiseType;
388 return (*mInner)->isWaitNeeded();
396 return (*mInner)->hasResult();
405 return (*mInner)->hasValue();
408 void reportException() const noexcept {
409 (*mInner)->reportException();
412 template<
typename Callback>
413 void onSuccess(Callback&& callback)
const {
415 (*mInner)->invokeOnSuccessCallback(std::forward<Callback>(callback));
418 std::unique_lock lock((*mInner)->mutex);
420 (*mInner)->invokeOnSuccessCallback(std::forward<Callback>(callback));
423 (*mInner)->addOnSuccessCallback(std::forward<Callback>(callback));
426 template<aui::invocable<const AException&> Callback>
427 void onError(Callback&& callback)
const {
428 std::unique_lock lock((*mInner)->mutex);
429 (*mInner)->addOnErrorCallback(std::forward<Callback>(callback));
435 template<aui::invocable Callback>
437 std::unique_lock lock((*mInner)->mutex);
438 (*mInner)->addOnSuccessCallback([callback](
const auto&...) { callback(); });
439 (*mInner)->addOnErrorCallback([callback = std::move(callback)](
const auto&...) { callback(); });
440 (*mInner)->notifyOnSuccessCallback(lock);
456 void reportInterrupted()
const {
457 (*mInner)->reportInterrupted();
465 void wait(AFutureWait flags = AFutureWait::DEFAULT)
const {
466 (*mInner)->wait(mInner, flags);
478 typename FutureReturnType<Value>::type
get(AFutureWait flags = AFutureWait::DEFAULT)
const {
481 (*mInner)->wait(mInner, flags);
484 if ((*mInner)->exception) {
485 throw *(*mInner)->exception;
487 if ((*mInner)->interrupted) {
491 if constexpr(!isVoid) {
492 return *(*mInner)->value;
505 typename FutureReturnType<Value>::type
operator*()
const {
506 return **
const_cast<Future*
>(
this);
534 void checkForSelfWait()
const {
619template<
typename T =
void>
625 using Task =
typename super::TaskCallback;
627 using promise_type =
typename super::CoPromiseType;
631 explicit AFuture(T immediateValue): super() {
632 auto& inner = (*super::mInner);
633 inner->value = std::move(immediateValue);
// Constructs the future from an optional task callback (defaults to nullptr);
// the task is moved into the base-class (super) constructor.
635 AFuture(Task task =
nullptr)
noexcept: super(std::move(task)) {}
651 auto& inner = (*super::mInner);
652 AUI_ASSERTX(inner->task ==
nullptr,
"task is already provided");
654 std::unique_lock lock(inner->mutex);
655 inner->value = std::move(v);
656 inner->cv.notify_all();
657 inner->notifyOnSuccessCallback(lock);
663 void supplyException(std::exception_ptr causedBy = std::current_exception()) const noexcept {
664 auto& inner = (*super::mInner);
665 inner->reportException(std::move(causedBy));
668 AFuture& operator=(std::nullptr_t)
noexcept {
669 super::mInner =
nullptr;
674 bool operator==(
const AFuture& r)
const noexcept {
675 return super::mInner == r.mInner;
697 template<aui::invocable<const T&> Callback>
699 super::onSuccess(std::forward<Callback>(callback));
722 template<aui::invocable<const AException&> Callback>
724 super::onError(std::forward<Callback>(callback));
731 template<aui::invocable Callback>
733 super::onFinally(std::forward<Callback>(callback));
740 template<aui::invocable<const T&> Callback>
743 onSuccess([result, callback = std::forward<Callback>(callback)](
const T& v) {
744 result.supplyValue(callback(v));
750 result.reportException();
764 using Task =
typename super::TaskCallback;
766 using promise_type =
typename super::CoPromiseType;
768 using Inner =
decltype(std::declval<super>().inner());
// Constructs the future from an optional task callback (defaults to nullptr);
// the task is moved into the base-class (super) constructor.
770 AFuture(Task task =
nullptr)
noexcept: super(std::move(task)) {}
782 void supplyException(std::exception_ptr causedBy = std::current_exception()) const noexcept {
783 auto& inner = (*super::mInner);
784 inner->reportException(std::move(causedBy));
793 auto& inner = (*super::mInner);
794 AUI_ASSERTX(inner->task ==
nullptr,
"task is already provided");
796 std::unique_lock lock(inner->mutex);
798 inner->cv.notify_all();
799 inner->notifyOnSuccessCallback(lock);
802 AFuture& operator=(std::nullptr_t)
noexcept {
803 super::mInner =
nullptr;
808 bool operator==(
const AFuture& r)
const noexcept {
809 return super::mInner == r.mInner;
831 template<aui::invocable Callback>
833 super::onSuccess(std::forward<Callback>(callback));
856 template<aui::invocable<const AException&> Callback>
858 super::onError(std::forward<Callback>(callback));
865 template<aui::invocable Callback>
867 super::onFinally(std::forward<Callback>(callback));
873#include <AUI/Thread/AThreadPool.h>
874#include <AUI/Common/AException.h>
876template <
typename Value>
879 if (hasResult())
return;
880 std::unique_lock lock(mutex);
882 if ((thread || !cancelled) && !hasResult() && flags & AFutureWait::ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP && task) {
885 if (tryExecute(innerWeak)) {
891 if (flags & AFutureWait::ALLOW_STACKFUL_COROUTINES) {
892 if (
auto threadPoolWorker = _cast<AThreadPool::Worker>(
AThread::current())) {
893 if (hasResult())
return;
895 auto callback = [threadPoolWorker](
auto&&...) {
896 threadPoolWorker->threadPool().wakeUpAll();
898 addOnSuccessCallback(callback);
899 addOnErrorCallback(callback);
901 threadPoolWorker->loop([&] {
902 if (lock.owns_lock())
910 while ((thread || !cancelled) && !hasResult()) {
923template<
typename Value>
926 auto initial_suspend() const noexcept
928 return std::suspend_never{};
931 auto final_suspend() const noexcept
933 return std::suspend_never{};
935 auto unhandled_exception() const noexcept {
943 void return_value(Value v)
const noexcept {
953 bool await_ready() const noexcept {
962 void await_suspend(std::coroutine_handle<> h)
974 return Awaitable{ std::move(future) };
Bit field implementation.
Definition: ABitField.h:20
Represents a condition variable.
Definition: AConditionVariable.h:24
void notify_all() noexcept
Definition: AConditionVariable.h:62
Abstract AUI exception.
Definition: AException.h:29
const AFuture & onError(Callback &&callback) const
Add onError callback to the future.
Definition: AFuture.h:857
const AFuture & onFinally(Callback &&callback) const
Adds the callback to both onSuccess and onError.
Definition: AFuture.h:866
void supplyException(std::exception_ptr causedBy=std::current_exception()) const noexcept
Stores an exception from std::current_exception to the future.
Definition: AFuture.h:782
const AFuture & onSuccess(Callback &&callback) const
Add onSuccess callback to the future.
Definition: AFuture.h:832
void supplyValue() const noexcept
Pushes "success" result.
Definition: AFuture.h:792
Represents a value that will be available at some point in the future.
Definition: AFuture.h:620
const AFuture & onSuccess(Callback &&callback) const noexcept
Add onSuccess callback to the future.
Definition: AFuture.h:698
const AFuture & onError(Callback &&callback) const noexcept
Add onError callback to the future.
Definition: AFuture.h:723
auto map(Callback &&callback) -> AFuture< decltype(callback(std::declval< T >()))> const
Maps this AFuture to another type of AFuture.
Definition: AFuture.h:741
void supplyValue(T v) const noexcept
Pushes the result to AFuture.
Definition: AFuture.h:650
void supplyException(std::exception_ptr causedBy=std::current_exception()) const noexcept
Stores an exception from std::current_exception to the future.
Definition: AFuture.h:663
const AFuture & onFinally(Callback &&callback) const noexcept
Adds the callback to both onSuccess and onError.
Definition: AFuture.h:732
Utility wrapper implementing the stack-allocated (fast) optional idiom.
Definition: AOptional.h:32
Synchronization primitive that is implemented with atomic values instead of doing syscalls.
Definition: AMutex.h:65
static AStacktrace capture(unsigned skipFrames=0, unsigned maxFrames=128) noexcept
Creates stacktrace of the current thread.
Definition: AStacktraceImpl.cpp:156
Represents a Unicode character string.
Definition: AString.h:37
Thread pool implementation.
Definition: AThreadPool.h:33
Exception that is thrown by AThread::interruptionPoint(), if interruption is requested for this thread...
Definition: AThread.h:181
void needRethrow() const noexcept
Schedules AThread::Interrupted exception to the next interruption point. Sometimes you could not thro...
Definition: AThread.h:188
static _< AAbstractThread > current()
Definition: AThread.cpp:221
static void interruptionPoint()
Interruption point.
Definition: AThread.cpp:232
An std::weak_ptr with AUI extensions.
Definition: SharedPtrTypes.h:177
Definition: AFuture.h:114
bool isWaitNeeded() const noexcept
Definition: AFuture.h:387
bool hasValue() const noexcept
Definition: AFuture.h:404
FutureReturnType< Value >::type operator*()
Returns the task result from another thread. Sleeps if the task result is not currently available...
Definition: AFuture.h:517
void onFinally(Callback &&callback) const
Adds the callback to both onSuccess and onError.
Definition: AFuture.h:436
Value * operator->() const
Returns the task result from another thread. Sleeps if the task result is not currently available...
Definition: AFuture.h:529
void cancel() const noexcept
Cancels the AFuture's task.
Definition: AFuture.h:452
void wait(AFutureWait flags=AFutureWait::DEFAULT) const
Sleeps if the task result is not currently available.
Definition: AFuture.h:465
bool hasResult() const noexcept
Definition: AFuture.h:395
Future(TaskCallback task=nullptr)
Definition: AFuture.h:376
FutureReturnType< Value >::type get(AFutureWait flags=AFutureWait::DEFAULT) const
Returns the task result from another thread. Sleeps if the task result is not currently available...
Definition: AFuture.h:478
FutureReturnType< Value >::type operator*() const
Returns the task result from another thread. Sleeps if the task result is not currently available...
Definition: AFuture.h:505
AUI_ENUM_FLAG(ASide)
Describes sides of a 2D rectangle.
Definition: ASide.h:24
#define AUI_ASSERT(condition)
Asserts that the passed condition evaluates to true.
Definition: Assert.h:55
#define AUI_ASSERTX(condition, what)
Asserts that the passed condition evaluates to true. Adds extra message string.
Definition: Assert.h:74
@ DEFAULT
There's no concrete input action. Let the OS decide which action is the most appropriate.
An std::weak_ptr with AUI extensions.
Definition: SharedPtrTypes.h:51
Definition: AFuture.h:121
std::conditional_t< isVoid, bool, AOptional< Value > > value
Definition: AFuture.h:126
void notifyOnSuccessCallback(std::unique_lock< decltype(mutex)> &lock) noexcept
Calls onSuccess callback.
Definition: AFuture.h:293
bool tryExecute(const _weak< CancellationWrapper< Inner > > &innerWeak)
Executes the task stored in the future. Using weak_ptr to internal object in order to make possible F...
Definition: AFuture.h:202
ASpinlockMutex mutex
Definition: AFuture.h:132
Definition: SharedPtrTypes.h:114