netapi32: Remove unused variable.
[wine] / dlls / quartz / pin.c
1 /*
2  * Generic Implementation of IPin Interface
3  *
4  * Copyright 2003 Robert Shearman
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19  */
20
21 #include "quartz_private.h"
22 #include "pin.h"
23
24 #include "wine/debug.h"
25 #include "wine/unicode.h"
26 #include "uuids.h"
27 #include "vfwmsgs.h"
28 #include <assert.h>
29
30 WINE_DEFAULT_DEBUG_CHANNEL(quartz);
31
32 static const IPinVtbl PullPin_Vtbl;
33
/* Round value down to a multiple of boundary using integer division followed
 * by multiplication. boundary is expanded twice, so it must be free of side
 * effects and must not be zero. */
34 #define ALIGNDOWN(value,boundary) ((value)/(boundary)*(boundary))
/* Round value up to a multiple of boundary by adding boundary-1 and rounding
 * down with ALIGNDOWN (intended for non-negative values). */
35 #define ALIGNUP(value,boundary) (ALIGNDOWN((value)+(boundary)-1, (boundary)))
36
/* Callback type used by SendFurther: invoked with the pin the message is
 * delivered to and the opaque argument that was handed to SendFurther. */
37 typedef HRESULT (*SendPinFunc)( IPin *to, LPVOID arg );
38
39 /** Helper function, there are a lot of places where the error code is inherited
40  * The following rules apply:
41  *
42  * Return the first received error code (E_NOTIMPL is ignored)
43  * If no errors occur: return the first received non-error-code that isn't S_OK
44  */
45 static HRESULT updatehres( HRESULT original, HRESULT new )
46 {
47     if (FAILED( original ) || new == E_NOTIMPL)
48         return original;
49
50     if (FAILED( new ) || original == S_OK)
51         return new;
52
53     return original;
54 }
55
56 /** Sends a message from a pin further to other, similar pins
57  * fnMiddle is called on each pin found further on the stream.
58  * fnEnd (can be NULL) is called when the message can't be sent any further (this is a renderer or source)
59  *
60  * If the pin given is an input pin, the message will be sent downstream to other input pins
61  * If the pin given is an output pin, the message will be sent upstream to other output pins
62  */
63 static HRESULT SendFurther( IPin *from, SendPinFunc fnMiddle, LPVOID arg, SendPinFunc fnEnd )
64 {
65     PIN_INFO pin_info;
66     ULONG amount = 0;
67     HRESULT hr = S_OK;
68     HRESULT hr_return = S_OK;
69     IEnumPins *enumpins = NULL;
70     BOOL foundend = TRUE;
71     PIN_DIRECTION from_dir;
72
73     IPin_QueryDirection( from, &from_dir );
74
75     hr = IPin_QueryInternalConnections( from, NULL, &amount );
76     if (hr != E_NOTIMPL && amount)
77         FIXME("Use QueryInternalConnections!\n");
78      hr = S_OK;
79
80     pin_info.pFilter = NULL;
81     hr = IPin_QueryPinInfo( from, &pin_info );
82     if (FAILED(hr))
83         goto out;
84
85     hr = IBaseFilter_EnumPins( pin_info.pFilter, &enumpins );
86     if (FAILED(hr))
87         goto out;
88
89     hr = IEnumPins_Reset( enumpins );
90     while (hr == S_OK) {
91         IPin *pin = NULL;
92         hr = IEnumPins_Next( enumpins, 1, &pin, NULL );
93         if (hr == VFW_E_ENUM_OUT_OF_SYNC)
94         {
95             hr = IEnumPins_Reset( enumpins );
96             continue;
97         }
98         if (pin)
99         {
100             PIN_DIRECTION dir;
101
102             IPin_QueryDirection( pin, &dir );
103             if (dir != from_dir)
104             {
105                 IPin *connected = NULL;
106
107                 foundend = FALSE;
108                 IPin_ConnectedTo( pin, &connected );
109                 if (connected)
110                 {
111                     HRESULT hr_local;
112
113                     hr_local = fnMiddle( connected, arg );
114                     hr_return = updatehres( hr_return, hr_local );
115                     IPin_Release(connected);
116                 }
117             }
118             IPin_Release( pin );
119         }
120         else
121         {
122             hr = S_OK;
123             break;
124         }
125     }
126
127     if (!foundend)
128         hr = hr_return;
129     else if (fnEnd) {
130         HRESULT hr_local;
131
132         hr_local = fnEnd( from, arg );
133         hr_return = updatehres( hr_return, hr_local );
134     }
135
136 out:
137     if (enumpins)
138         IEnumPins_Release( enumpins );
139     if (pin_info.pFilter)
140         IBaseFilter_Release( pin_info.pFilter );
141     return hr;
142 }
143
144
145 static void Copy_PinInfo(PIN_INFO * pDest, const PIN_INFO * pSrc)
146 {
147     /* Tempting to just do a memcpy, but the name field is
148        128 characters long! We will probably never exceed 10
149        most of the time, so we are better off copying 
150        each field manually */
151     strcpyW(pDest->achName, pSrc->achName);
152     pDest->dir = pSrc->dir;
153     pDest->pFilter = pSrc->pFilter;
154 }
155
156 static HRESULT deliver_endofstream(IPin* pin, LPVOID unused)
157 {
158     return IPin_EndOfStream( pin );
159 }
160
161 static HRESULT deliver_beginflush(IPin* pin, LPVOID unused)
162 {
163     return IPin_BeginFlush( pin );
164 }
165
166 static HRESULT deliver_endflush(IPin* pin, LPVOID unused)
167 {
168     return IPin_EndFlush( pin );
169 }
170
171 typedef struct newsegmentargs
172 {
173     REFERENCE_TIME tStart, tStop;
174     double rate;
175 } newsegmentargs;
176
177 static HRESULT deliver_newsegment(IPin *pin, LPVOID data)
178 {
179     newsegmentargs *args = data;
180     return IPin_NewSegment(pin, args->tStart, args->tStop, args->rate);
181 }
182
183 /*** PullPin implementation ***/
184
185 static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData,
186                             QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl)
187 {
188     /* Common attributes */
189     pPinImpl->pin.IPin_iface.lpVtbl = PullPin_Vtbl;
190     pPinImpl->pin.refCount = 1;
191     pPinImpl->pin.pConnectedTo = NULL;
192     pPinImpl->pin.pCritSec = pCritSec;
193     Copy_PinInfo(&pPinImpl->pin.pinInfo, pPinInfo);
194     ZeroMemory(&pPinImpl->pin.mtCurrent, sizeof(AM_MEDIA_TYPE));
195
196     /* Input pin attributes */
197     pPinImpl->pUserData = pUserData;
198     pPinImpl->fnQueryAccept = pQueryAccept;
199     pPinImpl->fnSampleProc = pSampleProc;
200     pPinImpl->fnCleanProc = pCleanUp;
201     pPinImpl->fnDone = pDone;
202     pPinImpl->fnPreConnect = NULL;
203     pPinImpl->pAlloc = NULL;
204     pPinImpl->prefAlloc = NULL;
205     pPinImpl->pReader = NULL;
206     pPinImpl->hThread = NULL;
207     pPinImpl->hEventStateChanged = CreateEventW(NULL, TRUE, TRUE, NULL);
208     pPinImpl->thread_sleepy = CreateEventW(NULL, FALSE, FALSE, NULL);
209
210     pPinImpl->rtStart = 0;
211     pPinImpl->rtCurrent = 0;
212     pPinImpl->rtStop = ((LONGLONG)0x7fffffff << 32) | 0xffffffff;
213     pPinImpl->dRate = 1.0;
214     pPinImpl->state = Req_Die;
215     pPinImpl->fnCustomRequest = pCustomRequest;
216     pPinImpl->stop_playback = 1;
217
218     InitializeCriticalSection(&pPinImpl->thread_lock);
219     pPinImpl->thread_lock.DebugInfo->Spare[0] = (DWORD_PTR)( __FILE__ ": PullPin.thread_lock");
220
221     return S_OK;
222 }
223
224 HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, IPin ** ppPin)
225 {
226     PullPin * pPinImpl;
227
228     *ppPin = NULL;
229
230     if (pPinInfo->dir != PINDIR_INPUT)
231     {
232         ERR("Pin direction(%x) != PINDIR_INPUT\n", pPinInfo->dir);
233         return E_INVALIDARG;
234     }
235
236     pPinImpl = CoTaskMemAlloc(sizeof(*pPinImpl));
237
238     if (!pPinImpl)
239         return E_OUTOFMEMORY;
240
241     if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pDone, pCritSec, pPinImpl)))
242     {
243         *ppPin = &pPinImpl->pin.IPin_iface;
244         return S_OK;
245     }
246
247     CoTaskMemFree(pPinImpl);
248     return E_FAIL;
249 }
250
251 static HRESULT PullPin_InitProcessing(PullPin * This);
252
253 HRESULT WINAPI PullPin_ReceiveConnection(IPin * iface, IPin * pReceivePin, const AM_MEDIA_TYPE * pmt)
254 {
255     PIN_DIRECTION pindirReceive;
256     HRESULT hr = S_OK;
257     PullPin *This = impl_PullPin_from_IPin(iface);
258
259     TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
260     dump_AM_MEDIA_TYPE(pmt);
261
262     EnterCriticalSection(This->pin.pCritSec);
263     if (!This->pin.pConnectedTo)
264     {
265         ALLOCATOR_PROPERTIES props;
266
267         props.cBuffers = 3;
268         props.cbBuffer = 64 * 1024; /* 64k bytes */
269         props.cbAlign = 1;
270         props.cbPrefix = 0;
271
272         if (SUCCEEDED(hr) && (This->fnQueryAccept(This->pUserData, pmt) != S_OK))
273             hr = VFW_E_TYPE_NOT_ACCEPTED; /* FIXME: shouldn't we just map common errors onto 
274                                            * VFW_E_TYPE_NOT_ACCEPTED and pass the value on otherwise? */
275
276         if (SUCCEEDED(hr))
277         {
278             IPin_QueryDirection(pReceivePin, &pindirReceive);
279
280             if (pindirReceive != PINDIR_OUTPUT)
281             {
282                 ERR("Can't connect from non-output pin\n");
283                 hr = VFW_E_INVALID_DIRECTION;
284             }
285         }
286
287         This->pReader = NULL;
288         This->pAlloc = NULL;
289         This->prefAlloc = NULL;
290         if (SUCCEEDED(hr))
291         {
292             hr = IPin_QueryInterface(pReceivePin, &IID_IAsyncReader, (LPVOID *)&This->pReader);
293         }
294
295         if (SUCCEEDED(hr) && This->fnPreConnect)
296         {
297             hr = This->fnPreConnect(iface, pReceivePin, &props);
298         }
299
300         /*
301          * Some custom filters (such as the one used by Fallout 3
302          * and Fallout: New Vegas) expect to be passed a non-NULL
303          * preferred allocator.
304          */
305         if (SUCCEEDED(hr))
306         {
307             hr = StdMemAllocator_create(NULL, (LPVOID *) &This->prefAlloc);
308         }
309
310         if (SUCCEEDED(hr))
311         {
312             hr = IAsyncReader_RequestAllocator(This->pReader, This->prefAlloc, &props, &This->pAlloc);
313         }
314
315         if (SUCCEEDED(hr))
316         {
317             CopyMediaType(&This->pin.mtCurrent, pmt);
318             This->pin.pConnectedTo = pReceivePin;
319             IPin_AddRef(pReceivePin);
320             hr = IMemAllocator_Commit(This->pAlloc);
321         }
322
323         if (SUCCEEDED(hr))
324             hr = PullPin_InitProcessing(This);
325
326         if (FAILED(hr))
327         {
328              if (This->pReader)
329                  IAsyncReader_Release(This->pReader);
330              This->pReader = NULL;
331              if (This->prefAlloc)
332                  IMemAllocator_Release(This->prefAlloc);
333              This->prefAlloc = NULL;
334              if (This->pAlloc)
335                  IMemAllocator_Release(This->pAlloc);
336              This->pAlloc = NULL;
337         }
338     }
339     else
340         hr = VFW_E_ALREADY_CONNECTED;
341     LeaveCriticalSection(This->pin.pCritSec);
342     return hr;
343 }
344
345 HRESULT WINAPI PullPin_QueryInterface(IPin * iface, REFIID riid, LPVOID * ppv)
346 {
347     PullPin *This = impl_PullPin_from_IPin(iface);
348
349     TRACE("(%p/%p)->(%s, %p)\n", This, iface, qzdebugstr_guid(riid), ppv);
350
351     *ppv = NULL;
352
353     if (IsEqualIID(riid, &IID_IUnknown))
354         *ppv = iface;
355     else if (IsEqualIID(riid, &IID_IPin))
356         *ppv = iface;
357     else if (IsEqualIID(riid, &IID_IMediaSeeking) ||
358              IsEqualIID(riid, &IID_IQualityControl))
359     {
360         return IBaseFilter_QueryInterface(This->pin.pinInfo.pFilter, riid, ppv);
361     }
362
363     if (*ppv)
364     {
365         IUnknown_AddRef((IUnknown *)(*ppv));
366         return S_OK;
367     }
368
369     FIXME("No interface for %s!\n", qzdebugstr_guid(riid));
370
371     return E_NOINTERFACE;
372 }
373
374 ULONG WINAPI PullPin_Release(IPin *iface)
375 {
376     PullPin *This = impl_PullPin_from_IPin(iface);
377     ULONG refCount = InterlockedDecrement(&This->pin.refCount);
378
379     TRACE("(%p)->() Release from %d\n", This, refCount + 1);
380
381     if (!refCount)
382     {
383         WaitForSingleObject(This->hEventStateChanged, INFINITE);
384         assert(!This->hThread);
385
386         if(This->prefAlloc)
387             IMemAllocator_Release(This->prefAlloc);
388         if(This->pAlloc)
389             IMemAllocator_Release(This->pAlloc);
390         if(This->pReader)
391             IAsyncReader_Release(This->pReader);
392         CloseHandle(This->thread_sleepy);
393         CloseHandle(This->hEventStateChanged);
394         This->thread_lock.DebugInfo->Spare[0] = 0;
395         DeleteCriticalSection(&This->thread_lock);
396         CoTaskMemFree(This);
397         return 0;
398     }
399     return refCount;
400 }
401
402 static void PullPin_Flush(PullPin *This)
403 {
404     IMediaSample *pSample;
405     TRACE("Flushing!\n");
406
407     if (This->pReader)
408     {
409         /* Do not allow state to change while flushing */
410         EnterCriticalSection(This->pin.pCritSec);
411
412         /* Flush outstanding samples */
413         IAsyncReader_BeginFlush(This->pReader);
414
415         for (;;)
416         {
417             DWORD_PTR dwUser;
418
419             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
420
421             if (!pSample)
422                 break;
423
424             assert(!IMediaSample_GetActualDataLength(pSample));
425
426             IMediaSample_Release(pSample);
427         }
428
429         IAsyncReader_EndFlush(This->pReader);
430
431         LeaveCriticalSection(This->pin.pCritSec);
432     }
433 }
434
435 static void PullPin_Thread_Process(PullPin *This)
436 {
437     HRESULT hr;
438     IMediaSample * pSample = NULL;
439     ALLOCATOR_PROPERTIES allocProps;
440
441     hr = IMemAllocator_GetProperties(This->pAlloc, &allocProps);
442
443     This->cbAlign = allocProps.cbAlign;
444
445     if (This->rtCurrent < This->rtStart)
446         This->rtCurrent = MEDIATIME_FROM_BYTES(ALIGNDOWN(BYTES_FROM_MEDIATIME(This->rtStart), This->cbAlign));
447
448     TRACE("Start\n");
449
450     if (This->rtCurrent >= This->rtStop)
451     {
452         IPin_EndOfStream(&This->pin.IPin_iface);
453         return;
454     }
455
456     /* There is no sample in our buffer */
457     hr = This->fnCustomRequest(This->pUserData);
458
459     if (FAILED(hr))
460         ERR("Request error: %x\n", hr);
461
462     EnterCriticalSection(This->pin.pCritSec);
463     SetEvent(This->hEventStateChanged);
464     LeaveCriticalSection(This->pin.pCritSec);
465
466     if (SUCCEEDED(hr))
467     do
468     {
469         DWORD_PTR dwUser;
470
471         TRACE("Process sample\n");
472
473         pSample = NULL;
474         hr = IAsyncReader_WaitForNext(This->pReader, 10000, &pSample, &dwUser);
475
476         /* Return an empty sample on error to the implementation in case it does custom parsing, so it knows it's gone */
477         if (SUCCEEDED(hr))
478         {
479             hr = This->fnSampleProc(This->pUserData, pSample, dwUser);
480         }
481         else
482         {
483             if (hr == VFW_E_TIMEOUT)
484             {
485                 if (pSample != NULL)
486                     WARN("Non-NULL sample returned with VFW_E_TIMEOUT.\n");
487                 hr = S_OK;
488             }
489             /* FIXME: Errors are not well handled yet! */
490             else
491                 ERR("Processing error: %x\n", hr);
492         }
493
494         if (pSample)
495         {
496             IMediaSample_Release(pSample);
497             pSample = NULL;
498         }
499     } while (This->rtCurrent < This->rtStop && hr == S_OK && !This->stop_playback);
500
501     /*
502      * Sample was rejected, and we are asked to terminate.  When there is more than one buffer
503      * it is possible for a filter to have several queued samples, making it necessary to
504      * release all of these pending samples.
505      */
506     if (This->stop_playback || FAILED(hr))
507     {
508         DWORD_PTR dwUser;
509
510         do
511         {
512             if (pSample)
513                 IMediaSample_Release(pSample);
514             pSample = NULL;
515             IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
516         } while(pSample);
517     }
518
519     /* Can't reset state to Sleepy here because that might race, instead PauseProcessing will do that for us
520      * Flush remaining samples
521      */
522     if (This->fnDone)
523         This->fnDone(This->pUserData);
524
525     TRACE("End: %08x, %d\n", hr, This->stop_playback);
526 }
527
528 static void PullPin_Thread_Pause(PullPin *This)
529 {
530     PullPin_Flush(This);
531
532     EnterCriticalSection(This->pin.pCritSec);
533     This->state = Req_Sleepy;
534     SetEvent(This->hEventStateChanged);
535     LeaveCriticalSection(This->pin.pCritSec);
536 }
537
538 static void  PullPin_Thread_Stop(PullPin *This)
539 {
540     TRACE("(%p)->()\n", This);
541
542     EnterCriticalSection(This->pin.pCritSec);
543     {
544         CloseHandle(This->hThread);
545         This->hThread = NULL;
546         SetEvent(This->hEventStateChanged);
547     }
548     LeaveCriticalSection(This->pin.pCritSec);
549
550     IBaseFilter_Release(This->pin.pinInfo.pFilter);
551
552     CoUninitialize();
553     ExitThread(0);
554 }
555
556 static DWORD WINAPI PullPin_Thread_Main(LPVOID pv)
557 {
558     PullPin *This = pv;
559     CoInitializeEx(NULL, COINIT_MULTITHREADED);
560
561     PullPin_Flush(This);
562
563     for (;;)
564     {
565         WaitForSingleObject(This->thread_sleepy, INFINITE);
566
567         TRACE("State: %d\n", This->state);
568
569         switch (This->state)
570         {
571         case Req_Die: PullPin_Thread_Stop(This); break;
572         case Req_Run: PullPin_Thread_Process(This); break;
573         case Req_Pause: PullPin_Thread_Pause(This); break;
574         case Req_Sleepy: ERR("Should not be signalled with SLEEPY!\n"); break;
575         default: ERR("Unknown state request: %d\n", This->state); break;
576         }
577     }
578     return 0;
579 }
580
581 static HRESULT PullPin_InitProcessing(PullPin * This)
582 {
583     HRESULT hr = S_OK;
584
585     TRACE("(%p)->()\n", This);
586
587     /* if we are connected */
588     if (This->pAlloc)
589     {
590         DWORD dwThreadId;
591
592         WaitForSingleObject(This->hEventStateChanged, INFINITE);
593         EnterCriticalSection(This->pin.pCritSec);
594
595         assert(!This->hThread);
596         assert(This->state == Req_Die);
597         assert(This->stop_playback);
598         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
599         This->state = Req_Sleepy;
600
601         /* AddRef the filter to make sure it and it's pins will be around
602          * as long as the thread */
603         IBaseFilter_AddRef(This->pin.pinInfo.pFilter);
604
605
606         This->hThread = CreateThread(NULL, 0, PullPin_Thread_Main, This, 0, &dwThreadId);
607         if (!This->hThread)
608         {
609             hr = HRESULT_FROM_WIN32(GetLastError());
610             IBaseFilter_Release(This->pin.pinInfo.pFilter);
611         }
612
613         if (SUCCEEDED(hr))
614         {
615             SetEvent(This->hEventStateChanged);
616             /* If assert fails, that means a command was not processed before the thread previously terminated */
617         }
618         LeaveCriticalSection(This->pin.pCritSec);
619     }
620
621     TRACE(" -- %x\n", hr);
622
623     return hr;
624 }
625
626 HRESULT PullPin_StartProcessing(PullPin * This)
627 {
628     /* if we are connected */
629     TRACE("(%p)->()\n", This);
630     if(This->pAlloc)
631     {
632         assert(This->hThread);
633
634         PullPin_WaitForStateChange(This, INFINITE);
635
636         assert(This->state == Req_Sleepy);
637
638         /* Wake up! */
639         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
640         This->state = Req_Run;
641         This->stop_playback = 0;
642         ResetEvent(This->hEventStateChanged);
643         SetEvent(This->thread_sleepy);
644     }
645
646     return S_OK;
647 }
648
649 HRESULT PullPin_PauseProcessing(PullPin * This)
650 {
651     /* if we are connected */
652     TRACE("(%p)->()\n", This);
653     if(This->pAlloc)
654     {
655         assert(This->hThread);
656
657         PullPin_WaitForStateChange(This, INFINITE);
658
659         EnterCriticalSection(This->pin.pCritSec);
660
661         assert(!This->stop_playback);
662         assert(This->state == Req_Run|| This->state == Req_Sleepy);
663
664         assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
665
666         This->state = Req_Pause;
667         This->stop_playback = 1;
668         ResetEvent(This->hEventStateChanged);
669         SetEvent(This->thread_sleepy);
670
671         /* Release any outstanding samples */
672         if (This->pReader)
673         {
674             IMediaSample *pSample;
675             DWORD_PTR dwUser;
676
677             do
678             {
679                 pSample = NULL;
680                 IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser);
681                 if (pSample)
682                     IMediaSample_Release(pSample);
683             } while(pSample);
684         }
685
686         LeaveCriticalSection(This->pin.pCritSec);
687     }
688
689     return S_OK;
690 }
691
692 static HRESULT PullPin_StopProcessing(PullPin * This)
693 {
694     TRACE("(%p)->()\n", This);
695
696     /* if we are alive */
697     assert(This->hThread);
698
699     PullPin_WaitForStateChange(This, INFINITE);
700
701     assert(This->state == Req_Pause || This->state == Req_Sleepy);
702
703     This->stop_playback = 1;
704     This->state = Req_Die;
705     assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT);
706     ResetEvent(This->hEventStateChanged);
707     SetEvent(This->thread_sleepy);
708     return S_OK;
709 }
710
711 HRESULT PullPin_WaitForStateChange(PullPin * This, DWORD dwMilliseconds)
712 {
713     if (WaitForSingleObject(This->hEventStateChanged, dwMilliseconds) == WAIT_TIMEOUT)
714         return S_FALSE;
715     return S_OK;
716 }
717
718 HRESULT WINAPI PullPin_QueryAccept(IPin * iface, const AM_MEDIA_TYPE * pmt)
719 {
720     PullPin *This = impl_PullPin_from_IPin(iface);
721
722     TRACE("(%p/%p)->(%p)\n", This, iface, pmt);
723
724     return (This->fnQueryAccept(This->pUserData, pmt) == S_OK ? S_OK : S_FALSE);
725 }
726
727 HRESULT WINAPI PullPin_EndOfStream(IPin * iface)
728 {
729     PullPin *This = impl_PullPin_from_IPin(iface);
730     HRESULT hr = S_FALSE;
731
732     TRACE("(%p)->()\n", iface);
733
734     EnterCriticalSection(This->pin.pCritSec);
735     hr = SendFurther( iface, deliver_endofstream, NULL, NULL );
736     SetEvent(This->hEventStateChanged);
737     LeaveCriticalSection(This->pin.pCritSec);
738
739     return hr;
740 }
741
742 HRESULT WINAPI PullPin_BeginFlush(IPin * iface)
743 {
744     PullPin *This = impl_PullPin_from_IPin(iface);
745     TRACE("(%p)->()\n", This);
746
747     EnterCriticalSection(This->pin.pCritSec);
748     {
749         SendFurther( iface, deliver_beginflush, NULL, NULL );
750     }
751     LeaveCriticalSection(This->pin.pCritSec);
752
753     EnterCriticalSection(&This->thread_lock);
754     {
755         if (This->pReader)
756             IAsyncReader_BeginFlush(This->pReader);
757         PullPin_WaitForStateChange(This, INFINITE);
758
759         if (This->hThread && This->state == Req_Run)
760         {
761             PullPin_PauseProcessing(This);
762             PullPin_WaitForStateChange(This, INFINITE);
763         }
764     }
765     LeaveCriticalSection(&This->thread_lock);
766
767     EnterCriticalSection(This->pin.pCritSec);
768     {
769         This->fnCleanProc(This->pUserData);
770     }
771     LeaveCriticalSection(This->pin.pCritSec);
772
773     return S_OK;
774 }
775
776 HRESULT WINAPI PullPin_EndFlush(IPin * iface)
777 {
778     PullPin *This = impl_PullPin_from_IPin(iface);
779
780     TRACE("(%p)->()\n", iface);
781
782     /* Send further first: Else a race condition might terminate processing early */
783     EnterCriticalSection(This->pin.pCritSec);
784     SendFurther( iface, deliver_endflush, NULL, NULL );
785     LeaveCriticalSection(This->pin.pCritSec);
786
787     EnterCriticalSection(&This->thread_lock);
788     {
789         FILTER_STATE state;
790
791         if (This->pReader)
792             IAsyncReader_EndFlush(This->pReader);
793
794         IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
795
796         if (state != State_Stopped)
797             PullPin_StartProcessing(This);
798
799         PullPin_WaitForStateChange(This, INFINITE);
800     }
801     LeaveCriticalSection(&This->thread_lock);
802
803     return S_OK;
804 }
805
806 HRESULT WINAPI PullPin_Disconnect(IPin *iface)
807 {
808     HRESULT hr;
809     PullPin *This = impl_PullPin_from_IPin(iface);
810
811     TRACE("()\n");
812
813     EnterCriticalSection(This->pin.pCritSec);
814     {
815         if (FAILED(hr = IMemAllocator_Decommit(This->pAlloc)))
816             ERR("Allocator decommit failed with error %x. Possible memory leak\n", hr);
817
818         if (This->pin.pConnectedTo)
819         {
820             IPin_Release(This->pin.pConnectedTo);
821             This->pin.pConnectedTo = NULL;
822             PullPin_StopProcessing(This);
823
824             FreeMediaType(&This->pin.mtCurrent);
825             ZeroMemory(&This->pin.mtCurrent, sizeof(This->pin.mtCurrent));
826             hr = S_OK;
827         }
828         else
829             hr = S_FALSE;
830     }
831     LeaveCriticalSection(This->pin.pCritSec);
832
833     return hr;
834 }
835
836 HRESULT WINAPI PullPin_NewSegment(IPin * iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate)
837 {
838     newsegmentargs args;
839     FIXME("(%p)->(%s, %s, %g) stub\n", iface, wine_dbgstr_longlong(tStart), wine_dbgstr_longlong(tStop), dRate);
840
841     args.tStart = tStart;
842     args.tStop = tStop;
843     args.rate = dRate;
844
845     return SendFurther( iface, deliver_newsegment, &args, NULL );
846 }
847
/* IPin method table for pull pins. The initializer is positional, so the
 * order of the entries must not be changed. Entries named PullPin_* are
 * defined in this file; the BasePinImpl_* and BaseInputPinImpl_* entries are
 * defined elsewhere. */
848 static const IPinVtbl PullPin_Vtbl = 
849 {
850     PullPin_QueryInterface,
851     BasePinImpl_AddRef,
852     PullPin_Release,
853     BaseInputPinImpl_Connect,
854     PullPin_ReceiveConnection,
855     PullPin_Disconnect,
856     BasePinImpl_ConnectedTo,
857     BasePinImpl_ConnectionMediaType,
858     BasePinImpl_QueryPinInfo,
859     BasePinImpl_QueryDirection,
860     BasePinImpl_QueryId,
861     PullPin_QueryAccept,
862     BasePinImpl_EnumMediaTypes,
863     BasePinImpl_QueryInternalConnections,
864     PullPin_EndOfStream,
865     PullPin_BeginFlush,
866     PullPin_EndFlush,
867     PullPin_NewSegment
868 };