4 * Copyright 2002 Marcus Meissner
5 * Copyright 2005 Mike Hearn, Rob Shearman for CodeWeavers
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
31 #define NONAMELESSUNION
32 #define NONAMELESSSTRUCT
44 #include "wine/unicode.h"
46 #include "compobj_private.h"
48 #include "wine/debug.h"
50 WINE_DEFAULT_DEBUG_CHANNEL(ole);
/* Prefix common to every named pipe name built in this file. */
52 #define PIPEPREF "\\\\.\\pipe\\"
/* Base name of the apartment listener pipe; users append "_" and the
 * apartment OXID formatted as two %08lx halves. */
53 #define OLESTUBMGR PIPEPREF"WINE_OLE_StubMgr"
/* Message type tag written to the pipe ahead of each header. */
55 #define REQTYPE_REQUEST 0
56 #define REQTYPE_RESPONSE 1
/* Header sent on the pipe ahead of each response payload; the code in this
 * file accesses its reqid, cbBuffer and retval members. */
66 struct response_header
/*
 * Life-cycle states stored in struct rpc's state member.
 *
 *  START                 - set when an entry is allocated or recycled
 *  REQ_QUEUED            - not referenced by the code in this file
 *  REQ_WAITING_FOR_REPLY - set just before a request is written to the pipe
 *  REQ_GOT               - a complete incoming request has been read
 *  INVOKING              - the stub is being invoked for the request
 *  RESP_QUEUED           - not referenced by the code in this file
 *  RESP_GOT              - the matching response has been read from the pipe
 *  DONE                  - the entry is finished and may be recycled
 *
 * Every state needs its own value: entries in REQSTATE_DONE are recycled,
 * so DONE must not compare equal to RESP_GOT, otherwise an entry could be
 * handed out again while its response is still waiting to be copied out.
 */
#define REQSTATE_START                  0
#define REQSTATE_REQ_QUEUED             1
#define REQSTATE_REQ_WAITING_FOR_REPLY  2
#define REQSTATE_REQ_GOT                3
#define REQSTATE_INVOKING               4
#define REQSTATE_RESP_QUEUED            5
#define REQSTATE_RESP_GOT               6
#define REQSTATE_DONE                   7
/* Members of the per-call record that the code in this file reaches through
 * struct rpc pointers. */
86 HANDLE hPipe; /* temp copy of handle */
87 struct request_header reqh; /* request header; reqid, ipid, iMethod and cbBuffer are used in this file */
88 struct response_header resph; /* response header; reqid, cbBuffer and retval are used in this file */
92 /* fixme: this should have a lock */
/* Table of pointers to every struct rpc allocated so far. RPC_GetRequest
 * recycles an entry once its state is REQSTATE_DONE and otherwise grows the
 * table by one pointer. */
93 static struct rpc **reqs = NULL;
/* Upper bound used by every loop that scans reqs. */
94 static int nrofreqs = 0;
96 /* This pipe is _thread_ based, each thread which talks to a remote
97 * apartment (oxid) has its own pipe. The same structure is used both
98 * for outgoing and incoming RPCs.
102 wine_marshal_id mid; /* target mid */
103 DWORD tid; /* thread which owns this pipe */
108 CRITICAL_SECTION crit; /* NOTE(review): what this lock protects is not evident from nearby code - confirm */
110 APARTMENT *apt; /* apartment of the marshalling thread for the stub dispatch case */
/* Object behind the IRpcChannelBuffer pointers created in this file. The
 * PipeBuf_* methods cast the interface pointer straight to PipeBuf*, and
 * they use its ref, mid and pipe members. */
