2 * Server-side thread management
4 * Copyright (C) 1998 Alexandre Julliard
13 #include <sys/types.h>
22 #include "server/thread.h"
23 #include "server/process.h"
28 struct wait_queue_entry
30 struct wait_queue_entry *next;
31 struct wait_queue_entry *prev;
33 struct thread *thread;
38 int count; /* count of objects */
40 struct timeval timeout;
41 struct wait_queue_entry queues[1];
44 /* asynchronous procedure calls */
48 void *func; /* function to call in client */
49 void *param; /* function param */
51 #define MAX_THREAD_APC 16 /* Max outstanding APCs for a thread */
54 /* thread operations */
56 static void dump_thread( struct object *obj, int verbose );
57 static int thread_signaled( struct object *obj, struct thread *thread );
58 static void destroy_thread( struct object *obj );
60 static const struct object_ops thread_ops =
74 static struct thread initial_thread;
75 static struct thread *first_thread = &initial_thread;
77 /* initialization of a thread structure */
78 static void init_thread( struct thread *thread, int fd )
80 init_object( &thread->obj, &thread_ops, NULL );
81 thread->client_fd = fd;
82 thread->unix_pid = 0; /* not known yet */
86 thread->apc_count = 0;
88 thread->state = STARTING;
89 thread->exit_code = 0x103; /* STILL_ACTIVE */
92 thread->priority = THREAD_PRIORITY_NORMAL;
97 /* create the initial thread and start the main server loop */
98 void create_initial_thread( int fd )
100 current = &initial_thread;
101 init_thread( &initial_thread, fd );
102 initial_thread.process = create_initial_process();
103 add_process_thread( initial_thread.process, &initial_thread );
104 add_client( fd, &initial_thread );
105 grab_object( &initial_thread ); /* so that we never free it */
109 /* create a new thread */
110 struct thread *create_thread( int fd, void *pid, int suspend, int inherit, int *handle )
112 struct thread *thread;
113 struct process *process;
115 if (!(thread = mem_alloc( sizeof(*thread) ))) return NULL;
117 if (!(process = get_process_from_id( pid )))
122 init_thread( thread, fd );
123 thread->process = process;
125 if (suspend) thread->suspend++;
127 thread->next = first_thread;
128 first_thread->prev = thread;
129 first_thread = thread;
130 add_process_thread( process, thread );
132 if ((*handle = alloc_handle( current->process, thread,
133 THREAD_ALL_ACCESS, inherit )) == -1) goto error;
134 if (add_client( fd, thread ) == -1)
136 SET_ERROR( ERROR_TOO_MANY_OPEN_FILES );
142 if (current) close_handle( current->process, *handle );
143 remove_process_thread( process, thread );
144 release_object( thread );
148 /* destroy a thread when its refcount is 0 */
149 static void destroy_thread( struct object *obj )
151 struct thread *thread = (struct thread *)obj;
152 assert( obj->ops == &thread_ops );
154 release_object( thread->process );
155 if (thread->next) thread->next->prev = thread->prev;
156 if (thread->prev) thread->prev->next = thread->next;
157 else first_thread = thread->next;
158 if (thread->apc) free( thread->apc );
159 if (debug_level) memset( thread, 0xaa, sizeof(thread) ); /* catch errors */
163 /* dump a thread on stdout for debugging purposes */
164 static void dump_thread( struct object *obj, int verbose )
166 struct thread *thread = (struct thread *)obj;
167 assert( obj->ops == &thread_ops );
169 fprintf( stderr, "Thread pid=%d fd=%d\n",
170 thread->unix_pid, thread->client_fd );
173 static int thread_signaled( struct object *obj, struct thread *thread )
175 struct thread *mythread = (struct thread *)obj;
176 return (mythread->state == TERMINATED);
179 /* get a thread pointer from a thread id (and increment the refcount) */
180 struct thread *get_thread_from_id( void *id )
182 struct thread *t = first_thread;
183 while (t && (t != id)) t = t->next;
184 if (t) grab_object( t );
188 /* get a thread from a handle (and increment the refcount) */
189 struct thread *get_thread_from_handle( int handle, unsigned int access )
191 return (struct thread *)get_handle_obj( current->process, handle,
192 access, &thread_ops );
195 /* get all information about a thread */
196 void get_thread_info( struct thread *thread,
197 struct get_thread_info_reply *reply )
200 reply->exit_code = thread->exit_code;
201 reply->priority = thread->priority;
205 /* set all information about a thread */
206 void set_thread_info( struct thread *thread,
207 struct set_thread_info_request *req )
209 if (req->mask & SET_THREAD_INFO_PRIORITY)
210 thread->priority = req->priority;
211 if (req->mask & SET_THREAD_INFO_AFFINITY)
213 if (req->affinity != 1) SET_ERROR( ERROR_INVALID_PARAMETER );
214 else thread->affinity = req->affinity;
218 /* suspend a thread */
219 int suspend_thread( struct thread *thread )
221 int old_count = thread->suspend;
222 if (thread->suspend < MAXIMUM_SUSPEND_COUNT)
224 if (!thread->suspend++)
226 if (thread->unix_pid) kill( thread->unix_pid, SIGSTOP );
232 /* resume a thread */
233 int resume_thread( struct thread *thread )
235 int old_count = thread->suspend;
236 if (thread->suspend > 0)
238 if (!--thread->suspend)
240 if (thread->unix_pid) kill( thread->unix_pid, SIGCONT );
246 /* suspend all threads but the current */
247 void suspend_all_threads( void )
249 struct thread *thread;
250 for ( thread = first_thread; thread; thread = thread->next )
251 if ( thread != current )
252 suspend_thread( thread );
255 /* resume all threads but the current */
256 void resume_all_threads( void )
258 struct thread *thread;
259 for ( thread = first_thread; thread; thread = thread->next )
260 if ( thread != current )
261 resume_thread( thread );
264 /* send a reply to a thread */
265 int send_reply( struct thread *thread, int pass_fd, int n,
266 ... /* arg_1, len_1, ..., arg_n, len_n */ )
268 struct iovec vec[16];
274 for (i = 0; i < n; i++)
276 vec[i].iov_base = va_arg( args, void * );
277 vec[i].iov_len = va_arg( args, int );
280 return send_reply_v( thread->client_fd, thread->error, pass_fd, vec, n );
283 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
284 int add_queue( struct object *obj, struct wait_queue_entry *entry )
288 entry->prev = obj->tail;
290 if (obj->tail) obj->tail->next = entry;
291 else obj->head = entry;
296 /* remove a thread from an object wait queue */
297 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
299 if (entry->next) entry->next->prev = entry->prev;
300 else obj->tail = entry->prev;
301 if (entry->prev) entry->prev->next = entry->next;
302 else obj->head = entry->next;
303 release_object( obj );
307 static void end_wait( struct thread *thread )
309 struct thread_wait *wait = thread->wait;
310 struct wait_queue_entry *entry;
314 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
315 entry->obj->ops->remove_queue( entry->obj, entry );
316 if (wait->flags & SELECT_TIMEOUT) set_select_timeout( thread->client_fd, NULL );
321 /* build the thread wait structure */
322 static int wait_on( struct thread *thread, int count,
323 int *handles, int flags, int timeout )
325 struct thread_wait *wait;
326 struct wait_queue_entry *entry;
330 if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
332 SET_ERROR( ERROR_INVALID_PARAMETER );
335 if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
339 if (flags & SELECT_TIMEOUT)
341 gettimeofday( &wait->timeout, 0 );
344 wait->timeout.tv_usec += (timeout % 1000) * 1000;
345 if (wait->timeout.tv_usec >= 1000000)
347 wait->timeout.tv_usec -= 1000000;
348 wait->timeout.tv_sec++;
350 wait->timeout.tv_sec += timeout / 1000;
354 for (i = 0, entry = wait->queues; i < count; i++, entry++)
356 if (!(obj = get_handle_obj( thread->process, handles[i],
357 SYNCHRONIZE, NULL )))
363 entry->thread = thread;
364 if (!obj->ops->add_queue( obj, entry ))
370 release_object( obj );
375 /* check if the thread waiting condition is satisfied */
376 static int check_wait( struct thread *thread, int *signaled )
379 struct thread_wait *wait = thread->wait;
380 struct wait_queue_entry *entry = wait->queues;
383 if (wait->flags & SELECT_ALL)
385 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
386 if (!entry->obj->ops->signaled( entry->obj, thread )) goto other_checks;
387 /* Wait satisfied: tell it to all objects */
389 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
390 if (entry->obj->ops->satisfied( entry->obj, thread ))
391 *signaled = STATUS_ABANDONED_WAIT_0;
396 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
398 if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
399 /* Wait satisfied: tell it to the object */
401 if (entry->obj->ops->satisfied( entry->obj, thread ))
402 *signaled += STATUS_ABANDONED_WAIT_0;
408 if ((wait->flags & SELECT_ALERTABLE) && thread->apc)
410 *signaled = STATUS_USER_APC;
413 if (wait->flags & SELECT_TIMEOUT)
416 gettimeofday( &now, NULL );
417 if ((now.tv_sec > wait->timeout.tv_sec) ||
418 ((now.tv_sec == wait->timeout.tv_sec) &&
419 (now.tv_usec >= wait->timeout.tv_usec)))
421 *signaled = STATUS_TIMEOUT;
428 /* send the select reply to wake up the client */
429 static void send_select_reply( struct thread *thread, int signaled )
431 struct select_reply reply;
432 reply.signaled = signaled;
433 if ((signaled == STATUS_USER_APC) && thread->apc)
435 struct thread_apc *apc = thread->apc;
436 int len = thread->apc_count * sizeof(*apc);
438 thread->apc_count = 0;
439 send_reply( thread, -1, 2, &reply, sizeof(reply),
443 else send_reply( thread, -1, 1, &reply, sizeof(reply) );
446 /* attempt to wake up a thread */
447 /* return 1 if OK, 0 if the wait condition is still not satisfied */
448 static int wake_thread( struct thread *thread )
452 if (!check_wait( thread, &signaled )) return 0;
454 send_select_reply( thread, signaled );
458 /* sleep on a list of objects */
459 void sleep_on( struct thread *thread, int count, int *handles, int flags, int timeout )
461 assert( !thread->wait );
462 if (!wait_on( thread, count, handles, flags, timeout ))
464 /* return an error */
465 send_select_reply( thread, -1 );
468 if (!wake_thread( thread ))
470 /* we need to wait */
471 if (flags & SELECT_TIMEOUT)
472 set_select_timeout( thread->client_fd, &thread->wait->timeout );
476 /* timeout for the current thread */
477 void thread_timeout(void)
479 assert( current->wait );
481 send_select_reply( current, STATUS_TIMEOUT );
484 /* attempt to wake threads sleeping on the object wait queue */
485 void wake_up( struct object *obj, int max )
487 struct wait_queue_entry *entry = obj->head;
491 struct wait_queue_entry *next = entry->next;
492 if (wake_thread( entry->thread ))
494 if (max && !--max) break;
500 /* queue an async procedure call */
501 int thread_queue_apc( struct thread *thread, void *func, void *param )
503 struct thread_apc *apc;
506 if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
508 thread->apc_count = 0;
510 else if (thread->apc_count >= MAX_THREAD_APC) return 0;
511 thread->apc[thread->apc_count].func = func;
512 thread->apc[thread->apc_count].param = param;
514 if (thread->wait) wake_thread( thread );
518 /* kill a thread on the spot */
519 void kill_thread( struct thread *thread, int exit_code )
521 if (thread->state == TERMINATED) return; /* already killed */
522 if (thread->unix_pid) kill( thread->unix_pid, SIGTERM );
523 remove_client( thread->client_fd, exit_code ); /* this will call thread_killed */
526 /* a thread has been killed */
527 void thread_killed( struct thread *thread, int exit_code )
529 thread->state = TERMINATED;
530 thread->exit_code = exit_code;
531 if (thread->wait) end_wait( thread );
532 abandon_mutexes( thread );
533 remove_process_thread( thread->process, thread );
534 wake_up( &thread->obj, 0 );
535 release_object( thread );