2 * test-simple-ipc.c: verify that the Inter-Process Communication works.
8 #include "simple-ipc.h"
9 #include "parse-options.h"
10 #include "thread-utils.h"
13 #ifndef SUPPORTS_SIMPLE_IPC
14 int cmd__simple_ipc(int argc, const char **argv)
16 die("simple IPC not available on this platform");
21 * The test daemon defines an "application callback" that supports a
22 * series of commands (see `test_app_cb()`).
24 * Unknown commands are caught here and we send an error message back
25 * to the client process.
27 static int app__unhandled_command(const char *command,
28 ipc_server_reply_cb *reply_cb,
29 struct ipc_server_reply_data *reply_data)
31 struct strbuf buf = STRBUF_INIT;
34 strbuf_addf(&buf, "unhandled command: %s", command);
35 ret = reply_cb(reply_data, buf.buf, buf.len);
42 * Reply with a single very large buffer. This is to ensure that
43 * long response are properly handled -- whether the chunking occurs
44 * in the kernel or in the (probably pkt-line) layer.
46 #define BIG_ROWS (10000)
47 static int app__big_command(ipc_server_reply_cb *reply_cb,
48 struct ipc_server_reply_data *reply_data)
50 struct strbuf buf = STRBUF_INIT;
54 for (row = 0; row < BIG_ROWS; row++)
55 strbuf_addf(&buf, "big: %.75d\n", row);
57 ret = reply_cb(reply_data, buf.buf, buf.len);
64 * Reply with a series of lines. This is to ensure that we can incrementally
65 * compute the response and chunk it to the client.
67 #define CHUNK_ROWS (10000)
68 static int app__chunk_command(ipc_server_reply_cb *reply_cb,
69 struct ipc_server_reply_data *reply_data)
71 struct strbuf buf = STRBUF_INIT;
75 for (row = 0; row < CHUNK_ROWS; row++) {
76 strbuf_setlen(&buf, 0);
77 strbuf_addf(&buf, "big: %.75d\n", row);
78 ret = reply_cb(reply_data, buf.buf, buf.len);
87 * Slowly reply with a series of lines. This is to model an expensive to
88 * compute chunked response (which might happen if this callback is running
89 * in a thread and is fighting for a lock with other threads).
91 #define SLOW_ROWS (1000)
92 #define SLOW_DELAY_MS (10)
93 static int app__slow_command(ipc_server_reply_cb *reply_cb,
94 struct ipc_server_reply_data *reply_data)
96 struct strbuf buf = STRBUF_INIT;
100 for (row = 0; row < SLOW_ROWS; row++) {
101 strbuf_setlen(&buf, 0);
102 strbuf_addf(&buf, "big: %.75d\n", row);
103 ret = reply_cb(reply_data, buf.buf, buf.len);
104 sleep_millisec(SLOW_DELAY_MS);
107 strbuf_release(&buf);
113 * The client sent a command followed by a (possibly very) large buffer.
115 static int app__sendbytes_command(const char *received, size_t received_len,
116 ipc_server_reply_cb *reply_cb,
117 struct ipc_server_reply_data *reply_data)
119 struct strbuf buf_resp = STRBUF_INIT;
127 * The test is setup to send:
128 * "sendbytes" SP <n * char>
130 if (received_len < strlen("sendbytes "))
131 BUG("received_len is short in app__sendbytes_command");
133 if (skip_prefix(received, "sendbytes ", &p))
134 len_ballast = strlen(p);
137 * Verify that the ballast is n copies of a single letter.
138 * And that the multi-threaded IO layer didn't cross the streams.
140 for (k = 1; k < len_ballast; k++)
145 strbuf_addf(&buf_resp, "errs:%d\n", errs);
147 strbuf_addf(&buf_resp, "rcvd:%c%08d\n", p[0], len_ballast);
149 ret = reply_cb(reply_data, buf_resp.buf, buf_resp.len);
151 strbuf_release(&buf_resp);
157 * An arbitrary fixed address to verify that the application instance
158 * data is handled properly.
160 static int my_app_data = 42;
162 static ipc_server_application_cb test_app_cb;
165 * This is the "application callback" that sits on top of the
166 * "ipc-server". It completely defines the set of commands supported
167 * by this application.
169 static int test_app_cb(void *application_data,
170 const char *command, size_t command_len,
171 ipc_server_reply_cb *reply_cb,
172 struct ipc_server_reply_data *reply_data)
175 * Verify that we received the application-data that we passed
176 * when we started the ipc-server. (We have several layers of
177 * callbacks calling callbacks and it's easy to get things mixed
178 * up (especially when some are "void*").)
180 if (application_data != (void*)&my_app_data)
181 BUG("application_cb: application_data pointer wrong");
183 if (command_len == 4 && !strncmp(command, "quit", 4)) {
185 * The client sent a "quit" command. This is an async
186 * request for the server to shutdown.
188 * We DO NOT send the client a response message
189 * (because we have nothing to say and the other
190 * server threads have not yet stopped).
192 * Tell the ipc-server layer to start shutting down.
193 * This includes: stop listening for new connections
194 * on the socket/pipe and telling all worker threads
195 * to finish/drain their outgoing responses to other
198 * This DOES NOT force an immediate sync shutdown.
200 return SIMPLE_IPC_QUIT;
203 if (command_len == 4 && !strncmp(command, "ping", 4)) {
204 const char *answer = "pong";
205 return reply_cb(reply_data, answer, strlen(answer));
208 if (command_len == 3 && !strncmp(command, "big", 3))
209 return app__big_command(reply_cb, reply_data);
211 if (command_len == 5 && !strncmp(command, "chunk", 5))
212 return app__chunk_command(reply_cb, reply_data);
214 if (command_len == 4 && !strncmp(command, "slow", 4))
215 return app__slow_command(reply_cb, reply_data);
217 if (command_len >= 10 && starts_with(command, "sendbytes "))
218 return app__sendbytes_command(command, command_len,
219 reply_cb, reply_data);
221 return app__unhandled_command(command, reply_cb, reply_data);
226 const char *subcommand;
238 static struct cl_args cl_args = {
252 * This process will run as a simple-ipc server and listen for IPC commands
253 * from client processes.
255 static int daemon__run_server(void)
259 struct ipc_server_opts opts = {
260 .nr_threads = cl_args.nr_threads,
264 * Synchronously run the ipc-server. We don't need any application
265 * instance data, so pass an arbitrary pointer (that we'll later
266 * verify made the round trip).
268 ret = ipc_server_run(cl_args.path, &opts, test_app_cb, (void*)&my_app_data);
270 error(_("socket/pipe already in use: '%s'"), cl_args.path);
272 error_errno(_("could not start server on: '%s'"), cl_args.path);
277 #ifndef GIT_WINDOWS_NATIVE
279 * This is adapted from `daemonize()`. Use `fork()` to directly create and
280 * run the daemon in a child process.
282 static int spawn_server(pid_t *pid)
284 struct ipc_server_opts opts = {
285 .nr_threads = cl_args.nr_threads,
293 error_errno(_("setsid failed"));
299 return ipc_server_run(cl_args.path, &opts, test_app_cb,
300 (void*)&my_app_data);
303 return error_errno(_("could not spawn daemon in the background"));
311 * Conceptually like `daemonize()` but different because Windows does not
312 * have `fork(2)`. Spawn a normal Windows child process but without the
313 * limitations of `start_command()` and `finish_command()`.
315 static int spawn_server(pid_t *pid)
317 char test_tool_exe[MAX_PATH];
318 struct strvec args = STRVEC_INIT;
321 GetModuleFileNameA(NULL, test_tool_exe, MAX_PATH);
323 in = open("/dev/null", O_RDONLY);
324 out = open("/dev/null", O_WRONLY);
326 strvec_push(&args, test_tool_exe);
327 strvec_push(&args, "simple-ipc");
328 strvec_push(&args, "run-daemon");
329 strvec_pushf(&args, "--name=%s", cl_args.path);
330 strvec_pushf(&args, "--threads=%d", cl_args.nr_threads);
332 *pid = mingw_spawnvpe(args.v[0], args.v, NULL, NULL, in, out, out);
339 return error(_("could not spawn daemon in the background"));
346 * This is adapted from `wait_or_whine()`. Watch the child process and
347 * let it get started and begin listening for requests on the socket
348 * before reporting our success.
350 static int wait_for_server_startup(pid_t pid_child)
354 enum ipc_active_state s;
355 time_t time_limit, now;
358 time_limit += cl_args.max_wait_sec;
361 pid_seen = waitpid(pid_child, &status, WNOHANG);
364 return error_errno(_("waitpid failed"));
366 else if (pid_seen == 0) {
368 * The child is still running (this should be
369 * the normal case). Try to connect to it on
370 * the socket and see if it is ready for
373 * If there is another daemon already running,
374 * our child will fail to start (possibly
375 * after a timeout on the lock), but we don't
376 * care (who responds) if the socket is live.
378 s = ipc_get_active_state(cl_args.path);
379 if (s == IPC_STATE__LISTENING)
383 if (now > time_limit)
384 return error(_("daemon not online yet"));
389 else if (pid_seen == pid_child) {
391 * The new child daemon process shutdown while
392 * it was starting up, so it is not listening
395 * Try to ping the socket in the odd chance
396 * that another daemon started (or was already
397 * running) while our child was starting.
399 * Again, we don't care who services the socket.
401 s = ipc_get_active_state(cl_args.path);
402 if (s == IPC_STATE__LISTENING)
406 * We don't care about the WEXITSTATUS() nor
407 * any of the WIF*(status) values because
408 * `cmd__simple_ipc()` does the `!!result`
409 * trick on all function return values.
411 * So it is sufficient to just report the
412 * early shutdown as an error.
414 return error(_("daemon failed to start"));
418 return error(_("waitpid is confused"));
423 * This process will start a simple-ipc server in a background process and
424 * wait for it to become ready. This is like `daemonize()` but gives us
425 * more control and better error reporting (and makes it easier to write
428 static int daemon__start_server(void)
434 * Run the actual daemon in a background process.
436 ret = spawn_server(&pid_child);
441 * Let the parent wait for the child process to get started
442 * and begin listening for requests on the socket.
444 ret = wait_for_server_startup(pid_child);
450 * This process will run a quick probe to see if a simple-ipc server
451 * is active on this path.
453 * Returns 0 if the server is alive.
455 static int client__probe_server(void)
457 enum ipc_active_state s;
459 s = ipc_get_active_state(cl_args.path);
461 case IPC_STATE__LISTENING:
464 case IPC_STATE__NOT_LISTENING:
465 return error("no server listening at '%s'", cl_args.path);
467 case IPC_STATE__PATH_NOT_FOUND:
468 return error("path not found '%s'", cl_args.path);
470 case IPC_STATE__INVALID_PATH:
471 return error("invalid pipe/socket name '%s'", cl_args.path);
473 case IPC_STATE__OTHER_ERROR:
475 return error("other error for '%s'", cl_args.path);
480 * Send an IPC command token to an already-running server daemon and
481 * print the response.
483 * This is a simple 1 word command/token that `test_app_cb()` (in the
484 * daemon process) will understand.
486 static int client__send_ipc(void)
488 const char *command = "(no-command)";
489 struct strbuf buf = STRBUF_INIT;
490 struct ipc_client_connect_options options
491 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
493 if (cl_args.token && *cl_args.token)
494 command = cl_args.token;
496 options.wait_if_busy = 1;
497 options.wait_if_not_found = 0;
499 if (!ipc_client_send_command(cl_args.path, &options,
500 command, strlen(command),
503 printf("%s\n", buf.buf);
506 strbuf_release(&buf);
511 return error("failed to send '%s' to '%s'", command, cl_args.path);
515 * Send an IPC command to an already-running server and ask it to
516 * shutdown. "send quit" is an async request and queues a shutdown
517 * event in the server, so we spin and wait here for it to actually
518 * shutdown to make the unit tests a little easier to write.
520 static int client__stop_server(void)
523 time_t time_limit, now;
524 enum ipc_active_state s;
527 time_limit += cl_args.max_wait_sec;
529 cl_args.token = "quit";
531 ret = client__send_ipc();
538 s = ipc_get_active_state(cl_args.path);
540 if (s != IPC_STATE__LISTENING) {
542 * The socket/pipe is gone and/or has stopped
543 * responding. Lets assume that the daemon
544 * process has exited too.
550 if (now > time_limit)
551 return error(_("daemon has not shutdown yet"));
556 * Send an IPC command followed by ballast to confirm that a large
557 * message can be sent and that the kernel or pkt-line layers will
558 * properly chunk it and that the daemon receives the entire message.
560 static int do_sendbytes(int bytecount, char byte, const char *path,
561 const struct ipc_client_connect_options *options)
563 struct strbuf buf_send = STRBUF_INIT;
564 struct strbuf buf_resp = STRBUF_INIT;
566 strbuf_addstr(&buf_send, "sendbytes ");
567 strbuf_addchars(&buf_send, byte, bytecount);
569 if (!ipc_client_send_command(path, options,
570 buf_send.buf, buf_send.len,
572 strbuf_rtrim(&buf_resp);
573 printf("sent:%c%08d %s\n", byte, bytecount, buf_resp.buf);
575 strbuf_release(&buf_send);
576 strbuf_release(&buf_resp);
581 return error("client failed to sendbytes(%d, '%c') to '%s'",
582 bytecount, byte, path);
586 * Send an IPC command with ballast to an already-running server daemon.
588 static int client__sendbytes(void)
590 struct ipc_client_connect_options options
591 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
593 options.wait_if_busy = 1;
594 options.wait_if_not_found = 0;
595 options.uds_disallow_chdir = 0;
597 return do_sendbytes(cl_args.bytecount, cl_args.bytevalue, cl_args.path,
601 struct multiple_thread_data {
602 pthread_t pthread_id;
603 struct multiple_thread_data *next;
612 static void *multiple_thread_proc(void *_multiple_thread_data)
614 struct multiple_thread_data *d = _multiple_thread_data;
616 struct ipc_client_connect_options options
617 = IPC_CLIENT_CONNECT_OPTIONS_INIT;
619 options.wait_if_busy = 1;
620 options.wait_if_not_found = 0;
622 * A multi-threaded client should not be randomly calling chdir().
623 * The test will pass without this restriction because the test is
624 * not otherwise accessing the filesystem, but it makes us honest.
626 options.uds_disallow_chdir = 1;
628 trace2_thread_start("multiple");
630 for (k = 0; k < d->batchsize; k++) {
631 if (do_sendbytes(d->bytecount + k, d->letter, d->path, &options))
637 trace2_thread_exit();
642 * Start a client-side thread pool. Each thread sends a series of
643 * IPC requests. Each request is on a new connection to the server.
645 static int client__multiple(void)
647 struct multiple_thread_data *list = NULL;
649 int sum_join_errors = 0;
650 int sum_thread_errors = 0;
653 for (k = 0; k < cl_args.nr_threads; k++) {
654 struct multiple_thread_data *d = xcalloc(1, sizeof(*d));
656 d->path = cl_args.path;
657 d->bytecount = cl_args.bytecount + cl_args.batchsize*(k/26);
658 d->batchsize = cl_args.batchsize;
661 d->letter = 'A' + (k % 26);
663 if (pthread_create(&d->pthread_id, NULL, multiple_thread_proc, d)) {
664 warning("failed to create thread[%d] skipping remainder", k);
673 struct multiple_thread_data *d = list;
675 if (pthread_join(d->pthread_id, NULL))
678 sum_thread_errors += d->sum_errors;
679 sum_good += d->sum_good;
685 printf("client (good %d) (join %d), (errors %d)\n",
686 sum_good, sum_join_errors, sum_thread_errors);
688 return (sum_join_errors + sum_thread_errors) ? 1 : 0;
691 int cmd__simple_ipc(int argc, const char **argv)
693 const char * const simple_ipc_usage[] = {
694 N_("test-helper simple-ipc is-active [<name>] [<options>]"),
695 N_("test-helper simple-ipc run-daemon [<name>] [<threads>]"),
696 N_("test-helper simple-ipc start-daemon [<name>] [<threads>] [<max-wait>]"),
697 N_("test-helper simple-ipc stop-daemon [<name>] [<max-wait>]"),
698 N_("test-helper simple-ipc send [<name>] [<token>]"),
699 N_("test-helper simple-ipc sendbytes [<name>] [<bytecount>] [<byte>]"),
700 N_("test-helper simple-ipc multiple [<name>] [<threads>] [<bytecount>] [<batchsize>]"),
704 const char *bytevalue = NULL;
706 struct option options[] = {
707 #ifndef GIT_WINDOWS_NATIVE
708 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("name or pathname of unix domain socket")),
710 OPT_STRING(0, "name", &cl_args.path, N_("name"), N_("named-pipe name")),
712 OPT_INTEGER(0, "threads", &cl_args.nr_threads, N_("number of threads in server thread pool")),
713 OPT_INTEGER(0, "max-wait", &cl_args.max_wait_sec, N_("seconds to wait for daemon to start or stop")),
715 OPT_INTEGER(0, "bytecount", &cl_args.bytecount, N_("number of bytes")),
716 OPT_INTEGER(0, "batchsize", &cl_args.batchsize, N_("number of requests per thread")),
718 OPT_STRING(0, "byte", &bytevalue, N_("byte"), N_("ballast character")),
719 OPT_STRING(0, "token", &cl_args.token, N_("token"), N_("command token to send to the server")),
725 usage_with_options(simple_ipc_usage, options);
727 if (argc == 2 && !strcmp(argv[1], "-h"))
728 usage_with_options(simple_ipc_usage, options);
730 if (argc == 2 && !strcmp(argv[1], "SUPPORTS_SIMPLE_IPC"))
733 cl_args.subcommand = argv[1];
738 argc = parse_options(argc, argv, NULL, options, simple_ipc_usage, 0);
740 if (cl_args.nr_threads < 1)
741 cl_args.nr_threads = 1;
742 if (cl_args.max_wait_sec < 0)
743 cl_args.max_wait_sec = 0;
744 if (cl_args.bytecount < 1)
745 cl_args.bytecount = 1;
746 if (cl_args.batchsize < 1)
747 cl_args.batchsize = 1;
749 if (bytevalue && *bytevalue)
750 cl_args.bytevalue = bytevalue[0];
753 * Use '!!' on all dispatch functions to map from `error()` style
754 * (returns -1) style to `test_must_fail` style (expects 1). This
755 * makes shell error messages less confusing.
758 if (!strcmp(cl_args.subcommand, "is-active"))
759 return !!client__probe_server();
761 if (!strcmp(cl_args.subcommand, "run-daemon"))
762 return !!daemon__run_server();
764 if (!strcmp(cl_args.subcommand, "start-daemon"))
765 return !!daemon__start_server();
768 * Client commands follow. Ensure a server is running before
769 * sending any data. This might be overkill, but then again
770 * this is a test harness.
773 if (!strcmp(cl_args.subcommand, "stop-daemon")) {
774 if (client__probe_server())
776 return !!client__stop_server();
779 if (!strcmp(cl_args.subcommand, "send")) {
780 if (client__probe_server())
782 return !!client__send_ipc();
785 if (!strcmp(cl_args.subcommand, "sendbytes")) {
786 if (client__probe_server())
788 return !!client__sendbytes();
791 if (!strcmp(cl_args.subcommand, "multiple")) {
792 if (client__probe_server())
794 return !!client__multiple();
797 die("Unhandled subcommand: '%s'", cl_args.subcommand);