/******************************************************************************
*******************************************************************************
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
** This copyrighted material is made available to anyone wishing to use,
** modify, copy, or redistribute it subject to the terms and conditions
** of the GNU General Public License v.2.
*******************************************************************************
******************************************************************************/
/*
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never (well, hardly ever) waits.
 */
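/*
 * Illustrative sketch (added, not part of the original file): roughly how a
 * caller in the mid-level comms layer is expected to use the send API that
 * this file provides (dlm_lowcomms_get_buffer / dlm_lowcomms_commit_buffer,
 * defined further down).  The function name and variables here are
 * hypothetical.
 */
#if 0
static void example_queue_message(int nodeid, const void *data, int len)
{
	void *handle;
	char *p;

	/* Reserve len bytes on nodeid's writequeue; on success *p points
	   at the reserved space inside a writequeue_entry page */
	handle = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
	if (!handle)
		return;

	memcpy(p, data, len);

	/* Hand the buffer back; dlm_sendd picks it up and sends it later */
	dlm_lowcomms_commit_buffer(handle);
}
#endif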
#include <asm/ioctls.h>
#include <net/sctp/user.h>
#include <linux/pagemap.h>
#include <linux/socket.h>
#include <linux/idr.h>

#include "dlm_internal.h"
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_local_nodeid;
/* One of these per connected node */

#define NI_INIT_PENDING 1
#define NI_WRITE_PENDING 2

	sctp_assoc_t assoc_id;
	struct list_head write_list; /* nodes with pending writes */
	struct list_head writequeue; /* outgoing writequeue_entries */
	spinlock_t writequeue_lock;
static DEFINE_IDR(nodeinfo_idr);
static DECLARE_RWSEM(nodeinfo_lock);
static int max_nodeid;
/* Just the one of these, now. But this struct keeps
   the connection-specific variables together */

#define CF_READ_PENDING 1

	atomic_t waiting_requests;
/* An entry waiting to be sent */

struct writequeue_entry {
	struct list_head list;
static void cbuf_add(struct cbuf *cb, int n)

static int cbuf_data(struct cbuf *cb)
	return ((cb->base + cb->len) & cb->mask);

static void cbuf_init(struct cbuf *cb, int size)
	cb->base = cb->len = 0;

static void cbuf_eat(struct cbuf *cb, int n)
	cb->base &= cb->mask;
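/*
 * Added note, not in the original: struct cbuf is a small power-of-two ring
 * buffer over the receive page.  cbuf_data() is the index just past the
 * currently buffered bytes (i.e. where newly received data should land),
 * cbuf_add() presumably accounts for newly received bytes, and cbuf_eat()
 * consumes bytes from the front, wrapping base with the mask (presumably
 * size - 1, set up in cbuf_init()).  Parts of these helpers are elided here.
 */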
/* List of nodes which have writes pending */
static LIST_HEAD(write_nodes);
static DEFINE_SPINLOCK(write_nodes_lock);
/* Maximum number of incoming messages to process before
   doing a schedule() */
#define MAX_RX_MSG_COUNT 25
static struct task_struct *recv_task;
static struct task_struct *send_task;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);

/* The SCTP connection */
static struct connection sctp_con;
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
	struct sockaddr_storage addr;

	if (!dlm_local_count)

	error = dlm_nodeid_to_addr(nodeid, &addr);

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;

		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
		memcpy(&ret6->sin6_addr, &in6->sin6_addr,
		       sizeof(in6->sin6_addr));
/* If alloc is 0 here we will not attempt to allocate a new
   nodeinfo struct */
static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
	down_read(&nodeinfo_lock);
	ni = idr_find(&nodeinfo_idr, nodeid);
	up_read(&nodeinfo_lock);

		down_write(&nodeinfo_lock);

		ni = idr_find(&nodeinfo_idr, nodeid);

		r = idr_pre_get(&nodeinfo_idr, alloc);

		ni = kmalloc(sizeof(struct nodeinfo), alloc);

		r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
			idr_remove(&nodeinfo_idr, n);

		memset(ni, 0, sizeof(struct nodeinfo));
		spin_lock_init(&ni->lock);
		INIT_LIST_HEAD(&ni->writequeue);
		spin_lock_init(&ni->writequeue_lock);

		if (nodeid > max_nodeid)

		up_write(&nodeinfo_lock);
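/*
 * Added note, not in the original: the nodeid itself is used as the idr
 * index, so idr_find(&nodeinfo_idr, nodeid) is a direct lookup by nodeid.
 * idr_get_new_above() is asked to allocate at 'nodeid'; if it hands back a
 * different id, the new entry is removed again (see the idr_remove() above)
 * rather than being kept under the wrong index.
 */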
/* Don't call this too often... */
static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
	for (i=1; i<=max_nodeid; i++) {
		ni = nodeid2nodeinfo(i, 0);
		if (ni && ni->assoc_id == assoc)
/* Data or notification available on socket */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
	atomic_inc(&sctp_con.waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))

	wake_up_interruptible(&lowcomms_recv_wait);
/* Add the port number to an IP6 or 4 sockaddr and return the address length.
   Also pad out the struct with zeros to make comparisons meaningful */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
	struct sockaddr_in *local4_addr;
	struct sockaddr_in6 *local6_addr;

	if (!dlm_local_count)

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
		port = be16_to_cpu(local4_addr->sin_port);

		local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
		port = be16_to_cpu(local6_addr->sin6_port);

	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
		memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
		       sizeof(struct sockaddr_in));
		*addr_len = sizeof(struct sockaddr_in);

		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
		       sizeof(struct sockaddr_in6));
		*addr_len = sizeof(struct sockaddr_in6);
/* Close the connection and tidy up */
static void close_connection(void)
	sock_release(sctp_con.sock);
	sctp_con.sock = NULL;

	if (sctp_con.rx_page) {
		__free_page(sctp_con.rx_page);
		sctp_con.rx_page = NULL;
/* We only send shutdown messages to nodes that are not part of the cluster */
static void send_shutdown(sctp_assoc_t associd)
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
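	/* Added comment: setting MSG_EOF in sinfo_flags asks the SCTP stack to
	   shut down just this association on the one-to-many socket; no data
	   is actually sent */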
	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);

		log_print("send EOF to node failed: %d", ret);
/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void init_failed(void)
	for (i=1; i<=max_nodeid; i++) {
		ni = nodeid2nodeinfo(i, 0);

		if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
				spin_lock_bh(&write_nodes_lock);
				list_add_tail(&ni->write_list, &write_nodes);
				spin_unlock_bh(&write_nodes_lock);

	wake_up_process(send_task);
/* Something happened to an association */
static void process_sctp_notification(struct msghdr *msg, char *buf)
	union sctp_notification *sn = (union sctp_notification *)buf;

	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
		switch (sn->sn_assoc_change.sac_state) {

			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;

			/* This seems to happen when we received a connection
			 * too early... or something... anyway, it happens but
			 * we always seem to get a real message too, see
			 * receive_from_sock */

			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
				log_print("COMM_UP for invalid assoc ID %d",
					  (int)sn->sn_assoc_change.sac_assoc_id);

			memset(&prim, 0, sizeof(struct sctp_prim));
			prim_len = sizeof(struct sctp_prim);
			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

			ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,

				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
					  (int)sn->sn_assoc_change.sac_assoc_id,

				/* Retry INIT later */
				ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
					clear_bit(NI_INIT_PENDING, &ni->flags);

			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
			if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
				log_print("reject connect from unknown addr");
				send_shutdown(prim.ssp_assoc_id);

			ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);

			/* Save the assoc ID */
			ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;

			log_print("got new/restarted association %d nodeid %d",
				  (int)sn->sn_assoc_change.sac_assoc_id, nodeid);

			/* Send any pending writes */
			clear_bit(NI_INIT_PENDING, &ni->flags);
			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
				spin_lock_bh(&write_nodes_lock);
				list_add_tail(&ni->write_list, &write_nodes);
				spin_unlock_bh(&write_nodes_lock);

			wake_up_process(send_task);

		case SCTP_SHUTDOWN_COMP:

			ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
				spin_lock(&ni->lock);

				spin_unlock(&ni->lock);
		/* We don't know which INIT failed, so clear the PENDING flags
		 * on them all. If assoc_id is zero then it will then try
		 * again */
		case SCTP_CANT_STR_ASSOC:

			log_print("Can't start SCTP association - retrying");

		default:
			log_print("unexpected SCTP assoc change id=%d state=%d",
				  (int)sn->sn_assoc_change.sac_assoc_id,
				  sn->sn_assoc_change.sac_state);
/* Data received from remote end */
static int receive_from_sock(void)
	struct sctp_sndrcvinfo *sinfo;
	struct cmsghdr *cmsg;

	/* These two are marginally too big for stack allocation, but this
	 * function is (currently) only called by dlm_recvd so static should be
	 * OK */
	static struct sockaddr_storage msgname;
	static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	if (sctp_con.sock == NULL)

	if (sctp_con.rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		sctp_con.rx_page = alloc_page(GFP_ATOMIC);
		if (sctp_con.rx_page == NULL)
		cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);

	memset(&incmsg, 0, sizeof(incmsg));
	memset(&msgname, 0, sizeof(msgname));

	msg.msg_name = &msgname;
	msg.msg_namelen = sizeof(msgname);
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/* I don't see why this circular buffer stuff is necessary for SCTP
	 * which is a packet-based protocol, but the whole thing breaks under
	 * load without it! The overhead is minimal (and is in the TCP lowcomms
	 * anyway, of course) so I'll leave it in until I can figure out what's
	 * really happening. */

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
	iov[0].iov_base = page_address(sctp_con.rx_page) +
			  cbuf_data(&sctp_con.cb);

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
		iov[1].iov_len = sctp_con.cb.base;
		iov[1].iov_base = page_address(sctp_con.rx_page);

	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
				 MSG_NOSIGNAL | MSG_DONTWAIT);

	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);
	cmsg = CMSG_FIRSTHDR(&msg);
	sinfo = CMSG_DATA(cmsg);

	if (msg.msg_flags & MSG_NOTIFICATION) {
		process_sctp_notification(&msg, page_address(sctp_con.rx_page));

	/* Is this a new association ? */
	ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
		ni->assoc_id = sinfo->sinfo_assoc_id;
		if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {

			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
				spin_lock_bh(&write_nodes_lock);
				list_add_tail(&ni->write_list, &write_nodes);
				spin_unlock_bh(&write_nodes_lock);

			wake_up_process(send_task);

	/* INIT sends a message with length of 1 - ignore it */

	cbuf_add(&sctp_con.cb, ret);
	ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
					  page_address(sctp_con.rx_page),
					  sctp_con.cb.base, sctp_con.cb.len,

		cbuf_eat(&sctp_con.cb, ret);

	lowcomms_data_ready(sctp_con.sock->sk, 0);

	log_print("error reading from sctp socket: %d", ret);
/* Bind to an IP address. SCTP allows multiple addresses so it can do multi-homing */
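/* Added note: the first local address is attached with an ordinary bind();
   any further local addresses are added with the SCTP_SOCKOPT_BINDX_ADD
   socket option, which is what makes the multi-homing work. */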
static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
		result = sctp_con.sock->ops->bind(sctp_con.sock,
						  (struct sockaddr *) addr,
		result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
							SCTP_SOCKOPT_BINDX_ADD,
							(char *)addr, addr_len);

		log_print("Can't bind to port %d addr number %d",
			  dlm_config.tcp_port, num);
static void init_local(void)
	struct sockaddr_storage sas, *addr;

	dlm_local_nodeid = dlm_our_nodeid();

	for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
		if (dlm_our_addr(&sas, i))

		addr = kmalloc(sizeof(*addr), GFP_KERNEL);

		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
/* Initialise SCTP socket and bind to all interfaces */
static int init_sock(void)
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;

	if (!dlm_local_count) {
		if (!dlm_local_count) {
			log_print("no local IP address has been set");

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
		log_print("Can't create comms socket, check SCTP is loaded");

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				       (char *)&subscribe, sizeof(subscribe));

		log_print("Failed to set SCTP_EVENTS on socket: result=%d",

	/* Init con struct */
	sock->sk->sk_user_data = &sctp_con;
	sctp_con.sock = sock;
	sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len);

		result = add_bind_addr(&localaddr, addr_len, num);

	result = sock->ops->listen(sock, 5);
		log_print("Can't set socket listening");

	sctp_con.sock = NULL;
static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);

	entry->page = alloc_page(allocation);
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
	struct writequeue_entry *e;

	ni = nodeid2nodeinfo(nodeid, allocation);

	spin_lock(&ni->writequeue_lock);
	e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &ni->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {

	spin_unlock(&ni->writequeue_lock);

	*ppc = page_address(e->page) + offset;

	e = new_writequeue_entry(allocation);

	spin_lock(&ni->writequeue_lock);

	list_add_tail(&e->list, &ni->writequeue);
	spin_unlock(&ni->writequeue_lock);
void dlm_lowcomms_commit_buffer(void *arg)
	struct writequeue_entry *e = (struct writequeue_entry *) arg;

	struct nodeinfo *ni = e->ni;

	spin_lock(&ni->writequeue_lock);

	e->len = e->end - e->offset;

	spin_unlock(&ni->writequeue_lock);

	if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
		spin_lock_bh(&write_nodes_lock);
		list_add_tail(&ni->write_list, &write_nodes);
		spin_unlock_bh(&write_nodes_lock);
		wake_up_process(send_task);

	spin_unlock(&ni->writequeue_lock);
static void free_entry(struct writequeue_entry *e)
	__free_page(e->page);
/* Initiate an SCTP association. In theory we could just use sendmsg() on
   the first IP address and it should work, but this allows us to set up the
   association before sending any valuable data that we can't afford to lose.
   It also keeps the send path clean as it can now always use the association ID */
static void initiate_association(int nodeid)
	struct sockaddr_storage rem_addr;
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;

	log_print("Initiating association with node %d", nodeid);

	ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);

	if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
		log_print("no address for nodeid %d", nodeid);

	make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	iov[0].iov_base = buf;

	/* Real INIT messages seem to cause trouble. Just send a 1 byte message
	   we can afford to lose */
	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
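	/* Added comment: the DLM carries the sending nodeid in the SCTP ppid
	   field; the receiver uses it to look up the matching nodeinfo
	   (see receive_from_sock) */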
	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);

	outmessage.msg_controllen = cmsg->cmsg_len;
	ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
		log_print("send INIT to node failed: %d", ret);
		/* Try again later */
		clear_bit(NI_INIT_PENDING, &ni->flags);
static void send_to_sock(struct nodeinfo *ni)
	struct writequeue_entry *e;
	struct msghdr outmsg;
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;

	/* See if we need to init an association before we start
	   sending precious messages */
	spin_lock(&ni->lock);
	if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
		spin_unlock(&ni->lock);
		initiate_association(ni->nodeid);

	spin_unlock(&ni->lock);

	outmsg.msg_name = NULL; /* We use assoc_id */
	outmsg.msg_namelen = 0;
	outmsg.msg_control = outcmsg;
	outmsg.msg_controllen = sizeof(outcmsg);
	outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmsg);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
	sinfo->sinfo_assoc_id = ni->assoc_id;
	outmsg.msg_controllen = cmsg->cmsg_len;

	spin_lock(&ni->writequeue_lock);
		if (list_empty(&ni->writequeue))
		e = list_entry(ni->writequeue.next, struct writequeue_entry,
			       list);
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&ni->writequeue_lock);

		iov.iov_base = page_address(e->page)+offset;

		ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
		if (ret == -EAGAIN) {
			sctp_con.eagain_flag = 1;

		/* Don't starve people filling buffers */

		spin_lock(&ni->writequeue_lock);

		if (e->len == 0 && e->users == 0) {

	spin_unlock(&ni->writequeue_lock);

	log_print("Error sending to node %d %d", ni->nodeid, ret);
	spin_lock(&ni->lock);
	if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
		spin_unlock(&ni->lock);
		initiate_association(ni->nodeid);

		spin_unlock(&ni->lock);
/* Try to send any messages that are pending */
static void process_output_queue(void)
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&write_nodes_lock);
	list_for_each_safe(list, temp, &write_nodes) {
		struct nodeinfo *ni =
			list_entry(list, struct nodeinfo, write_list);
		clear_bit(NI_WRITE_PENDING, &ni->flags);
		list_del(&ni->write_list);

		spin_unlock_bh(&write_nodes_lock);

		spin_lock_bh(&write_nodes_lock);

	spin_unlock_bh(&write_nodes_lock);
/* Called after we've had -EAGAIN and been woken up */
static void refill_write_queue(void)
	for (i=1; i<=max_nodeid; i++) {
		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);

		if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
			spin_lock_bh(&write_nodes_lock);
			list_add_tail(&ni->write_list, &write_nodes);
			spin_unlock_bh(&write_nodes_lock);
static void clean_one_writequeue(struct nodeinfo *ni)
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&ni->writequeue_lock);
	list_for_each_safe(list, temp, &ni->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);

	spin_unlock(&ni->writequeue_lock);
static void clean_writequeues(void)
	for (i=1; i<=max_nodeid; i++) {
		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);

			clean_one_writequeue(ni);
static void dealloc_nodeinfo(void)
	for (i=1; i<=max_nodeid; i++) {
		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
			idr_remove(&nodeinfo_idr, i);
int dlm_lowcomms_close(int nodeid)
	struct nodeinfo *ni;

	ni = nodeid2nodeinfo(nodeid, 0);

	spin_lock(&ni->lock);

		/* Don't send shutdown here, sctp will just queue it
		   till the node comes back up! */

	spin_unlock(&ni->lock);

	clean_one_writequeue(ni);
	clear_bit(NI_INIT_PENDING, &ni->flags);
static int write_list_empty(void)
	spin_lock_bh(&write_nodes_lock);
	status = list_empty(&write_nodes);
	spin_unlock_bh(&write_nodes_lock);
static int dlm_recvd(void *data)
	DECLARE_WAITQUEUE(wait, current);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&lowcomms_recv_wait, &wait);
		if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
		remove_wait_queue(&lowcomms_recv_wait, &wait);
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
				ret = receive_from_sock();

				/* Don't starve out everyone else */
				if (++count >= MAX_RX_MSG_COUNT) {

			} while (!kthread_should_stop() && ret >= 0);
static int dlm_sendd(void *data)
	DECLARE_WAITQUEUE(wait, current);

	add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_list_empty())
		set_current_state(TASK_RUNNING);

		if (sctp_con.eagain_flag) {
			sctp_con.eagain_flag = 0;
			refill_write_queue();

		process_output_queue();

	remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
static void daemons_stop(void)
	kthread_stop(recv_task);
	kthread_stop(send_task);

static int daemons_start(void)
	struct task_struct *p;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
		log_print("can't start dlm_recvd %d", error);

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
/*
 * This is quite likely to sleep...
 */
int dlm_lowcomms_start(void)
	error = init_sock();

	error = daemons_start();
void dlm_lowcomms_stop(void)
	sctp_con.flags = 0x7;

	clean_writequeues();

	for (i = 0; i < dlm_local_count; i++)
		kfree(dlm_local_addr[i]);
	dlm_local_count = 0;
	dlm_local_nodeid = 0;