Tor 0.4.9.0-alpha-dev
dispatch_core.c
Go to the documentation of this file.
1/* Copyright (c) 2001, Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
5/* See LICENSE for licensing information */
6
7/**
8 * \file dispatch_core.c
9 * \brief Core module for sending and receiving messages.
10 */
11
12#define DISPATCH_PRIVATE
13#include "orconfig.h"
14
18
19#include "lib/malloc/malloc.h"
20#include "lib/log/util_bug.h"
21
22#include <string.h>
23
24/**
25 * Use <b>d</b> to drop all storage held for <b>msg</b>.
26 *
27 * (We need the dispatcher so we know how to free the auxiliary data.)
28 **/
29void
31{
32 if (!msg)
33 return;
34
35 d->typefns[msg->type].free_fn(msg->aux_data__);
36 tor_free(msg);
37}
38
39/**
40 * Format the auxiliary data held by msg.
41 **/
42char *
44{
45 if (!msg)
46 return NULL;
47
48 return d->typefns[msg->type].fmt_fn(msg->aux_data__);
49}
50
51/**
52 * Release all storage held by <b>d</b>.
53 **/
54void
56{
57 if (d == NULL)
58 return;
59
60 size_t n_queues = d->n_queues;
61 for (size_t i = 0; i < n_queues; ++i) {
62 msg_t *m, *mtmp;
63 TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
64 dispatch_free_msg(d, m);
65 }
66 }
67
68 size_t n_msgs = d->n_msgs;
69
70 for (size_t i = 0; i < n_msgs; ++i) {
71 tor_free(d->table[i]);
72 }
73 tor_free(d->table);
74 tor_free(d->typefns);
75 tor_free(d->queues);
76
77 // This is the only time we will treat d->cfg as non-const.
78 //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
79
80 tor_free(d);
81}
82
83/**
84 * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
85 * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
86 **/
87int
88dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
89 dispatch_alertfn_t fn, void *userdata)
90{
91 if (BUG(chan >= d->n_queues))
92 return -1;
93
94 dqueue_t *q = &d->queues[chan];
95 q->alert_fn = fn;
96 q->alert_fn_arg = userdata;
97 return 0;
98}
99
100/**
101 * Send a message on the appropriate channel notifying that channel if
102 * necessary.
103 *
104 * This function takes ownership of the auxiliary data; it can't be static or
105 * stack-allocated, and the caller is not allowed to use it afterwards.
106 *
107 * This function does not check the various vields of the message object for
108 * consistency.
109 **/
110int
112 subsys_id_t sender,
113 channel_id_t channel,
114 message_id_t msg,
115 msg_type_id_t type,
116 msg_aux_data_t auxdata)
117{
118 if (!d->table[msg]) {
119 /* Fast path: nobody wants this data. */
120
121 d->typefns[type].free_fn(auxdata);
122 return 0;
123 }
124
125 msg_t *m = tor_malloc(sizeof(msg_t));
126
127 m->sender = sender;
128 m->channel = channel;
129 m->msg = msg;
130 m->type = type;
131 memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
132
133 return dispatch_send_msg(d, m);
134}
135
136int
137dispatch_send_msg(dispatch_t *d, msg_t *m)
138{
139 if (BUG(!d))
140 goto err;
141 if (BUG(!m))
142 goto err;
143 if (BUG(m->channel >= d->n_queues))
144 goto err;
145 if (BUG(m->msg >= d->n_msgs))
146 goto err;
147
148 dtbl_entry_t *ent = d->table[m->msg];
149 if (ent) {
150 if (BUG(m->type != ent->type))
151 goto err;
152 if (BUG(m->channel != ent->channel))
153 goto err;
154 }
155
156 return dispatch_send_msg_unchecked(d, m);
157 err:
158 /* Probably it isn't safe to free m, since type could be wrong. */
159 return -1;
160}
161
162/**
163 * Send a message on the appropriate queue, notifying that queue if necessary.
164 *
165 * This function takes ownership of the message object and its auxiliary data;
166 * it can't be static or stack-allocated, and the caller isn't allowed to use
167 * it afterwards.
168 *
169 * This function does not check the various fields of the message object for
170 * consistency, and can crash if they are out of range. Only functions that
171 * have already constructed the message in a safe way, or checked it for
172 * correctness themselves, should call this function.
173 **/
174int
176{
177 /* Find the right queue. */
178 dqueue_t *q = &d->queues[m->channel];
179 bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
180
181 /* Append the message. */
182 TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
183
184 if (debug_logging_enabled()) {
185 char *arg = dispatch_fmt_msg_data(d, m);
186 log_debug(LD_MESG,
187 "Queued: %s (%s) from %s, on %s.",
188 get_message_id_name(m->msg),
189 arg,
190 get_subsys_id_name(m->sender),
191 get_channel_id_name(m->channel));
192 tor_free(arg);
193 }
194
195 /* If we just made the queue nonempty for the first time, call the alert
196 * function. */
197 if (was_empty) {
198 q->alert_fn(d, m->channel, q->alert_fn_arg);
199 }
200
201 return 0;
202}
203
204/**
205 * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
206 **/
207static void
209{
210 tor_assert(m->msg <= d->n_msgs);
211 dtbl_entry_t *ent = d->table[m->msg];
212 int n_fns = ent->n_fns;
213
214 if (debug_logging_enabled()) {
215 char *arg = dispatch_fmt_msg_data(d, m);
216 log_debug(LD_MESG,
217 "Delivering: %s (%s) from %s, on %s:",
218 get_message_id_name(m->msg),
219 arg,
220 get_subsys_id_name(m->sender),
221 get_channel_id_name(m->channel));
222 tor_free(arg);
223 }
224
225 int i;
226 for (i=0; i < n_fns; ++i) {
227 if (ent->rcv[i].enabled) {
228 log_debug(LD_MESG, " Delivering to %s.",
229 get_subsys_id_name(ent->rcv[i].sys));
230 ent->rcv[i].fn(m);
231 }
232 }
233}
234
235/**
236 * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
237 * on the given dispatcher. Return 0 on success or recoverable failure,
238 * -1 on unrecoverable error.
239 **/
240int
241dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
242{
243 if (BUG(ch >= d->n_queues))
244 return 0;
245
246 int n_flushed = 0;
247 dqueue_t *q = &d->queues[ch];
248
249 while (n_flushed < max_msgs) {
250 msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
251 if (!m)
252 break;
253 TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
255 dispatch_free_msg(d, m);
256 ++n_flushed;
257 }
258
259 return 0;
260}
Low-level APIs for message-passing system.
void(* dispatch_alertfn_t)(struct dispatch_t *, channel_id_t, void *)
Definition: dispatch.h:98
struct dispatch_t dispatch_t
Definition: dispatch.h:53
int dispatch_send(dispatch_t *d, subsys_id_t sender, channel_id_t channel, message_id_t msg, msg_type_id_t type, msg_aux_data_t auxdata)
void dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
Definition: dispatch_core.c:30
int dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, dispatch_alertfn_t fn, void *userdata)
Definition: dispatch_core.c:88
char * dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
Definition: dispatch_core.c:43
static void dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
void dispatch_free_(dispatch_t *d)
Definition: dispatch_core.c:55
Header for dispatch_naming.c.
const char * get_channel_id_name(channel_id_t)
private structures used for the dispatcher module
static bool debug_logging_enabled(void)
Definition: log.h:217
#define LD_MESG
Definition: log.h:121
Headers for util_malloc.c.
#define tor_free(p)
Definition: malloc.h:56
uint16_t subsys_id_t
Definition: msgtypes.h:22
uint16_t msg_type_id_t
Definition: msgtypes.h:29
Definition: msgtypes.h:50
msg_type_id_t type
Definition: msgtypes.h:57
msg_aux_data_t aux_data__
Definition: msgtypes.h:60
Macros to manage assertions, fatal and non-fatal.
#define tor_assert(expr)
Definition: util_bug.h:103