Fixes to FD_CLOSE handling.
[wine] / server / thread.c
1 /*
2  * Server-side thread management
3  *
4  * Copyright (C) 1998 Alexandre Julliard
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include "config.h"
22 #include "wine/port.h"
23
24 #include <assert.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <signal.h>
28 #include <stdarg.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/types.h>
33 #include <unistd.h>
34 #include <stdarg.h>
35
36 #include "winbase.h"
37
38 #include "handle.h"
39 #include "process.h"
40 #include "thread.h"
41 #include "request.h"
42 #include "user.h"
43
44
45 /* thread queues */
46
47 struct thread_wait
48 {
49     struct thread_wait     *next;       /* next wait structure for this thread */
50     struct thread          *thread;     /* owner thread */
51     int                     count;      /* count of objects */
52     int                     flags;
53     void                   *cookie;     /* magic cookie to return to client */
54     struct timeval          timeout;
55     struct timeout_user    *user;
56     struct wait_queue_entry queues[1];
57 };
58
59 /* asynchronous procedure calls */
60
61 struct thread_apc
62 {
63     struct thread_apc  *next;     /* queue linked list */
64     struct thread_apc  *prev;
65     struct object      *owner;    /* object that queued this apc */
66     void               *func;     /* function to call in client */
67     enum apc_type       type;     /* type of apc function */
68     int                 nb_args;  /* number of arguments */
69     void               *args[1];  /* function arguments */
70 };
71
72
73 /* thread operations */
74
75 static void dump_thread( struct object *obj, int verbose );
76 static int thread_signaled( struct object *obj, struct thread *thread );
77 static void thread_poll_event( struct object *obj, int event );
78 static void destroy_thread( struct object *obj );
79 static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only );
80
81 static const struct object_ops thread_ops =
82 {
83     sizeof(struct thread),      /* size */
84     dump_thread,                /* dump */
85     add_queue,                  /* add_queue */
86     remove_queue,               /* remove_queue */
87     thread_signaled,            /* signaled */
88     no_satisfied,               /* satisfied */
89     NULL,                       /* get_poll_events */
90     thread_poll_event,          /* poll_event */
91     no_get_fd,                  /* get_fd */
92     no_flush,                   /* flush */
93     no_get_file_info,           /* get_file_info */
94     NULL,                       /* queue_async */
95     destroy_thread              /* destroy */
96 };
97
98 static struct thread *first_thread;
99 static struct thread *booting_thread;
100
101 /* initialize the structure for a newly allocated thread */
102 inline static void init_thread_structure( struct thread *thread )
103 {
104     int i;
105
106     thread->unix_pid        = 0;  /* not known yet */
107     thread->context         = NULL;
108     thread->teb             = NULL;
109     thread->mutex           = NULL;
110     thread->debug_ctx       = NULL;
111     thread->debug_event     = NULL;
112     thread->queue           = NULL;
113     thread->info            = NULL;
114     thread->wait            = NULL;
115     thread->system_apc.head = NULL;
116     thread->system_apc.tail = NULL;
117     thread->user_apc.head   = NULL;
118     thread->user_apc.tail   = NULL;
119     thread->error           = 0;
120     thread->req_data        = NULL;
121     thread->req_toread      = 0;
122     thread->reply_data      = NULL;
123     thread->reply_towrite   = 0;
124     thread->reply_fd        = -1;
125     thread->wait_fd         = -1;
126     thread->state           = RUNNING;
127     thread->attached        = 0;
128     thread->exit_code       = 0;
129     thread->next            = NULL;
130     thread->prev            = NULL;
131     thread->priority        = THREAD_PRIORITY_NORMAL;
132     thread->affinity        = 1;
133     thread->suspend         = 0;
134
135     for (i = 0; i < MAX_INFLIGHT_FDS; i++)
136         thread->inflight[i].server = thread->inflight[i].client = -1;
137 }
138
139 /* create a new thread */
140 struct thread *create_thread( int fd, struct process *process )
141 {
142     struct thread *thread;
143
144     if (!(thread = alloc_object( &thread_ops, fd ))) return NULL;
145
146     init_thread_structure( thread );
147
148     thread->process = (struct process *)grab_object( process );
149     thread->request_fd = fd;
150     if (!current) current = thread;
151
152     if (!booting_thread)  /* first thread ever */
153     {
154         booting_thread = thread;
155         lock_master_socket(1);
156     }
157
158     if ((thread->next = first_thread) != NULL) thread->next->prev = thread;
159     first_thread = thread;
160
161     set_select_events( &thread->obj, POLLIN );  /* start listening to events */
162     add_process_thread( thread->process, thread );
163     return thread;
164 }
165
166 /* handle a client event */
167 static void thread_poll_event( struct object *obj, int event )
168 {
169     struct thread *thread = (struct thread *)obj;
170     assert( obj->ops == &thread_ops );
171
172     if (event & (POLLERR | POLLHUP)) kill_thread( thread, 0 );
173     else if (event & POLLIN) read_request( thread );
174     else if (event & POLLOUT) write_reply( thread );
175 }
176
177 /* cleanup everything that is no longer needed by a dead thread */
178 /* used by destroy_thread and kill_thread */
179 static void cleanup_thread( struct thread *thread )
180 {
181     int i;
182     struct thread_apc *apc;
183
184     while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
185     if (thread->req_data) free( thread->req_data );
186     if (thread->reply_data) free( thread->reply_data );
187     if (thread->request_fd != -1) close( thread->request_fd );
188     if (thread->reply_fd != -1) close( thread->reply_fd );
189     if (thread->wait_fd != -1) close( thread->wait_fd );
190     if (thread->queue)
191     {
192         if (thread->process->queue == thread->queue)
193         {
194             release_object( thread->process->queue );
195             thread->process->queue = NULL;
196         }
197         release_object( thread->queue );
198         thread->queue = NULL;
199     }
200     destroy_thread_windows( thread );
201     for (i = 0; i < MAX_INFLIGHT_FDS; i++)
202     {
203         if (thread->inflight[i].client != -1)
204         {
205             close( thread->inflight[i].server );
206             thread->inflight[i].client = thread->inflight[i].server = -1;
207         }
208     }
209     thread->req_data = NULL;
210     thread->reply_data = NULL;
211     thread->request_fd = -1;
212     thread->reply_fd = -1;
213     thread->wait_fd = -1;
214 }
215
216 /* destroy a thread when its refcount is 0 */
217 static void destroy_thread( struct object *obj )
218 {
219     struct thread_apc *apc;
220     struct thread *thread = (struct thread *)obj;
221     assert( obj->ops == &thread_ops );
222
223     assert( !thread->debug_ctx );  /* cannot still be debugging something */
224     if (thread->next) thread->next->prev = thread->prev;
225     if (thread->prev) thread->prev->next = thread->next;
226     else first_thread = thread->next;
227     while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
228     if (thread->info) release_object( thread->info );
229     cleanup_thread( thread );
230     release_object( thread->process );
231 }
232
233 /* dump a thread on stdout for debugging purposes */
234 static void dump_thread( struct object *obj, int verbose )
235 {
236     struct thread *thread = (struct thread *)obj;
237     assert( obj->ops == &thread_ops );
238
239     fprintf( stderr, "Thread pid=%d teb=%p state=%d\n",
240              thread->unix_pid, thread->teb, thread->state );
241 }
242
243 static int thread_signaled( struct object *obj, struct thread *thread )
244 {
245     struct thread *mythread = (struct thread *)obj;
246     return (mythread->state == TERMINATED);
247 }
248
249 /* get a thread pointer from a thread id (and increment the refcount) */
250 struct thread *get_thread_from_id( void *id )
251 {
252     struct thread *t = first_thread;
253     while (t && (t != id)) t = t->next;
254     if (t) grab_object( t );
255     else set_error( STATUS_INVALID_PARAMETER );
256     return t;
257 }
258
259 /* get a thread from a handle (and increment the refcount) */
260 struct thread *get_thread_from_handle( handle_t handle, unsigned int access )
261 {
262     return (struct thread *)get_handle_obj( current->process, handle,
263                                             access, &thread_ops );
264 }
265
266 /* find a thread from a Unix pid */
267 struct thread *get_thread_from_pid( int pid )
268 {
269     struct thread *t = first_thread;
270     while (t && (t->unix_pid != pid)) t = t->next;
271     return t;
272 }
273
274 /* set all information about a thread */
275 static void set_thread_info( struct thread *thread,
276                              const struct set_thread_info_request *req )
277 {
278     if (req->mask & SET_THREAD_INFO_PRIORITY)
279         thread->priority = req->priority;
280     if (req->mask & SET_THREAD_INFO_AFFINITY)
281     {
282         if (req->affinity != 1) set_error( STATUS_INVALID_PARAMETER );
283         else thread->affinity = req->affinity;
284     }
285 }
286
287 /* suspend a thread */
288 int suspend_thread( struct thread *thread, int check_limit )
289 {
290     int old_count = thread->suspend;
291     if (thread->suspend < MAXIMUM_SUSPEND_COUNT || !check_limit)
292     {
293         if (!(thread->process->suspend + thread->suspend++)) stop_thread( thread );
294     }
295     else set_error( STATUS_SUSPEND_COUNT_EXCEEDED );
296     return old_count;
297 }
298
299 /* resume a thread */
300 int resume_thread( struct thread *thread )
301 {
302     int old_count = thread->suspend;
303     if (thread->suspend > 0)
304     {
305         if (!(--thread->suspend + thread->process->suspend)) continue_thread( thread );
306     }
307     return old_count;
308 }
309
310 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
311 int add_queue( struct object *obj, struct wait_queue_entry *entry )
312 {
313     grab_object( obj );
314     entry->obj    = obj;
315     entry->prev   = obj->tail;
316     entry->next   = NULL;
317     if (obj->tail) obj->tail->next = entry;
318     else obj->head = entry;
319     obj->tail = entry;
320     return 1;
321 }
322
323 /* remove a thread from an object wait queue */
324 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
325 {
326     if (entry->next) entry->next->prev = entry->prev;
327     else obj->tail = entry->prev;
328     if (entry->prev) entry->prev->next = entry->next;
329     else obj->head = entry->next;
330     release_object( obj );
331 }
332
333 /* finish waiting */
334 static void end_wait( struct thread *thread )
335 {
336     struct thread_wait *wait = thread->wait;
337     struct wait_queue_entry *entry;
338     int i;
339
340     assert( wait );
341     for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
342         entry->obj->ops->remove_queue( entry->obj, entry );
343     if (wait->user) remove_timeout_user( wait->user );
344     thread->wait = wait->next;
345     free( wait );
346 }
347
348 /* build the thread wait structure */
349 static int wait_on( int count, struct object *objects[], int flags, int sec, int usec )
350 {
351     struct thread_wait *wait;
352     struct wait_queue_entry *entry;
353     int i;
354
355     if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
356     wait->next    = current->wait;
357     wait->thread  = current;
358     wait->count   = count;
359     wait->flags   = flags;
360     wait->user    = NULL;
361     current->wait = wait;
362     if (flags & SELECT_TIMEOUT)
363     {
364         wait->timeout.tv_sec = sec;
365         wait->timeout.tv_usec = usec;
366     }
367
368     for (i = 0, entry = wait->queues; i < count; i++, entry++)
369     {
370         struct object *obj = objects[i];
371         entry->thread = current;
372         if (!obj->ops->add_queue( obj, entry ))
373         {
374             wait->count = i;
375             end_wait( current );
376             return 0;
377         }
378     }
379     return 1;
380 }
381
382 /* check if the thread waiting condition is satisfied */
383 static int check_wait( struct thread *thread )
384 {
385     int i, signaled;
386     struct thread_wait *wait = thread->wait;
387     struct wait_queue_entry *entry = wait->queues;
388
389     assert( wait );
390     if (wait->flags & SELECT_ALL)
391     {
392         int not_ok = 0;
393         /* Note: we must check them all anyway, as some objects may
394          * want to do something when signaled, even if others are not */
395         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
396             not_ok |= !entry->obj->ops->signaled( entry->obj, thread );
397         if (not_ok) goto other_checks;
398         /* Wait satisfied: tell it to all objects */
399         signaled = 0;
400         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
401             if (entry->obj->ops->satisfied( entry->obj, thread ))
402                 signaled = STATUS_ABANDONED_WAIT_0;
403         return signaled;
404     }
405     else
406     {
407         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
408         {
409             if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
410             /* Wait satisfied: tell it to the object */
411             signaled = i;
412             if (entry->obj->ops->satisfied( entry->obj, thread ))
413                 signaled = i + STATUS_ABANDONED_WAIT_0;
414             return signaled;
415         }
416     }
417
418  other_checks:
419     if ((wait->flags & SELECT_INTERRUPTIBLE) && thread->system_apc.head) return STATUS_USER_APC;
420     if ((wait->flags & SELECT_ALERTABLE) && thread->user_apc.head) return STATUS_USER_APC;
421     if (wait->flags & SELECT_TIMEOUT)
422     {
423         struct timeval now;
424         gettimeofday( &now, NULL );
425         if (!time_before( &now, &wait->timeout )) return STATUS_TIMEOUT;
426     }
427     return -1;
428 }
429
430 /* send the wakeup signal to a thread */
431 static int send_thread_wakeup( struct thread *thread, void *cookie, int signaled )
432 {
433     struct wake_up_reply reply;
434     int ret;
435
436     reply.cookie   = cookie;
437     reply.signaled = signaled;
438     if ((ret = write( thread->wait_fd, &reply, sizeof(reply) )) == sizeof(reply)) return 0;
439     if (ret >= 0)
440         fatal_protocol_error( thread, "partial wakeup write %d\n", ret );
441     else if (errno == EPIPE)
442         kill_thread( thread, 0 );  /* normal death */
443     else
444         fatal_protocol_perror( thread, "write" );
445     return -1;
446 }
447
448 /* attempt to wake up a thread */
449 /* return >0 if OK, 0 if the wait condition is still not satisfied */
450 static int wake_thread( struct thread *thread )
451 {
452     int signaled, count;
453     void *cookie;
454
455     for (count = 0; thread->wait; count++)
456     {
457         if ((signaled = check_wait( thread )) == -1) break;
458
459         cookie = thread->wait->cookie;
460         if (debug_level) fprintf( stderr, "%08x: *wakeup* signaled=%d cookie=%p\n",
461                                   (unsigned int)thread, signaled, cookie );
462         end_wait( thread );
463         if (send_thread_wakeup( thread, cookie, signaled ) == -1) /* error */
464             break;
465     }
466     return count;
467 }
468
469 /* thread wait timeout */
470 static void thread_timeout( void *ptr )
471 {
472     struct thread_wait *wait = ptr;
473     struct thread *thread = wait->thread;
474     void *cookie = wait->cookie;
475
476     wait->user = NULL;
477     if (thread->wait != wait) return; /* not the top-level wait, ignore it */
478
479     if (debug_level) fprintf( stderr, "%08x: *wakeup* signaled=%d cookie=%p\n",
480                               (unsigned int)thread, STATUS_TIMEOUT, cookie );
481     end_wait( thread );
482     send_thread_wakeup( thread, cookie, STATUS_TIMEOUT );
483     /* check if other objects have become signaled in the meantime */
484     wake_thread( thread );
485 }
486
487 /* select on a list of handles */
488 static void select_on( int count, void *cookie, const handle_t *handles,
489                        int flags, int sec, int usec )
490 {
491     int ret, i;
492     struct object *objects[MAXIMUM_WAIT_OBJECTS];
493
494     if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
495     {
496         set_error( STATUS_INVALID_PARAMETER );
497         return;
498     }
499     for (i = 0; i < count; i++)
500     {
501         if (!(objects[i] = get_handle_obj( current->process, handles[i], SYNCHRONIZE, NULL )))
502             break;
503     }
504
505     if (i < count) goto done;
506     if (!wait_on( count, objects, flags, sec, usec )) goto done;
507
508     if ((ret = check_wait( current )) != -1)
509     {
510         /* condition is already satisfied */
511         end_wait( current );
512         set_error( ret );
513         goto done;
514     }
515
516     /* now we need to wait */
517     if (flags & SELECT_TIMEOUT)
518     {
519         if (!(current->wait->user = add_timeout_user( &current->wait->timeout,
520                                                       thread_timeout, current->wait )))
521         {
522             end_wait( current );
523             goto done;
524         }
525     }
526     current->wait->cookie = cookie;
527     set_error( STATUS_PENDING );
528
529 done:
530     while (--i >= 0) release_object( objects[i] );
531 }
532
533 /* attempt to wake threads sleeping on the object wait queue */
534 void wake_up( struct object *obj, int max )
535 {
536     struct wait_queue_entry *entry = obj->head;
537
538     while (entry)
539     {
540         struct thread *thread = entry->thread;
541         entry = entry->next;
542         if (wake_thread( thread ))
543         {
544             if (max && !--max) break;
545         }
546     }
547 }
548
549 /* queue an async procedure call */
550 int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
551                       enum apc_type type, int system, int nb_args, ... )
552 {
553     struct thread_apc *apc;
554     struct apc_queue *queue = system ? &thread->system_apc : &thread->user_apc;
555
556     /* cancel a possible previous APC with the same owner */
557     if (owner) thread_cancel_apc( thread, owner, system );
558     if (thread->state == TERMINATED) return 0;
559
560     if (!(apc = mem_alloc( sizeof(*apc) + (nb_args-1)*sizeof(apc->args[0]) ))) return 0;
561     apc->prev    = queue->tail;
562     apc->next    = NULL;
563     apc->owner   = owner;
564     apc->func    = func;
565     apc->type    = type;
566     apc->nb_args = nb_args;
567     if (nb_args)
568     {
569         int i;
570         va_list args;
571         va_start( args, nb_args );
572         for (i = 0; i < nb_args; i++) apc->args[i] = va_arg( args, void * );
573         va_end( args );
574     }
575     queue->tail = apc;
576     if (!apc->prev)  /* first one */
577     {
578         queue->head = apc;
579         wake_thread( thread );
580     }
581     else apc->prev->next = apc;
582
583     return 1;
584 }
585
586 /* cancel the async procedure call owned by a specific object */
587 void thread_cancel_apc( struct thread *thread, struct object *owner, int system )
588 {
589     struct thread_apc *apc;
590     struct apc_queue *queue = system ? &thread->system_apc : &thread->user_apc;
591     for (apc = queue->head; apc; apc = apc->next)
592     {
593         if (apc->owner != owner) continue;
594         if (apc->next) apc->next->prev = apc->prev;
595         else queue->tail = apc->prev;
596         if (apc->prev) apc->prev->next = apc->next;
597         else queue->head = apc->next;
598         free( apc );
599         return;
600     }
601 }
602
603 /* remove the head apc from the queue; the returned pointer must be freed by the caller */
604 static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only )
605 {
606     struct thread_apc *apc;
607     struct apc_queue *queue = &thread->system_apc;
608
609     if (!queue->head && !system_only) queue = &thread->user_apc;
610     if ((apc = queue->head))
611     {
612         if (apc->next) apc->next->prev = NULL;
613         else queue->tail = NULL;
614         queue->head = apc->next;
615     }
616     return apc;
617 }
618
619 /* add an fd to the inflight list */
620 /* return list index, or -1 on error */
621 int thread_add_inflight_fd( struct thread *thread, int client, int server )
622 {
623     int i;
624
625     if (server == -1) return -1;
626     if (client == -1)
627     {
628         close( server );
629         return -1;
630     }
631
632     /* first check if we already have an entry for this fd */
633     for (i = 0; i < MAX_INFLIGHT_FDS; i++)
634         if (thread->inflight[i].client == client)
635         {
636             close( thread->inflight[i].server );
637             thread->inflight[i].server = server;
638             return i;
639         }
640
641     /* now find a free spot to store it */
642     for (i = 0; i < MAX_INFLIGHT_FDS; i++)
643         if (thread->inflight[i].client == -1)
644         {
645             thread->inflight[i].client = client;
646             thread->inflight[i].server = server;
647             return i;
648         }
649     return -1;
650 }
651
652 /* get an inflight fd and purge it from the list */
653 /* the fd must be closed when no longer used */
654 int thread_get_inflight_fd( struct thread *thread, int client )
655 {
656     int i, ret;
657
658     if (client == -1) return -1;
659
660     do
661     {
662         for (i = 0; i < MAX_INFLIGHT_FDS; i++)
663         {
664             if (thread->inflight[i].client == client)
665             {
666                 ret = thread->inflight[i].server;
667                 thread->inflight[i].server = thread->inflight[i].client = -1;
668                 return ret;
669             }
670         }
671     } while (!receive_fd( thread->process ));  /* in case it is still in the socket buffer */
672     return -1;
673 }
674
675 /* retrieve an LDT selector entry */
676 static void get_selector_entry( struct thread *thread, int entry,
677                                 unsigned int *base, unsigned int *limit,
678                                 unsigned char *flags )
679 {
680     if (!thread->process->ldt_copy)
681     {
682         set_error( STATUS_ACCESS_DENIED );
683         return;
684     }
685     if (entry >= 8192)
686     {
687         set_error( STATUS_INVALID_PARAMETER );  /* FIXME */
688         return;
689     }
690     if (suspend_for_ptrace( thread ))
691     {
692         unsigned char flags_buf[4];
693         int *addr = (int *)thread->process->ldt_copy + entry;
694         if (read_thread_int( thread, addr, base ) == -1) goto done;
695         if (read_thread_int( thread, addr + 8192, limit ) == -1) goto done;
696         addr = (int *)thread->process->ldt_copy + 2*8192 + (entry >> 2);
697         if (read_thread_int( thread, addr, (int *)flags_buf ) == -1) goto done;
698         *flags = flags_buf[entry & 3];
699     done:
700         resume_thread( thread );
701     }
702 }
703
704 /* kill a thread on the spot */
705 void kill_thread( struct thread *thread, int violent_death )
706 {
707     if (thread->state == TERMINATED) return;  /* already killed */
708     thread->state = TERMINATED;
709     if (current == thread) current = NULL;
710     if (debug_level)
711         fprintf( stderr,"%08x: *killed* exit_code=%d\n",
712                  (unsigned int)thread, thread->exit_code );
713     if (thread->wait)
714     {
715         while (thread->wait) end_wait( thread );
716         send_thread_wakeup( thread, NULL, STATUS_PENDING );
717         /* if it is waiting on the socket, we don't need to send a SIGTERM */
718         violent_death = 0;
719     }
720     kill_console_processes( thread, 0 );
721     debug_exit_thread( thread );
722     abandon_mutexes( thread );
723     remove_process_thread( thread->process, thread );
724     wake_up( &thread->obj, 0 );
725     detach_thread( thread, violent_death ? SIGTERM : 0 );
726     if (thread->request_fd == thread->obj.fd) thread->request_fd = -1;
727     if (thread->reply_fd == thread->obj.fd) thread->reply_fd = -1;
728     remove_select_user( &thread->obj );
729     cleanup_thread( thread );
730     release_object( thread );
731 }
732
733 /* take a snapshot of currently running threads */
734 struct thread_snapshot *thread_snap( int *count )
735 {
736     struct thread_snapshot *snapshot, *ptr;
737     struct thread *thread;
738     int total = 0;
739
740     for (thread = first_thread; thread; thread = thread->next)
741         if (thread->state != TERMINATED) total++;
742     if (!total || !(snapshot = mem_alloc( sizeof(*snapshot) * total ))) return NULL;
743     ptr = snapshot;
744     for (thread = first_thread; thread; thread = thread->next)
745     {
746         if (thread->state == TERMINATED) continue;
747         ptr->thread   = thread;
748         ptr->count    = thread->obj.refcount;
749         ptr->priority = thread->priority;
750         grab_object( thread );
751         ptr++;
752     }
753     *count = total;
754     return snapshot;
755 }
756
757 /* signal that we are finished booting on the client side */
758 DECL_HANDLER(boot_done)
759 {
760     debug_level = max( debug_level, req->debug_level );
761     if (current == booting_thread)
762     {
763         booting_thread = (struct thread *)~0UL;  /* make sure it doesn't match other threads */
764         lock_master_socket(0);  /* allow other clients now */
765     }
766 }
767
768 /* create a new thread */
769 DECL_HANDLER(new_thread)
770 {
771     struct thread *thread;
772     int request_fd = thread_get_inflight_fd( current, req->request_fd );
773
774     if (request_fd == -1 || fcntl( request_fd, F_SETFL, O_NONBLOCK ) == -1)
775     {
776         if (request_fd != -1) close( request_fd );
777         set_error( STATUS_INVALID_HANDLE );
778         return;
779     }
780
781     if ((thread = create_thread( request_fd, current->process )))
782     {
783         if (req->suspend) thread->suspend++;
784         reply->tid = thread;
785         if ((reply->handle = alloc_handle( current->process, thread,
786                                            THREAD_ALL_ACCESS, req->inherit )))
787         {
788             /* thread object will be released when the thread gets killed */
789             return;
790         }
791         kill_thread( thread, 1 );
792         request_fd = -1;
793     }
794 }
795
796 /* initialize a new thread */
797 DECL_HANDLER(init_thread)
798 {
799     int reply_fd = thread_get_inflight_fd( current, req->reply_fd );
800     int wait_fd = thread_get_inflight_fd( current, req->wait_fd );
801
802     if (current->unix_pid)
803     {
804         fatal_protocol_error( current, "init_thread: already running\n" );
805         goto error;
806     }
807     if (reply_fd == -1 || fcntl( reply_fd, F_SETFL, O_NONBLOCK ) == -1)
808     {
809         fatal_protocol_error( current, "bad reply fd\n" );
810         goto error;
811     }
812     if (wait_fd == -1)
813     {
814         fatal_protocol_error( current, "bad wait fd\n" );
815         goto error;
816     }
817
818     current->unix_pid = req->unix_pid;
819     current->teb      = req->teb;
820     current->reply_fd = reply_fd;
821     current->wait_fd  = wait_fd;
822
823     if (current->suspend + current->process->suspend > 0) stop_thread( current );
824     if (current->process->running_threads > 1)
825         generate_debug_event( current, CREATE_THREAD_DEBUG_EVENT, req->entry );
826
827     reply->pid     = get_process_id( current->process );
828     reply->tid     = get_thread_id( current );
829     reply->boot    = (current == booting_thread);
830     reply->version = SERVER_PROTOCOL_VERSION;
831     return;
832
833  error:
834     if (reply_fd != -1) close( reply_fd );
835     if (wait_fd != -1) close( wait_fd );
836 }
837
838 /* terminate a thread */
839 DECL_HANDLER(terminate_thread)
840 {
841     struct thread *thread;
842
843     reply->self = 0;
844     reply->last = 0;
845     if ((thread = get_thread_from_handle( req->handle, THREAD_TERMINATE )))
846     {
847         thread->exit_code = req->exit_code;
848         if (thread != current) kill_thread( thread, 1 );
849         else
850         {
851             reply->self = 1;
852             reply->last = (thread->process->running_threads == 1);
853         }
854         release_object( thread );
855     }
856 }
857
858 /* open a handle to a thread */
859 DECL_HANDLER(open_thread)
860 {
861     struct thread *thread = get_thread_from_id( req->tid );
862
863     reply->handle = 0;
864     if (thread)
865     {
866         reply->handle = alloc_handle( current->process, thread, req->access, req->inherit );
867         release_object( thread );
868     }
869 }
870
871 /* fetch information about a thread */
872 DECL_HANDLER(get_thread_info)
873 {
874     struct thread *thread;
875     handle_t handle = req->handle;
876
877     if (!handle) thread = get_thread_from_id( req->tid_in );
878     else thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION );
879
880     if (thread)
881     {
882         reply->tid       = get_thread_id( thread );
883         reply->teb       = thread->teb;
884         reply->exit_code = (thread->state == TERMINATED) ? thread->exit_code : STILL_ACTIVE;
885         reply->priority  = thread->priority;
886         release_object( thread );
887     }
888 }
889
890 /* set information about a thread */
891 DECL_HANDLER(set_thread_info)
892 {
893     struct thread *thread;
894
895     if ((thread = get_thread_from_handle( req->handle, THREAD_SET_INFORMATION )))
896     {
897         set_thread_info( thread, req );
898         release_object( thread );
899     }
900 }
901
902 /* suspend a thread */
903 DECL_HANDLER(suspend_thread)
904 {
905     struct thread *thread;
906
907     if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
908     {
909         if (thread->state == TERMINATED) set_error( STATUS_ACCESS_DENIED );
910         else reply->count = suspend_thread( thread, 1 );
911         release_object( thread );
912     }
913 }
914
915 /* resume a thread */
916 DECL_HANDLER(resume_thread)
917 {
918     struct thread *thread;
919
920     if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
921     {
922         if (thread->state == TERMINATED) set_error( STATUS_ACCESS_DENIED );
923         else reply->count = resume_thread( thread );
924         release_object( thread );
925     }
926 }
927
928 /* select on a handle list */
929 DECL_HANDLER(select)
930 {
931     int count = get_req_data_size() / sizeof(int);
932     select_on( count, req->cookie, get_req_data(), req->flags, req->sec, req->usec );
933 }
934
935 /* queue an APC for a thread */
936 DECL_HANDLER(queue_apc)
937 {
938     struct thread *thread;
939     if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
940     {
941         thread_queue_apc( thread, NULL, req->func, APC_USER, !req->user, 1, req->param );
942         release_object( thread );
943     }
944 }
945
946 /* get next APC to call */
947 DECL_HANDLER(get_apc)
948 {
949     struct thread_apc *apc;
950     size_t size;
951
952     for (;;)
953     {
954         if (!(apc = thread_dequeue_apc( current, !req->alertable )))
955         {
956             /* no more APCs */
957             reply->func = NULL;
958             reply->type = APC_NONE;
959             return;
960         }
961         /* Optimization: ignore APCs that have a NULL func; they are only used
962          * to wake up a thread, but since we got here the thread woke up already.
963          * Exception: for APC_ASYNC_IO, func == NULL is legal.
964          */
965         if (apc->func || apc->type == APC_ASYNC_IO) break;
966         free( apc );
967     }
968     size = apc->nb_args * sizeof(apc->args[0]);
969     if (size > get_reply_max_size()) size = get_reply_max_size();
970     reply->func = apc->func;
971     reply->type = apc->type;
972     set_reply_data( apc->args, size );
973     free( apc );
974 }
975
976 /* fetch a selector entry for a thread */
977 DECL_HANDLER(get_selector_entry)
978 {
979     struct thread *thread;
980     if ((thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION )))
981     {
982         get_selector_entry( thread, req->entry, &reply->base, &reply->limit, &reply->flags );
983         release_object( thread );
984     }
985 }