ocfs2: small cleanup of ocfs2_request_delete()
[linux-2.6] / fs / dlm / lowcomms-tcp.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32 bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is its
25  * responsibility. It is this layer's
26  * responsibility to resolve these into IP address or
27  * whatever it needs for inter-node communication.
28  *
29  * The comms level is two kernel threads that deal mainly with
30  * the receiving of messages from other nodes and passing them
31  * up to the mid-level comms layer (which understands the
32  * message format) for execution by the locking core, and
33  * a send thread which does all the setting up of connections
34  * to remote nodes and the sending of data. Threads are not allowed
35  * to send their own data because it may cause them to wait in times
36  * of high load. Also, this way, the sending thread can collect together
37  * messages bound for one node and send them in one block.
38  *
39  * I don't see any problem with the recv thread executing the locking
40  * code on behalf of remote processes as the locking code is
41  * short, efficient and never waits.
42  *
43  */
44
45
46 #include <asm/ioctls.h>
47 #include <net/sock.h>
48 #include <net/tcp.h>
49 #include <linux/pagemap.h>
50
51 #include "dlm_internal.h"
52 #include "lowcomms.h"
53 #include "midcomms.h"
54 #include "config.h"
55
/*
 * Offsets describing the used region of a circular buffer whose size is a
 * power of two.  Only the bookkeeping lives here, not the storage.
 */
struct cbuf {
        unsigned int base;      /* offset of the first unconsumed byte */
        unsigned int len;       /* number of unconsumed bytes */
        unsigned int mask;      /* size - 1, used to wrap offsets */
};

/* Initial size of the connections array and slack added when it grows */
#define NODE_INCREMENT 32

/* Record that n more bytes have been appended to the buffer. */
static void cbuf_add(struct cbuf *cb, int n)
{
        cb->len = cb->len + n;
}

/* Wrapped offset just past the last unconsumed byte. */
static int cbuf_data(struct cbuf *cb)
{
        unsigned int end = cb->base + cb->len;

        return end & cb->mask;
}

/* Reset to an empty buffer of the given size. */
static void cbuf_init(struct cbuf *cb, int size)
{
        cb->base = 0;
        cb->len = 0;
        cb->mask = size - 1;
}

/* Consume n bytes from the front of the buffer. */
static void cbuf_eat(struct cbuf *cb, int n)
{
        cb->len -= n;
        cb->base = (cb->base + n) & cb->mask;
}

/* True when no unconsumed bytes remain. */
static bool cbuf_empty(struct cbuf *cb)
{
        return !cb->len;
}
90
91 /* Maximum number of incoming messages to process before
92    doing a cond_resched()
93 */
94 #define MAX_RX_MSG_COUNT 25
95
/*
 * Per-node connection state.  One of these is kept in the connections[]
 * array for each node id; a second one may hang off ->othercon to hold an
 * incoming socket for a node that already has a socket.
 */
struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct mutex sock_mutex;        /* held while ->sock and ->rx_page are used or changed */
        unsigned long flags;    /* CF_* bit numbers below, used with the bitops */
#define CF_READ_PENDING 1       /* set when rwork is queued, cleared when it runs */
#define CF_WRITE_PENDING 2      /* set when swork is queued to send, cleared when it runs */
#define CF_CONNECT_PENDING 3    /* set when swork is queued to (re)connect */
#define CF_IS_OTHERCON 4        /* receive errors must not close this connection */
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        struct list_head listenlist;  /* List of allocated listening sockets */
        spinlock_t writequeue_lock;     /* protects writequeue and its entries' counters */
        int (*rx_action) (struct connection *); /* What to do when active */
        struct page *rx_page;   /* receive buffer page, allocated on demand */
        struct cbuf cb;         /* circular-buffer offsets into rx_page */
        int retries;            /* connect attempts so far; reset by close_connection() */
#define MAX_CONNECT_RETRIES 3
        struct connection *othercon;    /* extra incoming connection from the same node */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
};
/* Recover the connection stored in a struct sock's sk_user_data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
118
/*
 * An entry waiting to be sent.  Each entry owns one page; callers reserve
 * space in it with dlm_lowcomms_get_buffer() and release it with
 * dlm_lowcomms_commit_buffer().
 */
