xdiff users: use designated initializers for out_line
[git] / compat / simple-ipc / ipc-unix-socket.c
1 #include "cache.h"
2 #include "simple-ipc.h"
3 #include "strbuf.h"
4 #include "pkt-line.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
8
9 #ifdef NO_UNIX_SOCKETS
10 #error compat/simple-ipc/ipc-unix-socket.c requires Unix sockets
11 #endif
12
13 enum ipc_active_state ipc_get_active_state(const char *path)
14 {
15         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
16         struct ipc_client_connect_options options
17                 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
18         struct stat st;
19         struct ipc_client_connection *connection_test = NULL;
20
21         options.wait_if_busy = 0;
22         options.wait_if_not_found = 0;
23
24         if (lstat(path, &st) == -1) {
25                 switch (errno) {
26                 case ENOENT:
27                 case ENOTDIR:
28                         return IPC_STATE__NOT_LISTENING;
29                 default:
30                         return IPC_STATE__INVALID_PATH;
31                 }
32         }
33
34         /* also complain if a plain file is in the way */
35         if ((st.st_mode & S_IFMT) != S_IFSOCK)
36                 return IPC_STATE__INVALID_PATH;
37
38         /*
39          * Just because the filesystem has a S_IFSOCK type inode
40          * at `path`, doesn't mean it that there is a server listening.
41          * Ping it to be sure.
42          */
43         state = ipc_client_try_connect(path, &options, &connection_test);
44         ipc_client_close_connection(connection_test);
45
46         return state;
47 }
48
49 /*
50  * Retry frequency when trying to connect to a server.
51  *
52  * This value should be short enough that we don't seriously delay our
53  * caller, but not fast enough that our spinning puts pressure on the
54  * system.
55  */
56 #define WAIT_STEP_MS (50)
57
58 /*
59  * Try to connect to the server.  If the server is just starting up or
60  * is very busy, we may not get a connection the first time.
61  */
62 static enum ipc_active_state connect_to_server(
63         const char *path,
64         int timeout_ms,
65         const struct ipc_client_connect_options *options,
66         int *pfd)
67 {
68         int k;
69
70         *pfd = -1;
71
72         for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
73                 int fd = unix_stream_connect(path, options->uds_disallow_chdir);
74
75                 if (fd != -1) {
76                         *pfd = fd;
77                         return IPC_STATE__LISTENING;
78                 }
79
80                 if (errno == ENOENT) {
81                         if (!options->wait_if_not_found)
82                                 return IPC_STATE__PATH_NOT_FOUND;
83
84                         goto sleep_and_try_again;
85                 }
86
87                 if (errno == ETIMEDOUT) {
88                         if (!options->wait_if_busy)
89                                 return IPC_STATE__NOT_LISTENING;
90
91                         goto sleep_and_try_again;
92                 }
93
94                 if (errno == ECONNREFUSED) {
95                         if (!options->wait_if_busy)
96                                 return IPC_STATE__NOT_LISTENING;
97
98                         goto sleep_and_try_again;
99                 }
100
101                 return IPC_STATE__OTHER_ERROR;
102
103         sleep_and_try_again:
104                 sleep_millisec(WAIT_STEP_MS);
105         }
106
107         return IPC_STATE__NOT_LISTENING;
108 }
109
110 /*
111  * The total amount of time that we are willing to wait when trying to
112  * connect to a server.
113  *
114  * When the server is first started, it might take a little while for
115  * it to become ready to service requests.  Likewise, the server may
116  * be very (temporarily) busy and not respond to our connections.
117  *
118  * We should gracefully and silently handle those conditions and try
119  * again for a reasonable time period.
120  *
121  * The value chosen here should be long enough for the server
122  * to reliably heal from the above conditions.
123  */
124 #define MY_CONNECTION_TIMEOUT_MS (1000)
125
126 enum ipc_active_state ipc_client_try_connect(
127         const char *path,
128         const struct ipc_client_connect_options *options,
129         struct ipc_client_connection **p_connection)
130 {
131         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
132         int fd = -1;
133
134         *p_connection = NULL;
135
136         trace2_region_enter("ipc-client", "try-connect", NULL);
137         trace2_data_string("ipc-client", NULL, "try-connect/path", path);
138
139         state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
140                                   options, &fd);
141
142         trace2_data_intmax("ipc-client", NULL, "try-connect/state",
143                            (intmax_t)state);
144         trace2_region_leave("ipc-client", "try-connect", NULL);
145
146         if (state == IPC_STATE__LISTENING) {
147                 (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
148                 (*p_connection)->fd = fd;
149         }
150
151         return state;
152 }
153
154 void ipc_client_close_connection(struct ipc_client_connection *connection)
155 {
156         if (!connection)
157                 return;
158
159         if (connection->fd != -1)
160                 close(connection->fd);
161
162         free(connection);
163 }
164
165 int ipc_client_send_command_to_connection(
166         struct ipc_client_connection *connection,
167         const char *message, struct strbuf *answer)
168 {
169         int ret = 0;
170
171         strbuf_setlen(answer, 0);
172
173         trace2_region_enter("ipc-client", "send-command", NULL);
174
175         if (write_packetized_from_buf_no_flush(message, strlen(message),
176                                                connection->fd) < 0 ||
177             packet_flush_gently(connection->fd) < 0) {
178                 ret = error(_("could not send IPC command"));
179                 goto done;
180         }
181
182         if (read_packetized_to_strbuf(
183                     connection->fd, answer,
184                     PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
185                 ret = error(_("could not read IPC response"));
186                 goto done;
187         }
188
189 done:
190         trace2_region_leave("ipc-client", "send-command", NULL);
191         return ret;
192 }
193
194 int ipc_client_send_command(const char *path,
195                             const struct ipc_client_connect_options *options,
196                             const char *message, struct strbuf *answer)
197 {
198         int ret = -1;
199         enum ipc_active_state state;
200         struct ipc_client_connection *connection = NULL;
201
202         state = ipc_client_try_connect(path, options, &connection);
203
204         if (state != IPC_STATE__LISTENING)
205                 return ret;
206
207         ret = ipc_client_send_command_to_connection(connection, message, answer);
208
209         ipc_client_close_connection(connection);
210
211         return ret;
212 }
213
214 static int set_socket_blocking_flag(int fd, int make_nonblocking)
215 {
216         int flags;
217
218         flags = fcntl(fd, F_GETFL, NULL);
219
220         if (flags < 0)
221                 return -1;
222
223         if (make_nonblocking)
224                 flags |= O_NONBLOCK;
225         else
226                 flags &= ~O_NONBLOCK;
227
228         return fcntl(fd, F_SETFL, flags);
229 }
230
231 /*
232  * Magic numbers used to annotate callback instance data.
233  * These are used to help guard against accidentally passing the
234  * wrong instance data across multiple levels of callbacks (which
235  * is easy to do if there are `void*` arguments).
236  */
237 enum magic {
238         MAGIC_SERVER_REPLY_DATA,
239         MAGIC_WORKER_THREAD_DATA,
240         MAGIC_ACCEPT_THREAD_DATA,
241         MAGIC_SERVER_DATA,
242 };
243
244 struct ipc_server_reply_data {
245         enum magic magic;
246         int fd;
247         struct ipc_worker_thread_data *worker_thread_data;
248 };
249
250 struct ipc_worker_thread_data {
251         enum magic magic;
252         struct ipc_worker_thread_data *next_thread;
253         struct ipc_server_data *server_data;
254         pthread_t pthread_id;
255 };
256
257 struct ipc_accept_thread_data {
258         enum magic magic;
259         struct ipc_server_data *server_data;
260
261         struct unix_ss_socket *server_socket;
262
263         int fd_send_shutdown;
264         int fd_wait_shutdown;
265         pthread_t pthread_id;
266 };
267
268 /*
269  * With unix-sockets, the conceptual "ipc-server" is implemented as a single
270  * controller "accept-thread" thread and a pool of "worker-thread" threads.
271  * The former does the usual `accept()` loop and dispatches connections
272  * to an idle worker thread.  The worker threads wait in an idle loop for
273  * a new connection, communicate with the client and relay data to/from
274  * the `application_cb` and then wait for another connection from the
275  * server thread.  This avoids the overhead of constantly creating and
276  * destroying threads.
277  */
278 struct ipc_server_data {
279         enum magic magic;
280         ipc_server_application_cb *application_cb;
281         void *application_data;
282         struct strbuf buf_path;
283
284         struct ipc_accept_thread_data *accept_thread;
285         struct ipc_worker_thread_data *worker_thread_list;
286
287         pthread_mutex_t work_available_mutex;
288         pthread_cond_t work_available_cond;
289
290         /*
291          * Accepted but not yet processed client connections are kept
292          * in a circular buffer FIFO.  The queue is empty when the
293          * positions are equal.
294          */
295         int *fifo_fds;
296         int queue_size;
297         int back_pos;
298         int front_pos;
299
300         int shutdown_requested;
301         int is_stopped;
302 };
303
304 /*
305  * Remove and return the oldest queued connection.
306  *
307  * Returns -1 if empty.
308  */
309 static int fifo_dequeue(struct ipc_server_data *server_data)
310 {
311         /* ASSERT holding mutex */
312
313         int fd;
314
315         if (server_data->back_pos == server_data->front_pos)
316                 return -1;
317
318         fd = server_data->fifo_fds[server_data->front_pos];
319         server_data->fifo_fds[server_data->front_pos] = -1;
320
321         server_data->front_pos++;
322         if (server_data->front_pos == server_data->queue_size)
323                 server_data->front_pos = 0;
324
325         return fd;
326 }
327
328 /*
329  * Push a new fd onto the back of the queue.
330  *
331  * Drop it and return -1 if queue is already full.
332  */
333 static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
334 {
335         /* ASSERT holding mutex */
336
337         int next_back_pos;
338
339         next_back_pos = server_data->back_pos + 1;
340         if (next_back_pos == server_data->queue_size)
341                 next_back_pos = 0;
342
343         if (next_back_pos == server_data->front_pos) {
344                 /* Queue is full. Just drop it. */
345                 close(fd);
346                 return -1;
347         }
348
349         server_data->fifo_fds[server_data->back_pos] = fd;
350         server_data->back_pos = next_back_pos;
351
352         return fd;
353 }
354
355 /*
356  * Wait for a connection to be queued to the FIFO and return it.
357  *
358  * Returns -1 if someone has already requested a shutdown.
359  */
360 static int worker_thread__wait_for_connection(
361         struct ipc_worker_thread_data *worker_thread_data)
362 {
363         /* ASSERT NOT holding mutex */
364
365         struct ipc_server_data *server_data = worker_thread_data->server_data;
366         int fd = -1;
367
368         pthread_mutex_lock(&server_data->work_available_mutex);
369         for (;;) {
370                 if (server_data->shutdown_requested)
371                         break;
372
373                 fd = fifo_dequeue(server_data);
374                 if (fd >= 0)
375                         break;
376
377                 pthread_cond_wait(&server_data->work_available_cond,
378                                   &server_data->work_available_mutex);
379         }
380         pthread_mutex_unlock(&server_data->work_available_mutex);
381
382         return fd;
383 }
384
385 /*
386  * Forward declare our reply callback function so that any compiler
387  * errors are reported when we actually define the function (in addition
388  * to any errors reported when we try to pass this callback function as
389  * a parameter in a function call).  The former are easier to understand.
390  */
391 static ipc_server_reply_cb do_io_reply_callback;
392
393 /*
394  * Relay application's response message to the client process.
395  * (We do not flush at this point because we allow the caller
396  * to chunk data to the client thru us.)
397  */
398 static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
399                        const char *response, size_t response_len)
400 {
401         if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
402                 BUG("reply_cb called with wrong instance data");
403
404         return write_packetized_from_buf_no_flush(response, response_len,
405                                                   reply_data->fd);
406 }
407
408 /* A randomly chosen value. */
409 #define MY_WAIT_POLL_TIMEOUT_MS (10)
410
411 /*
412  * If the client hangs up without sending any data on the wire, just
413  * quietly close the socket and ignore this client.
414  *
415  * This worker thread is committed to reading the IPC request data
416  * from the client at the other end of this fd.  Wait here for the
417  * client to actually put something on the wire -- because if the
418  * client just does a ping (connect and hangup without sending any
419  * data), our use of the pkt-line read routines will spew an error
420  * message.
421  *
422  * Return -1 if the client hung up.
423  * Return 0 if data (possibly incomplete) is ready.
424  */
425 static int worker_thread__wait_for_io_start(
426         struct ipc_worker_thread_data *worker_thread_data,
427         int fd)
428 {
429         struct ipc_server_data *server_data = worker_thread_data->server_data;
430         struct pollfd pollfd[1];
431         int result;
432
433         for (;;) {
434                 pollfd[0].fd = fd;
435                 pollfd[0].events = POLLIN;
436
437                 result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
438                 if (result < 0) {
439                         if (errno == EINTR)
440                                 continue;
441                         goto cleanup;
442                 }
443
444                 if (result == 0) {
445                         /* a timeout */
446
447                         int in_shutdown;
448
449                         pthread_mutex_lock(&server_data->work_available_mutex);
450                         in_shutdown = server_data->shutdown_requested;
451                         pthread_mutex_unlock(&server_data->work_available_mutex);
452
453                         /*
454                          * If a shutdown is already in progress and this
455                          * client has not started talking yet, just drop it.
456                          */
457                         if (in_shutdown)
458                                 goto cleanup;
459                         continue;
460                 }
461
462                 if (pollfd[0].revents & POLLHUP)
463                         goto cleanup;
464
465                 if (pollfd[0].revents & POLLIN)
466                         return 0;
467
468                 goto cleanup;
469         }
470
471 cleanup:
472         close(fd);
473         return -1;
474 }
475
476 /*
477  * Receive the request/command from the client and pass it to the
478  * registered request-callback.  The request-callback will compose
479  * a response and call our reply-callback to send it to the client.
480  */
481 static int worker_thread__do_io(
482         struct ipc_worker_thread_data *worker_thread_data,
483         int fd)
484 {
485         /* ASSERT NOT holding lock */
486
487         struct strbuf buf = STRBUF_INIT;
488         struct ipc_server_reply_data reply_data;
489         int ret = 0;
490
491         reply_data.magic = MAGIC_SERVER_REPLY_DATA;
492         reply_data.worker_thread_data = worker_thread_data;
493
494         reply_data.fd = fd;
495
496         ret = read_packetized_to_strbuf(
497                 reply_data.fd, &buf,
498                 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
499         if (ret >= 0) {
500                 ret = worker_thread_data->server_data->application_cb(
501                         worker_thread_data->server_data->application_data,
502                         buf.buf, do_io_reply_callback, &reply_data);
503
504                 packet_flush_gently(reply_data.fd);
505         }
506         else {
507                 /*
508                  * The client probably disconnected/shutdown before it
509                  * could send a well-formed message.  Ignore it.
510                  */
511         }
512
513         strbuf_release(&buf);
514         close(reply_data.fd);
515
516         return ret;
517 }
518
519 /*
520  * Block SIGPIPE on the current thread (so that we get EPIPE from
521  * write() rather than an actual signal).
522  *
523  * Note that using sigchain_push() and _pop() to control SIGPIPE
524  * around our IO calls is not thread safe:
525  * [] It uses a global stack of handler frames.
526  * [] It uses ALLOC_GROW() to resize it.
527  * [] Finally, according to the `signal(2)` man-page:
528  *    "The effects of `signal()` in a multithreaded process are unspecified."
529  */
530 static void thread_block_sigpipe(sigset_t *old_set)
531 {
532         sigset_t new_set;
533
534         sigemptyset(&new_set);
535         sigaddset(&new_set, SIGPIPE);
536
537         sigemptyset(old_set);
538         pthread_sigmask(SIG_BLOCK, &new_set, old_set);
539 }
540
541 /*
542  * Thread proc for an IPC worker thread.  It handles a series of
543  * connections from clients.  It pulls the next fd from the queue
544  * processes it, and then waits for the next client.
545  *
546  * Block SIGPIPE in this worker thread for the life of the thread.
547  * This avoids stray (and sometimes delayed) SIGPIPE signals caused
548  * by client errors and/or when we are under extremely heavy IO load.
549  *
550  * This means that the application callback will have SIGPIPE blocked.
551  * The callback should not change it.
552  */
553 static void *worker_thread_proc(void *_worker_thread_data)
554 {
555         struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
556         struct ipc_server_data *server_data = worker_thread_data->server_data;
557         sigset_t old_set;
558         int fd, io;
559         int ret;
560
561         trace2_thread_start("ipc-worker");
562
563         thread_block_sigpipe(&old_set);
564
565         for (;;) {
566                 fd = worker_thread__wait_for_connection(worker_thread_data);
567                 if (fd == -1)
568                         break; /* in shutdown */
569
570                 io = worker_thread__wait_for_io_start(worker_thread_data, fd);
571                 if (io == -1)
572                         continue; /* client hung up without sending anything */
573
574                 ret = worker_thread__do_io(worker_thread_data, fd);
575
576                 if (ret == SIMPLE_IPC_QUIT) {
577                         trace2_data_string("ipc-worker", NULL, "queue_stop_async",
578                                            "application_quit");
579                         /*
580                          * The application layer is telling the ipc-server
581                          * layer to shutdown.
582                          *
583                          * We DO NOT have a response to send to the client.
584                          *
585                          * Queue an async stop (to stop the other threads) and
586                          * allow this worker thread to exit now (no sense waiting
587                          * for the thread-pool shutdown signal).
588                          *
589                          * Other non-idle worker threads are allowed to finish
590                          * responding to their current clients.
591                          */
592                         ipc_server_stop_async(server_data);
593                         break;
594                 }
595         }
596
597         trace2_thread_exit();
598         return NULL;
599 }
600
601 /* A randomly chosen value. */
602 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
603
604 /*
605  * Accept a new client connection on our socket.  This uses non-blocking
606  * IO so that we can also wait for shutdown requests on our socket-pair
607  * without actually spinning on a fast timeout.
608  */
609 static int accept_thread__wait_for_connection(
610         struct ipc_accept_thread_data *accept_thread_data)
611 {
612         struct pollfd pollfd[2];
613         int result;
614
615         for (;;) {
616                 pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
617                 pollfd[0].events = POLLIN;
618
619                 pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
620                 pollfd[1].events = POLLIN;
621
622                 result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
623                 if (result < 0) {
624                         if (errno == EINTR)
625                                 continue;
626                         return result;
627                 }
628
629                 if (result == 0) {
630                         /* a timeout */
631
632                         /*
633                          * If someone deletes or force-creates a new unix
634                          * domain socket at our path, all future clients
635                          * will be routed elsewhere and we silently starve.
636                          * If that happens, just queue a shutdown.
637                          */
638                         if (unix_ss_was_stolen(
639                                     accept_thread_data->server_socket)) {
640                                 trace2_data_string("ipc-accept", NULL,
641                                                    "queue_stop_async",
642                                                    "socket_stolen");
643                                 ipc_server_stop_async(
644                                         accept_thread_data->server_data);
645                         }
646                         continue;
647                 }
648
649                 if (pollfd[0].revents & POLLIN) {
650                         /* shutdown message queued to socketpair */
651                         return -1;
652                 }
653
654                 if (pollfd[1].revents & POLLIN) {
655                         /* a connection is available on server_socket */
656
657                         int client_fd =
658                                 accept(accept_thread_data->server_socket->fd_socket,
659                                        NULL, NULL);
660                         if (client_fd >= 0)
661                                 return client_fd;
662
663                         /*
664                          * An error here is unlikely -- it probably
665                          * indicates that the connecting process has
666                          * already dropped the connection.
667                          */
668                         continue;
669                 }
670
671                 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
672                     errno, pollfd[0].revents, pollfd[1].revents);
673         }
674 }
675
676 /*
677  * Thread proc for the IPC server "accept thread".  This waits for
678  * an incoming socket connection, appends it to the queue of available
679  * connections, and notifies a worker thread to process it.
680  *
681  * Block SIGPIPE in this thread for the life of the thread.  This
682  * avoids any stray SIGPIPE signals when closing pipe fds under
683  * extremely heavy loads (such as when the fifo queue is full and we
684  * drop incomming connections).
685  */
686 static void *accept_thread_proc(void *_accept_thread_data)
687 {
688         struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
689         struct ipc_server_data *server_data = accept_thread_data->server_data;
690         sigset_t old_set;
691
692         trace2_thread_start("ipc-accept");
693
694         thread_block_sigpipe(&old_set);
695
696         for (;;) {
697                 int client_fd = accept_thread__wait_for_connection(
698                         accept_thread_data);
699
700                 pthread_mutex_lock(&server_data->work_available_mutex);
701                 if (server_data->shutdown_requested) {
702                         pthread_mutex_unlock(&server_data->work_available_mutex);
703                         if (client_fd >= 0)
704                                 close(client_fd);
705                         break;
706                 }
707
708                 if (client_fd < 0) {
709                         /* ignore transient accept() errors */
710                 }
711                 else {
712                         fifo_enqueue(server_data, client_fd);
713                         pthread_cond_broadcast(&server_data->work_available_cond);
714                 }
715                 pthread_mutex_unlock(&server_data->work_available_mutex);
716         }
717
718         trace2_thread_exit();
719         return NULL;
720 }
721
722 /*
723  * We can't predict the connection arrival rate relative to the worker
724  * processing rate, therefore we allow the "accept-thread" to queue up
725  * a generous number of connections, since we'd rather have the client
726  * not unnecessarily timeout if we can avoid it.  (The assumption is
727  * that this will be used for FSMonitor and a few second wait on a
728  * connection is better than having the client timeout and do the full
729  * computation itself.)
730  *
731  * The FIFO queue size is set to a multiple of the worker pool size.
732  * This value chosen at random.
733  */
734 #define FIFO_SCALE (100)
735
736 /*
737  * The backlog value for `listen(2)`.  This doesn't need to huge,
738  * rather just large enough for our "accept-thread" to wake up and
739  * queue incoming connections onto the FIFO without the kernel
740  * dropping any.
741  *
742  * This value chosen at random.
743  */
744 #define LISTEN_BACKLOG (50)
745
746 static int create_listener_socket(
747         const char *path,
748         const struct ipc_server_opts *ipc_opts,
749         struct unix_ss_socket **new_server_socket)
750 {
751         struct unix_ss_socket *server_socket = NULL;
752         struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
753         int ret;
754
755         uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
756         uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
757
758         ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
759         if (ret)
760                 return ret;
761
762         if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
763                 int saved_errno = errno;
764                 unix_ss_free(server_socket);
765                 errno = saved_errno;
766                 return -1;
767         }
768
769         *new_server_socket = server_socket;
770
771         trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
772         return 0;
773 }
774
775 static int setup_listener_socket(
776         const char *path,
777         const struct ipc_server_opts *ipc_opts,
778         struct unix_ss_socket **new_server_socket)
779 {
780         int ret, saved_errno;
781
782         trace2_region_enter("ipc-server", "create-listener_socket", NULL);
783
784         ret = create_listener_socket(path, ipc_opts, new_server_socket);
785
786         saved_errno = errno;
787         trace2_region_leave("ipc-server", "create-listener_socket", NULL);
788         errno = saved_errno;
789
790         return ret;
791 }
792
793 /*
794  * Start IPC server in a pool of background threads.
795  */
796 int ipc_server_run_async(struct ipc_server_data **returned_server_data,
797                          const char *path, const struct ipc_server_opts *opts,
798                          ipc_server_application_cb *application_cb,
799                          void *application_data)
800 {
801         struct unix_ss_socket *server_socket = NULL;
802         struct ipc_server_data *server_data;
803         int sv[2];
804         int k;
805         int ret;
806         int nr_threads = opts->nr_threads;
807
808         *returned_server_data = NULL;
809
810         /*
811          * Create a socketpair and set sv[1] to non-blocking.  This
812          * will used to send a shutdown message to the accept-thread
813          * and allows the accept-thread to wait on EITHER a client
814          * connection or a shutdown request without spinning.
815          */
816         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
817                 return -1;
818
819         if (set_socket_blocking_flag(sv[1], 1)) {
820                 int saved_errno = errno;
821                 close(sv[0]);
822                 close(sv[1]);
823                 errno = saved_errno;
824                 return -1;
825         }
826
827         ret = setup_listener_socket(path, opts, &server_socket);
828         if (ret) {
829                 int saved_errno = errno;
830                 close(sv[0]);
831                 close(sv[1]);
832                 errno = saved_errno;
833                 return ret;
834         }
835
836         server_data = xcalloc(1, sizeof(*server_data));
837         server_data->magic = MAGIC_SERVER_DATA;
838         server_data->application_cb = application_cb;
839         server_data->application_data = application_data;
840         strbuf_init(&server_data->buf_path, 0);
841         strbuf_addstr(&server_data->buf_path, path);
842
843         if (nr_threads < 1)
844                 nr_threads = 1;
845
846         pthread_mutex_init(&server_data->work_available_mutex, NULL);
847         pthread_cond_init(&server_data->work_available_cond, NULL);
848
849         server_data->queue_size = nr_threads * FIFO_SCALE;
850         CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
851
852         server_data->accept_thread =
853                 xcalloc(1, sizeof(*server_data->accept_thread));
854         server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
855         server_data->accept_thread->server_data = server_data;
856         server_data->accept_thread->server_socket = server_socket;
857         server_data->accept_thread->fd_send_shutdown = sv[0];
858         server_data->accept_thread->fd_wait_shutdown = sv[1];
859
860         if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
861                            accept_thread_proc, server_data->accept_thread))
862                 die_errno(_("could not start accept_thread '%s'"), path);
863
864         for (k = 0; k < nr_threads; k++) {
865                 struct ipc_worker_thread_data *wtd;
866
867                 wtd = xcalloc(1, sizeof(*wtd));
868                 wtd->magic = MAGIC_WORKER_THREAD_DATA;
869                 wtd->server_data = server_data;
870
871                 if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
872                                    wtd)) {
873                         if (k == 0)
874                                 die(_("could not start worker[0] for '%s'"),
875                                     path);
876                         /*
877                          * Limp along with the thread pool that we have.
878                          */
879                         break;
880                 }
881
882                 wtd->next_thread = server_data->worker_thread_list;
883                 server_data->worker_thread_list = wtd;
884         }
885
886         *returned_server_data = server_data;
887         return 0;
888 }
889
890 /*
891  * Gently tell the IPC server treads to shutdown.
892  * Can be run on any thread.
893  */
894 int ipc_server_stop_async(struct ipc_server_data *server_data)
895 {
896         /* ASSERT NOT holding mutex */
897
898         int fd;
899
900         if (!server_data)
901                 return 0;
902
903         trace2_region_enter("ipc-server", "server-stop-async", NULL);
904
905         pthread_mutex_lock(&server_data->work_available_mutex);
906
907         server_data->shutdown_requested = 1;
908
909         /*
910          * Write a byte to the shutdown socket pair to wake up the
911          * accept-thread.
912          */
913         if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
914                 error_errno("could not write to fd_send_shutdown");
915
916         /*
917          * Drain the queue of existing connections.
918          */
919         while ((fd = fifo_dequeue(server_data)) != -1)
920                 close(fd);
921
922         /*
923          * Gently tell worker threads to stop processing new connections
924          * and exit.  (This does not abort in-process conversations.)
925          */
926         pthread_cond_broadcast(&server_data->work_available_cond);
927
928         pthread_mutex_unlock(&server_data->work_available_mutex);
929
930         trace2_region_leave("ipc-server", "server-stop-async", NULL);
931
932         return 0;
933 }
934
935 /*
936  * Wait for all IPC server threads to stop.
937  */
938 int ipc_server_await(struct ipc_server_data *server_data)
939 {
940         pthread_join(server_data->accept_thread->pthread_id, NULL);
941
942         if (!server_data->shutdown_requested)
943                 BUG("ipc-server: accept-thread stopped for '%s'",
944                     server_data->buf_path.buf);
945
946         while (server_data->worker_thread_list) {
947                 struct ipc_worker_thread_data *wtd =
948                         server_data->worker_thread_list;
949
950                 pthread_join(wtd->pthread_id, NULL);
951
952                 server_data->worker_thread_list = wtd->next_thread;
953                 free(wtd);
954         }
955
956         server_data->is_stopped = 1;
957
958         return 0;
959 }
960
961 void ipc_server_free(struct ipc_server_data *server_data)
962 {
963         struct ipc_accept_thread_data * accept_thread_data;
964
965         if (!server_data)
966                 return;
967
968         if (!server_data->is_stopped)
969                 BUG("cannot free ipc-server while running for '%s'",
970                     server_data->buf_path.buf);
971
972         accept_thread_data = server_data->accept_thread;
973         if (accept_thread_data) {
974                 unix_ss_free(accept_thread_data->server_socket);
975
976                 if (accept_thread_data->fd_send_shutdown != -1)
977                         close(accept_thread_data->fd_send_shutdown);
978                 if (accept_thread_data->fd_wait_shutdown != -1)
979                         close(accept_thread_data->fd_wait_shutdown);
980
981                 free(server_data->accept_thread);
982         }
983
984         while (server_data->worker_thread_list) {
985                 struct ipc_worker_thread_data *wtd =
986                         server_data->worker_thread_list;
987
988                 server_data->worker_thread_list = wtd->next_thread;
989                 free(wtd);
990         }
991
992         pthread_cond_destroy(&server_data->work_available_cond);
993         pthread_mutex_destroy(&server_data->work_available_mutex);
994
995         strbuf_release(&server_data->buf_path);
996
997         free(server_data->fifo_fds);
998         free(server_data);
999 }