18#include <AUI/Common/AVector.h>
19#include <AUI/Common/AQueue.h>
20#include <AUI/Common/AException.h>
21#include <AUI/Thread/AThread.h>
24#include "AUI/Traits/concepts.h"
/**
 * Declaration only; the definition is not part of this listing.
 *
 * NOTE(review): judging by the name, this presumably executes tasks taken from `queue` - confirm against the
 * definition.
 *
 * @param mutex unique lock over a std::mutex, passed by mutable reference.
 * @param queue queue of void() callables, passed by mutable reference.
 * @return bool. NOTE(review): the meaning of the result is not visible here - confirm against the definition.
 */
38 bool processQueue(std::unique_lock<std::mutex>& mutex,
AQueue<std::function<
void()>>& queue);
/**
 * Declaration only; the definition is not part of this listing.
 *
 * @param tpLock unique lock over a std::mutex, passed by mutable reference. NOTE(review): loop() constructs a
 *               std::unique_lock on mTP.mQueueLock, which is presumably the lock handed in here - confirm.
 */
41 void iteration(std::unique_lock<std::mutex>& tpLock);
/**
 * Declaration only; the definition is not part of this listing.
 *
 * @param tpLock unique lock over a std::mutex, passed by mutable reference. NOTE(review): presumably used to block
 *               on the pool's condition variable until work arrives - confirm against the definition.
 */
