ntdll: Simplify the async read/write code now that most of the work is done in the...
[wine] / dlls / ntdll / threadpool.c
1 /*
2  * Thread pooling
3  *
4  * Copyright (c) 2006 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #include "config.h"
22 #include "wine/port.h"
23
24 #include <stdarg.h>
25 #include <limits.h>
26
27 #define NONAMELESSUNION
28 #include "ntstatus.h"
29 #define WIN32_NO_STATUS
30 #include "winternl.h"
31
32 #include "wine/debug.h"
33 #include "wine/list.h"
34
35 #include "ntdll_misc.h"
36
37 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
38
39 #define WORKER_TIMEOUT 30000 /* 30 seconds */
40
41 static LONG num_workers;
42 static LONG num_work_items;
43 static LONG num_busy_workers;
44
45 static struct list work_item_list = LIST_INIT(work_item_list);
46 static HANDLE work_item_event;
47
48 static RTL_CRITICAL_SECTION threadpool_cs;
49 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
50 {
51     0, 0, &threadpool_cs,
52     { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
53     0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
54 };
55 static RTL_CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
56
57 struct work_item
58 {
59     struct list entry;
60     PRTL_WORK_ITEM_ROUTINE function;
61     PVOID context;
62 };
63
64 static inline LONG interlocked_inc( PLONG dest )
65 {
66     return interlocked_xchg_add( (int *)dest, 1 ) + 1;
67 }
68
69 static inline LONG interlocked_dec( PLONG dest )
70 {
71     return interlocked_xchg_add( (int *)dest, -1 ) - 1;
72 }
73
74 static void WINAPI worker_thread_proc(void * param)
75 {
76     interlocked_inc(&num_workers);
77
78     /* free the work item memory sooner to reduce memory usage */
79     while (TRUE)
80     {
81         if (num_work_items > 0)
82         {
83             struct list *item;
84             RtlEnterCriticalSection(&threadpool_cs);
85             item = list_head(&work_item_list);
86             if (item)
87             {
88                 struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
89                 struct work_item work_item;
90                 list_remove(&work_item_ptr->entry);
91                 interlocked_dec(&num_work_items);
92
93                 RtlLeaveCriticalSection(&threadpool_cs);
94
95                 work_item = *work_item_ptr;
96                 RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
97
98                 TRACE("executing %p(%p)\n", work_item.function, work_item.context);
99
100                 interlocked_inc(&num_busy_workers);
101
102                 /* do the work */
103                 work_item.function(work_item.context);
104
105                 interlocked_dec(&num_busy_workers);
106             }
107             else
108                 RtlLeaveCriticalSection(&threadpool_cs);
109         }
110         else
111         {
112             NTSTATUS status;
113             LARGE_INTEGER timeout;
114             timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
115             status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
116             if (status != STATUS_WAIT_0)
117                 break;
118         }
119     }
120
121     interlocked_dec(&num_workers);
122
123     RtlExitUserThread(0);
124
125     /* never reached */
126 }
127
128 static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
129 {
130     NTSTATUS status;
131
132     RtlEnterCriticalSection(&threadpool_cs);
133     list_add_tail(&work_item_list, &work_item->entry);
134     num_work_items++;
135     RtlLeaveCriticalSection(&threadpool_cs);
136
137     if (!work_item_event)
138     {
139         HANDLE sem;
140         status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
141         if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, (PVOID)sem, 0 ))
142             NtClose(sem);  /* somebody beat us to it */
143     }
144     else
145         status = NtReleaseSemaphore(work_item_event, 1, NULL);
146
147     return status;
148 }
149
150 /***********************************************************************
151  *              RtlQueueWorkItem   (NTDLL.@)
152  *
153  * Queues a work item into a thread in the thread pool.
154  *
155  * PARAMS
156  *  Function [I] Work function to execute.
157  *  Context  [I] Context to pass to the work function when it is executed.
158  *  Flags    [I] Flags. See notes.
159  *
160  * RETURNS
161  *  Success: STATUS_SUCCESS.
162  *  Failure: Any NTSTATUS code.
163  *
164  * NOTES
165  *  Flags can be one or more of the following:
166  *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
167  *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
168  *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
169  *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
170  *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
171  */
172 NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context, ULONG Flags)
173 {
174     HANDLE thread;
175     NTSTATUS status;
176     struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
177
178     if (!work_item)
179         return STATUS_NO_MEMORY;
180
181     work_item->function = Function;
182     work_item->context = Context;
183
184     if (Flags & ~WT_EXECUTELONGFUNCTION)
185         FIXME("Flags 0x%x not supported\n", Flags);
186
187     status = add_work_item_to_queue(work_item);
188
189     /* FIXME: tune this algorithm to not be as aggressive with creating threads
190      * if WT_EXECUTELONGFUNCTION isn't specified */
191     if ((status == STATUS_SUCCESS) &&
192         ((num_workers == 0) || (num_workers == num_busy_workers)))
193     {
194         status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
195                                     NULL, 0, 0,
196                                     worker_thread_proc, NULL, &thread, NULL );
197         if (status == STATUS_SUCCESS)
198             NtClose( thread );
199
200         /* NOTE: we don't care if we couldn't create the thread if there is at
201          * least one other available to process the request */
202         if ((num_workers > 0) && (status != STATUS_SUCCESS))
203             status = STATUS_SUCCESS;
204     }
205
206     if (status != STATUS_SUCCESS)
207     {
208         RtlEnterCriticalSection(&threadpool_cs);
209
210         interlocked_dec(&num_work_items);
211         list_remove(&work_item->entry);
212         RtlFreeHeap(GetProcessHeap(), 0, work_item);
213
214         RtlLeaveCriticalSection(&threadpool_cs);
215
216         return status;
217     }
218
219     return STATUS_SUCCESS;
220 }