2 #include "simple-ipc.h"
5 #include "thread-utils.h"
6 #include "unix-socket.h"
7 #include "unix-stream-server.h"
9 #ifndef SUPPORTS_SIMPLE_IPC
11 * This source file should only be compiled when Simple IPC is supported.
12 * See the top-level Makefile.
14 #error SUPPORTS_SIMPLE_IPC not defined
enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has a S_IFSOCK type inode
	 * at `path`, doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}
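/*
 * For example, a caller that wants to lazily start a server can probe
 * first.  (A minimal sketch; `spawn_my_server()` is a hypothetical
 * helper, not part of this API.)
 *
 *	if (ipc_get_active_state(path) != IPC_STATE__LISTENING)
 *		spawn_my_server(path);
 */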
/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so short that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)
/*
 * Try to connect to the server.  If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;
			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;
			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;
			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}
/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)
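/*
 * With the two values above, connect_to_server() makes at most
 * 1000 / 50 = 20 connection attempts before reporting that the
 * server is not listening.
 */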
enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}
int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}
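/*
 * A minimal client sketch.  (The "ping" message is hypothetical; the
 * request and response payloads are entirely application-defined.)
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *	if (!ipc_client_send_command(path, &options,
 *				     "ping", strlen("ping"), &answer))
 *		printf("%s\n", answer.buf);
 *	strbuf_release(&answer);
 */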
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);
	if (flags < 0)
		return -1;
	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;
	return fcntl(fd, F_SETFL, flags);
}
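/*
 * (Despite the general-sounding name, both callers in this file pass
 * make_nonblocking=1: the listen socket and the shutdown socketpair
 * are polled, so neither may block the accept-thread.)
 */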
/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};
/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client and relay data to/from
 * the `application_cb` and then wait for another connection from the
 * server thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
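/*
 * A minimal server lifecycle sketch.  (`my_app_cb` and `my_app_data`
 * are hypothetical stand-ins for the caller's application callback
 * and its instance data.)
 *
 *	struct ipc_server_data *server_data = NULL;
 *	struct ipc_server_opts opts = { 0 };
 *
 *	opts.nr_threads = 4;
 *	if (!ipc_server_run_async(&server_data, path, &opts,
 *				  my_app_cb, my_app_data)) {
 *		ipc_server_await(server_data);	(returns after a shutdown)
 *		ipc_server_free(server_data);
 *	}
 */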
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};
/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}
/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full. Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}
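/*
 * Note that `back_pos == front_pos` means "empty", so the ring
 * sacrifices one slot to distinguish empty from full: a buffer of
 * `queue_size` entries holds at most `queue_size - 1` connections.
 */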
/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}
/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}
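/*
 * An application callback can therefore stream a large response in
 * pieces by invoking the reply callback repeatedly.  (A sketch using
 * the callback arguments it receives; `part1`/`part2` are hypothetical
 * buffers.  The final flush is sent by worker_thread__do_io() after
 * the callback returns.)
 *
 *	reply_cb(reply_data, part1, part1_len);
 *	reply_cb(reply_data, part2, part2_len);
 */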
/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd.  Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}
		if (result == 0) {
			/* a timeout */
			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;
		if (pollfd[0].revents & POLLIN)
			return 0;
		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}
/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;
	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}
/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}
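/*
 * The previous mask is returned in `old_set`; a thread that needed to
 * restore it could call, e.g.:
 *
 *	pthread_sigmask(SIG_SETMASK, &old_set, NULL);
 *
 * (The thread procs below never do, since they block SIGPIPE for
 * their entire lifetime.)
 */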
/*
 * Thread proc for an IPC worker thread.  It handles a series of
 * connections from clients.  It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL,
					   "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shutdown.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense
			 * waiting for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}
/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket.  This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	struct pollfd pollfd[2];
	int result;

	for (;;) {
		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;
		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}
/*
 * Thread proc for the IPC server "accept thread".  This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread.  This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}
/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily timeout if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client timeout and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)
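/*
 * For example, a pool of 4 worker threads gets a FIFO of
 * 4 * FIFO_SCALE = 400 slots (one of which is sacrificed to
 * distinguish empty from full; see fifo_enqueue() above).
 */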
/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)
static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}
static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}
/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking.  This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}
/*
 * Gently tell the IPC server threads to shutdown.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit.  (This does not abort in-process conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}
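/*
 * The shutdown handshake happens in three steps: the flag is set
 * under the mutex, the one-byte write wakes the accept-thread out of
 * its poll(), and the broadcast wakes every idle worker so that each
 * can observe `shutdown_requested` and exit its loop.
 */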
/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}
void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}