1 /******************************************************************************
2 *******************************************************************************
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
17 * This is the "low-level" comms layer.
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is it's
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
45 #include <asm/ioctls.h>
48 #include <net/sctp/user.h>
49 #include <linux/pagemap.h>
50 #include <linux/socket.h>
51 #include <linux/idr.h>
53 #include "dlm_internal.h"
58 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59 static int dlm_local_count;
60 static int dlm_local_nodeid;
62 /* One of these per connected node */
64 #define NI_INIT_PENDING 1
65 #define NI_WRITE_PENDING 2
69 sctp_assoc_t assoc_id;
71 struct list_head write_list; /* nodes with pending writes */
72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock;
75 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
79 static DEFINE_IDR(nodeinfo_idr);
80 static DECLARE_RWSEM(nodeinfo_lock);
81 static int max_nodeid;
89 /* Just the one of these, now. But this struct keeps
90 the connection-specific variables together */
92 #define CF_READ_PENDING 1
98 atomic_t waiting_requests;
101 struct work_struct work; /* Send workqueue */
104 /* An entry waiting to be sent */
106 struct writequeue_entry {
107 struct list_head list;
116 static void cbuf_add(struct cbuf *cb, int n)
121 static int cbuf_data(struct cbuf *cb)
123 return ((cb->base + cb->len) & cb->mask);
126 static void cbuf_init(struct cbuf *cb, int size)
128 cb->base = cb->len = 0;
132 static void cbuf_eat(struct cbuf *cb, int n)
136 cb->base &= cb->mask;
139 /* List of nodes which have writes pending */
140 static LIST_HEAD(write_nodes);
141 static DEFINE_SPINLOCK(write_nodes_lock);
144 /* Maximum number of incoming messages to process before
147 #define MAX_RX_MSG_COUNT 25
150 static struct workqueue_struct *recv_workqueue;
151 static struct workqueue_struct *send_workqueue;
152 static struct workqueue_struct *lock_workqueue;
154 /* The SCTP connection */
155 static struct connection sctp_con;
157 static void process_send_sockets(struct work_struct *work);
158 static void process_recv_sockets(struct work_struct *work);
159 static void process_lock_request(struct work_struct *work);
161 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
163 struct sockaddr_storage addr;
166 if (!dlm_local_count)
169 error = dlm_nodeid_to_addr(nodeid, &addr);
173 if (dlm_local_addr[0]->ss_family == AF_INET) {
174 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
175 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
176 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
178 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
179 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
180 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
181 sizeof(in6->sin6_addr));
187 /* If alloc is 0 here we will not attempt to allocate a new
189 static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
195 down_read(&nodeinfo_lock);
196 ni = idr_find(&nodeinfo_idr, nodeid);
197 up_read(&nodeinfo_lock);
202 down_write(&nodeinfo_lock);
204 ni = idr_find(&nodeinfo_idr, nodeid);
208 r = idr_pre_get(&nodeinfo_idr, alloc);
212 ni = kmalloc(sizeof(struct nodeinfo), alloc);
216 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
223 idr_remove(&nodeinfo_idr, n);
228 memset(ni, 0, sizeof(struct nodeinfo));
229 spin_lock_init(&ni->lock);
230 INIT_LIST_HEAD(&ni->writequeue);
231 spin_lock_init(&ni->writequeue_lock);
232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
236 if (nodeid > max_nodeid)
239 up_write(&nodeinfo_lock);
244 /* Don't call this too often... */
245 static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
250 for (i=1; i<=max_nodeid; i++) {
251 ni = nodeid2nodeinfo(i, 0);
252 if (ni && ni->assoc_id == assoc)
258 /* Data or notification available on socket */
259 static void lowcomms_data_ready(struct sock *sk, int count_unused)
261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
262 queue_work(recv_workqueue, &sctp_con.work);
266 /* Add the port number to an IP6 or 4 sockaddr and return the address length.
267 Also padd out the struct with zeros to make comparisons meaningful */
269 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
272 struct sockaddr_in *local4_addr;
273 struct sockaddr_in6 *local6_addr;
275 if (!dlm_local_count)
279 if (dlm_local_addr[0]->ss_family == AF_INET) {
280 local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
281 port = be16_to_cpu(local4_addr->sin_port);
283 local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
284 port = be16_to_cpu(local6_addr->sin6_port);
288 saddr->ss_family = dlm_local_addr[0]->ss_family;
289 if (dlm_local_addr[0]->ss_family == AF_INET) {
290 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
291 in4_addr->sin_port = cpu_to_be16(port);
292 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
293 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
294 sizeof(struct sockaddr_in));
295 *addr_len = sizeof(struct sockaddr_in);
297 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
298 in6_addr->sin6_port = cpu_to_be16(port);
299 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
300 sizeof(struct sockaddr_in6));
301 *addr_len = sizeof(struct sockaddr_in6);
305 /* Close the connection and tidy up */
306 static void close_connection(void)
309 sock_release(sctp_con.sock);
310 sctp_con.sock = NULL;
313 if (sctp_con.rx_page) {
314 __free_page(sctp_con.rx_page);
315 sctp_con.rx_page = NULL;
319 /* We only send shutdown messages to nodes that are not part of the cluster */
320 static void send_shutdown(sctp_assoc_t associd)
322 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
323 struct msghdr outmessage;
324 struct cmsghdr *cmsg;
325 struct sctp_sndrcvinfo *sinfo;
328 outmessage.msg_name = NULL;
329 outmessage.msg_namelen = 0;
330 outmessage.msg_control = outcmsg;
331 outmessage.msg_controllen = sizeof(outcmsg);
332 outmessage.msg_flags = MSG_EOR;
334 cmsg = CMSG_FIRSTHDR(&outmessage);
335 cmsg->cmsg_level = IPPROTO_SCTP;
336 cmsg->cmsg_type = SCTP_SNDRCV;
337 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
338 outmessage.msg_controllen = cmsg->cmsg_len;
339 sinfo = CMSG_DATA(cmsg);
340 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
342 sinfo->sinfo_flags |= MSG_EOF;
343 sinfo->sinfo_assoc_id = associd;
345 ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
348 log_print("send EOF to node failed: %d", ret);
352 /* INIT failed but we don't know which node...
353 restart INIT on all pending nodes */
354 static void init_failed(void)
359 for (i=1; i<=max_nodeid; i++) {
360 ni = nodeid2nodeinfo(i, 0);
364 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
366 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
367 spin_lock_bh(&write_nodes_lock);
368 list_add_tail(&ni->write_list, &write_nodes);
369 spin_unlock_bh(&write_nodes_lock);
370 queue_work(send_workqueue, &ni->swork);
376 /* Something happened to an association */
377 static void process_sctp_notification(struct msghdr *msg, char *buf)
379 union sctp_notification *sn = (union sctp_notification *)buf;
381 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
382 switch (sn->sn_assoc_change.sac_state) {
387 /* Check that the new node is in the lockspace */
388 struct sctp_prim prim;
395 /* This seems to happen when we received a connection
396 * too early... or something... anyway, it happens but
397 * we always seem to get a real message too, see
398 * receive_from_sock */
400 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
401 log_print("COMM_UP for invalid assoc ID %d",
402 (int)sn->sn_assoc_change.sac_assoc_id);
406 memset(&prim, 0, sizeof(struct sctp_prim));
407 prim_len = sizeof(struct sctp_prim);
408 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
412 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
421 log_print("getsockopt/sctp_primary_addr on "
422 "new assoc %d failed : %d",
423 (int)sn->sn_assoc_change.sac_assoc_id,
426 /* Retry INIT later */
427 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
429 clear_bit(NI_INIT_PENDING, &ni->flags);
432 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
433 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
434 log_print("reject connect from unknown addr");
435 send_shutdown(prim.ssp_assoc_id);
439 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
443 /* Save the assoc ID */
444 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
446 log_print("got new/restarted association %d nodeid %d",
447 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
449 /* Send any pending writes */
450 clear_bit(NI_INIT_PENDING, &ni->flags);
451 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
452 spin_lock_bh(&write_nodes_lock);
453 list_add_tail(&ni->write_list, &write_nodes);
454 spin_unlock_bh(&write_nodes_lock);
455 queue_work(send_workqueue, &ni->swork);
461 case SCTP_SHUTDOWN_COMP:
465 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
467 spin_lock(&ni->lock);
469 spin_unlock(&ni->lock);
474 /* We don't know which INIT failed, so clear the PENDING flags
475 * on them all. if assoc_id is zero then it will then try
478 case SCTP_CANT_STR_ASSOC:
480 log_print("Can't start SCTP association - retrying");
486 log_print("unexpected SCTP assoc change id=%d state=%d",
487 (int)sn->sn_assoc_change.sac_assoc_id,
488 sn->sn_assoc_change.sac_state);
493 /* Data received from remote end */
494 static int receive_from_sock(void)
501 struct sctp_sndrcvinfo *sinfo;
502 struct cmsghdr *cmsg;
505 /* These two are marginally too big for stack allocation, but this
506 * function is (currently) only called by dlm_recvd so static should be
509 static struct sockaddr_storage msgname;
510 static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
512 if (sctp_con.sock == NULL)
515 if (sctp_con.rx_page == NULL) {
517 * This doesn't need to be atomic, but I think it should
518 * improve performance if it is.
520 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
521 if (sctp_con.rx_page == NULL)
523 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
526 memset(&incmsg, 0, sizeof(incmsg));
527 memset(&msgname, 0, sizeof(msgname));
529 msg.msg_name = &msgname;
530 msg.msg_namelen = sizeof(msgname);
532 msg.msg_control = incmsg;
533 msg.msg_controllen = sizeof(incmsg);
536 /* I don't see why this circular buffer stuff is necessary for SCTP
537 * which is a packet-based protocol, but the whole thing breaks under
538 * load without it! The overhead is minimal (and is in the TCP lowcomms
539 * anyway, of course) so I'll leave it in until I can figure out what's
544 * iov[0] is the bit of the circular buffer between the current end
545 * point (cb.base + cb.len) and the end of the buffer.
547 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
548 iov[0].iov_base = page_address(sctp_con.rx_page) +
549 cbuf_data(&sctp_con.cb);
553 * iov[1] is the bit of the circular buffer between the start of the
554 * buffer and the start of the currently used section (cb.base)
556 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
557 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
558 iov[1].iov_len = sctp_con.cb.base;
559 iov[1].iov_base = page_address(sctp_con.rx_page);
562 len = iov[0].iov_len + iov[1].iov_len;
564 r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
565 MSG_NOSIGNAL | MSG_DONTWAIT);
569 msg.msg_control = incmsg;
570 msg.msg_controllen = sizeof(incmsg);
571 cmsg = CMSG_FIRSTHDR(&msg);
572 sinfo = CMSG_DATA(cmsg);
574 if (msg.msg_flags & MSG_NOTIFICATION) {
575 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
579 /* Is this a new association ? */
580 ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
582 ni->assoc_id = sinfo->sinfo_assoc_id;
583 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
585 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
586 spin_lock_bh(&write_nodes_lock);
587 list_add_tail(&ni->write_list, &write_nodes);
588 spin_unlock_bh(&write_nodes_lock);
589 queue_work(send_workqueue, &ni->swork);
594 /* INIT sends a message with length of 1 - ignore it */
598 cbuf_add(&sctp_con.cb, ret);
599 // PJC: TODO: Add to node's workqueue....can we ??
600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
601 page_address(sctp_con.rx_page),
602 sctp_con.cb.base, sctp_con.cb.len,
606 cbuf_eat(&sctp_con.cb, ret);
613 lowcomms_data_ready(sctp_con.sock->sk, 0);
620 log_print("error reading from sctp socket: %d", ret);
625 /* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
626 static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
634 result = sctp_con.sock->ops->bind(sctp_con.sock,
635 (struct sockaddr *) addr,
638 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
639 SCTP_SOCKOPT_BINDX_ADD,
640 (char *)addr, addr_len);
644 log_print("Can't bind to port %d addr number %d",
645 dlm_config.ci_tcp_port, num);
650 static void init_local(void)
652 struct sockaddr_storage sas, *addr;
655 dlm_local_nodeid = dlm_our_nodeid();
657 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
658 if (dlm_our_addr(&sas, i))
661 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
664 memcpy(addr, &sas, sizeof(*addr));
665 dlm_local_addr[dlm_local_count++] = addr;
669 /* Initialise SCTP socket and bind to all interfaces */
670 static int init_sock(void)
673 struct socket *sock = NULL;
674 struct sockaddr_storage localaddr;
675 struct sctp_event_subscribe subscribe;
676 int result = -EINVAL, num = 1, i, addr_len;
678 if (!dlm_local_count) {
680 if (!dlm_local_count) {
681 log_print("no local IP address has been set");
686 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
687 IPPROTO_SCTP, &sock);
689 log_print("Can't create comms socket, check SCTP is loaded");
693 /* Listen for events */
694 memset(&subscribe, 0, sizeof(subscribe));
695 subscribe.sctp_data_io_event = 1;
696 subscribe.sctp_association_event = 1;
697 subscribe.sctp_send_failure_event = 1;
698 subscribe.sctp_shutdown_event = 1;
699 subscribe.sctp_partial_delivery_event = 1;
703 result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
704 (char *)&subscribe, sizeof(subscribe));
708 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
713 /* Init con struct */
714 sock->sk->sk_user_data = &sctp_con;
715 sctp_con.sock = sock;
716 sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
718 /* Bind to all interfaces. */
719 for (i = 0; i < dlm_local_count; i++) {
720 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
721 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
723 result = add_bind_addr(&localaddr, addr_len, num);
729 result = sock->ops->listen(sock, 5);
731 log_print("Can't set socket listening");
739 sctp_con.sock = NULL;
745 static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
747 struct writequeue_entry *entry;
749 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
753 entry->page = alloc_page(allocation);
767 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
769 struct writequeue_entry *e;
774 ni = nodeid2nodeinfo(nodeid, allocation);
778 spin_lock(&ni->writequeue_lock);
779 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
780 if ((&e->list == &ni->writequeue) ||
781 (PAGE_CACHE_SIZE - e->end < len)) {
788 spin_unlock(&ni->writequeue_lock);
794 *ppc = page_address(e->page) + offset;
798 e = new_writequeue_entry(allocation);
800 spin_lock(&ni->writequeue_lock);
805 list_add_tail(&e->list, &ni->writequeue);
806 spin_unlock(&ni->writequeue_lock);
812 void dlm_lowcomms_commit_buffer(void *arg)
814 struct writequeue_entry *e = (struct writequeue_entry *) arg;
816 struct nodeinfo *ni = e->ni;
818 spin_lock(&ni->writequeue_lock);
822 e->len = e->end - e->offset;
824 spin_unlock(&ni->writequeue_lock);
826 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
827 spin_lock_bh(&write_nodes_lock);
828 list_add_tail(&ni->write_list, &write_nodes);
829 spin_unlock_bh(&write_nodes_lock);
831 queue_work(send_workqueue, &ni->swork);
836 spin_unlock(&ni->writequeue_lock);
840 static void free_entry(struct writequeue_entry *e)
842 __free_page(e->page);
846 /* Initiate an SCTP association. In theory we could just use sendmsg() on
847 the first IP address and it should work, but this allows us to set up the
848 association before sending any valuable data that we can't afford to lose.
849 It also keeps the send path clean as it can now always use the association ID */
850 static void initiate_association(int nodeid)
852 struct sockaddr_storage rem_addr;
853 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
854 struct msghdr outmessage;
855 struct cmsghdr *cmsg;
856 struct sctp_sndrcvinfo *sinfo;
863 log_print("Initiating association with node %d", nodeid);
865 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
869 if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
870 log_print("no address for nodeid %d", nodeid);
874 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
876 outmessage.msg_name = &rem_addr;
877 outmessage.msg_namelen = addrlen;
878 outmessage.msg_control = outcmsg;
879 outmessage.msg_controllen = sizeof(outcmsg);
880 outmessage.msg_flags = MSG_EOR;
882 iov[0].iov_base = buf;
885 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
886 we can afford to lose */
887 cmsg = CMSG_FIRSTHDR(&outmessage);
888 cmsg->cmsg_level = IPPROTO_SCTP;
889 cmsg->cmsg_type = SCTP_SNDRCV;
890 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
891 sinfo = CMSG_DATA(cmsg);
892 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
893 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
895 outmessage.msg_controllen = cmsg->cmsg_len;
896 ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
898 log_print("send INIT to node failed: %d", ret);
899 /* Try again later */
900 clear_bit(NI_INIT_PENDING, &ni->flags);
905 static void send_to_sock(struct nodeinfo *ni)
908 struct writequeue_entry *e;
910 struct msghdr outmsg;
911 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
912 struct cmsghdr *cmsg;
913 struct sctp_sndrcvinfo *sinfo;
916 /* See if we need to init an association before we start
917 sending precious messages */
918 spin_lock(&ni->lock);
919 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
920 spin_unlock(&ni->lock);
921 initiate_association(ni->nodeid);
924 spin_unlock(&ni->lock);
926 outmsg.msg_name = NULL; /* We use assoc_id */
927 outmsg.msg_namelen = 0;
928 outmsg.msg_control = outcmsg;
929 outmsg.msg_controllen = sizeof(outcmsg);
930 outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
932 cmsg = CMSG_FIRSTHDR(&outmsg);
933 cmsg->cmsg_level = IPPROTO_SCTP;
934 cmsg->cmsg_type = SCTP_SNDRCV;
935 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
936 sinfo = CMSG_DATA(cmsg);
937 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
938 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
939 sinfo->sinfo_assoc_id = ni->assoc_id;
940 outmsg.msg_controllen = cmsg->cmsg_len;
942 spin_lock(&ni->writequeue_lock);
944 if (list_empty(&ni->writequeue))
946 e = list_entry(ni->writequeue.next, struct writequeue_entry,
950 BUG_ON(len == 0 && e->users == 0);
951 spin_unlock(&ni->writequeue_lock);
956 iov.iov_base = page_address(e->page)+offset;
959 ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
961 if (ret == -EAGAIN) {
962 sctp_con.eagain_flag = 1;
967 /* Don't starve people filling buffers */
971 spin_lock(&ni->writequeue_lock);
975 if (e->len == 0 && e->users == 0) {
982 spin_unlock(&ni->writequeue_lock);
987 log_print("Error sending to node %d %d", ni->nodeid, ret);
988 spin_lock(&ni->lock);
989 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
991 spin_unlock(&ni->lock);
992 initiate_association(ni->nodeid);
994 spin_unlock(&ni->lock);
999 /* Try to send any messages that are pending */
1000 static void process_output_queue(void)
1002 struct list_head *list;
1003 struct list_head *temp;
1005 spin_lock_bh(&write_nodes_lock);
1006 list_for_each_safe(list, temp, &write_nodes) {
1007 struct nodeinfo *ni =
1008 list_entry(list, struct nodeinfo, write_list);
1009 clear_bit(NI_WRITE_PENDING, &ni->flags);
1010 list_del(&ni->write_list);
1012 spin_unlock_bh(&write_nodes_lock);
1015 spin_lock_bh(&write_nodes_lock);
1017 spin_unlock_bh(&write_nodes_lock);
1020 /* Called after we've had -EAGAIN and been woken up */
1021 static void refill_write_queue(void)
1025 for (i=1; i<=max_nodeid; i++) {
1026 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1029 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1030 spin_lock_bh(&write_nodes_lock);
1031 list_add_tail(&ni->write_list, &write_nodes);
1032 spin_unlock_bh(&write_nodes_lock);
1038 static void clean_one_writequeue(struct nodeinfo *ni)
1040 struct list_head *list;
1041 struct list_head *temp;
1043 spin_lock(&ni->writequeue_lock);
1044 list_for_each_safe(list, temp, &ni->writequeue) {
1045 struct writequeue_entry *e =
1046 list_entry(list, struct writequeue_entry, list);
1050 spin_unlock(&ni->writequeue_lock);
1053 static void clean_writequeues(void)
1057 for (i=1; i<=max_nodeid; i++) {
1058 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1060 clean_one_writequeue(ni);
1065 static void dealloc_nodeinfo(void)
1069 for (i=1; i<=max_nodeid; i++) {
1070 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1072 idr_remove(&nodeinfo_idr, i);
1078 int dlm_lowcomms_close(int nodeid)
1080 struct nodeinfo *ni;
1082 ni = nodeid2nodeinfo(nodeid, 0);
1086 spin_lock(&ni->lock);
1089 /* Don't send shutdown here, sctp will just queue it
1090 till the node comes back up! */
1092 spin_unlock(&ni->lock);
1094 clean_one_writequeue(ni);
1095 clear_bit(NI_INIT_PENDING, &ni->flags);
1099 // PJC: The work queue function for receiving.
1100 static void process_recv_sockets(struct work_struct *work)
1102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1107 ret = receive_from_sock();
1109 /* Don't starve out everyone else */
1110 if (++count >= MAX_RX_MSG_COUNT) {
1114 } while (!kthread_should_stop() && ret >=0);
1119 // PJC: the work queue function for sending
1120 static void process_send_sockets(struct work_struct *work)
1122 if (sctp_con.eagain_flag) {
1123 sctp_con.eagain_flag = 0;
1124 refill_write_queue();
1126 process_output_queue();
1129 // PJC: Process lock requests from a particular node.
1130 // TODO: can we optimise this out on UP ??
1131 static void process_lock_request(struct work_struct *work)
1135 static void daemons_stop(void)
1137 destroy_workqueue(recv_workqueue);
1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
1142 static int daemons_start(void)
1145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
1148 log_print("can't start dlm_recv %d", error);
1152 send_workqueue = create_singlethread_workqueue("dlm_send");
1153 error = IS_ERR(send_workqueue);
1155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
1160 lock_workqueue = create_workqueue("dlm_rlock");
1161 error = IS_ERR(lock_workqueue);
1163 log_print("can't start dlm_rlock %d", error);
1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1173 * This is quite likely to sleep...
1175 int dlm_lowcomms_start(void)
1179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1181 error = init_sock();
1184 error = daemons_start();
1194 void dlm_lowcomms_stop(void)
1198 sctp_con.flags = 0x7;
1200 clean_writequeues();
1205 dlm_local_count = 0;
1206 dlm_local_nodeid = 0;
1208 for (i = 0; i < dlm_local_count; i++)
1209 kfree(dlm_local_addr[i]);