2 * Server-side thread management
4 * Copyright (C) 1998 Alexandre Julliard
13 #include <sys/types.h>
22 #include "server/thread.h"
23 #include "server/process.h"
/*
 * One node of an object's wait queue.  Entries are chained through
 * next/prev into the doubly-linked list anchored at obj->head / obj->tail
 * (see add_queue() and remove_queue()).
 * NOTE(review): other code in this file also reads entry->obj; that member
 * is not visible in this excerpt.
 */
28 struct wait_queue_entry
/* following entry in the object's queue, NULL for the tail */
30 struct wait_queue_entry *next;
/* preceding entry in the object's queue, NULL for the head */
31 struct wait_queue_entry *prev;
/* thread that is waiting; filled in by wait_on() */
33 struct thread *thread;
/*
 * Members of the per-thread wait descriptor (used in this file as
 * struct thread_wait, reached through thread->wait).
 */
38 int count; /* count of objects */
/* absolute expiry time: wait_on() starts from gettimeofday() and adds the
 * requested delay; check_wait() compares it against the current time */
40 struct timeval timeout;
/* first wait queue entry; wait_on() sizes the allocation so that 'count'
 * entries follow each other starting here */
41 struct wait_queue_entry queues[1];
44 /* asynchronous procedure calls */
/* One queued APC; records of this kind are stored in the thread->apc array
 * by thread_queue_apc(). */
48 void *func; /* function to call in client */
49 void *param; /* function param */
/* Capacity of the per-thread APC array: thread_queue_apc() allocates this
 * many slots and refuses to queue once apc_count reaches it. */
51 #define MAX_THREAD_APC 16 /* Max outstanding APCs for a thread */
54 /* thread operations */
/* Forward declarations of the thread-specific callbacks defined later in
 * this file. */
56 static void dump_thread( struct object *obj, int verbose );
57 static int thread_signaled( struct object *obj, struct thread *thread );
58 static void destroy_thread( struct object *obj );
/* Operations table for thread objects.  init_thread() passes it to
 * init_object(), and the callbacks assert obj->ops == &thread_ops.
 * NOTE(review): the initializer is not visible in this excerpt. */
60 static const struct object_ops thread_ops =
/* Statically allocated thread object set up by create_initial_thread(). */
74 static struct thread initial_thread;
/* Head of the doubly-linked list of threads; create_thread() pushes new
 * threads on the front and destroy_thread() unlinks them. */
75 static struct thread *first_thread = &initial_thread;
77 /* initialization of a thread structure */
/*
 * Give a thread object its initial field values.
 *   thread: structure to initialize (heap-allocated by create_thread() or
 *           the static initial_thread)
 *   fd:     client file descriptor, stored in thread->client_fd
 * NOTE(review): only some of the field assignments are visible in this
 * excerpt.
 */
78 static void init_thread( struct thread *thread, int fd )
/* initialize the embedded object with the thread operations table */
80 init_object( &thread->obj, &thread_ops, NULL );
81 thread->client_fd = fd;
82 thread->unix_pid = 0; /* not known yet */
/* no APC queued yet */
86 thread->apc_count = 0;
/* the thread begins in the STARTING state with a "still running" exit code */
88 thread->state = STARTING;
89 thread->exit_code = 0x103; /* STILL_ACTIVE */
92 thread->priority = THREAD_PRIORITY_NORMAL;
97 /* create the initial thread and start the main server loop */
/*
 * Set up the statically allocated initial_thread for the client connected
 * on fd: make it the current thread, initialize it, attach it to a newly
 * created initial process and register fd as a client.
 * NOTE(review): the call that actually starts the server loop is not
 * visible in this excerpt.
 */
