Merge branch 'work'
[linux-2.6] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  * 
4  * Copyright (c) 1992-2006, Ericsson AB
5  * Copyright (c) 2004-2005, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "dbg.h"
40 #include "port.h"
41 #include "addr.h"
42 #include "link.h"
43 #include "node.h"
44 #include "port.h"
45 #include "name_table.h"
46 #include "user_reg.h"
47 #include "msg.h"
48 #include "bcast.h"
49
/* Connection management: */
#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
#define CONFIRMED 0     /* probing_state: stored after a protocol message is accepted */
#define PROBING 1       /* probing_state: stored after a CONN_PROBE has been built */

/* Largest number of payload bytes tipc_reject_msg() copies into a rejection */
#define MAX_REJECT_SIZE 1024
56
/*
 * Queue of buffers consumed by port_dispatcher_sigh(), which detaches the
 * head while holding queue_lock and then walks the chain via buf->next.
 * NOTE(review): the code that maintains msg_queue_tail is not visible in
 * this part of the file.
 */
static struct sk_buff *msg_queue_head = 0;
static struct sk_buff *msg_queue_tail = 0;

/* port_list_lock is held around every access to the 'ports' list below */
spinlock_t port_list_lock = SPIN_LOCK_UNLOCKED;
/* queue_lock is held while msg_queue_head is read and reset */
static spinlock_t queue_lock = SPIN_LOCK_UNLOCKED;

/* List of ports; tipc_createport_raw() appends, tipc_deleteport() removes */
LIST_HEAD(ports);
/* Forward declarations for static helpers defined further down */
static void port_handle_node_down(unsigned long ref);
static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
static void port_timeout(unsigned long ref);
68
69
70 static inline u32 port_peernode(struct port *p_ptr)
71 {
72         return msg_destnode(&p_ptr->publ.phdr);
73 }
74
75 static inline u32 port_peerport(struct port *p_ptr)
76 {
77         return msg_destport(&p_ptr->publ.phdr);
78 }
79
80 static inline u32 port_out_seqno(struct port *p_ptr)
81 {
82         return msg_transp_seqno(&p_ptr->publ.phdr);
83 }
84
85 static inline void port_set_out_seqno(struct port *p_ptr, u32 seqno) 
86 {
87         msg_set_transp_seqno(&p_ptr->publ.phdr,seqno);
88 }
89
90 static inline void port_incr_out_seqno(struct port *p_ptr)
91 {
92         struct tipc_msg *m = &p_ptr->publ.phdr;
93
94         if (likely(!msg_routed(m)))
95                 return;
96         msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
97 }
98
99 /**
100  * tipc_multicast - send a multicast message to local and remote destinations
101  */
102
103 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
104                    u32 num_sect, struct iovec const *msg_sect)
105 {
106         struct tipc_msg *hdr;
107         struct sk_buff *buf;
108         struct sk_buff *ibuf = NULL;
109         struct port_list dports = {0, NULL, };
110         struct port *oport = port_deref(ref);
111         int ext_targets;
112         int res;
113
114         if (unlikely(!oport))
115                 return -EINVAL;
116
117         /* Create multicast message */
118
119         hdr = &oport->publ.phdr;
120         msg_set_type(hdr, TIPC_MCAST_MSG);
121         msg_set_nametype(hdr, seq->type);
122         msg_set_namelower(hdr, seq->lower);
123         msg_set_nameupper(hdr, seq->upper);
124         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
125         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
126                         !oport->user_port, &buf);
127         if (unlikely(!buf))
128                 return res;
129
130         /* Figure out where to send multicast message */
131
132         ext_targets = nametbl_mc_translate(seq->type, seq->lower, seq->upper,
133                                            TIPC_NODE_SCOPE, &dports);
134         
135         /* Send message to destinations (duplicate it only if necessary) */ 
136
137         if (ext_targets) {
138                 if (dports.count != 0) {
139                         ibuf = skb_copy(buf, GFP_ATOMIC);
140                         if (ibuf == NULL) {
141                                 port_list_free(&dports);
142                                 buf_discard(buf);
143                                 return -ENOMEM;
144                         }
145                 }
146                 res = bclink_send_msg(buf);
147                 if ((res < 0) && (dports.count != 0)) {
148                         buf_discard(ibuf);
149                 }
150         } else {
151                 ibuf = buf;
152         }
153
154         if (res >= 0) {
155                 if (ibuf)
156                         port_recv_mcast(ibuf, &dports);
157         } else {
158                 port_list_free(&dports);
159         }
160         return res;
161 }
162
163 /**
164  * port_recv_mcast - deliver multicast message to all destination ports
165  * 
166  * If there is no port list, perform a lookup to create one
167  */
168
169 void port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
170 {
171         struct tipc_msg* msg;
172         struct port_list dports = {0, NULL, };
173         struct port_list *item = dp;
174         int cnt = 0;
175
176         assert(buf);
177         msg = buf_msg(buf);
178
179         /* Create destination port list, if one wasn't supplied */
180
181         if (dp == NULL) {
182                 nametbl_mc_translate(msg_nametype(msg),
183                                      msg_namelower(msg),
184                                      msg_nameupper(msg),
185                                      TIPC_CLUSTER_SCOPE,
186                                      &dports);
187                 item = dp = &dports;
188         }
189
190         /* Deliver a copy of message to each destination port */
191
192         if (dp->count != 0) {
193                 if (dp->count == 1) {
194                         msg_set_destport(msg, dp->ports[0]);
195                         port_recv_msg(buf);
196                         port_list_free(dp);
197                         return;
198                 }
199                 for (; cnt < dp->count; cnt++) {
200                         int index = cnt % PLSIZE;
201                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
202
203                         if (b == NULL) {
204                                 warn("Buffer allocation failure\n");
205                                 msg_dbg(msg, "LOST:");
206                                 goto exit;
207                         }
208                         if ((index == 0) && (cnt != 0)) {
209                                 item = item->next;
210                         }
211                         msg_set_destport(buf_msg(b),item->ports[index]);
212                         port_recv_msg(b);
213                 }
214         }
215 exit:
216         buf_discard(buf);
217         port_list_free(dp);
218 }
219
220 /**
221  * tipc_createport_raw - create a native TIPC port
222  * 
223  * Returns local port reference
224  */
225
226 u32 tipc_createport_raw(void *usr_handle,
227                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
228                         void (*wakeup)(struct tipc_port *),
229                         const u32 importance)
230 {
231         struct port *p_ptr;
232         struct tipc_msg *msg;
233         u32 ref;
234
235         p_ptr = kmalloc(sizeof(*p_ptr), GFP_ATOMIC);
236         if (p_ptr == NULL) {
237                 warn("Memory squeeze; failed to create port\n");
238                 return 0;
239         }
240         memset(p_ptr, 0, sizeof(*p_ptr));
241         ref = ref_acquire(p_ptr, &p_ptr->publ.lock);
242         if (!ref) {
243                 warn("Reference Table Exhausted\n");
244                 kfree(p_ptr);
245                 return 0;
246         }
247
248         port_lock(ref);
249         p_ptr->publ.ref = ref;
250         msg = &p_ptr->publ.phdr;
251         msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
252         msg_set_orignode(msg, tipc_own_addr);
253         msg_set_prevnode(msg, tipc_own_addr);
254         msg_set_origport(msg, ref);
255         msg_set_importance(msg,importance);
256         p_ptr->last_in_seqno = 41;
257         p_ptr->sent = 1;
258         p_ptr->publ.usr_handle = usr_handle;
259         INIT_LIST_HEAD(&p_ptr->wait_list);
260         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
261         p_ptr->congested_link = 0;
262         p_ptr->max_pkt = MAX_PKT_DEFAULT;
263         p_ptr->dispatcher = dispatcher;
264         p_ptr->wakeup = wakeup;
265         p_ptr->user_port = 0;
266         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
267         spin_lock_bh(&port_list_lock);
268         INIT_LIST_HEAD(&p_ptr->publications);
269         INIT_LIST_HEAD(&p_ptr->port_list);
270         list_add_tail(&p_ptr->port_list, &ports);
271         spin_unlock_bh(&port_list_lock);
272         port_unlock(p_ptr);
273         return ref;
274 }
275
/**
 * tipc_deleteport - tear down the port identified by 'ref'
 *
 * Returns -EINVAL if 'ref' cannot be locked as a port, TIPC_OK otherwise.
 */
