1 /******************************************************************************
 
   2 *******************************************************************************
 
   4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
 
   5 **  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
 
   7 **  This copyrighted material is made available to anyone wishing to use,
 
   8 **  modify, copy, or redistribute it subject to the terms and conditions
 
   9 **  of the GNU General Public License v.2.
 
  11 *******************************************************************************
 
  12 ******************************************************************************/
 
  17  * This is the "low-level" comms layer.
 
  19  * It is responsible for sending/receiving messages
 
  20  * from other nodes in the cluster.
 
  22  * Cluster nodes are referred to by their nodeids. nodeids are
 
  23  * simply 32 bit numbers to the locking module - if they need to
 
  24  * be expanded for the cluster infrastructure then that is its
 
  25  * responsibility. It is this layer's
 
  26  * responsibility to resolve these into IP address or
 
  27  * whatever it needs for inter-node communication.
 
  29  * The comms level is two kernel threads that deal mainly with
 
  30  * the receiving of messages from other nodes and passing them
 
  31  * up to the mid-level comms layer (which understands the
 
  32  * message format) for execution by the locking core, and
 
  33  * a send thread which does all the setting up of connections
 
  34  * to remote nodes and the sending of data. Threads are not allowed
 
  35  * to send their own data because it may cause them to wait in times
 
  36  * of high load. Also, this way, the sending thread can collect together
 
  37  * messages bound for one node and send them in one block.
 
  39  * I don't see any problem with the recv thread executing the locking
 
  40  * code on behalf of remote processes as the locking code is
 
  41  * short, efficient and never (well, hardly ever) waits.
 
  45 #include <asm/ioctls.h>
 
  48 #include <net/sctp/user.h>
 
  49 #include <linux/pagemap.h>
 
  50 #include <linux/socket.h>
 
  51 #include <linux/idr.h>
 
  53 #include "dlm_internal.h"
 
  58 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 
  59 static int                      dlm_local_count;
 
  60 static int                      dlm_local_nodeid;
 
  62 /* One of these per connected node */
 
  64 #define NI_INIT_PENDING 1
 
  65 #define NI_WRITE_PENDING 2
 
  69         sctp_assoc_t            assoc_id;
 
  71         struct list_head        write_list; /* nodes with pending writes */
 
  72         struct list_head        writequeue; /* outgoing writequeue_entries */
 
  73         spinlock_t              writequeue_lock;
 
  75         struct work_struct      swork; /* Send workqueue */
 
  76         struct work_struct      lwork; /* Locking workqueue */
 
  79 static DEFINE_IDR(nodeinfo_idr);
 
  80 static DECLARE_RWSEM(nodeinfo_lock);
 
  81 static int max_nodeid;
 
  89 /* Just the one of these, now. But this struct keeps
 
  90    the connection-specific variables together */
 
  92 #define CF_READ_PENDING 1
 
  98         atomic_t                waiting_requests;
 
 101         struct work_struct      work; /* Send workqueue */
 
 104 /* An entry waiting to be sent */
 
 106 struct writequeue_entry {
 
 107         struct list_head        list;
 
 116 static void cbuf_add(struct cbuf *cb, int n)
 
 /*
  * Return (cb->base + cb->len) masked with cb->mask.  receive_from_sock uses
  * this value as the offset in the rx page at which newly received bytes are
  * placed.
  */
 121 static int cbuf_data(struct cbuf *cb)
 
 123         return ((cb->base + cb->len) & cb->mask);
 
 126 static void cbuf_init(struct cbuf *cb, int size)
 
 128         cb->base = cb->len = 0;
 
 132 static void cbuf_eat(struct cbuf *cb, int n)
 
 136         cb->base &= cb->mask;
 
 139 /* List of nodes which have writes pending */
 
 140 static LIST_HEAD(write_nodes);
 
 141 static DEFINE_SPINLOCK(write_nodes_lock);
 
 144 /* Maximum number of incoming messages to process before
 
 147 #define MAX_RX_MSG_COUNT 25
 
 150 static struct workqueue_struct *recv_workqueue;
 
 151 static struct workqueue_struct *send_workqueue;
 
 152 static struct workqueue_struct *lock_workqueue;
 
 154 /* The SCTP connection */
 
 155 static struct connection sctp_con;
 
 157 static void process_send_sockets(struct work_struct *work);
 
 158 static void process_recv_sockets(struct work_struct *work);
 
 159 static void process_lock_request(struct work_struct *work);
 
 161 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
 
 163         struct sockaddr_storage addr;
 
 166         if (!dlm_local_count)
 
 169         error = dlm_nodeid_to_addr(nodeid, &addr);
 
 173         if (dlm_local_addr[0]->ss_family == AF_INET) {
 
 174                 struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
 
 175                 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
 
 176                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 
 178                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
 
 179                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
 
 180                 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
 
 181                        sizeof(in6->sin6_addr));
 
 187 /* If alloc is 0 here we will not attempt to allocate a new
 
 189 static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
 
 195         down_read(&nodeinfo_lock);
 
 196         ni = idr_find(&nodeinfo_idr, nodeid);
 
 197         up_read(&nodeinfo_lock);
 
 202         down_write(&nodeinfo_lock);
 
 204         ni = idr_find(&nodeinfo_idr, nodeid);
 
 208         r = idr_pre_get(&nodeinfo_idr, alloc);
 
 212         ni = kmalloc(sizeof(struct nodeinfo), alloc);
 
 216         r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
 
 223                 idr_remove(&nodeinfo_idr, n);
 
 228         memset(ni, 0, sizeof(struct nodeinfo));
 
 229         spin_lock_init(&ni->lock);
 
 230         INIT_LIST_HEAD(&ni->writequeue);
 
 231         spin_lock_init(&ni->writequeue_lock);
 
 232         INIT_WORK(&ni->lwork, process_lock_request);
 
 233         INIT_WORK(&ni->swork, process_send_sockets);
 
 236         if (nodeid > max_nodeid)
 
 239         up_write(&nodeinfo_lock);
 
 244 /* Don't call this too often... */
 
 245 static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
 
 250         for (i=1; i<=max_nodeid; i++) {
 
 251                 ni = nodeid2nodeinfo(i, 0);
 
 252                 if (ni && ni->assoc_id == assoc)
 
 258 /* Data or notification available on socket */
 
 /*
  * sk_data_ready callback for the SCTP socket (installed in init_sock): data
  * or a notification is available.
  *
  * Marks the connection CF_READ_PENDING and queues the receive work only when
  * this call is the one that set the bit, i.e. no read was already pending.
  * This matches the !test_and_set_bit()/queue_work() pairing used for
  * NI_WRITE_PENDING elsewhere in this file; process_recv_sockets() only reads
  * when it finds CF_READ_PENDING set, and clears it.  Testing the bit the
  * other way round would leave the first event flagged but never queued.
  *
  * sk and count_unused are not used.
  */
 259 static void lowcomms_data_ready(struct sock *sk, int count_unused)
 
 261         if (!test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
 
 262                 queue_work(recv_workqueue, &sctp_con.work);
 
 266 /* Add the port number to an IP6 or 4 sockaddr and return the address length.
 
 267    Also pad out the struct with zeros to make comparisons meaningful */
 
 269 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 
 272         struct sockaddr_in *local4_addr;
 
 273         struct sockaddr_in6 *local6_addr;
 
 275         if (!dlm_local_count)
 
 279                 if (dlm_local_addr[0]->ss_family == AF_INET) {
 
 280                         local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
 
 281                         port = be16_to_cpu(local4_addr->sin_port);
 
 283                         local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
 
 284                         port = be16_to_cpu(local6_addr->sin6_port);
 
 288         saddr->ss_family = dlm_local_addr[0]->ss_family;
 
 289         if (dlm_local_addr[0]->ss_family == AF_INET) {
 
 290                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 
 291                 in4_addr->sin_port = cpu_to_be16(port);
 
 292                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 
 293                 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
 
 294                        sizeof(struct sockaddr_in));
 
 295                 *addr_len = sizeof(struct sockaddr_in);
 
 297                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 
 298                 in6_addr->sin6_port = cpu_to_be16(port);
 
 299                 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
 
 300                        sizeof(struct sockaddr_in6));
 
 301                 *addr_len = sizeof(struct sockaddr_in6);
 
 305 /* Close the connection and tidy up */
 
 306 static void close_connection(void)
 
 309                 sock_release(sctp_con.sock);
 
 310                 sctp_con.sock = NULL;
 
 313         if (sctp_con.rx_page) {
 
 314                 __free_page(sctp_con.rx_page);
 
 315                 sctp_con.rx_page = NULL;
 
 319 /* We only send shutdown messages to nodes that are not part of the cluster */
 
 320 static void send_shutdown(sctp_assoc_t associd)
 
 322         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 
 323         struct msghdr outmessage;
 
 324         struct cmsghdr *cmsg;
 
 325         struct sctp_sndrcvinfo *sinfo;
 
 328         outmessage.msg_name = NULL;
 
 329         outmessage.msg_namelen = 0;
 
 330         outmessage.msg_control = outcmsg;
 
 331         outmessage.msg_controllen = sizeof(outcmsg);
 
 332         outmessage.msg_flags = MSG_EOR;
 
 334         cmsg = CMSG_FIRSTHDR(&outmessage);
 
 335         cmsg->cmsg_level = IPPROTO_SCTP;
 
 336         cmsg->cmsg_type = SCTP_SNDRCV;
 
 337         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 
 338         outmessage.msg_controllen = cmsg->cmsg_len;
 
 339         sinfo = CMSG_DATA(cmsg);
 
 340         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 
 342         sinfo->sinfo_flags |= MSG_EOF;
 
 343         sinfo->sinfo_assoc_id = associd;
 
 345         ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
 
 348                 log_print("send EOF to node failed: %d", ret);
 
 352 /* INIT failed but we don't know which node...
 
 353    restart INIT on all pending nodes */
 
 354 static void init_failed(void)
 
 359         for (i=1; i<=max_nodeid; i++) {
 
 360                 ni = nodeid2nodeinfo(i, 0);
 
 364                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
 
 366                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
 
 367                                 spin_lock_bh(&write_nodes_lock);
 
 368                                 list_add_tail(&ni->write_list, &write_nodes);
 
 369                                 spin_unlock_bh(&write_nodes_lock);
 
 370                                 queue_work(send_workqueue, &ni->swork);
 
 376 /* Something happened to an association */
 
 377 static void process_sctp_notification(struct msghdr *msg, char *buf)
 
 379         union sctp_notification *sn = (union sctp_notification *)buf;
 
 381         if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
 
 382                 switch (sn->sn_assoc_change.sac_state) {
 
 387                         /* Check that the new node is in the lockspace */
 
 388                         struct sctp_prim prim;
 
 395                         /* This seems to happen when we received a connection
 
 396                          * too early... or something...  anyway, it happens but
 
 397                          * we always seem to get a real message too, see
 
 398                          * receive_from_sock */
 
 400                         if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
 
 401                                 log_print("COMM_UP for invalid assoc ID %d",
 
 402                                           (int)sn->sn_assoc_change.sac_assoc_id);
 
 406                         memset(&prim, 0, sizeof(struct sctp_prim));
 
 407                         prim_len = sizeof(struct sctp_prim);
 
 408                         prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
 
 412                         ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
 
 421                                 log_print("getsockopt/sctp_primary_addr on "
 
 422                                           "new assoc %d failed : %d",
 
 423                                           (int)sn->sn_assoc_change.sac_assoc_id,
 
 426                                 /* Retry INIT later */
 
 427                                 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
 
 429                                         clear_bit(NI_INIT_PENDING, &ni->flags);
 
 432                         make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 
 433                         if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 
 434                                 log_print("reject connect from unknown addr");
 
 435                                 send_shutdown(prim.ssp_assoc_id);
 
 439                         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
 
 443                         /* Save the assoc ID */
 
 444                         ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
 
 446                         log_print("got new/restarted association %d nodeid %d",
 
 447                                   (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
 
 449                         /* Send any pending writes */
 
 450                         clear_bit(NI_INIT_PENDING, &ni->flags);
 
 451                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
 
 452                                 spin_lock_bh(&write_nodes_lock);
 
 453                                 list_add_tail(&ni->write_list, &write_nodes);
 
 454                                 spin_unlock_bh(&write_nodes_lock);
 
 455                                 queue_work(send_workqueue, &ni->swork);
 
 461                 case SCTP_SHUTDOWN_COMP:
 
 465                         ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
 
 467                                 spin_lock(&ni->lock);
 
 469                                 spin_unlock(&ni->lock);
 
 474                 /* We don't know which INIT failed, so clear the PENDING flags
 
 475                  * on them all.  if assoc_id is zero then it will then try
 
 478                 case SCTP_CANT_STR_ASSOC:
 
 480                         log_print("Can't start SCTP association - retrying");
 
 486                         log_print("unexpected SCTP assoc change id=%d state=%d",
 
 487                                   (int)sn->sn_assoc_change.sac_assoc_id,
 
 488                                   sn->sn_assoc_change.sac_state);
 
 493 /* Data received from remote end */
 
 494 static int receive_from_sock(void)
 
 501         struct sctp_sndrcvinfo *sinfo;
 
 502         struct cmsghdr *cmsg;
 
 505         /* These two are marginally too big for stack allocation, but this
 
 506          * function is (currently) only called by dlm_recvd so static should be
 
 509         static struct sockaddr_storage msgname;
 
 510         static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 
 512         if (sctp_con.sock == NULL)
 
 515         if (sctp_con.rx_page == NULL) {
 
 517                  * This doesn't need to be atomic, but I think it should
 
 518                  * improve performance if it is.
 
 520                 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
 
 521                 if (sctp_con.rx_page == NULL)
 
 523                 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
 
 526         memset(&incmsg, 0, sizeof(incmsg));
 
 527         memset(&msgname, 0, sizeof(msgname));
 
 529         msg.msg_name = &msgname;
 
 530         msg.msg_namelen = sizeof(msgname);
 
 532         msg.msg_control = incmsg;
 
 533         msg.msg_controllen = sizeof(incmsg);
 
 536         /* I don't see why this circular buffer stuff is necessary for SCTP
 
 537          * which is a packet-based protocol, but the whole thing breaks under
 
 538          * load without it! The overhead is minimal (and is in the TCP lowcomms
 
 539          * anyway, of course) so I'll leave it in until I can figure out what's
 
 544          * iov[0] is the bit of the circular buffer between the current end
 
 545          * point (cb.base + cb.len) and the end of the buffer.
 
 547         iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
 
 548         iov[0].iov_base = page_address(sctp_con.rx_page) +
 
 549                 cbuf_data(&sctp_con.cb);
 
 553          * iov[1] is the bit of the circular buffer between the start of the
 
 554          * buffer and the start of the currently used section (cb.base)
 
 556         if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
 
 557                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
 
 558                 iov[1].iov_len = sctp_con.cb.base;
 
 559                 iov[1].iov_base = page_address(sctp_con.rx_page);
 
 562         len = iov[0].iov_len + iov[1].iov_len;
 
 564         r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
 
 565                                  MSG_NOSIGNAL | MSG_DONTWAIT);
 
 569         msg.msg_control = incmsg;
 
 570         msg.msg_controllen = sizeof(incmsg);
 
 571         cmsg = CMSG_FIRSTHDR(&msg);
 
 572         sinfo = CMSG_DATA(cmsg);
 
 574         if (msg.msg_flags & MSG_NOTIFICATION) {
 
 575                 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
 
 579         /* Is this a new association ? */
 
 580         ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
 
 582                 ni->assoc_id = sinfo->sinfo_assoc_id;
 
 583                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
 
 585                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
 
 586                                 spin_lock_bh(&write_nodes_lock);
 
 587                                 list_add_tail(&ni->write_list, &write_nodes);
 
 588                                 spin_unlock_bh(&write_nodes_lock);
 
 589                                 queue_work(send_workqueue, &ni->swork);
 
 594         /* INIT sends a message with length of 1 - ignore it */
 
 598         cbuf_add(&sctp_con.cb, ret);
 
 599         // PJC: TODO: Add to node's workqueue....can we ??
 
 600         ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
 
 601                                           page_address(sctp_con.rx_page),
 
 602                                           sctp_con.cb.base, sctp_con.cb.len,
 
 606         cbuf_eat(&sctp_con.cb, ret);
 
 613         lowcomms_data_ready(sctp_con.sock->sk, 0);
 
 620                 log_print("error reading from sctp socket: %d", ret);
 
 625 /* Bind to an IP address. SCTP allows multiple addresses so it can do multi-homing */
 
 626 static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
 
 634                 result = sctp_con.sock->ops->bind(sctp_con.sock,
 
 635                                                   (struct sockaddr *) addr,
 
 638                 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
 
 639                                                         SCTP_SOCKOPT_BINDX_ADD,
 
 640                                                         (char *)addr, addr_len);
 
 644                 log_print("Can't bind to port %d addr number %d",
 
 645                           dlm_config.ci_tcp_port, num);
 
 650 static void init_local(void)
 
 652         struct sockaddr_storage sas, *addr;
 
 655         dlm_local_nodeid = dlm_our_nodeid();
 
 657         for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
 
 658                 if (dlm_our_addr(&sas, i))
 
 661                 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
 
 664                 memcpy(addr, &sas, sizeof(*addr));
 
 665                 dlm_local_addr[dlm_local_count++] = addr;
 
 669 /* Initialise SCTP socket and bind to all interfaces */
 
 670 static int init_sock(void)
 
 673         struct socket *sock = NULL;
 
 674         struct sockaddr_storage localaddr;
 
 675         struct sctp_event_subscribe subscribe;
 
 676         int result = -EINVAL, num = 1, i, addr_len;
 
 678         if (!dlm_local_count) {
 
 680                 if (!dlm_local_count) {
 
 681                         log_print("no local IP address has been set");
 
 686         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
 
 687                                   IPPROTO_SCTP, &sock);
 
 689                 log_print("Can't create comms socket, check SCTP is loaded");
 
 693         /* Listen for events */
 
 694         memset(&subscribe, 0, sizeof(subscribe));
 
 695         subscribe.sctp_data_io_event = 1;
 
 696         subscribe.sctp_association_event = 1;
 
 697         subscribe.sctp_send_failure_event = 1;
 
 698         subscribe.sctp_shutdown_event = 1;
 
 699         subscribe.sctp_partial_delivery_event = 1;
 
 703         result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
 
 704                                        (char *)&subscribe, sizeof(subscribe));
 
 708                 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
 
 713         /* Init con struct */
 
 714         sock->sk->sk_user_data = &sctp_con;
 
 715         sctp_con.sock = sock;
 
 716         sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
 
 718         /* Bind to all interfaces. */
 
 719         for (i = 0; i < dlm_local_count; i++) {
 
 720                 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
 
 721                 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
 
 723                 result = add_bind_addr(&localaddr, addr_len, num);
 
 729         result = sock->ops->listen(sock, 5);
 
 731                 log_print("Can't set socket listening");
 
 739         sctp_con.sock = NULL;
 
 745 static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
 
 747         struct writequeue_entry *entry;
 
 749         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
 
 753         entry->page = alloc_page(allocation);
 
 767 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
 
 769         struct writequeue_entry *e;
 
 774         ni = nodeid2nodeinfo(nodeid, allocation);
 
 778         spin_lock(&ni->writequeue_lock);
 
 779         e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
 
 780         if ((&e->list == &ni->writequeue) ||
 
 781             (PAGE_CACHE_SIZE - e->end < len)) {
 
 788         spin_unlock(&ni->writequeue_lock);
 
 794                 *ppc = page_address(e->page) + offset;
 
 798         e = new_writequeue_entry(allocation);
 
 800                 spin_lock(&ni->writequeue_lock);
 
 805                 list_add_tail(&e->list, &ni->writequeue);
 
 806                 spin_unlock(&ni->writequeue_lock);
 
 812 void dlm_lowcomms_commit_buffer(void *arg)
 
 814         struct writequeue_entry *e = (struct writequeue_entry *) arg;
 
 816         struct nodeinfo *ni = e->ni;
 
 818         spin_lock(&ni->writequeue_lock);
 
 822         e->len = e->end - e->offset;
 
 824         spin_unlock(&ni->writequeue_lock);
 
 826         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
 
 827                 spin_lock_bh(&write_nodes_lock);
 
 828                 list_add_tail(&ni->write_list, &write_nodes);
 
 829                 spin_unlock_bh(&write_nodes_lock);
 
 831                 queue_work(send_workqueue, &ni->swork);
 
 836         spin_unlock(&ni->writequeue_lock);
 
 840 static void free_entry(struct writequeue_entry *e)
 
 842         __free_page(e->page);
 
 846 /* Initiate an SCTP association. In theory we could just use sendmsg() on
 
 847    the first IP address and it should work, but this allows us to set up the
 
 848    association before sending any valuable data that we can't afford to lose.
 
 849    It also keeps the send path clean as it can now always use the association ID */
 
 850 static void initiate_association(int nodeid)
 
 852         struct sockaddr_storage rem_addr;
 
 853         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 
 854         struct msghdr outmessage;
 
 855         struct cmsghdr *cmsg;
 
 856         struct sctp_sndrcvinfo *sinfo;
 
 863         log_print("Initiating association with node %d", nodeid);
 
 865         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
 
 869         if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
 
 870                 log_print("no address for nodeid %d", nodeid);
 
 874         make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
 
 876         outmessage.msg_name = &rem_addr;
 
 877         outmessage.msg_namelen = addrlen;
 
 878         outmessage.msg_control = outcmsg;
 
 879         outmessage.msg_controllen = sizeof(outcmsg);
 
 880         outmessage.msg_flags = MSG_EOR;
 
 882         iov[0].iov_base = buf;
 
 885         /* Real INIT messages seem to cause trouble. Just send a 1 byte message
 
 886            we can afford to lose */
 
 887         cmsg = CMSG_FIRSTHDR(&outmessage);
 
 888         cmsg->cmsg_level = IPPROTO_SCTP;
 
 889         cmsg->cmsg_type = SCTP_SNDRCV;
 
 890         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 
 891         sinfo = CMSG_DATA(cmsg);
 
 892         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 
 893         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
 
 895         outmessage.msg_controllen = cmsg->cmsg_len;
 
 896         ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
 
 898                 log_print("send INIT to node failed: %d", ret);
 
 899                 /* Try again later */
 
 900                 clear_bit(NI_INIT_PENDING, &ni->flags);
 
 905 static void send_to_sock(struct nodeinfo *ni)
 
 908         struct writequeue_entry *e;
 
 910         struct msghdr outmsg;
 
 911         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 
 912         struct cmsghdr *cmsg;
 
 913         struct sctp_sndrcvinfo *sinfo;
 
 916         /* See if we need to init an association before we start
 
 917            sending precious messages */
 
 918         spin_lock(&ni->lock);
 
 919         if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
 
 920                 spin_unlock(&ni->lock);
 
 921                 initiate_association(ni->nodeid);
 
 924         spin_unlock(&ni->lock);
 
 926         outmsg.msg_name = NULL; /* We use assoc_id */
 
 927         outmsg.msg_namelen = 0;
 
 928         outmsg.msg_control = outcmsg;
 
 929         outmsg.msg_controllen = sizeof(outcmsg);
 
 930         outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
 
 932         cmsg = CMSG_FIRSTHDR(&outmsg);
 
 933         cmsg->cmsg_level = IPPROTO_SCTP;
 
 934         cmsg->cmsg_type = SCTP_SNDRCV;
 
 935         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 
 936         sinfo = CMSG_DATA(cmsg);
 
 937         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 
 938         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
 
 939         sinfo->sinfo_assoc_id = ni->assoc_id;
 
 940         outmsg.msg_controllen = cmsg->cmsg_len;
 
 942         spin_lock(&ni->writequeue_lock);
 
 944                 if (list_empty(&ni->writequeue))
 
 946                 e = list_entry(ni->writequeue.next, struct writequeue_entry,
 
 950                 BUG_ON(len == 0 && e->users == 0);
 
 951                 spin_unlock(&ni->writequeue_lock);
 
 956                         iov.iov_base = page_address(e->page)+offset;
 
 959                         ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
 
 961                         if (ret == -EAGAIN) {
 
 962                                 sctp_con.eagain_flag = 1;
 
 967                         /* Don't starve people filling buffers */
 
 971                 spin_lock(&ni->writequeue_lock);
 
 975                 if (e->len == 0 && e->users == 0) {
 
 982         spin_unlock(&ni->writequeue_lock);
 
 987         log_print("Error sending to node %d %d", ni->nodeid, ret);
 
 988         spin_lock(&ni->lock);
 
 989         if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
 
 991                 spin_unlock(&ni->lock);
 
 992                 initiate_association(ni->nodeid);
 
 994                 spin_unlock(&ni->lock);
 
 999 /* Try to send any messages that are pending */
 
1000 static void process_output_queue(void)
 
1002         struct list_head *list;
 
1003         struct list_head *temp;
 
1005         spin_lock_bh(&write_nodes_lock);
 
1006         list_for_each_safe(list, temp, &write_nodes) {
 
1007                 struct nodeinfo *ni =
 
1008                         list_entry(list, struct nodeinfo, write_list);
 
1009                 clear_bit(NI_WRITE_PENDING, &ni->flags);
 
1010                 list_del(&ni->write_list);
 
1012                 spin_unlock_bh(&write_nodes_lock);
 
1015                 spin_lock_bh(&write_nodes_lock);
 
1017         spin_unlock_bh(&write_nodes_lock);
 
1020 /* Called after we've had -EAGAIN and been woken up */
 
1021 static void refill_write_queue(void)
 
1025         for (i=1; i<=max_nodeid; i++) {
 
1026                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
 
1029                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
 
1030                                 spin_lock_bh(&write_nodes_lock);
 
1031                                 list_add_tail(&ni->write_list, &write_nodes);
 
1032                                 spin_unlock_bh(&write_nodes_lock);
 
1038 static void clean_one_writequeue(struct nodeinfo *ni)
 
1040         struct list_head *list;
 
1041         struct list_head *temp;
 
1043         spin_lock(&ni->writequeue_lock);
 
1044         list_for_each_safe(list, temp, &ni->writequeue) {
 
1045                 struct writequeue_entry *e =
 
1046                         list_entry(list, struct writequeue_entry, list);
 
1050         spin_unlock(&ni->writequeue_lock);
 
1053 static void clean_writequeues(void)
 
1057         for (i=1; i<=max_nodeid; i++) {
 
1058                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
 
1060                         clean_one_writequeue(ni);
 
1065 static void dealloc_nodeinfo(void)
 
1069         for (i=1; i<=max_nodeid; i++) {
 
1070                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
 
1072                         idr_remove(&nodeinfo_idr, i);
 
1078 int dlm_lowcomms_close(int nodeid)
 
1080         struct nodeinfo *ni;
 
1082         ni = nodeid2nodeinfo(nodeid, 0);
 
1086         spin_lock(&ni->lock);
 
1089                 /* Don't send shutdown here, sctp will just queue it
 
1090                    till the node comes back up! */
 
1092         spin_unlock(&ni->lock);
 
1094         clean_one_writequeue(ni);
 
1095         clear_bit(NI_INIT_PENDING, &ni->flags);
 
1099 // PJC: The work queue function for receiving.
 
1100 static void process_recv_sockets(struct work_struct *work)
 
1102         if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
 
1107                         ret = receive_from_sock();
 
1109                         /* Don't starve out everyone else */
 
1110                         if (++count >= MAX_RX_MSG_COUNT) {
 
1114                 } while (!kthread_should_stop() && ret >=0);
 
1119 // PJC: the work queue function for sending
 
/*
 * Work function for sending; installed on each nodeinfo's swork by INIT_WORK
 * in nodeid2nodeinfo and queued on send_workqueue.  The work argument is not
 * used.
 */
1120 static void process_send_sockets(struct work_struct *work)
 
     /* eagain_flag is set by send_to_sock when kernel_sendmsg returns -EAGAIN */
1122         if (sctp_con.eagain_flag) {
 
1123                 sctp_con.eagain_flag = 0;
 
1124                 refill_write_queue();
 
     /* Always attempt to send whatever is on the write_nodes list */
1126         process_output_queue();
 
1129 // PJC: Process lock requests from a particular node.
 
1130 // TODO: can we optimise this out on UP ??
 
1131 static void process_lock_request(struct work_struct *work)
 
/* Destroy the three lowcomms workqueues: receive, then send, then lock. */
1135 static void daemons_stop(void)
 
1137         destroy_workqueue(recv_workqueue);
 
1138         destroy_workqueue(send_workqueue);
 
1139         destroy_workqueue(lock_workqueue);
 
/*
 * Create the receive ("dlm_recv"), send ("dlm_send") and lock ("dlm_rlock")
 * workqueues.  dlm_send is created with create_singlethread_workqueue(), the
 * other two with create_workqueue().
 *
 * The destroy_workqueue() calls sit next to the "can't start" messages and
 * undo the queues created before the one being reported.
 *
 * NOTE(review): each result is tested with IS_ERR(), and that truth value is
 * what is stored in error and printed with %d.  Confirm whether these create
 * calls report failure as an ERR_PTR or as NULL; a NULL return would not be
 * caught by IS_ERR().
 */
1142 static int daemons_start(void)
 
1145         recv_workqueue = create_workqueue("dlm_recv");
 
1146         error = IS_ERR(recv_workqueue);
 
1148                 log_print("can't start dlm_recv %d", error);
 
1152         send_workqueue = create_singlethread_workqueue("dlm_send");
 
1153         error = IS_ERR(send_workqueue);
 
1155                 log_print("can't start dlm_send %d", error);
 
1156                 destroy_workqueue(recv_workqueue);
 
1160         lock_workqueue = create_workqueue("dlm_rlock");
 
1161         error = IS_ERR(lock_workqueue);
 
1163                 log_print("can't start dlm_rlock %d", error);
 
1164                 destroy_workqueue(send_workqueue);
 
1165                 destroy_workqueue(recv_workqueue);
 
1173  * This is quite likely to sleep...
 
1175 int dlm_lowcomms_start(void)
 
1179         INIT_WORK(&sctp_con.work, process_recv_sockets);
 
1181         error = init_sock();
 
1184         error = daemons_start();
 
1194 void dlm_lowcomms_stop(void)
 
1198         sctp_con.flags = 0x7;
 
1200         clean_writequeues();
 
1205         dlm_local_count = 0;
 
1206         dlm_local_nodeid = 0;
 
1208         for (i = 0; i < dlm_local_count; i++)
 
1209                 kfree(dlm_local_addr[i]);