Allow user APCs with NULL 'func' (just wake up the thread).
[wine] / server / thread.c
1 /*
2  * Server-side thread management
3  *
4  * Copyright (C) 1998 Alexandre Julliard
5  */
6
7 #include <assert.h>
8 #include <fcntl.h>
9 #include <signal.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <sys/types.h>
14 #include <sys/uio.h>
15 #include <unistd.h>
16 #include <stdarg.h>
17
18
19 #include "winbase.h"
20 #include "winerror.h"
21 #include "server.h"
22 #include "server/thread.h"
23 #include "server/process.h"
24
25
26 /* thread queues */
27
28 struct wait_queue_entry
29 {
30     struct wait_queue_entry *next;
31     struct wait_queue_entry *prev;
32     struct object           *obj;
33     struct thread           *thread;
34 };
35
36 struct thread_wait
37 {
38     int                     count;      /* count of objects */
39     int                     flags;
40     struct timeval          timeout;
41     struct wait_queue_entry queues[1];
42 };
43
44 /* asynchronous procedure calls */
45
46 struct thread_apc
47 {
48     void                   *func;    /* function to call in client */
49     void                   *param;   /* function param */
50 };
51 #define MAX_THREAD_APC  16  /* Max outstanding APCs for a thread */
52
53
54 /* thread operations */
55
56 static void dump_thread( struct object *obj, int verbose );
57 static int thread_signaled( struct object *obj, struct thread *thread );
58 static void destroy_thread( struct object *obj );
59
60 static const struct object_ops thread_ops =
61 {
62     dump_thread,
63     add_queue,
64     remove_queue,
65     thread_signaled,
66     no_satisfied,
67     no_read_fd,
68     no_write_fd,
69     no_flush,
70     no_get_file_info,
71     destroy_thread
72 };
73
74 static struct thread initial_thread;
75 static struct thread *first_thread = &initial_thread;
76
77 /* initialization of a thread structure */
78 static void init_thread( struct thread *thread, int fd )
79 {
80     init_object( &thread->obj, &thread_ops, NULL );
81     thread->client_fd = fd;
82     thread->unix_pid  = 0;  /* not known yet */
83     thread->mutex     = NULL;
84     thread->wait      = NULL;
85     thread->apc       = NULL;
86     thread->apc_count = 0;
87     thread->error     = 0;
88     thread->state     = STARTING;
89     thread->exit_code = 0x103;  /* STILL_ACTIVE */
90     thread->next      = NULL;
91     thread->prev      = NULL;
92     thread->priority  = THREAD_PRIORITY_NORMAL;
93     thread->affinity  = 1;
94     thread->suspend   = 0;
95 }
96
97 /* create the initial thread and start the main server loop */
98 void create_initial_thread( int fd )
99 {
100     current = &initial_thread;
101     init_thread( &initial_thread, fd );
102     initial_thread.process = create_initial_process(); 
103     add_process_thread( initial_thread.process, &initial_thread );
104     add_client( fd, &initial_thread );
105     select_loop();
106 }
107
108 /* create a new thread */
109 struct thread *create_thread( int fd, void *pid, int suspend, int inherit, int *handle )
110 {
111     struct thread *thread;
112     struct process *process;
113
114     if (!(thread = mem_alloc( sizeof(*thread) ))) return NULL;
115
116     if (!(process = get_process_from_id( pid )))
117     {
118         free( thread );
119         return NULL;
120     }
121     init_thread( thread, fd );
122     thread->process = process;
123
124     if (suspend) thread->suspend++;
125
126     thread->next = first_thread;
127     first_thread->prev = thread;
128     first_thread = thread;
129     add_process_thread( process, thread );
130
131     if ((*handle = alloc_handle( current->process, thread,
132                                  THREAD_ALL_ACCESS, inherit )) == -1) goto error;
133     if (add_client( fd, thread ) == -1)
134     {
135         SET_ERROR( ERROR_TOO_MANY_OPEN_FILES );
136         goto error;
137     }
138     return thread;
139
140  error:
141     if (current) close_handle( current->process, *handle );
142     remove_process_thread( process, thread );
143     release_object( thread );
144     return NULL;
145 }
146
147 /* destroy a thread when its refcount is 0 */
148 static void destroy_thread( struct object *obj )
149 {
150     struct thread *thread = (struct thread *)obj;
151     assert( obj->ops == &thread_ops );
152
153     release_object( thread->process );
154     if (thread->next) thread->next->prev = thread->prev;
155     if (thread->prev) thread->prev->next = thread->next;
156     else first_thread = thread->next;
157     if (thread->apc) free( thread->apc );
158     if (debug_level) memset( thread, 0xaa, sizeof(thread) );  /* catch errors */
159     free( thread );
160 }
161
162 /* dump a thread on stdout for debugging purposes */
163 static void dump_thread( struct object *obj, int verbose )
164 {
165     struct thread *thread = (struct thread *)obj;
166     assert( obj->ops == &thread_ops );
167
168     fprintf( stderr, "Thread pid=%d fd=%d\n",
169              thread->unix_pid, thread->client_fd );
170 }
171
172 static int thread_signaled( struct object *obj, struct thread *thread )
173 {
174     struct thread *mythread = (struct thread *)obj;
175     return (mythread->state == TERMINATED);
176 }
177
178 /* get a thread pointer from a thread id (and increment the refcount) */
179 struct thread *get_thread_from_id( void *id )
180 {
181     struct thread *t = first_thread;
182     while (t && (t != id)) t = t->next;
183     if (t) grab_object( t );
184     return t;
185 }
186
187 /* get a thread from a handle (and increment the refcount) */
188 struct thread *get_thread_from_handle( int handle, unsigned int access )
189 {
190     return (struct thread *)get_handle_obj( current->process, handle,
191                                             access, &thread_ops );
192 }
193
194 /* get all information about a thread */
195 void get_thread_info( struct thread *thread,
196                       struct get_thread_info_reply *reply )
197 {
198     reply->tid       = thread;
199     reply->exit_code = thread->exit_code;
200     reply->priority  = thread->priority;
201 }
202
203
204 /* set all information about a thread */
205 void set_thread_info( struct thread *thread,
206                       struct set_thread_info_request *req )
207 {
208     if (req->mask & SET_THREAD_INFO_PRIORITY)
209         thread->priority = req->priority;
210     if (req->mask & SET_THREAD_INFO_AFFINITY)
211     {
212         if (req->affinity != 1) SET_ERROR( ERROR_INVALID_PARAMETER );
213         else thread->affinity = req->affinity;
214     }
215 }
216
217 /* suspend a thread */
218 int suspend_thread( struct thread *thread )
219 {
220     int old_count = thread->suspend;
221     if (thread->suspend < MAXIMUM_SUSPEND_COUNT)
222     {
223         if (!thread->suspend++)
224         {
225             if (thread->unix_pid) kill( thread->unix_pid, SIGSTOP );
226         }
227     }
228     return old_count;
229 }
230
231 /* resume a thread */
232 int resume_thread( struct thread *thread )
233 {
234     int old_count = thread->suspend;
235     if (thread->suspend > 0)
236     {
237         if (!--thread->suspend)
238         {
239             if (thread->unix_pid) kill( thread->unix_pid, SIGCONT );
240         }
241     }
242     return old_count;
243 }
244
245 /* send a reply to a thread */
246 int send_reply( struct thread *thread, int pass_fd, int n,
247                 ... /* arg_1, len_1, ..., arg_n, len_n */ )
248 {
249     struct iovec vec[16];
250     va_list args;
251     int i;
252
253     assert( n < 16 );
254     va_start( args, n );
255     for (i = 0; i < n; i++)
256     {
257         vec[i].iov_base = va_arg( args, void * );
258         vec[i].iov_len  = va_arg( args, int );
259     }
260     va_end( args );
261     return send_reply_v( thread->client_fd, thread->error, pass_fd, vec, n );
262 }
263
264 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
265 int add_queue( struct object *obj, struct wait_queue_entry *entry )
266 {
267     grab_object( obj );
268     entry->obj    = obj;
269     entry->prev   = obj->tail;
270     entry->next   = NULL;
271     if (obj->tail) obj->tail->next = entry;
272     else obj->head = entry;
273     obj->tail = entry;
274     return 1;
275 }
276
277 /* remove a thread from an object wait queue */
278 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
279 {
280     if (entry->next) entry->next->prev = entry->prev;
281     else obj->tail = entry->prev;
282     if (entry->prev) entry->prev->next = entry->next;
283     else obj->head = entry->next;
284     release_object( obj );
285 }
286
287 /* finish waiting */
288 static void end_wait( struct thread *thread )
289 {
290     struct thread_wait *wait = thread->wait;
291     struct wait_queue_entry *entry;
292     int i;
293
294     assert( wait );
295     for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
296         entry->obj->ops->remove_queue( entry->obj, entry );
297     if (wait->flags & SELECT_TIMEOUT) set_select_timeout( thread->client_fd, NULL );
298     free( wait );
299     thread->wait = NULL;
300 }
301
302 /* build the thread wait structure */
303 static int wait_on( struct thread *thread, int count,
304                     int *handles, int flags, int timeout )
305 {
306     struct thread_wait *wait;
307     struct wait_queue_entry *entry;
308     struct object *obj;
309     int i;
310
311     if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
312     {
313         SET_ERROR( ERROR_INVALID_PARAMETER );
314         return 0;
315     }
316     if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
317     thread->wait  = wait;
318     wait->count   = count;
319     wait->flags   = flags;
320     if (flags & SELECT_TIMEOUT)
321     {
322         gettimeofday( &wait->timeout, 0 );
323         if (timeout)
324         {
325             wait->timeout.tv_usec += (timeout % 1000) * 1000;
326             if (wait->timeout.tv_usec >= 1000000)
327             {
328                 wait->timeout.tv_usec -= 1000000;
329                 wait->timeout.tv_sec++;
330             }
331             wait->timeout.tv_sec += timeout / 1000;
332         }
333     }
334
335     for (i = 0, entry = wait->queues; i < count; i++, entry++)
336     {
337         if (!(obj = get_handle_obj( thread->process, handles[i],
338                                     SYNCHRONIZE, NULL )))
339         {
340             wait->count = i - 1;
341             end_wait( thread );
342             return 0;
343         }
344         entry->thread = thread;
345         if (!obj->ops->add_queue( obj, entry ))
346         {
347             wait->count = i - 1;
348             end_wait( thread );
349             return 0;
350         }
351         release_object( obj );
352     }
353     return 1;
354 }
355
356 /* check if the thread waiting condition is satisfied */
357 static int check_wait( struct thread *thread, int *signaled )
358 {
359     int i;
360     struct thread_wait *wait = thread->wait;
361     struct wait_queue_entry *entry = wait->queues;
362
363     assert( wait );
364     if (wait->flags & SELECT_ALL)
365     {
366         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
367             if (!entry->obj->ops->signaled( entry->obj, thread )) goto other_checks;
368         /* Wait satisfied: tell it to all objects */
369         *signaled = 0;
370         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
371             if (entry->obj->ops->satisfied( entry->obj, thread ))
372                 *signaled = STATUS_ABANDONED_WAIT_0;
373         return 1;
374     }
375     else
376     {
377         for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
378         {
379             if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
380             /* Wait satisfied: tell it to the object */
381             *signaled = i;
382             if (entry->obj->ops->satisfied( entry->obj, thread ))
383                 *signaled += STATUS_ABANDONED_WAIT_0;
384             return 1;
385         }
386     }
387
388  other_checks:
389     if ((wait->flags & SELECT_ALERTABLE) && thread->apc)
390     {
391         *signaled = STATUS_USER_APC;
392         return 1;
393     }
394     if (wait->flags & SELECT_TIMEOUT)
395     {
396         struct timeval now;
397         gettimeofday( &now, NULL );
398         if ((now.tv_sec > wait->timeout.tv_sec) ||
399             ((now.tv_sec == wait->timeout.tv_sec) &&
400              (now.tv_usec >= wait->timeout.tv_usec)))
401         {
402             *signaled = STATUS_TIMEOUT;
403             return 1;
404         }
405     }
406     return 0;
407 }
408
409 /* send the select reply to wake up the client */
410 static void send_select_reply( struct thread *thread, int signaled )
411 {
412     struct select_reply reply;
413     reply.signaled = signaled;
414     if ((signaled == STATUS_USER_APC) && thread->apc)
415     {
416         struct thread_apc *apc = thread->apc;
417         int len = thread->apc_count * sizeof(*apc);
418         thread->apc = NULL;
419         thread->apc_count = 0;
420         send_reply( thread, -1, 2, &reply, sizeof(reply),
421                     apc, len );
422         free( apc );
423     }
424     else send_reply( thread, -1, 1, &reply, sizeof(reply) );
425 }
426
427 /* attempt to wake up a thread */
428 /* return 1 if OK, 0 if the wait condition is still not satisfied */
429 static int wake_thread( struct thread *thread )
430 {
431     int signaled;
432
433     if (!check_wait( thread, &signaled )) return 0;
434     end_wait( thread );
435     send_select_reply( thread, signaled );
436     return 1;
437 }
438
439 /* sleep on a list of objects */
440 void sleep_on( struct thread *thread, int count, int *handles, int flags, int timeout )
441 {
442     assert( !thread->wait );
443     if (!wait_on( thread, count, handles, flags, timeout ))
444     {
445         /* return an error */
446         send_select_reply( thread, -1 );
447         return;
448     }
449     if (!wake_thread( thread ))
450     {
451         /* we need to wait */
452         if (flags & SELECT_TIMEOUT)
453             set_select_timeout( thread->client_fd, &thread->wait->timeout );
454     }
455 }
456
457 /* timeout for the current thread */
458 void thread_timeout(void)
459 {
460     assert( current->wait );
461     end_wait( current );
462     send_select_reply( current, STATUS_TIMEOUT );
463 }
464
465 /* attempt to wake threads sleeping on the object wait queue */
466 void wake_up( struct object *obj, int max )
467 {
468     struct wait_queue_entry *entry = obj->head;
469
470     while (entry)
471     {
472         struct wait_queue_entry *next = entry->next;
473         if (wake_thread( entry->thread ))
474         {
475             if (max && !--max) break;
476         }
477         entry = next;
478     }
479 }
480
481 /* queue an async procedure call */
482 int thread_queue_apc( struct thread *thread, void *func, void *param )
483 {
484     struct thread_apc *apc;
485     if (!thread->apc)
486     {
487         if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
488             return 0;
489         thread->apc_count = 0;
490     }
491     else if (thread->apc_count >= MAX_THREAD_APC) return 0;
492     thread->apc[thread->apc_count].func  = func;
493     thread->apc[thread->apc_count].param = param;
494     thread->apc_count++;
495     wake_thread( thread );
496     return 1;
497 }
498
499 /* kill a thread on the spot */
500 void kill_thread( struct thread *thread, int exit_code )
501 {
502     if (thread->state == TERMINATED) return;  /* already killed */
503     if (thread->unix_pid) kill( thread->unix_pid, SIGTERM );
504     remove_client( thread->client_fd, exit_code ); /* this will call thread_killed */
505 }
506
507 /* a thread has been killed */
508 void thread_killed( struct thread *thread, int exit_code )
509 {
510     thread->state = TERMINATED;
511     thread->exit_code = exit_code;
512     if (thread->wait) end_wait( thread );
513     abandon_mutexes( thread );
514     remove_process_thread( thread->process, thread );
515     wake_up( &thread->obj, 0 );
516     release_object( thread );
517 }