Added parser template and made AVISplitter use it.
[wine] / dlls / ole32 / rpc.c
1 /*
2  *      (Local) RPC Stuff
3  *
4  * Copyright 2002  Marcus Meissner
5  * Copyright 2005  Mike Hearn, Rob Shearman for CodeWeavers
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  */
21
22 #include "config.h"
23
24 #include <stdlib.h>
25 #include <stdarg.h>
26 #include <stdio.h>
27 #include <string.h>
28 #include <assert.h>
29
30 #define COBJMACROS
31 #define NONAMELESSUNION
32 #define NONAMELESSSTRUCT
33
34 #include "windef.h"
35 #include "winbase.h"
36 #include "winuser.h"
37 #include "winsvc.h"
38 #include "objbase.h"
39 #include "ole2.h"
40 #include "rpc.h"
41 #include "winerror.h"
42 #include "winreg.h"
43 #include "wtypes.h"
44 #include "wine/unicode.h"
45
46 #include "compobj_private.h"
47
48 #include "wine/debug.h"
49
50 WINE_DEFAULT_DEBUG_CHANNEL(ole);
51
/* Named-pipe namespace prefix and the base name of the per-apartment
 * stub manager pipe (an OXID suffix is appended by the users). */
#define PIPEPREF   "\\\\.\\pipe\\"
#define OLESTUBMGR PIPEPREF"WINE_OLE_StubMgr"

/* Packet type tag written ahead of every header sent over a pipe. */
#define REQTYPE_REQUEST   0
#define REQTYPE_RESPONSE  1
57
58 struct request_header
59 {
60     DWORD               reqid;
61     IPID                ipid;
62     DWORD               iMethod;
63     DWORD               cbBuffer;
64 };
65
66 struct response_header
67 {
68     DWORD               reqid;
69     DWORD               cbBuffer;
70     DWORD               retval;
71 };
72
73
/* Life cycle of a struct rpc slot.
 *
 * REQSTATE_DONE marks a slot as free for reuse by RPC_GetRequest, so it
 * must not share a value with REQSTATE_RESP_GOT: a slot whose response
 * has arrived but has not been consumed yet is still in use.
 */
#define REQSTATE_START                  0
#define REQSTATE_REQ_QUEUED             1
#define REQSTATE_REQ_WAITING_FOR_REPLY  2
#define REQSTATE_REQ_GOT                3
#define REQSTATE_INVOKING               4
#define REQSTATE_RESP_QUEUED            5
#define REQSTATE_RESP_GOT               6
#define REQSTATE_DONE                   7
82
83 struct rpc
84 {
85     int                         state;
86     HANDLE                      hPipe;  /* temp copy of handle */
87     struct request_header       reqh;
88     struct response_header      resph;
89     LPBYTE                      Buffer;
90 };
91
/* Table of every struct rpc ever created; entries are recycled once they
 * reach REQSTATE_DONE.
 * fixme: this should have a lock */
