2 * Server-side thread management
4 * Copyright (C) 1998 Alexandre Julliard
13 #include <sys/types.h>
30 struct wait_queue_entry
32 struct wait_queue_entry *next;
33 struct wait_queue_entry *prev;
35 struct thread *thread;
40 int count; /* count of objects */
42 struct timeval timeout;
43 struct timeout_user *user;
44 struct wait_queue_entry queues[1];
47 /* asynchronous procedure calls */
51 void *func; /* function to call in client */
52 void *param; /* function param */
54 #define MAX_THREAD_APC 16 /* Max outstanding APCs for a thread */
57 /* thread operations */
59 static void dump_thread( struct object *obj, int verbose );
60 static int thread_signaled( struct object *obj, struct thread *thread );
61 static void destroy_thread( struct object *obj );
63 static const struct object_ops thread_ops =
77 static struct thread *first_thread;
79 /* initialization of a thread structure */
80 static void init_thread( struct thread *thread, int fd )
82 init_object( &thread->obj, &thread_ops, NULL );
83 thread->client = NULL;
84 thread->unix_pid = 0; /* not known yet */
87 thread->debug_ctx = NULL;
88 thread->debug_first = NULL;
89 thread->debug_event = NULL;
92 thread->apc_count = 0;
94 thread->state = STARTING;
95 thread->exit_code = 0x103; /* STILL_ACTIVE */
98 thread->priority = THREAD_PRIORITY_NORMAL;
103 /* create the initial thread and start the main server loop */
104 void create_initial_thread( int fd )
106 first_thread = mem_alloc( sizeof(*first_thread) );
107 assert( first_thread );
109 current = first_thread;
110 init_thread( first_thread, fd );
111 first_thread->process = create_initial_process();
112 add_process_thread( first_thread->process, first_thread );
113 first_thread->client = add_client( fd, first_thread );
117 /* create a new thread */
118 static struct thread *create_thread( int fd, void *pid, int suspend, int inherit, int *handle )
120 struct thread *thread;
121 struct process *process;
123 if (!(thread = mem_alloc( sizeof(*thread) ))) return NULL;
125 if (!(process = get_process_from_id( pid )))
130 init_thread( thread, fd );
131 thread->process = process;
133 if (suspend) thread->suspend++;
135 if ((thread->next = first_thread) != NULL) thread->next->prev = thread;
136 first_thread = thread;
137 add_process_thread( process, thread );
139 if ((*handle = alloc_handle( current->process, thread,
140 THREAD_ALL_ACCESS, inherit )) == -1) goto error;
141 if (!(thread->client = add_client( fd, thread )))
143 SET_ERROR( ERROR_TOO_MANY_OPEN_FILES );
149 close_handle( current->process, *handle );
150 remove_process_thread( process, thread );
151 release_object( thread );
155 /* destroy a thread when its refcount is 0 */
156 static void destroy_thread( struct object *obj )
158 struct thread *thread = (struct thread *)obj;
159 assert( obj->ops == &thread_ops );
161 assert( !thread->debug_ctx ); /* cannot still be debugging something */
162 release_object( thread->process );
163 if (thread->next) thread->next->prev = thread->prev;
164 if (thread->prev) thread->prev->next = thread->next;
165 else first_thread = thread->next;
166 if (thread->apc) free( thread->apc );
167 if (debug_level) memset( thread, 0xaa, sizeof(thread) ); /* catch errors */
171 /* dump a thread on stdout for debugging purposes */
172 static void dump_thread( struct object *obj, int verbose )
174 struct thread *thread = (struct thread *)obj;
175 assert( obj->ops == &thread_ops );
177 fprintf( stderr, "Thread pid=%d teb=%p state=%d\n",
178 thread->unix_pid, thread->teb, thread->state );
181 static int thread_signaled( struct object *obj, struct thread *thread )
183 struct thread *mythread = (struct thread *)obj;
184 return (mythread->state == TERMINATED);
187 /* get a thread pointer from a thread id (and increment the refcount) */
188 struct thread *get_thread_from_id( void *id )
190 struct thread *t = first_thread;
191 while (t && (t != id)) t = t->next;
192 if (t) grab_object( t );
196 /* get a thread from a handle (and increment the refcount) */
197 struct thread *get_thread_from_handle( int handle, unsigned int access )
199 return (struct thread *)get_handle_obj( current->process, handle,
200 access, &thread_ops );
203 /* set all information about a thread */
204 static void set_thread_info( struct thread *thread,
205 struct set_thread_info_request *req )
207 if (req->mask & SET_THREAD_INFO_PRIORITY)
208 thread->priority = req->priority;
209 if (req->mask & SET_THREAD_INFO_AFFINITY)
211 if (req->affinity != 1) SET_ERROR( ERROR_INVALID_PARAMETER );
212 else thread->affinity = req->affinity;
216 /* suspend a thread */
217 static int suspend_thread( struct thread *thread )
219 int old_count = thread->suspend;
220 if (thread->suspend < MAXIMUM_SUSPEND_COUNT)
222 if (!(thread->process->suspend + thread->suspend++))
224 if (thread->unix_pid) kill( thread->unix_pid, SIGSTOP );
230 /* resume a thread */
231 static int resume_thread( struct thread *thread )
233 int old_count = thread->suspend;
234 if (thread->suspend > 0)
236 if (!(--thread->suspend + thread->process->suspend))
238 if (thread->unix_pid) kill( thread->unix_pid, SIGCONT );
244 /* suspend all threads but the current */
245 void suspend_all_threads( void )
247 struct thread *thread;
248 for ( thread = first_thread; thread; thread = thread->next )
249 if ( thread != current )
250 suspend_thread( thread );
253 /* resume all threads but the current */
254 void resume_all_threads( void )
256 struct thread *thread;
257 for ( thread = first_thread; thread; thread = thread->next )
258 if ( thread != current )
259 resume_thread( thread );
262 /* send a reply to a thread */
263 int send_reply( struct thread *thread, int pass_fd, int n,
264 ... /* arg_1, len_1, ..., arg_n, len_n */ )
266 struct iovec vec[16];
272 for (i = 0; i < n; i++)
274 vec[i].iov_base = va_arg( args, void * );
275 vec[i].iov_len = va_arg( args, int );
278 return send_reply_v( thread->client, thread->error, pass_fd, vec, n );
281 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
282 int add_queue( struct object *obj, struct wait_queue_entry *entry )
286 entry->prev = obj->tail;
288 if (obj->tail) obj->tail->next = entry;
289 else obj->head = entry;
294 /* remove a thread from an object wait queue */
295 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
297 if (entry->next) entry->next->prev = entry->prev;
298 else obj->tail = entry->prev;
299 if (entry->prev) entry->prev->next = entry->next;
300 else obj->head = entry->next;
301 release_object( obj );
305 static void end_wait( struct thread *thread )
307 struct thread_wait *wait = thread->wait;
308 struct wait_queue_entry *entry;
312 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
313 entry->obj->ops->remove_queue( entry->obj, entry );
314 if (wait->user) remove_timeout_user( wait->user );
319 /* build the thread wait structure */
320 static int wait_on( struct thread *thread, int count,
321 int *handles, int flags, int timeout )
323 struct thread_wait *wait;
324 struct wait_queue_entry *entry;
328 if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
330 SET_ERROR( ERROR_INVALID_PARAMETER );
333 if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
338 if (flags & SELECT_TIMEOUT) make_timeout( &wait->timeout, timeout );
340 for (i = 0, entry = wait->queues; i < count; i++, entry++)
342 if (!(obj = get_handle_obj( thread->process, handles[i],
343 SYNCHRONIZE, NULL )))
349 entry->thread = thread;
350 if (!obj->ops->add_queue( obj, entry ))
356 release_object( obj );
361 /* check if the thread waiting condition is satisfied */
362 static int check_wait( struct thread *thread, int *signaled )
365 struct thread_wait *wait = thread->wait;
366 struct wait_queue_entry *entry = wait->queues;
369 if (wait->flags & SELECT_ALL)
372 /* Note: we must check them all anyway, as some objects may
373 * want to do something when signaled, even if others are not */
374 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
375 not_ok |= !entry->obj->ops->signaled( entry->obj, thread );
376 if (not_ok) goto other_checks;
377 /* Wait satisfied: tell it to all objects */
379 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
380 if (entry->obj->ops->satisfied( entry->obj, thread ))
381 *signaled = STATUS_ABANDONED_WAIT_0;
386 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
388 if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
389 /* Wait satisfied: tell it to the object */
391 if (entry->obj->ops->satisfied( entry->obj, thread ))
392 *signaled += STATUS_ABANDONED_WAIT_0;
398 if ((wait->flags & SELECT_ALERTABLE) && thread->apc)
400 *signaled = STATUS_USER_APC;
403 if (wait->flags & SELECT_TIMEOUT)
406 gettimeofday( &now, NULL );
407 if ((now.tv_sec > wait->timeout.tv_sec) ||
408 ((now.tv_sec == wait->timeout.tv_sec) &&
409 (now.tv_usec >= wait->timeout.tv_usec)))
411 *signaled = STATUS_TIMEOUT;
418 /* send the select reply to wake up the client */
419 static void send_select_reply( struct thread *thread, int signaled )
421 struct select_reply reply;
422 reply.signaled = signaled;
423 if ((signaled == STATUS_USER_APC) && thread->apc)
425 struct thread_apc *apc = thread->apc;
426 int len = thread->apc_count * sizeof(*apc);
428 thread->apc_count = 0;
429 send_reply( thread, -1, 2, &reply, sizeof(reply),
433 else send_reply( thread, -1, 1, &reply, sizeof(reply) );
436 /* attempt to wake up a thread */
437 /* return 1 if OK, 0 if the wait condition is still not satisfied */
438 static int wake_thread( struct thread *thread )
442 if (!check_wait( thread, &signaled )) return 0;
444 send_select_reply( thread, signaled );
448 /* sleep on a list of objects */
449 static void sleep_on( struct thread *thread, int count, int *handles, int flags, int timeout )
451 assert( !thread->wait );
452 if (!wait_on( thread, count, handles, flags, timeout ))
454 /* return an error */
455 send_select_reply( thread, -1 );
458 if (wake_thread( thread )) return;
459 /* now we need to wait */
460 if (flags & SELECT_TIMEOUT)
462 if (!(thread->wait->user = add_timeout_user( &thread->wait->timeout,
463 call_timeout_handler, thread )))
465 send_select_reply( thread, -1 );
470 /* timeout for the current thread */
471 void thread_timeout(void)
473 assert( current->wait );
474 current->wait->user = NULL;
476 send_select_reply( current, STATUS_TIMEOUT );
479 /* attempt to wake threads sleeping on the object wait queue */
480 void wake_up( struct object *obj, int max )
482 struct wait_queue_entry *entry = obj->head;
486 struct wait_queue_entry *next = entry->next;
487 if (wake_thread( entry->thread ))
489 if (max && !--max) break;
495 /* queue an async procedure call */
496 static int thread_queue_apc( struct thread *thread, void *func, void *param )
498 struct thread_apc *apc;
501 if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
503 thread->apc_count = 0;
505 else if (thread->apc_count >= MAX_THREAD_APC) return 0;
506 thread->apc[thread->apc_count].func = func;
507 thread->apc[thread->apc_count].param = param;
509 if (thread->wait) wake_thread( thread );
513 /* kill a thread on the spot */
514 void kill_thread( struct thread *thread, int exit_code )
516 if (thread->state == TERMINATED) return; /* already killed */
517 if (thread->unix_pid) kill( thread->unix_pid, SIGTERM );
518 remove_client( thread->client, exit_code ); /* this will call thread_killed */
521 /* a thread has been killed */
522 void thread_killed( struct thread *thread, int exit_code )
524 thread->state = TERMINATED;
525 thread->exit_code = exit_code;
526 if (thread->wait) end_wait( thread );
527 debug_exit_thread( thread, exit_code );
528 abandon_mutexes( thread );
529 remove_process_thread( thread->process, thread );
530 wake_up( &thread->obj, 0 );
531 release_object( thread );
534 /* create a new thread */
535 DECL_HANDLER(new_thread)
537 struct new_thread_reply reply;
540 if ((new_fd = dup(fd)) != -1)
542 reply.tid = create_thread( new_fd, req->pid, req->suspend,
543 req->inherit, &reply.handle );
544 if (!reply.tid) close( new_fd );
547 SET_ERROR( ERROR_TOO_MANY_OPEN_FILES );
549 send_reply( current, -1, 1, &reply, sizeof(reply) );
552 /* initialize a new thread */
553 DECL_HANDLER(init_thread)
555 struct init_thread_reply reply;
557 if (current->state != STARTING)
559 fatal_protocol_error( "init_thread: already running\n" );
562 current->state = RUNNING;
563 current->unix_pid = req->unix_pid;
564 current->teb = req->teb;
565 if (current->suspend + current->process->suspend > 0)
566 kill( current->unix_pid, SIGSTOP );
567 reply.pid = current->process;
569 send_reply( current, -1, 1, &reply, sizeof(reply) );
572 /* terminate a thread */
573 DECL_HANDLER(terminate_thread)
575 struct thread *thread;
577 if ((thread = get_thread_from_handle( req->handle, THREAD_TERMINATE )))
579 kill_thread( thread, req->exit_code );
580 release_object( thread );
582 if (current) send_reply( current, -1, 0 );
585 /* fetch information about a thread */
586 DECL_HANDLER(get_thread_info)
588 struct thread *thread;
589 struct get_thread_info_reply reply = { 0, 0, 0 };
591 if ((thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION )))
594 reply.exit_code = thread->exit_code;
595 reply.priority = thread->priority;
596 release_object( thread );
598 send_reply( current, -1, 1, &reply, sizeof(reply) );
601 /* set information about a thread */
602 DECL_HANDLER(set_thread_info)
604 struct thread *thread;
606 if ((thread = get_thread_from_handle( req->handle, THREAD_SET_INFORMATION )))
608 set_thread_info( thread, req );
609 release_object( thread );
611 send_reply( current, -1, 0 );
614 /* suspend a thread */
615 DECL_HANDLER(suspend_thread)
617 struct thread *thread;
618 struct suspend_thread_reply reply = { -1 };
619 if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
621 reply.count = suspend_thread( thread );
622 release_object( thread );
624 send_reply( current, -1, 1, &reply, sizeof(reply) );
628 /* resume a thread */
629 DECL_HANDLER(resume_thread)
631 struct thread *thread;
632 struct resume_thread_reply reply = { -1 };
633 if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
635 reply.count = resume_thread( thread );
636 release_object( thread );
638 send_reply( current, -1, 1, &reply, sizeof(reply) );
642 /* select on a handle list */
645 if (len != req->count * sizeof(int))
646 fatal_protocol_error( "select: bad length %d for %d handles\n",
648 sleep_on( current, req->count, (int *)data, req->flags, req->timeout );
651 /* queue an APC for a thread */
652 DECL_HANDLER(queue_apc)
654 struct thread *thread;
655 if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
657 thread_queue_apc( thread, req->func, req->param );
658 release_object( thread );
660 send_reply( current, -1, 0 );