Bitcoin Core 22.99.0
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2020 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
6
7#include <chain.h>
8#include <chainparams.h>
9#include <netbase.h>
10#include <node/blockstorage.h>
11#include <rpc/server.h>
12#include <streams.h>
13#include <util/system.h>
14#include <validation.h> // For cs_main
15#include <zmq/zmqutil.h>
16
17#include <zmq.h>
18
19#include <cstdarg>
20#include <cstddef>
21#include <map>
22#include <optional>
23#include <string>
24#include <utility>
25
26static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
27
28static const char *MSG_HASHBLOCK = "hashblock";
29static const char *MSG_HASHTX = "hashtx";
30static const char *MSG_RAWBLOCK = "rawblock";
31static const char *MSG_RAWTX = "rawtx";
32static const char *MSG_SEQUENCE = "sequence";
33
34// Internal function to send multipart message
35static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
36{
37 va_list args;
38 va_start(args, size);
39
40 while (1)
41 {
42 zmq_msg_t msg;
43
44 int rc = zmq_msg_init_size(&msg, size);
45 if (rc != 0)
46 {
47 zmqError("Unable to initialize ZMQ msg");
48 va_end(args);
49 return -1;
50 }
51
52 void *buf = zmq_msg_data(&msg);
53 memcpy(buf, data, size);
54
55 data = va_arg(args, const void*);
56
57 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
58 if (rc == -1)
59 {
60 zmqError("Unable to send ZMQ msg");
61 zmq_msg_close(&msg);
62 va_end(args);
63 return -1;
64 }
65
66 zmq_msg_close(&msg);
67
68 if (!data)
69 break;
70
71 size = va_arg(args, size_t);
72 }
73 va_end(args);
74 return 0;
75}
76
77static bool IsZMQAddressIPV6(const std::string &zmq_address)
78{
79 const std::string tcp_prefix = "tcp://";
80 const size_t tcp_index = zmq_address.rfind(tcp_prefix);
81 const size_t colon_index = zmq_address.rfind(":");
82 if (tcp_index == 0 && colon_index != std::string::npos) {
83 const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
84 CNetAddr addr;
85 LookupHost(ip, addr, false);
86 if (addr.IsIPv6()) return true;
87 }
88 return false;
89}
90
92{
94
95 // check if address is being used by other publish notifier
96 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
97
98 if (i==mapPublishNotifiers.end())
99 {
100 psocket = zmq_socket(pcontext, ZMQ_PUB);
101 if (!psocket)
102 {
103 zmqError("Failed to create socket");
104 return false;
105 }
106
107 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
108
109 int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
110 if (rc != 0)
111 {
112 zmqError("Failed to set outbound message high water mark");
113 zmq_close(psocket);
114 return false;
115 }
116
117 const int so_keepalive_option {1};
118 rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
119 if (rc != 0) {
120 zmqError("Failed to set SO_KEEPALIVE");
121 zmq_close(psocket);
122 return false;
123 }
124
125 // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6
126 const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
127 rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
128 if (rc != 0) {
129 zmqError("Failed to set ZMQ_IPV6");
130 zmq_close(psocket);
131 return false;
132 }
133
134 rc = zmq_bind(psocket, address.c_str());
135 if (rc != 0)
136 {
137 zmqError("Failed to bind address");
138 zmq_close(psocket);
139 return false;
140 }
141
142 // register this notifier for the address, so it can be reused for other publish notifier
143 mapPublishNotifiers.insert(std::make_pair(address, this));
144 return true;
145 }
146 else
147 {
148 LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
149 LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
150
151 psocket = i->second->psocket;
152 mapPublishNotifiers.insert(std::make_pair(address, this));
153
154 return true;
155 }
156}
157
159{
160 // Early return if Initialize was not called
161 if (!psocket) return;
162
163 int count = mapPublishNotifiers.count(address);
164
165 // remove this notifier from the list of publishers using this address
166 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
167 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
168
169 for (iterator it = iterpair.first; it != iterpair.second; ++it)
170 {
171 if (it->second==this)
172 {
173 mapPublishNotifiers.erase(it);
174 break;
175 }
176 }
177
178 if (count == 1)
179 {
180 LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
181 int linger = 0;
182 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
183 zmq_close(psocket);
184 }
185
186 psocket = nullptr;
187}
188
189bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
190{
192
193 /* send three parts, command & data & a LE 4byte sequence number */
194 unsigned char msgseq[sizeof(uint32_t)];
195 WriteLE32(msgseq, nSequence);
196 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
197 if (rc == -1)
198 return false;
199
200 /* increment memory only sequence number after sending */
201 nSequence++;
202
203 return true;
204}
205
207{
208 uint256 hash = pindex->GetBlockHash();
209 LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
210 char data[32];
211 for (unsigned int i = 0; i < 32; i++)
212 data[31 - i] = hash.begin()[i];
213 return SendZmqMessage(MSG_HASHBLOCK, data, 32);
214}
215
217{
218 uint256 hash = transaction.GetHash();
219 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
220 char data[32];
221 for (unsigned int i = 0; i < 32; i++)
222 data[31 - i] = hash.begin()[i];
223 return SendZmqMessage(MSG_HASHTX, data, 32);
224}
225
227{
228 LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
229
230 const Consensus::Params& consensusParams = Params().GetConsensus();
232 {
233 LOCK(cs_main);
234 CBlock block;
235 if(!ReadBlockFromDisk(block, pindex, consensusParams))
236 {
237 zmqError("Can't read block from disk");
238 return false;
239 }
240
241 ss << block;
242 }
243
244 return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
245}
246
248{
249 uint256 hash = transaction.GetHash();
250 LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
252 ss << transaction;
253 return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
254}
255
256// Helper function to send a 'sequence' topic message with the following structure:
257// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
258static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
259{
260 unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
261 for (unsigned int i = 0; i < sizeof(hash); ++i) {
262 data[sizeof(hash) - 1 - i] = hash.begin()[i];
263 }
264 data[sizeof(hash)] = label;
265 if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
266 return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
267}
268
270{
271 uint256 hash = pindex->GetBlockHash();
272 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
273 return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
274}
275
277{
278 uint256 hash = pindex->GetBlockHash();
279 LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
280 return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
281}
282
283bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
284{
285 uint256 hash = transaction.GetHash();
286 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
287 return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
288}
289
290bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
291{
292 uint256 hash = transaction.GetHash();
293 LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
294 return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
295}
RecursiveMutex cs_main
Mutex to guard access to validation specific variables, such as reading or changing the chainstate.
Definition: validation.cpp:118
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
const CChainParams & Params()
Return the currently selected parameters.
Definition: block.h:63
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:146
uint256 GetBlockHash() const
Definition: chain.h:254
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:82
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:205
const_iterator begin() const
Definition: streams.h:251
size_type size() const
Definition: streams.h:255
Network address.
Definition: netaddress.h:119
bool IsIPv6() const
Definition: netaddress.cpp:320
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:260
const uint256 & GetHash() const
Definition: transaction.h:302
bool SendZmqMessage(const char *command, const void *data, size_t size)
uint32_t nSequence
upcounting per message sequence number
bool Initialize(void *pcontext) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
bool NotifyBlockConnect(const CBlockIndex *pindex) override
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
unsigned char * begin()
Definition: uint256.h:58
std::string GetHex() const
Definition: uint256.cpp:20
256-bit opaque blob.
Definition: uint256.h:124
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
static void WriteLE64(unsigned char *ptr, uint64_t x)
Definition: common.h:50
static CService ip(uint32_t i)
#define LogPrint(category,...)
Definition: logging.h:191
@ ZMQ
Definition: logging.h:43
bool LookupHost(const std::string &name, std::vector< CNetAddr > &vIP, unsigned int nMaxSolutions, bool fAllowLookup, DNSLookupFn dns_lookup_function)
Resolve a host string to its corresponding network addresses.
Definition: netbase.cpp:170
@ SER_NETWORK
Definition: serialize.h:138
int RPCSerializationFlags()
Definition: server.cpp:540
Parameters that influence chain consensus.
Definition: params.h:70
#define LOCK(cs)
Definition: sync.h:226
static int count
Definition: tests.c:41
assert(!tx.IsCoinBase())
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:12
static const char * MSG_HASHBLOCK
static const char * MSG_SEQUENCE
static const char * MSG_RAWBLOCK
static bool SendSequenceMsg(CZMQAbstractPublishNotifier &notifier, uint256 hash, char label, std::optional< uint64_t > sequence={})
static bool IsZMQAddressIPV6(const std::string &zmq_address)
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static const char * MSG_RAWTX
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
static const char * MSG_HASHTX
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13