2 * net/tipc/subscr.c: TIPC subscription service
4 * Copyright (c) 2000-2006, Ericsson AB
5 * Copyright (c) 2005, Wind River Systems
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
40 #include "name_table.h"
44 * struct subscriber - TIPC network topology subscriber
45 * @ref: object reference to subscriber object itself
46 * @lock: pointer to spinlock controlling access to subscriber object
47 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
48 * @subscription_list: list of subscription objects for this subscriber
49 * @port_ref: object reference to port used to communicate with subscriber
/* Link used to place this subscriber on topsrv.subscriber_list. */
55 struct list_head subscriber_list;
/* Head of the list onto which this subscriber's subscriptions are added. */
56 struct list_head subscription_list;
61 * struct top_srv - TIPC network topology subscription service
62 * @user_ref: TIPC userid of subscription service
63 * @setup_port: reference to TIPC port that handles subscription requests
64 * @subscription_count: number of active subscriptions (not subscribers!)
65 * @subscriber_list: list of ports subscribing to service
66 * @lock: spinlock governing access to subscriber list
/*
 * Incremented when a subscription is created; decremented when one is
 * deleted or times out.
 */
72 atomic_t subscription_count;
/* Head of the subscriber list; entries are added and removed under 'lock'. */
73 struct list_head subscriber_list;
/*
 * File-scope state of the topology subscription service.
 * tipc_subscr_start() clears it again with memset() before initializing it.
 */
77 static struct top_srv topsrv = { 0 };
/**
 * htohl - convert value to endianness used by destination
 * @in: value to convert
 * @swap: non-zero if endianness must be reversed
 *
 * Returns @in with its four bytes in reverse order when @swap is non-zero,
 * otherwise returns @in unchanged.
 */
static u32 htohl(u32 in, int swap)
{
	if (!swap)
		return in;

	/*
	 * Reverse the byte order using unsigned arithmetic on the whole word.
	 * Each byte has to move by a whole number of bytes (8, 16 or 24 bits),
	 * and operating on the u32 directly avoids both the sign extension of
	 * plain 'char' and any dependence on the host's own byte order.
	 */
	return ((in & 0x000000ffu) << 24) |
	       ((in & 0x0000ff00u) << 8) |
	       ((in & 0x00ff0000u) >> 8) |
	       ((in & 0xff000000u) >> 24);
}
95 * subscr_send_event - send a message containing a tipc_event to the subscriber
/*
 * Fills in sub->evt, converting each field with htohl() according to
 * sub->swap, and sends the event structure on the owning subscriber's port.
 * NOTE(review): the remaining parameter declarations are not visible in this
 * excerpt; the body uses found_lower, found_upper, event, port_ref and node.
 */
