ntdll: The FileMailslotSetInformation and FileCompletionInformation cases of NtSetInf...
[wine] / dlls / ntdll / threadpool.c
1 /*
2  * Thread pooling
3  *
4  * Copyright (c) 2006 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #include "config.h"
22 #include "wine/port.h"
23
24 #include <stdarg.h>
25 #include <limits.h>
26
27 #define NONAMELESSUNION
28 #include "ntstatus.h"
29 #define WIN32_NO_STATUS
30 #include "winternl.h"
31
32 #include "wine/debug.h"
33 #include "wine/list.h"
34
35 #include "ntdll_misc.h"
36
37 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
38
39 #define WORKER_TIMEOUT 30000 /* 30 seconds */
40
41 static LONG num_workers;
42 static LONG num_work_items;
43 static LONG num_busy_workers;
44
45 static struct list work_item_list = LIST_INIT(work_item_list);
46 static HANDLE work_item_event;
47
48 static RTL_CRITICAL_SECTION threadpool_cs;
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
50 {
51     0, 0, &threadpool_cs,
52     { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
53     0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
54 };
55 static RTL_CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
56
57 static HANDLE compl_port = NULL;
58 static RTL_CRITICAL_SECTION threadpool_compl_cs;
59 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
60 {
61     0, 0, &threadpool_compl_cs,
62     { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
63     0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
64 };
65 static RTL_CRITICAL_SECTION threadpool_compl_cs = { &critsect_compl_debug, -1, 0, 0, 0, 0 };
66
67 struct work_item
68 {
69     struct list entry;
70     PRTL_WORK_ITEM_ROUTINE function;
71     PVOID context;
72 };
73
74 static inline LONG interlocked_inc( PLONG dest )
75 {
76     return interlocked_xchg_add( dest, 1 ) + 1;
77 }
78
79 static inline LONG interlocked_dec( PLONG dest )
80 {
81     return interlocked_xchg_add( dest, -1 ) - 1;
82 }
83
84 static void WINAPI worker_thread_proc(void * param)
85 {
86     interlocked_inc(&num_workers);
87
88     /* free the work item memory sooner to reduce memory usage */
89     while (TRUE)
90     {
91         if (num_work_items > 0)
92         {
93             struct list *item;
94             RtlEnterCriticalSection(&threadpool_cs);
95             item = list_head(&work_item_list);
96             if (item)
97             {
98                 struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
99                 struct work_item work_item;
100                 list_remove(&work_item_ptr->entry);
101                 interlocked_dec(&num_work_items);
102
103                 RtlLeaveCriticalSection(&threadpool_cs);
104
105                 work_item = *work_item_ptr;
106                 RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
107
108                 TRACE("executing %p(%p)\n", work_item.function, work_item.context);
109
110                 interlocked_inc(&num_busy_workers);
111
112                 /* do the work */
113                 work_item.function(work_item.context);
114
115                 interlocked_dec(&num_busy_workers);
116             }
117             else
118                 RtlLeaveCriticalSection(&threadpool_cs);
119         }
120         else
121         {
122             NTSTATUS status;
123             LARGE_INTEGER timeout;
124             timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
125             status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
126             if (status != STATUS_WAIT_0)
127                 break;
128         }
129     }
130
131     interlocked_dec(&num_workers);
132
133     RtlExitUserThread(0);
134
135     /* never reached */
136 }
137
138 static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
139 {
140     NTSTATUS status;
141
142     RtlEnterCriticalSection(&threadpool_cs);
143     list_add_tail(&work_item_list, &work_item->entry);
144     num_work_items++;
145     RtlLeaveCriticalSection(&threadpool_cs);
146
147     if (!work_item_event)
148     {
149         HANDLE sem;
150         status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
151         if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 ))
152             NtClose(sem);  /* somebody beat us to it */
153     }
154     else
155         status = NtReleaseSemaphore(work_item_event, 1, NULL);
156
157     return status;
158 }
159
160 /***********************************************************************
161  *              RtlQueueWorkItem   (NTDLL.@)
162  *
163  * Queues a work item into a thread in the thread pool.
164  *
165  * PARAMS
166  *  Function [I] Work function to execute.
167  *  Context  [I] Context to pass to the work function when it is executed.
168  *  Flags    [I] Flags. See notes.
169  *
170  * RETURNS
171  *  Success: STATUS_SUCCESS.
172  *  Failure: Any NTSTATUS code.
173  *
174  * NOTES
175  *  Flags can be one or more of the following:
176  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
177  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
178  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
179  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
180  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
181  */
182 NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
183 {
184     HANDLE thread;
185     NTSTATUS status;
186     struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
187
188     if (!work_item)
189         return STATUS_NO_MEMORY;
190
191     work_item->function = Function;
192     work_item->context = Context;
193
194     if (Flags & ~WT_EXECUTELONGFUNCTION)
195         FIXME("Flags 0x%x not supported\n", Flags);
196
197     status = add_work_item_to_queue(work_item);
198
199     /* FIXME: tune this algorithm to not be as aggressive with creating threads
200      * if WT_EXECUTELONGFUNCTION isn't specified */
201     if ((status == STATUS_SUCCESS) &&
202         ((num_workers == 0) || (num_workers == num_busy_workers)))
203     {
204         status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
205                                     NULL, 0, 0,
206                                     worker_thread_proc, NULL, &thread, NULL );
207         if (status == STATUS_SUCCESS)
208             NtClose( thread );
209
210         /* NOTE: we don't care if we couldn't create the thread if there is at
211          * least one other available to process the request */
212         if ((num_workers > 0) && (status != STATUS_SUCCESS))
213             status = STATUS_SUCCESS;
214     }
215
216     if (status != STATUS_SUCCESS)
217     {
218         RtlEnterCriticalSection(&threadpool_cs);
219
220         interlocked_dec(&num_work_items);
221         list_remove(&work_item->entry);
222         RtlFreeHeap(GetProcessHeap(), 0, work_item);
223
224         RtlLeaveCriticalSection(&threadpool_cs);
225
226         return status;
227     }
228
229     return STATUS_SUCCESS;
230 }
231
232 /***********************************************************************
233  * iocp_poller - get completion events and run callbacks
234  */
235 static DWORD CALLBACK iocp_poller(LPVOID Arg)
236 {
237     while( TRUE )
238     {
239         PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
240         LPVOID overlapped;
241         IO_STATUS_BLOCK iosb;
242         NTSTATUS res = NtRemoveIoCompletion( compl_port, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
243         if (res)
244         {
245             ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
246         }
247         else
248         {
249             DWORD transferred = 0;
250             DWORD err = 0;
251
252             if (iosb.u.Status == STATUS_SUCCESS)
253                 transferred = iosb.Information;
254             else
255                 err = RtlNtStatusToDosError(iosb.u.Status);
256
257             callback( err, transferred, overlapped );
258         }
259     }
260 }
261
262 /***********************************************************************
263  *              RtlSetIoCompletionCallback  (NTDLL.@)
264  *
265  * Binds a handle to a thread pool's completion port, and possibly
266  * starts a non-I/O thread to monitor this port and call functions back.
267  *
268  * PARAMS
269  *  FileHandle [I] Handle to bind to a completion port.
270  *  Function   [I] Callback function to call on I/O completions.
271  *  Flags      [I] Not used.
272  *
273  * RETURNS
274  *  Success: STATUS_SUCCESS.
275  *  Failure: Any NTSTATUS code.
276  *
277  */
278 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
279 {
280     IO_STATUS_BLOCK iosb;
281     FILE_COMPLETION_INFORMATION info;
282
283     if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
284
285     if (!compl_port)
286     {
287         NTSTATUS res = STATUS_SUCCESS;
288
289         RtlEnterCriticalSection(&threadpool_compl_cs);
290         if (!compl_port)
291         {
292             HANDLE cport;
293
294             res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
295             if (!res)
296             {
297                 /* FIXME native can start additional threads in case of e.g. hung callback function. */
298                 res = RtlQueueWorkItem( iocp_poller, NULL, WT_EXECUTEDEFAULT );
299                 if (!res)
300                     compl_port = cport;
301                 else
302                     NtClose( cport );
303             }
304         }
305         RtlLeaveCriticalSection(&threadpool_compl_cs);
306         if (res) return res;
307     }
308
309     info.CompletionPort = compl_port;
310     info.CompletionKey = (ULONG_PTR)Function;
311
312     return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
313 }
314
315 static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
316 {
317     if (timeout == INFINITE) return NULL;
318     pTime->QuadPart = (ULONGLONG)timeout * -10000;
319     return pTime;
320 }
321
322 struct wait_work_item
323 {
324     HANDLE Object;
325     HANDLE CancelEvent;
326     WAITORTIMERCALLBACK Callback;
327     PVOID Context;
328     ULONG Milliseconds;
329     ULONG Flags;
330     HANDLE CompletionEvent;
331     LONG DeleteCount;
332     BOOLEAN CallbackInProgress;
333 };
334
335 static void delete_wait_work_item(struct wait_work_item *wait_work_item)
336 {
337     NtClose( wait_work_item->CancelEvent );
338     RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
339 }
340
341 static DWORD CALLBACK wait_thread_proc(LPVOID Arg)
342 {
343     struct wait_work_item *wait_work_item = Arg;
344     NTSTATUS status;
345     BOOLEAN alertable = (wait_work_item->Flags & WT_EXECUTEINIOTHREAD) ? TRUE : FALSE;
346     HANDLE handles[2] = { wait_work_item->Object, wait_work_item->CancelEvent };
347     LARGE_INTEGER timeout;
348     HANDLE completion_event;
349
350     TRACE("\n");
351
352     while (TRUE)
353     {
354         status = NtWaitForMultipleObjects( 2, handles, FALSE, alertable,
355                                            get_nt_timeout( &timeout, wait_work_item->Milliseconds ) );
356         if (status == STATUS_WAIT_0 || status == STATUS_TIMEOUT)
357         {
358             BOOLEAN TimerOrWaitFired;
359
360             if (status == STATUS_WAIT_0)
361             {
362                 TRACE( "object %p signaled, calling callback %p with context %p\n",
363                     wait_work_item->Object, wait_work_item->Callback,
364                     wait_work_item->Context );
365                 TimerOrWaitFired = FALSE;
366             }
367             else
368             {
369                 TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
370                     wait_work_item->Object, wait_work_item->Callback,
371                     wait_work_item->Context );
372                 TimerOrWaitFired = TRUE;
373             }
374             wait_work_item->CallbackInProgress = TRUE;
375             wait_work_item->Callback( wait_work_item->Context, TimerOrWaitFired );
376             wait_work_item->CallbackInProgress = FALSE;
377
378             if (wait_work_item->Flags & WT_EXECUTEONLYONCE)
379                 break;
380         }
381         else
382             break;
383     }
384
385     completion_event = wait_work_item->CompletionEvent;
386     if (completion_event) NtSetEvent( completion_event, NULL );
387
388     if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
389         delete_wait_work_item( wait_work_item );
390
391     return 0;
392 }
393
394 /***********************************************************************
395  *              RtlRegisterWait   (NTDLL.@)
396  *
397  * Registers a wait for a handle to become signaled.
398  *
399  * PARAMS
400  *  NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
401  *  Object   [I] Object to wait to become signaled.
402  *  Callback [I] Callback function to execute when the wait times out or the handle is signaled.
403  *  Context  [I] Context to pass to the callback function when it is executed.
404  *  Milliseconds [I] Number of milliseconds to wait before timing out.
405  *  Flags    [I] Flags. See notes.
406  *
407  * RETURNS
408  *  Success: STATUS_SUCCESS.
409  *  Failure: Any NTSTATUS code.
410  *
411  * NOTES
412  *  Flags can be one or more of the following:
413  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
414  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
415  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
416  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
417  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
418  */
419 NTSTATUS WINAPI RtlRegisterWait(PHANDLE NewWaitObject, HANDLE Object,
420                                 RTL_WAITORTIMERCALLBACKFUNC Callback,
421                                 PVOID Context, ULONG Milliseconds, ULONG Flags)
422 {
423     struct wait_work_item *wait_work_item;
424     NTSTATUS status;
425
426     TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
427
428     wait_work_item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*wait_work_item) );
429     if (!wait_work_item)
430         return STATUS_NO_MEMORY;
431
432     wait_work_item->Object = Object;
433     wait_work_item->Callback = Callback;
434     wait_work_item->Context = Context;
435     wait_work_item->Milliseconds = Milliseconds;
436     wait_work_item->Flags = Flags;
437     wait_work_item->CallbackInProgress = FALSE;
438     wait_work_item->DeleteCount = 0;
439     wait_work_item->CompletionEvent = NULL;
440
441     status = NtCreateEvent( &wait_work_item->CancelEvent, EVENT_ALL_ACCESS, NULL, TRUE, FALSE );
442     if (status != STATUS_SUCCESS)
443     {
444         RtlFreeHeap( GetProcessHeap(), 0, wait_work_item );
445         return status;
446     }
447
448     status = RtlQueueWorkItem( wait_thread_proc, wait_work_item, Flags & ~WT_EXECUTEONLYONCE );
449     if (status != STATUS_SUCCESS)
450     {
451         delete_wait_work_item( wait_work_item );
452         return status;
453     }
454
455     *NewWaitObject = wait_work_item;
456     return status;
457 }
458
459 /***********************************************************************
460  *              RtlDeregisterWaitEx   (NTDLL.@)
461  *
462  * Cancels a wait operation and frees the resources associated with calling
463  * RtlRegisterWait().
464  *
465  * PARAMS
466  *  WaitObject [I] Handle to the wait object to free.
467  *
468  * RETURNS
469  *  Success: STATUS_SUCCESS.
470  *  Failure: Any NTSTATUS code.
471  */
472 NTSTATUS WINAPI RtlDeregisterWaitEx(HANDLE WaitHandle, HANDLE CompletionEvent)
473 {
474     struct wait_work_item *wait_work_item = WaitHandle;
475     NTSTATUS status = STATUS_SUCCESS;
476
477     TRACE( "(%p)\n", WaitHandle );
478
479     NtSetEvent( wait_work_item->CancelEvent, NULL );
480     if (wait_work_item->CallbackInProgress)
481     {
482         if (CompletionEvent != NULL)
483         {
484             if (CompletionEvent == INVALID_HANDLE_VALUE)
485             {
486                 status = NtCreateEvent( &CompletionEvent, EVENT_ALL_ACCESS, NULL, TRUE, FALSE );
487                 if (status != STATUS_SUCCESS)
488                     return status;
489                 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
490                 if (wait_work_item->CallbackInProgress)
491                     NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
492                 NtClose( CompletionEvent );
493             }
494             else
495             {
496                 interlocked_xchg_ptr( &wait_work_item->CompletionEvent, CompletionEvent );
497                 if (wait_work_item->CallbackInProgress)
498                     status = STATUS_PENDING;
499             }
500         }
501         else
502             status = STATUS_PENDING;
503     }
504
505     if (interlocked_inc( &wait_work_item->DeleteCount ) == 2 )
506     {
507         status = STATUS_SUCCESS;
508         delete_wait_work_item( wait_work_item );
509     }
510
511     return status;
512 }
513
514 /***********************************************************************
515  *              RtlDeregisterWait   (NTDLL.@)
516  *
517  * Cancels a wait operation and frees the resources associated with calling
518  * RtlRegisterWait().
519  *
520  * PARAMS
521  *  WaitObject [I] Handle to the wait object to free.
522  *
523  * RETURNS
524  *  Success: STATUS_SUCCESS.
525  *  Failure: Any NTSTATUS code.
526  */
527 NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
528 {
529     return RtlDeregisterWaitEx(WaitHandle, NULL);
530 }