Git 2.32
[git] / compat / simple-ipc / ipc-unix-socket.c
1 #include "cache.h"
2 #include "simple-ipc.h"
3 #include "strbuf.h"
4 #include "pkt-line.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
8
9 #ifndef SUPPORTS_SIMPLE_IPC
10 /*
11  * This source file should only be compiled when Simple IPC is supported.
12  * See the top-level Makefile.
13  */
14 #error SUPPORTS_SIMPLE_IPC not defined
15 #endif
16
17 enum ipc_active_state ipc_get_active_state(const char *path)
18 {
19         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
20         struct ipc_client_connect_options options
21                 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
22         struct stat st;
23         struct ipc_client_connection *connection_test = NULL;
24
25         options.wait_if_busy = 0;
26         options.wait_if_not_found = 0;
27
28         if (lstat(path, &st) == -1) {
29                 switch (errno) {
30                 case ENOENT:
31                 case ENOTDIR:
32                         return IPC_STATE__NOT_LISTENING;
33                 default:
34                         return IPC_STATE__INVALID_PATH;
35                 }
36         }
37
38         /* also complain if a plain file is in the way */
39         if ((st.st_mode & S_IFMT) != S_IFSOCK)
40                 return IPC_STATE__INVALID_PATH;
41
42         /*
43          * Just because the filesystem has a S_IFSOCK type inode
44          * at `path`, doesn't mean it that there is a server listening.
45          * Ping it to be sure.
46          */
47         state = ipc_client_try_connect(path, &options, &connection_test);
48         ipc_client_close_connection(connection_test);
49
50         return state;
51 }
52
53 /*
54  * Retry frequency when trying to connect to a server.
55  *
56  * This value should be short enough that we don't seriously delay our
57  * caller, but not fast enough that our spinning puts pressure on the
58  * system.
59  */
60 #define WAIT_STEP_MS (50)
61
62 /*
63  * Try to connect to the server.  If the server is just starting up or
64  * is very busy, we may not get a connection the first time.
65  */
66 static enum ipc_active_state connect_to_server(
67         const char *path,
68         int timeout_ms,
69         const struct ipc_client_connect_options *options,
70         int *pfd)
71 {
72         int k;
73
74         *pfd = -1;
75
76         for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
77                 int fd = unix_stream_connect(path, options->uds_disallow_chdir);
78
79                 if (fd != -1) {
80                         *pfd = fd;
81                         return IPC_STATE__LISTENING;
82                 }
83
84                 if (errno == ENOENT) {
85                         if (!options->wait_if_not_found)
86                                 return IPC_STATE__PATH_NOT_FOUND;
87
88                         goto sleep_and_try_again;
89                 }
90
91                 if (errno == ETIMEDOUT) {
92                         if (!options->wait_if_busy)
93                                 return IPC_STATE__NOT_LISTENING;
94
95                         goto sleep_and_try_again;
96                 }
97
98                 if (errno == ECONNREFUSED) {
99                         if (!options->wait_if_busy)
100                                 return IPC_STATE__NOT_LISTENING;
101
102                         goto sleep_and_try_again;
103                 }
104
105                 return IPC_STATE__OTHER_ERROR;
106
107         sleep_and_try_again:
108                 sleep_millisec(WAIT_STEP_MS);
109         }
110
111         return IPC_STATE__NOT_LISTENING;
112 }
113
114 /*
115  * The total amount of time that we are willing to wait when trying to
116  * connect to a server.
117  *
118  * When the server is first started, it might take a little while for
119  * it to become ready to service requests.  Likewise, the server may
120  * be very (temporarily) busy and not respond to our connections.
121  *
122  * We should gracefully and silently handle those conditions and try
123  * again for a reasonable time period.
124  *
125  * The value chosen here should be long enough for the server
126  * to reliably heal from the above conditions.
127  */
128 #define MY_CONNECTION_TIMEOUT_MS (1000)
129
130 enum ipc_active_state ipc_client_try_connect(
131         const char *path,
132         const struct ipc_client_connect_options *options,
133         struct ipc_client_connection **p_connection)
134 {
135         enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
136         int fd = -1;
137
138         *p_connection = NULL;
139
140         trace2_region_enter("ipc-client", "try-connect", NULL);
141         trace2_data_string("ipc-client", NULL, "try-connect/path", path);
142
143         state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
144                                   options, &fd);
145
146         trace2_data_intmax("ipc-client", NULL, "try-connect/state",
147                            (intmax_t)state);
148         trace2_region_leave("ipc-client", "try-connect", NULL);
149
150         if (state == IPC_STATE__LISTENING) {
151                 (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
152                 (*p_connection)->fd = fd;
153         }
154
155         return state;
156 }
157
158 void ipc_client_close_connection(struct ipc_client_connection *connection)
159 {
160         if (!connection)
161                 return;
162
163         if (connection->fd != -1)
164                 close(connection->fd);
165
166         free(connection);
167 }
168
169 int ipc_client_send_command_to_connection(
170         struct ipc_client_connection *connection,
171         const char *message, struct strbuf *answer)
172 {
173         int ret = 0;
174
175         strbuf_setlen(answer, 0);
176
177         trace2_region_enter("ipc-client", "send-command", NULL);
178
179         if (write_packetized_from_buf_no_flush(message, strlen(message),
180                                                connection->fd) < 0 ||
181             packet_flush_gently(connection->fd) < 0) {
182                 ret = error(_("could not send IPC command"));
183                 goto done;
184         }
185
186         if (read_packetized_to_strbuf(
187                     connection->fd, answer,
188                     PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
189                 ret = error(_("could not read IPC response"));
190                 goto done;
191         }
192
193 done:
194         trace2_region_leave("ipc-client", "send-command", NULL);
195         return ret;
196 }
197
198 int ipc_client_send_command(const char *path,
199                             const struct ipc_client_connect_options *options,
200                             const char *message, struct strbuf *answer)
201 {
202         int ret = -1;
203         enum ipc_active_state state;
204         struct ipc_client_connection *connection = NULL;
205
206         state = ipc_client_try_connect(path, options, &connection);
207
208         if (state != IPC_STATE__LISTENING)
209                 return ret;
210
211         ret = ipc_client_send_command_to_connection(connection, message, answer);
212
213         ipc_client_close_connection(connection);
214
215         return ret;
216 }
217
218 static int set_socket_blocking_flag(int fd, int make_nonblocking)
219 {
220         int flags;
221
222         flags = fcntl(fd, F_GETFL, NULL);
223
224         if (flags < 0)
225                 return -1;
226
227         if (make_nonblocking)
228                 flags |= O_NONBLOCK;
229         else
230                 flags &= ~O_NONBLOCK;
231
232         return fcntl(fd, F_SETFL, flags);
233 }
234
235 /*
236  * Magic numbers used to annotate callback instance data.
237  * These are used to help guard against accidentally passing the
238  * wrong instance data across multiple levels of callbacks (which
239  * is easy to do if there are `void*` arguments).
240  */
241 enum magic {
242         MAGIC_SERVER_REPLY_DATA,
243         MAGIC_WORKER_THREAD_DATA,
244         MAGIC_ACCEPT_THREAD_DATA,
245         MAGIC_SERVER_DATA,
246 };
247
248 struct ipc_server_reply_data {
249         enum magic magic;
250         int fd;
251         struct ipc_worker_thread_data *worker_thread_data;
252 };
253
254 struct ipc_worker_thread_data {
255         enum magic magic;
256         struct ipc_worker_thread_data *next_thread;
257         struct ipc_server_data *server_data;
258         pthread_t pthread_id;
259 };
260
261 struct ipc_accept_thread_data {
262         enum magic magic;
263         struct ipc_server_data *server_data;
264
265         struct unix_ss_socket *server_socket;
266
267         int fd_send_shutdown;
268         int fd_wait_shutdown;
269         pthread_t pthread_id;
270 };
271
272 /*
273  * With unix-sockets, the conceptual "ipc-server" is implemented as a single
274  * controller "accept-thread" thread and a pool of "worker-thread" threads.
275  * The former does the usual `accept()` loop and dispatches connections
276  * to an idle worker thread.  The worker threads wait in an idle loop for
277  * a new connection, communicate with the client and relay data to/from
278  * the `application_cb` and then wait for another connection from the
279  * server thread.  This avoids the overhead of constantly creating and
280  * destroying threads.
281  */
282 struct ipc_server_data {
283         enum magic magic;
284         ipc_server_application_cb *application_cb;
285         void *application_data;
286         struct strbuf buf_path;
287
288         struct ipc_accept_thread_data *accept_thread;
289         struct ipc_worker_thread_data *worker_thread_list;
290
291         pthread_mutex_t work_available_mutex;
292         pthread_cond_t work_available_cond;
293
294         /*
295          * Accepted but not yet processed client connections are kept
296          * in a circular buffer FIFO.  The queue is empty when the
297          * positions are equal.
298          */
299         int *fifo_fds;
300         int queue_size;
301         int back_pos;
302         int front_pos;
303
304         int shutdown_requested;
305         int is_stopped;
306 };
307
308 /*
309  * Remove and return the oldest queued connection.
310  *
311  * Returns -1 if empty.
312  */
313 static int fifo_dequeue(struct ipc_server_data *server_data)
314 {
315         /* ASSERT holding mutex */
316
317         int fd;
318
319         if (server_data->back_pos == server_data->front_pos)
320                 return -1;
321
322         fd = server_data->fifo_fds[server_data->front_pos];
323         server_data->fifo_fds[server_data->front_pos] = -1;
324
325         server_data->front_pos++;
326         if (server_data->front_pos == server_data->queue_size)
327                 server_data->front_pos = 0;
328
329         return fd;
330 }
331
332 /*
333  * Push a new fd onto the back of the queue.
334  *
335  * Drop it and return -1 if queue is already full.
336  */
337 static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
338 {
339         /* ASSERT holding mutex */
340
341         int next_back_pos;
342
343         next_back_pos = server_data->back_pos + 1;
344         if (next_back_pos == server_data->queue_size)
345                 next_back_pos = 0;
346
347         if (next_back_pos == server_data->front_pos) {
348                 /* Queue is full. Just drop it. */
349                 close(fd);
350                 return -1;
351         }
352
353         server_data->fifo_fds[server_data->back_pos] = fd;
354         server_data->back_pos = next_back_pos;
355
356         return fd;
357 }
358
359 /*
360  * Wait for a connection to be queued to the FIFO and return it.
361  *
362  * Returns -1 if someone has already requested a shutdown.
363  */
364 static int worker_thread__wait_for_connection(
365         struct ipc_worker_thread_data *worker_thread_data)
366 {
367         /* ASSERT NOT holding mutex */
368
369         struct ipc_server_data *server_data = worker_thread_data->server_data;
370         int fd = -1;
371
372         pthread_mutex_lock(&server_data->work_available_mutex);
373         for (;;) {
374                 if (server_data->shutdown_requested)
375                         break;
376
377                 fd = fifo_dequeue(server_data);
378                 if (fd >= 0)
379                         break;
380
381                 pthread_cond_wait(&server_data->work_available_cond,
382                                   &server_data->work_available_mutex);
383         }
384         pthread_mutex_unlock(&server_data->work_available_mutex);
385
386         return fd;
387 }
388
389 /*
390  * Forward declare our reply callback function so that any compiler
391  * errors are reported when we actually define the function (in addition
392  * to any errors reported when we try to pass this callback function as
393  * a parameter in a function call).  The former are easier to understand.
394  */
395 static ipc_server_reply_cb do_io_reply_callback;
396
397 /*
398  * Relay application's response message to the client process.
399  * (We do not flush at this point because we allow the caller
400  * to chunk data to the client thru us.)
401  */
402 static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
403                        const char *response, size_t response_len)
404 {
405         if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
406                 BUG("reply_cb called with wrong instance data");
407
408         return write_packetized_from_buf_no_flush(response, response_len,
409                                                   reply_data->fd);
410 }
411
412 /* A randomly chosen value. */
413 #define MY_WAIT_POLL_TIMEOUT_MS (10)
414
415 /*
416  * If the client hangs up without sending any data on the wire, just
417  * quietly close the socket and ignore this client.
418  *
419  * This worker thread is committed to reading the IPC request data
420  * from the client at the other end of this fd.  Wait here for the
421  * client to actually put something on the wire -- because if the
422  * client just does a ping (connect and hangup without sending any
423  * data), our use of the pkt-line read routines will spew an error
424  * message.
425  *
426  * Return -1 if the client hung up.
427  * Return 0 if data (possibly incomplete) is ready.
428  */
429 static int worker_thread__wait_for_io_start(
430         struct ipc_worker_thread_data *worker_thread_data,
431         int fd)
432 {
433         struct ipc_server_data *server_data = worker_thread_data->server_data;
434         struct pollfd pollfd[1];
435         int result;
436
437         for (;;) {
438                 pollfd[0].fd = fd;
439                 pollfd[0].events = POLLIN;
440
441                 result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
442                 if (result < 0) {
443                         if (errno == EINTR)
444                                 continue;
445                         goto cleanup;
446                 }
447
448                 if (result == 0) {
449                         /* a timeout */
450
451                         int in_shutdown;
452
453                         pthread_mutex_lock(&server_data->work_available_mutex);
454                         in_shutdown = server_data->shutdown_requested;
455                         pthread_mutex_unlock(&server_data->work_available_mutex);
456
457                         /*
458                          * If a shutdown is already in progress and this
459                          * client has not started talking yet, just drop it.
460                          */
461                         if (in_shutdown)
462                                 goto cleanup;
463                         continue;
464                 }
465
466                 if (pollfd[0].revents & POLLHUP)
467                         goto cleanup;
468
469                 if (pollfd[0].revents & POLLIN)
470                         return 0;
471
472                 goto cleanup;
473         }
474
475 cleanup:
476         close(fd);
477         return -1;
478 }
479
480 /*
481  * Receive the request/command from the client and pass it to the
482  * registered request-callback.  The request-callback will compose
483  * a response and call our reply-callback to send it to the client.
484  */
485 static int worker_thread__do_io(
486         struct ipc_worker_thread_data *worker_thread_data,
487         int fd)
488 {
489         /* ASSERT NOT holding lock */
490
491         struct strbuf buf = STRBUF_INIT;
492         struct ipc_server_reply_data reply_data;
493         int ret = 0;
494
495         reply_data.magic = MAGIC_SERVER_REPLY_DATA;
496         reply_data.worker_thread_data = worker_thread_data;
497
498         reply_data.fd = fd;
499
500         ret = read_packetized_to_strbuf(
501                 reply_data.fd, &buf,
502                 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
503         if (ret >= 0) {
504                 ret = worker_thread_data->server_data->application_cb(
505                         worker_thread_data->server_data->application_data,
506                         buf.buf, do_io_reply_callback, &reply_data);
507
508                 packet_flush_gently(reply_data.fd);
509         }
510         else {
511                 /*
512                  * The client probably disconnected/shutdown before it
513                  * could send a well-formed message.  Ignore it.
514                  */
515         }
516
517         strbuf_release(&buf);
518         close(reply_data.fd);
519
520         return ret;
521 }
522
523 /*
524  * Block SIGPIPE on the current thread (so that we get EPIPE from
525  * write() rather than an actual signal).
526  *
527  * Note that using sigchain_push() and _pop() to control SIGPIPE
528  * around our IO calls is not thread safe:
529  * [] It uses a global stack of handler frames.
530  * [] It uses ALLOC_GROW() to resize it.
531  * [] Finally, according to the `signal(2)` man-page:
532  *    "The effects of `signal()` in a multithreaded process are unspecified."
533  */
534 static void thread_block_sigpipe(sigset_t *old_set)
535 {
536         sigset_t new_set;
537
538         sigemptyset(&new_set);
539         sigaddset(&new_set, SIGPIPE);
540
541         sigemptyset(old_set);
542         pthread_sigmask(SIG_BLOCK, &new_set, old_set);
543 }
544
545 /*
546  * Thread proc for an IPC worker thread.  It handles a series of
547  * connections from clients.  It pulls the next fd from the queue
548  * processes it, and then waits for the next client.
549  *
550  * Block SIGPIPE in this worker thread for the life of the thread.
551  * This avoids stray (and sometimes delayed) SIGPIPE signals caused
552  * by client errors and/or when we are under extremely heavy IO load.
553  *
554  * This means that the application callback will have SIGPIPE blocked.
555  * The callback should not change it.
556  */
557 static void *worker_thread_proc(void *_worker_thread_data)
558 {
559         struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
560         struct ipc_server_data *server_data = worker_thread_data->server_data;
561         sigset_t old_set;
562         int fd, io;
563         int ret;
564
565         trace2_thread_start("ipc-worker");
566
567         thread_block_sigpipe(&old_set);
568
569         for (;;) {
570                 fd = worker_thread__wait_for_connection(worker_thread_data);
571                 if (fd == -1)
572                         break; /* in shutdown */
573
574                 io = worker_thread__wait_for_io_start(worker_thread_data, fd);
575                 if (io == -1)
576                         continue; /* client hung up without sending anything */
577
578                 ret = worker_thread__do_io(worker_thread_data, fd);
579
580                 if (ret == SIMPLE_IPC_QUIT) {
581                         trace2_data_string("ipc-worker", NULL, "queue_stop_async",
582                                            "application_quit");
583                         /*
584                          * The application layer is telling the ipc-server
585                          * layer to shutdown.
586                          *
587                          * We DO NOT have a response to send to the client.
588                          *
589                          * Queue an async stop (to stop the other threads) and
590                          * allow this worker thread to exit now (no sense waiting
591                          * for the thread-pool shutdown signal).
592                          *
593                          * Other non-idle worker threads are allowed to finish
594                          * responding to their current clients.
595                          */
596                         ipc_server_stop_async(server_data);
597                         break;
598                 }
599         }
600
601         trace2_thread_exit();
602         return NULL;
603 }
604
605 /* A randomly chosen value. */
606 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
607
608 /*
609  * Accept a new client connection on our socket.  This uses non-blocking
610  * IO so that we can also wait for shutdown requests on our socket-pair
611  * without actually spinning on a fast timeout.
612  */
613 static int accept_thread__wait_for_connection(
614         struct ipc_accept_thread_data *accept_thread_data)
615 {
616         struct pollfd pollfd[2];
617         int result;
618
619         for (;;) {
620                 pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
621                 pollfd[0].events = POLLIN;
622
623                 pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
624                 pollfd[1].events = POLLIN;
625
626                 result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
627                 if (result < 0) {
628                         if (errno == EINTR)
629                                 continue;
630                         return result;
631                 }
632
633                 if (result == 0) {
634                         /* a timeout */
635
636                         /*
637                          * If someone deletes or force-creates a new unix
638                          * domain socket at our path, all future clients
639                          * will be routed elsewhere and we silently starve.
640                          * If that happens, just queue a shutdown.
641                          */
642                         if (unix_ss_was_stolen(
643                                     accept_thread_data->server_socket)) {
644                                 trace2_data_string("ipc-accept", NULL,
645                                                    "queue_stop_async",
646                                                    "socket_stolen");
647                                 ipc_server_stop_async(
648                                         accept_thread_data->server_data);
649                         }
650                         continue;
651                 }
652
653                 if (pollfd[0].revents & POLLIN) {
654                         /* shutdown message queued to socketpair */
655                         return -1;
656                 }
657
658                 if (pollfd[1].revents & POLLIN) {
659                         /* a connection is available on server_socket */
660
661                         int client_fd =
662                                 accept(accept_thread_data->server_socket->fd_socket,
663                                        NULL, NULL);
664                         if (client_fd >= 0)
665                                 return client_fd;
666
667                         /*
668                          * An error here is unlikely -- it probably
669                          * indicates that the connecting process has
670                          * already dropped the connection.
671                          */
672                         continue;
673                 }
674
675                 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
676                     errno, pollfd[0].revents, pollfd[1].revents);
677         }
678 }
679
680 /*
681  * Thread proc for the IPC server "accept thread".  This waits for
682  * an incoming socket connection, appends it to the queue of available
683  * connections, and notifies a worker thread to process it.
684  *
685  * Block SIGPIPE in this thread for the life of the thread.  This
686  * avoids any stray SIGPIPE signals when closing pipe fds under
687  * extremely heavy loads (such as when the fifo queue is full and we
688  * drop incomming connections).
689  */
690 static void *accept_thread_proc(void *_accept_thread_data)
691 {
692         struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
693         struct ipc_server_data *server_data = accept_thread_data->server_data;
694         sigset_t old_set;
695
696         trace2_thread_start("ipc-accept");
697
698         thread_block_sigpipe(&old_set);
699
700         for (;;) {
701                 int client_fd = accept_thread__wait_for_connection(
702                         accept_thread_data);
703
704                 pthread_mutex_lock(&server_data->work_available_mutex);
705                 if (server_data->shutdown_requested) {
706                         pthread_mutex_unlock(&server_data->work_available_mutex);
707                         if (client_fd >= 0)
708                                 close(client_fd);
709                         break;
710                 }
711
712                 if (client_fd < 0) {
713                         /* ignore transient accept() errors */
714                 }
715                 else {
716                         fifo_enqueue(server_data, client_fd);
717                         pthread_cond_broadcast(&server_data->work_available_cond);
718                 }
719                 pthread_mutex_unlock(&server_data->work_available_mutex);
720         }
721
722         trace2_thread_exit();
723         return NULL;
724 }
725
726 /*
727  * We can't predict the connection arrival rate relative to the worker
728  * processing rate, therefore we allow the "accept-thread" to queue up
729  * a generous number of connections, since we'd rather have the client
730  * not unnecessarily timeout if we can avoid it.  (The assumption is
731  * that this will be used for FSMonitor and a few second wait on a
732  * connection is better than having the client timeout and do the full
733  * computation itself.)
734  *
735  * The FIFO queue size is set to a multiple of the worker pool size.
736  * This value chosen at random.
737  */
738 #define FIFO_SCALE (100)
739
740 /*
741  * The backlog value for `listen(2)`.  This doesn't need to huge,
742  * rather just large enough for our "accept-thread" to wake up and
743  * queue incoming connections onto the FIFO without the kernel
744  * dropping any.
745  *
746  * This value chosen at random.
747  */
748 #define LISTEN_BACKLOG (50)
749
750 static int create_listener_socket(
751         const char *path,
752         const struct ipc_server_opts *ipc_opts,
753         struct unix_ss_socket **new_server_socket)
754 {
755         struct unix_ss_socket *server_socket = NULL;
756         struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
757         int ret;
758
759         uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
760         uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
761
762         ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
763         if (ret)
764                 return ret;
765
766         if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
767                 int saved_errno = errno;
768                 unix_ss_free(server_socket);
769                 errno = saved_errno;
770                 return -1;
771         }
772
773         *new_server_socket = server_socket;
774
775         trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
776         return 0;
777 }
778
779 static int setup_listener_socket(
780         const char *path,
781         const struct ipc_server_opts *ipc_opts,
782         struct unix_ss_socket **new_server_socket)
783 {
784         int ret, saved_errno;
785
786         trace2_region_enter("ipc-server", "create-listener_socket", NULL);
787
788         ret = create_listener_socket(path, ipc_opts, new_server_socket);
789
790         saved_errno = errno;
791         trace2_region_leave("ipc-server", "create-listener_socket", NULL);
792         errno = saved_errno;
793
794         return ret;
795 }
796
797 /*
798  * Start IPC server in a pool of background threads.
799  */
800 int ipc_server_run_async(struct ipc_server_data **returned_server_data,
801                          const char *path, const struct ipc_server_opts *opts,
802                          ipc_server_application_cb *application_cb,
803                          void *application_data)
804 {
805         struct unix_ss_socket *server_socket = NULL;
806         struct ipc_server_data *server_data;
807         int sv[2];
808         int k;
809         int ret;
810         int nr_threads = opts->nr_threads;
811
812         *returned_server_data = NULL;
813
814         /*
815          * Create a socketpair and set sv[1] to non-blocking.  This
816          * will used to send a shutdown message to the accept-thread
817          * and allows the accept-thread to wait on EITHER a client
818          * connection or a shutdown request without spinning.
819          */
820         if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
821                 return -1;
822
823         if (set_socket_blocking_flag(sv[1], 1)) {
824                 int saved_errno = errno;
825                 close(sv[0]);
826                 close(sv[1]);
827                 errno = saved_errno;
828                 return -1;
829         }
830
831         ret = setup_listener_socket(path, opts, &server_socket);
832         if (ret) {
833                 int saved_errno = errno;
834                 close(sv[0]);
835                 close(sv[1]);
836                 errno = saved_errno;
837                 return ret;
838         }
839
840         server_data = xcalloc(1, sizeof(*server_data));
841         server_data->magic = MAGIC_SERVER_DATA;
842         server_data->application_cb = application_cb;
843         server_data->application_data = application_data;
844         strbuf_init(&server_data->buf_path, 0);
845         strbuf_addstr(&server_data->buf_path, path);
846
847         if (nr_threads < 1)
848                 nr_threads = 1;
849
850         pthread_mutex_init(&server_data->work_available_mutex, NULL);
851         pthread_cond_init(&server_data->work_available_cond, NULL);
852
853         server_data->queue_size = nr_threads * FIFO_SCALE;
854         CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
855
856         server_data->accept_thread =
857                 xcalloc(1, sizeof(*server_data->accept_thread));
858         server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
859         server_data->accept_thread->server_data = server_data;
860         server_data->accept_thread->server_socket = server_socket;
861         server_data->accept_thread->fd_send_shutdown = sv[0];
862         server_data->accept_thread->fd_wait_shutdown = sv[1];
863
864         if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
865                            accept_thread_proc, server_data->accept_thread))
866                 die_errno(_("could not start accept_thread '%s'"), path);
867
868         for (k = 0; k < nr_threads; k++) {
869                 struct ipc_worker_thread_data *wtd;
870
871                 wtd = xcalloc(1, sizeof(*wtd));
872                 wtd->magic = MAGIC_WORKER_THREAD_DATA;
873                 wtd->server_data = server_data;
874
875                 if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
876                                    wtd)) {
877                         if (k == 0)
878                                 die(_("could not start worker[0] for '%s'"),
879                                     path);
880                         /*
881                          * Limp along with the thread pool that we have.
882                          */
883                         break;
884                 }
885
886                 wtd->next_thread = server_data->worker_thread_list;
887                 server_data->worker_thread_list = wtd;
888         }
889
890         *returned_server_data = server_data;
891         return 0;
892 }
893
894 /*
895  * Gently tell the IPC server treads to shutdown.
896  * Can be run on any thread.
897  */
898 int ipc_server_stop_async(struct ipc_server_data *server_data)
899 {
900         /* ASSERT NOT holding mutex */
901
902         int fd;
903
904         if (!server_data)
905                 return 0;
906
907         trace2_region_enter("ipc-server", "server-stop-async", NULL);
908
909         pthread_mutex_lock(&server_data->work_available_mutex);
910
911         server_data->shutdown_requested = 1;
912
913         /*
914          * Write a byte to the shutdown socket pair to wake up the
915          * accept-thread.
916          */
917         if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
918                 error_errno("could not write to fd_send_shutdown");
919
920         /*
921          * Drain the queue of existing connections.
922          */
923         while ((fd = fifo_dequeue(server_data)) != -1)
924                 close(fd);
925
926         /*
927          * Gently tell worker threads to stop processing new connections
928          * and exit.  (This does not abort in-process conversations.)
929          */
930         pthread_cond_broadcast(&server_data->work_available_cond);
931
932         pthread_mutex_unlock(&server_data->work_available_mutex);
933
934         trace2_region_leave("ipc-server", "server-stop-async", NULL);
935
936         return 0;
937 }
938
939 /*
940  * Wait for all IPC server threads to stop.
941  */
942 int ipc_server_await(struct ipc_server_data *server_data)
943 {
944         pthread_join(server_data->accept_thread->pthread_id, NULL);
945
946         if (!server_data->shutdown_requested)
947                 BUG("ipc-server: accept-thread stopped for '%s'",
948                     server_data->buf_path.buf);
949
950         while (server_data->worker_thread_list) {
951                 struct ipc_worker_thread_data *wtd =
952                         server_data->worker_thread_list;
953
954                 pthread_join(wtd->pthread_id, NULL);
955
956                 server_data->worker_thread_list = wtd->next_thread;
957                 free(wtd);
958         }
959
960         server_data->is_stopped = 1;
961
962         return 0;
963 }
964
965 void ipc_server_free(struct ipc_server_data *server_data)
966 {
967         struct ipc_accept_thread_data * accept_thread_data;
968
969         if (!server_data)
970                 return;
971
972         if (!server_data->is_stopped)
973                 BUG("cannot free ipc-server while running for '%s'",
974                     server_data->buf_path.buf);
975
976         accept_thread_data = server_data->accept_thread;
977         if (accept_thread_data) {
978                 unix_ss_free(accept_thread_data->server_socket);
979
980                 if (accept_thread_data->fd_send_shutdown != -1)
981                         close(accept_thread_data->fd_send_shutdown);
982                 if (accept_thread_data->fd_wait_shutdown != -1)
983                         close(accept_thread_data->fd_wait_shutdown);
984
985                 free(server_data->accept_thread);
986         }
987
988         while (server_data->worker_thread_list) {
989                 struct ipc_worker_thread_data *wtd =
990                         server_data->worker_thread_list;
991
992                 server_data->worker_thread_list = wtd->next_thread;
993                 free(wtd);
994         }
995
996         pthread_cond_destroy(&server_data->work_available_cond);
997         pthread_mutex_destroy(&server_data->work_available_mutex);
998
999         strbuf_release(&server_data->buf_path);
1000
1001         free(server_data->fifo_fds);
1002         free(server_data);
1003 }