98 static void subscr_send_event(struct subscription *sub,
105 struct iovec msg_sect;
/* The payload is the event structure embedded in the subscription itself. */
107 msg_sect.iov_base = (void *)&sub->evt;
108 msg_sect.iov_len = sizeof(struct tipc_event);
/* Every field is passed through htohl() with this subscription's swap flag. */
110 sub->evt.event = htohl(event, sub->swap);
111 sub->evt.found_lower = htohl(found_lower, sub->swap);
112 sub->evt.found_upper = htohl(found_upper, sub->swap);
113 sub->evt.port.ref = htohl(port_ref, sub->swap);
114 sub->evt.port.node = htohl(node, sub->swap);
/* A single section (msg_sect) is handed to tipc_send(). */
115 tipc_send(sub->owner->port_ref, 1, &msg_sect);
119 * tipc_subscr_overlap - test for subscription overlap with the given values
121 * Returns 1 if there is overlap, otherwise 0.
/*
 * Clamps the found range to the subscription's [seq.lower, seq.upper] range
 * and then tests whether anything is left of it.
 * NOTE(review): the rest of the parameter list and the return statements are
 * not visible in this excerpt.
 */
124 int tipc_subscr_overlap(struct subscription *sub,
/* Raise the lower bound to the subscription's lower bound if it is below it. */
129 if (found_lower < sub->seq.lower)
130 found_lower = sub->seq.lower;
/* Lower the upper bound to the subscription's upper bound if it is above it. */
131 if (found_upper > sub->seq.upper)
132 found_upper = sub->seq.upper;
/* After clamping, lower > upper means the two ranges do not intersect. */
133 if (found_lower > found_upper)
139 * tipc_subscr_report_overlap - issue event if there is subscription overlap
141 * Protected by nameseq.lock in name_table.c
/*
 * Invokes the subscription's event_cb callback with the found range, event,
 * port reference and node.
 * NOTE(review): the remaining parameters and the statements controlled by the
 * two checks below are not visible in this excerpt; presumably they are early
 * returns - confirm against the full file.
 */
144 void tipc_subscr_report_overlap(struct subscription *sub,
152 dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower,
153 sub->seq.upper, found_lower, found_upper);
/* Tests whether the found range overlaps the subscription's range. */
154 if (!tipc_subscr_overlap(sub, found_lower, found_upper))
/* True when 'must' is zero and the TIPC_SUB_PORTS filter bit is clear. */
156 if (!must && !(sub->filter & TIPC_SUB_PORTS))
/* Delivery goes through the callback stored in the subscription. */
159 sub->event_cb(sub, found_lower, found_upper, event, port_ref, node);
163 * subscr_timeout - subscription timeout has occurred
/*
 * Timer handler for a subscription (installed by subscr_subscribe() through
 * k_init_timer()). Locks the owning subscriber through its object reference,
 * unlinks the subscription from the name table, notifies the subscriber with
 * subscr_send_event() and removes the subscription from the subscriber's list.
 */
166 static void subscr_timeout(struct subscription *sub)
168 struct subscriber *subscriber;
171 /* Validate subscriber reference (in case subscriber is terminating) */
/* The reference is read from the owner before any lock is taken. */
173 subscriber_ref = sub->owner->ref;
174 subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref);
175 if (subscriber == NULL)
178 /* Validate timeout (in case subscription is being cancelled) */
/* subscr_cancel() sets timeout to TIPC_WAIT_FOREVER before cancelling the timer. */
180 if (sub->timeout == TIPC_WAIT_FOREVER) {
181 tipc_ref_unlock(subscriber_ref);
185 /* Unlink subscription from name table */
187 tipc_nametbl_unsubscribe(sub);
189 /* Notify subscriber of timeout, then unlink subscription */
/* The range reported is the one stored in the saved request (sub->evt.s). */
191 subscr_send_event(sub,
192 sub->evt.s.seq.lower,
193 sub->evt.s.seq.upper,
197 list_del(&sub->subscription_list);
199 /* Now destroy subscription */
/* The subscriber lock is released before the timer is terminated. */
201 tipc_ref_unlock(subscriber_ref);
202 k_term_timer(&sub->timer);
/* One fewer active subscription in the service-wide count. */
204 atomic_dec(&topsrv.subscription_count);
208 * subscr_del - delete a subscription within a subscription list
210 * Called with subscriber locked.
/*
 * Unlinks the subscription from the name table and from its subscriber's
 * subscription list, and decrements the service-wide subscription count.
 * NOTE(review): a line between list_del() and atomic_dec() is not visible in
 * this excerpt.
 */
213 static void subscr_del(struct subscription *sub)
215 tipc_nametbl_unsubscribe(sub);
216 list_del(&sub->subscription_list);
218 atomic_dec(&topsrv.subscription_count);
222 * subscr_terminate - terminate communication with a subscriber
224 * Called with subscriber locked. Routine must temporarily release this lock
225 * to enable subscription timeout routine(s) to finish without deadlocking;
226 * the lock is then reclaimed to allow caller to release it upon return.
227 * (This should work even in the unlikely event some other thread creates
228 * a new object reference in the interim that uses this lock; this routine will
229 * simply wait for it to be released, then claim it.)
/*
 * Tears down a subscriber: discards its object reference, cancels the timers
 * of its timed subscriptions, shuts down and deletes its port, and removes it
 * from the topology server's subscriber list.
 * NOTE(review): parts of the loop body and the lines after "Now destroy
 * subscriber" are not visible in this excerpt.
 */
232 static void subscr_terminate(struct subscriber *subscriber)
234 struct subscription *sub;
235 struct subscription *sub_temp;
237 /* Invalidate subscriber reference */
239 tipc_ref_discard(subscriber->ref);
/* Lock is dropped here so pending subscription timeout routines can finish. */
240 spin_unlock_bh(subscriber->lock);
242 /* Destroy any existing subscriptions for subscriber */
244 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
/* A timer is only set up for subscriptions with a finite timeout. */
246 if (sub->timeout != TIPC_WAIT_FOREVER) {
247 k_cancel_timer(&sub->timer);
248 k_term_timer(&sub->timer);
250 dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n",
251 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
255 /* Sever connection to subscriber */
257 tipc_shutdown(subscriber->port_ref);
258 tipc_deleteport(subscriber->port_ref);
260 /* Remove subscriber from topology server's subscriber list */
/* The server-wide list is protected by topsrv.lock, not the subscriber lock. */
262 spin_lock_bh(&topsrv.lock);
263 list_del(&subscriber->subscriber_list);
264 spin_unlock_bh(&topsrv.lock);
266 /* Now destroy subscriber */
/* Lock is reclaimed so the caller can release it upon return. */
268 spin_lock_bh(subscriber->lock);
273 * subscr_cancel - handle subscription cancellation request
275 * Called with subscriber locked. Routine must temporarily release this lock
276 * to enable the subscription timeout routine to finish without deadlocking;
277 * the lock is then reclaimed to allow caller to release it upon return.
279 * Note that fields of 's' use subscriber's endianness!
/*
 * Looks up the subscription whose saved request matches 's' byte for byte,
 * stops its timer if it has one, and removes it.
 * NOTE(review): the lines that record the match and perform the final
 * deletion are not visible in this excerpt.
 */
282 static void subscr_cancel(struct tipc_subscr *s,
283 struct subscriber *subscriber)
285 struct subscription *sub;
286 struct subscription *sub_temp;
289 /* Find first matching subscription, exit if not found */
291 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
/*
 * sub->evt.s holds the request exactly as it was received (copied with
 * memcpy() in subscr_subscribe()), so a raw memcmp() is sufficient.
 */
293 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
301 /* Cancel subscription timer (if used), then delete subscription */
303 if (sub->timeout != TIPC_WAIT_FOREVER) {
/* Marks the subscription as cancelled; subscr_timeout() checks this value. */
304 sub->timeout = TIPC_WAIT_FOREVER;
/* Lock is released while the timer is cancelled so the timeout routine can finish. */
305 spin_unlock_bh(subscriber->lock);
306 k_cancel_timer(&sub->timer);
307 k_term_timer(&sub->timer);
308 spin_lock_bh(subscriber->lock);
310 dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n",
311 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
316 * subscr_subscribe - create subscription for subscriber
318 * Called with subscriber locked
/*
 * Handles one subscription request from a subscriber: either forwards a
 * cancellation to subscr_cancel(), or validates the request, allocates and
 * initializes a subscription object, links it to the subscriber, starts its
 * timer when the timeout is finite, and registers it with the name table.
 * The subscriber is terminated when the request is rejected.
 * NOTE(review): the return/cleanup lines after each rejection are not visible
 * in this excerpt.
 */
321 static void subscr_subscribe(struct tipc_subscr *s,
322 struct subscriber *subscriber)
324 struct subscription *sub;
327 /* Determine subscriber's endianness */
/*
 * swap becomes non-zero when neither TIPC_SUB_PORTS nor TIPC_SUB_SERVICE is
 * set in the filter as received, i.e. before any byte-order conversion.
 */
329 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
331 /* Detect & process a subscription cancellation request */
/*
 * The cancel bit is converted to the subscriber's byte order before it is
 * tested against, and cleared from, the raw filter; subscr_cancel() then
 * compares 's' with the saved original request using memcmp().
 */
333 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
334 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
335 subscr_cancel(s, subscriber);
339 /* Refuse subscription if global limit exceeded */
341 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
342 warn("Subscription rejected, subscription limit reached (%u)\n",
343 tipc_max_subscriptions);
344 subscr_terminate(subscriber);
348 /* Allocate subscription object */
/* GFP_ATOMIC: this routine is called with the subscriber lock held. */
350 sub = kzalloc(sizeof(*sub), GFP_ATOMIC);
352 warn("Subscription rejected, no memory\n");
353 subscr_terminate(subscriber);
357 /* Initialize subscription object */
/* Request fields are converted with htohl() using the detected swap flag. */
359 sub->seq.type = htohl(s->seq.type, swap);
360 sub->seq.lower = htohl(s->seq.lower, swap);
361 sub->seq.upper = htohl(s->seq.upper, swap);
362 sub->timeout = htohl(s->timeout, swap);
363 sub->filter = htohl(s->filter, swap);
/*
 * Rejected when TIPC_SUB_PORTS and TIPC_SUB_SERVICE are both set or both
 * clear, or when the requested range is inverted (lower > upper).
 */
364 if ((!(sub->filter & TIPC_SUB_PORTS)
365 == !(sub->filter & TIPC_SUB_SERVICE))
366 || (sub->seq.lower > sub->seq.upper)) {
367 warn("Subscription rejected, illegal request\n");
369 subscr_terminate(subscriber);
372 sub->event_cb = subscr_send_event;
/* Keep an unconverted copy of the request; subscr_cancel() matches against it. */
373 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
374 INIT_LIST_HEAD(&sub->subscription_list);
375 INIT_LIST_HEAD(&sub->nameseq_list);
376 list_add(&sub->subscription_list, &subscriber->subscription_list);
378 atomic_inc(&topsrv.subscription_count);
/* Only subscriptions with a finite timeout get a timer. */
379 if (sub->timeout != TIPC_WAIT_FOREVER) {
380 k_init_timer(&sub->timer,
381 (Handler)subscr_timeout, (unsigned long)sub);
382 k_start_timer(&sub->timer, sub->timeout);
/*
 * NOTE(review): owner is assigned after the timer has been started, while
 * subscr_timeout() dereferences sub->owner - confirm the timer cannot fire
 * before this assignment.
 */
384 sub->owner = subscriber;
385 tipc_nametbl_subscribe(sub);
389 * subscr_conn_shutdown_event - handle termination request from subscriber
/*
 * Port callback for a connection shutdown: locks the subscriber identified by
 * usr_handle and terminates it.
 * NOTE(review): several parameter lines are not visible in this excerpt.
 */
392 static void subscr_conn_shutdown_event(void *usr_handle,
394 struct sk_buff **buf,
395 unsigned char const *data,
399 struct subscriber *subscriber;
400 spinlock_t *subscriber_lock;
/* usr_handle carries the subscriber's object reference (see tipc_createport() call in subscr_named_msg_event()). */
402 subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
403 if (subscriber == NULL)
/*
 * The lock pointer is saved first and the saved copy is used for the unlock,
 * presumably because subscr_terminate() destroys the subscriber object.
 */
406 subscriber_lock = subscriber->lock;
407 subscr_terminate(subscriber);
408 spin_unlock_bh(subscriber_lock);
412 * subscr_conn_msg_event - handle new subscription request from subscriber
/*
 * Port callback for a message on an established subscriber connection: locks
 * the subscriber identified by usr_handle, then either terminates it (wrong
 * message size) or processes the message as a subscription request.
 * NOTE(review): several parameter lines and the line between the two calls
 * below (presumably an 'else') are not visible in this excerpt.
 */
415 static void subscr_conn_msg_event(void *usr_handle,
417 struct sk_buff **buf,
421 struct subscriber *subscriber;
422 spinlock_t *subscriber_lock;
/* usr_handle carries the subscriber's object reference. */
424 subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
425 if (subscriber == NULL)
/* Saved so the unlock below does not read through 'subscriber' after termination. */
428 subscriber_lock = subscriber->lock;
/* A request must be exactly one struct tipc_subscr. */
429 if (size != sizeof(struct tipc_subscr))
430 subscr_terminate(subscriber);
432 subscr_subscribe((struct tipc_subscr *)data, subscriber);
434 spin_unlock_bh(subscriber_lock);
438 * subscr_named_msg_event - handle request to establish a new subscriber
/*
 * Callback of the service's setup port: creates a subscriber object, creates
 * a port for it and connects that port to the originator, adds the subscriber
 * to the server's list, and then either processes the enclosed subscription
 * or sends an empty response.
 * NOTE(review): several parameter lines, the early returns, and the cleanup
 * on the failure paths are not visible in this excerpt.
 */
441 static void subscr_named_msg_event(void *usr_handle,
443 struct sk_buff **buf,
447 struct tipc_portid const *orig,
448 struct tipc_name_seq const *dest)
450 struct subscriber *subscriber;
/* Empty section used for the handshake-only response below. */
451 struct iovec msg_sect = {NULL, 0};
452 spinlock_t *subscriber_lock;
454 dbg("subscr_named_msg_event: orig = %x own = %x,\n",
455 orig->node, tipc_own_addr);
/* A zero-length message is accepted; otherwise it must be one struct tipc_subscr. */
456 if (size && (size != sizeof(struct tipc_subscr))) {
457 warn("Subscriber rejected, invalid subscription size\n");
461 /* Create subscriber object */
463 subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC);
464 if (subscriber == NULL) {
465 warn("Subscriber rejected, no memory\n");
468 INIT_LIST_HEAD(&subscriber->subscription_list);
469 INIT_LIST_HEAD(&subscriber->subscriber_list);
/* tipc_ref_acquire() also fills in subscriber->lock; a zero reference means failure. */
470 subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
471 if (subscriber->ref == 0) {
472 warn("Subscriber rejected, reference table exhausted\n");
/* Presumably the reference is returned locked; it is released before the port is created. */
476 spin_unlock_bh(subscriber->lock);
478 /* Establish a connection to subscriber */
/* The subscriber reference is passed as the new port's user handle. */
480 tipc_createport(topsrv.user_ref,
481 (void *)(unsigned long)subscriber->ref,
485 subscr_conn_shutdown_event,
488 subscr_conn_msg_event,
490 &subscriber->port_ref);
/* Success is judged by the port reference written back, not by a return code. */
491 if (subscriber->port_ref == 0) {
492 warn("Subscriber rejected, unable to create port\n");
493 tipc_ref_discard(subscriber->ref);
497 tipc_connect2port(subscriber->port_ref, orig);
500 /* Add subscriber to topology server's subscriber list */
/* Re-locks the subscriber; the return value of tipc_ref_lock() is not checked here. */
502 tipc_ref_lock(subscriber->ref);
503 spin_lock_bh(&topsrv.lock);
504 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
505 spin_unlock_bh(&topsrv.lock);
508 * Subscribe now if message contains a subscription,
509 * otherwise send an empty response to complete connection handshaking
/* Saved so the final unlock does not read through 'subscriber'. */
512 subscriber_lock = subscriber->lock;
514 subscr_subscribe((struct tipc_subscr *)data, subscriber);
516 tipc_send(subscriber->port_ref, 1, &msg_sect);
518 spin_unlock_bh(subscriber_lock);
/*
 * Starts the topology subscription service: resets topsrv, attaches as a TIPC
 * user, creates the setup port with subscr_named_msg_event() as its callback,
 * and publishes the name sequence {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}
 * with node scope.
 * NOTE(review): the result checks, labels and return statements are not
 * visible in this excerpt.
 */
521 int tipc_subscr_start(void)
523 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
/* Start from a clean state on every (re)start. */
526 memset(&topsrv, 0, sizeof (topsrv));
527 spin_lock_init(&topsrv.lock);
528 INIT_LIST_HEAD(&topsrv.subscriber_list);
/* topsrv.lock is held for the rest of the setup sequence. */
530 spin_lock_bh(&topsrv.lock);
531 res = tipc_attach(&topsrv.user_ref, NULL, NULL);
533 spin_unlock_bh(&topsrv.lock);
537 res = tipc_createport(topsrv.user_ref,
539 TIPC_CRITICAL_IMPORTANCE,
544 subscr_named_msg_event,
551 res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
555 spin_unlock_bh(&topsrv.lock);
/* Failure path: log the error and detach the TIPC user again. */
559 err("Failed to create subscription service\n");
560 tipc_detach(topsrv.user_ref);
562 spin_unlock_bh(&topsrv.lock);
/*
 * Stops the topology subscription service: if it was started (user_ref is
 * non-zero), deletes the setup port, terminates every subscriber on the
 * server's list, and detaches the TIPC user.
 */
566 void tipc_subscr_stop(void)
568 struct subscriber *subscriber;
569 struct subscriber *subscriber_temp;
570 spinlock_t *subscriber_lock;
572 if (topsrv.user_ref) {
573 tipc_deleteport(topsrv.setup_port);
/* The _safe iterator is needed because subscr_terminate() unlinks the entry. */
574 list_for_each_entry_safe(subscriber, subscriber_temp,
575 &topsrv.subscriber_list,
/* subscr_terminate() must be called with the subscriber locked. */
577 tipc_ref_lock(subscriber->ref);
/* Saved so the unlock does not read through 'subscriber' after termination. */
578 subscriber_lock = subscriber->lock;
579 subscr_terminate(subscriber);
580 spin_unlock_bh(subscriber_lock);
582 tipc_detach(topsrv.user_ref);
/*
 * Returns non-zero when tipc_nametbl_translate() yields a non-zero result for
 * the given name's type and instance, otherwise 0.
 * NOTE(review): the declaration of 'domain' is not visible in this excerpt.
 */
588 int tipc_ispublished(struct tipc_name const *name)
592 return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0);