Merge branch 'ab/config-based-hooks-base' into seen
[git] / compat / simple-ipc / ipc-unix-socket.c
1 #include "cache.h"
2 #include "simple-ipc.h"
3 #include "strbuf.h"
4 #include "pkt-line.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
8
9 #ifndef SUPPORTS_SIMPLE_IPC
10 /*
11  * This source file should only be compiled when Simple IPC is supported.
12  * See the top-level Makefile.
13  */
14 #error SUPPORTS_SIMPLE_IPC not defined
15 #endif
16
17 enum ipc_active_state ipc_get_active_state(const char *path)
18 {
19         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
20         struct ipc_client_connect_options options
21                 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
22         struct stat st;
23         struct ipc_client_connection *connection_test = NULL;
24
25         options.wait_if_busy = 0;
26         options.wait_if_not_found = 0;
27
28         if (lstat(path, &st) == -1) {
29                 switch (errno) {
30                 case ENOENT:
31                 case ENOTDIR:
32                         return IPC_STATE__NOT_LISTENING;
33                 default:
34                         return IPC_STATE__INVALID_PATH;
35                 }
36         }
37
38         /* also complain if a plain file is in the way */
39         if ((st.st_mode & S_IFMT) != S_IFSOCK)
40                 return IPC_STATE__INVALID_PATH;
41
42         /*
43          * Just because the filesystem has a S_IFSOCK type inode
44          * at `path`, doesn't mean it that there is a server listening.
45          * Ping it to be sure.
46          */
47         state = ipc_client_try_connect(path, &options, &connection_test);
48         ipc_client_close_connection(connection_test);
49
50         return state;
51 }
52
53 /*
54  * Retry frequency when trying to connect to a server.
55  *
56  * This value should be short enough that we don't seriously delay our
57  * caller, but not fast enough that our spinning puts pressure on the
58  * system.
59  */
60 #define WAIT_STEP_MS (50)
61
62 /*
63  * Try to connect to the server.  If the server is just starting up or
64  * is very busy, we may not get a connection the first time.
65  */
66 static enum ipc_active_state connect_to_server(
67         const char *path,
68         int timeout_ms,
69         const struct ipc_client_connect_options *options,
70         int *pfd)
71 {
72         int k;
73
74         *pfd = -1;
75
76         for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
77                 int fd = unix_stream_connect(path, options->uds_disallow_chdir);
78
79                 if (fd != -1) {
80                         *pfd = fd;
81                         return IPC_STATE__LISTENING;
82                 }
83
84                 if (errno == ENOENT) {
85                         if (!options->wait_if_not_found)
86                                 return IPC_STATE__PATH_NOT_FOUND;
87
88                         goto sleep_and_try_again;
89                 }
90
91                 if (errno == ETIMEDOUT) {
92                         if (!options->wait_if_busy)
93                                 return IPC_STATE__NOT_LISTENING;
94
95                         goto sleep_and_try_again;
96                 }
97
98                 if (errno == ECONNREFUSED) {
99                         if (!options->wait_if_busy)
100                                 return IPC_STATE__NOT_LISTENING;
101
102                         goto sleep_and_try_again;
103                 }
104
105                 return IPC_STATE__OTHER_ERROR;
106
107         sleep_and_try_again:
108                 sleep_millisec(WAIT_STEP_MS);
109         }
110
111         return IPC_STATE__NOT_LISTENING;
112 }
113
114 /*
115  * The total amount of time that we are willing to wait when trying to
116  * connect to a server.
117  *
118  * When the server is first started, it might take a little while for
119  * it to become ready to service requests.  Likewise, the server may
120  * be very (temporarily) busy and not respond to our connections.
121  *
122  * We should gracefully and silently handle those conditions and try
123  * again for a reasonable time period.
124  *
125  * The value chosen here should be long enough for the server
126  * to reliably heal from the above conditions.
127  */
128 #define MY_CONNECTION_TIMEOUT_MS (1000)
129
130 enum ipc_active_state ipc_client_try_connect(
131         const char *path,
132         const struct ipc_client_connect_options *options,
133         struct ipc_client_connection **p_connection)
134 {
135         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
136         int fd = -1;
137
138         *p_connection = NULL;
139
140         trace2_region_enter("ipc-client", "try-connect", NULL);
141         trace2_data_string("ipc-client", NULL, "try-connect/path", path);
142
143         state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
144                                   options, &fd);
145
146         trace2_data_intmax("ipc-client", NULL, "try-connect/state",
147                            (intmax_t)state);
148         trace2_region_leave("ipc-client", "try-connect", NULL);
149
150         if (state == IPC_STATE__LISTENING) {
151                 (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
152                 (*p_connection)->fd = fd;
153         }
154
155         return state;
156 }
157
158 void ipc_client_close_connection(struct ipc_client_connection *connection)
159 {
160         if (!connection)
161                 return;
162
163         if (connection->fd != -1)
164                 close(connection->fd);
165
166         free(connection);
167 }
168
169 int ipc_client_send_command_to_connection(
170         struct ipc_client_connection *connection,
171         const char *message, size_t message_len,
172         struct strbuf *answer)
173 {
174         int ret = 0;
175
176         strbuf_setlen(answer, 0);
177
178         trace2_region_enter("ipc-client", "send-command", NULL);
179
180         if (write_packetized_from_buf_no_flush(message, message_len,
181                                                connection->fd) < 0 ||
182             packet_flush_gently(connection->fd) < 0) {
183                 ret = error(_("could not send IPC command"));
184                 goto done;
185         }
186
187         if (read_packetized_to_strbuf(
188                     connection->fd, answer,
189                     PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
190                 ret = error(_("could not read IPC response"));
191                 goto done;
192         }
193
194 done:
195         trace2_region_leave("ipc-client", "send-command", NULL);
196         return ret;
197 }
198
199 int ipc_client_send_command(const char *path,
200                             const struct ipc_client_connect_options *options,
201                             const char *message, size_t message_len,
202                             struct strbuf *answer)
203 {
204         int ret = -1;
205         enum ipc_active_state state;
206         struct ipc_client_connection *connection = NULL;
207
208         state = ipc_client_try_connect(path, options, &connection);
209
210         if (state != IPC_STATE__LISTENING)
211                 return ret;
212
213         ret = ipc_client_send_command_to_connection(connection,
214                                                     message, message_len,
215                                                     answer);
216
217         ipc_client_close_connection(connection);
218
219         return ret;
220 }
221
222 static int set_socket_blocking_flag(int fd, int make_nonblocking)
223 {
224         int flags;
225
226         flags = fcntl(fd, F_GETFL, NULL);
227
228         if (flags < 0)
229                 return -1;
230
231         if (make_nonblocking)
232                 flags |= O_NONBLOCK;
233         else
234                 flags &= ~O_NONBLOCK;
235
236         return fcntl(fd, F_SETFL, flags);
237 }
238
239 /*
240  * Magic numbers used to annotate callback instance data.
241  * These are used to help guard against accidentally passing the
242  * wrong instance data across multiple levels of callbacks (which
243  * is easy to do if there are `void*` arguments).
244  */
245 enum magic {
246         MAGIC_SERVER_REPLY_DATA,
247         MAGIC_WORKER_THREAD_DATA,
248         MAGIC_ACCEPT_THREAD_DATA,
249         MAGIC_SERVER_DATA,
250 };
251
252 struct ipc_server_reply_data {
253         enum magic magic;
254         int fd;
255         struct ipc_worker_thread_data *worker_thread_data;
256 };
257
258 struct ipc_worker_thread_data {
259         enum magic magic;
260         struct ipc_worker_thread_data *next_thread;
261         struct ipc_server_data *server_data;
262         pthread_t pthread_id;
263 };
264
265 struct ipc_accept_thread_data {
266         enum magic magic;
267         struct ipc_server_data *server_data;
268
269         struct unix_ss_socket *server_socket;
270
271         int fd_send_shutdown;
272         int fd_wait_shutdown;
273         pthread_t pthread_id;
274 };
275
276 /*
277  * With unix-sockets, the conceptual "ipc-server" is implemented as a single
278  * controller "accept-thread" thread and a pool of "worker-thread" threads.
279  * The former does the usual `accept()` loop and dispatches connections
280  * to an idle worker thread.  The worker threads wait in an idle loop for
281  * a new connection, communicate with the client and relay data to/from
282  * the `application_cb` and then wait for another connection from the
283  * server thread.  This avoids the overhead of constantly creating and
284  * destroying threads.
285  */
286 struct ipc_server_data {
287         enum magic magic;
288         ipc_server_application_cb *application_cb;
289         void *application_data;
290         struct strbuf buf_path;
291
292         struct ipc_accept_thread_data *accept_thread;
293         struct ipc_worker_thread_data *worker_thread_list;
294
295         pthread_mutex_t work_available_mutex;
296         pthread_cond_t work_available_cond;
297
298         /*
299          * Accepted but not yet processed client connections are kept
300          * in a circular buffer FIFO.  The queue is empty when the
301          * positions are equal.
302          */
303         int *fifo_fds;
304         int queue_size;
305         int back_pos;
306         int front_pos;
307
308         int shutdown_requested;
309         int is_stopped;
310 };
311
312 /*
313  * Remove and return the oldest queued connection.
314  *
315  * Returns -1 if empty.
316  */
317 static int fifo_dequeue(struct ipc_server_data *server_data)
318 {
319         /* ASSERT holding mutex */
320
321         int fd;
322
323         if (server_data->back_pos == server_data->front_pos)
324                 return -1;
325
326         fd = server_data->fifo_fds[server_data->front_pos];
327         server_data->fifo_fds[server_data->front_pos] = -1;
328
329         server_data->front_pos++;
330         if (server_data->front_pos == server_data->queue_size)
331                 server_data->front_pos = 0;
332
333         return fd;
334 }
335
336 /*
337  * Push a new fd onto the back of the queue.
338  *
339  * Drop it and return -1 if queue is already full.
340  */
341 static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
342 {
343         /* ASSERT holding mutex */
344
345         int next_back_pos;
346
347         next_back_pos = server_data->back_pos + 1;
348         if (next_back_pos == server_data->queue_size)
349                 next_back_pos = 0;
350
351         if (next_back_pos == server_data->front_pos) {
352                 /* Queue is full. Just drop it. */
353                 close(fd);
354                 return -1;
355         }
356
357         server_data->fifo_fds[server_data->back_pos] = fd;
358         server_data->back_pos = next_back_pos;
359
360         return fd;
361 }
362
363 /*
364  * Wait for a connection to be queued to the FIFO and return it.
365  *
366  * Returns -1 if someone has already requested a shutdown.
367  */
368 static int worker_thread__wait_for_connection(
369         struct ipc_worker_thread_data *worker_thread_data)
370 {
371         /* ASSERT NOT holding mutex */
372
373         struct ipc_server_data *server_data = worker_thread_data->server_data;
374         int fd = -1;
375
376         pthread_mutex_lock(&server_data->work_available_mutex);
377         for (;;) {
378                 if (server_data->shutdown_requested)
379                         break;
380
381                 fd = fifo_dequeue(server_data);
382                 if (fd >= 0)
383                         break;
384
385                 pthread_cond_wait(&server_data->work_available_cond,
386                                   &server_data->work_available_mutex);
387         }
388         pthread_mutex_unlock(&server_data->work_available_mutex);
389
390         return fd;
391 }
392
393 /*
394  * Forward declare our reply callback function so that any compiler
395  * errors are reported when we actually define the function (in addition
396  * to any errors reported when we try to pass this callback function as
397  * a parameter in a function call).  The former are easier to understand.
398  */
399 static ipc_server_reply_cb do_io_reply_callback;
400
401 /*
402  * Relay application's response message to the client process.
403  * (We do not flush at this point because we allow the caller
404  * to chunk data to the client thru us.)
405  */
406 static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
407                        const char *response, size_t response_len)
408 {
409         if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
410                 BUG("reply_cb called with wrong instance data");
411
412         return write_packetized_from_buf_no_flush(response, response_len,
413                                                   reply_data->fd);
414 }
415
416 /* A randomly chosen value. */
417 #define MY_WAIT_POLL_TIMEOUT_MS (10)
418
419 /*
420  * If the client hangs up without sending any data on the wire, just
421  * quietly close the socket and ignore this client.
422  *
423  * This worker thread is committed to reading the IPC request data
424  * from the client at the other end of this fd.  Wait here for the
425  * client to actually put something on the wire -- because if the
426  * client just does a ping (connect and hangup without sending any
427  * data), our use of the pkt-line read routines will spew an error
428  * message.
429  *
430  * Return -1 if the client hung up.
431  * Return 0 if data (possibly incomplete) is ready.
432  */
433 static int worker_thread__wait_for_io_start(
434         struct ipc_worker_thread_data *worker_thread_data,
435         int fd)
436 {
437         struct ipc_server_data *server_data = worker_thread_data->server_data;
438         struct pollfd pollfd[1];
439         int result;
440
441         for (;;) {
442                 pollfd[0].fd = fd;
443                 pollfd[0].events = POLLIN;
444
445                 result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
446                 if (result < 0) {
447                         if (errno == EINTR)
448                                 continue;
449                         goto cleanup;
450                 }
451
452                 if (result == 0) {
453                         /* a timeout */
454
455                         int in_shutdown;
456
457                         pthread_mutex_lock(&server_data->work_available_mutex);
458                         in_shutdown = server_data->shutdown_requested;
459                         pthread_mutex_unlock(&server_data->work_available_mutex);
460
461                         /*
462                          * If a shutdown is already in progress and this
463                          * client has not started talking yet, just drop it.
464                          */
465                         if (in_shutdown)
466                                 goto cleanup;
467                         continue;
468                 }
469
470                 if (pollfd[0].revents & POLLHUP)
471                         goto cleanup;
472
473                 if (pollfd[0].revents & POLLIN)
474                         return 0;
475
476                 goto cleanup;
477         }
478
479 cleanup:
480         close(fd);
481         return -1;
482 }
483
484 /*
485  * Receive the request/command from the client and pass it to the
486  * registered request-callback.  The request-callback will compose
487  * a response and call our reply-callback to send it to the client.
488  */
489 static int worker_thread__do_io(
490         struct ipc_worker_thread_data *worker_thread_data,
491         int fd)
492 {
493         /* ASSERT NOT holding lock */
494
495         struct strbuf buf = STRBUF_INIT;
496         struct ipc_server_reply_data reply_data;
497         int ret = 0;
498
499         reply_data.magic = MAGIC_SERVER_REPLY_DATA;
500         reply_data.worker_thread_data = worker_thread_data;
501
502         reply_data.fd = fd;
503
504         ret = read_packetized_to_strbuf(
505                 reply_data.fd, &buf,
506                 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
507         if (ret >= 0) {
508                 ret = worker_thread_data->server_data->application_cb(
509                         worker_thread_data->server_data->application_data,
510                         buf.buf, buf.len, do_io_reply_callback, &reply_data);
511
512                 packet_flush_gently(reply_data.fd);
513         }
514         else {
515                 /*
516                  * The client probably disconnected/shutdown before it
517                  * could send a well-formed message.  Ignore it.
518                  */
519         }
520
521         strbuf_release(&buf);
522         close(reply_data.fd);
523
524         return ret;
525 }
526
527 /*
528  * Block SIGPIPE on the current thread (so that we get EPIPE from
529  * write() rather than an actual signal).
530  *
531  * Note that using sigchain_push() and _pop() to control SIGPIPE
532  * around our IO calls is not thread safe:
533  * [] It uses a global stack of handler frames.
534  * [] It uses ALLOC_GROW() to resize it.
535  * [] Finally, according to the `signal(2)` man-page:
536  *    "The effects of `signal()` in a multithreaded process are unspecified."
537  */
538 static void thread_block_sigpipe(sigset_t *old_set)
539 {
540         sigset_t new_set;
541
542         sigemptyset(&new_set);
543         sigaddset(&new_set, SIGPIPE);
544
545         sigemptyset(old_set);
546         pthread_sigmask(SIG_BLOCK, &new_set, old_set);
547 }
548
549 /*
550  * Thread proc for an IPC worker thread.  It handles a series of
551  * connections from clients.  It pulls the next fd from the queue
552  * processes it, and then waits for the next client.
553  *
554  * Block SIGPIPE in this worker thread for the life of the thread.
555  * This avoids stray (and sometimes delayed) SIGPIPE signals caused
556  * by client errors and/or when we are under extremely heavy IO load.
557  *
558  * This means that the application callback will have SIGPIPE blocked.
559  * The callback should not change it.
560  */
561 static void *worker_thread_proc(void *_worker_thread_data)
562 {
563         struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
564         struct ipc_server_data *server_data = worker_thread_data->server_data;
565         sigset_t old_set;
566         int fd, io;
567         int ret;
568
569         trace2_thread_start("ipc-worker");
570
571         thread_block_sigpipe(&old_set);
572
573         for (;;) {
574                 fd = worker_thread__wait_for_connection(worker_thread_data);
575                 if (fd == -1)
576                         break; /* in shutdown */
577
578                 io = worker_thread__wait_for_io_start(worker_thread_data, fd);
579                 if (io == -1)
580                         continue; /* client hung up without sending anything */
581
582                 ret = worker_thread__do_io(worker_thread_data, fd);
583
584                 if (ret == SIMPLE_IPC_QUIT) {
585                         trace2_data_string("ipc-worker", NULL, "queue_stop_async",
586                                            "application_quit");
587                         /*
588                          * The application layer is telling the ipc-server
589                          * layer to shutdown.
590                          *
591                          * We DO NOT have a response to send to the client.
592                          *
593                          * Queue an async stop (to stop the other threads) and
594                          * allow this worker thread to exit now (no sense waiting
595                          * for the thread-pool shutdown signal).
596                          *
597                          * Other non-idle worker threads are allowed to finish
598                          * responding to their current clients.
599                          */
600                         ipc_server_stop_async(server_data);
601                         break;
602                 }
603         }
604
605         trace2_thread_exit();
606         return NULL;
607 }
608
609 /* A randomly chosen value. */
610 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
611
612 /*
613  * Accept a new client connection on our socket.  This uses non-blocking
614  * IO so that we can also wait for shutdown requests on our socket-pair
615  * without actually spinning on a fast timeout.
616  */
617 static int accept_thread__wait_for_connection(
618         struct ipc_accept_thread_data *accept_thread_data)
619 {
620         struct pollfd pollfd[2];
621         int result;
622
623         for (;;) {
624                 pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
625                 pollfd[0].events = POLLIN;
626
627                 pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
628                 pollfd[1].events = POLLIN;
629
630                 result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
631                 if (result < 0) {
632                         if (errno == EINTR)
633                                 continue;
634                         return result;
635                 }
636
637                 if (result == 0) {
638                         /* a timeout */
639
640                         /*
641                          * If someone deletes or force-creates a new unix
642                          * domain socket at our path, all future clients
643                          * will be routed elsewhere and we silently starve.
644                          * If that happens, just queue a shutdown.
645                          */
646                         if (unix_ss_was_stolen(
647                                     accept_thread_data->server_socket)) {
648                                 trace2_data_string("ipc-accept", NULL,
649                                                    "queue_stop_async",
650                                                    "socket_stolen");
651                                 ipc_server_stop_async(
652                                         accept_thread_data->server_data);
653                         }
654                         continue;
655                 }
656
657                 if (pollfd[0].revents & POLLIN) {
658                         /* shutdown message queued to socketpair */
659                         return -1;
660                 }
661
662                 if (pollfd[1].revents & POLLIN) {
663                         /* a connection is available on server_socket */
664
665                         int client_fd =
666                                 accept(accept_thread_data->server_socket->fd_socket,
667                                        NULL, NULL);
668                         if (client_fd >= 0)
669                                 return client_fd;
670
671                         /*
672                          * An error here is unlikely -- it probably
673                          * indicates that the connecting process has
674                          * already dropped the connection.
675                          */
676                         continue;
677                 }
678
679                 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
680                     errno, pollfd[0].revents, pollfd[1].revents);
681         }
682 }
683
684 /*
685  * Thread proc for the IPC server "accept thread".  This waits for
686  * an incoming socket connection, appends it to the queue of available
687  * connections, and notifies a worker thread to process it.
688  *
689  * Block SIGPIPE in this thread for the life of the thread.  This
690  * avoids any stray SIGPIPE signals when closing pipe fds under
691  * extremely heavy loads (such as when the fifo queue is full and we
692  * drop incomming connections).
693  */
694 static void *accept_thread_proc(void *_accept_thread_data)
695 {
696         struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
697         struct ipc_server_data *server_data = accept_thread_data->server_data;
698         sigset_t old_set;
699
700         trace2_thread_start("ipc-accept");
701
702         thread_block_sigpipe(&old_set);
703
704         for (;;) {
705                 int client_fd = accept_thread__wait_for_connection(
706                         accept_thread_data);
707
708                 pthread_mutex_lock(&server_data->work_available_mutex);
709                 if (server_data->shutdown_requested) {
710                         pthread_mutex_unlock(&server_data->work_available_mutex);
711                         if (client_fd >= 0)
712                                 close(client_fd);
713                         break;
714                 }
715
716                 if (client_fd < 0) {
717                         /* ignore transient accept() errors */
718                 }
719                 else {
720                         fifo_enqueue(server_data, client_fd);
721                         pthread_cond_broadcast(&server_data->work_available_cond);
722                 }
723                 pthread_mutex_unlock(&server_data->work_available_mutex);
724         }
725
726         trace2_thread_exit();
727         return NULL;
728 }
729
730 /*
731  * We can't predict the connection arrival rate relative to the worker
732  * processing rate, therefore we allow the "accept-thread" to queue up
733  * a generous number of connections, since we'd rather have the client
734  * not unnecessarily timeout if we can avoid it.  (The assumption is
735  * that this will be used for FSMonitor and a few second wait on a
736  * connection is better than having the client timeout and do the full
737  * computation itself.)
738  *
739  * The FIFO queue size is set to a multiple of the worker pool size.
740  * This value chosen at random.
741  */
742 #define FIFO_SCALE (100)
743
744 /*
745  * The backlog value for `listen(2)`.  This doesn't need to huge,
746  * rather just large enough for our "accept-thread" to wake up and
747  * queue incoming connections onto the FIFO without the kernel
748  * dropping any.
749  *
750  * This value chosen at random.
751  */
752 #define LISTEN_BACKLOG (50)
753
754 static int create_listener_socket(
755         const char *path,
756         const struct ipc_server_opts *ipc_opts,
757         struct unix_ss_socket **new_server_socket)
758 {
759         struct unix_ss_socket *server_socket = NULL;
760         struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
761         int ret;
762
763         uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
764         uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
765
766         ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
767         if (ret)
768                 return ret;
769
770         if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
771                 int saved_errno = errno;
772                 unix_ss_free(server_socket);
773                 errno = saved_errno;
774                 return -1;
775         }
776
777         *new_server_socket = server_socket;
778
779         trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
780         return 0;
781 }
782
783 static int setup_listener_socket(
784         const char *path,
785         const struct ipc_server_opts *ipc_opts,
786         struct unix_ss_socket **new_server_socket)
787 {
788         int ret, saved_errno;
789
790         trace2_region_enter("ipc-server", "create-listener_socket", NULL);
791
792         ret = create_listener_socket(path, ipc_opts, new_server_socket);
793
794         saved_errno = errno;
795         trace2_region_leave("ipc-server", "create-listener_socket", NULL);
796         errno = saved_errno;
797
798         return ret;
799 }
800
801 /*
802  * Start IPC server in a pool of background threads.
803  */
804 int ipc_server_run_async(struct ipc_server_data **returned_server_data,
805                          const char *path, const struct ipc_server_opts *opts,
806                          ipc_server_application_cb *application_cb,
807                          void *application_data)
808 {
809         struct unix_ss_socket *server_socket = NULL;
810         struct ipc_server_data *server_data;
811         int sv[2];
812         int k;
813         int ret;
814         int nr_threads = opts->nr_threads;
815
816         *returned_server_data = NULL;
817
818         /*
819          * Create a socketpair and set sv[1] to non-blocking.  This
820          * will used to send a shutdown message to the accept-thread
821          * and allows the accept-thread to wait on EITHER a client
822          * connection or a shutdown request without spinning.
823          */
824         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
825                 return -1;
826
827         if (set_socket_blocking_flag(sv[1], 1)) {
828                 int saved_errno = errno;
829                 close(sv[0]);
830                 close(sv[1]);
831                 errno = saved_errno;
832                 return -1;
833         }
834
835         ret = setup_listener_socket(path, opts, &server_socket);
836         if (ret) {
837                 int saved_errno = errno;
838                 close(sv[0]);
839                 close(sv[1]);
840                 errno = saved_errno;
841                 return ret;
842         }
843
844         server_data = xcalloc(1, sizeof(*server_data));
845         server_data->magic = MAGIC_SERVER_DATA;
846         server_data->application_cb = application_cb;
847         server_data->application_data = application_data;
848         strbuf_init(&server_data->buf_path, 0);
849         strbuf_addstr(&server_data->buf_path, path);
850
851         if (nr_threads < 1)
852                 nr_threads = 1;
853
854         pthread_mutex_init(&server_data->work_available_mutex, NULL);
855         pthread_cond_init(&server_data->work_available_cond, NULL);
856
857         server_data->queue_size = nr_threads * FIFO_SCALE;
858         CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
859
860         server_data->accept_thread =
861                 xcalloc(1, sizeof(*server_data->accept_thread));
862         server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
863         server_data->accept_thread->server_data = server_data;
864         server_data->accept_thread->server_socket = server_socket;
865         server_data->accept_thread->fd_send_shutdown = sv[0];
866         server_data->accept_thread->fd_wait_shutdown = sv[1];
867
868         if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
869                            accept_thread_proc, server_data->accept_thread))
870                 die_errno(_("could not start accept_thread '%s'"), path);
871
872         for (k = 0; k < nr_threads; k++) {
873                 struct ipc_worker_thread_data *wtd;
874
875                 wtd = xcalloc(1, sizeof(*wtd));
876                 wtd->magic = MAGIC_WORKER_THREAD_DATA;
877                 wtd->server_data = server_data;
878
879                 if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
880                                    wtd)) {
881                         if (k == 0)
882                                 die(_("could not start worker[0] for '%s'"),
883                                     path);
884                         /*
885                          * Limp along with the thread pool that we have.
886                          */
887                         break;
888                 }
889
890                 wtd->next_thread = server_data->worker_thread_list;
891                 server_data->worker_thread_list = wtd;
892         }
893
894         *returned_server_data = server_data;
895         return 0;
896 }
897
898 /*
899  * Gently tell the IPC server treads to shutdown.
900  * Can be run on any thread.
901  */
902 int ipc_server_stop_async(struct ipc_server_data *server_data)
903 {
904         /* ASSERT NOT holding mutex */
905
906         int fd;
907
908         if (!server_data)
909                 return 0;
910
911         trace2_region_enter("ipc-server", "server-stop-async", NULL);
912
913         pthread_mutex_lock(&server_data->work_available_mutex);
914
915         server_data->shutdown_requested = 1;
916
917         /*
918          * Write a byte to the shutdown socket pair to wake up the
919          * accept-thread.
920          */
921         if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
922                 error_errno("could not write to fd_send_shutdown");
923
924         /*
925          * Drain the queue of existing connections.
926          */
927         while ((fd = fifo_dequeue(server_data)) != -1)
928                 close(fd);
929
930         /*
931          * Gently tell worker threads to stop processing new connections
932          * and exit.  (This does not abort in-process conversations.)
933          */
934         pthread_cond_broadcast(&server_data->work_available_cond);
935
936         pthread_mutex_unlock(&server_data->work_available_mutex);
937
938         trace2_region_leave("ipc-server", "server-stop-async", NULL);
939
940         return 0;
941 }
942
943 /*
944  * Wait for all IPC server threads to stop.
945  */
946 int ipc_server_await(struct ipc_server_data *server_data)
947 {
948         pthread_join(server_data->accept_thread->pthread_id, NULL);
949
950         if (!server_data->shutdown_requested)
951                 BUG("ipc-server: accept-thread stopped for '%s'",
952                     server_data->buf_path.buf);
953
954         while (server_data->worker_thread_list) {
955                 struct ipc_worker_thread_data *wtd =
956                         server_data->worker_thread_list;
957
958                 pthread_join(wtd->pthread_id, NULL);
959
960                 server_data->worker_thread_list = wtd->next_thread;
961                 free(wtd);
962         }
963
964         server_data->is_stopped = 1;
965
966         return 0;
967 }
968
969 void ipc_server_free(struct ipc_server_data *server_data)
970 {
971         struct ipc_accept_thread_data * accept_thread_data;
972
973         if (!server_data)
974                 return;
975
976         if (!server_data->is_stopped)
977                 BUG("cannot free ipc-server while running for '%s'",
978                     server_data->buf_path.buf);
979
980         accept_thread_data = server_data->accept_thread;
981         if (accept_thread_data) {
982                 unix_ss_free(accept_thread_data->server_socket);
983
984                 if (accept_thread_data->fd_send_shutdown != -1)
985                         close(accept_thread_data->fd_send_shutdown);
986                 if (accept_thread_data->fd_wait_shutdown != -1)
987                         close(accept_thread_data->fd_wait_shutdown);
988
989                 free(server_data->accept_thread);
990         }
991
992         while (server_data->worker_thread_list) {
993                 struct ipc_worker_thread_data *wtd =
994                         server_data->worker_thread_list;
995
996                 server_data->worker_thread_list = wtd->next_thread;
997                 free(wtd);
998         }
999
1000         pthread_cond_destroy(&server_data->work_available_cond);
1001         pthread_mutex_destroy(&server_data->work_available_mutex);
1002
1003         strbuf_release(&server_data->buf_path);
1004
1005         free(server_data->fifo_fds);
1006         free(server_data);
1007 }