17#include "AUI/Traits/concepts.h"
18#include "AUI/Util/ABitField.h"
26#include "AConditionVariable.h"
28#include <AUI/Common/SharedPtrTypes.h>
29#include <AUI/Common/AString.h>
30#include <AUI/Common/AException.h>
31#include <AUI/Logging/ALogger.h>
32#include <AUI/Reflect/AReflect.h>
37class AInvocationTargetException:
public AException {
40 AInvocationTargetException(
const AString& message = {}, std::exception_ptr causedBy = std::current_exception()):
44 ~AInvocationTargetException()
noexcept override =
default;
54 ALLOW_STACKFUL_COROUTINES = 0b10,
59 ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP = 0b01,
60 DEFAULT = ALLOW_STACKFUL_COROUTINES | ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP,
64namespace aui::impl::future {
69 template<
typename Inner>
70 struct CancellationWrapper {
72 explicit CancellationWrapper(_unique<Inner> wrapped) : wrapped(std::move(wrapped)) {}
74 ~CancellationWrapper() {
76 wrapped->waitForTask();
80 const _unique<Inner>& ptr()
noexcept {
84 Inner* operator->()
const noexcept {
89 _unique<Inner> wrapped;
94 using type = std::function<void(
const T& value)>;
110 using type = std::function<void()>;
112 template<
typename Value =
void>
116 static constexpr bool isVoid = std::is_same_v<void, Value>;
117 using TaskCallback = std::function<Value()>;
119 using OnSuccessCallback =
typename OnSuccessCallback<Value>::type;
122 bool interrupted =
false;
126 std::conditional_t<isVoid, bool, AOptional<Value>>
value;
135 OnSuccessCallback onSuccess;
136 std::function<void(
const AException& exception)> onError;
138 bool cancelled =
false;
140 explicit Inner(std::function<Value()> task)
noexcept: task(std::move(task)) {
141 if constexpr(isVoid) {
146 void waitForTask() noexcept {
147 std::unique_lock lock(
mutex);
148 bool rethrowInterrupted =
false;
149 while ((thread) && !
hasResult() && !cancelled) {
153 rethrowInterrupted =
true;
156 if (rethrowInterrupted) {
162 bool isWaitNeeded() noexcept {
163 return (thread || !cancelled) && !hasResult();
167 bool hasResult() const noexcept {
168 return value || exception || interrupted;
172 bool hasValue() const noexcept {
176 bool setThread(_<AAbstractThread> thr)
noexcept {
177 std::unique_lock lock(
mutex);
178 if (cancelled)
return true;
179 if (thread)
return true;
180 thread = std::move(thr);
184 void wait(
const _weak<CancellationWrapper<Inner>>& innerWeak, ABitField<AFutureWait> flags = AFutureWait::DEFAULT)
noexcept;
186 void cancel() noexcept {
187 std::unique_lock lock(
mutex);
190 if (thread && !hasResult()) {
208 if (
auto innerCancellation = innerWeak.lock()) {
209 if (
value)
return false;
210 auto& inner = innerCancellation->ptr();
213 if (task ==
nullptr) {
216 std::unique_lock lock(
mutex);
217 if (task ==
nullptr) {
220 auto func = std::move(task);
223 innerCancellation =
nullptr;
224 if constexpr(isVoid) {
226 if (
auto sharedPtrLock = innerWeak.lock()) {
233 if (lock.owns_lock()) lock.unlock();
236 auto result = func();
237 if (
auto sharedPtrLock = innerWeak.lock()) {
239 value = std::move(result);
244 if (lock.owns_lock()) lock.unlock();
248 if (
auto sharedPtrLock = innerWeak.lock()) {
249 inner->reportInterrupted();
253 if (
auto sharedPtrLock = innerWeak.lock()) {
254 inner->reportException();
262 void reportInterrupted() noexcept {
263 std::unique_lock lock(
mutex);
268 void reportException(std::exception_ptr causedBy = std::current_exception()) noexcept {
272 std::unique_lock lock(
mutex);
273 exception.emplace(
"exception reported", std::move(causedBy));
278 auto localOnError = std::move(onError);
280 localOnError(*exception);
298 if (
value && onSuccess) {
299 auto localOnSuccess = std::move(onSuccess);
302 invokeOnSuccessCallback(localOnSuccess);
307 void invokeOnSuccessCallback(F&& f) {
309 if constexpr (isVoid) {
315 ALogger::err(
"AFuture") <<
"AFuture onSuccess thrown an exception: " << e;
319 template<
typename Callback>
320 void addOnSuccessCallback(Callback&& callback) {
321 if constexpr (isVoid) {
323 onSuccess = [prev = std::move(onSuccess),
324 callback = std::forward<Callback>(callback)]()
mutable {
329 onSuccess = [callback = std::forward<Callback>(callback)]()
mutable { callback(); };
333 onSuccess = [prev = std::move(onSuccess),
334 callback = std::forward<Callback>(callback)](
const Value& v)
mutable {
339 onSuccess = [callback = std::forward<Callback>(callback)](
const Value& v)
mutable {
346 template<
typename Callback>
347 void addOnErrorCallback(Callback&& callback) {
349 onError = [prev = std::move(onError),
350 callback = std::forward<Callback>(callback)](
const AException& v) {
355 onError = [callback = std::forward<Callback>(callback)](
const AException& v) { callback(v); };
364 struct CoPromiseType;
368 _<CancellationWrapper<Inner>> mInner;
388 return (*mInner)->isWaitNeeded();
396 return (*mInner)->hasResult();
405 return (*mInner)->hasValue();
408 void reportException() const noexcept {
409 (*mInner)->reportException();
412 template<
typename Callback>
413 void onSuccess(Callback&& callback)
const {
415 (*mInner)->invokeOnSuccessCallback(std::forward<Callback>(callback));
418 std::unique_lock lock((*mInner)->mutex);
420 (*mInner)->invokeOnSuccessCallback(std::forward<Callback>(callback));
423 (*mInner)->addOnSuccessCallback(std::forward<Callback>(callback));
426 template<aui::invocable<const AException&> Callback>
427 void onError(Callback&& callback)
const {
428 std::unique_lock lock((*mInner)->mutex);
429 (*mInner)->addOnErrorCallback(std::forward<Callback>(callback));
435 template<aui::invocable Callback>
437 std::unique_lock lock((*mInner)->mutex);
438 (*mInner)->addOnSuccessCallback([callback](
const auto&...) { callback(); });
439 (*mInner)->addOnErrorCallback([callback = std::move(callback)](
const auto&...) { callback(); });
440 (*mInner)->notifyOnSuccessCallback(lock);
456 void reportInterrupted()
const {
457 (*mInner)->reportInterrupted();
465 void wait(AFutureWait flags = AFutureWait::DEFAULT)
const {
466 (*mInner)->wait(mInner, flags);
478 typename FutureReturnType<Value>::type
get(AFutureWait flags = AFutureWait::DEFAULT)
const {
481 (*mInner)->wait(mInner, flags);
484 if ((*mInner)->exception) {
485 throw *(*mInner)->exception;
487 if ((*mInner)->interrupted) {
491 if constexpr(!isVoid) {
492 return *(*mInner)->value;
505 typename FutureReturnType<Value>::type
operator*()
const {
506 return **
const_cast<Future*
>(
this);
534 void checkForSelfWait()
const {
620template<
typename T =
void>
626 using Task =
typename super::TaskCallback;
628 using promise_type =
typename super::CoPromiseType;
632 explicit AFuture(T immediateValue): super() {
633 auto& inner = (*super::mInner);
634 inner->value = std::move(immediateValue);
636 AFuture(Task task =
nullptr)
noexcept: super(std::move(task)) {}
637 ~AFuture() =
default;
639 AFuture(
const AFuture&) =
default;
640 AFuture(AFuture&&)
noexcept =
default;
642 AFuture& operator=(
const AFuture&) =
default;
643 AFuture& operator=(AFuture&&)
noexcept =
default;
652 auto& inner = (*super::mInner);
653 AUI_ASSERTX(inner->task ==
nullptr,
"task is already provided");
655 std::unique_lock lock(inner->mutex);
656 inner->value = std::move(v);
657 inner->cv.notify_all();
658 inner->notifyOnSuccessCallback(lock);
664 void supplyException(std::exception_ptr causedBy = std::current_exception()) const noexcept {
665 auto& inner = (*super::mInner);
666 inner->reportException(std::move(causedBy));
669 AFuture& operator=(std::nullptr_t)
noexcept {
670 super::mInner =
nullptr;
675 bool operator==(std::nullptr_t)
const noexcept {
676 return super::mInner ==
nullptr;
680 bool operator!=(std::nullptr_t)
const noexcept {
681 return super::mInner !=
nullptr;
685 bool operator==(
const AFuture& r)
const noexcept {
686 return super::mInner == r.mInner;
708 template<aui::invocable<const T&> Callback>
709 const AFuture&
onSuccess(Callback&& callback)
const noexcept {
710 super::onSuccess(std::forward<Callback>(callback));
733 template<aui::invocable<const AException&> Callback>
734 const AFuture&
onError(Callback&& callback)
const noexcept {
735 super::onError(std::forward<Callback>(callback));
742 template<aui::invocable Callback>
743 const AFuture&
onFinally(Callback&& callback)
const noexcept {
744 super::onFinally(std::forward<Callback>(callback));
751 template<aui::invocable<const T&> Callback>
752 auto map(Callback&& callback) -> AFuture<decltype(callback(std::declval<T>()))>
const {
753 AFuture<decltype(callback(std::declval<T>()))> result;
754 onSuccess([result, callback = std::forward<Callback>(callback)](
const T& v) {
755 result.supplyValue(callback(v));
761 result.reportException();
775 using Task =
typename super::TaskCallback;
777 using promise_type =
typename super::CoPromiseType;
779 using Inner =
decltype(std::declval<super>().inner());
781 AFuture(Task task =
nullptr)
noexcept: super(std::move(task)) {}
782 ~AFuture() =
default;
784 AFuture(
const AFuture&) =
default;
785 AFuture(AFuture&&)
noexcept =
default;
787 AFuture& operator=(
const AFuture&) =
default;
788 AFuture& operator=(AFuture&&)
noexcept =
default;
793 void supplyException(std::exception_ptr causedBy = std::current_exception()) const noexcept {
794 auto& inner = (*super::mInner);
795 inner->reportException(std::move(causedBy));
804 auto& inner = (*super::mInner);
805 AUI_ASSERTX(inner->task ==
nullptr,
"task is already provided");
807 std::unique_lock lock(inner->mutex);
809 inner->cv.notify_all();
810 inner->notifyOnSuccessCallback(lock);
813 AFuture& operator=(std::nullptr_t)
noexcept {
814 super::mInner =
nullptr;
819 bool operator==(std::nullptr_t)
const noexcept {
820 return super::mInner ==
nullptr;
824 bool operator!=(std::nullptr_t)
const noexcept {
825 return super::mInner !=
nullptr;
829 bool operator==(
const AFuture& r)
const noexcept {
830 return super::mInner == r.mInner;
852 template<aui::invocable Callback>
854 super::onSuccess(std::forward<Callback>(callback));
877 template<aui::invocable<const AException&> Callback>
878 const AFuture&
onError(Callback&& callback)
const {
879 super::onError(std::forward<Callback>(callback));
886 template<aui::invocable Callback>
888 super::onFinally(std::forward<Callback>(callback));
894#include <AUI/Thread/AThreadPool.h>
895#include <AUI/Common/AException.h>
897template <
typename Value>
898void aui::impl::future::Future<Value>::Inner::wait(
const _weak<CancellationWrapper<Inner>>& innerWeak,
900 if (hasResult())
return;
901 std::unique_lock lock(mutex);
903 if ((thread || !cancelled) && !hasResult() && flags & AFutureWait::ALLOW_TASK_EXECUTION_IF_NOT_PICKED_UP && task) {
906 if (tryExecute(innerWeak)) {
912 if (flags & AFutureWait::ALLOW_STACKFUL_COROUTINES) {
913 if (
auto threadPoolWorker = _cast<AThreadPool::Worker>(
AThread::current())) {
914 if (hasResult())
return;
916 auto callback = [threadPoolWorker](
auto&&...) {
917 threadPoolWorker->threadPool().wakeUpAll();
919 addOnSuccessCallback(callback);
920 addOnErrorCallback(callback);
922 threadPoolWorker->loop([&] {
923 if (lock.owns_lock())
931 while ((thread || !cancelled) && !hasResult()) {
944template<
typename Value>
946 AFuture<Value> future;
947 auto initial_suspend() const noexcept
949 return std::suspend_never{};
952 auto final_suspend() const noexcept
954 return std::suspend_never{};
956 auto unhandled_exception() const noexcept {
957 future.supplyException();
960 const AFuture<Value>& get_return_object() const noexcept {
964 void return_value(Value v)
const noexcept {
965 future.supplyValue(std::move(v));
974 bool await_ready() const noexcept {
983 void await_suspend(std::coroutine_handle<> h)
989 future.
onError([h](
const AException&) {
995 return Awaitable{ std::move(future) };
Bit field implementation.
Definition ABitField.h:20
Represents a condition variable.
Definition AConditionVariable.h:24
void notify_all() noexcept
Definition AConditionVariable.h:62
Abstract AUI exception.
Definition AException.h:28
Represents a value that will be available at some point in the future.
Definition AFuture.h:621
const AFuture & onError(Callback &&callback) const
Add onSuccess callback to the future.
Definition AFuture.h:878
const AFuture & onSuccess(Callback &&callback) const noexcept
Add onSuccess callback to the future.
Definition AFuture.h:709
const AFuture & onFinally(Callback &&callback) const
Adds the callback to both onSuccess and onResult.
Definition AFuture.h:887
const AFuture & onError(Callback &&callback) const noexcept
Add onError callback to the future.
Definition AFuture.h:734
void supplyException(std::exception_ptr causedBy=std::current_exception()) const noexcept
Stores an exception from std::current_exception to the future.
Definition AFuture.h:793
auto map(Callback &&callback) -> AFuture< decltype(callback(std::declval< T >()))> const
Maps this AFuture to another type of AFuture.
Definition AFuture.h:752
void supplyValue(T v) const noexcept
Pushes the result to AFuture.
Definition AFuture.h:651
const AFuture & onSuccess(Callback &&callback) const
Add onSuccess callback to the future.
Definition AFuture.h:853
void supplyException(std::exception_ptr causedBy=std::current_exception()) const noexcept
Stores an exception from std::current_exception to the future.
Definition AFuture.h:664
const AFuture & onFinally(Callback &&callback) const noexcept
Adds the callback to both onSuccess and onResult.
Definition AFuture.h:743
void supplyValue() const noexcept
Pushes "success" result.
Definition AFuture.h:803
Utility wrapper implementing the stack-allocated (fast) optional idiom.
Definition AOptional.h:33
Synchronization primitive that is implemented with atomic values instead of doing syscalls.
Definition AMutex.h:65
static AStacktrace capture(unsigned skipFrames=0, unsigned maxFrames=128) noexcept
Creates stacktrace of the current thread.
Represents a Unicode character string.
Definition AString.h:38
Thread pool implementation.
Definition AThreadPool.h:33
Exception that is thrown by AThread::interruptionPoint(), if interruption is requested for this threa...
Definition AThread.h:175
void needRethrow() const noexcept
Schedules AThread::Interrupted exception to the next interruption point. Sometimes you could not thro...
Definition AThread.h:182
static void interruptionPoint()
Interruption point.
static _< AAbstractThread > current()
An std::weak_ptr with AUI extensions.
Definition SharedPtrTypes.h:179
bool isWaitNeeded() const noexcept
Definition AFuture.h:387
bool hasValue() const noexcept
Definition AFuture.h:404
FutureReturnType< Value >::type operator*()
Returns the supplyValue from the another thread. Sleeps if the supplyValue is not currently available...
Definition AFuture.h:517
void onFinally(Callback &&callback) const
Adds the callback to both onSuccess and onResult.
Definition AFuture.h:436
Value * operator->() const
Returns the supplyValue from the another thread. Sleeps if the supplyValue is not currently available...
Definition AFuture.h:529
void cancel() const noexcept
Cancels the AFuture's task.
Definition AFuture.h:452
void wait(AFutureWait flags=AFutureWait::DEFAULT) const
Sleeps if the supplyValue is not currently available.
Definition AFuture.h:465
bool hasResult() const noexcept
Definition AFuture.h:395
Future(TaskCallback task=nullptr)
Definition AFuture.h:376
FutureReturnType< Value >::type get(AFutureWait flags=AFutureWait::DEFAULT) const
Returns the supplyValue from the another thread. Sleeps if the supplyValue is not currently available...
Definition AFuture.h:478
FutureReturnType< Value >::type operator*() const
Returns the task result from the another thread. Sleeps if the task result is not currently available...
Definition AFuture.h:505
#define AUI_ENUM_FLAG(name)
Make a bitfield-style enum class.
Definition AEnumerate.h:227
#define AUI_ASSERT(condition)
Asserts that the passed condition evaluates to true.
Definition Assert.h:55
#define AUI_ASSERTX(condition, what)
Asserts that the passed condition evaluates to true. Adds extra message string.
Definition Assert.h:74
@ DEFAULT
There's no concrete input action. Let the OS decide which action is the most appropriate.
Definition ATextInputActionIcon.h:36
An std::weak_ptr with AUI extensions.
Definition SharedPtrTypes.h:52
std::conditional_t< isVoid, bool, AOptional< Value > > value
Definition AFuture.h:126
void notifyOnSuccessCallback(std::unique_lock< decltype(mutex)> &lock) noexcept
Calls onSuccess callback.
Definition AFuture.h:293
bool tryExecute(const _weak< CancellationWrapper< Inner > > &innerWeak)
Executes the task stored in the future. Using weak_ptr to internal object in order to make possible F...
Definition AFuture.h:202
ASpinlockMutex mutex
Definition AFuture.h:132