98 void create_initial_thread( int fd )
/* 'current' is pointed at the initial thread before the calls below run */
100 current = &initial_thread;
101 init_thread( &initial_thread, fd );
102 initial_thread.process = create_initial_process();
103 add_process_thread( initial_thread.process, &initial_thread );
104 add_client( fd, &initial_thread );
108 /* create a new thread */
/*
 * Allocate and register a thread object for a new client connection.
 *   fd:      client file descriptor, passed to init_thread() and add_client()
 *   pid:     id of the owning process, resolved with get_process_from_id()
 *   suspend: if non-zero the thread starts with its suspend count incremented
 *   inherit: forwarded to alloc_handle() for the new handle
 *   handle:  receives the handle allocated in the current thread's process
 * Returns NULL when the thread structure cannot be allocated.
 * NOTE(review): the remaining return statements and the 'error' label are
 * not visible in this excerpt.
 */
109 struct thread *create_thread( int fd, void *pid, int suspend, int inherit, int *handle )
111 struct thread *thread;
112 struct process *process;
114 if (!(thread = mem_alloc( sizeof(*thread) ))) return NULL;
116 if (!(process = get_process_from_id( pid )))
121 init_thread( thread, fd );
122 thread->process = process;
124 if (suspend) thread->suspend++;
/* push the new thread on the front of the global thread list */
126 thread->next = first_thread;
127 first_thread->prev = thread;
128 first_thread = thread;
129 add_process_thread( process, thread );
/* the handle is created in the process of the requesting (current) thread,
 * not in the process that owns the new thread */
131 if ((*handle = alloc_handle( current->process, thread,
132 THREAD_ALL_ACCESS, inherit )) == -1) goto error;
133 if (add_client( fd, thread ) == -1)
135 SET_ERROR( ERROR_TOO_MANY_OPEN_FILES );
/* cleanup statements: close the handle, detach the thread from its process
 * and drop the reference on the thread object */
141 if (current) close_handle( current->process, *handle );
142 remove_process_thread( process, thread );
143 release_object( thread );
147 /* destroy a thread when its refcount is 0 */
/*
 * Tear down a thread object: drop its reference on the owning process,
 * unlink it from the global thread list and free its APC array.
 *   obj: the thread's embedded object; must use thread_ops (asserted)
 * NOTE(review): the statements following the memset are not visible in this
 * excerpt.
 */
148 static void destroy_thread( struct object *obj )
150 struct thread *thread = (struct thread *)obj;
151 assert( obj->ops == &thread_ops );
/* release the reference held on the owning process */
153 release_object( thread->process );
/* unlink from the doubly-linked thread list, updating the list head when
 * this thread was the first element */
154 if (thread->next) thread->next->prev = thread->prev;
155 if (thread->prev) thread->prev->next = thread->next;
156 else first_thread = thread->next;
157 if (thread->apc) free( thread->apc );
/* poison the whole structure: sizeof(thread) would only cover the size of
 * the pointer, leaving most of the structure untouched */
158 if (debug_level) memset( thread, 0xaa, sizeof(*thread) ); /* catch errors */
162 /* dump a thread on stderr for debugging purposes */
/*
 * Print a one-line description of the thread (its Unix pid and client file
 * descriptor) to stderr.
 *   obj:     the thread's embedded object; must use thread_ops (asserted)
 *   verbose: not used by the statements visible in this excerpt
 */
163 static void dump_thread( struct object *obj, int verbose )
165 struct thread *thread = (struct thread *)obj;
166 assert( obj->ops == &thread_ops );
168 fprintf( stderr, "Thread pid=%d fd=%d\n",
169 thread->unix_pid, thread->client_fd );
/*
 * Tell whether a thread object is signaled.
 *   obj:    the thread object being waited on
 *   thread: the waiting thread (not consulted here)
 * Returns non-zero once the waited-on thread has reached the TERMINATED state.
 */
172 static int thread_signaled( struct object *obj, struct thread *thread )
174 struct thread *mythread = (struct thread *)obj;
175 return (mythread->state == TERMINATED);
178 /* get a thread pointer from a thread id (and increment the refcount) */
/*
 * The id is compared against the address of each thread in the list that
 * starts at first_thread, so id itself is never dereferenced.  A matching
 * thread gets an extra reference through grab_object().
 * NOTE(review): the return statement is not visible in this excerpt.
 */