struct writequeue_entry {
        struct list_head list;  /* link in connection->writequeue */
        struct page *page;      /* page holding the message data */
        int offset;             /* start of the not-yet-sent data in the page */
        int len;                /* bytes ready to send; set when the last user commits */
        int end;                /* offset just past the last reserved byte */
        int users;              /* reservations handed out but not yet committed */
        struct connection *con; /* connection this entry is queued on */
};
129
/* Local address; its ss_family selects the family of every socket we create */
static struct sockaddr_storage dlm_local_addr;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
/* Taken with down()/up() in nodeid2con() around lookups and array growth */
static DECLARE_MUTEX(connections_lock);
/* Slab cache that struct connection objects are allocated from */
static struct kmem_cache *con_cache;
/* Number of slots currently allocated in connections[] */
static int conn_array_size;

/* Work handlers, defined later in this file */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
144
145 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
146 {
147         struct connection *con = NULL;
148
149         down(&connections_lock);
150         if (nodeid >= conn_array_size) {
151                 int new_size = nodeid + NODE_INCREMENT;
152                 struct connection **new_conns;
153
154                 new_conns = kzalloc(sizeof(struct connection *) *
155                                     new_size, allocation);
156                 if (!new_conns)
157                         goto finish;
158
159                 memcpy(new_conns, connections,  sizeof(struct connection *) * conn_array_size);
160                 conn_array_size = new_size;
161                 kfree(connections);
162                 connections = new_conns;
163
164         }
165
166         con = connections[nodeid];
167         if (con == NULL && allocation) {
168                 con = kmem_cache_zalloc(con_cache, allocation);
169                 if (!con)
170                         goto finish;
171
172                 con->nodeid = nodeid;
173                 mutex_init(&con->sock_mutex);
174                 INIT_LIST_HEAD(&con->writequeue);
175                 spin_lock_init(&con->writequeue_lock);
176                 INIT_WORK(&con->swork, process_send_sockets);
177                 INIT_WORK(&con->rwork, process_recv_sockets);
178
179                 connections[nodeid] = con;
180         }
181
182 finish:
183         up(&connections_lock);
184         return con;
185 }
186
187 /* Data available on socket or listen socket received a connect */
188 static void lowcomms_data_ready(struct sock *sk, int count_unused)
189 {
190         struct connection *con = sock2con(sk);
191
192         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
193                 queue_work(recv_workqueue, &con->rwork);
194 }
195
196 static void lowcomms_write_space(struct sock *sk)
197 {
198         struct connection *con = sock2con(sk);
199
200         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
201                 queue_work(send_workqueue, &con->swork);
202 }
203
204 static inline void lowcomms_connect_sock(struct connection *con)
205 {
206         if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
207                 queue_work(send_workqueue, &con->swork);
208 }
209
210 static void lowcomms_state_change(struct sock *sk)
211 {
212         if (sk->sk_state == TCP_ESTABLISHED)
213                 lowcomms_write_space(sk);
214 }
215
216 /* Make a socket active */
217 static int add_sock(struct socket *sock, struct connection *con)
218 {
219         con->sock = sock;
220
221         /* Install a data_ready callback */
222         con->sock->sk->sk_data_ready = lowcomms_data_ready;
223         con->sock->sk->sk_write_space = lowcomms_write_space;
224         con->sock->sk->sk_state_change = lowcomms_state_change;
225
226         return 0;
227 }
228
229 /* Add the port number to an IP6 or 4 sockaddr and return the address
230    length */
231 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
232                           int *addr_len)
233 {
234         saddr->ss_family =  dlm_local_addr.ss_family;
235         if (saddr->ss_family == AF_INET) {
236                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
237                 in4_addr->sin_port = cpu_to_be16(port);
238                 *addr_len = sizeof(struct sockaddr_in);
239         } else {
240                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
241                 in6_addr->sin6_port = cpu_to_be16(port);
242                 *addr_len = sizeof(struct sockaddr_in6);
243         }
244 }
245
246 /* Close a remote connection and tidy up */
247 static void close_connection(struct connection *con, bool and_other)
248 {
249         mutex_lock(&con->sock_mutex);
250
251         if (con->sock) {
252                 sock_release(con->sock);
253                 con->sock = NULL;
254         }
255         if (con->othercon && and_other) {
256                 /* Will only re-enter once. */
257                 close_connection(con->othercon, false);
258         }
259         if (con->rx_page) {
260                 __free_page(con->rx_page);
261                 con->rx_page = NULL;
262         }
263         con->retries = 0;
264         mutex_unlock(&con->sock_mutex);
265 }
266
267 /* Data received from remote end */
268 static int receive_from_sock(struct connection *con)
269 {
270         int ret = 0;
271         struct msghdr msg = {};
272         struct kvec iov[2];
273         unsigned len;
274         int r;
275         int call_again_soon = 0;
276         int nvec;
277
278         mutex_lock(&con->sock_mutex);
279
280         if (con->sock == NULL) {
281                 ret = -EAGAIN;
282                 goto out_close;
283         }
284
285         if (con->rx_page == NULL) {
286                 /*
287                  * This doesn't need to be atomic, but I think it should
288                  * improve performance if it is.
289                  */
290                 con->rx_page = alloc_page(GFP_ATOMIC);
291                 if (con->rx_page == NULL)
292                         goto out_resched;
293                 cbuf_init(&con->cb, PAGE_CACHE_SIZE);
294         }
295
296         /*
297          * iov[0] is the bit of the circular buffer between the current end
298          * point (cb.base + cb.len) and the end of the buffer.
299          */
300         iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
301         iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
302         nvec = 1;
303
304         /*
305          * iov[1] is the bit of the circular buffer between the start of the
306          * buffer and the start of the currently used section (cb.base)
307          */
308         if (cbuf_data(&con->cb) >= con->cb.base) {
309                 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
310                 iov[1].iov_len = con->cb.base;
311                 iov[1].iov_base = page_address(con->rx_page);
312                 nvec = 2;
313         }
314         len = iov[0].iov_len + iov[1].iov_len;
315
316         r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
317                                MSG_DONTWAIT | MSG_NOSIGNAL);
318
319         if (ret <= 0)
320                 goto out_close;
321         if (ret == -EAGAIN)
322                 goto out_resched;
323
324         if (ret == len)
325                 call_again_soon = 1;
326         cbuf_add(&con->cb, ret);
327         ret = dlm_process_incoming_buffer(con->nodeid,
328                                           page_address(con->rx_page),
329                                           con->cb.base, con->cb.len,
330                                           PAGE_CACHE_SIZE);
331         if (ret == -EBADMSG) {
332                 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
333                        "iov_len=%u, iov_base[0]=%p, read=%d\n",
334                        page_address(con->rx_page), con->cb.base, con->cb.len,
335                        len, iov[0].iov_base, r);
336         }
337         if (ret < 0)
338                 goto out_close;
339         cbuf_eat(&con->cb, ret);
340
341         if (cbuf_empty(&con->cb) && !call_again_soon) {
342                 __free_page(con->rx_page);
343                 con->rx_page = NULL;
344         }
345
346         if (call_again_soon)
347                 goto out_resched;
348         mutex_unlock(&con->sock_mutex);
349         return 0;
350
351 out_resched:
352         if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
353                 queue_work(recv_workqueue, &con->rwork);
354         mutex_unlock(&con->sock_mutex);
355         return -EAGAIN;
356
357 out_close:
358         mutex_unlock(&con->sock_mutex);
359         if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
360                 close_connection(con, false);
361                 /* Reconnect when there is something to send */
362         }
363         /* Don't return success if we really got EOF */
364         if (ret == 0)
365                 ret = -EAGAIN;
366
367         return ret;
368 }
369
/*
 * Listening socket is busy, accept a connection.
 *
 * This is the rx_action that create_listen_sock() installs on the listening
 * connection.  It accepts one pending connection, maps the peer address to
 * a node id and attaches the new socket to that node's connection, or to
 * its "othercon" when the node already has a socket.
 *
 * Returns 0 when a connection was accepted.  Otherwise returns a non-zero
 * value: -ENOMEM if a socket or connection could not be set up, -ENOTCONN
 * if the listening socket is gone, -ECONNABORTED if the peer name cannot be
 * read, -1 for a peer that is not a cluster node, or the error returned by
 * the socket's accept operation.
 */
