include: Assorted spelling fixes.
[wine] / dlls / strmbase / outputqueue.c
1 /*
2  * Generic Implementation of COutputQueue
3  *
4  * Copyright 2011 Aric Stewart, CodeWeavers
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #define COBJMACROS
22
23 #include "dshow.h"
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
28 #include "uuids.h"
29 #include "vfwmsgs.h"
30 #include <assert.h>
31
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
33
34 enum {SAMPLE_PACKET, EOS_PACKET};
35
36 typedef struct tagQueuedEvent {
37     int type;
38     struct list entry;
39
40     IMediaSample *pSample;
41 } QueuedEvent;
42
43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
44 {
45     OutputQueue *This = (OutputQueue *)data;
46     return This->pFuncsTable->pfnThreadProc(This);
47 }
48
49 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
50 {
51     struct list *cursor, *cursor2;
52     LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
53     {
54         QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
55         list_remove(cursor);
56         HeapFree(GetProcessHeap(),0,qev);
57     }
58 }
59
60 HRESULT WINAPI OutputQueue_Construct(
61     BaseOutputPin *pInputPin,
62     BOOL bAuto,
63     BOOL bQueue,
64     LONG lBatchSize,
65     BOOL bBatchExact,
66     DWORD dwPriority,
67     const OutputQueueFuncTable* pFuncsTable,
68     OutputQueue **ppOutputQueue )
69
70 {
71     HRESULT hr = S_OK;
72     BOOL threaded = FALSE;
73     DWORD tid;
74
75     OutputQueue *This;
76
77     if (!pInputPin || !pFuncsTable || !ppOutputQueue)
78         return E_INVALIDARG;
79
80     *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
81     if (!*ppOutputQueue)
82         return E_OUTOFMEMORY;
83
84     This = *ppOutputQueue;
85     This->pFuncsTable = pFuncsTable;
86     This->lBatchSize = lBatchSize;
87     This->bBatchExact = bBatchExact;
88     InitializeCriticalSection(&This->csQueue);
89     This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
90     This->SampleList = HeapAlloc(GetProcessHeap(),0,sizeof(struct list));
91     if (!This->SampleList)
92     {
93         OutputQueue_Destroy(This);
94         *ppOutputQueue = NULL;
95         return E_OUTOFMEMORY;
96     }
97     list_init(This->SampleList);
98
99     This->pInputPin = pInputPin;
100     IPin_AddRef(&pInputPin->pin.IPin_iface);
101
102     EnterCriticalSection(&This->csQueue);
103     if (bAuto && pInputPin->pMemInputPin)
104         threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin);
105     else
106         threaded = bQueue;
107
108     if (threaded)
109     {
110         This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
111         if (This->hThread)
112         {
113             SetThreadPriority(This->hThread, dwPriority);
114             This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
115         }
116     }
117     LeaveCriticalSection(&This->csQueue);
118
119     return hr;
120 }
121
122 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
123 {
124     EnterCriticalSection(&pOutputQueue->csQueue);
125     OutputQueue_FreeSamples(pOutputQueue);
126     pOutputQueue->bTerminate = TRUE;
127     SetEvent(pOutputQueue->hProcessQueue);
128     LeaveCriticalSection(&pOutputQueue->csQueue);
129
130     pOutputQueue->csQueue.DebugInfo->Spare[0] = 0;
131     DeleteCriticalSection(&pOutputQueue->csQueue);
132     CloseHandle(pOutputQueue->hProcessQueue);
133
134     HeapFree(GetProcessHeap(),0,pOutputQueue->SampleList);
135
136     IPin_Release(&pOutputQueue->pInputPin->pin.IPin_iface);
137     HeapFree(GetProcessHeap(),0,pOutputQueue);
138     return S_OK;
139 }
140
141 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
142 {
143     HRESULT hr = S_OK;
144     int i;
145
146     if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
147         return VFW_E_NOT_CONNECTED;
148
149     if (!pOutputQueue->hThread)
150     {
151         IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
152         hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
153         IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
154     }
155     else
156     {
157         EnterCriticalSection(&pOutputQueue->csQueue);
158         *nSamplesProcessed = 0;
159
160         for (i = 0; i < nSamples; i++)
161         {
162             QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
163             if (!qev)
164             {
165                 ERR("Out of Memory\n");
166                 hr = E_OUTOFMEMORY;
167                 break;
168             }
169             qev->type = SAMPLE_PACKET;
170             qev->pSample = ppSamples[i];
171             IMediaSample_AddRef(ppSamples[i]);
172             list_add_tail(pOutputQueue->SampleList, &qev->entry);
173             (*nSamplesProcessed)++;
174         }
175
176         if (!pOutputQueue->bBatchExact || list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
177             SetEvent(pOutputQueue->hProcessQueue);
178         LeaveCriticalSection(&pOutputQueue->csQueue);
179     }
180     return hr;
181 }
182
183 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
184 {
185     LONG processed;
186     return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
187 }
188
189 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
190 {
191     if (pOutputQueue->hThread)
192     {
193         EnterCriticalSection(&pOutputQueue->csQueue);
194         if (!list_empty(pOutputQueue->SampleList))
195         {
196             pOutputQueue->bSendAnyway = TRUE;
197             SetEvent(pOutputQueue->hProcessQueue);
198         }
199         LeaveCriticalSection(&pOutputQueue->csQueue);
200     }
201 }
202
203 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
204 {
205     EnterCriticalSection(&pOutputQueue->csQueue);
206     if (pOutputQueue->hThread)
207     {
208         QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
209         if (!qev)
210         {
211             ERR("Out of Memory\n");
212             LeaveCriticalSection(&pOutputQueue->csQueue);
213             return;
214         }
215         qev->type = EOS_PACKET;
216         qev->pSample = NULL;
217         list_add_tail(pOutputQueue->SampleList, &qev->entry);
218     }
219     else
220     {
221         IPin* ppin = NULL;
222         IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
223         if (ppin)
224         {
225             IPin_EndOfStream(ppin);
226             IPin_Release(ppin);
227         }
228     }
229     LeaveCriticalSection(&pOutputQueue->csQueue);
230     /* Covers sending the Event to the worker Thread */
231     OutputQueue_SendAnyway(pOutputQueue);
232 }
233
234 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
235 {
236     do
237     {
238         EnterCriticalSection(&pOutputQueue->csQueue);
239         if (!list_empty(pOutputQueue->SampleList) &&
240             (!pOutputQueue->bBatchExact ||
241             list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
242             pOutputQueue->bSendAnyway
243             )
244            )
245         {
246             while (!list_empty(pOutputQueue->SampleList))
247             {
248                 IMediaSample **ppSamples;
249                 LONG nSamples;
250                 LONG nSamplesProcessed;
251                 struct list *cursor, *cursor2;
252                 int i = 0;
253
254                 /* First Pass Process Samples */
255                 i = list_count(pOutputQueue->SampleList);
256                 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
257                 nSamples = 0;
258                 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
259                 {
260                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
261                     if (qev->type == SAMPLE_PACKET)
262                         ppSamples[nSamples++] = qev->pSample;
263                     else
264                         break;
265                     list_remove(cursor);
266                     HeapFree(GetProcessHeap(),0,qev);
267                 }
268
269                 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
270                 {
271                     IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
272                     LeaveCriticalSection(&pOutputQueue->csQueue);
273                     IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
274                     EnterCriticalSection(&pOutputQueue->csQueue);
275                     IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
276                 }
277                 for (i = 0; i < nSamples; i++)
278                     IMediaSample_Release(ppSamples[i]);
279                 HeapFree(GetProcessHeap(),0,ppSamples);
280
281                 /* Process Non-Samples */
282                 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
283                 {
284                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
285                     if (qev->type == EOS_PACKET)
286                     {
287                         IPin* ppin = NULL;
288                         IPin_ConnectedTo(&pOutputQueue->pInputPin->pin.IPin_iface, &ppin);
289                         if (ppin)
290                         {
291                             IPin_EndOfStream(ppin);
292                             IPin_Release(ppin);
293                         }
294                     }
295                     else if (qev->type == SAMPLE_PACKET)
296                         break;
297                     else
298                         FIXME("Unhandled Event type %i\n",qev->type);
299                     list_remove(cursor);
300                     HeapFree(GetProcessHeap(),0,qev);
301                 }
302             }
303             pOutputQueue->bSendAnyway = FALSE;
304         }
305         LeaveCriticalSection(&pOutputQueue->csQueue);
306         WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
307     }
308     while (!pOutputQueue->bTerminate);
309     return S_OK;
310 }