Bitcoin Core 22.99.0
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2020 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <scheduler.h>
6
7#include <random.h>
9#include <util/time.h>
10
11#include <assert.h>
12#include <functional>
13#include <utility>
14
16{
17}
18
20{
21 assert(nThreadsServicingQueue == 0);
22 if (stopWhenEmpty) assert(taskQueue.empty());
23}
24
25
27{
30 ++nThreadsServicingQueue;
31
32 // newTaskMutex is locked throughout this loop EXCEPT
33 // when the thread is waiting or when the user's function
34 // is called.
35 while (!shouldStop()) {
36 try {
37 while (!shouldStop() && taskQueue.empty()) {
38 // Wait until there is something to do.
39 newTaskScheduled.wait(lock);
40 }
41
42 // Wait until either there is a new task, or until
43 // the time of the first item on the queue:
44
45 while (!shouldStop() && !taskQueue.empty()) {
46 std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
47 if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
48 break; // Exit loop after timeout, it means we reached the time of the event
49 }
50 }
51
52 // If there are multiple threads, the queue can empty while we're waiting (another
53 // thread may service the task we were waiting on).
54 if (shouldStop() || taskQueue.empty())
55 continue;
56
57 Function f = taskQueue.begin()->second;
58 taskQueue.erase(taskQueue.begin());
59
60 {
61 // Unlock before calling f, so it can reschedule itself or another task
62 // without deadlocking:
63 REVERSE_LOCK(lock);
64 f();
65 }
66 } catch (...) {
67 --nThreadsServicingQueue;
68 throw;
69 }
70 }
71 --nThreadsServicingQueue;
72 newTaskScheduled.notify_one();
73}
74
75void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
76{
77 {
79 taskQueue.insert(std::make_pair(t, f));
80 }
81 newTaskScheduled.notify_one();
82}
83
84void CScheduler::MockForward(std::chrono::seconds delta_seconds)
85{
86 assert(delta_seconds > 0s && delta_seconds <= 1h);
87
88 {
90
91 // use temp_queue to maintain updated schedule
92 std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
93
94 for (const auto& element : taskQueue) {
95 temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
96 }
97
98 // point taskQueue to temp_queue
99 taskQueue = std::move(temp_queue);
100 }
101
102 // notify that the taskQueue needs to be processed
103 newTaskScheduled.notify_one();
104}
105
106static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta)
107{
108 f();
109 s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta);
110}
111
112void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta)
113{
114 scheduleFromNow([=] { Repeat(*this, f, delta); }, delta);
115}
116
117size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point& first,
118 std::chrono::system_clock::time_point& last) const
119{
121 size_t result = taskQueue.size();
122 if (!taskQueue.empty()) {
123 first = taskQueue.begin()->first;
124 last = taskQueue.rbegin()->first;
125 }
126 return result;
127}
128
130{
132 return nThreadsServicingQueue;
133}
134
135
137{
138 {
140 // Try to avoid scheduling too many copies here, but if we
141 // accidentally have two ProcessQueue's scheduled at once its
142 // not a big deal.
143 if (m_are_callbacks_running) return;
144 if (m_callbacks_pending.empty()) return;
145 }
146 m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
147}
148
150{
151 std::function<void()> callback;
152 {
154 if (m_are_callbacks_running) return;
155 if (m_callbacks_pending.empty()) return;
156 m_are_callbacks_running = true;
157
158 callback = std::move(m_callbacks_pending.front());
159 m_callbacks_pending.pop_front();
160 }
161
162 // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
163 // to ensure both happen safely even if callback() throws.
164 struct RAIICallbacksRunning {
166 explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
167 ~RAIICallbacksRunning()
168 {
169 {
170 LOCK(instance->m_cs_callbacks_pending);
171 instance->m_are_callbacks_running = false;
172 }
173 instance->MaybeScheduleProcessQueue();
174 }
175 } raiicallbacksrunning(this);
176
177 callback();
178}
179
181{
183
184 {
186 m_callbacks_pending.emplace_back(std::move(func));
187 }
189}
190
192{
194 bool should_continue = true;
195 while (should_continue) {
196 ProcessQueue();
198 should_continue = !m_callbacks_pending.empty();
199 }
200}
201
203{
205 return m_callbacks_pending.size();
206}
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:34
void serviceQueue()
Services the queue 'forever'.
Definition: scheduler.cpp:26
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:47
void schedule(Function f, std::chrono::system_clock::time_point t)
Call func at/after time t.
Definition: scheduler.cpp:75
void scheduleEvery(Function f, std::chrono::milliseconds delta)
Repeat f until the scheduler is stopped.
Definition: scheduler.cpp:112
std::function< void()> Function
Definition: scheduler.h:41
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:84
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:117
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:104
bool AreThreadsServicingQueue() const
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:129
std::condition_variable newTaskScheduled
Definition: scheduler.h:99
Mutex newTaskMutex
Definition: scheduler.h:98
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:118
void EmptyQueue()
Processes all remaining queue members on the calling thread, blocking until queue is empty Must be ca...
Definition: scheduler.cpp:191
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:180
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:122
static void Repeat(CScheduler &s, CScheduler::Function f, std::chrono::milliseconds delta)
Definition: scheduler.cpp:106
#define WAIT_LOCK(cs, name)
Definition: sync.h:231
#define LOCK(cs)
Definition: sync.h:226
#define REVERSE_LOCK(g)
Definition: sync.h:221
void SetSyscallSandboxPolicy(SyscallSandboxPolicy syscall_policy)
Force the current thread (and threads created from the current thread) into a restricted-service oper...
assert(!tx.IsCoinBase())