int tipc_deleteport(u32 ref)
{
        struct port *p_ptr;
        struct sk_buff *buf = 0;

        tipc_withdraw(ref, 0, 0);
        p_ptr = port_lock(ref);
        if (!p_ptr) 
                return -EINVAL;

        /* Discard the reference while the port lock is held, then drop the lock */
        ref_discard(ref);
        port_unlock(p_ptr);

        k_cancel_timer(&p_ptr->timer);
        if (p_ptr->publ.connected) {
                /* Abort message addressed to the peer; routed at the very end */
                buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
                nodesub_unsubscribe(&p_ptr->subscription);
        }
        if (p_ptr->user_port) {
                reg_remove_port(p_ptr->user_port);
                kfree(p_ptr->user_port);
        }

        /* Unlink from the port list and the wait list under port_list_lock */
        spin_lock_bh(&port_list_lock);
        list_del(&p_ptr->port_list);
        list_del(&p_ptr->wait_list);
        spin_unlock_bh(&port_list_lock);
        k_term_timer(&p_ptr->timer);
        kfree(p_ptr);
        dbg("Deleted port %u\n", ref);
        /*
         * buf is still 0 here when the port was not connected.
         * NOTE(review): presumably net_route_msg() tolerates a null buffer - confirm.
         */
        net_route_msg(buf);
        return TIPC_OK;
}
309
310 /**
311  * tipc_get_port() - return port associated with 'ref'
312  * 
313  * Note: Port is not locked.
314  */
315
316 struct tipc_port *tipc_get_port(const u32 ref)
317 {
318         return (struct tipc_port *)ref_deref(ref);
319 }
320
321 /**
322  * tipc_get_handle - return user handle associated to port 'ref'
323  */
324
325 void *tipc_get_handle(const u32 ref)
326 {
327         struct port *p_ptr;
328         void * handle;
329
330         p_ptr = port_lock(ref);
331         if (!p_ptr)
332                 return 0;
333         handle = p_ptr->publ.usr_handle;
334         port_unlock(p_ptr);
335         return handle;
336 }
337
338 static inline int port_unreliable(struct port *p_ptr)
339 {
340         return msg_src_droppable(&p_ptr->publ.phdr);
341 }
342
343 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
344 {
345         struct port *p_ptr;
346         
347         p_ptr = port_lock(ref);
348         if (!p_ptr)
349                 return -EINVAL;
350         *isunreliable = port_unreliable(p_ptr);
351         spin_unlock_bh(p_ptr->publ.lock);
352         return TIPC_OK;
353 }
354
355 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
356 {
357         struct port *p_ptr;
358         
359         p_ptr = port_lock(ref);
360         if (!p_ptr)
361                 return -EINVAL;
362         msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
363         port_unlock(p_ptr);
364         return TIPC_OK;
365 }
366
367 static inline int port_unreturnable(struct port *p_ptr)
368 {
369         return msg_dest_droppable(&p_ptr->publ.phdr);
370 }
371
372 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
373 {
374         struct port *p_ptr;
375         
376         p_ptr = port_lock(ref);
377         if (!p_ptr)
378                 return -EINVAL;
379         *isunrejectable = port_unreturnable(p_ptr);
380         spin_unlock_bh(p_ptr->publ.lock);
381         return TIPC_OK;
382 }
383
384 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
385 {
386         struct port *p_ptr;
387         
388         p_ptr = port_lock(ref);
389         if (!p_ptr)
390                 return -EINVAL;
391         msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
392         port_unlock(p_ptr);
393         return TIPC_OK;
394 }
395
396 /* 
397  * port_build_proto_msg(): build a port level protocol 
398  * or a connection abortion message. Called with 
399  * tipc_port lock on.
400  */
401 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
402                                             u32 origport, u32 orignode,
403                                             u32 usr, u32 type, u32 err, 
404                                             u32 seqno, u32 ack)
405 {
406         struct sk_buff *buf;
407         struct tipc_msg *msg;
408         
409         buf = buf_acquire(LONG_H_SIZE);
410         if (buf) {
411                 msg = buf_msg(buf);
412                 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode);
413                 msg_set_destport(msg, destport);
414                 msg_set_origport(msg, origport);
415                 msg_set_destnode(msg, destnode);
416                 msg_set_orignode(msg, orignode);
417                 msg_set_transp_seqno(msg, seqno);
418                 msg_set_msgcnt(msg, ack);
419                 msg_dbg(msg, "PORT>SEND>:");
420         }
421         return buf;
422 }
423
424 int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
425 {
426         msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
427         msg_set_options(&tp_ptr->phdr, opt, sz);
428         return TIPC_OK;
429 }
430
431 int tipc_reject_msg(struct sk_buff *buf, u32 err)
432 {
433         struct tipc_msg *msg = buf_msg(buf);
434         struct sk_buff *rbuf;
435         struct tipc_msg *rmsg;
436         int hdr_sz;
437         u32 imp = msg_importance(msg);
438         u32 data_sz = msg_data_sz(msg);
439
440         if (data_sz > MAX_REJECT_SIZE)
441                 data_sz = MAX_REJECT_SIZE;
442         if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
443                 imp++;
444         msg_dbg(msg, "port->rej: ");
445
446         /* discard rejected message if it shouldn't be returned to sender */
447         if (msg_errcode(msg) || msg_dest_droppable(msg)) {
448                 buf_discard(buf);
449                 return data_sz;
450         }
451
452         /* construct rejected message */
453         if (msg_mcast(msg))
454                 hdr_sz = MCAST_H_SIZE;
455         else
456                 hdr_sz = LONG_H_SIZE;
457         rbuf = buf_acquire(data_sz + hdr_sz);
458         if (rbuf == NULL) {
459                 buf_discard(buf);
460                 return data_sz;
461         }
462         rmsg = buf_msg(rbuf);
463         msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg));
464         msg_set_destport(rmsg, msg_origport(msg));
465         msg_set_prevnode(rmsg, tipc_own_addr);
466         msg_set_origport(rmsg, msg_destport(msg));
467         if (msg_short(msg))
468                 msg_set_orignode(rmsg, tipc_own_addr);
469         else
470                 msg_set_orignode(rmsg, msg_destnode(msg));
471         msg_set_size(rmsg, data_sz + hdr_sz); 
472         msg_set_nametype(rmsg, msg_nametype(msg));
473         msg_set_nameinst(rmsg, msg_nameinst(msg));
474         memcpy(rbuf->data + hdr_sz, msg_data(msg), data_sz);
475
476         /* send self-abort message when rejecting on a connected port */
477         if (msg_connected(msg)) {
478                 struct sk_buff *abuf = 0;
479                 struct port *p_ptr = port_lock(msg_destport(msg));
480
481                 if (p_ptr) {
482                         if (p_ptr->publ.connected)
483                                 abuf = port_build_self_abort_msg(p_ptr, err);
484                         port_unlock(p_ptr);
485                 }
486                 net_route_msg(abuf);
487         }
488
489         /* send rejected message */
490         buf_discard(buf);
491         net_route_msg(rbuf);
492         return data_sz;
493 }
494
495 int port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
496                          struct iovec const *msg_sect, u32 num_sect,
497                          int err)
498 {
499         struct sk_buff *buf;
500         int res;
501
502         res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 
503                         !p_ptr->user_port, &buf);
504         if (!buf)
505                 return res;
506
507         return tipc_reject_msg(buf, err);
508 }
509
510 static void port_timeout(unsigned long ref)
511 {
512         struct port *p_ptr = port_lock(ref);
513         struct sk_buff *buf = 0;
514
515         if (!p_ptr || !p_ptr->publ.connected)
516                 return;
517
518         /* Last probe answered ? */
519         if (p_ptr->probing_state == PROBING) {
520                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
521         } else {
522                 buf = port_build_proto_msg(port_peerport(p_ptr),
523                                            port_peernode(p_ptr),
524                                            p_ptr->publ.ref,
525                                            tipc_own_addr,
526                                            CONN_MANAGER,
527                                            CONN_PROBE,
528                                            TIPC_OK, 
529                                            port_out_seqno(p_ptr),
530                                            0);
531                 port_incr_out_seqno(p_ptr);
532                 p_ptr->probing_state = PROBING;
533                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
534         }
535         port_unlock(p_ptr);
536         net_route_msg(buf);
537 }
538
539
540 static void port_handle_node_down(unsigned long ref)
541 {
542         struct port *p_ptr = port_lock(ref);
543         struct sk_buff* buf = 0;
544
545         if (!p_ptr)
546                 return;
547         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
548         port_unlock(p_ptr);
549         net_route_msg(buf);
550 }
551
552
553 static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
554 {
555         u32 imp = msg_importance(&p_ptr->publ.phdr);
556
557         if (!p_ptr->publ.connected)
558                 return 0;
559         if (imp < TIPC_CRITICAL_IMPORTANCE)
560                 imp++;
561         return port_build_proto_msg(p_ptr->publ.ref,
562                                     tipc_own_addr,
563                                     port_peerport(p_ptr),
564                                     port_peernode(p_ptr),
565                                     imp,
566                                     TIPC_CONN_MSG,
567                                     err, 
568                                     p_ptr->last_in_seqno + 1,
569                                     0);
570 }
571
572
573 static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
574 {
575         u32 imp = msg_importance(&p_ptr->publ.phdr);
576
577         if (!p_ptr->publ.connected)
578                 return 0;
579         if (imp < TIPC_CRITICAL_IMPORTANCE)
580                 imp++;
581         return port_build_proto_msg(port_peerport(p_ptr),
582                                     port_peernode(p_ptr),
583                                     p_ptr->publ.ref,
584                                     tipc_own_addr,
585                                     imp,
586                                     TIPC_CONN_MSG,
587                                     err, 
588                                     port_out_seqno(p_ptr),
589                                     0);
590 }
591
/**
 * port_recv_proto_msg - handle a port level protocol message
 * @buf: received message; always discarded before returning
 *
 * Locks the destination port, validates the message against the port's
 * connection state, processes CONN_ACK and CONN_PROBE, and routes any
 * reply or abort message that was built after the port lock is released.
 */
