Propagate apartments through the intermediate threads, make listener
[wine] / dlls / ole32 / rpc.c
1 /*
2  *      (Local) RPC Stuff
3  *
4  *  Copyright 2002  Marcus Meissner
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <stdarg.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <assert.h>
28
29 #define COBJMACROS
30 #define NONAMELESSUNION
31 #define NONAMELESSSTRUCT
32
33 #include "windef.h"
34 #include "winbase.h"
35 #include "winuser.h"
36 #include "objbase.h"
37 #include "ole2.h"
38 #include "rpc.h"
39 #include "winerror.h"
40 #include "winreg.h"
41 #include "wtypes.h"
42 #include "wine/unicode.h"
43
44 #include "compobj_private.h"
45
46 #include "wine/debug.h"
47
48 WINE_DEFAULT_DEBUG_CHANNEL(ole);
49
50 #define REQTYPE_REQUEST         0
51 typedef struct _wine_rpc_request_header {
52     DWORD               reqid;
53     wine_marshal_id     mid;
54     DWORD               iMethod;
55     DWORD               cbBuffer;
56 } wine_rpc_request_header;
57
58 #define REQTYPE_RESPONSE        1
59 typedef struct _wine_rpc_response_header {
60     DWORD               reqid;
61     DWORD               cbBuffer;
62     DWORD               retval;
63 } wine_rpc_response_header;
64
65 /* used when shutting down a pipe, e.g. at the end of a process */
66 #define REQTYPE_DISCONNECT      2
67 typedef struct _wine_rpc_disconnect_header {
68   DWORD reqid;
69   wine_marshal_id mid; /* mid of stub to delete */
70 } wine_rpc_disconnect_header;
71
72
/* Life-cycle states of a wine_rpc_request slot.
 *
 * Every state needs its own value: RPC_GetRequest() recycles any slot whose
 * state is REQSTATE_DONE, so REQSTATE_RESP_GOT (response received but not yet
 * picked up by the caller) must not compare equal to it.
 */