static int accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;

        memset(&peeraddr, 0, sizeof(peeraddr));
        /* Create the socket up front; any failure here is reported as -ENOMEM */
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &newsock);
        if (result < 0)
                return -ENOMEM;

        /* Subclass 0 here, 1 for newcon below: both sock_mutexes get held */
        mutex_lock_nested(&con->sock_mutex, 0);

        result = -ENOTCONN;
        if (con->sock == NULL)
                goto accept_err;

        newsock->type = con->sock->type;
        newsock->ops = con->sock->ops;

        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
                                  &len, 2)) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        /* make_sockaddr() forces the local family and a zero port first */
        make_sockaddr(&peeraddr, 0, &len);
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                printk("dlm: connect from non cluster node\n");
                sock_release(newsock);
                mutex_unlock(&con->sock_mutex);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /*  Check to see if we already have a connection to this node. This
         *  could happen if the two nodes initiate a connection at roughly
         *  the same time and the connections cross on the wire.
         * TEMPORARY FIX:
         *  In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_KERNEL);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }
        mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                if (!othercon) {
                        othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
                        if (!othercon) {
                                printk("dlm: failed to allocate incoming socket\n");
                                mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        mutex_init(&othercon->sock_mutex);
                        INIT_WORK(&othercon->swork, process_send_sockets);
                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
                /*
                 * NOTE(review): if othercon already existed and still had a
                 * socket, that pointer is overwritten here without being
                 * released - confirm whether this can happen in practice.
                 */
                othercon->sock = newsock;
                newsock->sk->sk_user_data = othercon;
                add_sock(newsock, othercon);
                addcon = othercon;
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
                addcon = newcon;
        }

        mutex_unlock(&newcon->sock_mutex);

        /*
         * Add it to the active queue in case we got data
         * between processing the accept and adding the socket
         * to the read_sockets list
         */
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
        mutex_unlock(&con->sock_mutex);

        return 0;