113 typedef struct _PipeBuf {
114 IRpcChannelBufferVtbl *lpVtbl;
123 /* some helper functions */
/*
 * Read size bytes from pipe handle hf into ptr with one ReadFile call.
 *
 * A ReadFile failure and a short read are both logged with ERR; the path
 * that logs the handle as disconnected returns RPC_E_DISCONNECTED.
 * NOTE(review): callers treat any non-zero HRESULT as failure, so the
 * success value is presumably S_OK - confirm.
 */
125 static HRESULT WINAPI read_pipe(HANDLE hf, LPVOID ptr, DWORD size)
129 if (!ReadFile(hf,ptr,size,&res,NULL))
131 ERR("Failed to read from %p, le is %ld\n",hf,GetLastError());
139 WARN("%p disconnected\n", hf);
140 return RPC_E_DISCONNECTED;
142 ERR("Read only %ld of %ld bytes from %p.\n",res,size,hf);
/*
 * Write size bytes from ptr to pipe handle hf with one WriteFile call.
 *
 * A WriteFile failure and a short write are both reported through FIXME.
 * NOTE(review): callers treat any non-zero HRESULT as failure, so the
 * success value is presumably S_OK - confirm.
 */
148 static HRESULT WINAPI
149 write_pipe(HANDLE hf, LPVOID ptr, DWORD size) {
151 if (!WriteFile(hf,ptr,size,&res,NULL)) {
152 FIXME("Failed to write to %p, le is %ld\n",hf,GetLastError());
156 FIXME("Wrote only %ld of %ld bytes to %p.\n",res,size,hf);
/*
 * Duplicate handle h inside the current process with the same access rights
 * and without making the copy inheritable.
 *
 * Returns INVALID_HANDLE_VALUE (after logging with ERR) when DuplicateHandle
 * fails. NOTE(review): on success the duplicate is presumably returned -
 * confirm.
 */
162 static HANDLE dupe_handle(HANDLE h)
166 if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(),
167 &h2, 0, FALSE, DUPLICATE_SAME_ACCESS))
169 ERR("could not duplicate handle: %ld\n", GetLastError());
170 return INVALID_HANDLE_VALUE;
179 static DWORD WINAPI client_dispatch_thread(LPVOID);
181 /* FIXME: this all needs to be made thread safe */
/*
 * Hand out a struct rpc that is ready for a new call: state REQSTATE_START,
 * hPipe reset to INVALID_HANDLE_VALUE and the same freshly generated id in
 * both reqh.reqid and resph.reqid.
 *
 * An existing entry in REQSTATE_DONE is recycled when there is one;
 * otherwise the reqs table grows by one pointer and a zero-filled
 * struct rpc is allocated and stored through *req.
 *
 * Returns E_OUTOFMEMORY when the pointer table cannot be (re)allocated.
 * NOTE(review): the HeapReAlloc result is assigned straight to reqs, so a
 * failure loses the old table; the HeapAlloc of the struct itself is
 * dereferenced without a visible NULL check - confirm both.
 */
182 static HRESULT RPC_GetRequest(struct rpc **req)
184 static int reqid = 0;
/* first look for a finished entry that can be recycled */
188 for (i = 0; i < nrofreqs; i++)
190 if (reqs[i]->state == REQSTATE_DONE)
192 TRACE("reusing reqs[%d]\n", i);
194 reqs[i]->reqh.reqid = reqid++;
195 reqs[i]->resph.reqid = reqs[i]->reqh.reqid;
196 reqs[i]->hPipe = INVALID_HANDLE_VALUE;
197 reqs[i]->state = REQSTATE_START;
/* nothing to recycle: make room for one more pointer and allocate a new entry */
203 TRACE("creating new struct rpc (request)\n");
206 reqs = (struct rpc**)HeapReAlloc(
210 sizeof(struct rpc*)*(nrofreqs+1)
213 reqs = (struct rpc**)HeapAlloc(
219 if (!reqs) return E_OUTOFMEMORY;
221 reqs[nrofreqs] = (struct rpc*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(struct rpc));
222 reqs[nrofreqs]->reqh.reqid = reqid++;
223 reqs[nrofreqs]->resph.reqid = reqs[nrofreqs]->reqh.reqid;
224 reqs[nrofreqs]->hPipe = INVALID_HANDLE_VALUE;
225 reqs[nrofreqs]->state = REQSTATE_START;
226 *req = reqs[nrofreqs];
/*
 * QueryInterface for the pipe channel buffer.
 *
 * Only IID_IRpcChannelBuffer and IID_IUnknown are recognised: for those the
 * interface pointer itself is stored in *ppv and a reference is added.
 * Any other IID ends in E_NOINTERFACE.
 * NOTE(review): *ppv is not visibly cleared on the E_NOINTERFACE path.
 */
233 static HRESULT WINAPI
234 PipeBuf_QueryInterface(
235 LPRPCCHANNELBUFFER iface,REFIID riid,LPVOID *ppv
238 if (IsEqualIID(riid,&IID_IRpcChannelBuffer) || IsEqualIID(riid,&IID_IUnknown)) {
239 *ppv = (LPVOID)iface;
240 IUnknown_AddRef(iface);
243 return E_NOINTERFACE;
/* Atomically increments the object's reference count and returns the value
 * produced by InterlockedIncrement. */
247 PipeBuf_AddRef(LPRPCCHANNELBUFFER iface) {
248 PipeBuf *This = (PipeBuf *)iface;
249 return InterlockedIncrement(&This->ref);
/*
 * Atomically decrements the reference count.
 *
 * The teardown statements close This->pipe and return the object to the
 * process heap. NOTE(review): presumably they only run once the count has
 * dropped to zero - confirm.
 */
253 PipeBuf_Release(LPRPCCHANNELBUFFER iface) {
254 PipeBuf *This = (PipeBuf *)iface;
257 ref = InterlockedDecrement(&This->ref);
261 CloseHandle(This->pipe);
262 HeapFree(GetProcessHeap(),0,This);
/*
 * Provide the marshalling buffer for msg.
 *
 * When msg->cbBuffer is non-zero and the caller supplied no buffer,
 * msg->cbBuffer bytes are allocated from the process heap (not zero-filled).
 * riid is only traced.
 * NOTE(review): the allocation result is not visibly checked - confirm.
 */
266 static HRESULT WINAPI
267 PipeBuf_GetBuffer(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg,REFIID riid)
269 TRACE("(%p,%s)\n",msg,debugstr_guid(riid));
270 /* probably reuses IID in real. */
271 if (msg->cbBuffer && (msg->Buffer == NULL))
272 msg->Buffer = HeapAlloc(GetProcessHeap(),0,msg->cbBuffer);
/*
 * Execute one received request and send its response back.
 *
 * The stub buffer for req->reqh.ipid is looked up and invoked with the
 * request payload; the stub's return value and the resulting buffer go
 * into the response header. The response is then written to req->hPipe as
 * three pieces: the REQTYPE_RESPONSE tag, the response header, the payload.
 *
 * Returns RPC_E_DISCONNECTED when no stub buffer is found for the ipid,
 * otherwise the result of the first write_pipe call that fails.
 * NOTE(review): the early returns after a failed write leave req->state at
 * REQSTATE_INVOKING, so such an entry is never recycled by RPC_GetRequest.
 * NOTE(review): no release of req->Buffer is visible here - confirm who
 * owns it after the response has been sent.
 */
277 COM_InvokeAndRpcSend(struct rpc *req) {
278 IRpcStubBuffer *stub;
283 if (!(stub = ipid_to_stubbuffer(&(req->reqh.ipid))))
284 /* ipid_to_stubbuffer will already have logged the error */
285 return RPC_E_DISCONNECTED;
/* hold a reference on the stub for the duration of the invoke */
287 IUnknown_AddRef(stub);
288 msg.Buffer = req->Buffer;
289 msg.iMethod = req->reqh.iMethod;
290 msg.cbBuffer = req->reqh.cbBuffer;
291 msg.dataRepresentation = NDR_LOCAL_DATA_REPRESENTATION;
292 req->state = REQSTATE_INVOKING;
293 req->resph.retval = IRpcStubBuffer_Invoke(stub,&msg,NULL);
294 IUnknown_Release(stub);
/* the stub may have replaced the buffer; whatever msg holds now is the reply */
295 req->Buffer = msg.Buffer;
296 req->resph.cbBuffer = msg.cbBuffer;
297 reqtype = REQTYPE_RESPONSE;
298 hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
299 if (hres) return hres;
300 hres = write_pipe(req->hPipe,&(req->resph),sizeof(req->resph));
301 if (hres) return hres;
302 hres = write_pipe(req->hPipe,req->Buffer,req->resph.cbBuffer);
303 if (hres) return hres;
304 req->state = REQSTATE_DONE;
308 static HRESULT process_incoming_rpc(HANDLE pipe);
/*
 * Send req to the peer and wait for its response.
 *
 * The request goes out on req->hPipe as the REQTYPE_REQUEST tag, the request
 * header and the payload. Incoming messages are then read from pipe with
 * process_incoming_rpc; requests that arrive in the meantime for the same
 * pipe handle (state REQSTATE_REQ_GOT) are executed through
 * COM_InvokeAndRpcSend, and req reaching REQSTATE_RESP_GOT is tested after
 * that.
 *
 * A failing write_pipe result is returned immediately.
 * NOTE(review): writes use req->hPipe while reads use the pipe argument;
 * the statement that assigns req->hPipe is not visible here - confirm the
 * two always refer to the same pipe.
 */
310 static HRESULT RPC_QueueRequestAndWait(struct rpc *req, HANDLE pipe)
318 req->state = REQSTATE_REQ_WAITING_FOR_REPLY;
319 reqtype = REQTYPE_REQUEST;
320 hres = write_pipe(req->hPipe,&reqtype,sizeof(reqtype));
321 if (hres) return hres;
322 hres = write_pipe(req->hPipe,&(req->reqh),sizeof(req->reqh));
323 if (hres) return hres;
324 hres = write_pipe(req->hPipe,req->Buffer,req->reqh.cbBuffer);
325 if (hres) return hres;
327 /* This loop is about allowing re-entrancy. While waiting for the
328 * response to one RPC we may receive a request starting another. */
330 hres = process_incoming_rpc(pipe);
333 for (i=0;i<nrofreqs;i++) {
335 if ((xreq->state==REQSTATE_REQ_GOT) && (xreq->hPipe==req->hPipe)) {
336 hres = COM_InvokeAndRpcSend(xreq);
340 if (req->state == REQSTATE_RESP_GOT)
344 WARN("-- 0x%08lx\n", hres);
/*
 * Perform one synchronous call through the pipe.
 *
 * An ERR is logged when the target OXID is the calling apartment's own.
 * Otherwise a struct rpc is obtained from RPC_GetRequest, its request
 * header is filled from msg (method number, buffer size) and from the
 * target ipid, and RPC_QueueRequestAndWait runs the exchange on This->pipe.
 *
 * On the success path msg->cbBuffer and msg->Buffer are replaced by the
 * response data and *status receives the remote return value. The entry is
 * set to REQSTATE_DONE on both visible exits so that RPC_GetRequest can
 * recycle it. A failure from RPC_GetRequest is returned unchanged.
 */
348 static HRESULT WINAPI
349 PipeBuf_SendReceive(LPRPCCHANNELBUFFER iface, RPCOLEMESSAGE *msg, ULONG *status)
351 PipeBuf *This = (PipeBuf *)iface;
355 if (This->mid.oxid == COM_CurrentApt()->oxid) {
356 ERR("Need to call directly!\n");
360 hres = RPC_GetRequest(&req);
361 if (hres) return hres;
362 req->reqh.iMethod = msg->iMethod;
363 req->reqh.cbBuffer = msg->cbBuffer;
364 req->reqh.ipid = This->mid.ipid;
365 req->Buffer = msg->Buffer;
366 TRACE(" -> rpc ->\n");
367 hres = RPC_QueueRequestAndWait(req, This->pipe);
368 TRACE(" <- response <-\n");
371 req->state = REQSTATE_DONE;
375 msg->cbBuffer = req->resph.cbBuffer;
376 msg->Buffer = req->Buffer;
377 *status = req->resph.retval;
378 req->state = REQSTATE_DONE;
/* Returns msg->Buffer to the process heap.
 * NOTE(review): msg->Buffer is not visibly reset afterwards - confirm. */
384 static HRESULT WINAPI
385 PipeBuf_FreeBuffer(LPRPCCHANNELBUFFER iface,RPCOLEMESSAGE* msg)
388 HeapFree(GetProcessHeap(), 0, msg->Buffer);
/* Not implemented: only reports itself as a stub through FIXME.
 * NOTE(review): the output parameters are not visibly written. */
392 static HRESULT WINAPI
393 PipeBuf_GetDestCtx(LPRPCCHANNELBUFFER iface,DWORD* pdwDestContext,void** ppvDestContext)
395 FIXME("(%p,%p), stub!\n",pdwDestContext,ppvDestContext);
/* Not implemented: only reports itself as a stub through FIXME; the state of
 * the pipe is not visibly examined. */
399 static HRESULT WINAPI
400 PipeBuf_IsConnected(LPRPCCHANNELBUFFER iface)
402 FIXME("(), stub!\n");
/* Method table installed in every PipeBuf; the initializer lists the
 * PipeBuf_* implementations, starting with PipeBuf_QueryInterface. */
406 static IRpcChannelBufferVtbl pipebufvt = {
407 PipeBuf_QueryInterface,
417 /* returns a pipebuf for proxies */
/*
 * Connect to the listener pipe of the apartment named by mid->oxid and wrap
 * the connection in a new PipeBuf, returned through *pipebuf.
 *
 * The pipe name is OLESTUBMGR followed by "_" and the OXID as two %08lx
 * halves. Opening is retried for as long as CreateFileA fails with
 * ERROR_PIPE_BUSY; the other error paths log with ERR and return
 * RPC_E_SERVER_DIED. The new object stores a copy of *mid and a duplicate
 * of the pipe handle.
 *
 * NOTE(review): the HeapAlloc result is dereferenced without a visible NULL
 * check, and dupe_handle can return INVALID_HANDLE_VALUE, which would be
 * stored unchecked. Whether the original handle is closed after it has
 * been duplicated is not visible - confirm.
 */
418 HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf)
420 wine_marshal_id ourid;
425 /* connect to the apartment listener thread */
426 sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
428 TRACE("proxy pipe: connecting to apartment listener thread: %s\n", pipefn);
432 BOOL ret = WaitNamedPipeA(pipefn, NMPWAIT_USE_DEFAULT_WAIT);
435 ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError());
436 return RPC_E_SERVER_DIED;
439 handle = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE,
440 0, NULL, OPEN_EXISTING, 0, 0);
442 if (handle == INVALID_HANDLE_VALUE)
/* another client took the pipe instance in the meantime: try again */
444 if (GetLastError() == ERROR_PIPE_BUSY) continue;
446 ERR("Could not open named pipe %s, error %ld\n", pipefn, GetLastError());
447 return RPC_E_SERVER_DIED;
453 memset(&ourid,0,sizeof(ourid));
454 ourid.oxid = COM_CurrentApt()->oxid;
456 TRACE("constructing new pipebuf for proxy\n");
458 pbuf = (PipeBuf*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PipeBuf));
459 pbuf->lpVtbl = &pipebufvt;
461 memcpy(&(pbuf->mid),mid,sizeof(*mid));
462 pbuf->pipe = dupe_handle(handle);
464 *pipebuf = (IRpcChannelBuffer*)pbuf;
/*
 * Launch the executable registered as the LocalServer32 server of rclsid.
 *
 * The default value of the key CLSID\<clsid>\LocalServer32 under
 * HKEY_CLASSES_ROOT is read as a wide string, " -Embedding" is appended to
 * form the command line, and the process is started with CreateProcessW.
 *
 * Returns REGDB_E_READREGDB when the key cannot be opened and
 * REGDB_E_CLASSNOTREG on the path that warns about a missing default value.
 *
 * NOTE(review): exe is passed both as the application name and as the start
 * of the command line, so the registry value has to be a bare executable
 * path. strcpyW/strcatW into command are unbounded and rely on the value
 * being NUL-terminated inside exe (exelen allows the value to fill exe
 * completely). Closing of key and of the handles in pinfo is not visible
 * here - confirm.
 */
470 create_server(REFCLSID rclsid)
472 static const WCHAR embedding[] = { ' ', '-','E','m','b','e','d','d','i','n','g',0 };
475 HRESULT hres = E_UNEXPECTED;
477 WCHAR exe[MAX_PATH+1];
478 DWORD exelen = sizeof(exe);
479 WCHAR command[MAX_PATH+sizeof(embedding)/sizeof(WCHAR)];
481 PROCESS_INFORMATION pinfo;
483 WINE_StringFromCLSID((LPCLSID)rclsid,xclsid);
485 sprintf(buf,"CLSID\\%s\\LocalServer32",xclsid);
486 hres = RegOpenKeyExA(HKEY_CLASSES_ROOT, buf, 0, KEY_READ, &key);
488 if (hres != ERROR_SUCCESS) {
489 WARN("CLSID %s not registered as LocalServer32\n", xclsid);
490 return REGDB_E_READREGDB; /* Probably */
493 memset(exe,0,sizeof(exe));
494 hres= RegQueryValueExW(key, NULL, NULL, NULL, (LPBYTE)exe, &exelen);
497 WARN("No default value for LocalServer32 key\n");
498 return REGDB_E_CLASSNOTREG; /* FIXME: check retval */
501 memset(&sinfo,0,sizeof(sinfo));
502 sinfo.cb = sizeof(sinfo);
504 /* EXE servers are started with the -Embedding switch. MSDN also claims /Embedding is used,
505 * 9x does -Embedding, perhaps an 9x/NT difference?
508 strcpyW(command, exe);
509 strcatW(command, embedding);
511 TRACE("activating local server '%s' for %s\n", debugstr_w(command), xclsid);
513 if (!CreateProcessW(exe, command, NULL, NULL, FALSE, 0, NULL, NULL, &sinfo, &pinfo)) {
514 WARN("failed to run local server %s\n", debugstr_w(exe));
/*
 * Opens the service control manager, opens the service called name and
 * starts it with num arguments taken from params. Both handles are closed
 * again before the final TRACE.
 *
 * r starts out as ERROR_FUNCTION_FAILED and is compared against
 * ERROR_SERVICE_ALREADY_RUNNING after the start attempt.
 * NOTE(review): presumably an already running service counts as success -
 * confirm.
 * NOTE(review): OpenServiceW is given SC_MANAGER_ALL_ACCESS, which is an
 * access mask for the service control manager rather than for a service;
 * a SERVICE_* right such as SERVICE_START looks like what is meant.
 * NOTE(review): the final TRACE prints "ok" when r is non-zero and "failed"
 * when it is zero, which looks inverted for an error code.
 */
522 * start_local_service() - start a service given its name and parameters
525 start_local_service(LPCWSTR name, DWORD num, LPWSTR *params)
527 SC_HANDLE handle, hsvc;
528 DWORD r = ERROR_FUNCTION_FAILED;
530 TRACE("Starting service %s %ld params\n", debugstr_w(name), num);
532 handle = OpenSCManagerW(NULL, NULL, SC_MANAGER_ALL_ACCESS);
535 hsvc = OpenServiceW(handle, name, SC_MANAGER_ALL_ACCESS);
538 if(StartServiceW(hsvc, num, (LPCWSTR*)params))
542 if (r == ERROR_SERVICE_ALREADY_RUNNING)
544 CloseServiceHandle(hsvc);
546 CloseServiceHandle(handle);
548 TRACE("StartService returned error %ld (%s)\n", r, r?"ok":"failed");
/*
 * Registry walk performed below: the AppID value is read from the CLSID key
 * of rclsid, the matching key below AppId is opened, and when it holds a
 * REG_SZ LocalService value that service is started through
 * start_local_service. A non-empty REG_SZ ServiceParams value, when
 * present, is read into a heap buffer and passed as args[0]; the buffer is
 * freed after the start attempt.
 *
 * hres starts out as REGDB_E_READREGDB.
 * NOTE(review): buf holds 40 WCHARs and is reused for the AppID string and
 * for the service name, and keyname holds 50 WCHARs; the initialisation of
 * sz before each RegQueryValueExW call is not visible, so confirm that the
 * registry data cannot exceed these buffers and is NUL-terminated before
 * strcatW uses it. The HeapAlloc for args[0] is not visibly checked, and
 * closing of hkey is not visible either.
 */
554 * create_local_service() - start a COM server in a service
556 * To start a Local Service, we read the AppID value under
557 * the class's CLSID key, then open the HKCR\\AppId key specified
558 * there and check for a LocalService value.
560 * Note: Local Services are not supported under Windows 9x
563 create_local_service(REFCLSID rclsid)
565 HRESULT hres = REGDB_E_READREGDB;
566 WCHAR buf[40], keyname[50];
567 static const WCHAR szClsId[] = { 'C','L','S','I','D','\\',0 };
568 static const WCHAR szAppId[] = { 'A','p','p','I','d',0 };
569 static const WCHAR szAppIdKey[] = { 'A','p','p','I','d','\\',0 };
570 static const WCHAR szLocalService[] = { 'L','o','c','a','l','S','e','r','v','i','c','e',0 };
571 static const WCHAR szServiceParams[] = {'S','e','r','v','i','c','e','P','a','r','a','m','s',0};
576 TRACE("Attempting to start Local service for %s\n", debugstr_guid(rclsid));
578 /* read the AppID value under the class's key */
579 strcpyW(keyname,szClsId);
580 StringFromGUID2(rclsid,&keyname[6],39);
581 r = RegOpenKeyExW(HKEY_CLASSES_ROOT, keyname, 0, KEY_READ, &hkey);
582 if (r!=ERROR_SUCCESS)
585 r = RegQueryValueExW(hkey, szAppId, NULL, &type, (LPBYTE)buf, &sz);
587 if (r!=ERROR_SUCCESS || type!=REG_SZ)
590 /* read the LocalService and ServiceParameters values from the AppID key */
591 strcpyW(keyname, szAppIdKey);
592 strcatW(keyname, buf);
593 r = RegOpenKeyExW(HKEY_CLASSES_ROOT, keyname, 0, KEY_READ, &hkey);
594 if (r!=ERROR_SUCCESS)
597 r = RegQueryValueExW(hkey, szLocalService, NULL, &type, (LPBYTE)buf, &sz);
598 if (r==ERROR_SUCCESS && type==REG_SZ)
601 LPWSTR args[1] = { NULL };
604 * FIXME: I'm not really sure how to deal with the service parameters.
605 * I suspect that the string returned from RegQueryValueExW
606 * should be split into a number of arguments by spaces.
607 * It would make more sense if ServiceParams contained a
608 * REG_MULTI_SZ here, but it's a REG_SZ for the services
609 * that I'm interested in for the moment.
611 r = RegQueryValueExW(hkey, szServiceParams, NULL, &type, NULL, &sz);
612 if (r == ERROR_SUCCESS && type == REG_SZ && sz)
614 args[0] = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz);
616 RegQueryValueExW(hkey, szServiceParams, NULL, &type, (LPBYTE)args[0], &sz);
618 r = start_local_service(buf, num_args, args);
619 if (r==ERROR_SUCCESS)
621 HeapFree(GetProcessHeap(),0,args[0]);
628 /* http://msdn.microsoft.com/library/en-us/dnmsj99/html/com0199.asp, Figure 4 */
/*
 * Fetch the marshalled class factory of the out-of-process server for
 * rclsid and unmarshal it into *ppv.
 *
 * The pipe named PIPEPREF followed by the CLSID string is opened; when that
 * fails, create_server and then create_local_service are tried to get a
 * server running, and the attempt is repeated for at most MAXTRIES rounds.
 * Up to sizeof(marshalbuffer) bytes are read from the pipe, copied into a
 * stream created on a global memory block, the stream is rewound, and
 * CoUnmarshalInterface is asked for IID_IClassFactory. The stream is
 * released afterwards.
 *
 * Returns E_NOINTERFACE when tries has reached MAXTRIES, and a failure
 * from CreateStreamOnHGlobal unchanged.
 *
 * NOTE(review): iid is only traced; the unmarshal always requests
 * IID_IClassFactory. The check after the loop is tries >= MAXTRIES, so a
 * connection that succeeds on the very last round is still reported as
 * E_NOINTERFACE. The results of WaitNamedPipeA, IStream_Write and
 * IStream_Seek are not visibly examined, and closing of hPipe is not
 * visible - confirm.
 */
629 HRESULT create_marshalled_proxy(REFCLSID rclsid, REFIID iid, LPVOID *ppv)
634 DWORD res, bufferlen;
635 char marshalbuffer[200];
637 LARGE_INTEGER seekto;
638 ULARGE_INTEGER newpos;
641 static const int MAXTRIES = 10000;
643 TRACE("rclsid=%s, iid=%s\n", debugstr_guid(rclsid), debugstr_guid(iid));
645 strcpy(pipefn,PIPEPREF);
646 WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF));
648 while (tries++ < MAXTRIES) {
649 TRACE("waiting for %s\n", pipefn);
651 WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER );
652 hPipe = CreateFileA(pipefn, GENERIC_READ | GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0);
653 if (hPipe == INVALID_HANDLE_VALUE) {
/* both launch methods returning non-zero means no server could be started */
655 if ( (hres = create_server(rclsid)) &&
656 (hres = create_local_service(rclsid)) )
660 WARN("Connecting to %s, no response yet, retrying: le is %lx\n",pipefn,GetLastError());
666 if (!ReadFile(hPipe,marshalbuffer,sizeof(marshalbuffer),&bufferlen,NULL)) {
667 FIXME("Failed to read marshal id from classfactory of %s.\n",debugstr_guid(rclsid));
671 TRACE("read marshal id from pipe\n");
676 if (tries >= MAXTRIES)
677 return E_NOINTERFACE;
679 hres = CreateStreamOnHGlobal(0,TRUE,&pStm);
680 if (hres) return hres;
681 hres = IStream_Write(pStm,marshalbuffer,bufferlen,&res);
683 seekto.u.LowPart = 0;seekto.u.HighPart = 0;
684 hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
686 TRACE("unmarshalling classfactory\n");
687 hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv);
689 IStream_Release(pStm);
694 /* this reads an RPC from the given pipe and places it in the global reqs array */
/*
 * Read one message from pipe and record it in the reqs table.
 *
 * The message starts with a type tag:
 *  - REQTYPE_REQUEST: a struct rpc is obtained from RPC_GetRequest, the
 *    request header and then reqh.cbBuffer payload bytes are read into it,
 *    and the entry is marked REQSTATE_REQ_GOT. resph.reqid is set from the
 *    header just read, so the response will carry the sender's id.
 *  - REQTYPE_RESPONSE: the response header is read and matched by reqid
 *    against the entries in REQSTATE_REQ_WAITING_FOR_REPLY; the payload is
 *    read into the matching entry, which is marked REQSTATE_RESP_GOT. An
 *    unmatched id is logged as a protocol error.
 *  - any other tag is logged as a protocol error.
 *
 * A failure while reading the type tag is returned to the caller.
 * NOTE(review): the result of RPC_GetRequest is ignored, and the buffer
 * sizes taken from the received headers are passed to HeapAlloc and
 * HeapReAlloc without a visible check of the size or of the returned
 * pointer - confirm.
 */
695 static HRESULT process_incoming_rpc(HANDLE pipe)
700 hres = read_pipe(pipe,&reqtype,sizeof(reqtype));
701 if (hres) return hres;
703 /* only received by servers */
704 if (reqtype == REQTYPE_REQUEST)
708 RPC_GetRequest(&xreq);
711 hres = read_pipe(pipe,&(xreq->reqh),sizeof(xreq->reqh));
714 xreq->state = REQSTATE_DONE;
718 xreq->resph.reqid = xreq->reqh.reqid;
719 xreq->Buffer = HeapAlloc(GetProcessHeap(),0, xreq->reqh.cbBuffer);
720 hres = read_pipe(pipe,xreq->Buffer,xreq->reqh.cbBuffer);
723 TRACE("received RPC for IPID %s\n", debugstr_guid(&xreq->reqh.ipid));
725 xreq->state = REQSTATE_REQ_GOT;
728 else if (reqtype == REQTYPE_RESPONSE)
730 struct response_header resph;
733 hres = read_pipe(pipe,&resph,sizeof(resph));
736 TRACE("read RPC response\n");
738 for (i = nrofreqs; i--;)
740 struct rpc *xreq = reqs[i];
742 if (xreq->state != REQSTATE_REQ_WAITING_FOR_REPLY)
745 if (xreq->reqh.reqid == resph.reqid)
747 memcpy(&(xreq->resph),&resph,sizeof(resph));
750 xreq->Buffer = HeapReAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->Buffer,xreq->resph.cbBuffer);
752 xreq->Buffer = HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,xreq->resph.cbBuffer);
754 hres = read_pipe(pipe,xreq->Buffer,xreq->resph.cbBuffer);
757 TRACE("received response for reqid 0x%lx\n", xreq->reqh.reqid);
759 xreq->state = REQSTATE_RESP_GOT;
764 ERR("protocol error: did not find request for id %lx\n",resph.reqid);
769 ERR("protocol error: unknown reqtype %ld\n",reqtype);
/* Start-up arguments for client_dispatch_thread, which reads the apt and
 * pipe members and then frees the block. */
775 struct stub_dispatch_params
777 struct apartment *apt;
781 /* This thread listens on the given pipe for requests to any stub manager */
/*
 * Thread procedure serving one connected pipe.
 *
 * param points to a heap-allocated struct stub_dispatch_params; the pipe
 * handle and apartment are copied out and the block is freed here. The
 * thread installs the apartment in its COM_CurrentInfo(), then reads
 * messages with process_incoming_rpc and runs COM_InvokeAndRpcSend for
 * entries in REQSTATE_REQ_GOT whose hPipe equals this pipe. The
 * apartment's shutdown event (a duplicate handle) is polled with a zero
 * timeout after a message has been processed. The pipe is disconnected at
 * the end.
 *
 * NOTE(review): matching on req->hPipe == pipe relies on the request having
 * been tagged with this pipe; that assignment is not visible here. The
 * duplicate shutdown_event is visibly closed only on the shutdown path -
 * confirm the other exits.
 */
782 static DWORD WINAPI client_dispatch_thread(LPVOID param)
784 HANDLE pipe = ((struct stub_dispatch_params *)param)->pipe;
785 struct apartment *apt = ((struct stub_dispatch_params *)param)->apt;
787 HANDLE shutdown_event = dupe_handle(apt->shutdown_event);
789 HeapFree(GetProcessHeap(), 0, param);
791 /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
792 COM_CurrentInfo()->apt = apt;
798 TRACE("waiting for RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
800 /* read a new request into the global array, block if no requests have been sent */
801 hres = process_incoming_rpc(pipe);
804 /* do you expect me to talk? */
805 if (WaitForSingleObject(shutdown_event, 0) == WAIT_OBJECT_0)
807 /* no mr bond, i expect you to die! bwahaha */
808 CloseHandle(shutdown_event);
812 TRACE("received RPC on OXID: %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
814 /* now scan the array looking for the RPC just loaded */
815 for (i=nrofreqs;i--;)
817 struct rpc *req = reqs[i];
819 if ((req->state == REQSTATE_REQ_GOT) && (req->hPipe == pipe))
821 hres = COM_InvokeAndRpcSend(req);
827 TRACE("exiting with hres %lx\n",hres);
828 DisconnectNamedPipe(pipe);
/* Start-up arguments for apartment_listener_thread, which reads the apt and
 * event members and then frees the block. */
833 struct apartment_listener_params
839 /* This thread listens on a named pipe for each apartment that exports
840 * objects. It deals with incoming connection requests. Each time a
841 * client connects a separate thread is spawned for that particular
844 * This architecture is different in native DCOM.
/*
 * Thread procedure that accepts client connections for one apartment.
 *
 * p points to a heap-allocated struct apartment_listener_params; the
 * apartment and event are copied out and the block is freed here. The
 * thread installs the apartment in its COM_CurrentInfo() and builds the
 * pipe name from OLESTUBMGR and the apartment OXID. For each connection it
 * creates an overlapped duplex pipe instance, starts ConnectNamedPipe and
 * waits for either the apartment shutdown event or the connect event. A
 * connected instance is handed to a new client_dispatch_thread through a
 * heap-allocated struct stub_dispatch_params.
 *
 * Pipe creation failure is treated as permanent and ends the thread; an
 * allocation failure for the dispatch parameters drops that client only.
 *
 * NOTE(review): only overlapped.hEvent is visibly initialised - confirm the
 * remaining OVERLAPPED members are zeroed. The handle from CreateThread is
 * closed without a visible failure check. After the loop listenPipe is
 * disconnected and closed even when the exit was caused by a failed
 * CreateNamedPipeA, in which case it is INVALID_HANDLE_VALUE.
 */
846 static DWORD WINAPI apartment_listener_thread(LPVOID p)
849 HANDLE listenPipe, thread_handle;
850 OVERLAPPED overlapped;
853 struct apartment_listener_params * params = (struct apartment_listener_params *)p;
854 struct apartment *apt = params->apt;
855 HANDLE event = params->event;
856 HANDLE apt_shutdown_event = dupe_handle(apt->shutdown_event);
857 OXID this_oxid = apt->oxid; /* copy here so we can print it when we shut down */
859 HeapFree(GetProcessHeap(), 0, params);
861 overlapped.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
863 /* we must join the marshalling threads apartment. we already have a ref here */
864 COM_CurrentInfo()->apt = apt;
866 sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
867 TRACE("Apartment listener thread starting on (%s)\n",pipefn);
871 struct stub_dispatch_params *params;
874 listenPipe = CreateNamedPipeA(
876 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
877 PIPE_TYPE_BYTE|PIPE_WAIT,
878 PIPE_UNLIMITED_INSTANCES,
881 500 /* 0.5 seconds */,
885 /* tell function that started this thread that we have attempted to create the
892 if (listenPipe == INVALID_HANDLE_VALUE) {
893 FIXME("pipe creation failed for %s, error %ld\n",pipefn,GetLastError());
894 break; /* permanent failure, so quit stubmgr thread */
897 TRACE("waiting for a client ...\n");
899 /* an already connected pipe is not an error */
900 if (!ConnectNamedPipe(listenPipe, &overlapped))
902 DWORD le = GetLastError();
904 if ((le != ERROR_IO_PENDING) && (le != ERROR_PIPE_CONNECTED))
906 ERR("Failure during ConnectNamedPipe %ld!\n",GetLastError());
907 CloseHandle(listenPipe);
912 /* wait for action */
913 wait[0] = apt_shutdown_event;
914 wait[1] = overlapped.hEvent;
915 res = WaitForMultipleObjectsEx(2, wait, FALSE, INFINITE, FALSE);
/* index 0 is the shutdown event: leave the accept loop */
916 if (res == WAIT_OBJECT_0) break;
918 ResetEvent(overlapped.hEvent);
920 /* start the stub dispatch thread for this connection */
921 TRACE("starting stub dispatch thread for OXID %08lx%08lx\n", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
922 params = HeapAlloc(GetProcessHeap(), 0, sizeof(struct stub_dispatch_params));
925 ERR("out of memory, dropping this client\n");
926 CloseHandle(listenPipe);
930 params->pipe = listenPipe;
931 thread_handle = CreateThread(NULL, 0, &client_dispatch_thread, params, 0, NULL);
932 CloseHandle(thread_handle);
935 TRACE("shutting down: %s\n", wine_dbgstr_longlong(this_oxid));
937 DisconnectNamedPipe(listenPipe);
938 CloseHandle(listenPipe);
939 CloseHandle(overlapped.hEvent);
940 CloseHandle(apt_shutdown_event);
/*
 * Make sure the current apartment has a listener thread.
 *
 * When apt->listenertid is still zero, a manual-reset event and a
 * heap-allocated struct apartment_listener_params are created, the
 * apartment_listener_thread is started (its thread id is stored in
 * apt->listenertid) and the caller blocks on the event without a timeout.
 *
 * NOTE(review): presumably the listener signals the event once it has tried
 * to create its pipe - confirm. The HeapAlloc and CreateThread results are
 * not visibly checked, and closing of event and thread is not visible.
 */
944 void start_apartment_listener_thread()
946 APARTMENT *apt = COM_CurrentApt();
950 TRACE("apt->listenertid=%ld\n", apt->listenertid);
952 /* apt->listenertid is a hack which needs to die at some point, as
953 * it leaks information into the apartment structure. in fact,
954 * this thread isn't quite correct anyway as native RPC doesn't
955 * use a thread per apartment at all, instead the dispatch thread
956 * either enters the apartment to perform the RPC (for MTAs, RTAs)
957 * or does a context switch into it for STAs.
960 if (!apt->listenertid)
963 HANDLE event = CreateEventW(NULL, TRUE, FALSE, NULL);
964 struct apartment_listener_params * params = HeapAlloc(GetProcessHeap(), 0, sizeof(*params));
967 params->event = event;
968 thread = CreateThread(NULL, 0, apartment_listener_thread, params, 0, &apt->listenertid);
970 /* wait for pipe to be created before returning, otherwise we
971 * might try to use it and fail */
972 WaitForSingleObject(event, INFINITE);
/* Start-up arguments for local_server_thread, which reads the clsid and
 * stream members and then frees the block. */
977 struct local_server_params
/*
 * Thread procedure that serves the marshalled class factory of one CLSID.
 *
 * param points to a heap-allocated struct local_server_params; the stream
 * pointer is copied out, the pipe name is built from PIPEPREF and the CLSID
 * string, and the block is freed here. A duplex byte-mode named pipe is
 * created; for a connected client the stream's size is taken from
 * IStream_Stat, the stream is rewound and read into a heap buffer, and the
 * buffer is written to the pipe, flushed, and the client disconnected.
 * IStream_Release(pStm) is the last visible statement.
 *
 * NOTE(review): the return right after a failing IStream_Stat skips the
 * later DisconnectNamedPipe and IStream_Release. The HeapAlloc result for
 * buffer is not visibly checked or freed, only the low 32 bits of the
 * stream size are used, and the WriteFile result is ignored - confirm.
 */
983 static DWORD WINAPI local_server_thread(LPVOID param)
985 struct local_server_params * lsp = (struct local_server_params *)param;
989 IStream *pStm = lsp->stream;
991 unsigned char *buffer;
993 LARGE_INTEGER seekto;
994 ULARGE_INTEGER newpos;
997 TRACE("Starting threader for %s.\n",debugstr_guid(&lsp->clsid));
999 strcpy(pipefn,PIPEPREF);
1000 WINE_StringFromCLSID(&lsp->clsid,pipefn+strlen(PIPEPREF));
1002 HeapFree(GetProcessHeap(), 0, lsp);
1004 hPipe = CreateNamedPipeA( pipefn, PIPE_ACCESS_DUPLEX,
1005 PIPE_TYPE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES,
1006 4096, 4096, 500 /* 0.5 second timeout */, NULL );
1008 if (hPipe == INVALID_HANDLE_VALUE)
1010 FIXME("pipe creation failed for %s, le is %ld\n",pipefn,GetLastError());
1015 if (!ConnectNamedPipe(hPipe,NULL)) {
1016 ERR("Failure during ConnectNamedPipe %ld, ABORT!\n",GetLastError());
1020 TRACE("marshalling IClassFactory to client\n");
1022 hres = IStream_Stat(pStm,&ststg,0);
1023 if (hres) return hres;
1025 buflen = ststg.cbSize.u.LowPart;
1026 buffer = HeapAlloc(GetProcessHeap(),0,buflen);
1027 seekto.u.LowPart = 0;
1028 seekto.u.HighPart = 0;
1029 hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
1031 FIXME("IStream_Seek failed, %lx\n",hres);
1035 hres = IStream_Read(pStm,buffer,buflen,&res);
1037 FIXME("Stream Read failed, %lx\n",hres);
1041 WriteFile(hPipe,buffer,buflen,&res,NULL);
1042 FlushFileBuffers(hPipe);
1043 DisconnectNamedPipe(hPipe);
1045 TRACE("done marshalling IClassFactory\n");
1048 IStream_Release(pStm);
1052 void RPC_StartLocalServer(REFCLSID clsid, IStream *stream)
1056 struct local_server_params *lsp = HeapAlloc(GetProcessHeap(), 0, sizeof(*lsp));
1058 lsp->clsid = *clsid;
1059 lsp->stream = stream;
1061 thread = CreateThread(NULL, 0, local_server_thread, lsp, 0, &tid);
1062 CloseHandle(thread);
1063 /* FIXME: failure handling */