#define REQSTATE_START                  0
#define REQSTATE_REQ_QUEUED             1
#define REQSTATE_REQ_WAITING_FOR_REPLY  2
#define REQSTATE_REQ_GOT                3
#define REQSTATE_INVOKING               4
#define REQSTATE_RESP_QUEUED            5
#define REQSTATE_RESP_GOT               6
#define REQSTATE_DONE                   7
81
82 typedef struct _wine_rpc_request {
83     int                         state;
84     HANDLE                      hPipe;  /* temp copy of handle */
85     wine_rpc_request_header     reqh;
86     wine_rpc_response_header    resph;
87     LPBYTE                      Buffer;
88 } wine_rpc_request;
89
90 static wine_rpc_request **reqs = NULL;
91 static int nrofreqs = 0;
92
93 /* This pipe is _thread_ based, each thread which talks to a remote
94  * apartment (mid) has its own pipe. The same structure is used both
95  * for outgoing and incoming RPCs.
96  */
97 typedef struct _wine_pipe {
98     wine_marshal_id     mid;    /* target mid */
99     DWORD               tid;    /* thread which owns this pipe */
100     HANDLE              hPipe;
101
102     int                 pending;
103     HANDLE              hThread;
104     CRITICAL_SECTION    crit;
105
106     APARTMENT          *apt;    /* apartment of the marshalling thread for the stub dispatch case */
107 } wine_pipe;
108
109 static wine_pipe *pipes = NULL;
110 static int nrofpipes = 0;
111
112 typedef struct _PipeBuf {
113     IRpcChannelBufferVtbl       *lpVtbl;
114     DWORD                               ref;
115
116     wine_marshal_id                     mid;
117     wine_pipe                           *pipe;
118 } PipeBuf;
119
120 static HRESULT WINAPI
121 read_pipe(HANDLE hf, LPVOID ptr, DWORD size) {
122     DWORD res;
123     if (!ReadFile(hf,ptr,size,&res,NULL)) {
124         FIXME("Failed to read from %p, le is %lx\n",hf,GetLastError());
125         return E_FAIL;
126     }
127     if (res!=size) {
128         FIXME("Read only %ld of %ld bytes from %p.\n",res,size,hf);
129         return E_FAIL;
130     }
131     return S_OK;
132 }
133
134 static void
135 drs(LPCSTR where) {
136 #if 0
137     static int nrofreaders = 0;
138
139     int i, states[10];
140
141     memset(states,0,sizeof(states));
142     for (i=nrofreqs;i--;)
143         states[reqs[i]->state]++;
144     FIXME("%lx/%s/%d: rq %d, w %d, rg %d, rsq %d, rsg %d, d %d\n",
145             GetCurrentProcessId(),
146             where,
147             nrofreaders,
148             states[REQSTATE_REQ_QUEUED],
149             states[REQSTATE_REQ_WAITING_FOR_REPLY],
150             states[REQSTATE_REQ_GOT],
151             states[REQSTATE_RESP_QUEUED],
152             states[REQSTATE_RESP_GOT],
153             states[REQSTATE_DONE]
154     );
155 #endif
156
157     return ;
158 }
159
160 static HRESULT WINAPI
161 write_pipe(HANDLE hf, LPVOID ptr, DWORD size) {
162     DWORD res;
163     if (!WriteFile(hf,ptr,size,&res,NULL)) {
164         FIXME("Failed to write to %p, le is %lx\n",hf,GetLastError());
165         return E_FAIL;
166     }
167     if (res!=size) {
168         FIXME("Wrote only %ld of %ld bytes to %p.\n",res,size,hf);
169         return E_FAIL;
170     }
171     return S_OK;
172 }
173
174 static DWORD WINAPI stub_dispatch_thread(LPVOID);
175
176 static HRESULT
177 PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) {
178   int   i;
179   char  pipefn[100];
180   wine_pipe *new_pipes;
181
182   for (i=0;i<nrofpipes;i++)
183     if (pipes[i].mid.oxid==mid->oxid)
184       return S_OK;
185   if (pipes)
186     new_pipes=(wine_pipe*)HeapReAlloc(GetProcessHeap(),0,pipes,sizeof(pipes[0])*(nrofpipes+1));
187   else
188     new_pipes=(wine_pipe*)HeapAlloc(GetProcessHeap(),0,sizeof(pipes[0]));
189   if (!new_pipes) return E_OUTOFMEMORY;
190   pipes = new_pipes;
191   sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
192   memcpy(&(pipes[nrofpipes].mid),mid,sizeof(*mid));
193   pipes[nrofpipes].hPipe        = hPipe;
194   pipes[nrofpipes].apt          = COM_CurrentApt();
195   assert( pipes[nrofpipes].apt );
196   InitializeCriticalSection(&(pipes[nrofpipes].crit));
197   nrofpipes++;
198   if (startreader) {
199       pipes[nrofpipes-1].hThread = CreateThread(NULL,0,stub_dispatch_thread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid));
200   } else {
201       pipes[nrofpipes-1].tid     = GetCurrentThreadId();
202   }
203   return S_OK;
204 }
205
206 static HANDLE
207 PIPE_FindByMID(wine_marshal_id *mid) {
208   int i;
209   for (i=0;i<nrofpipes;i++)
210     if ((pipes[i].mid.oxid==mid->oxid) &&
211         (GetCurrentThreadId()==pipes[i].tid)
212     )
213       return pipes[i].hPipe;
214   return INVALID_HANDLE_VALUE;
215 }
216
217 static wine_pipe*
218 PIPE_GetFromMID(wine_marshal_id *mid) {
219   int i;
220   for (i=0;i<nrofpipes;i++) {
221     if ((pipes[i].mid.oxid==mid->oxid) &&
222         (GetCurrentThreadId()==pipes[i].tid)
223     )
224       return pipes+i;
225   }
226   return NULL;
227 }
228
229 static HRESULT
230 RPC_GetRequest(wine_rpc_request **req) {
231     static int reqid = 0xdeadbeef;
232     int i;
233
234     for (i=0;i<nrofreqs;i++) { /* try to reuse */
235         if (reqs[i]->state == REQSTATE_DONE) {
236             reqs[i]->reqh.reqid = reqid++;
237             reqs[i]->resph.reqid = reqs[i]->reqh.reqid;
238             reqs[i]->hPipe = INVALID_HANDLE_VALUE;
239             *req = reqs[i];
240             reqs[i]->state = REQSTATE_START;
241             return S_OK;
242         }
243     }
244     /* create new */
245     if (reqs)
246         reqs = (wine_rpc_request**)HeapReAlloc(
247                         GetProcessHeap(),
248                         HEAP_ZERO_MEMORY,
249                         reqs,
250                         sizeof(wine_rpc_request*)*(nrofreqs+1)
251                 );
252     else
253         reqs = (wine_rpc_request**)HeapAlloc(
254                         GetProcessHeap(),
255                         HEAP_ZERO_MEMORY,
256                         sizeof(wine_rpc_request*)
257                 );
258     if (!reqs)
259         return E_OUTOFMEMORY;
260     reqs[nrofreqs] = (wine_rpc_request*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(wine_rpc_request));
261     reqs[nrofreqs]->reqh.reqid = reqid++;
262     reqs[nrofreqs]->resph.reqid = reqs[nrofreqs]->reqh.reqid;
263     reqs[nrofreqs]->hPipe = INVALID_HANDLE_VALUE;
264     *req = reqs[nrofreqs];
265     reqs[nrofreqs]->state = REQSTATE_START;
266     nrofreqs++;
267     return S_OK;
268 }
269
270 static void
271 RPC_FreeRequest(wine_rpc_request *req) {
272     req->state = REQSTATE_DONE; /* Just reuse slot. */
273     return;
274 }
275
276 static HRESULT WINAPI
277 PipeBuf_QueryInterface(
278     LPRPCCHANNELBUFFER iface,REFIID riid,LPVOID *ppv
279 ) {
280     *ppv = NULL;
281     if (IsEqualIID(riid,&IID_IRpcChannelBuffer) || IsEqualIID(riid,&IID_IUnknown)) {
282         *ppv = (LPVOID)iface;
283         IUnknown_AddRef(iface);
284         return S_OK;
285     }
286     return E_NOINTERFACE;
287 }
288
289 static ULONG WINAPI
290 PipeBuf_AddRef(LPRPCCHANNELBUFFER iface) {
291     PipeBuf *This = (PipeBuf *)iface;
292     return InterlockedIncrement(&This->ref);
293 }
294
295 static ULONG WINAPI
296 PipeBuf_Release(LPRPCCHANNELBUFFER iface) {
297     PipeBuf *This = (PipeBuf *)iface;
298     ULONG ref;
299     wine_rpc_disconnect_header header;
300     HANDLE pipe;
301     DWORD reqtype = REQTYPE_DISCONNECT;
302
303     ref = InterlockedDecrement(&This->ref);
304     if (ref)
305         return ref;
306
307     FIXME("Free all stuff\n");
308
309     memcpy(&header.mid, &This->mid, sizeof(wine_marshal_id));
310
311     pipe = PIPE_FindByMID(&This->mid);
312
313     write_pipe(pipe, &reqtype, sizeof(reqtype));
314     write_pipe(pipe, &header, sizeof(wine_rpc_disconnect_header));
315
316     TRACE("written disconnect packet\n");
317
318     HeapFree(GetProcessHeap(),0,This);
319     return 0;
320 }
321
322 static HRESULT WINAPI
323 PipeBuf_GetBuffer(
324     LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg,REFIID riid
325 ) {
326     /*PipeBuf *This = (PipeBuf *)iface;*/
327
328     TRACE("(%p,%s)\n",msg,debugstr_guid(riid));
329     /* probably reuses IID in real. */
330     if (msg->cbBuffer && (msg->Buffer == NULL))
331         msg->Buffer = HeapAlloc(GetProcessHeap(),0,msg->cbBuffer);
332     return S_OK;
333 }
334
335 static HRESULT
336 COM_InvokeAndRpcSend(wine_rpc_request *req) {
337     IRpcStubBuffer      *stub;
338     RPCOLEMESSAGE       msg;
339     HRESULT             hres;
340     DWORD               reqtype;
341
342     hres = MARSHAL_Find_Stub_Buffer(&(req->reqh.mid),&stub);
343     if (hres) {
344         ERR("Stub not found?\n");
345         return hres;
346     }
347     msg.Buffer          = req->Buffer;
348     msg.iMethod         = req->reqh.iMethod;
349     msg.cbBuffer        = req->reqh.cbBuffer;
350     msg.dataRepresentation = NDR_LOCAL_DATA_REPRESENTATION;
351     req->state          = REQSTATE_INVOKING;
352     req->resph.retval   = IRpcStubBuffer_Invoke(stub,&msg,NULL);
353     IUnknown_Release(stub);
354     req->Buffer         = msg.Buffer;
355     req->resph.cbBuffer = msg.cbBuffer;
356     reqtype             = REQTYPE_RESPONSE;
357     hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
358     if (hres) return hres;
359     hres = write_pipe(req->hPipe,&(req->resph),sizeof(req->resph));
360     if (hres) return hres;
361     hres = write_pipe(req->hPipe,req->Buffer,req->resph.cbBuffer);
362     if (hres) return hres;
363     req->state = REQSTATE_DONE;
364     drs("invoke");
365     return S_OK;
366 }
367
368 static HRESULT COM_RpcReceive(wine_pipe *xpipe);
369
370 static HRESULT
371 RPC_QueueRequestAndWait(wine_rpc_request *req) {
372     int                 i;
373     wine_rpc_request    *xreq;
374     HRESULT             hres;
375     DWORD               reqtype;
376     wine_pipe           *xpipe = PIPE_GetFromMID(&(req->reqh.mid));
377
378     if (!xpipe) {
379         FIXME("no pipe found.\n");
380         return E_POINTER;
381     }
382     req->hPipe = xpipe->hPipe;
383     req->state = REQSTATE_REQ_WAITING_FOR_REPLY;
384     reqtype = REQTYPE_REQUEST;
385     hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
386     if (hres) return hres;
387     hres = write_pipe(req->hPipe,&(req->reqh),sizeof(req->reqh));
388     if (hres) return hres;
389     hres = write_pipe(req->hPipe,req->Buffer,req->reqh.cbBuffer);
390     if (hres) return hres;
391
392     /* This loop is about allowing re-entrancy. While waiting for the
393      * response to one RPC we may receive a request starting another. */
394     while (!hres) {
395         hres = COM_RpcReceive(xpipe);
396         if (hres) break;
397
398         for (i=0;i<nrofreqs;i++) {
399             xreq = reqs[i];
400             if ((xreq->state==REQSTATE_REQ_GOT) && (xreq->hPipe==req->hPipe)) {
401                 hres = COM_InvokeAndRpcSend(xreq);
402                 if (hres) break;
403             }
404         }
405         if (req->state == REQSTATE_RESP_GOT)
406             return S_OK;
407     }
408     if (FAILED(hres))
409         WARN("-- 0x%08lx\n", hres);
410     return hres;
411 }
412
413 static HRESULT WINAPI
414 PipeBuf_SendReceive(
415     LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg,ULONG *status
416 ) {
417     PipeBuf *This = (PipeBuf *)iface;
418     wine_rpc_request    *req;
419     HRESULT             hres;
420
421     TRACE("()\n");
422
423     if (This->mid.oxid == COM_CurrentApt()->oxid) {
424         ERR("Need to call directly!\n");
425         return E_FAIL;
426     }
427
428     hres = RPC_GetRequest(&req);
429     if (hres) return hres;
430     req->reqh.iMethod   = msg->iMethod;
431     req->reqh.cbBuffer  = msg->cbBuffer;
432     memcpy(&(req->reqh.mid),&(This->mid),sizeof(This->mid));
433     req->Buffer = msg->Buffer;
434     hres = RPC_QueueRequestAndWait(req);
435     if (hres) {
436         RPC_FreeRequest(req);
437         return hres;
438     }
439     msg->cbBuffer       = req->resph.cbBuffer;
440     msg->Buffer         = req->Buffer;
441     *status             = req->resph.retval;
442     RPC_FreeRequest(req);
443     return S_OK;
444 }
445
446
447 static HRESULT WINAPI
448 PipeBuf_FreeBuffer(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg) {
449     FIXME("(%p), stub!\n",msg);
450     return E_FAIL;
451 }
452
453 static HRESULT WINAPI
454 PipeBuf_GetDestCtx(
455     LPRPCCHANNELBUFFER iface,DWORD* pdwDestContext,void** ppvDestContext
456 ) {
457     FIXME("(%p,%p), stub!\n",pdwDestContext,ppvDestContext);
458     return E_FAIL;
459 }
460
461 static HRESULT WINAPI
462 PipeBuf_IsConnected(LPRPCCHANNELBUFFER iface) {
463     FIXME("(), stub!\n");
464     return S_OK;
465 }
466
467 static IRpcChannelBufferVtbl pipebufvt = {
468     PipeBuf_QueryInterface,
469     PipeBuf_AddRef,
470     PipeBuf_Release,
471     PipeBuf_GetBuffer,
472     PipeBuf_SendReceive,
473     PipeBuf_FreeBuffer,
474     PipeBuf_GetDestCtx,
475     PipeBuf_IsConnected
476 };
477
478 HRESULT
479 PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf) {
480   wine_marshal_id       ourid;
481   DWORD                 res;
482   HANDLE                hPipe;
483   HRESULT               hres;
484   PipeBuf               *pbuf;
485
486   hPipe = PIPE_FindByMID(mid);
487   if (hPipe == INVALID_HANDLE_VALUE) {
488       char                      pipefn[200];
489       sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
490       hPipe = CreateFileA(
491               pipefn,
492               GENERIC_READ|GENERIC_WRITE,
493               0,
494               NULL,
495               OPEN_EXISTING,
496               0,
497               0
498       );
499       if (hPipe == INVALID_HANDLE_VALUE) {
500           FIXME("Could not open named pipe %s, le is %lx\n",pipefn,GetLastError());
501           return E_FAIL;
502       }
503       hres = PIPE_RegisterPipe(mid, hPipe, FALSE);
504       if (hres) return hres;
505       memset(&ourid,0,sizeof(ourid));
506       ourid.oxid = COM_CurrentApt()->oxid;
507       if (!WriteFile(hPipe,&ourid,sizeof(ourid),&res,NULL)||(res!=sizeof(ourid))) {
508           ERR("Failed writing startup mid!\n");
509           return E_FAIL;
510       }
511   }
512   pbuf = (PipeBuf*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PipeBuf));
513   pbuf->lpVtbl  = &pipebufvt;
514   pbuf->ref     = 1;
515   memcpy(&(pbuf->mid),mid,sizeof(*mid));
516   *pipebuf = (IRpcChannelBuffer*)pbuf;
517   return S_OK;
518 }
519
520 static HRESULT
521 create_server(REFCLSID rclsid) {
522   static const WCHAR embedding[] = { ' ', '-','E','m','b','e','d','d','i','n','g',0 };
523   HKEY          key;
524   char          buf[200];
525   HRESULT       hres = E_UNEXPECTED;
526   char          xclsid[80];
527   WCHAR        exe[MAX_PATH+1];
528   DWORD        exelen = sizeof(exe);
529   WCHAR         command[MAX_PATH+sizeof(embedding)/sizeof(WCHAR)];
530   STARTUPINFOW  sinfo;
531   PROCESS_INFORMATION   pinfo;
532
533   WINE_StringFromCLSID((LPCLSID)rclsid,xclsid);
534
535   sprintf(buf,"CLSID\\%s\\LocalServer32",xclsid);
536   hres = RegOpenKeyExA(HKEY_CLASSES_ROOT, buf, 0, KEY_READ, &key);
537
538   if (hres != ERROR_SUCCESS) {
539       WARN("CLSID %s not registered as LocalServer32\n", xclsid);
540       return REGDB_E_READREGDB; /* Probably */
541   }
542
543   memset(exe,0,sizeof(exe));
544   hres= RegQueryValueExW(key, NULL, NULL, NULL, (LPBYTE)exe, &exelen);
545   RegCloseKey(key);
546   if (hres) {
547       WARN("No default value for LocalServer32 key\n");
548       return REGDB_E_CLASSNOTREG; /* FIXME: check retval */
549   }
550
551   memset(&sinfo,0,sizeof(sinfo));
552   sinfo.cb = sizeof(sinfo);
553
554   /* EXE servers are started with the -Embedding switch. MSDN also claims /Embedding is used,
555      9x does -Embedding, perhaps an 9x/NT difference?  */
556
557   strcpyW(command, exe);
558   strcatW(command, embedding);
559
560   TRACE("activating local server '%s' for %s\n", debugstr_w(command), xclsid);
561
562   if (!CreateProcessW(exe, command, NULL, NULL, FALSE, 0, NULL, NULL, &sinfo, &pinfo)) {
563       WARN("failed to run local server %s\n", debugstr_w(exe));
564       return E_FAIL;
565   }
566
567   return S_OK;
568 }
569 /* http://msdn.microsoft.com/library/en-us/dnmsj99/html/com0199.asp, Figure 4 */
570 HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv) {
571   HRESULT       hres;
572   HANDLE        hPipe;
573   char          pipefn[200];
574   DWORD         res,bufferlen;
575   char          marshalbuffer[200];
576   IStream       *pStm;
577   LARGE_INTEGER seekto;
578   ULARGE_INTEGER newpos;
579   int           tries = 0;
580 #define MAXTRIES 10000
581
582   TRACE("rclsid=%s, iid=%s\n", debugstr_guid(rclsid), debugstr_guid(iid));
583
584   strcpy(pipefn,PIPEPREF);
585   WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF));
586
587   while (tries++<MAXTRIES) {
588       TRACE("waiting for %s\n", pipefn);
589       
590       WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER );
591       hPipe     = CreateFileA(
592               pipefn,
593               GENERIC_READ|GENERIC_WRITE,
594               0,
595               NULL,
596               OPEN_EXISTING,
597               0,
598               0
599       );
600       if (hPipe == INVALID_HANDLE_VALUE) {
601           if (tries == 1) {
602               if ((hres = create_server(rclsid)))
603                   return hres;
604               Sleep(1000);
605           } else {
606               WARN("Could not open named pipe to broker %s, le is %lx\n",pipefn,GetLastError());
607               Sleep(1000);
608           }
609           continue;
610       }
611       bufferlen = 0;
612       if (!ReadFile(hPipe,marshalbuffer,sizeof(marshalbuffer),&bufferlen,NULL)) {
613           FIXME("Failed to read marshal id from classfactory of %s.\n",debugstr_guid(rclsid));
614           Sleep(1000);
615           continue;
616       }
617       TRACE("read marshal id from pipe\n");
618       CloseHandle(hPipe);
619       break;
620   }
621   if (tries>=MAXTRIES)
622       return E_NOINTERFACE;
623   hres = CreateStreamOnHGlobal(0,TRUE,&pStm);
624   if (hres) return hres;
625   hres = IStream_Write(pStm,marshalbuffer,bufferlen,&res);
626   if (hres) goto out;
627   seekto.u.LowPart = 0;seekto.u.HighPart = 0;
628   hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
629   TRACE("unmarshalling classfactory\n");
630   hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv);
631 out:
632   IStream_Release(pStm);
633   return hres;
634 }
635
636
637 static void WINAPI
638 PIPE_StartRequestThread(HANDLE xhPipe) {
639     wine_marshal_id     remoteid;
640     HRESULT             hres;
641
642     hres = read_pipe(xhPipe,&remoteid,sizeof(remoteid));
643     if (hres) {
644         ERR("Failed to read remote mid!\n");
645         return;
646     }
647     PIPE_RegisterPipe(&remoteid,xhPipe, TRUE);
648 }
649
650 static HRESULT
651 COM_RpcReceive(wine_pipe *xpipe) {
652     DWORD       reqtype;
653     HRESULT     hres = S_OK;
654     HANDLE      xhPipe = xpipe->hPipe;
655
656     /*FIXME("%lx %d reading reqtype\n",GetCurrentProcessId(),xhPipe);*/
657     hres = read_pipe(xhPipe,&reqtype,sizeof(reqtype));
658     if (hres) goto end;
659     EnterCriticalSection(&(xpipe->crit));
660     /*FIXME("%lx got reqtype %ld\n",GetCurrentProcessId(),reqtype);*/
661
662     if (reqtype == REQTYPE_DISCONNECT) { /* only received by servers */
663         wine_rpc_disconnect_header header;
664         IRpcStubBuffer *stub;
665         ULONG ret;
666
667         hres = read_pipe(xhPipe, &header, sizeof(header));
668         if (hres) {
669             ERR("could not read disconnect header\n");
670             goto end;
671         }
672
673         TRACE("read disconnect header\n");
674
675         hres = MARSHAL_Find_Stub_Buffer(&header.mid, &stub);
676         if (hres) {
677             ERR("could not locate stub to disconnect, mid.oid=%s\n",
678                 wine_dbgstr_longlong(header.mid.oid));
679             goto end;
680         }
681
682
683         /* release reference added by MARSHAL_Find_Stub_Buffer call */
684         IRpcStubBuffer_Release(stub);
685         /* release it for real */
686         ret = IRpcStubBuffer_Release(stub);
687         /* FIXME: race */
688         if (ret == 0)
689             MARSHAL_Invalidate_Stub_From_MID(&header.mid);
690         goto end;
691     } else if (reqtype == REQTYPE_REQUEST) {
692         wine_rpc_request        *xreq;
693         RPC_GetRequest(&xreq);
694         xreq->hPipe = xhPipe;
695         hres = read_pipe(xhPipe,&(xreq->reqh),sizeof(xreq->reqh));
696         if (hres) goto end;
697         xreq->resph.reqid = xreq->reqh.reqid;
698         xreq->Buffer = HeapAlloc(GetProcessHeap(),0, xreq->reqh.cbBuffer);
699         hres = read_pipe(xhPipe,xreq->Buffer,xreq->reqh.cbBuffer);
700         if (hres) goto end;
701         xreq->state = REQSTATE_REQ_GOT;
702         goto end;
703     } else if (reqtype == REQTYPE_RESPONSE) {
704         wine_rpc_response_header        resph;
705         int i;
706
707         hres = read_pipe(xhPipe,&resph,sizeof(resph));
708         if (hres) goto end;
709         for (i=nrofreqs;i--;) {
710             wine_rpc_request *xreq = reqs[i];
711             if (xreq->state != REQSTATE_REQ_WAITING_FOR_REPLY)
712                 continue;
713             if (xreq->reqh.reqid == resph.reqid) {
714                 memcpy(&(xreq->resph),&resph,sizeof(resph));
715
716                 if (xreq->Buffer)
717                     xreq->Buffer = HeapReAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->Buffer,xreq->resph.cbBuffer);
718                 else
719                     xreq->Buffer = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->resph.cbBuffer);
720
721                 hres = read_pipe(xhPipe,xreq->Buffer,xreq->resph.cbBuffer);
722                 if (hres) goto end;
723                 xreq->state = REQSTATE_RESP_GOT;
724                 /*PulseEvent(hRpcChanged);*/
725                 goto end;
726             }
727         }
728         ERR("Did not find request for id %lx\n",resph.reqid);
729         hres = S_OK;
730         goto end;
731     }
732     ERR("Unknown reqtype %ld\n",reqtype);
733     hres = E_FAIL;
734 end:
735     LeaveCriticalSection(&(xpipe->crit));
736     return hres;
737 }
738
739 /* This thread listens on the given pipe for requests to a particular stub manager */
740 static DWORD WINAPI stub_dispatch_thread(LPVOID param)
741 {
742     wine_pipe           *xpipe = (wine_pipe*)param;
743     HANDLE              xhPipe = xpipe->hPipe;
744     HRESULT             hres = S_OK;
745
746     TRACE("starting for apartment OXID %08lx%08lx\n", (DWORD)(xpipe->mid.oxid >> 32), (DWORD)(xpipe->mid.oxid));
747
748     /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
749     NtCurrentTeb()->ReservedForOle = xpipe->apt;
750     
751     while (!hres) {
752         int i;
753         
754         hres = COM_RpcReceive(xpipe);
755         if (hres) break;
756
757         for (i=nrofreqs;i--;) {
758             wine_rpc_request *xreq = reqs[i];
759             if ((xreq->state == REQSTATE_REQ_GOT) && (xreq->hPipe == xhPipe)) {
760                 hres = COM_InvokeAndRpcSend(xreq);
761                 if (!hres) break;
762             }
763         }
764     }
765
766     /* fixme: this thread never quits naturally */
767     WARN("exiting with hres %lx\n",hres);
768     CloseHandle(xhPipe);
769     return 0;
770 }
771
772 /* This thread listens on a named pipe for each apartment that exports
773  * objects. It deals with incoming connection requests. Each time a
774  * client connects a separate thread is spawned for that particular
775  * connection.
776  *
777  * This architecture is different in native DCOM.
778  */
779 static DWORD WINAPI apartment_listener_thread(LPVOID param)
780 {
781     char                pipefn[200];
782     HANDLE              listenPipe;
783     APARTMENT          *apt = (APARTMENT *) param;
784
785     /* we must join the marshalling threads apartment */
786     NtCurrentTeb()->ReservedForOle = apt;
787
788     sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
789     TRACE("Apartment listener thread starting on (%s)\n",pipefn);
790
791     while (1) {
792         listenPipe = CreateNamedPipeA(
793             pipefn,
794             PIPE_ACCESS_DUPLEX,
795             PIPE_TYPE_BYTE|PIPE_WAIT,
796             PIPE_UNLIMITED_INSTANCES,
797             4096,
798             4096,
799             NMPWAIT_USE_DEFAULT_WAIT,
800             NULL
801         );
802         if (listenPipe == INVALID_HANDLE_VALUE) {
803             FIXME("pipe creation failed for %s, error %lx\n",pipefn,GetLastError());
804             return 1; /* permanent failure, so quit stubmgr thread */
805         }
806         if (!ConnectNamedPipe(listenPipe,NULL)) {
807             ERR("Failure during ConnectNamedPipe %lx!\n",GetLastError());
808             CloseHandle(listenPipe);
809             continue;
810         }
811         PIPE_StartRequestThread(listenPipe);
812     }
813     return 0;
814 }
815
816 void start_apartment_listener_thread()
817 {
818     APARTMENT *apt = COM_CurrentApt();
819
820     assert( apt );
821     
822     TRACE("apt->listenertid=%ld\n", apt->listenertid);
823
824     /* apt->listenertid is a hack which needs to die at some point, as
825      * it leaks information into the apartment structure. in fact,
826      * this thread isn't quite correct anyway as native RPC doesn't
827      * use a thread per apartment at all, instead the dispatch thread
828      * either enters the apartment to perform the RPC (for MTAs, RTAs)
829      * or does a context switch into it for STAs.
830      */
831     
832     if (!apt->listenertid)
833     {
834         CreateThread(NULL, 0, apartment_listener_thread, apt, 0, &apt->listenertid);
835     }
836 }