accept_err:
        /* Common failure exit: drop the listening lock and the unused socket */
        mutex_unlock(&con->sock_mutex);
        sock_release(newsock);

        if (result != -EAGAIN)
                printk("dlm: error accepting connection from node: %d\n", result);
        return result;
}
483
484 /* Connect a new socket to its peer */
485 static void connect_to_sock(struct connection *con)
486 {
487         int result = -EHOSTUNREACH;
488         struct sockaddr_storage saddr;
489         int addr_len;
490         struct socket *sock;
491
492         if (con->nodeid == 0) {
493                 log_print("attempt to connect sock 0 foiled");
494                 return;
495         }
496
497         mutex_lock(&con->sock_mutex);
498         if (con->retries++ > MAX_CONNECT_RETRIES)
499                 goto out;
500
501         /* Some odd races can cause double-connects, ignore them */
502         if (con->sock) {
503                 result = 0;
504                 goto out;
505         }
506
507         /* Create a socket to communicate with */
508         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
509                                   IPPROTO_TCP, &sock);
510         if (result < 0)
511                 goto out_err;
512
513         memset(&saddr, 0, sizeof(saddr));
514         if (dlm_nodeid_to_addr(con->nodeid, &saddr))
515                 goto out_err;
516
517         sock->sk->sk_user_data = con;
518         con->rx_action = receive_from_sock;
519
520         make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
521
522         add_sock(sock, con);
523
524         log_print("connecting to %d", con->nodeid);
525         result =
526                 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
527                                    O_NONBLOCK);
528         if (result == -EINPROGRESS)
529                 result = 0;
530         if (result == 0)
531                 goto out;
532
533 out_err:
534         if (con->sock) {
535                 sock_release(con->sock);
536                 con->sock = NULL;
537         }
538         /*
539          * Some errors are fatal and this list might need adjusting. For other
540          * errors we try again until the max number of retries is reached.
541          */
542         if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
543             result != -ENETDOWN && result != EINVAL
544             && result != -EPROTONOSUPPORT) {
545                 lowcomms_connect_sock(con);
546                 result = 0;
547         }
548 out:
549         mutex_unlock(&con->sock_mutex);
550         return;
551 }
552
553 static struct socket *create_listen_sock(struct connection *con,
554                                          struct sockaddr_storage *saddr)
555 {
556         struct socket *sock = NULL;
557         mm_segment_t fs;
558         int result = 0;
559         int one = 1;
560         int addr_len;
561
562         if (dlm_local_addr.ss_family == AF_INET)
563                 addr_len = sizeof(struct sockaddr_in);
564         else
565                 addr_len = sizeof(struct sockaddr_in6);
566
567         /* Create a socket to communicate with */
568         result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
569         if (result < 0) {
570                 printk("dlm: Can't create listening comms socket\n");
571                 goto create_out;
572         }
573
574         fs = get_fs();
575         set_fs(get_ds());
576         result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
577                                  (char *)&one, sizeof(one));
578         set_fs(fs);
579         if (result < 0) {
580                 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
581                        result);
582         }
583         sock->sk->sk_user_data = con;
584         con->rx_action = accept_from_sock;
585         con->sock = sock;
586
587         /* Bind to our port */
588         make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
589         result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
590         if (result < 0) {
591                 printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
592                 sock_release(sock);
593                 sock = NULL;
594                 con->sock = NULL;
595                 goto create_out;
596         }
597
598         fs = get_fs();
599         set_fs(get_ds());
600
601         result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
602                                  (char *)&one, sizeof(one));
603         set_fs(fs);
604         if (result < 0) {
605                 printk("dlm: Set keepalive failed: %d\n", result);
606         }
607
608         result = sock->ops->listen(sock, 5);
609         if (result < 0) {
610                 printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
611                 sock_release(sock);
612                 sock = NULL;
613                 goto create_out;
614         }
615
616 create_out:
617         return sock;
618 }
619
620
621 /* Listen on all interfaces */
622 static int listen_for_all(void)
623 {
624         struct socket *sock = NULL;
625         struct connection *con = nodeid2con(0, GFP_KERNEL);
626         int result = -EINVAL;
627
628         /* We don't support multi-homed hosts */
629         set_bit(CF_IS_OTHERCON, &con->flags);
630
631         sock = create_listen_sock(con, &dlm_local_addr);
632         if (sock) {
633                 add_sock(sock, con);
634                 result = 0;
635         }
636         else {
637                 result = -EADDRINUSE;
638         }
639
640         return result;
641 }
642
643
644
645 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
646                                                      gfp_t allocation)
647 {
648         struct writequeue_entry *entry;
649
650         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
651         if (!entry)
652                 return NULL;
653
654         entry->page = alloc_page(allocation);
655         if (!entry->page) {
656                 kfree(entry);
657                 return NULL;
658         }
659
660         entry->offset = 0;
661         entry->len = 0;
662         entry->end = 0;
663         entry->users = 0;
664         entry->con = con;
665
666         return entry;
667 }
668
669 void *dlm_lowcomms_get_buffer(int nodeid, int len,
670                               gfp_t allocation, char **ppc)
671 {
672         struct connection *con;
673         struct writequeue_entry *e;
674         int offset = 0;
675         int users = 0;
676
677         con = nodeid2con(nodeid, allocation);
678         if (!con)
679                 return NULL;
680
681         spin_lock(&con->writequeue_lock);
682         e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
683         if ((&e->list == &con->writequeue) ||
684             (PAGE_CACHE_SIZE - e->end < len)) {
685                 e = NULL;
686         } else {
687                 offset = e->end;
688                 e->end += len;
689                 users = e->users++;
690         }
691         spin_unlock(&con->writequeue_lock);
692
693         if (e) {
694         got_one:
695                 if (users == 0)
696                         kmap(e->page);
697                 *ppc = page_address(e->page) + offset;
698                 return e;
699         }
700
701         e = new_writequeue_entry(con, allocation);
702         if (e) {
703                 spin_lock(&con->writequeue_lock);
704                 offset = e->end;
705                 e->end += len;
706                 users = e->users++;
707                 list_add_tail(&e->list, &con->writequeue);
708                 spin_unlock(&con->writequeue_lock);
709                 goto got_one;
710         }
711         return NULL;
712 }
713
714 void dlm_lowcomms_commit_buffer(void *mh)
715 {
716         struct writequeue_entry *e = (struct writequeue_entry *)mh;
717         struct connection *con = e->con;
718         int users;
719
720         spin_lock(&con->writequeue_lock);
721         users = --e->users;
722         if (users)
723                 goto out;
724         e->len = e->end - e->offset;
725         kunmap(e->page);
726         spin_unlock(&con->writequeue_lock);
727
728         if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
729                 queue_work(send_workqueue, &con->swork);
730         }
731         return;
732
733 out:
734         spin_unlock(&con->writequeue_lock);
735         return;
736 }
737
738 static void free_entry(struct writequeue_entry *e)
739 {
740         __free_page(e->page);
741         kfree(e);
742 }
743
/*
 * Send a message.
 *
 * Walks the connection's write queue from the head and hands each entry's
 * committed bytes (offset/len) to the socket's sendpage operation.  Entries
 * that are fully sent and have no outstanding users are unlinked and freed.
 * If the connection has no socket, a connect attempt is made instead.  On a
 * send error the connection is closed and a reconnect is queued.
 */
