#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif
15 static int initialize_pipe_name(const char *path, wchar_t *wpath, size_t alloc)
18 struct strbuf realpath = STRBUF_INIT;
20 if (!strbuf_realpath(&realpath, path, 0))
23 off = swprintf(wpath, alloc, L"\\\\.\\pipe\\");
24 if (xutftowcs(wpath + off, realpath.buf, alloc - off) < 0)
27 /* Handle drive prefix */
28 if (wpath[off] && wpath[off + 1] == L':') {
29 wpath[off + 1] = L'_';
33 for (; wpath[off]; off++)
34 if (wpath[off] == L'/')
37 strbuf_release(&realpath);
41 static enum ipc_active_state get_active_state(wchar_t *pipe_path)
43 if (WaitNamedPipeW(pipe_path, NMPWAIT_USE_DEFAULT_WAIT))
44 return IPC_STATE__LISTENING;
46 if (GetLastError() == ERROR_SEM_TIMEOUT)
47 return IPC_STATE__NOT_LISTENING;
49 if (GetLastError() == ERROR_FILE_NOT_FOUND)
50 return IPC_STATE__PATH_NOT_FOUND;
52 return IPC_STATE__OTHER_ERROR;
55 enum ipc_active_state ipc_get_active_state(const char *path)
57 wchar_t pipe_path[MAX_PATH];
59 if (initialize_pipe_name(path, pipe_path, ARRAY_SIZE(pipe_path)) < 0)
60 return IPC_STATE__INVALID_PATH;
62 return get_active_state(pipe_path);
65 #define WAIT_STEP_MS (50)
67 static enum ipc_active_state connect_to_server(
70 const struct ipc_client_connect_options *options,
73 DWORD t_start_ms, t_waited_ms;
75 HANDLE hPipe = INVALID_HANDLE_VALUE;
76 DWORD mode = PIPE_READMODE_BYTE;
82 hPipe = CreateFileW(wpath, GENERIC_READ | GENERIC_WRITE,
83 0, NULL, OPEN_EXISTING, 0, NULL);
84 if (hPipe != INVALID_HANDLE_VALUE)
90 case ERROR_FILE_NOT_FOUND:
91 if (!options->wait_if_not_found)
92 return IPC_STATE__PATH_NOT_FOUND;
94 return IPC_STATE__PATH_NOT_FOUND;
96 step_ms = (timeout_ms < WAIT_STEP_MS) ?
97 timeout_ms : WAIT_STEP_MS;
98 sleep_millisec(step_ms);
100 timeout_ms -= step_ms;
101 break; /* try again */
103 case ERROR_PIPE_BUSY:
104 if (!options->wait_if_busy)
105 return IPC_STATE__NOT_LISTENING;
107 return IPC_STATE__NOT_LISTENING;
109 t_start_ms = (DWORD)(getnanotime() / 1000000);
111 if (!WaitNamedPipeW(wpath, timeout_ms)) {
112 if (GetLastError() == ERROR_SEM_TIMEOUT)
113 return IPC_STATE__NOT_LISTENING;
115 return IPC_STATE__OTHER_ERROR;
119 * A pipe server instance became available.
120 * Race other client processes to connect to
123 * But first decrement our overall timeout so
124 * that we don't starve if we keep losing the
125 * race. But also guard against special
126 * NPMWAIT_ values (0 and -1).
128 t_waited_ms = (DWORD)(getnanotime() / 1000000) - t_start_ms;
129 if (t_waited_ms < timeout_ms)
130 timeout_ms -= t_waited_ms;
133 break; /* try again */
136 return IPC_STATE__OTHER_ERROR;
140 if (!SetNamedPipeHandleState(hPipe, &mode, NULL, NULL)) {
142 return IPC_STATE__OTHER_ERROR;
145 *pfd = _open_osfhandle((intptr_t)hPipe, O_RDWR|O_BINARY);
148 return IPC_STATE__OTHER_ERROR;
151 /* fd now owns hPipe */
153 return IPC_STATE__LISTENING;
/*
 * The default connection timeout for Windows clients.
 *
 * This is not currently part of the ipc_ API (nor the config settings)
 * because of differences between Windows and other platforms.
 *
 * This value was chosen at random.
 */
#define WINDOWS_CONNECTION_TIMEOUT_MS (30000)
166 enum ipc_active_state ipc_client_try_connect(
168 const struct ipc_client_connect_options *options,
169 struct ipc_client_connection **p_connection)
171 wchar_t wpath[MAX_PATH];
172 enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
175 *p_connection = NULL;
177 trace2_region_enter("ipc-client", "try-connect", NULL);
178 trace2_data_string("ipc-client", NULL, "try-connect/path", path);
180 if (initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath)) < 0)
181 state = IPC_STATE__INVALID_PATH;
183 state = connect_to_server(wpath, WINDOWS_CONNECTION_TIMEOUT_MS,
186 trace2_data_intmax("ipc-client", NULL, "try-connect/state",
188 trace2_region_leave("ipc-client", "try-connect", NULL);
190 if (state == IPC_STATE__LISTENING) {
191 (*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
192 (*p_connection)->fd = fd;
198 void ipc_client_close_connection(struct ipc_client_connection *connection)
203 if (connection->fd != -1)
204 close(connection->fd);
209 int ipc_client_send_command_to_connection(
210 struct ipc_client_connection *connection,
211 const char *message, size_t message_len,
212 struct strbuf *answer)
216 strbuf_setlen(answer, 0);
218 trace2_region_enter("ipc-client", "send-command", NULL);
220 if (write_packetized_from_buf_no_flush(message, message_len,
221 connection->fd) < 0 ||
222 packet_flush_gently(connection->fd) < 0) {
223 ret = error(_("could not send IPC command"));
227 FlushFileBuffers((HANDLE)_get_osfhandle(connection->fd));
229 if (read_packetized_to_strbuf(
230 connection->fd, answer,
231 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
232 ret = error(_("could not read IPC response"));
237 trace2_region_leave("ipc-client", "send-command", NULL);
241 int ipc_client_send_command(const char *path,
242 const struct ipc_client_connect_options *options,
243 const char *message, size_t message_len,
244 struct strbuf *response)
247 enum ipc_active_state state;
248 struct ipc_client_connection *connection = NULL;
250 state = ipc_client_try_connect(path, options, &connection);
252 if (state != IPC_STATE__LISTENING)
255 ret = ipc_client_send_command_to_connection(connection,
256 message, message_len,
259 ipc_client_close_connection(connection);
265 * Duplicate the given pipe handle and wrap it in a file descriptor so
266 * that we can use pkt-line on it.
268 static int dup_fd_from_pipe(const HANDLE pipe)
270 HANDLE process = GetCurrentProcess();
274 if (!DuplicateHandle(process, pipe, process, &handle, 0, FALSE,
275 DUPLICATE_SAME_ACCESS)) {
276 errno = err_win_to_posix(GetLastError());
280 fd = _open_osfhandle((intptr_t)handle, O_RDWR|O_BINARY);
282 errno = err_win_to_posix(GetLastError());
288 * `handle` is now owned by `fd` and will be automatically closed
289 * when the descriptor is closed.
296 * Magic numbers used to annotate callback instance data.
297 * These are used to help guard against accidentally passing the
298 * wrong instance data across multiple levels of callbacks (which
299 * is easy to do if there are `void*` arguments).
302 MAGIC_SERVER_REPLY_DATA,
303 MAGIC_SERVER_THREAD_DATA,
307 struct ipc_server_reply_data {
310 struct ipc_server_thread_data *server_thread_data;
313 struct ipc_server_thread_data {
315 struct ipc_server_thread_data *next_thread;
316 struct ipc_server_data *server_data;
317 pthread_t pthread_id;
/*
 * On Windows, the conceptual "ipc-server" is implemented as a pool of
 * n identical/peer "server-thread" threads.  That is, there is no
 * hierarchy of threads; and therefore no controller thread managing
 * the pool.  Each thread has an independent handle to the named pipe,
 * receives incoming connections, processes the client, and re-uses
 * the pipe for the next client connection.
 *
 * Therefore, the "ipc-server" only needs to maintain a list of the
 * spawned threads for eventual "join" purposes.
 *
 * A single "stop-event" is visible to all of the server threads to
 * tell them to shutdown (when idle).
 */
335 struct ipc_server_data {
337 ipc_server_application_cb *application_cb;
338 void *application_data;
339 struct strbuf buf_path;
340 wchar_t wpath[MAX_PATH];
342 HANDLE hEventStopRequested;
343 struct ipc_server_thread_data *thread_list;
347 enum connect_result {
355 static enum connect_result queue_overlapped_connect(
356 struct ipc_server_thread_data *server_thread_data,
359 if (ConnectNamedPipe(server_thread_data->hPipe, lpo))
362 switch (GetLastError()) {
363 case ERROR_IO_PENDING:
364 return CR_CONNECT_PENDING;
366 case ERROR_PIPE_CONNECTED:
367 SetEvent(lpo->hEvent);
375 error(_("ConnectNamedPipe failed for '%s' (%lu)"),
376 server_thread_data->server_data->buf_path.buf,
378 return CR_CONNECT_ERROR;
382 * Use Windows Overlapped IO to wait for a connection or for our event
385 static enum connect_result wait_for_connection(
386 struct ipc_server_thread_data *server_thread_data,
389 enum connect_result r;
390 HANDLE waitHandles[2];
393 r = queue_overlapped_connect(server_thread_data, lpo);
394 if (r != CR_CONNECT_PENDING)
397 waitHandles[0] = server_thread_data->server_data->hEventStopRequested;
398 waitHandles[1] = lpo->hEvent;
400 dwWaitResult = WaitForMultipleObjects(2, waitHandles, FALSE, INFINITE);
401 switch (dwWaitResult) {
402 case WAIT_OBJECT_0 + 0:
405 case WAIT_OBJECT_0 + 1:
406 ResetEvent(lpo->hEvent);
410 return CR_WAIT_ERROR;
415 * Forward declare our reply callback function so that any compiler
416 * errors are reported when we actually define the function (in addition
417 * to any errors reported when we try to pass this callback function as
418 * a parameter in a function call). The former are easier to understand.
420 static ipc_server_reply_cb do_io_reply_callback;
423 * Relay application's response message to the client process.
424 * (We do not flush at this point because we allow the caller
425 * to chunk data to the client thru us.)
427 static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
428 const char *response, size_t response_len)
430 if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
431 BUG("reply_cb called with wrong instance data");
433 return write_packetized_from_buf_no_flush(response, response_len,
438 * Receive the request/command from the client and pass it to the
439 * registered request-callback. The request-callback will compose
440 * a response and call our reply-callback to send it to the client.
442 * Simple-IPC only contains one round trip, so we flush and close
443 * here after the response.
445 static int do_io(struct ipc_server_thread_data *server_thread_data)
447 struct strbuf buf = STRBUF_INIT;
448 struct ipc_server_reply_data reply_data;
451 reply_data.magic = MAGIC_SERVER_REPLY_DATA;
452 reply_data.server_thread_data = server_thread_data;
454 reply_data.fd = dup_fd_from_pipe(server_thread_data->hPipe);
455 if (reply_data.fd < 0)
456 return error(_("could not create fd from pipe for '%s'"),
457 server_thread_data->server_data->buf_path.buf);
459 ret = read_packetized_to_strbuf(
461 PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
463 ret = server_thread_data->server_data->application_cb(
464 server_thread_data->server_data->application_data,
465 buf.buf, buf.len, do_io_reply_callback, &reply_data);
467 packet_flush_gently(reply_data.fd);
469 FlushFileBuffers((HANDLE)_get_osfhandle((reply_data.fd)));
473 * The client probably disconnected/shutdown before it
474 * could send a well-formed message. Ignore it.
478 strbuf_release(&buf);
479 close(reply_data.fd);
485 * Handle IPC request and response with this connected client. And reset
486 * the pipe to prepare for the next client.
488 static int use_connection(struct ipc_server_thread_data *server_thread_data)
492 ret = do_io(server_thread_data);
494 FlushFileBuffers(server_thread_data->hPipe);
495 DisconnectNamedPipe(server_thread_data->hPipe);
501 * Thread proc for an IPC server worker thread. It handles a series of
502 * connections from clients. It cleans and reuses the hPipe between each
505 static void *server_thread_proc(void *_server_thread_data)
507 struct ipc_server_thread_data *server_thread_data = _server_thread_data;
508 HANDLE hEventConnected = INVALID_HANDLE_VALUE;
510 enum connect_result cr;
513 assert(server_thread_data->hPipe != INVALID_HANDLE_VALUE);
515 trace2_thread_start("ipc-server");
516 trace2_data_string("ipc-server", NULL, "pipe",
517 server_thread_data->server_data->buf_path.buf);
519 hEventConnected = CreateEventW(NULL, TRUE, FALSE, NULL);
521 memset(&oConnect, 0, sizeof(oConnect));
522 oConnect.hEvent = hEventConnected;
525 cr = wait_for_connection(server_thread_data, &oConnect);
532 ret = use_connection(server_thread_data);
533 if (ret == SIMPLE_IPC_QUIT) {
534 ipc_server_stop_async(
535 server_thread_data->server_data);
540 * Ignore (transient) IO errors with this
541 * client and reset for the next client.
546 case CR_CONNECT_PENDING:
547 /* By construction, this should not happen. */
548 BUG("ipc-server[%s]: unexpeced CR_CONNECT_PENDING",
549 server_thread_data->server_data->buf_path.buf);
551 case CR_CONNECT_ERROR:
554 * Ignore these theoretical errors.
556 DisconnectNamedPipe(server_thread_data->hPipe);
560 BUG("unandled case after wait_for_connection");
565 CloseHandle(server_thread_data->hPipe);
566 CloseHandle(hEventConnected);
568 trace2_thread_exit();
572 static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
575 DWORD dwOpenMode, dwPipeMode;
576 LPSECURITY_ATTRIBUTES lpsa = NULL;
578 dwOpenMode = PIPE_ACCESS_INBOUND | PIPE_ACCESS_OUTBOUND |
579 FILE_FLAG_OVERLAPPED;
581 dwPipeMode = PIPE_TYPE_MESSAGE | PIPE_READMODE_BYTE | PIPE_WAIT |
582 PIPE_REJECT_REMOTE_CLIENTS;
585 dwOpenMode |= FILE_FLAG_FIRST_PIPE_INSTANCE;
588 * On Windows, the first server pipe instance gets to
589 * set the ACL / Security Attributes on the named
590 * pipe; subsequent instances inherit and cannot
593 * TODO Should we allow the application layer to
594 * specify security attributes, such as `LocalService`
595 * or `LocalSystem`, when we create the named pipe?
596 * This question is probably not important when the
597 * daemon is started by a foreground user process and
598 * only needs to talk to the current user, but may be
599 * if the daemon is run via the Control Panel as a
604 hPipe = CreateNamedPipeW(wpath, dwOpenMode, dwPipeMode,
605 PIPE_UNLIMITED_INSTANCES, 1024, 1024, 0, lpsa);
610 int ipc_server_run_async(struct ipc_server_data **returned_server_data,
611 const char *path, const struct ipc_server_opts *opts,
612 ipc_server_application_cb *application_cb,
613 void *application_data)
615 struct ipc_server_data *server_data;
616 wchar_t wpath[MAX_PATH];
617 HANDLE hPipeFirst = INVALID_HANDLE_VALUE;
620 int nr_threads = opts->nr_threads;
622 *returned_server_data = NULL;
624 ret = initialize_pipe_name(path, wpath, ARRAY_SIZE(wpath));
630 hPipeFirst = create_new_pipe(wpath, 1);
631 if (hPipeFirst == INVALID_HANDLE_VALUE) {
636 server_data = xcalloc(1, sizeof(*server_data));
637 server_data->magic = MAGIC_SERVER_DATA;
638 server_data->application_cb = application_cb;
639 server_data->application_data = application_data;
640 server_data->hEventStopRequested = CreateEvent(NULL, TRUE, FALSE, NULL);
641 strbuf_init(&server_data->buf_path, 0);
642 strbuf_addstr(&server_data->buf_path, path);
643 wcscpy(server_data->wpath, wpath);
648 for (k = 0; k < nr_threads; k++) {
649 struct ipc_server_thread_data *std;
651 std = xcalloc(1, sizeof(*std));
652 std->magic = MAGIC_SERVER_THREAD_DATA;
653 std->server_data = server_data;
654 std->hPipe = INVALID_HANDLE_VALUE;
656 std->hPipe = (k == 0)
658 : create_new_pipe(server_data->wpath, 0);
660 if (std->hPipe == INVALID_HANDLE_VALUE) {
662 * If we've reached a pipe instance limit for
663 * this path, just use fewer threads.
669 if (pthread_create(&std->pthread_id, NULL,
670 server_thread_proc, std)) {
672 * Likewise, if we're out of threads, just use
673 * fewer threads than requested.
675 * However, we just give up if we can't even get
676 * one thread. This should not happen.
679 die(_("could not start thread[0] for '%s'"),
682 CloseHandle(std->hPipe);
687 std->next_thread = server_data->thread_list;
688 server_data->thread_list = std;
691 *returned_server_data = server_data;
695 int ipc_server_stop_async(struct ipc_server_data *server_data)
701 * Gently tell all of the ipc_server threads to shutdown.
702 * This will be seen the next time they are idle (and waiting
705 * We DO NOT attempt to force them to drop an active connection.
707 SetEvent(server_data->hEventStopRequested);
711 int ipc_server_await(struct ipc_server_data *server_data)
718 dwWaitResult = WaitForSingleObject(server_data->hEventStopRequested, INFINITE);
719 if (dwWaitResult != WAIT_OBJECT_0)
720 return error(_("wait for hEvent failed for '%s'"),
721 server_data->buf_path.buf);
723 while (server_data->thread_list) {
724 struct ipc_server_thread_data *std = server_data->thread_list;
726 pthread_join(std->pthread_id, NULL);
728 server_data->thread_list = std->next_thread;
732 server_data->is_stopped = 1;
737 void ipc_server_free(struct ipc_server_data *server_data)
742 if (!server_data->is_stopped)
743 BUG("cannot free ipc-server while running for '%s'",
744 server_data->buf_path.buf);
746 strbuf_release(&server_data->buf_path);
748 if (server_data->hEventStopRequested != INVALID_HANDLE_VALUE)
749 CloseHandle(server_data->hEventStopRequested);
751 while (server_data->thread_list) {
752 struct ipc_server_thread_data *std = server_data->thread_list;
754 server_data->thread_list = std->next_thread;