urlmon: Added support for javascript URIs.
[wine] / dlls / strmbase / outputqueue.c
1 /*
2  * Generic Implementation of COutputQueue
3  *
4  * Copyright 2011 Aric Stewart, CodeWeavers
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #define COBJMACROS
22
23 #include "dshow.h"
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "wine/list.h"
27 #include "wine/strmbase.h"
28 #include "uuids.h"
29 #include "vfwmsgs.h"
30 #include <assert.h>
31
32 WINE_DEFAULT_DEBUG_CHANNEL(strmbase);
33
34 enum {SAMPLE_PACKET, EOS_PACKET};
35
36 typedef struct tagQueuedEvent {
37     int type;
38     struct list entry;
39
40     IMediaSample *pSample;
41 } QueuedEvent;
42
43 static DWORD WINAPI OutputQueue_InitialThreadProc(LPVOID data)
44 {
45     OutputQueue *This = (OutputQueue *)data;
46     return This->pFuncsTable->pfnThreadProc(This);
47 }
48
49 static void OutputQueue_FreeSamples(OutputQueue *pOutputQueue)
50 {
51     struct list *cursor, *cursor2;
52     LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
53     {
54         QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
55         list_remove(cursor);
56         HeapFree(GetProcessHeap(),0,qev);
57     }
58 }
59
60 HRESULT WINAPI OutputQueue_Construct(
61     BaseOutputPin *pInputPin,
62     BOOL bAuto,
63     BOOL bQueue,
64     LONG lBatchSize,
65     BOOL bBatchExact,
66     DWORD dwPriority,
67     const OutputQueueFuncTable* pFuncsTable,
68     OutputQueue **ppOutputQueue )
69
70 {
71     HRESULT hr = S_OK;
72     BOOL threaded = FALSE;
73     DWORD tid;
74
75     OutputQueue *This;
76
77     if (!pInputPin || !pFuncsTable || !ppOutputQueue)
78         return E_INVALIDARG;
79
80     *ppOutputQueue = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(OutputQueue));
81     if (!*ppOutputQueue)
82         return E_OUTOFMEMORY;
83
84     This = *ppOutputQueue;
85     This->pFuncsTable = pFuncsTable;
86     This->lBatchSize = lBatchSize;
87     This->bBatchExact = bBatchExact;
88     InitializeCriticalSection(&This->csQueue);
89     This->csQueue.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": OutputQueue.csQueue");
90     This->SampleList = HeapAlloc(GetProcessHeap(),0,sizeof(struct list));
91     if (!This->SampleList)
92     {
93         OutputQueue_Destroy(This);
94         *ppOutputQueue = NULL;
95         return E_OUTOFMEMORY;
96     }
97     list_init(This->SampleList);
98
99     This->pInputPin = pInputPin;
100     IPin_AddRef((IPin*)pInputPin);
101
102     EnterCriticalSection(&This->csQueue);
103     if (bAuto && pInputPin->pMemInputPin)
104         threaded = IMemInputPin_ReceiveCanBlock(pInputPin->pMemInputPin);
105     else
106         threaded = bQueue;
107
108     if (threaded)
109     {
110         This->hThread = CreateThread(NULL, 0, OutputQueue_InitialThreadProc, This, 0, &tid);
111         if (This->hThread)
112         {
113             SetThreadPriority(This->hThread, dwPriority);
114             This->hProcessQueue = CreateEventW(NULL, 0, 0, NULL);
115         }
116     }
117     LeaveCriticalSection(&This->csQueue);
118
119     return hr;
120 }
121
122 HRESULT WINAPI OutputQueue_Destroy(OutputQueue *pOutputQueue)
123 {
124     EnterCriticalSection(&pOutputQueue->csQueue);
125     OutputQueue_FreeSamples(pOutputQueue);
126     pOutputQueue->bTerminate = TRUE;
127     SetEvent(pOutputQueue->hProcessQueue);
128     LeaveCriticalSection(&pOutputQueue->csQueue);
129
130     DeleteCriticalSection(&pOutputQueue->csQueue);
131     CloseHandle(pOutputQueue->hProcessQueue);
132
133     HeapFree(GetProcessHeap(),0,pOutputQueue->SampleList);
134
135     IPin_Release((IPin*)pOutputQueue->pInputPin);
136     HeapFree(GetProcessHeap(),0,pOutputQueue);
137     return S_OK;
138 }
139
140 HRESULT WINAPI OutputQueue_ReceiveMultiple(OutputQueue *pOutputQueue, IMediaSample **ppSamples, LONG nSamples, LONG *nSamplesProcessed)
141 {
142     HRESULT hr = S_OK;
143     int i;
144
145     if (!pOutputQueue->pInputPin->pin.pConnectedTo || !pOutputQueue->pInputPin->pMemInputPin)
146         return VFW_E_NOT_CONNECTED;
147
148     if (!pOutputQueue->hThread)
149     {
150         IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
151         hr = IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin,ppSamples, nSamples, nSamplesProcessed);
152         IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
153     }
154     else
155     {
156         EnterCriticalSection(&pOutputQueue->csQueue);
157         *nSamplesProcessed = 0;
158
159         for (i = 0; i < nSamples; i++)
160         {
161             QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
162             if (!qev)
163             {
164                 ERR("Out of Memory\n");
165                 hr = E_OUTOFMEMORY;
166                 break;
167             }
168             qev->type = SAMPLE_PACKET;
169             qev->pSample = ppSamples[i];
170             IMediaSample_AddRef(ppSamples[i]);
171             list_add_tail(pOutputQueue->SampleList, &qev->entry);
172             (*nSamplesProcessed)++;
173         }
174
175         if (!pOutputQueue->bBatchExact || list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize)
176             SetEvent(pOutputQueue->hProcessQueue);
177         LeaveCriticalSection(&pOutputQueue->csQueue);
178     }
179     return hr;
180 }
181
182 HRESULT WINAPI OutputQueue_Receive(OutputQueue *pOutputQueue, IMediaSample *pSample)
183 {
184     LONG processed;
185     return OutputQueue_ReceiveMultiple(pOutputQueue,&pSample,1,&processed);
186 }
187
188 VOID WINAPI OutputQueue_SendAnyway(OutputQueue *pOutputQueue)
189 {
190     if (pOutputQueue->hThread)
191     {
192         EnterCriticalSection(&pOutputQueue->csQueue);
193         if (!list_empty(pOutputQueue->SampleList))
194         {
195             pOutputQueue->bSendAnyway = TRUE;
196             SetEvent(pOutputQueue->hProcessQueue);
197         }
198         LeaveCriticalSection(&pOutputQueue->csQueue);
199     }
200 }
201
202 VOID WINAPI OutputQueue_EOS(OutputQueue *pOutputQueue)
203 {
204     EnterCriticalSection(&pOutputQueue->csQueue);
205     if (pOutputQueue->hThread)
206     {
207         QueuedEvent *qev = HeapAlloc(GetProcessHeap(),0,sizeof(QueuedEvent));
208         if (!qev)
209         {
210             ERR("Out of Memory\n");
211             LeaveCriticalSection(&pOutputQueue->csQueue);
212             return;
213         }
214         qev->type = EOS_PACKET;
215         qev->pSample = NULL;
216         list_add_tail(pOutputQueue->SampleList, &qev->entry);
217     }
218     else
219     {
220         IPin* ppin = NULL;
221         IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
222         if (ppin)
223         {
224             IPin_EndOfStream(ppin);
225             IPin_Release(ppin);
226         }
227     }
228     LeaveCriticalSection(&pOutputQueue->csQueue);
229     /* Covers sending the Event to the worker Thread */
230     OutputQueue_SendAnyway(pOutputQueue);
231 }
232
233 DWORD WINAPI OutputQueueImpl_ThreadProc(OutputQueue *pOutputQueue)
234 {
235     do
236     {
237         EnterCriticalSection(&pOutputQueue->csQueue);
238         if (!list_empty(pOutputQueue->SampleList) &&
239             (!pOutputQueue->bBatchExact ||
240             list_count(pOutputQueue->SampleList) >= pOutputQueue->lBatchSize ||
241             pOutputQueue->bSendAnyway
242             )
243            )
244         {
245             while (!list_empty(pOutputQueue->SampleList))
246             {
247                 IMediaSample **ppSamples;
248                 LONG nSamples;
249                 LONG nSamplesProcessed;
250                 struct list *cursor, *cursor2;
251                 int i = 0;
252
253                 /* First Pass Process Samples */
254                 i = list_count(pOutputQueue->SampleList);
255                 ppSamples = HeapAlloc(GetProcessHeap(),0,sizeof(IMediaSample*) * i);
256                 nSamples = 0;
257                 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
258                 {
259                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
260                     if (qev->type == SAMPLE_PACKET)
261                         ppSamples[nSamples++] = qev->pSample;
262                     else
263                         break;
264                     list_remove(cursor);
265                     HeapFree(GetProcessHeap(),0,qev);
266                 }
267
268                 if (pOutputQueue->pInputPin->pin.pConnectedTo && pOutputQueue->pInputPin->pMemInputPin)
269                 {
270                     IMemInputPin_AddRef(pOutputQueue->pInputPin->pMemInputPin);
271                     LeaveCriticalSection(&pOutputQueue->csQueue);
272                     IMemInputPin_ReceiveMultiple(pOutputQueue->pInputPin->pMemInputPin, ppSamples, nSamples, &nSamplesProcessed);
273                     EnterCriticalSection(&pOutputQueue->csQueue);
274                     IMemInputPin_Release(pOutputQueue->pInputPin->pMemInputPin);
275                 }
276                 for (i = 0; i < nSamples; i++)
277                     IUnknown_Release(ppSamples[i]);
278                 HeapFree(GetProcessHeap(),0,ppSamples);
279
280                 /* Process Non-Samples */
281                 LIST_FOR_EACH_SAFE(cursor, cursor2, pOutputQueue->SampleList)
282                 {
283                     QueuedEvent *qev = LIST_ENTRY(cursor, QueuedEvent, entry);
284                     if (qev->type == EOS_PACKET)
285                     {
286                         IPin* ppin = NULL;
287                         IPin_ConnectedTo((IPin*)pOutputQueue->pInputPin, &ppin);
288                         if (ppin)
289                         {
290                             IPin_EndOfStream(ppin);
291                             IPin_Release(ppin);
292                         }
293                     }
294                     else if (qev->type == SAMPLE_PACKET)
295                         break;
296                     else
297                         FIXME("Unhandled Event type %i\n",qev->type);
298                     list_remove(cursor);
299                     HeapFree(GetProcessHeap(),0,qev);
300                 }
301             }
302             pOutputQueue->bSendAnyway = FALSE;
303         }
304         LeaveCriticalSection(&pOutputQueue->csQueue);
305         WaitForSingleObject(pOutputQueue->hProcessQueue, INFINITE);
306     }
307     while (!pOutputQueue->bTerminate);
308     return S_OK;
309 }