static void send_to_sock(struct connection *con)
{
        int ret = 0;
        ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset;

        mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
                goto out_connect;

        sendpage = con->sock->ops->sendpage;

        spin_lock(&con->writequeue_lock);
        for (;;) {
                e = list_entry(con->writequeue.next, struct writequeue_entry,
                               list);
                /* Back at the list head: the queue is empty */
                if ((struct list_head *) e == &con->writequeue)
                        break;

                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                /* The spinlock is dropped around the send itself */
                spin_unlock(&con->writequeue_lock);
                /*
                 * NOTE(review): this kmap() is only paired with a kunmap()
                 * when the entry is freed below; the "goto out" and
                 * "goto send_error" exits and partially sent entries leave
                 * it unbalanced - confirm whether the mapping is needed.
                 */
                kmap(e->page);

                ret = 0;
                if (len) {
                        ret = sendpage(con->sock, e->page, offset, len,
                                       msg_flags);
                        /* Nothing could be sent right now: stop for this pass */
                        if (ret == -EAGAIN || ret == 0)
                                goto out;
                        /* Zero was handled above, so this is a real error */
                        if (ret <= 0)
                                goto send_error;
                }
                else {
                        /* Don't starve people filling buffers */
                        cond_resched();
                }

                spin_lock(&con->writequeue_lock);
                e->offset += ret;
                e->len -= ret;

                if (e->len == 0 && e->users == 0) {
                        list_del(&e->list);
                        kunmap(e->page);
                        free_entry(e);
                        continue;
                }
        }
        spin_unlock(&con->writequeue_lock);
out:
        mutex_unlock(&con->sock_mutex);
        return;

send_error:
        mutex_unlock(&con->sock_mutex);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;

out_connect:
        mutex_unlock(&con->sock_mutex);
        connect_to_sock(con);
        return;
}
813
814 static void clean_one_writequeue(struct connection *con)
815 {
816         struct list_head *list;
817         struct list_head *temp;
818
819         spin_lock(&con->writequeue_lock);
820         list_for_each_safe(list, temp, &con->writequeue) {
821                 struct writequeue_entry *e =
822                         list_entry(list, struct writequeue_entry, list);
823                 list_del(&e->list);
824                 free_entry(e);
825         }
826         spin_unlock(&con->writequeue_lock);
827 }
828
829 /* Called from recovery when it knows that a node has
830    left the cluster */
831 int dlm_lowcomms_close(int nodeid)
832 {
833         struct connection *con;
834
835         if (!connections)
836                 goto out;
837
838         log_print("closing connection to node %d", nodeid);
839         con = nodeid2con(nodeid, 0);
840         if (con) {
841                 clean_one_writequeue(con);
842                 close_connection(con, true);
843         }
844         return 0;
845
846 out:
847         return -1;
848 }
849
850 /* Look for activity on active sockets */
851 static void process_recv_sockets(struct work_struct *work)
852 {
853         struct connection *con = container_of(work, struct connection, rwork);
854         int err;
855
856         clear_bit(CF_READ_PENDING, &con->flags);
857         do {
858                 err = con->rx_action(con);
859         } while (!err);
860 }
861
862
863 static void process_send_sockets(struct work_struct *work)
864 {
865         struct connection *con = container_of(work, struct connection, swork);
866
867         if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
868                 connect_to_sock(con);
869         }
870
871         clear_bit(CF_WRITE_PENDING, &con->flags);
872         send_to_sock(con);
873 }
874
875
876 /* Discard all entries on the write queues */
877 static void clean_writequeues(void)
878 {
879         int nodeid;
880
881         for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
882                 struct connection *con = nodeid2con(nodeid, 0);
883
884                 if (con)
885                         clean_one_writequeue(con);
886         }
887 }
888
889 static void work_stop(void)
890 {
891         destroy_workqueue(recv_workqueue);
892         destroy_workqueue(send_workqueue);
893 }
894
895 static int work_start(void)
896 {
897         int error;
898         recv_workqueue = create_workqueue("dlm_recv");
899         error = IS_ERR(recv_workqueue);
900         if (error) {
901                 log_print("can't start dlm_recv %d", error);
902                 return error;
903         }
904
905         send_workqueue = create_singlethread_workqueue("dlm_send");
906         error = IS_ERR(send_workqueue);
907         if (error) {
908                 log_print("can't start dlm_send %d", error);
909                 destroy_workqueue(recv_workqueue);
910                 return error;
911         }
912
913         return 0;
914 }
915
916 void dlm_lowcomms_stop(void)
917 {
918         int i;
919
920         /* Set all the flags to prevent any
921            socket activity.
922         */
923         for (i = 0; i < conn_array_size; i++) {
924                 if (connections[i])
925                         connections[i]->flags |= 0xFF;
926         }
927
928         work_stop();
929         clean_writequeues();
930
931         for (i = 0; i < conn_array_size; i++) {
932                 if (connections[i]) {
933                         close_connection(connections[i], true);
934                         if (connections[i]->othercon)
935                                 kmem_cache_free(con_cache, connections[i]->othercon);
936                         kmem_cache_free(con_cache, connections[i]);
937                 }
938         }
939
940         kfree(connections);
941         connections = NULL;
942
943         kmem_cache_destroy(con_cache);
944 }
945
946 /* This is quite likely to sleep... */
947 int dlm_lowcomms_start(void)
948 {
949         int error = 0;
950
951         error = -ENOMEM;
952         connections = kzalloc(sizeof(struct connection *) *
953                               NODE_INCREMENT, GFP_KERNEL);
954         if (!connections)
955                 goto out;
956
957         conn_array_size = NODE_INCREMENT;
958
959         if (dlm_our_addr(&dlm_local_addr, 0)) {
960                 log_print("no local IP address has been set");
961                 goto fail_free_conn;
962         }
963         if (!dlm_our_addr(&dlm_local_addr, 1)) {
964                 log_print("This dlm comms module does not support multi-homed clustering");
965                 goto fail_free_conn;
966         }
967
968         con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
969                                       __alignof__(struct connection), 0,
970                                       NULL, NULL);
971         if (!con_cache)
972                 goto fail_free_conn;
973
974
975         /* Start listening */
976         error = listen_for_all();
977         if (error)
978                 goto fail_unlisten;
979
980         error = work_start();
981         if (error)
982                 goto fail_unlisten;
983
984         return 0;
985
986 fail_unlisten:
987         close_connection(connections[0], false);
988         kmem_cache_free(con_cache, connections[0]);
989         kmem_cache_destroy(con_cache);
990
991 fail_free_conn:
992         kfree(connections);
993
994 out:
995         return error;
996 }
997
998 /*
999  * Overrides for Emacs so that we follow Linus's tabbing style.
1000  * Emacs will notice this stuff at the end of the file and automatically
1001  * adjust the settings for this buffer only.  This must remain at the end
1002  * of the file.
1003  * ---------------------------------------------------------------------------
1004  * Local variables:
1005  * c-file-style: "linux"
1006  * End:
1007  */