server: Make async I/O queues into real objects.
[wine] / server / async.c
1 /*
2  * Server-side async I/O support
3  *
4  * Copyright (C) 2007 Alexandre Julliard
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #include <assert.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <stdarg.h>
25
26 #include "ntstatus.h"
27 #define WIN32_NO_STATUS
28 #include "windef.h"
29 #include "winternl.h"
30
31 #include "object.h"
32 #include "file.h"
33 #include "request.h"
34
35 struct async
36 {
37     struct object        obj;             /* object header */
38     struct thread       *thread;          /* owning thread */
39     struct list          queue_entry;     /* entry in file descriptor queue */
40     struct timeout_user *timeout;
41     struct event        *event;
42     async_data_t         data;            /* data for async I/O call */
43 };
44
45 static void async_dump( struct object *obj, int verbose );
46 static void async_destroy( struct object *obj );
47
48 static const struct object_ops async_ops =
49 {
50     sizeof(struct async),      /* size */
51     async_dump,                /* dump */
52     no_add_queue,              /* add_queue */
53     NULL,                      /* remove_queue */
54     NULL,                      /* signaled */
55     NULL,                      /* satisfied */
56     no_signal,                 /* signal */
57     no_get_fd,                 /* get_fd */
58     no_map_access,             /* map_access */
59     no_lookup_name,            /* lookup_name */
60     no_open_file,              /* open_file */
61     no_close_handle,           /* close_handle */
62     async_destroy              /* destroy */
63 };
64
65
66 struct async_queue
67 {
68     struct object        obj;             /* object header */
69     struct fd           *fd;              /* file descriptor owning this queue */
70     struct list          queue;           /* queue of async objects */
71 };
72
73 static void async_queue_dump( struct object *obj, int verbose );
74 static void async_queue_destroy( struct object *obj );
75
76 static const struct object_ops async_queue_ops =
77 {
78     sizeof(struct async_queue),      /* size */
79     async_queue_dump,                /* dump */
80     no_add_queue,                    /* add_queue */
81     NULL,                            /* remove_queue */
82     NULL,                            /* signaled */
83     NULL,                            /* satisfied */
84     no_signal,                       /* signal */
85     no_get_fd,                       /* get_fd */
86     no_map_access,                   /* map_access */
87     no_lookup_name,                  /* lookup_name */
88     no_open_file,                    /* open_file */
89     no_close_handle,                 /* close_handle */
90     async_queue_destroy              /* destroy */
91 };
92
93
94 static void async_dump( struct object *obj, int verbose )
95 {
96     struct async *async = (struct async *)obj;
97     assert( obj->ops == &async_ops );
98     fprintf( stderr, "Async thread=%p\n", async->thread );
99 }
100
101 static void async_destroy( struct object *obj )
102 {
103     struct async *async = (struct async *)obj;
104     assert( obj->ops == &async_ops );
105
106     if (async->timeout) remove_timeout_user( async->timeout );
107     if (async->event) release_object( async->event );
108     release_object( async->thread );
109 }
110
111 static void async_queue_dump( struct object *obj, int verbose )
112 {
113     struct async_queue *async_queue = (struct async_queue *)obj;
114     assert( obj->ops == &async_queue_ops );
115     fprintf( stderr, "Async queue fd=%p\n", async_queue->fd );
116 }
117
118 static void async_queue_destroy( struct object *obj )
119 {
120     struct async_queue *async_queue = (struct async_queue *)obj;
121     assert( obj->ops == &async_queue_ops );
122
123     async_wake_up( async_queue, STATUS_HANDLES_CLOSED );
124 }
125
126 /* notifies client thread of new status of its async request */
127 /* destroys the server side of it */
128 static void async_terminate( struct async *async, unsigned int status )
129 {
130     apc_call_t data;
131
132     memset( &data, 0, sizeof(data) );
133     data.type            = APC_ASYNC_IO;
134     data.async_io.func   = async->data.callback;
135     data.async_io.user   = async->data.arg;
136     data.async_io.sb     = async->data.iosb;
137     data.async_io.status = status;
138     thread_queue_apc( async->thread, &async->obj, &data );
139
140     if (async->timeout) remove_timeout_user( async->timeout );
141     async->timeout = NULL;
142     list_remove( &async->queue_entry );
143     release_object( async );
144 }
145
146 /* callback for timeout on an async request */
147 static void async_timeout( void *private )
148 {
149     struct async *async = private;
150
151     async->timeout = NULL;
152     async_terminate( async, STATUS_TIMEOUT );
153 }
154
155 /* create a new async queue for a given fd */
156 struct async_queue *create_async_queue( struct fd *fd )
157 {
158     struct async_queue *queue = alloc_object( &async_queue_ops );
159
160     if (queue)
161     {
162         queue->fd = fd;
163         list_init( &queue->queue );
164     }
165     return queue;
166 }
167
168 /* create an async on a given queue of a fd */
169 struct async *create_async( struct thread *thread, const struct timeval *timeout,
170                             struct async_queue *queue, const async_data_t *data )
171 {
172     struct event *event = NULL;
173     struct async *async;
174
175     if (data->event && !(event = get_event_obj( thread->process, data->event, EVENT_MODIFY_STATE )))
176         return NULL;
177
178     if (!(async = alloc_object( &async_ops )))
179     {
180         if (event) release_object( event );
181         return NULL;
182     }
183
184     async->thread = (struct thread *)grab_object( thread );
185     async->event = event;
186     async->data = *data;
187
188     list_add_tail( &queue->queue, &async->queue_entry );
189
190     if (timeout) async->timeout = add_timeout_user( timeout, async_timeout, async );
191     else async->timeout = NULL;
192
193     if (event) reset_event( event );
194     return async;
195 }
196
197 /* store the result of the client-side async callback */
198 void async_set_result( struct object *obj, unsigned int status )
199 {
200     struct async *async = (struct async *)obj;
201
202     if (obj->ops != &async_ops) return;  /* in case the client messed up the APC results */
203
204     if (status == STATUS_PENDING)
205     {
206         /* FIXME: restart the async operation */
207     }
208     else
209     {
210         if (async->data.apc)
211         {
212             apc_call_t data;
213             data.type         = APC_USER;
214             data.user.func    = async->data.apc;
215             data.user.args[0] = (unsigned long)async->data.apc_arg;
216             data.user.args[1] = (unsigned long)async->data.iosb;
217             data.user.args[2] = 0;
218             thread_queue_apc( async->thread, NULL, &data );
219         }
220         if (async->event) set_event( async->event );
221     }
222 }
223
224 /* check if an async operation is waiting to be alerted */
225 int async_waiting( struct async_queue *queue )
226 {
227     return queue && !list_empty( &queue->queue );
228 }
229
230 /* wake up async operations on the queue */
231 void async_wake_up( struct async_queue *queue, unsigned int status )
232 {
233     struct list *ptr, *next;
234
235     if (!queue) return;
236
237     LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
238     {
239         struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
240         async_terminate( async, status );
241         if (status == STATUS_ALERTED) break;  /* only wake up the first one */
242     }
243 }