Bitcoin Core 22.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2020 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
7#include <zmq/zmqutil.h>
8
9#include <zmq.h>
10
11#include <validation.h>
12#include <util/system.h>
13
15{
16}
17
19{
20 Shutdown();
21}
22
23std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
24{
25 std::list<const CZMQAbstractNotifier*> result;
26 for (const auto& n : notifiers) {
27 result.push_back(n.get());
28 }
29 return result;
30}
31
33{
34 std::map<std::string, CZMQNotifierFactory> factories;
35 factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
36 factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
37 factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
38 factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
39 factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
40
41 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
42 for (const auto& entry : factories)
43 {
44 std::string arg("-zmq" + entry.first);
45 const auto& factory = entry.second;
46 for (const std::string& address : gArgs.GetArgs(arg)) {
47 std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
48 notifier->SetType(entry.first);
49 notifier->SetAddress(address);
50 notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
51 notifiers.push_back(std::move(notifier));
52 }
53 }
54
55 if (!notifiers.empty())
56 {
57 std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
58 notificationInterface->notifiers = std::move(notifiers);
59
60 if (notificationInterface->Initialize()) {
61 return notificationInterface.release();
62 }
63 }
64
65 return nullptr;
66}
67
68// Called at startup to conditionally set up ZMQ socket(s)
70{
71 int major = 0, minor = 0, patch = 0;
72 zmq_version(&major, &minor, &patch);
73 LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
74
75 LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
77
78 pcontext = zmq_ctx_new();
79
80 if (!pcontext)
81 {
82 zmqError("Unable to initialize context");
83 return false;
84 }
85
86 for (auto& notifier : notifiers) {
87 if (notifier->Initialize(pcontext)) {
88 LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
89 } else {
90 LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
91 return false;
92 }
93 }
94
95 return true;
96}
97
98// Called during shutdown sequence
100{
101 LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
102 if (pcontext)
103 {
104 for (auto& notifier : notifiers) {
105 LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
106 notifier->Shutdown();
107 }
108 zmq_ctx_term(pcontext);
109
110 pcontext = nullptr;
111 }
112}
113
114namespace {
115
116template <typename Function>
117void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
118{
119 for (auto i = notifiers.begin(); i != notifiers.end(); ) {
120 CZMQAbstractNotifier* notifier = i->get();
121 if (func(notifier)) {
122 ++i;
123 } else {
124 notifier->Shutdown();
125 i = notifiers.erase(i);
126 }
127 }
128}
129
130} // anonymous namespace
131
132void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
133{
134 if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
135 return;
136
137 TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
138 return notifier->NotifyBlock(pindexNew);
139 });
140}
141
143{
144 const CTransaction& tx = *ptx;
145
146 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
147 return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
148 });
149}
150
152{
153 // Called for all non-block inclusion reasons
154 const CTransaction& tx = *ptx;
155
156 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
157 return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
158 });
159}
160
161void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
162{
163 for (const CTransactionRef& ptx : pblock->vtx) {
164 const CTransaction& tx = *ptx;
165 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
166 return notifier->NotifyTransaction(tx);
167 });
168 }
169
170 // Next we notify BlockConnect listeners for *all* blocks
171 TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
172 return notifier->NotifyBlockConnect(pindexConnected);
173 });
174}
175
176void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
177{
178 for (const CTransactionRef& ptx : pblock->vtx) {
179 const CTransaction& tx = *ptx;
180 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
181 return notifier->NotifyTransaction(tx);
182 });
183 }
184
185 // Next we notify BlockDisconnect listeners for *all* blocks
186 TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
187 return notifier->NotifyBlockDisconnect(pindexDisconnected);
188 });
189}
190
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: system.cpp:487
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: system.cpp:596
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:146
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:260
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
static CZMQNotificationInterface * Create()
void TransactionAddedToMempool(const CTransactionRef &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
#define LogPrint(category,...)
Definition: logging.h:191
@ ZMQ
Definition: logging.h:43
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:386
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
Definition: txmempool.h:341
ArgsManager gArgs
Definition: system.cpp:85
assert(!tx.IsCoinBase())
CZMQNotificationInterface * g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13