ntdll: Remove unneeded casts.
[wine] / dlls / ntdll / threadpool.c
1 /*
2  * Thread pooling
3  *
4  * Copyright (c) 2006 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #include "config.h"
22 #include "wine/port.h"
23
24 #include <stdarg.h>
25 #include <limits.h>
26
27 #define NONAMELESSUNION
28 #include "ntstatus.h"
29 #define WIN32_NO_STATUS
30 #include "winternl.h"
31
32 #include "wine/debug.h"
33 #include "wine/list.h"
34
35 #include "ntdll_misc.h"
36
37 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
38
39 #define WORKER_TIMEOUT 30000 /* 30 seconds */
40
41 static LONG num_workers;
42 static LONG num_work_items;
43 static LONG num_busy_workers;
44
45 static struct list work_item_list = LIST_INIT(work_item_list);
46 static HANDLE work_item_event;
47
48 static RTL_CRITICAL_SECTION threadpool_cs;
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
50 {
51     0, 0, &threadpool_cs,
52     { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
53     0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
54 };
55 static RTL_CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
56
57 static HANDLE compl_port = NULL;
58 static RTL_CRITICAL_SECTION threadpool_compl_cs;
59 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
60 {
61     0, 0, &threadpool_compl_cs,
62     { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
63     0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
64 };
65 static RTL_CRITICAL_SECTION threadpool_compl_cs = { &critsect_compl_debug, -1, 0, 0, 0, 0 };
66
67 struct work_item
68 {
69     struct list entry;
70     PRTL_WORK_ITEM_ROUTINE function;
71     PVOID context;
72 };
73
74 static inline LONG interlocked_inc( PLONG dest )
75 {
76     return interlocked_xchg_add( (int *)dest, 1 ) + 1;
77 }
78
79 static inline LONG interlocked_dec( PLONG dest )
80 {
81     return interlocked_xchg_add( (int *)dest, -1 ) - 1;
82 }
83
84 static void WINAPI worker_thread_proc(void * param)
85 {
86     interlocked_inc(&num_workers);
87
88     /* free the work item memory sooner to reduce memory usage */
89     while (TRUE)
90     {
91         if (num_work_items > 0)
92         {
93             struct list *item;
94             RtlEnterCriticalSection(&threadpool_cs);
95             item = list_head(&work_item_list);
96             if (item)
97             {
98                 struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
99                 struct work_item work_item;
100                 list_remove(&work_item_ptr->entry);
101                 interlocked_dec(&num_work_items);
102
103                 RtlLeaveCriticalSection(&threadpool_cs);
104
105                 work_item = *work_item_ptr;
106                 RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
107
108                 TRACE("executing %p(%p)\n", work_item.function, work_item.context);
109
110                 interlocked_inc(&num_busy_workers);
111
112                 /* do the work */
113                 work_item.function(work_item.context);
114
115                 interlocked_dec(&num_busy_workers);
116             }
117             else
118                 RtlLeaveCriticalSection(&threadpool_cs);
119         }
120         else
121         {
122             NTSTATUS status;
123             LARGE_INTEGER timeout;
124             timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
125             status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
126             if (status != STATUS_WAIT_0)
127                 break;
128         }
129     }
130
131     interlocked_dec(&num_workers);
132
133     RtlExitUserThread(0);
134
135     /* never reached */
136 }
137
138 static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
139 {
140     NTSTATUS status;
141
142     RtlEnterCriticalSection(&threadpool_cs);
143     list_add_tail(&work_item_list, &work_item->entry);
144     num_work_items++;
145     RtlLeaveCriticalSection(&threadpool_cs);
146
147     if (!work_item_event)
148     {
149         HANDLE sem;
150         status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
151         if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, sem, 0 ))
152             NtClose(sem);  /* somebody beat us to it */
153     }
154     else
155         status = NtReleaseSemaphore(work_item_event, 1, NULL);
156
157     return status;
158 }
159
160 /***********************************************************************
161  *              RtlQueueWorkItem   (NTDLL.@)
162  *
163  * Queues a work item into a thread in the thread pool.
164  *
165  * PARAMS
166  *  Function [I] Work function to execute.
167  *  Context  [I] Context to pass to the work function when it is executed.
168  *  Flags    [I] Flags. See notes.
169  *
170  * RETURNS
171  *  Success: STATUS_SUCCESS.
172  *  Failure: Any NTSTATUS code.
173  *
174  * NOTES
175  *  Flags can be one or more of the following:
176  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
177  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
178  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
179  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
180  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
181  */
182 NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
183 {
184     HANDLE thread;
185     NTSTATUS status;
186     struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
187
188     if (!work_item)
189         return STATUS_NO_MEMORY;
190
191     work_item->function = Function;
192     work_item->context = Context;
193
194     if (Flags & ~WT_EXECUTELONGFUNCTION)
195         FIXME("Flags 0x%x not supported\n", Flags);
196
197     status = add_work_item_to_queue(work_item);
198
199     /* FIXME: tune this algorithm to not be as aggressive with creating threads
200      * if WT_EXECUTELONGFUNCTION isn't specified */
201     if ((status == STATUS_SUCCESS) &&
202         ((num_workers == 0) || (num_workers == num_busy_workers)))
203     {
204         status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
205                                     NULL, 0, 0,
206                                     worker_thread_proc, NULL, &thread, NULL );
207         if (status == STATUS_SUCCESS)
208             NtClose( thread );
209
210         /* NOTE: we don't care if we couldn't create the thread if there is at
211          * least one other available to process the request */
212         if ((num_workers > 0) && (status != STATUS_SUCCESS))
213             status = STATUS_SUCCESS;
214     }
215
216     if (status != STATUS_SUCCESS)
217     {
218         RtlEnterCriticalSection(&threadpool_cs);
219
220         interlocked_dec(&num_work_items);
221         list_remove(&work_item->entry);
222         RtlFreeHeap(GetProcessHeap(), 0, work_item);
223
224         RtlLeaveCriticalSection(&threadpool_cs);
225
226         return status;
227     }
228
229     return STATUS_SUCCESS;
230 }
231
232 /***********************************************************************
233  * iocp_poller - get completion events and run callbacks
234  */
235 static DWORD CALLBACK iocp_poller(LPVOID Arg)
236 {
237     while( TRUE )
238     {
239         PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
240         LPVOID overlapped;
241         IO_STATUS_BLOCK iosb;
242         NTSTATUS res = NtRemoveIoCompletion( compl_port, (PULONG_PTR)&callback, (PULONG_PTR)&overlapped, &iosb, NULL );
243         if (res)
244         {
245             ERR("NtRemoveIoCompletion failed: 0x%x\n", res);
246         }
247         else
248         {
249             DWORD transferred = 0;
250             DWORD err = 0;
251
252             if (iosb.u.Status == STATUS_SUCCESS)
253                 transferred = iosb.Information;
254             else
255                 err = RtlNtStatusToDosError(iosb.u.Status);
256
257             callback( err, transferred, overlapped );
258         }
259     }
260 }
261
262 /***********************************************************************
263  *              RtlSetIoCompletionCallback  (NTDLL.@)
264  *
265  * Binds a handle to a thread pool's completion port, and possibly
266  * starts a non-I/O thread to monitor this port and call functions back.
267  *
268  * PARAMS
269  *  FileHandle [I] Handle to bind to a completion port.
270  *  Function   [I] Callback function to call on I/O completions.
271  *  Flags      [I] Not used.
272  *
273  * RETURNS
274  *  Success: STATUS_SUCCESS.
275  *  Failure: Any NTSTATUS code.
276  *
277  */
278 NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle, PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
279 {
280     IO_STATUS_BLOCK iosb;
281     FILE_COMPLETION_INFORMATION info;
282
283     if (Flags) FIXME("Unknown value Flags=0x%x\n", Flags);
284
285     if (!compl_port)
286     {
287         NTSTATUS res = STATUS_SUCCESS;
288
289         RtlEnterCriticalSection(&threadpool_compl_cs);
290         if (!compl_port)
291         {
292             HANDLE cport;
293
294             res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
295             if (!res)
296             {
297                 /* FIXME native can start additional threads in case of e.g. hung callback function. */
298                 res = RtlQueueWorkItem( iocp_poller, NULL, WT_EXECUTEDEFAULT );
299                 if (!res)
300                     compl_port = cport;
301                 else
302                     NtClose( cport );
303             }
304         }
305         RtlLeaveCriticalSection(&threadpool_compl_cs);
306         if (res) return res;
307     }
308
309     info.CompletionPort = compl_port;
310     info.CompletionKey = (ULONG_PTR)Function;
311
312     return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info), FileCompletionInformation );
313 }