179 struct thread *get_thread_from_id( void *id )
181 struct thread *t = first_thread;
/* stops with t == NULL when no thread in the list matches */
182 while (t && (t != id)) t = t->next;
183 if (t) grab_object( t );
187 /* get a thread from a handle (and increment the refcount) */
/*
 * Look the handle up in the current thread's process with the requested
 * access, passing &thread_ops as the operations table (presumably so that
 * only thread objects are accepted; get_handle_obj() is defined elsewhere).
 *   handle: handle value valid in current->process
 *   access: access rights requested on the object
 * Returns whatever get_handle_obj() returns, cast to a thread pointer.
 */
188 struct thread *get_thread_from_handle( int handle, unsigned int access )
190 return (struct thread *)get_handle_obj( current->process, handle,
191 access, &thread_ops );
194 /* get all information about a thread */
/*
 * Copy the thread's state into a reply structure.
 *   thread: thread to query
 *   reply:  structure filled in for the client
 * NOTE(review): only the exit code and priority copies are visible in this
 * excerpt.
 */
195 void get_thread_info( struct thread *thread,
196 struct get_thread_info_reply *reply )
199 reply->exit_code = thread->exit_code;
200 reply->priority = thread->priority;
204 /* set all information about a thread */
/*
 * Apply the fields selected by req->mask to the thread.
 *   thread: thread to modify
 *   req:    request carrying the mask and the new values
 */
205 void set_thread_info( struct thread *thread,
206 struct set_thread_info_request *req )
208 if (req->mask & SET_THREAD_INFO_PRIORITY)
209 thread->priority = req->priority;
210 if (req->mask & SET_THREAD_INFO_AFFINITY)
/* 1 is the only affinity value accepted; anything else is rejected with
 * ERROR_INVALID_PARAMETER and the stored affinity is left unchanged */
212 if (req->affinity != 1) SET_ERROR( ERROR_INVALID_PARAMETER );
213 else thread->affinity = req->affinity;
217 /* suspend a thread */
/*
 * Increment the thread's suspend count, up to MAXIMUM_SUSPEND_COUNT.
 * The Unix process is stopped with SIGSTOP only on the 0 -> 1 transition,
 * and only when its pid is known.
 * NOTE(review): the return statement is not visible in this excerpt;
 * old_count holds the count before the increment.
 */
218 int suspend_thread( struct thread *thread )
220 int old_count = thread->suspend;
221 if (thread->suspend < MAXIMUM_SUSPEND_COUNT)
/* post-increment: true only when the count was 0 before this call */
223 if (!thread->suspend++)
225 if (thread->unix_pid) kill( thread->unix_pid, SIGSTOP );
231 /* resume a thread */
/*
 * Decrement the thread's suspend count if it is positive.  The Unix process
 * is continued with SIGCONT only when the count drops back to 0, and only
 * when its pid is known.
 * NOTE(review): the return statement is not visible in this excerpt;
 * old_count holds the count before the decrement.
 */
232 int resume_thread( struct thread *thread )
234 int old_count = thread->suspend;
235 if (thread->suspend > 0)
/* pre-decrement: true only when this call brought the count to 0 */
237 if (!--thread->suspend)
239 if (thread->unix_pid) kill( thread->unix_pid, SIGCONT );
245 /* send a reply to a thread */
/*
 * Gather n (pointer, length) pairs from the variable arguments into an
 * iovec array and hand them to send_reply_v() together with the thread's
 * client fd and error code.
 *   thread:  destination thread
 *   pass_fd: forwarded unchanged to send_reply_v()
 *   n:       number of (void *, int) argument pairs that follow
 * Returns the result of send_reply_v().
 * NOTE(review): vec holds 16 entries and no bound check on n is visible in
 * this excerpt; callers must not pass more pairs than that.
 */
