* Server-side thread management
*
* Copyright (C) 1998 Alexandre Julliard
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "config.h"
+#include "wine/port.h"
#include <assert.h>
+#include <errno.h>
#include <fcntl.h>
#include <signal.h>
+#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#ifdef HAVE_SYS_MMAN_H
-#include <sys/mman.h>
-#endif
#include <sys/types.h>
-#ifdef HAVE_SYS_SOCKET_H
-# include <sys/socket.h>
-#endif
-#include <sys/uio.h>
#include <unistd.h>
-#include <stdarg.h>
-
+#include <time.h>
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#include "windef.h"
#include "winbase.h"
+#include "file.h"
#include "handle.h"
#include "process.h"
#include "thread.h"
#include "request.h"
+#include "user.h"
/* thread queues */
-struct wait_queue_entry
-{
- struct wait_queue_entry *next;
- struct wait_queue_entry *prev;
- struct object *obj;
- struct thread *thread;
-};
-
struct thread_wait
{
+ struct thread_wait *next; /* next wait structure for this thread */
+ struct thread *thread; /* owner thread */
int count; /* count of objects */
int flags;
+ void *cookie; /* magic cookie to return to client */
struct timeval timeout;
struct timeout_user *user;
- sleep_reply reply; /* function to build the reply */
struct wait_queue_entry queues[1];
};
struct thread_apc
{
- void *func; /* function to call in client */
- void *param; /* function param */
+ struct list entry; /* queue linked list */
+ struct object *owner; /* object that queued this apc */
+ void *func; /* function to call in client */
+ enum apc_type type; /* type of apc function */
+ int nb_args; /* number of arguments */
+ void *arg1; /* function arguments */
+ void *arg2;
+ void *arg3;
};
-#define MAX_THREAD_APC 16 /* Max outstanding APCs for a thread */
/* thread operations */
static void dump_thread( struct object *obj, int verbose );
static int thread_signaled( struct object *obj, struct thread *thread );
-extern void thread_poll_event( struct object *obj, int event );
+static void thread_poll_event( struct fd *fd, int event );
static void destroy_thread( struct object *obj );
+static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only );
static const struct object_ops thread_ops =
{
remove_queue, /* remove_queue */
thread_signaled, /* signaled */
no_satisfied, /* satisfied */
+ no_signal, /* signal */
+ no_get_fd, /* get_fd */
+ destroy_thread /* destroy */
+};
+
+static const struct fd_ops thread_fd_ops =
+{
NULL, /* get_poll_events */
thread_poll_event, /* poll_event */
- no_read_fd, /* get_read_fd */
- no_write_fd, /* get_write_fd */
no_flush, /* flush */
no_get_file_info, /* get_file_info */
- destroy_thread /* destroy */
+ no_queue_async, /* queue_async */
+ no_cancel_async /* cancel_async */
};
-static struct thread *first_thread;
+static struct list thread_list = LIST_INIT(thread_list);
static struct thread *booting_thread;
-/* allocate the buffer for the communication with the client */
-static int alloc_client_buffer( struct thread *thread )
-{
- struct get_thread_buffer_request *req;
- int fd;
-
- if ((fd = create_anonymous_file()) == -1) return -1;
- if (ftruncate( fd, MAX_REQUEST_LENGTH ) == -1) goto error;
- if ((thread->buffer = mmap( 0, MAX_REQUEST_LENGTH, PROT_READ | PROT_WRITE,
- MAP_SHARED, fd, 0 )) == (void*)-1) goto error;
- /* build the first request into the buffer and send it */
- req = thread->buffer;
- req->pid = get_process_id( thread->process );
- req->tid = get_thread_id( thread );
- req->boot = (thread == booting_thread);
- req->version = SERVER_PROTOCOL_VERSION;
- set_reply_fd( thread, fd );
- send_reply( thread );
- return 1;
+/* initialize the structure for a newly allocated thread */
+inline static void init_thread_structure( struct thread *thread )
+{
+ int i;
- error:
- file_set_error();
- if (fd != -1) close( fd );
- return 0;
+ thread->unix_pid = -1; /* not known yet */
+ thread->unix_tid = -1; /* not known yet */
+ thread->context = NULL;
+ thread->teb = NULL;
+ thread->debug_ctx = NULL;
+ thread->debug_event = NULL;
+ thread->queue = NULL;
+ thread->wait = NULL;
+ thread->error = 0;
+ thread->req_data = NULL;
+ thread->req_toread = 0;
+ thread->reply_data = NULL;
+ thread->reply_towrite = 0;
+ thread->request_fd = NULL;
+ thread->reply_fd = NULL;
+ thread->wait_fd = NULL;
+ thread->state = RUNNING;
+ thread->attached = 0;
+ thread->exit_code = 0;
+ thread->priority = THREAD_PRIORITY_NORMAL;
+ thread->affinity = 1;
+ thread->suspend = 0;
+ thread->creation_time = time(NULL);
+ thread->exit_time = 0;
+
+ list_init( &thread->mutex_list );
+ list_init( &thread->system_apc );
+ list_init( &thread->user_apc );
+
+ for (i = 0; i < MAX_INFLIGHT_FDS; i++)
+ thread->inflight[i].server = thread->inflight[i].client = -1;
}
/* create a new thread */
-struct thread *create_thread( int fd, struct process *process, int suspend )
+struct thread *create_thread( int fd, struct process *process )
{
struct thread *thread;
- int flags = fcntl( fd, F_GETFL, 0 );
- fcntl( fd, F_SETFL, flags | O_NONBLOCK );
-
- if (!(thread = alloc_object( &thread_ops, fd ))) return NULL;
-
- thread->unix_pid = 0; /* not known yet */
- thread->context = NULL;
- thread->teb = NULL;
- thread->mutex = NULL;
- thread->debug_ctx = NULL;
- thread->wait = NULL;
- thread->apc = NULL;
- thread->apc_count = 0;
- thread->error = 0;
- thread->pass_fd = -1;
- thread->state = RUNNING;
- thread->attached = 0;
- thread->exit_code = STILL_ACTIVE;
- thread->next = NULL;
- thread->prev = NULL;
- thread->priority = THREAD_PRIORITY_NORMAL;
- thread->affinity = 1;
- thread->suspend = (suspend != 0);
- thread->buffer = (void *)-1;
- thread->last_req = REQ_GET_THREAD_BUFFER;
- thread->process = (struct process *)grab_object( process );
+ if (!(thread = alloc_object( &thread_ops ))) return NULL;
+ init_thread_structure( thread );
+
+ thread->process = (struct process *)grab_object( process );
if (!current) current = thread;
if (!booting_thread) /* first thread ever */
lock_master_socket(1);
}
- if ((thread->next = first_thread) != NULL) thread->next->prev = thread;
- first_thread = thread;
- add_process_thread( process, thread );
+ list_add_head( &thread_list, &thread->entry );
- set_select_events( &thread->obj, POLLIN ); /* start listening to events */
- if (!alloc_client_buffer( thread )) goto error;
- return thread;
+ if (!(thread->id = alloc_ptid( thread )))
+ {
+ release_object( thread );
+ return NULL;
+ }
+ if (!(thread->request_fd = create_anonymous_fd( &thread_fd_ops, fd, &thread->obj )))
+ {
+ release_object( thread );
+ return NULL;
+ }
- error:
- remove_process_thread( process, thread );
- release_object( thread );
- return NULL;
+ thread->token = (struct token *) grab_object( process->token );
+
+ set_fd_events( thread->request_fd, POLLIN ); /* start listening to events */
+ add_process_thread( thread->process, thread );
+ return thread;
}
/* handle a client event */
-void thread_poll_event( struct object *obj, int event )
+static void thread_poll_event( struct fd *fd, int event )
{
- struct thread *thread = (struct thread *)obj;
- assert( obj->ops == &thread_ops );
+ struct thread *thread = get_fd_user( fd );
+ assert( thread->obj.ops == &thread_ops );
- if (event & (POLLERR | POLLHUP)) kill_thread( thread, BROKEN_PIPE );
- else
+ if (event & (POLLERR | POLLHUP)) kill_thread( thread, 0 );
+ else if (event & POLLIN) read_request( thread );
+ else if (event & POLLOUT) write_reply( thread );
+}
+
+/* cleanup everything that is no longer needed by a dead thread */
+/* used by destroy_thread and kill_thread */
+static void cleanup_thread( struct thread *thread )
+{
+ int i;
+ struct thread_apc *apc;
+
+ while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
+ if (thread->req_data) free( thread->req_data );
+ if (thread->reply_data) free( thread->reply_data );
+ if (thread->request_fd) release_object( thread->request_fd );
+ if (thread->reply_fd) release_object( thread->reply_fd );
+ if (thread->wait_fd) release_object( thread->wait_fd );
+ free_msg_queue( thread );
+ cleanup_clipboard_thread(thread);
+ destroy_thread_windows( thread );
+ for (i = 0; i < MAX_INFLIGHT_FDS; i++)
{
- if (event & POLLOUT) write_request( thread );
- if (event & POLLIN) read_request( thread );
+ if (thread->inflight[i].client != -1)
+ {
+ close( thread->inflight[i].server );
+ thread->inflight[i].client = thread->inflight[i].server = -1;
+ }
+ }
+ thread->req_data = NULL;
+ thread->reply_data = NULL;
+ thread->request_fd = NULL;
+ thread->reply_fd = NULL;
+ thread->wait_fd = NULL;
+
+ if (thread == booting_thread) /* killing booting thread */
+ {
+ booting_thread = NULL;
+ lock_master_socket(0);
}
}
/* destroy a thread when its refcount is 0 */
static void destroy_thread( struct object *obj )
{
+ struct thread_apc *apc;
struct thread *thread = (struct thread *)obj;
assert( obj->ops == &thread_ops );
assert( !thread->debug_ctx ); /* cannot still be debugging something */
+ list_remove( &thread->entry );
+ while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
+ cleanup_thread( thread );
release_object( thread->process );
- if (thread->next) thread->next->prev = thread->prev;
- if (thread->prev) thread->prev->next = thread->next;
- else first_thread = thread->next;
- if (thread->apc) free( thread->apc );
- if (thread->buffer != (void *)-1) munmap( thread->buffer, MAX_REQUEST_LENGTH );
- if (thread->pass_fd != -1) close( thread->pass_fd );
+ if (thread->id) free_ptid( thread->id );
+ if (thread->token) release_object( thread->token );
}
/* dump a thread on stdout for debugging purposes */
struct thread *thread = (struct thread *)obj;
assert( obj->ops == &thread_ops );
- fprintf( stderr, "Thread pid=%d teb=%p state=%d\n",
- thread->unix_pid, thread->teb, thread->state );
+ fprintf( stderr, "Thread id=%04x unix pid=%d unix tid=%d teb=%p state=%d\n",
+ thread->id, thread->unix_pid, thread->unix_tid, thread->teb, thread->state );
}
static int thread_signaled( struct object *obj, struct thread *thread )
}
/* get a thread pointer from a thread id (and increment the refcount) */
-struct thread *get_thread_from_id( void *id )
+struct thread *get_thread_from_id( thread_id_t id )
{
- struct thread *t = first_thread;
- while (t && (t != id)) t = t->next;
- if (t) grab_object( t );
- return t;
+ struct object *obj = get_ptid_entry( id );
+
+ if (obj && obj->ops == &thread_ops) return (struct thread *)grab_object( obj );
+ set_win32_error( ERROR_INVALID_THREAD_ID );
+ return NULL;
}
/* get a thread from a handle (and increment the refcount) */
-struct thread *get_thread_from_handle( int handle, unsigned int access )
+struct thread *get_thread_from_handle( obj_handle_t handle, unsigned int access )
{
return (struct thread *)get_handle_obj( current->process, handle,
access, &thread_ops );
/* find a thread from a Unix pid */
struct thread *get_thread_from_pid( int pid )
{
- struct thread *t = first_thread;
- while (t && (t->unix_pid != pid)) t = t->next;
- return t;
+ struct thread *thread;
+
+ LIST_FOR_EACH_ENTRY( thread, &thread_list, struct thread, entry )
+ {
+ if (thread->unix_tid == pid) return thread;
+ }
+ LIST_FOR_EACH_ENTRY( thread, &thread_list, struct thread, entry )
+ {
+ if (thread->unix_pid == pid) return thread;
+ }
+ return NULL;
}
/* set all information about a thread */
static void set_thread_info( struct thread *thread,
- struct set_thread_info_request *req )
+ const struct set_thread_info_request *req )
{
if (req->mask & SET_THREAD_INFO_PRIORITY)
thread->priority = req->priority;
}
}
+/* stop a thread (at the Unix level) */
+void stop_thread( struct thread *thread )
+{
+ /* can't stop a thread while initialisation is in progress */
+ if (is_process_init_done(thread->process)) send_thread_signal( thread, SIGUSR1 );
+}
+
/* suspend a thread */
-int suspend_thread( struct thread *thread, int check_limit )
+static int suspend_thread( struct thread *thread )
{
int old_count = thread->suspend;
- if (thread->suspend < MAXIMUM_SUSPEND_COUNT || !check_limit)
+ if (thread->suspend < MAXIMUM_SUSPEND_COUNT)
{
if (!(thread->process->suspend + thread->suspend++)) stop_thread( thread );
}
}
/* resume a thread */
-int resume_thread( struct thread *thread )
+static int resume_thread( struct thread *thread )
{
int old_count = thread->suspend;
if (thread->suspend > 0)
{
- if (!(--thread->suspend + thread->process->suspend)) continue_thread( thread );
+ if (!(--thread->suspend + thread->process->suspend)) wake_thread( thread );
}
return old_count;
}
-/* suspend all threads but the current */
-void suspend_all_threads( void )
-{
- struct thread *thread;
- for ( thread = first_thread; thread; thread = thread->next )
- if ( thread != current )
- suspend_thread( thread, 0 );
-}
-
-/* resume all threads but the current */
-void resume_all_threads( void )
-{
- struct thread *thread;
- for ( thread = first_thread; thread; thread = thread->next )
- if ( thread != current )
- resume_thread( thread );
-}
-
/* add a thread to an object wait queue; return 1 if OK, 0 on error */
int add_queue( struct object *obj, struct wait_queue_entry *entry )
{
grab_object( obj );
- entry->obj = obj;
- entry->prev = obj->tail;
- entry->next = NULL;
- if (obj->tail) obj->tail->next = entry;
- else obj->head = entry;
- obj->tail = entry;
+ entry->obj = obj;
+ list_add_tail( &obj->wait_queue, &entry->entry );
return 1;
}
/* remove a thread from an object wait queue */
void remove_queue( struct object *obj, struct wait_queue_entry *entry )
{
- if (entry->next) entry->next->prev = entry->prev;
- else obj->tail = entry->prev;
- if (entry->prev) entry->prev->next = entry->next;
- else obj->head = entry->next;
+ list_remove( &entry->entry );
release_object( obj );
}
for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
entry->obj->ops->remove_queue( entry->obj, entry );
if (wait->user) remove_timeout_user( wait->user );
+ thread->wait = wait->next;
free( wait );
- thread->wait = NULL;
}
/* build the thread wait structure */
-static int wait_on( int count, struct object *objects[], int flags,
- int timeout, sleep_reply func )
+static int wait_on( int count, struct object *objects[], int flags, const abs_time_t *timeout )
{
struct thread_wait *wait;
struct wait_queue_entry *entry;
int i;
if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
- current->wait = wait;
+ wait->next = current->wait;
+ wait->thread = current;
wait->count = count;
wait->flags = flags;
wait->user = NULL;
- wait->reply = func;
+ current->wait = wait;
if (flags & SELECT_TIMEOUT)
{
- gettimeofday( &wait->timeout, 0 );
- add_timeout( &wait->timeout, timeout );
+ wait->timeout.tv_sec = timeout->sec;
+ wait->timeout.tv_usec = timeout->usec;
}
for (i = 0, entry = wait->queues; i < count; i++, entry++)
}
/* check if the thread waiting condition is satisfied */
-static int check_wait( struct thread *thread, struct object **object )
+static int check_wait( struct thread *thread )
{
int i, signaled;
struct thread_wait *wait = thread->wait;
struct wait_queue_entry *entry = wait->queues;
+ /* Suspended threads may not acquire locks */
+ if( thread->process->suspend + thread->suspend > 0 ) return -1;
+
assert( wait );
- *object = NULL;
if (wait->flags & SELECT_ALL)
{
int not_ok = 0;
if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
/* Wait satisfied: tell it to the object */
signaled = i;
- *object = entry->obj;
if (entry->obj->ops->satisfied( entry->obj, thread ))
signaled = i + STATUS_ABANDONED_WAIT_0;
return signaled;
}
other_checks:
- if ((wait->flags & SELECT_ALERTABLE) && thread->apc) return STATUS_USER_APC;
+ if ((wait->flags & SELECT_INTERRUPTIBLE) && !list_empty(&thread->system_apc)) return STATUS_USER_APC;
+ if ((wait->flags & SELECT_ALERTABLE) && !list_empty(&thread->user_apc)) return STATUS_USER_APC;
if (wait->flags & SELECT_TIMEOUT)
{
struct timeval now;
return -1;
}
-/* build a reply to the select request */
-static void build_select_reply( struct thread *thread, struct object *obj, int signaled )
+/* send the wakeup signal to a thread */
+static int send_thread_wakeup( struct thread *thread, void *cookie, int signaled )
{
- struct select_request *req = get_req_ptr( thread );
- req->signaled = signaled;
+ struct wake_up_reply reply;
+ int ret;
+
+ reply.cookie = cookie;
+ reply.signaled = signaled;
+ if ((ret = write( get_unix_fd( thread->wait_fd ), &reply, sizeof(reply) )) == sizeof(reply))
+ return 0;
+ if (ret >= 0)
+ fatal_protocol_error( thread, "partial wakeup write %d\n", ret );
+ else if (errno == EPIPE)
+ kill_thread( thread, 0 ); /* normal death */
+ else
+ fatal_protocol_perror( thread, "write" );
+ return -1;
}
/* attempt to wake up a thread */
-/* return 1 if OK, 0 if the wait condition is still not satisfied */
-static int wake_thread( struct thread *thread )
-{
- int signaled;
- struct object *object;
- if ((signaled = check_wait( thread, &object )) == -1) return 0;
- thread->error = 0;
- thread->wait->reply( thread, object, signaled );
- end_wait( thread );
- return 1;
+/* return >0 if OK, 0 if the wait condition is still not satisfied */
+int wake_thread( struct thread *thread )
+{
+ int signaled, count;
+ void *cookie;
+
+ for (count = 0; thread->wait; count++)
+ {
+ if ((signaled = check_wait( thread )) == -1) break;
+
+ cookie = thread->wait->cookie;
+ if (debug_level) fprintf( stderr, "%04x: *wakeup* signaled=%d cookie=%p\n",
+ thread->id, signaled, cookie );
+ end_wait( thread );
+ if (send_thread_wakeup( thread, cookie, signaled ) == -1) /* error */
+ break;
+ }
+ return count;
}
/* thread wait timeout */
static void thread_timeout( void *ptr )
{
- struct thread *thread = ptr;
- if (debug_level) fprintf( stderr, "%08x: *timeout*\n", (unsigned int)thread );
- assert( thread->wait );
- thread->error = 0;
- thread->wait->user = NULL;
- thread->wait->reply( thread, NULL, STATUS_TIMEOUT );
+ struct thread_wait *wait = ptr;
+ struct thread *thread = wait->thread;
+ void *cookie = wait->cookie;
+
+ wait->user = NULL;
+ if (thread->wait != wait) return; /* not the top-level wait, ignore it */
+ if (thread->suspend + thread->process->suspend > 0) return; /* suspended, ignore it */
+
+ if (debug_level) fprintf( stderr, "%04x: *wakeup* signaled=%d cookie=%p\n",
+ thread->id, STATUS_TIMEOUT, cookie );
end_wait( thread );
- send_reply( thread );
+ if (send_thread_wakeup( thread, cookie, STATUS_TIMEOUT ) == -1) return;
+ /* check if other objects have become signaled in the meantime */
+ wake_thread( thread );
}
-/* sleep on a list of objects */
-int sleep_on( int count, struct object *objects[], int flags, int timeout, sleep_reply func )
+/* try signaling an event flag, a semaphore or a mutex */
+static int signal_object( obj_handle_t handle )
{
- assert( !current->wait );
- if (!wait_on( count, objects, flags, timeout, func )) return 0;
- if (wake_thread( current )) return 1;
- /* now we need to wait */
- if (flags & SELECT_TIMEOUT)
+ struct object *obj;
+ int ret = 0;
+
+ obj = get_handle_obj( current->process, handle, 0, NULL );
+ if (obj)
{
- if (!(current->wait->user = add_timeout_user( ¤t->wait->timeout,
- thread_timeout, current )))
- {
- end_wait( current );
- return 0;
- }
+ ret = obj->ops->signal( obj, get_handle_access( current->process, handle ));
+ release_object( obj );
}
- return 1;
+ return ret;
}
/* select on a list of handles */
-static int select_on( int count, int *handles, int flags, int timeout )
+static void select_on( int count, void *cookie, const obj_handle_t *handles,
+ int flags, const abs_time_t *timeout, obj_handle_t signal_obj )
{
- int ret = 0;
- int i;
+ int ret, i;
struct object *objects[MAXIMUM_WAIT_OBJECTS];
if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
{
set_error( STATUS_INVALID_PARAMETER );
- return 0;
+ return;
}
for (i = 0; i < count; i++)
{
if (!(objects[i] = get_handle_obj( current->process, handles[i], SYNCHRONIZE, NULL )))
break;
}
- if (i == count) ret = sleep_on( count, objects, flags, timeout, build_select_reply );
+
+ if (i < count) goto done;
+ if (!wait_on( count, objects, flags, timeout )) goto done;
+
+ /* signal the object */
+ if (signal_obj)
+ {
+ if (!signal_object( signal_obj ))
+ {
+ end_wait( current );
+ goto done;
+ }
+ /* check if we woke ourselves up */
+ if (!current->wait) goto done;
+ }
+
+ if ((ret = check_wait( current )) != -1)
+ {
+ /* condition is already satisfied */
+ end_wait( current );
+ set_error( ret );
+ goto done;
+ }
+
+ /* now we need to wait */
+ if (flags & SELECT_TIMEOUT)
+ {
+ if (!(current->wait->user = add_timeout_user( ¤t->wait->timeout,
+ thread_timeout, current->wait )))
+ {
+ end_wait( current );
+ goto done;
+ }
+ }
+ current->wait->cookie = cookie;
+ set_error( STATUS_PENDING );
+
+done:
while (--i >= 0) release_object( objects[i] );
- return ret;
}
/* attempt to wake threads sleeping on the object wait queue */
void wake_up( struct object *obj, int max )
{
- struct wait_queue_entry *entry = obj->head;
+ struct list *ptr, *next;
- while (entry)
+ LIST_FOR_EACH_SAFE( ptr, next, &obj->wait_queue )
{
- struct thread *thread = entry->thread;
- entry = entry->next;
- if (wake_thread( thread ))
+ struct wait_queue_entry *entry = LIST_ENTRY( ptr, struct wait_queue_entry, entry );
+ if (wake_thread( entry->thread ))
{
- send_reply( thread );
if (max && !--max) break;
}
}
}
/* queue an async procedure call */
-static int thread_queue_apc( struct thread *thread, void *func, void *param )
+int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
+ enum apc_type type, int system, void *arg1, void *arg2, void *arg3 )
{
struct thread_apc *apc;
- if (!thread->apc)
+ struct list *queue = system ? &thread->system_apc : &thread->user_apc;
+
+ /* cancel a possible previous APC with the same owner */
+ if (owner) thread_cancel_apc( thread, owner, system );
+ if (thread->state == TERMINATED) return 0;
+
+ if (!(apc = mem_alloc( sizeof(*apc) ))) return 0;
+ apc->owner = owner;
+ apc->func = func;
+ apc->type = type;
+ apc->arg1 = arg1;
+ apc->arg2 = arg2;
+ apc->arg3 = arg3;
+ list_add_tail( queue, &apc->entry );
+ if (!list_prev( queue, &apc->entry )) /* first one */
+ wake_thread( thread );
+
+ return 1;
+}
+
+/* cancel the async procedure call owned by a specific object */
+void thread_cancel_apc( struct thread *thread, struct object *owner, int system )
+{
+ struct thread_apc *apc;
+ struct list *queue = system ? &thread->system_apc : &thread->user_apc;
+ LIST_FOR_EACH_ENTRY( apc, queue, struct thread_apc, entry )
{
- if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
- return 0;
- thread->apc_count = 0;
+ if (apc->owner != owner) continue;
+ list_remove( &apc->entry );
+ free( apc );
+ return;
}
- else if (thread->apc_count >= MAX_THREAD_APC) return 0;
- thread->apc[thread->apc_count].func = func;
- thread->apc[thread->apc_count].param = param;
- thread->apc_count++;
- if (thread->wait)
+}
+
+/* remove the head apc from the queue; the returned pointer must be freed by the caller */
+static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only )
+{
+ struct thread_apc *apc = NULL;
+ struct list *ptr = list_head( &thread->system_apc );
+
+ if (!ptr && !system_only) ptr = list_head( &thread->user_apc );
+ if (ptr)
{
- if (wake_thread( thread )) send_reply( thread );
+ apc = LIST_ENTRY( ptr, struct thread_apc, entry );
+ list_remove( ptr );
}
- return 1;
+ return apc;
+}
+
+/* add an fd to the inflight list */
+/* return list index, or -1 on error */
+int thread_add_inflight_fd( struct thread *thread, int client, int server )
+{
+ int i;
+
+ if (server == -1) return -1;
+ if (client == -1)
+ {
+ close( server );
+ return -1;
+ }
+
+ /* first check if we already have an entry for this fd */
+ for (i = 0; i < MAX_INFLIGHT_FDS; i++)
+ if (thread->inflight[i].client == client)
+ {
+ close( thread->inflight[i].server );
+ thread->inflight[i].server = server;
+ return i;
+ }
+
+ /* now find a free spot to store it */
+ for (i = 0; i < MAX_INFLIGHT_FDS; i++)
+ if (thread->inflight[i].client == -1)
+ {
+ thread->inflight[i].client = client;
+ thread->inflight[i].server = server;
+ return i;
+ }
+ return -1;
+}
+
+/* get an inflight fd and purge it from the list */
+/* the fd must be closed when no longer used */
+int thread_get_inflight_fd( struct thread *thread, int client )
+{
+ int i, ret;
+
+ if (client == -1) return -1;
+
+ do
+ {
+ for (i = 0; i < MAX_INFLIGHT_FDS; i++)
+ {
+ if (thread->inflight[i].client == client)
+ {
+ ret = thread->inflight[i].server;
+ thread->inflight[i].server = thread->inflight[i].client = -1;
+ return ret;
+ }
+ }
+ } while (!receive_fd( thread->process )); /* in case it is still in the socket buffer */
+ return -1;
}
/* retrieve an LDT selector entry */
unsigned int *base, unsigned int *limit,
unsigned char *flags )
{
- if (!thread->process->ldt_copy || !thread->process->ldt_flags)
+ if (!thread->process->ldt_copy)
{
set_error( STATUS_ACCESS_DENIED );
return;
set_error( STATUS_INVALID_PARAMETER ); /* FIXME */
return;
}
- suspend_thread( thread, 0 );
- if (thread->attached)
+ if (suspend_for_ptrace( thread ))
{
unsigned char flags_buf[4];
- int *addr = (int *)thread->process->ldt_copy + 2 * entry;
+ int *addr = (int *)thread->process->ldt_copy + entry;
if (read_thread_int( thread, addr, base ) == -1) goto done;
- if (read_thread_int( thread, addr + 1, limit ) == -1) goto done;
- addr = (int *)thread->process->ldt_flags + (entry >> 2);
+ if (read_thread_int( thread, addr + 8192, limit ) == -1) goto done;
+ addr = (int *)thread->process->ldt_copy + 2*8192 + (entry >> 2);
if (read_thread_int( thread, addr, (int *)flags_buf ) == -1) goto done;
*flags = flags_buf[entry & 3];
+ done:
+ resume_after_ptrace( thread );
}
- else set_error( STATUS_ACCESS_DENIED );
- done:
- resume_thread( thread );
}
/* kill a thread on the spot */
-void kill_thread( struct thread *thread, int exit_code )
+void kill_thread( struct thread *thread, int violent_death )
{
if (thread->state == TERMINATED) return; /* already killed */
thread->state = TERMINATED;
- thread->exit_code = exit_code;
+ thread->exit_time = time(NULL);
if (current == thread) current = NULL;
if (debug_level)
- fprintf( stderr,"%08x: *killed* exit_code=%d\n", (unsigned int)thread, exit_code );
- if (thread->wait) end_wait( thread );
+ fprintf( stderr,"%04x: *killed* exit_code=%d\n",
+ thread->id, thread->exit_code );
+ if (thread->wait)
+ {
+ while (thread->wait) end_wait( thread );
+ send_thread_wakeup( thread, NULL, STATUS_PENDING );
+ /* if it is waiting on the socket, we don't need to send a SIGTERM */
+ violent_death = 0;
+ }
+ kill_console_processes( thread, 0 );
debug_exit_thread( thread );
abandon_mutexes( thread );
remove_process_thread( thread->process, thread );
wake_up( &thread->obj, 0 );
- detach_thread( thread );
- remove_select_user( &thread->obj );
+ detach_thread( thread, violent_death ? SIGTERM : 0 );
+ cleanup_thread( thread );
release_object( thread );
}
+/* take a snapshot of currently running threads */
+struct thread_snapshot *thread_snap( int *count )
+{
+ struct thread_snapshot *snapshot, *ptr;
+ struct thread *thread;
+ int total = 0;
+
+ LIST_FOR_EACH_ENTRY( thread, &thread_list, struct thread, entry )
+ if (thread->state != TERMINATED) total++;
+ if (!total || !(snapshot = mem_alloc( sizeof(*snapshot) * total ))) return NULL;
+ ptr = snapshot;
+ LIST_FOR_EACH_ENTRY( thread, &thread_list, struct thread, entry )
+ {
+ if (thread->state == TERMINATED) continue;
+ ptr->thread = thread;
+ ptr->count = thread->obj.refcount;
+ ptr->priority = thread->priority;
+ grab_object( thread );
+ ptr++;
+ }
+ *count = total;
+ return snapshot;
+}
+
+/* gets the current impersonation token */
+struct token *thread_get_impersonation_token( struct thread *thread )
+{
+ if (thread->token)
+ return thread->token;
+ else
+ return thread->process->token;
+}
+
/* signal that we are finished booting on the client side */
DECL_HANDLER(boot_done)
{
- debug_level = req->debug_level;
- /* Make sure last_req is initialized */
- current->last_req = REQ_BOOT_DONE;
+ debug_level = max( debug_level, req->debug_level );
if (current == booting_thread)
{
booting_thread = (struct thread *)~0UL; /* make sure it doesn't match other threads */
DECL_HANDLER(new_thread)
{
struct thread *thread;
- int sock[2];
+ int request_fd = thread_get_inflight_fd( current, req->request_fd );
- if (socketpair( AF_UNIX, SOCK_STREAM, 0, sock ) != -1)
+ if (request_fd == -1 || fcntl( request_fd, F_SETFL, O_NONBLOCK ) == -1)
{
- if ((thread = create_thread( sock[0], current->process, req->suspend )))
+ if (request_fd != -1) close( request_fd );
+ set_error( STATUS_INVALID_HANDLE );
+ return;
+ }
+
+ if ((thread = create_thread( request_fd, current->process )))
+ {
+ if (req->suspend) thread->suspend++;
+ reply->tid = get_thread_id( thread );
+ if ((reply->handle = alloc_handle( current->process, thread,
+ THREAD_ALL_ACCESS, req->inherit )))
{
- req->tid = thread;
- if ((req->handle = alloc_handle( current->process, thread,
- THREAD_ALL_ACCESS, req->inherit )) != -1)
- {
- set_reply_fd( current, sock[1] );
- /* thread object will be released when the thread gets killed */
- return;
- }
- release_object( thread );
+ /* thread object will be released when the thread gets killed */
+ return;
}
- close( sock[1] );
+ kill_thread( thread, 1 );
}
- else file_set_error();
-}
-
-/* retrieve the thread buffer file descriptor */
-DECL_HANDLER(get_thread_buffer)
-{
- fatal_protocol_error( current, "get_thread_buffer: should never get called directly\n" );
}
/* initialize a new thread */
DECL_HANDLER(init_thread)
{
- if (current->unix_pid)
+ int reply_fd = thread_get_inflight_fd( current, req->reply_fd );
+ int wait_fd = thread_get_inflight_fd( current, req->wait_fd );
+
+ if (current->unix_pid != -1)
{
fatal_protocol_error( current, "init_thread: already running\n" );
- return;
+ goto error;
+ }
+ if (reply_fd == -1 || fcntl( reply_fd, F_SETFL, O_NONBLOCK ) == -1)
+ {
+ fatal_protocol_error( current, "bad reply fd\n" );
+ goto error;
}
+ if (wait_fd == -1)
+ {
+ fatal_protocol_error( current, "bad wait fd\n" );
+ goto error;
+ }
+ if (!(current->reply_fd = create_anonymous_fd( &thread_fd_ops, reply_fd, ¤t->obj )))
+ {
+ reply_fd = -1;
+ fatal_protocol_error( current, "could not allocate reply fd\n" );
+ goto error;
+ }
+ if (!(current->wait_fd = create_anonymous_fd( &thread_fd_ops, wait_fd, ¤t->obj )))
+ return;
+
current->unix_pid = req->unix_pid;
+ current->unix_tid = req->unix_tid;
current->teb = req->teb;
- current->entry = req->entry;
+
if (current->suspend + current->process->suspend > 0) stop_thread( current );
if (current->process->running_threads > 1)
- generate_debug_event( current, CREATE_THREAD_DEBUG_EVENT, current );
+ generate_debug_event( current, CREATE_THREAD_DEBUG_EVENT, req->entry );
+
+ reply->pid = get_process_id( current->process );
+ reply->tid = get_thread_id( current );
+ reply->boot = (current == booting_thread);
+ reply->version = SERVER_PROTOCOL_VERSION;
+ return;
+
+ error:
+ if (reply_fd != -1) close( reply_fd );
+ if (wait_fd != -1) close( wait_fd );
}
/* terminate a thread */
{
struct thread *thread;
+ reply->self = 0;
+ reply->last = 0;
if ((thread = get_thread_from_handle( req->handle, THREAD_TERMINATE )))
{
- kill_thread( thread, req->exit_code );
+ thread->exit_code = req->exit_code;
+ if (thread != current) kill_thread( thread, 1 );
+ else
+ {
+ reply->self = 1;
+ reply->last = (thread->process->running_threads == 1);
+ }
+ release_object( thread );
+ }
+}
+
+/* open a handle to a thread */
+DECL_HANDLER(open_thread)
+{
+ struct thread *thread = get_thread_from_id( req->tid );
+
+ reply->handle = 0;
+ if (thread)
+ {
+ reply->handle = alloc_handle( current->process, thread, req->access, req->inherit );
release_object( thread );
}
}
DECL_HANDLER(get_thread_info)
{
struct thread *thread;
+ obj_handle_t handle = req->handle;
- if ((thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION )))
+ if (!handle) thread = get_thread_from_id( req->tid_in );
+ else thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION );
+
+ if (thread)
{
- req->tid = thread;
- req->exit_code = thread->exit_code;
- req->priority = thread->priority;
+ reply->pid = get_process_id( thread->process );
+ reply->tid = get_thread_id( thread );
+ reply->teb = thread->teb;
+ reply->exit_code = (thread->state == TERMINATED) ? thread->exit_code : STILL_ACTIVE;
+ reply->priority = thread->priority;
+ reply->affinity = thread->affinity;
+ reply->creation_time = thread->creation_time;
+ reply->exit_time = thread->exit_time;
+
release_object( thread );
}
}
if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
{
- req->count = suspend_thread( thread, 1 );
+ if (thread->state == TERMINATED) set_error( STATUS_ACCESS_DENIED );
+ else reply->count = suspend_thread( thread );
release_object( thread );
}
}
if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
{
- req->count = resume_thread( thread );
+ if (thread->state == TERMINATED) set_error( STATUS_ACCESS_DENIED );
+ else reply->count = resume_thread( thread );
release_object( thread );
}
}
/* select on a handle list */
DECL_HANDLER(select)
{
- if (!select_on( req->count, req->handles, req->flags, req->timeout ))
- req->signaled = -1;
+ int count = get_req_data_size() / sizeof(int);
+ select_on( count, req->cookie, get_req_data(), req->flags, &req->timeout, req->signal );
}
/* queue an APC for a thread */
struct thread *thread;
if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
{
- thread_queue_apc( thread, req->func, req->param );
+ thread_queue_apc( thread, NULL, req->func, APC_USER, !req->user,
+ req->arg1, req->arg2, req->arg3 );
release_object( thread );
}
}
-/* get list of APC to call */
-DECL_HANDLER(get_apcs)
+/* get next APC to call */
+DECL_HANDLER(get_apc)
{
- if ((req->count = current->apc_count))
+ struct thread_apc *apc;
+
+ for (;;)
{
- memcpy( req->apcs, current->apc, current->apc_count * sizeof(*current->apc) );
- free( current->apc );
- current->apc = NULL;
- current->apc_count = 0;
+ if (!(apc = thread_dequeue_apc( current, !req->alertable )))
+ {
+ /* no more APCs */
+ reply->func = NULL;
+ reply->type = APC_NONE;
+ return;
+ }
+ /* Optimization: ignore APCs that have a NULL func; they are only used
+ * to wake up a thread, but since we got here the thread woke up already.
+ * Exception: for APC_ASYNC_IO, func == NULL is legal.
+ */
+ if (apc->func || apc->type == APC_ASYNC_IO) break;
+ free( apc );
}
+ reply->func = apc->func;
+ reply->type = apc->type;
+ reply->arg1 = apc->arg1;
+ reply->arg2 = apc->arg2;
+ reply->arg3 = apc->arg3;
+ free( apc );
}
/* fetch a selector entry for a thread */
struct thread *thread;
if ((thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION )))
{
- get_selector_entry( thread, req->entry, &req->base, &req->limit, &req->flags );
+ get_selector_entry( thread, req->entry, &reply->base, &reply->limit, &reply->flags );
release_object( thread );
}
}