Tor 0.4.9.0-alpha-dev
pubsub_build.c
Go to the documentation of this file.
1/* Copyright (c) 2001, Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
5/* See LICENSE for licensing information */
6
7/**
8 * @file pubsub_build.c
9 * @brief Construct a dispatch_t in safer, more OO way.
10 **/
11
12#define PUBSUB_PRIVATE
13
23
25#include "lib/log/util_bug.h"
26#include "lib/malloc/malloc.h"
27
28 #include <string.h>
29
30/** Construct and return a new empty pubsub_items_t. */
31static pubsub_items_t *
33{
34 pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
35 cfg->items = smartlist_new();
36 cfg->type_items = smartlist_new();
37 return cfg;
38}
39
40/** Release all storage held in a pubsub_items_t. */
41void
43{
44 if (! cfg)
45 return;
46 SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
47 SMARTLIST_FOREACH(cfg->type_items,
48 pubsub_type_cfg_t *, item, tor_free(item));
49 smartlist_free(cfg->items);
50 smartlist_free(cfg->type_items);
51 tor_free(cfg);
52}
53
54/** Construct and return a new pubsub_builder_t. */
57{
58 dispatch_naming_init();
59
60 pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
61 pb->cfg = dcfg_new();
62 pb->items = pubsub_items_new();
63 return pb;
64}
65
66/**
67 * Release all storage held by a pubsub_builder_t.
68 *
69 * You'll (mostly) only want to call this function on an error case: if you're
70 * constructing a dispatch_t instead, you should call
71 * pubsub_builder_finalize() to consume the pubsub_builder_t.
72 */
73void
75{
76 if (pb == NULL)
77 return;
78 pubsub_items_free(pb->items);
79 dcfg_free(pb->cfg);
80 tor_free(pb);
81}
82
83/**
84 * Create and return a pubsub_connector_t for the subsystem with ID
85 * <b>subsys</b> to use in adding publications, subscriptions, and types to
86 * <b>builder</b>.
87 **/
90 subsys_id_t subsys)
91{
92 tor_assert(builder);
93 ++builder->n_connectors;
94
95 pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
96
97 con->builder = builder;
98 con->subsys_id = subsys;
99
100 return con;
101}
102
103/**
104 * Release all storage held by a pubsub_connector_t.
105 **/
106void
108{
109 if (!con)
110 return;
111
112 if (con->builder) {
113 --con->builder->n_connectors;
114 tor_assert(con->builder->n_connectors >= 0);
115 }
116 tor_free(con);
117}
118
119/**
120 * Use <b>con</b> to add a request for being able to publish messages of type
121 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
122 **/
123int
125 pub_binding_t *out,
126 channel_id_t channel,
127 message_id_t msg,
128 msg_type_id_t type,
129 unsigned flags,
130 const char *file,
131 unsigned line)
132{
133 pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
134
135 memset(out, 0, sizeof(*out));
136 cfg->is_publish = true;
137
138 out->msg_template.sender = cfg->subsys = con->subsys_id;
139 out->msg_template.channel = cfg->channel = channel;
140 out->msg_template.msg = cfg->msg = msg;
141 out->msg_template.type = cfg->type = type;
142
143 cfg->flags = flags;
144 cfg->added_by_file = file;
145 cfg->added_by_line = line;
146
147 /* We're grabbing a pointer to the pub_binding_t so we can tell it about
148 * the dispatcher later on.
149 */
150 cfg->pub_binding = out;
151
152 smartlist_add(con->builder->items->items, cfg);
153
154 if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
155 goto err;
156 if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
157 goto err;
158
159 return 0;
160 err:
161 ++con->builder->n_errors;
162 return -1;
163}
164
165/**
166 * Use <b>con</b> to add a request for being able to publish messages of type
167 * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
168 * passing them to the callback in <b>recv_fn</b>.
169 **/
170int
172 recv_fn_t recv_fn,
173 channel_id_t channel,
174 message_id_t msg,
175 msg_type_id_t type,
176 unsigned flags,
177 const char *file,
178 unsigned line)
179{
180 pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
181
182 cfg->is_publish = false;
183 cfg->subsys = con->subsys_id;
184 cfg->channel = channel;
185 cfg->msg = msg;
186 cfg->type = type;
187 cfg->flags = flags;
188 cfg->added_by_file = file;
189 cfg->added_by_line = line;
190
191 cfg->recv_fn = recv_fn;
192
193 smartlist_add(con->builder->items->items, cfg);
194
195 if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
196 goto err;
197 if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
198 goto err;
199 if (! (flags & DISP_FLAG_STUB)) {
200 if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
201 goto err;
202 }
203
204 return 0;
205 err:
206 ++con->builder->n_errors;
207 return -1;
208}
209
210/**
211 * Use <b>con</b> to define the functions to use for manipulating the type
212 * <b>type</b>. Any function pointers left as NULL will be implemented as
213 * no-ops.
214 **/
215int
217 msg_type_id_t type,
219 const char *file,
220 unsigned line)
221{
222 pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
223 cfg->type = type;
224 memcpy(&cfg->fns, fns, sizeof(*fns));
225 cfg->subsys = con->subsys_id;
226 cfg->added_by_file = file;
227 cfg->added_by_line = line;
228
229 smartlist_add(con->builder->items->type_items, cfg);
230
231 if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
232 goto err;
233
234 return 0;
235 err:
236 ++con->builder->n_errors;
237 return -1;
238}
239
240/**
241 * Initialize the dispatch_ptr field in every relevant publish binding
242 * for <b>d</b>.
243 */
244static void
246 dispatch_t *d)
247{
248 SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
249 if (cfg->pub_binding) {
250 // XXXX we could skip this for STUB publishers, and for any publishers
251 // XXXX where all subscribers are STUB.
252 cfg->pub_binding->dispatch_ptr = d;
253 }
254 } SMARTLIST_FOREACH_END(cfg);
255}
256
257/**
258 * Remove the dispatch_ptr fields for all the relevant publish bindings
259 * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
260 * sending messages to a dispatcher that has been freed.
261 **/
262void
264{
265 SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
266 if (cfg->pub_binding) {
267 cfg->pub_binding->dispatch_ptr = NULL;
268 }
269 } SMARTLIST_FOREACH_END(cfg);
270}
271
272/**
273 * Create a new dispatcher as configured in a pubsub_builder_t.
274 *
275 * Consumes and frees its input.
276 **/
279 pubsub_items_t **items_out)
280{
281 dispatch_t *dispatcher = NULL;
282 tor_assert_nonfatal(builder->n_connectors == 0);
283
284 if (pubsub_builder_check(builder) < 0)
285 goto err;
286
287 if (builder->n_errors) {
288 log_warn(LD_GENERAL, "At least one error occurred previously when "
289 "configuring the dispatcher.");
290 goto err;
291 }
292
293 dispatcher = dispatch_new(builder->cfg);
294
295 if (!dispatcher)
296 goto err;
297
298 pubsub_items_install_bindings(builder->items, dispatcher);
299 if (items_out) {
300 *items_out = builder->items;
301 builder->items = NULL; /* Prevent free */
302 }
303
304 err:
305 pubsub_builder_free(builder);
306 return dispatcher;
307}
Low-level APIs for message-passing system.
struct dispatch_t dispatch_t
Definition: dispatch.h:53
int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, channel_id_t chan)
Definition: dispatch_cfg.c:63
int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, const dispatch_typefns_t *fns)
Definition: dispatch_cfg.c:81
int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, msg_type_id_t type)
Definition: dispatch_cfg.c:45
dispatch_cfg_t * dcfg_new(void)
Definition: dispatch_cfg.c:30
int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, subsys_id_t sys, recv_fn_t fn)
Definition: dispatch_cfg.c:101
Header for distpach_cfg.c.
#define dcfg_free(cfg)
Definition: dispatch_cfg.h:40
Header for dispatch_naming.c.
dispatch_t * dispatch_new(const dispatch_cfg_t *cfg)
Definition: dispatch_new.c:113
#define LD_GENERAL
Definition: log.h:62
Headers for util_malloc.c.
#define tor_free(p)
Definition: malloc.h:56
Types used for messages in the dispatcher code.
uint16_t subsys_id_t
Definition: msgtypes.h:22
uint16_t msg_type_id_t
Definition: msgtypes.h:29
void(* recv_fn_t)(const msg_t *m)
Definition: msgtypes.h:66
Declaration of pub_binding_t.
pubsub_connector_t * pubsub_connector_for_subsystem(pubsub_builder_t *builder, subsys_id_t subsys)
Definition: pubsub_build.c:89
pubsub_builder_t * pubsub_builder_new(void)
Definition: pubsub_build.c:56
int pubsub_add_pub_(pubsub_connector_t *con, pub_binding_t *out, channel_id_t channel, message_id_t msg, msg_type_id_t type, unsigned flags, const char *file, unsigned line)
Definition: pubsub_build.c:124
dispatch_t * pubsub_builder_finalize(pubsub_builder_t *builder, pubsub_items_t **items_out)
Definition: pubsub_build.c:278
void pubsub_connector_free_(pubsub_connector_t *con)
Definition: pubsub_build.c:107
void pubsub_items_free_(pubsub_items_t *cfg)
Definition: pubsub_build.c:42
static pubsub_items_t * pubsub_items_new(void)
Definition: pubsub_build.c:32
void pubsub_builder_free_(pubsub_builder_t *pb)
Definition: pubsub_build.c:74
int pubsub_connector_register_type_(pubsub_connector_t *con, msg_type_id_t type, dispatch_typefns_t *fns, const char *file, unsigned line)
Definition: pubsub_build.c:216
void pubsub_items_clear_bindings(pubsub_items_t *items)
Definition: pubsub_build.c:263
static void pubsub_items_install_bindings(pubsub_items_t *items, dispatch_t *d)
Definition: pubsub_build.c:245
int pubsub_add_sub_(pubsub_connector_t *con, recv_fn_t recv_fn, channel_id_t channel, message_id_t msg, msg_type_id_t type, unsigned flags, const char *file, unsigned line)
Definition: pubsub_build.c:171
Header used for constructing the OO publish-subscribe facility.
#define pubsub_builder_free(db)
Definition: pubsub_build.h:50
int pubsub_builder_check(pubsub_builder_t *)
Definition: pubsub_check.c:399
struct pubsub_items_t pubsub_items_t
Definition: pubsub_build.h:35
struct pubsub_builder_t pubsub_builder_t
Definition: pubsub_build.h:28
#define pubsub_items_free(cfg)
Definition: pubsub_build.h:93
private structures used for configuring dispatchers and messages.
Header for functions that add relationships to a pubsub builder.
struct pubsub_connector_t pubsub_connector_t
Flags that can be set on publish/subscribe messages.
#define DISP_FLAG_STUB
Definition: pubsub_flags.h:30
Header for smartlist.c.
smartlist_t * smartlist_new(void)
void smartlist_add(smartlist_t *sl, void *element)
#define SMARTLIST_FOREACH_BEGIN(sl, type, var)
#define SMARTLIST_FOREACH(sl, type, var, cmd)
msg_type_id_t type
Definition: msgtypes.h:57
Macros to manage assertions, fatal and non-fatal.
#define tor_assert(expr)
Definition: util_bug.h:103