AUI Framework  master
Cross-platform module-based framework for developing C++20 desktop applications
AThreadPool.h
1/*
2 * AUI Framework - Declarative UI toolkit for modern C++20
3 * Copyright (C) 2020-2024 Alex2772 and Contributors
4 *
5 * SPDX-License-Identifier: MPL-2.0
6 *
7 * This Source Code Form is subject to the terms of the Mozilla Public
8 * License, v. 2.0. If a copy of the MPL was not distributed with this
9 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
10 */
11
12#pragma once
13
14#include <AUI/Core.h>
15#include <cassert>
16#include <atomic>
17
18#include <AUI/Common/AVector.h>
19#include <AUI/Common/AQueue.h>
20#include <AUI/Common/AException.h>
21#include <AUI/Thread/AThread.h>
22#include <glm/glm.hpp>
23#include <utility>
24#include "AUI/Traits/concepts.h"
25
26template<typename T>
27class AFuture;
28
33class API_AUI_CORE AThreadPool {
34 public:
35 class API_AUI_CORE Worker : public AThread {
36 private:
37 bool mEnabled = true;
38 bool processQueue(std::unique_lock<std::mutex>& mutex, AQueue<std::function<void()>>& queue);
39 AThreadPool& mTP;
40
41 void iteration(std::unique_lock<std::mutex>& tpLock);
42 void wait(std::unique_lock<std::mutex>& tpLock);
43
44 public:
45 Worker(AThreadPool& tp, size_t index);
46 ~Worker();
47 void aboutToDelete();
48
49 template<aui::predicate ShouldContinue>
50 void loop(ShouldContinue&& shouldContinue) {
51 std::unique_lock lock(mTP.mQueueLock);
52 while (shouldContinue()) {
53 iteration(lock);
54 if (!shouldContinue()) {
55 return;
56 }
57 wait(lock);
58 }
59 }
60
61 AThreadPool& threadPool() noexcept { return mTP; }
62 };
63
64 enum Priority {
65 PRIORITY_HIGHEST,
66 PRIORITY_MEDIUM,
67 PRIORITY_LOWEST,
68 };
69
70 protected:
71 typedef std::function<void()> task;
72 AVector<_<Worker>> mWorkers;
73 AQueue<task> mQueueHighest;
74 AQueue<task> mQueueMedium;
75 AQueue<task> mQueueLowest;
76 AQueue<task> mQueueTryLater;
77 std::mutex mQueueLock;
78 std::condition_variable mCV;
79 size_t mIdleWorkers = 0;
80
81 public:
86 AThreadPool(size_t size);
87
93 size_t getPendingTaskCount();
94 void run(const std::function<void()>& fun, Priority priority = PRIORITY_MEDIUM);
95 void clear();
96 void runLaterTasks();
97 static void enqueue(const std::function<void()>& fun, Priority priority = PRIORITY_MEDIUM);
98
99 void setWorkersCount(std::size_t workersCount);
100
101 void wakeUpAll() {
102 std::unique_lock lck(mQueueLock);
103 mCV.notify_all();
104 }
105
109 static AThreadPool& global();
110
111 size_t getTotalWorkerCount() const { return mWorkers.size(); }
112 size_t getIdleWorkerCount() const { return mIdleWorkers; }
113
133 template <typename Iterator, typename Functor>
134 auto parallel(Iterator begin, Iterator end, Functor&& functor);
135
136 template <aui::invocable Callable>
137 [[nodiscard]] inline auto operator*(Callable fun) {
138 using Value = std::invoke_result_t<Callable>;
139 AFuture<Value> future(std::move(fun));
140 run(
141 [innerWeak = future.inner().weak()]() {
142 /*
143 * Avoid holding a strong reference - we need to keep future cancellation on reference count exceeding
144 * even while actual future execution.
145 */
146 if (auto lock = innerWeak.lock()) {
147 auto innerUnsafePointer = lock->ptr().get(); // using .get() here in order to bypass
148 // null check in operator->
149
150 lock = nullptr; // destroy strong ref
151
152 innerUnsafePointer->tryExecute(
153 innerWeak); // there's a check inside tryExecute to check its validity
154 }
155 },
156 AThreadPool::PRIORITY_LOWEST);
157 return future;
158 }
159
161};
162
163#include <AUI/Thread/AFuture.h>
164
174template <typename T = void>
175class AFutureSet : public AVector<AFuture<T>> {
176 private:
177 using super = AVector<AFuture<T>>;
178
179 public:
181
186 void waitForAll() {
187 // wait from the end to avoid idling (see AFuture::wait for details)
188 for (const AFuture<T>& v : aui::reverse_iterator_wrap(*this)) {
189 v.operator*();
190 }
191 }
192
197 void checkForExceptions() const {
198 for (const AFuture<T>& v : *this) {
199 if (v.hasResult()) {
200 v.operator*(); // TODO bad design
201 }
202 }
203 }
204
215 template <aui::invocable OnComplete>
216 void onAllComplete(OnComplete&& onComplete) {
217 // check if all futures is already complete.
218 for (const AFuture<T>& v : *this) {
219 if (!v.hasResult()) {
220 goto setupTheHell;
221 }
222 }
223 onComplete();
224 return;
225
226 setupTheHell:
227 struct Temporary {
228 OnComplete onComplete;
229 AFutureSet myCopy;
230 std::atomic_bool canBeCalled = true;
231 };
232 auto temporary = _new<Temporary>(std::forward<OnComplete>(onComplete), *this);
233 for (const AFuture<T>& v : *this) {
234 v.onSuccess([temporary](const auto& v) {
235 for (const AFuture<T>& v : temporary->myCopy) {
236 if (!v.hasResult()) {
237 return;
238 }
239 }
240 // yay! all tasks are completed. the last thing to check if the callback is already called
241 if (temporary->canBeCalled.exchange(false)) {
242 temporary->onComplete();
243 }
244 });
245 }
246 }
247};
248
249template <typename Iterator, typename Functor>
250auto AThreadPool::parallel(Iterator begin, Iterator end, Functor&& functor) {
251 using ResultType = decltype(std::declval<Functor>()(std::declval<Iterator>(), std::declval<Iterator>()));
252 AFutureSet<ResultType> futureSet;
253
254 size_t itemCount = end - begin;
255 size_t affinity = (glm::min)(AThreadPool::global().getTotalWorkerCount(), itemCount);
256 if (affinity == 0)
257 return futureSet;
258 size_t itemsPerThread = itemCount / affinity;
259
260 for (size_t threadIndex = 0; threadIndex < affinity; ++threadIndex) {
261 auto forThreadBegin = begin;
262 begin += itemsPerThread;
263 auto forThreadEnd = threadIndex + 1 == affinity ? end : begin;
264 futureSet.push_back(*this *
265 [functor = std::forward<Functor>(functor), forThreadBegin,
266 forThreadEnd]() -> decltype(auto) { return functor(forThreadBegin, forThreadEnd); });
267 }
268
269 return futureSet;
270}
271
272#include <AUI/Reflect/AReflect.h>
Manages multiple futures.
Definition: AThreadPool.h:175
void waitForAll()
Wait for the result of every AFuture.
Definition: AThreadPool.h:186
void checkForExceptions() const
Find AFutures that encountered an exception. If such AFuture is found, AInvocationTargetException is ...
Definition: AThreadPool.h:197
void onAllComplete(OnComplete &&onComplete)
Specifies a callback which will be called when all futures in future set would have the result.
Definition: AThreadPool.h:216
Represents a value that will be available at some point in the future.
Definition: AFuture.h:620
const AFuture & onSuccess(Callback &&callback) const noexcept
Add onSuccess callback to the future.
Definition: AFuture.h:698
A std::queue with AUI extensions.
Definition: AQueue.h:24
Definition: AThreadPool.h:160
Definition: AThreadPool.h:35
Thread pool implementation.
Definition: AThreadPool.h:33
static AThreadPool & global()
Global thread pool created with the default constructor.
Definition: AThreadPool.cpp:145
auto parallel(Iterator begin, Iterator end, Functor &&functor)
Definition: AThreadPool.h:250
Represents a user-defined thread.
Definition: AThread.h:174
A std::vector with AUI extensions.
Definition: AVector.h:38
bool hasResult() const noexcept
Definition: AFuture.h:395
Definition: iterators.h:34