2 #include "simple-ipc.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
10 #error compat/simple-ipc/ipc-unix-socket.c requires Unix sockets
13 enum ipc_active_state ipc_get_active_state(const char *path)
15 enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
16 struct ipc_client_connect_options options
17 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
19 struct ipc_client_connection *connection_test = NULL;
21 options.wait_if_busy = 0;
22 options.wait_if_not_found = 0;
24 if (lstat(path, &st) == -1) {
28 return IPC_STATE__NOT_LISTENING;
30 return IPC_STATE__INVALID_PATH;
34 /* also complain if a plain file is in the way */
35 if ((st.st_mode & S_IFMT) != S_IFSOCK)
36 return IPC_STATE__INVALID_PATH;
39 * Just because the filesystem has a S_IFSOCK type inode
40 * at `path`, doesn't mean that there is a server listening.
43 state = ipc_client_try_connect(path, &options, &connection_test);
44 ipc_client_close_connection(connection_test);
50 * Retry frequency when trying to connect to a server.
52 * This value should be short enough that we don't seriously delay our
53 * caller, but not so short that our spinning puts pressure on the
56 #define WAIT_STEP_MS (50)
59 * Try to connect to the server. If the server is just starting up or
60 * is very busy, we may not get a connection the first time.
62 static enum ipc_active_state connect_to_server(
65 const struct ipc_client_connect_options *options,
72 for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
73 int fd = unix_stream_connect(path, options->uds_disallow_chdir);
77 return IPC_STATE__LISTENING;
80 if (errno == ENOENT) {
81 if (!options->wait_if_not_found)
82 return IPC_STATE__PATH_NOT_FOUND;
84 goto sleep_and_try_again;
87 if (errno == ETIMEDOUT) {
88 if (!options->wait_if_busy)
89 return IPC_STATE__NOT_LISTENING;
91 goto sleep_and_try_again;
94 if (errno == ECONNREFUSED) {
95 if (!options->wait_if_busy)
96 return IPC_STATE__NOT_LISTENING;
98 goto sleep_and_try_again;
101 return IPC_STATE__OTHER_ERROR;
104 sleep_millisec(WAIT_STEP_MS);
107 return IPC_STATE__NOT_LISTENING;
111 * The total amount of time that we are willing to wait when trying to
112 * connect to a server.
114 * When the server is first started, it might take a little while for
115 * it to become ready to service requests. Likewise, the server may
116 * be very (temporarily) busy and not respond to our connections.
118 * We should gracefully and silently handle those conditions and try
119 * again for a reasonable time period.
121 * The value chosen here should be long enough for the server
122 * to reliably heal from the above conditions.
124 #define MY_CONNECTION_TIMEOUT_MS (1000)
126 enum ipc_active_state ipc_client_try_connect(
128 const struct ipc_client_connect_options *options,
129 struct ipc_client_connection **p_connection)
131 enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
134 *p_connection = NULL;
136 trace2_region_enter("ipc-client", "try-connect", NULL);
137 trace2_data_string("ipc-client", NULL, "try-connect/path", path);
139 state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
142 trace2_data_intmax("ipc-client", NULL, "try-connect/state",
144 trace2_region_leave("ipc-client", "try-connect", NULL);
146 if (state == IPC_STATE__LISTENING) {
147 (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
148 (*p_connection)->fd = fd;
154 void ipc_client_close_connection(struct ipc_client_connection *connection)
159 if (connection->fd != -1)
160 close(connection->fd);
165 int ipc_client_send_command_to_connection(
166 struct ipc_client_connection *connection,
167 const char *message, struct strbuf *answer)
171 strbuf_setlen(answer, 0);
173 trace2_region_enter("ipc-client", "send-command", NULL);
175 if (write_packetized_from_buf_no_flush(message, strlen(message),
176 connection->fd) < 0 ||
177 packet_flush_gently(connection->fd) < 0) {
178 ret = error(_("could not send IPC command"));
182 if (read_packetized_to_strbuf(
183 connection->fd, answer,
184 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
185 ret = error(_("could not read IPC response"));
190 trace2_region_leave("ipc-client", "send-command", NULL);
194 int ipc_client_send_command(const char *path,
195 const struct ipc_client_connect_options *options,
196 const char *message, struct strbuf *answer)
199 enum ipc_active_state state;
200 struct ipc_client_connection *connection = NULL;
202 state = ipc_client_try_connect(path, options, &connection);
204 if (state != IPC_STATE__LISTENING)
207 ret = ipc_client_send_command_to_connection(connection, message, answer);
209 ipc_client_close_connection(connection);
214 static int set_socket_blocking_flag(int fd, int make_nonblocking)
218 flags = fcntl(fd, F_GETFL, NULL);
223 if (make_nonblocking)
226 flags &= ~O_NONBLOCK;
228 return fcntl(fd, F_SETFL, flags);
232 * Magic numbers used to annotate callback instance data.
233 * These are used to help guard against accidentally passing the
234 * wrong instance data across multiple levels of callbacks (which
235 * is easy to do if there are `void*` arguments).
238 MAGIC_SERVER_REPLY_DATA,
239 MAGIC_WORKER_THREAD_DATA,
240 MAGIC_ACCEPT_THREAD_DATA,
244 struct ipc_server_reply_data {
247 struct ipc_worker_thread_data *worker_thread_data;
250 struct ipc_worker_thread_data {
252 struct ipc_worker_thread_data *next_thread;
253 struct ipc_server_data *server_data;
254 pthread_t pthread_id;
257 struct ipc_accept_thread_data {
259 struct ipc_server_data *server_data;
261 struct unix_ss_socket *server_socket;
263 int fd_send_shutdown;
264 int fd_wait_shutdown;
265 pthread_t pthread_id;
269 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
270 * controller "accept-thread" thread and a pool of "worker-thread" threads.
271 * The former does the usual `accept()` loop and dispatches connections
272 * to an idle worker thread. The worker threads wait in an idle loop for
273 * a new connection, communicate with the client and relay data to/from
274 * the `application_cb` and then wait for another connection from the
275 * accept-thread. This avoids the overhead of constantly creating and
276 * destroying threads.
278 struct ipc_server_data {
280 ipc_server_application_cb *application_cb;
281 void *application_data;
282 struct strbuf buf_path;
284 struct ipc_accept_thread_data *accept_thread;
285 struct ipc_worker_thread_data *worker_thread_list;
287 pthread_mutex_t work_available_mutex;
288 pthread_cond_t work_available_cond;
291 * Accepted but not yet processed client connections are kept
292 * in a circular buffer FIFO. The queue is empty when the
293 * positions are equal.
300 int shutdown_requested;
305 * Remove and return the oldest queued connection.
307 * Returns -1 if empty.
309 static int fifo_dequeue(struct ipc_server_data *server_data)
311 /* ASSERT holding mutex */
315 if (server_data->back_pos == server_data->front_pos)
318 fd = server_data->fifo_fds[server_data->front_pos];
319 server_data->fifo_fds[server_data->front_pos] = -1;
321 server_data->front_pos++;
322 if (server_data->front_pos == server_data->queue_size)
323 server_data->front_pos = 0;
329 * Push a new fd onto the back of the queue.
331 * Drop it and return -1 if queue is already full.
333 static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
335 /* ASSERT holding mutex */
339 next_back_pos = server_data->back_pos + 1;
340 if (next_back_pos == server_data->queue_size)
343 if (next_back_pos == server_data->front_pos) {
344 /* Queue is full. Just drop it. */
349 server_data->fifo_fds[server_data->back_pos] = fd;
350 server_data->back_pos = next_back_pos;
356 * Wait for a connection to be queued to the FIFO and return it.
358 * Returns -1 if someone has already requested a shutdown.
360 static int worker_thread__wait_for_connection(
361 struct ipc_worker_thread_data *worker_thread_data)
363 /* ASSERT NOT holding mutex */
365 struct ipc_server_data *server_data = worker_thread_data->server_data;
368 pthread_mutex_lock(&server_data->work_available_mutex);
370 if (server_data->shutdown_requested)
373 fd = fifo_dequeue(server_data);
377 pthread_cond_wait(&server_data->work_available_cond,
378 &server_data->work_available_mutex);
380 pthread_mutex_unlock(&server_data->work_available_mutex);
386 * Forward declare our reply callback function so that any compiler
387 * errors are reported when we actually define the function (in addition
388 * to any errors reported when we try to pass this callback function as
389 * a parameter in a function call). The former are easier to understand.
391 static ipc_server_reply_cb do_io_reply_callback;
394 * Relay application's response message to the client process.
395 * (We do not flush at this point because we allow the caller
396 * to chunk data to the client through us.)
398 static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
399 const char *response, size_t response_len)
401 if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
402 BUG("reply_cb called with wrong instance data");
404 return write_packetized_from_buf_no_flush(response, response_len,
408 /* A randomly chosen value. */
409 #define MY_WAIT_POLL_TIMEOUT_MS (10)
412 * If the client hangs up without sending any data on the wire, just
413 * quietly close the socket and ignore this client.
415 * This worker thread is committed to reading the IPC request data
416 * from the client at the other end of this fd. Wait here for the
417 * client to actually put something on the wire -- because if the
418 * client just does a ping (connect and hang up without sending any
419 * data), our use of the pkt-line read routines will spew an error
422 * Return -1 if the client hung up.
423 * Return 0 if data (possibly incomplete) is ready.
425 static int worker_thread__wait_for_io_start(
426 struct ipc_worker_thread_data *worker_thread_data,
429 struct ipc_server_data *server_data = worker_thread_data->server_data;
430 struct pollfd pollfd[1];
435 pollfd[0].events = POLLIN;
437 result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
449 pthread_mutex_lock(&server_data->work_available_mutex);
450 in_shutdown = server_data->shutdown_requested;
451 pthread_mutex_unlock(&server_data->work_available_mutex);
454 * If a shutdown is already in progress and this
455 * client has not started talking yet, just drop it.
462 if (pollfd[0].revents & POLLHUP)
465 if (pollfd[0].revents & POLLIN)
477 * Receive the request/command from the client and pass it to the
478 * registered request-callback. The request-callback will compose
479 * a response and call our reply-callback to send it to the client.
481 static int worker_thread__do_io(
482 struct ipc_worker_thread_data *worker_thread_data,
485 /* ASSERT NOT holding lock */
487 struct strbuf buf = STRBUF_INIT;
488 struct ipc_server_reply_data reply_data;
491 reply_data.magic = MAGIC_SERVER_REPLY_DATA;
492 reply_data.worker_thread_data = worker_thread_data;
496 ret = read_packetized_to_strbuf(
498 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
500 ret = worker_thread_data->server_data->application_cb(
501 worker_thread_data->server_data->application_data,
502 buf.buf, do_io_reply_callback, &reply_data);
504 packet_flush_gently(reply_data.fd);
508 * The client probably disconnected/shutdown before it
509 * could send a well-formed message. Ignore it.
513 strbuf_release(&buf);
514 close(reply_data.fd);
520 * Block SIGPIPE on the current thread (so that we get EPIPE from
521 * write() rather than an actual signal).
523 * Note that using sigchain_push() and _pop() to control SIGPIPE
524 * around our IO calls is not thread safe:
525 * [] It uses a global stack of handler frames.
526 * [] It uses ALLOC_GROW() to resize it.
527 * [] Finally, according to the `signal(2)` man-page:
528 * "The effects of `signal()` in a multithreaded process are unspecified."
530 static void thread_block_sigpipe(sigset_t *old_set)
534 sigemptyset(&new_set);
535 sigaddset(&new_set, SIGPIPE);
537 sigemptyset(old_set);
538 pthread_sigmask(SIG_BLOCK, &new_set, old_set);
542 * Thread proc for an IPC worker thread. It handles a series of
543 * connections from clients. It pulls the next fd from the queue,
544 * processes it, and then waits for the next client.
546 * Block SIGPIPE in this worker thread for the life of the thread.
547 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
548 * by client errors and/or when we are under extremely heavy IO load.
550 * This means that the application callback will have SIGPIPE blocked.
551 * The callback should not change it.
553 static void *worker_thread_proc(void *_worker_thread_data)
555 struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
556 struct ipc_server_data *server_data = worker_thread_data->server_data;
561 trace2_thread_start("ipc-worker");
563 thread_block_sigpipe(&old_set);
566 fd = worker_thread__wait_for_connection(worker_thread_data);
568 break; /* in shutdown */
570 io = worker_thread__wait_for_io_start(worker_thread_data, fd);
572 continue; /* client hung up without sending anything */
574 ret = worker_thread__do_io(worker_thread_data, fd);
576 if (ret == SIMPLE_IPC_QUIT) {
577 trace2_data_string("ipc-worker", NULL, "queue_stop_async",
580 * The application layer is telling the ipc-server
583 * We DO NOT have a response to send to the client.
585 * Queue an async stop (to stop the other threads) and
586 * allow this worker thread to exit now (no sense waiting
587 * for the thread-pool shutdown signal).
589 * Other non-idle worker threads are allowed to finish
590 * responding to their current clients.
592 ipc_server_stop_async(server_data);
597 trace2_thread_exit();
601 /* A randomly chosen value. */
602 #define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
605 * Accept a new client connection on our socket. This uses non-blocking
606 * IO so that we can also wait for shutdown requests on our socket-pair
607 * without actually spinning on a fast timeout.
609 static int accept_thread__wait_for_connection(
610 struct ipc_accept_thread_data *accept_thread_data)
612 struct pollfd pollfd[2];
616 pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
617 pollfd[0].events = POLLIN;
619 pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
620 pollfd[1].events = POLLIN;
622 result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
633 * If someone deletes or force-creates a new unix
634 * domain socket at our path, all future clients
635 * will be routed elsewhere and we silently starve.
636 * If that happens, just queue a shutdown.
638 if (unix_ss_was_stolen(
639 accept_thread_data->server_socket)) {
640 trace2_data_string("ipc-accept", NULL,
643 ipc_server_stop_async(
644 accept_thread_data->server_data);
649 if (pollfd[0].revents & POLLIN) {
650 /* shutdown message queued to socketpair */
654 if (pollfd[1].revents & POLLIN) {
655 /* a connection is available on server_socket */
658 accept(accept_thread_data->server_socket->fd_socket,
664 * An error here is unlikely -- it probably
665 * indicates that the connecting process has
666 * already dropped the connection.
671 BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
672 errno, pollfd[0].revents, pollfd[1].revents);
677 * Thread proc for the IPC server "accept thread". This waits for
678 * an incoming socket connection, appends it to the queue of available
679 * connections, and notifies a worker thread to process it.
681 * Block SIGPIPE in this thread for the life of the thread. This
682 * avoids any stray SIGPIPE signals when closing pipe fds under
683 * extremely heavy loads (such as when the fifo queue is full and we
684 * drop incoming connections).
686 static void *accept_thread_proc(void *_accept_thread_data)
688 struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
689 struct ipc_server_data *server_data = accept_thread_data->server_data;
692 trace2_thread_start("ipc-accept");
694 thread_block_sigpipe(&old_set);
697 int client_fd = accept_thread__wait_for_connection(
700 pthread_mutex_lock(&server_data->work_available_mutex);
701 if (server_data->shutdown_requested) {
702 pthread_mutex_unlock(&server_data->work_available_mutex);
709 /* ignore transient accept() errors */
712 fifo_enqueue(server_data, client_fd);
713 pthread_cond_broadcast(&server_data->work_available_cond);
715 pthread_mutex_unlock(&server_data->work_available_mutex);
718 trace2_thread_exit();
723 * We can't predict the connection arrival rate relative to the worker
724 * processing rate, therefore we allow the "accept-thread" to queue up
725 * a generous number of connections, since we'd rather have the client
726 * not unnecessarily timeout if we can avoid it. (The assumption is
727 * that this will be used for FSMonitor and a few second wait on a
728 * connection is better than having the client timeout and do the full
729 * computation itself.)
731 * The FIFO queue size is set to a multiple of the worker pool size.
732 * This value was chosen at random.
734 #define FIFO_SCALE (100)
737 * The backlog value for `listen(2)`. This doesn't need to be huge,
738 * rather just large enough for our "accept-thread" to wake up and
739 * queue incoming connections onto the FIFO without the kernel
742 * This value was chosen at random.
744 #define LISTEN_BACKLOG (50)
746 static int create_listener_socket(
748 const struct ipc_server_opts *ipc_opts,
749 struct unix_ss_socket **new_server_socket)
751 struct unix_ss_socket *server_socket = NULL;
752 struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
755 uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
756 uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
758 ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
762 if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
763 int saved_errno = errno;
764 unix_ss_free(server_socket);
769 *new_server_socket = server_socket;
771 trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
775 static int setup_listener_socket(
777 const struct ipc_server_opts *ipc_opts,
778 struct unix_ss_socket **new_server_socket)
780 int ret, saved_errno;
782 trace2_region_enter("ipc-server", "create-listener_socket", NULL);
784 ret = create_listener_socket(path, ipc_opts, new_server_socket);
787 trace2_region_leave("ipc-server", "create-listener_socket", NULL);
794 * Start IPC server in a pool of background threads.
796 int ipc_server_run_async(struct ipc_server_data **returned_server_data,
797 const char *path, const struct ipc_server_opts *opts,
798 ipc_server_application_cb *application_cb,
799 void *application_data)
801 struct unix_ss_socket *server_socket = NULL;
802 struct ipc_server_data *server_data;
806 int nr_threads = opts->nr_threads;
808 *returned_server_data = NULL;
811 * Create a socketpair and set sv[1] to non-blocking. This
812 * will be used to send a shutdown message to the accept-thread
813 * and allows the accept-thread to wait on EITHER a client
814 * connection or a shutdown request without spinning.
816 if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
819 if (set_socket_blocking_flag(sv[1], 1)) {
820 int saved_errno = errno;
827 ret = setup_listener_socket(path, opts, &server_socket);
829 int saved_errno = errno;
836 server_data = xcalloc(1, sizeof(*server_data));
837 server_data->magic = MAGIC_SERVER_DATA;
838 server_data->application_cb = application_cb;
839 server_data->application_data = application_data;
840 strbuf_init(&server_data->buf_path, 0);
841 strbuf_addstr(&server_data->buf_path, path);
846 pthread_mutex_init(&server_data->work_available_mutex, NULL);
847 pthread_cond_init(&server_data->work_available_cond, NULL);
849 server_data->queue_size = nr_threads * FIFO_SCALE;
850 CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
852 server_data->accept_thread =
853 xcalloc(1, sizeof(*server_data->accept_thread));
854 server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
855 server_data->accept_thread->server_data = server_data;
856 server_data->accept_thread->server_socket = server_socket;
857 server_data->accept_thread->fd_send_shutdown = sv[0];
858 server_data->accept_thread->fd_wait_shutdown = sv[1];
860 if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
861 accept_thread_proc, server_data->accept_thread))
862 die_errno(_("could not start accept_thread '%s'"), path);
864 for (k = 0; k < nr_threads; k++) {
865 struct ipc_worker_thread_data *wtd;
867 wtd = xcalloc(1, sizeof(*wtd));
868 wtd->magic = MAGIC_WORKER_THREAD_DATA;
869 wtd->server_data = server_data;
871 if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
874 die(_("could not start worker[0] for '%s'"),
877 * Limp along with the thread pool that we have.
882 wtd->next_thread = server_data->worker_thread_list;
883 server_data->worker_thread_list = wtd;
886 *returned_server_data = server_data;
891 * Gently tell the IPC server threads to shut down.
892 * Can be run on any thread.
894 int ipc_server_stop_async(struct ipc_server_data *server_data)
896 /* ASSERT NOT holding mutex */
903 trace2_region_enter("ipc-server", "server-stop-async", NULL);
905 pthread_mutex_lock(&server_data->work_available_mutex);
907 server_data->shutdown_requested = 1;
910 * Write a byte to the shutdown socket pair to wake up the
913 if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
914 error_errno("could not write to fd_send_shutdown");
917 * Drain the queue of existing connections.
919 while ((fd = fifo_dequeue(server_data)) != -1)
923 * Gently tell worker threads to stop processing new connections
924 * and exit. (This does not abort in-process conversations.)
926 pthread_cond_broadcast(&server_data->work_available_cond);
928 pthread_mutex_unlock(&server_data->work_available_mutex);
930 trace2_region_leave("ipc-server", "server-stop-async", NULL);
936 * Wait for all IPC server threads to stop.
938 int ipc_server_await(struct ipc_server_data *server_data)
940 pthread_join(server_data->accept_thread->pthread_id, NULL);
942 if (!server_data->shutdown_requested)
943 BUG("ipc-server: accept-thread stopped for '%s'",
944 server_data->buf_path.buf);
946 while (server_data->worker_thread_list) {
947 struct ipc_worker_thread_data *wtd =
948 server_data->worker_thread_list;
950 pthread_join(wtd->pthread_id, NULL);
952 server_data->worker_thread_list = wtd->next_thread;
956 server_data->is_stopped = 1;
961 void ipc_server_free(struct ipc_server_data *server_data)
963 struct ipc_accept_thread_data * accept_thread_data;
968 if (!server_data->is_stopped)
969 BUG("cannot free ipc-server while running for '%s'",
970 server_data->buf_path.buf);
972 accept_thread_data = server_data->accept_thread;
973 if (accept_thread_data) {
974 unix_ss_free(accept_thread_data->server_socket);
976 if (accept_thread_data->fd_send_shutdown != -1)
977 close(accept_thread_data->fd_send_shutdown);
978 if (accept_thread_data->fd_wait_shutdown != -1)
979 close(accept_thread_data->fd_wait_shutdown);
981 free(server_data->accept_thread);
984 while (server_data->worker_thread_list) {
985 struct ipc_worker_thread_data *wtd =
986 server_data->worker_thread_list;
988 server_data->worker_thread_list = wtd->next_thread;
992 pthread_cond_destroy(&server_data->work_available_cond);
993 pthread_mutex_destroy(&server_data->work_available_mutex);
995 strbuf_release(&server_data->buf_path);
997 free(server_data->fifo_fds);