void port_recv_proto_msg(struct sk_buff *buf)
{
        struct tipc_msg *msg = buf_msg(buf);
        struct port *p_ptr = port_lock(msg_destport(msg));
        u32 err = TIPC_OK;
        struct sk_buff *r_buf = 0;
        struct sk_buff *abort_buf = 0;

        msg_dbg(msg, "PORT<RECV<:");

        if (!p_ptr) {
                err = TIPC_ERR_NO_PORT;
        } else if (p_ptr->publ.connected) {
                /* Sender must be the recorded peer (both node and port) */
                if (port_peernode(p_ptr) != msg_orignode(msg))
                        err = TIPC_ERR_NO_PORT;
                if (port_peerport(p_ptr) != msg_origport(msg))
                        err = TIPC_ERR_NO_PORT;
                if (!err && msg_routed(msg)) {
                        /*
                         * Routed messages carry a sequence number; note that
                         * last_in_seqno is advanced even when the check fails.
                         */
                        u32 seqno = msg_transp_seqno(msg);
                        u32 myno =  ++p_ptr->last_in_seqno;
                        if (seqno != myno) {
                                err = TIPC_ERR_NO_PORT;
                                abort_buf = port_build_self_abort_msg(p_ptr, err);
                        }
                }
                /* CONN_ACK is processed here regardless of the value of err */
                if (msg_type(msg) == CONN_ACK) {
                        /* Sampled before 'acked' changes: was the port congested? */
                        int wakeup = port_congested(p_ptr) && 
                                     p_ptr->publ.congested &&
                                     p_ptr->wakeup;
                        p_ptr->acked += msg_msgcnt(msg);
                        if (port_congested(p_ptr))
                                goto exit;
                        p_ptr->publ.congested = 0;
                        if (!wakeup)
                                goto exit;
                        /* The wakeup callback runs with the port lock still held */
                        p_ptr->wakeup(&p_ptr->publ);
                        goto exit;
                }
        } else if (p_ptr->publ.published) {
                /* Not connected but published: reject the message */
                err = TIPC_ERR_NO_PORT;
        }
        if (err) {
                /* Error reply addressed to the originator of the message */
                r_buf = port_build_proto_msg(msg_origport(msg),
                                             msg_orignode(msg), 
                                             msg_destport(msg), 
                                             tipc_own_addr,
                                             DATA_HIGH,
                                             TIPC_CONN_MSG,
                                             err,
                                             0,
                                             0);
                goto exit;
        }

        /* All is fine */
        if (msg_type(msg) == CONN_PROBE) {
                r_buf = port_build_proto_msg(msg_origport(msg), 
                                             msg_orignode(msg), 
                                             msg_destport(msg), 
                                             tipc_own_addr, 
                                             CONN_MANAGER,
                                             CONN_PROBE_REPLY,
                                             TIPC_OK,
                                             port_out_seqno(p_ptr),
                                             0);
        }
        /* p_ptr is non-null here: the !p_ptr case set err and left above */
        p_ptr->probing_state = CONFIRMED;
        port_incr_out_seqno(p_ptr);
exit:
        if (p_ptr)
                port_unlock(p_ptr);
        /*
         * Either buffer may still be 0 at this point.
         * NOTE(review): presumably net_route_msg() tolerates a null buffer - confirm.
         */
        net_route_msg(r_buf);
        net_route_msg(abort_buf);
        buf_discard(buf);
}
667
668 static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
669 {
670         struct publication *publ;
671
672         if (full_id)
673                 tipc_printf(buf, "<%u.%u.%u:%u>:", 
674                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
675                             tipc_node(tipc_own_addr), p_ptr->publ.ref);
676         else
677                 tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
678
679         if (p_ptr->publ.connected) {
680                 u32 dport = port_peerport(p_ptr);
681                 u32 destnode = port_peernode(p_ptr);
682
683                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
684                             tipc_zone(destnode), tipc_cluster(destnode),
685                             tipc_node(destnode), dport);
686                 if (p_ptr->publ.conn_type != 0)
687                         tipc_printf(buf, " via {%u,%u}",
688                                     p_ptr->publ.conn_type,
689                                     p_ptr->publ.conn_instance);
690         }
691         else if (p_ptr->publ.published) {
692                 tipc_printf(buf, " bound to");
693                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
694                         if (publ->lower == publ->upper)
695                                 tipc_printf(buf, " {%u,%u}", publ->type,
696                                             publ->lower);
697                         else
698                                 tipc_printf(buf, " {%u,%u,%u}", publ->type, 
699                                             publ->lower, publ->upper);
700                 }
701         }
702         tipc_printf(buf, "\n");
703 }
704
705 #define MAX_PORT_QUERY 32768
706
707 struct sk_buff *port_get_ports(void)
708 {
709         struct sk_buff *buf;
710         struct tlv_desc *rep_tlv;
711         struct print_buf pb;
712         struct port *p_ptr;
713         int str_len;
714
715         buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
716         if (!buf)
717                 return NULL;
718         rep_tlv = (struct tlv_desc *)buf->data;
719
720         printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
721         spin_lock_bh(&port_list_lock);
722         list_for_each_entry(p_ptr, &ports, port_list) {
723                 spin_lock_bh(p_ptr->publ.lock);
724                 port_print(p_ptr, &pb, 0);
725                 spin_unlock_bh(p_ptr->publ.lock);
726         }
727         spin_unlock_bh(&port_list_lock);
728         str_len = printbuf_validate(&pb);
729
730         skb_put(buf, TLV_SPACE(str_len));
731         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
732
733         return buf;
734 }
735
736 #if 0
737
738 #define MAX_PORT_STATS 2000
739
/*
 * port_show_stats(): build a configuration reply describing a single port.
 *
 * This function sits inside an '#if 0' region and is not compiled.
 *
 * The request must hold a TIPC_TLV_PORT_REF TLV whose data is the port
 * reference in network byte order. Returns an error-string reply if the
 * TLV is invalid or the port cannot be locked, NULL if the reply buffer
 * cannot be allocated, and otherwise a TIPC_TLV_ULTRA_STRING reply.
 */
struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
{
        u32 ref;
        struct port *p_ptr;
        struct sk_buff *buf;
        struct tlv_desc *rep_tlv;
        struct print_buf pb;
        int str_len;

        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
                return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

        ref = *(u32 *)TLV_DATA(req_tlv_area);
        ref = ntohl(ref);

        p_ptr = port_lock(ref);
        if (!p_ptr)
                return cfg_reply_error_string("port not found");

        buf = cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
        if (!buf) {
                /* Drop the port lock taken above before giving up */
                port_unlock(p_ptr);
                return NULL;
        }
        rep_tlv = (struct tlv_desc *)buf->data;

        printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
        /* full_id = 1: the line is prefixed with the own node's address */
        port_print(p_ptr, &pb, 1);
        /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
        port_unlock(p_ptr);
        str_len = printbuf_validate(&pb);

        skb_put(buf, TLV_SPACE(str_len));
        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);

        return buf;
}
777
778 #endif
779
780 void port_reinit(void)
781 {
782         struct port *p_ptr;
783         struct tipc_msg *msg;
784
785         spin_lock_bh(&port_list_lock);
786         list_for_each_entry(p_ptr, &ports, port_list) {
787                 msg = &p_ptr->publ.phdr;
788                 if (msg_orignode(msg) == tipc_own_addr)
789                         break;
790                 msg_set_orignode(msg, tipc_own_addr);
791         }
792         spin_unlock_bh(&port_list_lock);
793 }
794
795
796 /*
797  *  port_dispatcher_sigh(): Signal handler for messages destinated
798  *                          to the tipc_port interface.
799  */
800
801 static void port_dispatcher_sigh(void *dummy)
802 {
803         struct sk_buff *buf;
804
805         spin_lock_bh(&queue_lock);
806         buf = msg_queue_head;
807         msg_queue_head = 0;
808         spin_unlock_bh(&queue_lock);
809
810         while (buf) {
811                 struct port *p_ptr;
812                 struct user_port *up_ptr;
813                 struct tipc_portid orig;
814                 struct tipc_name_seq dseq;
815                 void *usr_handle;
816                 int connected;
817                 int published;
818
819                 struct sk_buff *next = buf->next;
820                 struct tipc_msg *msg = buf_msg(buf);
821                 u32 dref = msg_destport(msg);
822                 
823                 p_ptr = port_lock(dref);
824                 if (!p_ptr) {
825                         /* Port deleted while msg in queue */
826                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
827                         buf = next;
828                         continue;
829                 }
830                 orig.ref = msg_origport(msg);
831                 orig.node = msg_orignode(msg);
832                 up_ptr = p_ptr->user_port;
833                 usr_handle = up_ptr->usr_handle;
834                 connected = p_ptr->publ.connected;
835                 published = p_ptr->publ.published;
836
837                 if (unlikely(msg_errcode(msg)))
838                         goto err;
839
840                 switch (msg_type(msg)) {
841                 
842                 case TIPC_CONN_MSG:{
843                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
844                                 u32 peer_port = port_peerport(p_ptr);
845                                 u32 peer_node = port_peernode(p_ptr);
846
847                                 spin_unlock_bh(p_ptr->publ.lock);
848                                 if (unlikely(!connected)) {
849                                         if (unlikely(published))
850                                                 goto reject;
851                                         tipc_connect2port(dref,&orig);
852                                 }
853                                 if (unlikely(msg_origport(msg) != peer_port))
854                                         goto reject;
855                                 if (unlikely(msg_orignode(msg) != peer_node))
856                                         goto reject;
857                                 if (unlikely(!cb))
858                                         goto reject;
859                                 if (unlikely(++p_ptr->publ.conn_unacked >= 
860                                              TIPC_FLOW_CONTROL_WIN))
861                                         tipc_acknowledge(dref, 
862                                                          p_ptr->publ.conn_unacked);
863                                 skb_pull(buf, msg_hdr_sz(msg));
864                                 cb(usr_handle, dref, &buf, msg_data(msg),
865                                    msg_data_sz(msg));
866                                 break;
867                         }
868                 case TIPC_DIRECT_MSG:{
869                                 tipc_msg_event cb = up_ptr->msg_cb;
870
871                                 spin_unlock_bh(p_ptr->publ.lock);
872                                 if (unlikely(connected))
873                                         goto reject;
874                                 if (unlikely(!cb))
875                                         goto reject;
876                                 skb_pull(buf, msg_hdr_sz(msg));
877                                 cb(usr_handle, dref, &buf, msg_data(msg), 
878                                    msg_data_sz(msg), msg_importance(msg),
879                                    &orig);
880                                 break;
881                         }
882                 case TIPC_NAMED_MSG:{
883                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
884
885                                 spin_unlock_bh(p_ptr->publ.lock);
886                                 if (unlikely(connected))
887                                         goto reject;
888                                 if (unlikely(!cb))
889                                         goto reject;
890                                 if (unlikely(!published))
891                                         goto reject;
892                                 dseq.type =  msg_nametype(msg);
893                                 dseq.lower = msg_nameinst(msg);
894                                 dseq.upper = dseq.lower;
895                                 skb_pull(buf, msg_hdr_sz(msg));
896                                 cb(usr_handle, dref, &buf, msg_data(msg), 
897                                    msg_data_sz(msg), msg_importance(msg),
898                                    &orig, &dseq);
899                                 break;
900                         }
901                 }
902                 if (buf)
903                         buf_discard(buf);
904                 buf = next;
905                 continue;
906 err:
907                 switch (msg_type(msg)) {
908                 
909                 case TIPC_CONN_MSG:{
910                                 tipc_conn_shutdown_event cb = 
911                                         up_ptr->conn_err_cb;
912                                 u32 peer_port = port_peerport(p_ptr);
913                                 u32 peer_node = port_peernode(p_ptr);
914
915                                 spin_unlock_bh(p_ptr->publ.lock);
916                                 if (!connected || !cb)
917                                         break;
918                                 if (msg_origport(msg) != peer_port)
919                                         break;
920                                 if (msg_orignode(msg) != peer_node)
921                                         break;
922                                 tipc_disconnect(dref);
923                                 skb_pull(buf, msg_hdr_sz(msg));
924                                 cb(usr_handle, dref, &buf, msg_data(msg),
925                                    msg_data_sz(msg), msg_errcode(msg));
926                                 break;
927                         }
928                 case TIPC_DIRECT_MSG:{
929                                 tipc_msg_err_event cb = up_ptr->err_cb;
930
931                                 spin_unlock_bh(p_ptr->publ.lock);
932                                 if (connected || !cb)
933                                         break;
934                                 skb_pull(buf, msg_hdr_sz(msg));
935                                 cb(usr_handle, dref, &buf, msg_data(msg),
936                                    msg_data_sz(msg), msg_errcode(msg), &orig);
937                                 break;
938                         }
939                 case TIPC_NAMED_MSG:{
940                                 tipc_named_msg_err_event cb = 
941                                         up_ptr->named_err_cb;
942
943                                 spin_unlock_bh(p_ptr->publ.lock);
944                                 if (connected || !cb)
945                                         break;
946                                 dseq.type =  msg_nametype(msg);
947                                 dseq.lower = msg_nameinst(msg);
948                                 dseq.upper = dseq.lower;
949                                 skb_pull(buf, msg_hdr_sz(msg));
950                                 cb(usr_handle, dref, &buf, msg_data(msg), 
951                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
952                                 break;
953                         }
954                 }
955                 if (buf)
956                         buf_discard(buf);
957                 buf = next;
958                 continue;
959 reject:
960                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
961                 buf = next;
962         }
963 }
964
965 /*
966  *  port_dispatcher(): Dispatcher for messages destinated
967  *  to the tipc_port interface. Called with port locked.
968  */
969
970 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
971 {
972         buf->next = NULL;
973         spin_lock_bh(&queue_lock);
974         if (msg_queue_head) {
975                 msg_queue_tail->next = buf;
976                 msg_queue_tail = buf;
977         } else {
978                 msg_queue_tail = msg_queue_head = buf;
979                 k_signal((Handler)port_dispatcher_sigh, 0);
980         }
981         spin_unlock_bh(&queue_lock);
982         return TIPC_OK;
983 }
984
985 /* 
986  * Wake up port after congestion: Called with port locked,
987  *                                
988  */
989
990 static void port_wakeup_sh(unsigned long ref)
991 {
992         struct port *p_ptr;
993         struct user_port *up_ptr;
994         tipc_continue_event cb = 0;
995         void *uh = 0;
996
997         p_ptr = port_lock(ref);
998         if (p_ptr) {
999                 up_ptr = p_ptr->user_port;
1000                 if (up_ptr) {
1001                         cb = up_ptr->continue_event_cb;
1002                         uh = up_ptr->usr_handle;
1003                 }
1004                 port_unlock(p_ptr);
1005         }
1006         if (cb)
1007                 cb(uh, ref);
1008 }
1009
1010
1011 static void port_wakeup(struct tipc_port *p_ptr)
1012 {
1013         k_signal((Handler)port_wakeup_sh, p_ptr->ref);
1014 }
1015
1016 void tipc_acknowledge(u32 ref, u32 ack)
1017 {
1018         struct port *p_ptr;
1019         struct sk_buff *buf = 0;
1020
1021         p_ptr = port_lock(ref);
1022         if (!p_ptr)
1023                 return;
1024         if (p_ptr->publ.connected) {
1025                 p_ptr->publ.conn_unacked -= ack;
1026                 buf = port_build_proto_msg(port_peerport(p_ptr),
1027                                            port_peernode(p_ptr),
1028                                            ref,
1029                                            tipc_own_addr,
1030                                            CONN_MANAGER,
1031                                            CONN_ACK,
1032                                            TIPC_OK, 
1033                                            port_out_seqno(p_ptr),
1034                                            ack);
1035         }
1036         port_unlock(p_ptr);
1037         net_route_msg(buf);
1038 }
1039
1040 /*
1041  * tipc_createport(): user level call. Will add port to
1042  *                    registry if non-zero user_ref.
1043  */
1044
1045 int tipc_createport(u32 user_ref, 
1046                     void *usr_handle, 
1047                     unsigned int importance, 
1048                     tipc_msg_err_event error_cb, 
1049                     tipc_named_msg_err_event named_error_cb, 
1050                     tipc_conn_shutdown_event conn_error_cb, 
1051                     tipc_msg_event msg_cb, 
1052                     tipc_named_msg_event named_msg_cb, 
1053                     tipc_conn_msg_event conn_msg_cb, 
1054                     tipc_continue_event continue_event_cb,/* May be zero */
1055                     u32 *portref)
1056 {
1057         struct user_port *up_ptr;
1058         struct port *p_ptr; 
1059         u32 ref;
1060
1061         up_ptr = (struct user_port *)kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1062         if (up_ptr == NULL) {
1063                 return -ENOMEM;
1064         }
1065         ref = tipc_createport_raw(0, port_dispatcher, port_wakeup, importance);
1066         p_ptr = port_lock(ref);
1067         if (!p_ptr) {
1068                 kfree(up_ptr);
1069                 return -ENOMEM;
1070         }
1071
1072         p_ptr->user_port = up_ptr;
1073         up_ptr->user_ref = user_ref;
1074         up_ptr->usr_handle = usr_handle;
1075         up_ptr->ref = p_ptr->publ.ref;
1076         up_ptr->err_cb = error_cb;
1077         up_ptr->named_err_cb = named_error_cb;
1078         up_ptr->conn_err_cb = conn_error_cb;
1079         up_ptr->msg_cb = msg_cb;
1080         up_ptr->named_msg_cb = named_msg_cb;
1081         up_ptr->conn_msg_cb = conn_msg_cb;
1082         up_ptr->continue_event_cb = continue_event_cb;
1083         INIT_LIST_HEAD(&up_ptr->uport_list);
1084         reg_add_port(up_ptr);
1085         *portref = p_ptr->publ.ref;
1086         dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref);        
1087         port_unlock(p_ptr);
1088         return TIPC_OK;
1089 }
1090
1091 int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1092 {
1093         id->ref = ref;
1094         id->node = tipc_own_addr;
1095         return TIPC_OK;
1096 }
1097
1098 int tipc_portimportance(u32 ref, unsigned int *importance)
1099 {
1100         struct port *p_ptr;
1101         
1102         p_ptr = port_lock(ref);
1103         if (!p_ptr)
1104                 return -EINVAL;
1105         *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1106         spin_unlock_bh(p_ptr->publ.lock);
1107         return TIPC_OK;
1108 }
1109
1110 int tipc_set_portimportance(u32 ref, unsigned int imp)
1111 {
1112         struct port *p_ptr;
1113
1114         if (imp > TIPC_CRITICAL_IMPORTANCE)
1115                 return -EINVAL;
1116
1117         p_ptr = port_lock(ref);
1118         if (!p_ptr)
1119                 return -EINVAL;
1120         msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1121         spin_unlock_bh(p_ptr->publ.lock);
1122         return TIPC_OK;
1123 }
1124
1125
1126 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1127 {
1128         struct port *p_ptr;
1129         struct publication *publ;
1130         u32 key;
1131         int res = -EINVAL;
1132
1133         p_ptr = port_lock(ref);
1134         dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1135             "lower = %u, upper = %u\n",
1136             ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1137         if (!p_ptr)
1138                 return -EINVAL;
1139         if (p_ptr->publ.connected)
1140                 goto exit;
1141         if (seq->lower > seq->upper)
1142                 goto exit;
1143         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1144                 goto exit;
1145         key = ref + p_ptr->pub_count + 1;
1146         if (key == ref) {
1147                 res = -EADDRINUSE;
1148                 goto exit;
1149         }
1150         publ = nametbl_publish(seq->type, seq->lower, seq->upper,
1151                                scope, p_ptr->publ.ref, key);
1152         if (publ) {
1153                 list_add(&publ->pport_list, &p_ptr->publications);
1154                 p_ptr->pub_count++;
1155                 p_ptr->publ.published = 1;
1156                 res = TIPC_OK;
1157         }
1158 exit:
1159         port_unlock(p_ptr);
1160         return res;
1161 }
1162
1163 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1164 {
1165         struct port *p_ptr;
1166         struct publication *publ;
1167         struct publication *tpubl;
1168         int res = -EINVAL;
1169         
1170         p_ptr = port_lock(ref);
1171         if (!p_ptr)
1172                 return -EINVAL;
1173         if (!p_ptr->publ.published)
1174                 goto exit;
1175         if (!seq) {
1176                 list_for_each_entry_safe(publ, tpubl, 
1177                                          &p_ptr->publications, pport_list) {
1178                         nametbl_withdraw(publ->type, publ->lower, 
1179                                          publ->ref, publ->key);
1180                 }
1181                 res = TIPC_OK;
1182         } else {
1183                 list_for_each_entry_safe(publ, tpubl, 
1184                                          &p_ptr->publications, pport_list) {
1185                         if (publ->scope != scope)
1186                                 continue;
1187                         if (publ->type != seq->type)
1188                                 continue;
1189                         if (publ->lower != seq->lower)
1190                                 continue;
1191                         if (publ->upper != seq->upper)
1192                                 break;
1193                         nametbl_withdraw(publ->type, publ->lower, 
1194                                          publ->ref, publ->key);
1195                         res = TIPC_OK;
1196                         break;
1197                 }
1198         }
1199         if (list_empty(&p_ptr->publications))
1200                 p_ptr->publ.published = 0;
1201 exit:
1202         port_unlock(p_ptr);
1203         return res;
1204 }
1205
1206 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1207 {
1208         struct port *p_ptr;
1209         struct tipc_msg *msg;
1210         int res = -EINVAL;
1211
1212         p_ptr = port_lock(ref);
1213         if (!p_ptr)
1214                 return -EINVAL;
1215         if (p_ptr->publ.published || p_ptr->publ.connected)
1216                 goto exit;
1217         if (!peer->ref)
1218                 goto exit;
1219
1220         msg = &p_ptr->publ.phdr;
1221         msg_set_destnode(msg, peer->node);
1222         msg_set_destport(msg, peer->ref);
1223         msg_set_orignode(msg, tipc_own_addr);
1224         msg_set_origport(msg, p_ptr->publ.ref);
1225         msg_set_transp_seqno(msg, 42);
1226         msg_set_type(msg, TIPC_CONN_MSG);
1227         if (!may_route(peer->node))
1228                 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1229         else
1230                 msg_set_hdr_sz(msg, LONG_H_SIZE);
1231
1232         p_ptr->probing_interval = PROBING_INTERVAL;
1233         p_ptr->probing_state = CONFIRMED;
1234         p_ptr->publ.connected = 1;
1235         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1236
1237         nodesub_subscribe(&p_ptr->subscription,peer->node,
1238                           (void *)(unsigned long)ref,
1239                           (net_ev_handler)port_handle_node_down);
1240         res = TIPC_OK;
1241 exit:
1242         port_unlock(p_ptr);
1243         p_ptr->max_pkt = link_get_max_pkt(peer->node, ref);
1244         return res;
1245 }
1246
1247 /*
1248  * tipc_disconnect(): Disconnect port form peer.
1249  *                    This is a node local operation.
1250  */
1251
1252 int tipc_disconnect(u32 ref)
1253 {
1254         struct port *p_ptr;
1255         int res = -ENOTCONN;
1256
1257         p_ptr = port_lock(ref);
1258         if (!p_ptr)
1259                 return -EINVAL;
1260         if (p_ptr->publ.connected) {
1261                 p_ptr->publ.connected = 0;
1262                 /* let timer expire on it's own to avoid deadlock! */
1263                 nodesub_unsubscribe(&p_ptr->subscription);
1264                 res = TIPC_OK;
1265         }
1266         port_unlock(p_ptr);
1267         return res;
1268 }
1269
1270 /*
1271  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1272  */
1273 int tipc_shutdown(u32 ref)
1274 {
1275         struct port *p_ptr;
1276         struct sk_buff *buf = 0;
1277
1278         p_ptr = port_lock(ref);
1279         if (!p_ptr)
1280                 return -EINVAL;
1281
1282         if (p_ptr->publ.connected) {
1283                 u32 imp = msg_importance(&p_ptr->publ.phdr);
1284                 if (imp < TIPC_CRITICAL_IMPORTANCE)
1285                         imp++;
1286                 buf = port_build_proto_msg(port_peerport(p_ptr),
1287                                            port_peernode(p_ptr),
1288                                            ref,
1289                                            tipc_own_addr,
1290                                            imp,
1291                                            TIPC_CONN_MSG,
1292                                            TIPC_CONN_SHUTDOWN, 
1293                                            port_out_seqno(p_ptr),
1294                                            0);
1295         }
1296         port_unlock(p_ptr);
1297         net_route_msg(buf);
1298         return tipc_disconnect(ref);
1299 }
1300
1301 int tipc_isconnected(u32 ref, int *isconnected)
1302 {
1303         struct port *p_ptr;
1304         
1305         p_ptr = port_lock(ref);
1306         if (!p_ptr)
1307                 return -EINVAL;
1308         *isconnected = p_ptr->publ.connected;
1309         port_unlock(p_ptr);
1310         return TIPC_OK;
1311 }
1312
1313 int tipc_peer(u32 ref, struct tipc_portid *peer)
1314 {
1315         struct port *p_ptr;
1316         int res;
1317          
1318         p_ptr = port_lock(ref);
1319         if (!p_ptr)
1320                 return -EINVAL;
1321         if (p_ptr->publ.connected) {
1322                 peer->ref = port_peerport(p_ptr);
1323                 peer->node = port_peernode(p_ptr);
1324                 res = TIPC_OK;
1325         } else
1326                 res = -ENOTCONN;
1327         port_unlock(p_ptr);
1328         return res;
1329 }
1330
1331 int tipc_ref_valid(u32 ref)
1332 {
1333         /* Works irrespective of type */
1334         return !!ref_deref(ref);
1335 }
1336
1337
1338 /*
1339  *  port_recv_sections(): Concatenate and deliver sectioned
1340  *                        message for this node.
1341  */
1342
1343 int port_recv_sections(struct port *sender, unsigned int num_sect,
1344                        struct iovec const *msg_sect)
1345 {
1346         struct sk_buff *buf;
1347         int res;
1348          
1349         res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1350                         MAX_MSG_SIZE, !sender->user_port, &buf);
1351         if (likely(buf))
1352                 port_recv_msg(buf);
1353         return res;
1354 }
1355
1356 /**
1357  * tipc_send - send message sections on connection
1358  */
1359
1360 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1361 {
1362         struct port *p_ptr;
1363         u32 destnode;
1364         int res;
1365
1366         p_ptr = port_deref(ref);
1367         if (!p_ptr || !p_ptr->publ.connected)
1368                 return -EINVAL;
1369
1370         p_ptr->publ.congested = 1;
1371         if (!port_congested(p_ptr)) {
1372                 destnode = port_peernode(p_ptr);
1373                 if (likely(destnode != tipc_own_addr))
1374                         res = link_send_sections_fast(p_ptr, msg_sect, num_sect,
1375                                                       destnode);
1376                 else
1377                         res = port_recv_sections(p_ptr, num_sect, msg_sect);
1378
1379                 if (likely(res != -ELINKCONG)) {
1380                         port_incr_out_seqno(p_ptr);
1381                         p_ptr->publ.congested = 0;
1382                         p_ptr->sent++;
1383                         return res;
1384                 }
1385         }
1386         if (port_unreliable(p_ptr)) {
1387                 p_ptr->publ.congested = 0;
1388                 /* Just calculate msg length and return */
1389                 return msg_calc_data_size(msg_sect, num_sect);
1390         }
1391         return -ELINKCONG;
1392 }
1393
1394 /** 
1395  * tipc_send_buf - send message buffer on connection
1396  */
1397
1398 int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1399 {
1400         struct port *p_ptr;
1401         struct tipc_msg *msg;
1402         u32 destnode;
1403         u32 hsz;
1404         u32 sz;
1405         u32 res;
1406          
1407         p_ptr = port_deref(ref);
1408         if (!p_ptr || !p_ptr->publ.connected)
1409                 return -EINVAL;
1410
1411         msg = &p_ptr->publ.phdr;
1412         hsz = msg_hdr_sz(msg);
1413         sz = hsz + dsz;
1414         msg_set_size(msg, sz);
1415         if (skb_cow(buf, hsz))
1416                 return -ENOMEM;
1417
1418         skb_push(buf, hsz);
1419         memcpy(buf->data, (unchar *)msg, hsz);
1420         destnode = msg_destnode(msg);
1421         p_ptr->publ.congested = 1;
1422         if (!port_congested(p_ptr)) {
1423                 if (likely(destnode != tipc_own_addr))
1424                         res = tipc_send_buf_fast(buf, destnode);
1425                 else {
1426                         port_recv_msg(buf);
1427                         res = sz;
1428                 }
1429                 if (likely(res != -ELINKCONG)) {
1430                         port_incr_out_seqno(p_ptr);
1431                         p_ptr->sent++;
1432                         p_ptr->publ.congested = 0;
1433                         return res;
1434                 }
1435         }
1436         if (port_unreliable(p_ptr)) {
1437                 p_ptr->publ.congested = 0;
1438                 return dsz;
1439         }
1440         return -ELINKCONG;
1441 }
1442
1443 /**
1444  * tipc_forward2name - forward message sections to port name
1445  */
1446
1447 int tipc_forward2name(u32 ref, 
1448                       struct tipc_name const *name, 
1449                       u32 domain,
1450                       u32 num_sect, 
1451                       struct iovec const *msg_sect,
1452                       struct tipc_portid const *orig, 
1453                       unsigned int importance)
1454 {
1455         struct port *p_ptr;
1456         struct tipc_msg *msg;
1457         u32 destnode = domain;
1458         u32 destport = 0;
1459         int res;
1460
1461         p_ptr = port_deref(ref);
1462         if (!p_ptr || p_ptr->publ.connected)
1463                 return -EINVAL;
1464
1465         msg = &p_ptr->publ.phdr;
1466         msg_set_type(msg, TIPC_NAMED_MSG);
1467         msg_set_orignode(msg, orig->node);
1468         msg_set_origport(msg, orig->ref);
1469         msg_set_hdr_sz(msg, LONG_H_SIZE);
1470         msg_set_nametype(msg, name->type);
1471         msg_set_nameinst(msg, name->instance);
1472         msg_set_lookup_scope(msg, addr_scope(domain));
1473         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1474                 msg_set_importance(msg,importance);
1475         destport = nametbl_translate(name->type, name->instance, &destnode);
1476         msg_set_destnode(msg, destnode);
1477         msg_set_destport(msg, destport);
1478
1479         if (likely(destport || destnode)) {
1480                 p_ptr->sent++;
1481                 if (likely(destnode == tipc_own_addr))
1482                         return port_recv_sections(p_ptr, num_sect, msg_sect);
1483                 res = link_send_sections_fast(p_ptr, msg_sect, num_sect, 
1484                                               destnode);
1485                 if (likely(res != -ELINKCONG))
1486                         return res;
1487                 if (port_unreliable(p_ptr)) {
1488                         /* Just calculate msg length and return */
1489                         return msg_calc_data_size(msg_sect, num_sect);
1490                 }
1491                 return -ELINKCONG;
1492         }
1493         return port_reject_sections(p_ptr, msg, msg_sect, num_sect, 
1494                                     TIPC_ERR_NO_NAME);
1495 }
1496
1497 /**
1498  * tipc_send2name - send message sections to port name
1499  */
1500
1501 int tipc_send2name(u32 ref, 
1502                    struct tipc_name const *name,
1503                    unsigned int domain, 
1504                    unsigned int num_sect, 
1505                    struct iovec const *msg_sect)
1506 {
1507         struct tipc_portid orig;
1508
1509         orig.ref = ref;
1510         orig.node = tipc_own_addr;
1511         return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1512                                  TIPC_PORT_IMPORTANCE);
1513 }
1514
1515 /** 
1516  * tipc_forward_buf2name - forward message buffer to port name
1517  */
1518
1519 int tipc_forward_buf2name(u32 ref,
1520                           struct tipc_name const *name,
1521                           u32 domain,
1522                           struct sk_buff *buf,
1523                           unsigned int dsz,
1524                           struct tipc_portid const *orig,
1525                           unsigned int importance)
1526 {
1527         struct port *p_ptr;
1528         struct tipc_msg *msg;
1529         u32 destnode = domain;
1530         u32 destport = 0;
1531         int res;
1532
1533         p_ptr = (struct port *)ref_deref(ref);
1534         if (!p_ptr || p_ptr->publ.connected)
1535                 return -EINVAL;
1536
1537         msg = &p_ptr->publ.phdr;
1538         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1539                 msg_set_importance(msg, importance);
1540         msg_set_type(msg, TIPC_NAMED_MSG);
1541         msg_set_orignode(msg, orig->node);
1542         msg_set_origport(msg, orig->ref);
1543         msg_set_nametype(msg, name->type);
1544         msg_set_nameinst(msg, name->instance);
1545         msg_set_lookup_scope(msg, addr_scope(domain));
1546         msg_set_hdr_sz(msg, LONG_H_SIZE);
1547         msg_set_size(msg, LONG_H_SIZE + dsz);
1548         destport = nametbl_translate(name->type, name->instance, &destnode);
1549         msg_set_destnode(msg, destnode);
1550         msg_set_destport(msg, destport);
1551         msg_dbg(msg, "forw2name ==> ");
1552         if (skb_cow(buf, LONG_H_SIZE))
1553                 return -ENOMEM;
1554         skb_push(buf, LONG_H_SIZE);
1555         memcpy(buf->data, (unchar *)msg, LONG_H_SIZE);
1556         msg_dbg(buf_msg(buf),"PREP:");
1557         if (likely(destport || destnode)) {
1558                 p_ptr->sent++;
1559                 if (destnode == tipc_own_addr)
1560                         return port_recv_msg(buf);
1561                 res = tipc_send_buf_fast(buf, destnode);
1562                 if (likely(res != -ELINKCONG))
1563                         return res;
1564                 if (port_unreliable(p_ptr))
1565                         return dsz;
1566                 return -ELINKCONG;
1567         }
1568         return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1569 }
1570
1571 /** 
1572  * tipc_send_buf2name - send message buffer to port name
1573  */
1574
1575 int tipc_send_buf2name(u32 ref, 
1576                        struct tipc_name const *dest, 
1577                        u32 domain,
1578                        struct sk_buff *buf, 
1579                        unsigned int dsz)
1580 {
1581         struct tipc_portid orig;
1582
1583         orig.ref = ref;
1584         orig.node = tipc_own_addr;
1585         return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1586                                      TIPC_PORT_IMPORTANCE);
1587 }
1588
1589 /** 
1590  * tipc_forward2port - forward message sections to port identity
1591  */
1592
1593 int tipc_forward2port(u32 ref,
1594                       struct tipc_portid const *dest,
1595                       unsigned int num_sect, 
1596                       struct iovec const *msg_sect,
1597                       struct tipc_portid const *orig, 
1598                       unsigned int importance)
1599 {
1600         struct port *p_ptr;
1601         struct tipc_msg *msg;
1602         int res;
1603
1604         p_ptr = port_deref(ref);
1605         if (!p_ptr || p_ptr->publ.connected)
1606                 return -EINVAL;
1607
1608         msg = &p_ptr->publ.phdr;
1609         msg_set_type(msg, TIPC_DIRECT_MSG);
1610         msg_set_orignode(msg, orig->node);
1611         msg_set_origport(msg, orig->ref);
1612         msg_set_destnode(msg, dest->node);
1613         msg_set_destport(msg, dest->ref);
1614         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1615         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1616                 msg_set_importance(msg, importance);
1617         p_ptr->sent++;
1618         if (dest->node == tipc_own_addr)
1619                 return port_recv_sections(p_ptr, num_sect, msg_sect);
1620         res = link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1621         if (likely(res != -ELINKCONG))
1622                 return res;
1623         if (port_unreliable(p_ptr)) {
1624                 /* Just calculate msg length and return */
1625                 return msg_calc_data_size(msg_sect, num_sect);
1626         }
1627         return -ELINKCONG;
1628 }
1629
1630 /** 
1631  * tipc_send2port - send message sections to port identity 
1632  */
1633
1634 int tipc_send2port(u32 ref, 
1635                    struct tipc_portid const *dest,
1636                    unsigned int num_sect, 
1637                    struct iovec const *msg_sect)
1638 {
1639         struct tipc_portid orig;
1640
1641         orig.ref = ref;
1642         orig.node = tipc_own_addr;
1643         return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 
1644                                  TIPC_PORT_IMPORTANCE);
1645 }
1646
1647 /** 
1648  * tipc_forward_buf2port - forward message buffer to port identity
1649  */
1650 int tipc_forward_buf2port(u32 ref,
1651                           struct tipc_portid const *dest,
1652                           struct sk_buff *buf,
1653                           unsigned int dsz,
1654                           struct tipc_portid const *orig,
1655                           unsigned int importance)
1656 {
1657         struct port *p_ptr;
1658         struct tipc_msg *msg;
1659         int res;
1660
1661         p_ptr = (struct port *)ref_deref(ref);
1662         if (!p_ptr || p_ptr->publ.connected)
1663                 return -EINVAL;
1664
1665         msg = &p_ptr->publ.phdr;
1666         msg_set_type(msg, TIPC_DIRECT_MSG);
1667         msg_set_orignode(msg, orig->node);
1668         msg_set_origport(msg, orig->ref);
1669         msg_set_destnode(msg, dest->node);
1670         msg_set_destport(msg, dest->ref);
1671         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1672         if (importance <= TIPC_CRITICAL_IMPORTANCE)
1673                 msg_set_importance(msg, importance);
1674         msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1675         if (skb_cow(buf, DIR_MSG_H_SIZE))
1676                 return -ENOMEM;
1677
1678         skb_push(buf, DIR_MSG_H_SIZE);
1679         memcpy(buf->data, (unchar *)msg, DIR_MSG_H_SIZE);
1680         msg_dbg(msg, "buf2port: ");
1681         p_ptr->sent++;
1682         if (dest->node == tipc_own_addr)
1683                 return port_recv_msg(buf);
1684         res = tipc_send_buf_fast(buf, dest->node);
1685         if (likely(res != -ELINKCONG))
1686                 return res;
1687         if (port_unreliable(p_ptr))
1688                 return dsz;
1689         return -ELINKCONG;
1690 }
1691
1692 /** 
1693  * tipc_send_buf2port - send message buffer to port identity
1694  */
1695
1696 int tipc_send_buf2port(u32 ref, 
1697                        struct tipc_portid const *dest,
1698                        struct sk_buff *buf, 
1699                        unsigned int dsz)
1700 {
1701         struct tipc_portid orig;
1702
1703         orig.ref = ref;
1704         orig.node = tipc_own_addr;
1705         return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 
1706                                      TIPC_PORT_IMPORTANCE);
1707 }
1708