42 void wait(std::unique_lock<std::mutex>& tpLock);
49 template<aui::predicate ShouldContinue>
50 void loop(ShouldContinue&& shouldContinue) {
51 std::unique_lock lock(mTP.mQueueLock);
52 while (shouldContinue()) {
54 if (!shouldContinue()) {
/** Alias for a type-erased callable that takes no arguments and returns nothing. */
71 typedef std::function<void()> task;
/** Mutex that the worker's loop() locks through std::unique_lock (as mTP.mQueueLock). */
77 std::mutex mQueueLock;
/** Condition variable. NOTE(review): its wait/notify sites are not visible in this listing. */
78 std::condition_variable mCV;
/** Counter initialised to 0; its current value is returned by getIdleWorkerCount(). */
79 size_t mIdleWorkers = 0;
/**
 * Declaration only; the definition is not part of this listing.
 *
 * @return a size_t. NOTE(review): presumably the number of queued, not-yet-started tasks - confirm against the
 *         definition.
 */
93 size_t getPendingTaskCount();
/**
 * Declaration only; the definition is not part of this listing.
 *
 * NOTE(review): presumably schedules `fun` on this pool - confirm against the definition.
 *
 * @param fun callable taking no arguments and returning nothing, passed by const reference.
 * @param priority priority value; defaults to PRIORITY_MEDIUM when omitted.
 */
94 void run(
const std::function<
void()>& fun, Priority priority = PRIORITY_MEDIUM);
/**
 * Static counterpart of run() with the same parameter list. Declaration only; the definition is not part of this
 * listing.
 *
 * NOTE(review): being static, this presumably forwards to the global pool - confirm against the definition.
 *
 * @param fun callable taking no arguments and returning nothing, passed by const reference.
 * @param priority priority value; defaults to PRIORITY_MEDIUM when omitted.
 */
97 static void enqueue(
const std::function<
void()>& fun, Priority priority = PRIORITY_MEDIUM);
/**
 * Declaration only; the definition is not part of this listing.
 *
 * @param workersCount requested worker count. NOTE(review): how running workers are added or removed is not visible
 *                     here - confirm against the definition.
 */
99 void setWorkersCount(std::size_t workersCount);
102 std::unique_lock lck(mQueueLock);
/**
 * @return the number of elements currently held in mWorkers.
 */
111 size_t getTotalWorkerCount()
const {
return mWorkers.size(); }
/**
 * @return the current value of mIdleWorkers.
 *
 * NOTE(review): the value is read without taking any lock in this body; confirm whether callers rely on it being
 * more than an approximate snapshot.
 */
112 size_t getIdleWorkerCount()
const {
return mIdleWorkers; }
/**
 * Declaration of parallel(); the definition appears later in this header.
 *
 * The definition splits [begin, end) into consecutive chunks and submits one task per chunk to this pool, with the
 * final chunk extended up to `end`.
 *
 * @param begin start of the range. The definition computes `end - begin` and advances with `begin += n`, so the
 *              iterator type must support both operations.
 * @param end end of the range.
 * @param functor callable invoked once per chunk as functor(chunkBegin, chunkEnd).
 * @return deduced by the definition. NOTE(review): the return statement is not visible in this listing; presumably
 *         the collection of per-chunk futures - confirm.
 */
133 template <
typename Iterator,
typename Functor>
134 auto parallel(Iterator begin, Iterator end, Functor&& functor);
136 template <aui::invocable Callable>
137 [[nodiscard]]
inline auto operator*(Callable fun) {
138 using Value = std::invoke_result_t<Callable>;
141 [innerWeak = future.inner().weak()]() {
146 if (auto lock = innerWeak.lock()) {
147 auto innerUnsafePointer = lock->ptr().get();
152 innerUnsafePointer->tryExecute(
156 AThreadPool::PRIORITY_LOWEST);
163#include <AUI/Thread/AFuture.h>
174template <
typename T =
void>
215 template <aui::invocable OnComplete>
228 OnComplete onComplete;
230 std::atomic_bool canBeCalled =
true;
232 auto temporary = _new<Temporary>(std::forward<OnComplete>(onComplete), *
this);
235 for (
const AFuture<T>& v : temporary->myCopy) {
236 if (!v.hasResult()) {
241 if (temporary->canBeCalled.exchange(
false)) {
242 temporary->onComplete();
249template <
typename Iterator,
typename Functor>
251 using ResultType =
decltype(std::declval<Functor>()(std::declval<Iterator>(), std::declval<Iterator>()));
254 size_t itemCount = end - begin;
258 size_t itemsPerThread = itemCount / affinity;
260 for (
size_t threadIndex = 0; threadIndex < affinity; ++threadIndex) {
261 auto forThreadBegin = begin;
262 begin += itemsPerThread;
263 auto forThreadEnd = threadIndex + 1 == affinity ? end : begin;
264 futureSet.push_back(*
this *
265 [functor = std::forward<Functor>(functor), forThreadBegin,
266 forThreadEnd]() ->
decltype(
auto) {
return functor(forThreadBegin, forThreadEnd); });
272#include <AUI/Reflect/AReflect.h>
Manages multiple futures.
Definition: AThreadPool.h:175
void waitForAll()
Wait for the result of every AFuture.
Definition: AThreadPool.h:186
void checkForExceptions() const
Find AFutures that encountered an exception. If such an AFuture is found, AInvocationTargetException is ...
Definition: AThreadPool.h:197
void onAllComplete(OnComplete &&onComplete)
Specifies a callback that is called once every future in the future set has a result.
Definition: AThreadPool.h:216
Represents a value that will be available at some point in the future.
Definition: AFuture.h:620
const AFuture & onSuccess(Callback &&callback) const noexcept
Adds an onSuccess callback to the future.
Definition: AFuture.h:698
A std::queue with AUI extensions.
Definition: AQueue.h:24
Definition: AThreadPool.h:160
Definition: AThreadPool.h:35
Thread pool implementation.
Definition: AThreadPool.h:33
static AThreadPool & global()
Global thread pool created with the default constructor.
Definition: AThreadPool.cpp:145
auto parallel(Iterator begin, Iterator end, Functor &&functor)
Definition: AThreadPool.h:250
Represents a user-defined thread.
Definition: AThread.h:174
A std::vector with AUI extensions.
Definition: AVector.h:38
bool hasResult() const noexcept
Definition: AFuture.h:395
Definition: iterators.h:34