246 int send_reply( struct thread *thread, int pass_fd, int n,
247 ... /* arg_1, len_1, ..., arg_n, len_n */ )
249 struct iovec vec[16];
255 for (i = 0; i < n; i++)
/* each pair is read as a pointer followed by an int length */
257 vec[i].iov_base = va_arg( args, void * );
258 vec[i].iov_len = va_arg( args, int );
261 return send_reply_v( thread->client_fd, thread->error, pass_fd, vec, n );
264 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
/*
 * Append entry at the tail of obj's wait queue.
 *   obj:   object being waited on
 *   entry: queue entry to link in
 * NOTE(review): the remaining field updates and the return statement are
 * not visible in this excerpt.
 */
265 int add_queue( struct object *obj, struct wait_queue_entry *entry )
269 entry->prev = obj->tail;
/* link after the current tail, or become the head of an empty queue */
271 if (obj->tail) obj->tail->next = entry;
272 else obj->head = entry;
277 /* remove a thread from an object wait queue */
/*
 * Unlink entry from obj's doubly-linked wait queue, fixing up obj->head and
 * obj->tail when the entry sat at either end, then drop one reference on obj.
 *   obj:   object whose queue holds the entry
 *   entry: queue entry to unlink
 */
278 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
280 if (entry->next) entry->next->prev = entry->prev;
281 else obj->tail = entry->prev;
282 if (entry->prev) entry->prev->next = entry->next;
283 else obj->head = entry->next;
/* presumably balances a reference taken when the entry was queued -- TODO
 * confirm against the part of add_queue() not shown here */
284 release_object( obj );
/*
 * Finish the thread's current wait: take every entry of thread->wait off
 * the queue of the object it refers to and, if the wait had a timeout,
 * cancel the select timeout on the client fd.
 * NOTE(review): the statements after the timeout reset are not visible in
 * this excerpt.
 */
288 static void end_wait( struct thread *thread )
290 struct thread_wait *wait = thread->wait;
291 struct wait_queue_entry *entry;
/* dequeue through each object's own remove_queue operation */
295 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
296 entry->obj->ops->remove_queue( entry->obj, entry );
/* a NULL timeout clears the timer for this client */
297 if (wait->flags & SELECT_TIMEOUT) set_select_timeout( thread->client_fd, NULL );
302 /* build the thread wait structure */
/*
 * Allocate a thread_wait descriptor for the given handles and queue the
 * thread on every object.
 *   thread:  thread that is going to wait
 *   count:   number of handles; must be within 0..MAXIMUM_WAIT_OBJECTS
 *   handles: array of count handles, looked up in thread->process
 *   flags:   SELECT_* flags; SELECT_TIMEOUT enables the timeout
 *   timeout: delay in milliseconds, relative to the current time
 * Returns 0 when the allocation fails.
 * NOTE(review): the other return statements and the bodies of the error
 * branches are not visible in this excerpt.
 */
303 static int wait_on( struct thread *thread, int count,
304 int *handles, int flags, int timeout )
306 struct thread_wait *wait;
307 struct wait_queue_entry *entry;
311 if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
313 SET_ERROR( ERROR_INVALID_PARAMETER );
/* the structure already contains one entry, so only count-1 more are added */
316 if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
320 if (flags & SELECT_TIMEOUT)
/* turn the relative delay into an absolute deadline: start from now, add
 * the sub-second part as microseconds, carry into seconds, then add the
 * whole seconds */
322 gettimeofday( &wait->timeout, 0 );
325 wait->timeout.tv_usec += (timeout % 1000) * 1000;
326 if (wait->timeout.tv_usec >= 1000000)
328 wait->timeout.tv_usec -= 1000000;
329 wait->timeout.tv_sec++;
331 wait->timeout.tv_sec += timeout / 1000;
/* resolve each handle (SYNCHRONIZE access, any object type) and enqueue the
 * thread through the object's add_queue operation */
335 for (i = 0, entry = wait->queues; i < count; i++, entry++)
337 if (!(obj = get_handle_obj( thread->process, handles[i],
338 SYNCHRONIZE, NULL )))
344 entry->thread = thread;
345 if (!obj->ops->add_queue( obj, entry ))
/* presumably drops the reference obtained from get_handle_obj() -- TODO
 * confirm, the surrounding control flow is not shown */