static struct rpc **reqs;
static int nrofreqs;
95
96 /* This pipe is _thread_ based, each thread which talks to a remote
97  * apartment (oxid) has its own pipe. The same structure is used both
98  * for outgoing and incoming RPCs.
99  */
100 struct pipe
101 {
102     wine_marshal_id     mid;    /* target mid */
103     DWORD               tid;    /* thread which owns this pipe */
104     HANDLE              hPipe;
105
106     int                 pending;
107     HANDLE              hThread;
108     CRITICAL_SECTION    crit;
109
110     APARTMENT          *apt;    /* apartment of the marshalling thread for the stub dispatch case */
111 };
112
113 typedef struct _PipeBuf {
114     IRpcChannelBufferVtbl *lpVtbl;
115     DWORD                  ref;
116
117     wine_marshal_id        mid;
118     HANDLE                 pipe;
119 } PipeBuf;
120
121
122
123 /* some helper functions */
124
125 static HRESULT WINAPI read_pipe(HANDLE hf, LPVOID ptr, DWORD size)
126 {
127     DWORD res;
128     
129     if (!ReadFile(hf,ptr,size,&res,NULL))
130     {
131         ERR("Failed to read from %p, le is %ld\n",hf,GetLastError());
132         return E_FAIL;
133     }
134     
135     if (res != size)
136     {
137         if (!res)
138         {
139            WARN("%p disconnected\n", hf);
140            return RPC_E_DISCONNECTED;
141         }
142         ERR("Read only %ld of %ld bytes from %p.\n",res,size,hf);
143         return E_FAIL;
144     }
145     return S_OK;
146 }
147
148 static HRESULT WINAPI
149 write_pipe(HANDLE hf, LPVOID ptr, DWORD size) {
150     DWORD res;
151     if (!WriteFile(hf,ptr,size,&res,NULL)) {
152         FIXME("Failed to write to %p, le is %ld\n",hf,GetLastError());
153         return E_FAIL;
154     }
155     if (res!=size) {
156         FIXME("Wrote only %ld of %ld bytes to %p.\n",res,size,hf);
157         return E_FAIL;
158     }
159     return S_OK;
160 }
161
162 static HANDLE dupe_handle(HANDLE h)
163 {
164     HANDLE h2;
165     
166     if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(),
167                          &h2, 0, FALSE, DUPLICATE_SAME_ACCESS))
168     {
169         ERR("could not duplicate handle: %ld\n", GetLastError());
170         return INVALID_HANDLE_VALUE;
171     }
172     
173     return h2;
174 }
175
176
177
178
179 static DWORD WINAPI client_dispatch_thread(LPVOID);
180
181 /* FIXME: this all needs to be made thread safe */
182 static HRESULT RPC_GetRequest(struct rpc **req)
183 {
184     static int reqid = 0;
185     int i;
186
187     /* try to reuse */
188     for (i = 0; i < nrofreqs; i++)
189     {
190         if (reqs[i]->state == REQSTATE_DONE)
191         {
192             TRACE("reusing reqs[%d]\n", i);
193             
194             reqs[i]->reqh.reqid  = reqid++;
195             reqs[i]->resph.reqid = reqs[i]->reqh.reqid;
196             reqs[i]->hPipe = INVALID_HANDLE_VALUE;
197             reqs[i]->state = REQSTATE_START;
198             *req = reqs[i];
199             return S_OK;
200         }
201     }
202     
203     TRACE("creating new struct rpc (request)\n");
204     
205     if (reqs)
206         reqs = (struct rpc**)HeapReAlloc(
207                         GetProcessHeap(),
208                         HEAP_ZERO_MEMORY,
209                         reqs,
210                         sizeof(struct rpc*)*(nrofreqs+1)
211                 );
212     else
213         reqs = (struct rpc**)HeapAlloc(
214                         GetProcessHeap(),
215                         HEAP_ZERO_MEMORY,
216                         sizeof(struct rpc*)
217                 );
218     
219     if (!reqs) return E_OUTOFMEMORY;
220     
221     reqs[nrofreqs] = (struct rpc*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(struct rpc));
222     reqs[nrofreqs]->reqh.reqid = reqid++;
223     reqs[nrofreqs]->resph.reqid = reqs[nrofreqs]->reqh.reqid;
224     reqs[nrofreqs]->hPipe = INVALID_HANDLE_VALUE;
225     reqs[nrofreqs]->state = REQSTATE_START;
226     *req = reqs[nrofreqs];
227     
228     nrofreqs++;
229     
230     return S_OK;
231 }
232
233 static HRESULT WINAPI
234 PipeBuf_QueryInterface(
235     LPRPCCHANNELBUFFER iface,REFIID riid,LPVOID *ppv
236 ) {
237     *ppv = NULL;
238     if (IsEqualIID(riid,&IID_IRpcChannelBuffer) || IsEqualIID(riid,&IID_IUnknown)) {
239         *ppv = (LPVOID)iface;
240         IUnknown_AddRef(iface);
241         return S_OK;
242     }
243     return E_NOINTERFACE;
244 }
245
246 static ULONG WINAPI
247 PipeBuf_AddRef(LPRPCCHANNELBUFFER iface) {
248     PipeBuf *This = (PipeBuf *)iface;
249     return InterlockedIncrement(&This->ref);
250 }
251
252 static ULONG WINAPI
253 PipeBuf_Release(LPRPCCHANNELBUFFER iface) {
254     PipeBuf *This = (PipeBuf *)iface;
255     ULONG ref;
256
257     ref = InterlockedDecrement(&This->ref);
258     if (ref)
259         return ref;
260
261     CloseHandle(This->pipe);
262     HeapFree(GetProcessHeap(),0,This);
263     return 0;
264 }
265
266 static HRESULT WINAPI
267 PipeBuf_GetBuffer(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg,REFIID riid)
268 {
269     TRACE("(%p,%s)\n",msg,debugstr_guid(riid));
270     /* probably reuses IID in real. */
271     if (msg->cbBuffer && (msg->Buffer == NULL))
272         msg->Buffer = HeapAlloc(GetProcessHeap(),0,msg->cbBuffer);
273     return S_OK;
274 }
275
276 static HRESULT
277 COM_InvokeAndRpcSend(struct rpc *req) {
278     IRpcStubBuffer     *stub;
279     RPCOLEMESSAGE       msg;
280     HRESULT             hres;
281     DWORD               reqtype;
282
283     if (!(stub = ipid_to_stubbuffer(&(req->reqh.ipid))))
284         /* ipid_to_stubbuffer will already have logged the error */
285         return RPC_E_DISCONNECTED;
286
287     IUnknown_AddRef(stub);
288     msg.Buffer          = req->Buffer;
289     msg.iMethod         = req->reqh.iMethod;
290     msg.cbBuffer        = req->reqh.cbBuffer;
291     msg.dataRepresentation = NDR_LOCAL_DATA_REPRESENTATION;
292     req->state          = REQSTATE_INVOKING;
293     req->resph.retval   = IRpcStubBuffer_Invoke(stub,&msg,NULL);
294     IUnknown_Release(stub);
295     req->Buffer         = msg.Buffer;
296     req->resph.cbBuffer = msg.cbBuffer;
297     reqtype             = REQTYPE_RESPONSE;
298     hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
299     if (hres) return hres;
300     hres = write_pipe(req->hPipe,&(req->resph),sizeof(req->resph));
301     if (hres) return hres;
302     hres = write_pipe(req->hPipe,req->Buffer,req->resph.cbBuffer);
303     if (hres) return hres;
304     req->state = REQSTATE_DONE;
305     return S_OK;
306 }
307
308 static HRESULT process_incoming_rpc(HANDLE pipe);
309
310 static HRESULT RPC_QueueRequestAndWait(struct rpc *req, HANDLE pipe)
311 {
312     int                 i;
313     struct rpc         *xreq;
314     HRESULT             hres;
315     DWORD               reqtype;
316
317     req->hPipe = pipe;
318     req->state = REQSTATE_REQ_WAITING_FOR_REPLY;
319     reqtype = REQTYPE_REQUEST;
320     hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
321     if (hres) return hres;
322     hres = write_pipe(req->hPipe,&(req->reqh),sizeof(req->reqh));
323     if (hres) return hres;
324     hres = write_pipe(req->hPipe,req->Buffer,req->reqh.cbBuffer);
325     if (hres) return hres;
326
327     /* This loop is about allowing re-entrancy. While waiting for the
328      * response to one RPC we may receive a request starting another. */
329     while (!hres) {
330         hres = process_incoming_rpc(pipe);
331         if (hres) break;
332
333         for (i=0;i<nrofreqs;i++) {
334             xreq = reqs[i];
335             if ((xreq->state==REQSTATE_REQ_GOT) && (xreq->hPipe==req->hPipe)) {
336                 hres = COM_InvokeAndRpcSend(xreq);
337                 if (hres) break;
338             }
339         }
340         if (req->state == REQSTATE_RESP_GOT)
341             return S_OK;
342     }
343     if (FAILED(hres))
344         WARN("-- 0x%08lx\n", hres);
345     return hres;
346 }
347
348 static HRESULT WINAPI
349 PipeBuf_SendReceive(LPRPCCHANNELBUFFER iface, RPCOLEMESSAGE *msg, ULONG *status)
350 {
351     PipeBuf     *This = (PipeBuf *)iface;
352     struct rpc  *req;
353     HRESULT      hres;
354
355     if (This->mid.oxid == COM_CurrentApt()->oxid) {
356         ERR("Need to call directly!\n");
357         return E_FAIL;
358     }
359
360     hres = RPC_GetRequest(&req);
361     if (hres) return hres;
362     req->reqh.iMethod   = msg->iMethod;
363     req->reqh.cbBuffer  = msg->cbBuffer;
364     req->reqh.ipid = This->mid.ipid;
365     req->Buffer = msg->Buffer;
366     TRACE(" ->    rpc   ->\n");
367     hres = RPC_QueueRequestAndWait(req, This->pipe);
368     TRACE(" <- response <-\n");
369     if (hres)
370     {
371         req->state = REQSTATE_DONE;
372         return hres;
373     }
374     
375     msg->cbBuffer = req->resph.cbBuffer;
376     msg->Buffer   = req->Buffer;
377     *status       = req->resph.retval;
378     req->state    = REQSTATE_DONE;
379     
380     return S_OK;
381 }
382
383
384 static HRESULT WINAPI
385 PipeBuf_FreeBuffer(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg)
386 {
387     TRACE("(%p)\n",msg);
388     HeapFree(GetProcessHeap(), 0, msg->Buffer);
389     return S_OK;
390 }
391
392 static HRESULT WINAPI
393 PipeBuf_GetDestCtx(LPRPCCHANNELBUFFER iface,DWORD* pdwDestContext,void** ppvDestContext)
394 {
395     FIXME("(%p,%p), stub!\n",pdwDestContext,ppvDestContext);
396     return E_FAIL;
397 }
398
399 static HRESULT WINAPI
400 PipeBuf_IsConnected(LPRPCCHANNELBUFFER iface)
401 {
402     FIXME("(), stub!\n");
403     return S_OK;
404 }
405
406 static IRpcChannelBufferVtbl pipebufvt = {
407     PipeBuf_QueryInterface,
408     PipeBuf_AddRef,
409     PipeBuf_Release,
410     PipeBuf_GetBuffer,
411     PipeBuf_SendReceive,
412     PipeBuf_FreeBuffer,
413     PipeBuf_GetDestCtx,
414     PipeBuf_IsConnected
415 };
416
417 /* returns a pipebuf for proxies */
418 HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf)
419 {
420     wine_marshal_id  ourid;
421     HANDLE           handle;
422     PipeBuf         *pbuf;
423     char             pipefn[200];
424
425     /* connect to the apartment listener thread */
426     sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
427
428     TRACE("proxy pipe: connecting to apartment listener thread: %s\n", pipefn);
429
430     while (TRUE)
431     {
432         BOOL ret = WaitNamedPipeA(pipefn, NMPWAIT_USE_DEFAULT_WAIT);
433         if (!ret)
434         {
435             ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError());
436             return RPC_E_SERVER_DIED;
437         }
438
439         handle = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE,
440                              0, NULL, OPEN_EXISTING, 0, 0);
441
442         if (handle == INVALID_HANDLE_VALUE)
443         {
444             if (GetLastError() == ERROR_PIPE_BUSY) continue;
445             
446             ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError());
447             return RPC_E_SERVER_DIED;
448         }
449
450         break;
451     }
452     
453     memset(&ourid,0,sizeof(ourid));
454     ourid.oxid = COM_CurrentApt()->oxid;
455     
456     TRACE("constructing new pipebuf for proxy\n");
457     
458     pbuf = (PipeBuf*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PipeBuf));
459     pbuf->lpVtbl = &pipebufvt;
460     pbuf->ref    = 1;
461     memcpy(&(pbuf->mid),mid,sizeof(*mid));
462     pbuf->pipe  = dupe_handle(handle);
463     
464     *pipebuf = (IRpcChannelBuffer*)pbuf;
465     
466     return S_OK;
467 }
468
469 static HRESULT
470 create_server(REFCLSID rclsid)
471 {
472     static const WCHAR  embedding[] = { ' ', '-','E','m','b','e','d','d','i','n','g',0 };
473     HKEY                key;
474     char                buf[200];
475     HRESULT             hres = E_UNEXPECTED;
476     char                xclsid[80];
477     WCHAR               exe[MAX_PATH+1];
478     DWORD               exelen = sizeof(exe);
479     WCHAR               command[MAX_PATH+sizeof(embedding)/sizeof(WCHAR)];
480     STARTUPINFOW        sinfo;
481     PROCESS_INFORMATION pinfo;
482
483     WINE_StringFromCLSID((LPCLSID)rclsid,xclsid);
484
485     sprintf(buf,"CLSID\\%s\\LocalServer32",xclsid);
486     hres = RegOpenKeyExA(HKEY_CLASSES_ROOT, buf, 0, KEY_READ, &key);
487
488     if (hres != ERROR_SUCCESS) {
489         WARN("CLSID %s not registered as LocalServer32\n", xclsid);
490         return REGDB_E_READREGDB; /* Probably */
491     }
492
493     memset(exe,0,sizeof(exe));
494     hres= RegQueryValueExW(key, NULL, NULL, NULL, (LPBYTE)exe, &exelen);
495     RegCloseKey(key);
496     if (hres) {
497         WARN("No default value for LocalServer32 key\n");
498         return REGDB_E_CLASSNOTREG; /* FIXME: check retval */
499     }
500
501     memset(&sinfo,0,sizeof(sinfo));
502     sinfo.cb = sizeof(sinfo);
503
504     /* EXE servers are started with the -Embedding switch. MSDN also claims /Embedding is used,
505      * 9x does -Embedding, perhaps an 9x/NT difference?
506      */
507
508     strcpyW(command, exe);
509     strcatW(command, embedding);
510
511     TRACE("activating local server '%s' for %s\n", debugstr_w(command), xclsid);
512
513     if (!CreateProcessW(exe, command, NULL, NULL, FALSE, 0, NULL, NULL, &sinfo, &pinfo)) {
514         WARN("failed to run local server %s\n", debugstr_w(exe));
515         return E_FAIL;
516     }
517
518     return S_OK;
519 }
520
521 /*
522  * start_local_service()  - start a service given its name and parameters
523  */
524 static DWORD
525 start_local_service(LPCWSTR name, DWORD num, LPWSTR *params)
526 {
527     SC_HANDLE handle, hsvc;
528     DWORD     r = ERROR_FUNCTION_FAILED;
529
530     TRACE("Starting service %s %ld params\n", debugstr_w(name), num);
531
532     handle = OpenSCManagerW(NULL, NULL, SC_MANAGER_ALL_ACCESS);
533     if (!handle)
534         return r;
535     hsvc = OpenServiceW(handle, name, SC_MANAGER_ALL_ACCESS);
536     if (hsvc)
537     {
538         if(StartServiceW(hsvc, num, (LPCWSTR*)params))
539             r = ERROR_SUCCESS;
540         else
541             r = GetLastError();
542         if (r == ERROR_SERVICE_ALREADY_RUNNING)
543             r = ERROR_SUCCESS;
544         CloseServiceHandle(hsvc);
545     }
546     CloseServiceHandle(handle);
547
548     TRACE("StartService returned error %ld (%s)\n", r, r?"ok":"failed");
549
550     return r;
551 }
552
553 /*
554  * create_local_service()  - start a COM server in a service
555  *
556  *   To start a Local Service, we read the AppID value under
557  * the class's CLSID key, then open the HKCR\\AppId key specified
558  * there and check for a LocalService value.
559  *
560  * Note:  Local Services are not supported under Windows 9x
561  */
562 static HRESULT
563 create_local_service(REFCLSID rclsid)
564 {
565     HRESULT hres = REGDB_E_READREGDB;
566     WCHAR buf[40], keyname[50];
567     static const WCHAR szClsId[] = { 'C','L','S','I','D','\\',0 };
568     static const WCHAR szAppId[] = { 'A','p','p','I','d',0 };
569     static const WCHAR szAppIdKey[] = { 'A','p','p','I','d','\\',0 };
570     static const WCHAR szLocalService[] = { 'L','o','c','a','l','S','e','r','v','i','c','e',0 };
571     static const WCHAR szServiceParams[] = {'S','e','r','v','i','c','e','P','a','r','a','m','s',0};
572     HKEY hkey;
573     LONG r;
574     DWORD type, sz;
575
576     TRACE("Attempting to start Local service for %s\n", debugstr_guid(rclsid));
577
578     /* read the AppID value under the class's key */
579     strcpyW(keyname,szClsId);
580     StringFromGUID2(rclsid,&keyname[6],39);
581     r = RegOpenKeyExW(HKEY_CLASSES_ROOT, keyname, 0, KEY_READ, &hkey);
582     if (r!=ERROR_SUCCESS)
583         return hres;
584     sz = sizeof buf;
585     r = RegQueryValueExW(hkey, szAppId, NULL, &type, (LPBYTE)buf, &sz);
586     RegCloseKey(hkey);
587     if (r!=ERROR_SUCCESS || type!=REG_SZ)
588         return hres;
589
590     /* read the LocalService and ServiceParameters values from the AppID key */
591     strcpyW(keyname, szAppIdKey);
592     strcatW(keyname, buf);
593     r = RegOpenKeyExW(HKEY_CLASSES_ROOT, keyname, 0, KEY_READ, &hkey);
594     if (r!=ERROR_SUCCESS)
595         return hres;
596     sz = sizeof buf;
597     r = RegQueryValueExW(hkey, szLocalService, NULL, &type, (LPBYTE)buf, &sz);
598     if (r==ERROR_SUCCESS && type==REG_SZ)
599     {
600         DWORD num_args = 0;
601         LPWSTR args[1] = { NULL };
602
603         /*
604          * FIXME: I'm not really sure how to deal with the service parameters.
605          *        I suspect that the string returned from RegQueryValueExW
606          *        should be split into a number of arguments by spaces.
607          *        It would make more sense if ServiceParams contained a
608          *        REG_MULTI_SZ here, but it's a REG_SZ for the services
609          *        that I'm interested in for the moment.
610          */
611         r = RegQueryValueExW(hkey, szServiceParams, NULL, &type, NULL, &sz);
612         if (r == ERROR_SUCCESS && type == REG_SZ && sz)
613         {
614             args[0] = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz);
615             num_args++;
616             RegQueryValueExW(hkey, szServiceParams, NULL, &type, (LPBYTE)args[0], &sz);
617         }
618         r = start_local_service(buf, num_args, args);
619         if (r==ERROR_SUCCESS)
620             hres = S_OK;
621         HeapFree(GetProcessHeap(),0,args[0]);
622     }
623     RegCloseKey(hkey);
624         
625     return hres;
626 }
627
628 /* http://msdn.microsoft.com/library/en-us/dnmsj99/html/com0199.asp, Figure 4 */
629 HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv)
630 {
631     HRESULT        hres;
632     HANDLE         hPipe;
633     char           pipefn[200];
634     DWORD          res, bufferlen;
635     char           marshalbuffer[200];
636     IStream       *pStm;
637     LARGE_INTEGER  seekto;
638     ULARGE_INTEGER newpos;
639     int            tries = 0;
640     
641     static const int MAXTRIES = 10000;
642
643     TRACE("rclsid=%s, iid=%s\n", debugstr_guid(rclsid), debugstr_guid(iid));
644
645     strcpy(pipefn,PIPEPREF);
646     WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF));
647
648     while (tries++ < MAXTRIES) {
649         TRACE("waiting for %s\n", pipefn);
650       
651         WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER );
652         hPipe = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
653         if (hPipe == INVALID_HANDLE_VALUE) {
654             if (tries == 1) {
655                 if ( (hres = create_server(rclsid)) &&
656                      (hres = create_local_service(rclsid)) )
657                     return hres;
658                 Sleep(1000);
659             } else {
660                 WARN("Connecting to %s, no response yet, retrying: le is %lx\n",pipefn,GetLastError());
661                 Sleep(1000);
662             }
663             continue;
664         }
665         bufferlen = 0;
666         if (!ReadFile(hPipe,marshalbuffer,sizeof(marshalbuffer),&bufferlen,NULL)) {
667             FIXME("Failed to read marshal id from classfactory of %s.\n",debugstr_guid(rclsid));
668             Sleep(1000);
669             continue;
670         }
671         TRACE("read marshal id from pipe\n");
672         CloseHandle(hPipe);
673         break;
674     }
675     
676     if (tries >= MAXTRIES)
677         return E_NOINTERFACE;
678     
679     hres = CreateStreamOnHGlobal(0,TRUE,&pStm);
680     if (hres) return hres;
681     hres = IStream_Write(pStm,marshalbuffer,bufferlen,&res);
682     if (hres) goto out;
683     seekto.u.LowPart = 0;seekto.u.HighPart = 0;
684     hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
685     
686     TRACE("unmarshalling classfactory\n");
687     hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv);
688 out:
689     IStream_Release(pStm);
690     return hres;
691 }
692
693
694 /* this reads an RPC from the given pipe and places it in the global reqs array  */
695 static HRESULT process_incoming_rpc(HANDLE pipe)
696 {
697     DWORD       reqtype;
698     HRESULT     hres = S_OK;
699
700     hres = read_pipe(pipe,&reqtype,sizeof(reqtype));
701     if (hres) return hres;
702
703     /* only received by servers */
704     if (reqtype == REQTYPE_REQUEST)
705     {
706         struct rpc *xreq;
707
708         RPC_GetRequest(&xreq);
709         
710         xreq->hPipe = pipe;
711         hres = read_pipe(pipe,&(xreq->reqh),sizeof(xreq->reqh));
712         if (hres)
713         {
714             xreq->state = REQSTATE_DONE;
715             return hres;
716         }
717         
718         xreq->resph.reqid = xreq->reqh.reqid;
719         xreq->Buffer = HeapAlloc(GetProcessHeap(),0, xreq->reqh.cbBuffer);
720         hres = read_pipe(pipe,xreq->Buffer,xreq->reqh.cbBuffer);
721         if (hres) goto end;
722
723         TRACE("received RPC for IPID %s\n", debugstr_guid(&xreq->reqh.ipid));
724         
725         xreq->state = REQSTATE_REQ_GOT;
726         goto end;
727     }
728     else if (reqtype == REQTYPE_RESPONSE)
729     {
730         struct response_header  resph;
731         int i;
732
733         hres = read_pipe(pipe,&resph,sizeof(resph));
734         if (hres) goto end;
735
736         TRACE("read RPC response\n");
737         
738         for (i = nrofreqs; i--;)
739         {
740             struct rpc *xreq = reqs[i];
741             
742             if (xreq->state != REQSTATE_REQ_WAITING_FOR_REPLY)
743                 continue;
744             
745             if (xreq->reqh.reqid == resph.reqid)
746             {
747                 memcpy(&(xreq->resph),&resph,sizeof(resph));
748
749                 if (xreq->Buffer)
750                     xreq->Buffer = HeapReAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->Buffer,xreq->resph.cbBuffer);
751                 else
752                     xreq->Buffer = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->resph.cbBuffer);
753
754                 hres = read_pipe(pipe,xreq->Buffer,xreq->resph.cbBuffer);
755                 if (hres) goto end;
756
757                 TRACE("received response for reqid 0x%lx\n", xreq->reqh.reqid);
758                 
759                 xreq->state = REQSTATE_RESP_GOT;
760                 goto end;
761             }
762         }
763         
764         ERR("protocol error: did not find request for id %lx\n",resph.reqid);
765         hres = E_FAIL;
766         goto end;
767     }
768     
769     ERR("protocol error: unknown reqtype %ld\n",reqtype);
770     hres = E_FAIL;
771 end:
772     return hres;
773 }
774
775 struct stub_dispatch_params
776 {
777     struct apartment *apt;
778     HANDLE            pipe;
779 };
780
781 /* This thread listens on the given pipe for requests to any stub manager */
782 static DWORD WINAPI client_dispatch_thread(LPVOID param)
783 {
784     HANDLE              pipe = ((struct stub_dispatch_params *)param)->pipe;
785     struct apartment   *apt  = ((struct stub_dispatch_params *)param)->apt;
786     HRESULT             hres = S_OK;
787     HANDLE              shutdown_event = dupe_handle(apt->shutdown_event);
788     
789     HeapFree(GetProcessHeap(), 0, param);
790     
791     /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
792     COM_CurrentInfo()->apt = apt;
793
794     while (TRUE)
795     {
796         int i;
797
798         TRACE("waiting for RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
799         
800         /* read a new request into the global array, block if no requests have been sent */
801         hres = process_incoming_rpc(pipe);
802         if (hres) break;
803
804         /* do you expect me to talk? */
805         if (WaitForSingleObject(shutdown_event, 0) == WAIT_OBJECT_0)
806         {
807             /* no mr bond, i expect you to die! bwahaha */
808             CloseHandle(shutdown_event);
809             break;
810         }
811         
812         TRACE("received RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));        
813
814         /* now scan the array looking for the RPC just loaded */
815         for (i=nrofreqs;i--;)
816         {
817             struct rpc *req = reqs[i];
818             
819             if ((req->state == REQSTATE_REQ_GOT) && (req->hPipe == pipe))
820             {
821                 hres = COM_InvokeAndRpcSend(req);
822                 if (!hres) break;
823             }
824         }
825     }
826
827     TRACE("exiting with hres %lx\n",hres);
828
829     /* leave marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
830     COM_CurrentInfo()->apt = NULL;
831
832     DisconnectNamedPipe(pipe);
833     CloseHandle(pipe);
834     return 0;
835 }
836
837 struct apartment_listener_params
838 {
839     APARTMENT *apt;
840     HANDLE event;
841 };
842
843 /* This thread listens on a named pipe for each apartment that exports
844  * objects. It deals with incoming connection requests. Each time a
845  * client connects a separate thread is spawned for that particular
846  * connection.
847  *
848  * This architecture is different in native DCOM.
849  */
850 static DWORD WINAPI apartment_listener_thread(LPVOID p)
851 {
852     char pipefn[200];
853     HANDLE listenPipe, thread_handle;
854     OVERLAPPED overlapped;
855     HANDLE wait[2];
856     
857     struct apartment_listener_params * params = (struct apartment_listener_params *)p;
858     struct apartment *apt = params->apt;
859     HANDLE event = params->event;
860     HANDLE apt_shutdown_event = dupe_handle(apt->shutdown_event);
861     OXID   this_oxid = apt->oxid; /* copy here so we can print it when we shut down */
862
863     HeapFree(GetProcessHeap(), 0, params);
864
865     overlapped.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
866     
867     /* we must join the marshalling threads apartment. we already have a ref here */
868     COM_CurrentInfo()->apt = apt;
869
870     sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
871     TRACE("Apartment listener thread starting on (%s)\n",pipefn);
872
873     while (TRUE)
874     {
875         struct stub_dispatch_params *params;
876         DWORD res;
877
878         listenPipe = CreateNamedPipeA(
879             pipefn,
880             PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
881             PIPE_TYPE_BYTE|PIPE_WAIT,
882             PIPE_UNLIMITED_INSTANCES,
883             4096,
884             4096,
885             500 /* 0.5 seconds */,
886             NULL
887         );
888
889         /* tell function that started this thread that we have attempted to created the
890          * named pipe. */
891         if (event) {
892             SetEvent(event);
893             event = NULL;
894         }
895
896         if (listenPipe == INVALID_HANDLE_VALUE) {
897             FIXME("pipe creation failed for %s, error %ld\n",pipefn,GetLastError());
898             break; /* permanent failure, so quit stubmgr thread */
899         }
900
901         TRACE("waiting for a client ...\n");
902         
903         /* an already connected pipe is not an error */
904         if (!ConnectNamedPipe(listenPipe, &overlapped))
905         {
906             DWORD le = GetLastError();
907             
908             if ((le != ERROR_IO_PENDING) && (le != ERROR_PIPE_CONNECTED))
909             {
910                 ERR("Failure during ConnectNamedPipe %ld!\n",GetLastError());
911                 CloseHandle(listenPipe);
912                 continue;
913             }
914         }
915
916         /* wait for action */
917         wait[0] = apt_shutdown_event;
918         wait[1] = overlapped.hEvent;
919         res = WaitForMultipleObjectsEx(2, wait, FALSE, INFINITE, FALSE);
920         if (res == WAIT_OBJECT_0) break;
921
922         ResetEvent(overlapped.hEvent);
923         
924         /* start the stub dispatch thread for this connection */
925         TRACE("starting stub dispatch thread for OXID %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
926         params = HeapAlloc(GetProcessHeap(), 0, sizeof(struct stub_dispatch_params));
927         if (!params)
928         {
929             ERR("out of memory, dropping this client\n");
930             CloseHandle(listenPipe);
931             continue;
932         }
933         params->apt = apt;
934         params->pipe = listenPipe;
935         thread_handle = CreateThread(NULL, 0, &client_dispatch_thread, params, 0, NULL);
936         CloseHandle(thread_handle);
937     }
938
939     TRACE("shutting down: %s\n", wine_dbgstr_longlong(this_oxid));
940
941     /* we must leave the marshalling threads apartment. we don't have a ref here */
942     COM_CurrentInfo()->apt = NULL;
943
944     DisconnectNamedPipe(listenPipe);
945     CloseHandle(listenPipe);
946     CloseHandle(overlapped.hEvent);
947     CloseHandle(apt_shutdown_event);
948     return 0;
949 }
950
951 void start_apartment_listener_thread()
952 {
953     APARTMENT *apt = COM_CurrentApt();
954     
955     assert( apt );
956     
957     TRACE("apt->listenertid=%ld\n", apt->listenertid);
958
959     /* apt->listenertid is a hack which needs to die at some point, as
960      * it leaks information into the apartment structure. in fact,
961      * this thread isn't quite correct anyway as native RPC doesn't
962      * use a thread per apartment at all, instead the dispatch thread
963      * either enters the apartment to perform the RPC (for MTAs, RTAs)
964      * or does a context switch into it for STAs.
965      */
966     
967     if (!apt->listenertid)
968     {
969         HANDLE thread;
970         HANDLE event = CreateEventW(NULL, TRUE, FALSE, NULL);
971         struct apartment_listener_params * params = HeapAlloc(GetProcessHeap(), 0, sizeof(*params));
972
973         params->apt = apt;
974         params->event = event;
975         thread = CreateThread(NULL, 0, apartment_listener_thread, params, 0, &apt->listenertid);
976         CloseHandle(thread);
977         /* wait for pipe to be created before returning, otherwise we
978          * might try to use it and fail */
979         WaitForSingleObject(event, INFINITE);
980         CloseHandle(event);
981     }
982 }
983
984 struct local_server_params
985 {
986     CLSID clsid;
987     IStream *stream;
988 };
989
990 static DWORD WINAPI local_server_thread(LPVOID param)
991 {
992     struct local_server_params * lsp = (struct local_server_params *)param;
993     HANDLE              hPipe;
994     char                pipefn[200];
995     HRESULT             hres;
996     IStream             *pStm = lsp->stream;
997     STATSTG             ststg;
998     unsigned char       *buffer;
999     int                 buflen;
1000     LARGE_INTEGER       seekto;
1001     ULARGE_INTEGER      newpos;
1002     ULONG               res;
1003
1004     TRACE("Starting threader for %s.\n",debugstr_guid(&lsp->clsid));
1005
1006     strcpy(pipefn,PIPEPREF);
1007     WINE_StringFromCLSID(&lsp->clsid,pipefn+strlen(PIPEPREF));
1008
1009     HeapFree(GetProcessHeap(), 0, lsp);
1010
1011     hPipe = CreateNamedPipeA( pipefn, PIPE_ACCESS_DUPLEX,
1012                               PIPE_TYPE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
1013                               4096, 4096, 500 /* 0.5 second timeout */, NULL );
1014     
1015     if (hPipe == INVALID_HANDLE_VALUE)
1016     {
1017         FIXME("pipe creation failed for %s, le is %ld\n",pipefn,GetLastError());
1018         return 1;
1019     }
1020     
1021     while (1) {
1022         if (!ConnectNamedPipe(hPipe,NULL)) {
1023             ERR("Failure during ConnectNamedPipe %ld, ABORT!\n",GetLastError());
1024             break;
1025         }
1026
1027         TRACE("marshalling IClassFactory to client\n");
1028         
1029         hres = IStream_Stat(pStm,&ststg,0);
1030         if (hres) return hres;
1031
1032         buflen = ststg.cbSize.u.LowPart;
1033         buffer = HeapAlloc(GetProcessHeap(),0,buflen);
1034         seekto.u.LowPart = 0;
1035         seekto.u.HighPart = 0;
1036         hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
1037         if (hres) {
1038             FIXME("IStream_Seek failed, %lx\n",hres);
1039             return hres;
1040         }
1041         
1042         hres = IStream_Read(pStm,buffer,buflen,&res);
1043         if (hres) {
1044             FIXME("Stream Read failed, %lx\n",hres);
1045             return hres;
1046         }
1047         
1048         WriteFile(hPipe,buffer,buflen,&res,NULL);
1049         FlushFileBuffers(hPipe);
1050         DisconnectNamedPipe(hPipe);
1051
1052         TRACE("done marshalling IClassFactory\n");
1053     }
1054     CloseHandle(hPipe);
1055     IStream_Release(pStm);
1056     return 0;
1057 }
1058
1059 void RPC_StartLocalServer(REFCLSID clsid, IStream *stream)
1060 {
1061     DWORD tid;
1062     HANDLE thread;
1063     struct local_server_params *lsp = HeapAlloc(GetProcessHeap(), 0, sizeof(*lsp));
1064
1065     lsp->clsid = *clsid;
1066     lsp->stream = stream;
1067
1068     thread = CreateThread(NULL, 0, local_server_thread, lsp, 0, &tid);
1069     CloseHandle(thread);
1070     /* FIXME: failure handling */
1071 }