Bitcoin Core 22.99.0
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-2020 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <sync.h>
9#include <tinyformat.h>
11#include <util/threadnames.h>
12
13#include <algorithm>
14#include <vector>
15
16template <typename T>
18
29template <typename T>
31{
32private:
35
// Worker threads block on this when out of work.
37 std::condition_variable m_worker_cv;
38
// Master thread blocks on this when out of work.
40 std::condition_variable m_master_cv;
41
// The queue of elements to be processed.
44 std::vector<T> queue GUARDED_BY(m_mutex);
45
// The number of workers (including the master) that are idle.
47 int nIdle GUARDED_BY(m_mutex){0};
48
// The total number of workers (including the master).
50 int nTotal GUARDED_BY(m_mutex){0};
51
// The temporary evaluation result.
53 bool fAllOk GUARDED_BY(m_mutex){true};
54
// Number of verifications that haven't completed yet.
60 unsigned int nTodo GUARDED_BY(m_mutex){0};
61
// The maximum number of elements to be processed in one batch.
63 const unsigned int nBatchSize;
64
// Threads created by StartWorkerThreads(); they are joined and the vector is
// cleared again when the workers are stopped.
65 std::vector<std::thread> m_worker_threads;
// Raised (under m_mutex) to make every thread running Loop() return; reset to
// false once all worker threads have been joined.
66 bool m_request_stop GUARDED_BY(m_mutex){false};
67
69 bool Loop(bool fMaster)
70 {
71 std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
72 std::vector<T> vChecks;
73 vChecks.reserve(nBatchSize);
74 unsigned int nNow = 0;
75 bool fOk = true;
76 do {
77 {
78 WAIT_LOCK(m_mutex, lock);
79 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
80 if (nNow) {
81 fAllOk &= fOk;
82 nTodo -= nNow;
83 if (nTodo == 0 && !fMaster)
84 // We processed the last element; inform the master it can exit and return the result
85 m_master_cv.notify_one();
86 } else {
87 // first iteration
88 nTotal++;
89 }
90 // logically, the do loop starts here
91 while (queue.empty() && !m_request_stop) {
92 if (fMaster && nTodo == 0) {
93 nTotal--;
94 bool fRet = fAllOk;
95 // reset the status for new work later
96 fAllOk = true;
97 // return the current status
98 return fRet;
99 }
100 nIdle++;
101 cond.wait(lock); // wait
102 nIdle--;
103 }
104 if (m_request_stop) {
105 return false;
106 }
107
108 // Decide how many work units to process now.
109 // * Do not try to do everything at once, but aim for increasingly smaller batches so
110 // all workers finish approximately simultaneously.
111 // * Try to account for idle jobs which will instantly start helping.
112 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
113 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
114 vChecks.resize(nNow);
115 for (unsigned int i = 0; i < nNow; i++) {
116 // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global
117 // queue to the local batch vector instead of copying.
118 vChecks[i].swap(queue.back());
119 queue.pop_back();
120 }
121 // Check whether we need to do work at all
122 fOk = fAllOk;
123 }
124 // execute work
125 for (T& check : vChecks)
126 if (fOk)
127 fOk = check();
128 vChecks.clear();
129 } while (true);
130 }
131
132public:
135
137 explicit CCheckQueue(unsigned int nBatchSizeIn)
138 : nBatchSize(nBatchSizeIn)
139 {
140 }
141
143 void StartWorkerThreads(const int threads_num)
144 {
145 {
146 LOCK(m_mutex);
147 nIdle = 0;
148 nTotal = 0;
149 fAllOk = true;
150 }
151 assert(m_worker_threads.empty());
152 for (int n = 0; n < threads_num; ++n) {
153 m_worker_threads.emplace_back([this, n]() {
154 util::ThreadRename(strprintf("scriptch.%i", n));
156 Loop(false /* worker thread */);
157 });
158 }
159 }
160
162 bool Wait()
163 {
164 return Loop(true /* master thread */);
165 }
166
168 void Add(std::vector<T>& vChecks)
169 {
170 LOCK(m_mutex);
171 for (T& check : vChecks) {
172 queue.push_back(T());
173 check.swap(queue.back());
174 }
175 nTodo += vChecks.size();
176 if (vChecks.size() == 1)
177 m_worker_cv.notify_one();
178 else if (vChecks.size() > 1)
179 m_worker_cv.notify_all();
180 }
181
184 {
185 WITH_LOCK(m_mutex, m_request_stop = true);
186 m_worker_cv.notify_all();
187 for (std::thread& t : m_worker_threads) {
188 t.join();
189 }
190 m_worker_threads.clear();
191 WITH_LOCK(m_mutex, m_request_stop = false);
192 }
193
195 {
196 assert(m_worker_threads.empty());
197 }
198
199};
200
205template <typename T>
207{
208private:
// Set to true once Wait() has obtained a result from the queue, so that the
// wait is not repeated when this object is cleaned up.
210 bool fDone;
211
212public:
216 explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
217 {
218 // passed queue is supposed to be unused, or nullptr
219 if (pqueue != nullptr) {
220 ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
221 }
222 }
223
224 bool Wait()
225 {
226 if (pqueue == nullptr)
227 return true;
228 bool fRet = pqueue->Wait();
229 fDone = true;
230 return fRet;
231 }
232
233 void Add(std::vector<T>& vChecks)
234 {
235 if (pqueue != nullptr)
236 pqueue->Add(vChecks);
237 }
238
240 {
241 if (!fDone)
242 Wait();
243 if (pqueue != nullptr) {
244 LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
245 }
246 }
247};
248
249#endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:207
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:209
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:216
CCheckQueueControl()=delete
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:233
CCheckQueueControl(const CCheckQueueControl &)=delete
Queue for verifications that have to be performed.
Definition: checkqueue.h:31
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:63
void StopWorkerThreads()
Stop all of the worker threads.
Definition: checkqueue.h:183
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:40
bool fAllOk GUARDED_BY(m_mutex)
The temporary evaluation result.
Definition: checkqueue.h:53
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:66
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:162
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:50
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:65
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:134
bool Loop(bool fMaster)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:69
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:37
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:34
void StartWorkerThreads(const int threads_num)
Create a pool of new worker threads.
Definition: checkqueue.h:143
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:137
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:47
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:168
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:60
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
#define T(expected, seed, data)
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:57
#define WAIT_LOCK(cs, name)
Definition: sync.h:231
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:233
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:239
#define LOCK(cs)
Definition: sync.h:226
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:270
void SetSyscallSandboxPolicy(SyscallSandboxPolicy syscall_policy)
Force the current thread (and threads created from the current thread) into a restricted-service operating mode.
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1164
assert(!tx.IsCoinBase())