351 release_object( obj );
356 /* check if the thread waiting condition is satisfied */
/*
 * Evaluate the thread's pending wait and store the resulting status in
 * *signaled.  The checks are tried in this order: the waited-on objects
 * (all of them with SELECT_ALL, otherwise the first signaled one), a
 * pending APC for alertable waits, then timeout expiry.
 *   thread:   thread whose thread->wait is examined
 *   signaled: receives the wait status
 * NOTE(review): the return statements, the 'other_checks' label and some
 * assignments to *signaled are not visible in this excerpt.
 */
357 static int check_wait( struct thread *thread, int *signaled )
360 struct thread_wait *wait = thread->wait;
361 struct wait_queue_entry *entry = wait->queues;
364 if (wait->flags & SELECT_ALL)
/* wait-all: a single unsignaled object sends us on to the other checks */
366 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
367 if (!entry->obj->ops->signaled( entry->obj, thread )) goto other_checks;
368 /* Wait satisfied: tell it to all objects */
/* a non-zero result from any object's satisfied() reports the wait as
 * abandoned */
370 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
371 if (entry->obj->ops->satisfied( entry->obj, thread ))
372 *signaled = STATUS_ABANDONED_WAIT_0;
/* wait-any: look for the first signaled object */
377 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
379 if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
380 /* Wait satisfied: tell it to the object */
/* STATUS_ABANDONED_WAIT_0 is added to the value already in *signaled */
382 if (entry->obj->ops->satisfied( entry->obj, thread ))
383 *signaled += STATUS_ABANDONED_WAIT_0;
/* an alertable wait is also completed by a queued APC */
389 if ((wait->flags & SELECT_ALERTABLE) && thread->apc)
391 *signaled = STATUS_USER_APC;
394 if (wait->flags & SELECT_TIMEOUT)
/* expired when the current time has reached the absolute deadline */
397 gettimeofday( &now, NULL );
398 if ((now.tv_sec > wait->timeout.tv_sec) ||
399 ((now.tv_sec == wait->timeout.tv_sec) &&
400 (now.tv_usec >= wait->timeout.tv_usec)))
402 *signaled = STATUS_TIMEOUT;
409 /* send the select reply to wake up the client */
/*
 * Send the result of a select request to the thread.
 *   thread:   thread to reply to
 *   signaled: wait status placed in the reply
 * When the status is STATUS_USER_APC and APCs are queued, the reply is sent
 * in two parts and the APC count is reset; otherwise only the reply
 * structure is sent.
 * NOTE(review): the arguments of the second part and the handling of
 * thread->apc afterwards are not visible in this excerpt.
 */
410 static void send_select_reply( struct thread *thread, int signaled )
412 struct select_reply reply;
413 reply.signaled = signaled;
414 if ((signaled == STATUS_USER_APC) && thread->apc)
416 struct thread_apc *apc = thread->apc;
/* size in bytes of the queued APC records, computed before the count is
 * cleared */
417 int len = thread->apc_count * sizeof(*apc);
419 thread->apc_count = 0;
420 send_reply( thread, -1, 2, &reply, sizeof(reply),
424 else send_reply( thread, -1, 1, &reply, sizeof(reply) );
427 /* attempt to wake up a thread */
428 /* return 1 if OK, 0 if the wait condition is still not satisfied */
/*
 * Re-evaluate the thread's wait with check_wait(); when it is satisfied the
 * resulting status is sent to the client with send_select_reply().
 * NOTE(review): the statements between the check and the reply, and the
 * final return, are not visible in this excerpt.
 */
429 static int wake_thread( struct thread *thread )
433 if (!check_wait( thread, &signaled )) return 0;
435 send_select_reply( thread, signaled );
439 /* sleep on a list of objects */
/*
 * Start a wait for the thread on the given handles.
 *   thread:  thread that waits; must not already have a wait in progress
 *   count, handles, flags, timeout: forwarded to wait_on()
 * If the wait cannot be set up the client gets a reply with status -1.  If
 * the condition is already satisfied wake_thread() replies at once;
 * otherwise the select timeout is armed when SELECT_TIMEOUT is set.
 */
440 void sleep_on( struct thread *thread, int count, int *handles, int flags, int timeout )
442 assert( !thread->wait );
443 if (!wait_on( thread, count, handles, flags, timeout ))
445 /* return an error */
446 send_select_reply( thread, -1 );
449 if (!wake_thread( thread ))
451 /* we need to wait */
/* arm the timer with the absolute deadline computed by wait_on() */
452 if (flags & SELECT_TIMEOUT)
453 set_select_timeout( thread->client_fd, &thread->wait->timeout );
457 /* timeout for the current thread */
/*
 * Called when the wait of the current thread has timed out: the thread
 * must have a wait in progress (asserted) and receives a select reply with
 * STATUS_TIMEOUT.
 * NOTE(review): the statement between the assert and the reply is not
 * visible in this excerpt.
 */
458 void thread_timeout(void)
460 assert( current->wait );
462 send_select_reply( current, STATUS_TIMEOUT );
465 /* attempt to wake threads sleeping on the object wait queue */
/*
 * Walk obj's wait queue from the head and try to wake each queued thread.
 *   obj: object whose waiters are examined
 *   max: maximum number of threads to wake; 0 means no limit
 * NOTE(review): the loop header and the step that advances to 'next' are
 * not visible in this excerpt.
 */
466 void wake_up( struct object *obj, int max )
468 struct wait_queue_entry *entry = obj->head;
/* the successor is read before wake_thread() runs, presumably because a
 * successful wake-up takes the entry off the queue -- TODO confirm */
472 struct wait_queue_entry *next = entry->next;
473 if (wake_thread( entry->thread ))
/* with a non-zero max, stop as soon as the budget is used up */
475 if (max && !--max) break;
481 /* queue an async procedure call */
/*
 * Record an APC (func, param) for the thread and try to wake it.
 *   thread: target thread
 *   func:   function to call in the client
 *   param:  parameter for that function
 * Returns 0 when MAX_THREAD_APC calls are already queued.
 * NOTE(review): the condition guarding the allocation, the increment of
 * apc_count and the other return statements are not visible in this excerpt.
 */
482 int thread_queue_apc( struct thread *thread, void *func, void *param )
484 struct thread_apc *apc;
/* the array is allocated with room for MAX_THREAD_APC records */
487 if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
489 thread->apc_count = 0;
491 else if (thread->apc_count >= MAX_THREAD_APC) return 0;
/* store the call in the next free slot */
492 thread->apc[thread->apc_count].func = func;
493 thread->apc[thread->apc_count].param = param;
/* an alertable wait may now be satisfied */
495 wake_thread( thread );
499 /* kill a thread on the spot */
/*
 * Terminate a thread unless it is already in the TERMINATED state: send
 * SIGTERM to its Unix process when the pid is known, then remove its client
 * connection.
 *   thread:    thread to kill
 *   exit_code: exit code passed on to remove_client()
 */
500 void kill_thread( struct thread *thread, int exit_code )
502 if (thread->state == TERMINATED) return; /* already killed */
503 if (thread->unix_pid) kill( thread->unix_pid, SIGTERM );
504 remove_client( thread->client_fd, exit_code ); /* this will call thread_killed */
507 /* a thread has been killed */
/*
 * Bookkeeping once a thread is gone: mark it TERMINATED, record the exit
 * code, end any wait it had in progress, abandon its mutexes, detach it
 * from its process, wake the threads waiting on it and drop one reference.
 *   thread:    the thread that died
 *   exit_code: value stored in thread->exit_code
 */
508 void thread_killed( struct thread *thread, int exit_code )
/* the state is set first; thread_signaled() reports TERMINATED threads as
 * signaled, which the wake_up() call below relies on */
510 thread->state = TERMINATED;
511 thread->exit_code = exit_code;
512 if (thread->wait) end_wait( thread );
513 abandon_mutexes( thread );
514 remove_process_thread( thread->process, thread );
/* max == 0: no limit on the number of waiters woken */
515 wake_up( &thread->obj, 0 );
516 release_object( thread );