Skip to content

Commit 114a4a6

Browse files
authored
Merge pull request #678 from foxglove/achim/fix-messageport-close
Fix message ports not being closed when proxy is relased
2 parents fd4b526 + 359584c commit 114a4a6

File tree

3 files changed

+66
-33
lines changed

3 files changed

+66
-33
lines changed

src/comlink.ts

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,10 @@ type PendingListenersMap = Map<
231231
string,
232232
(value: WireValue | PromiseLike<WireValue>) => void
233233
>;
234+
type EndpointWithPendingListeners = {
235+
endpoint: Endpoint;
236+
pendingListeners: PendingListenersMap;
237+
};
234238

235239
/**
236240
* Internal transfer handler to handle thrown exceptions.
@@ -415,7 +419,7 @@ export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
415419
}
416420
});
417421

418-
return createProxy<T>(ep, pendingListeners, [], target) as any;
422+
return createProxy<T>({ endpoint: ep, pendingListeners }, [], target) as any;
419423
}
420424

421425
function throwIfProxyReleased(isReleased: boolean) {
@@ -424,11 +428,11 @@ function throwIfProxyReleased(isReleased: boolean) {
424428
}
425429
}
426430

427-
function releaseEndpoint(ep: Endpoint) {
428-
return requestResponseMessage(ep, new Map(), {
431+
function releaseEndpoint(epWithPendingListeners: EndpointWithPendingListeners) {
432+
return requestResponseMessage(epWithPendingListeners, {
429433
type: MessageType.RELEASE,
430434
}).then(() => {
431-
closeEndPoint(ep);
435+
closeEndPoint(epWithPendingListeners.endpoint);
432436
});
433437
}
434438

@@ -441,24 +445,31 @@ interface FinalizationRegistry<T> {
441445
): void;
442446
unregister(unregisterToken: object): void;
443447
}
444-
declare var FinalizationRegistry: FinalizationRegistry<Endpoint>;
448+
declare var FinalizationRegistry: FinalizationRegistry<EndpointWithPendingListeners>;
445449

446-
const proxyCounter = new WeakMap<Endpoint, number>();
450+
const proxyCounter = new WeakMap<EndpointWithPendingListeners, number>();
447451
const proxyFinalizers =
448452
"FinalizationRegistry" in globalThis &&
449-
new FinalizationRegistry((ep: Endpoint) => {
450-
const newCount = (proxyCounter.get(ep) || 0) - 1;
451-
proxyCounter.set(ep, newCount);
452-
if (newCount === 0) {
453-
releaseEndpoint(ep);
453+
new FinalizationRegistry(
454+
(epWithPendingListeners: EndpointWithPendingListeners) => {
455+
const newCount = (proxyCounter.get(epWithPendingListeners) || 0) - 1;
456+
proxyCounter.set(epWithPendingListeners, newCount);
457+
if (newCount === 0) {
458+
releaseEndpoint(epWithPendingListeners).finally(() => {
459+
epWithPendingListeners.pendingListeners.clear();
460+
});
461+
}
454462
}
455-
});
463+
);
456464

457-
function registerProxy(proxy: object, ep: Endpoint) {
458-
const newCount = (proxyCounter.get(ep) || 0) + 1;
459-
proxyCounter.set(ep, newCount);
465+
function registerProxy(
466+
proxy: object,
467+
epWithPendingListeners: EndpointWithPendingListeners
468+
) {
469+
const newCount = (proxyCounter.get(epWithPendingListeners) || 0) + 1;
470+
proxyCounter.set(epWithPendingListeners, newCount);
460471
if (proxyFinalizers) {
461-
proxyFinalizers.register(proxy, ep, proxy);
472+
proxyFinalizers.register(proxy, epWithPendingListeners, proxy);
462473
}
463474
}
464475

@@ -469,8 +480,7 @@ function unregisterProxy(proxy: object) {
469480
}
470481

471482
function createProxy<T>(
472-
ep: Endpoint,
473-
pendingListeners: PendingListenersMap,
483+
epWithPendingListeners: EndpointWithPendingListeners,
474484
path: (string | number | symbol)[] = [],
475485
target: object = function () {}
476486
): Remote<T> {
@@ -481,31 +491,31 @@ function createProxy<T>(
481491
if (prop === releaseProxy) {
482492
return () => {
483493
unregisterProxy(proxy);
484-
releaseEndpoint(ep);
485-
pendingListeners.clear();
494+
releaseEndpoint(epWithPendingListeners).finally(() => {
495+
epWithPendingListeners.pendingListeners.clear();
496+
});
486497
isProxyReleased = true;
487498
};
488499
}
489500
if (prop === "then") {
490501
if (path.length === 0) {
491502
return { then: () => proxy };
492503
}
493-
const r = requestResponseMessage(ep, pendingListeners, {
504+
const r = requestResponseMessage(epWithPendingListeners, {
494505
type: MessageType.GET,
495506
path: path.map((p) => p.toString()),
496507
}).then(fromWireValue);
497508
return r.then.bind(r);
498509
}
499-
return createProxy(ep, pendingListeners, [...path, prop]);
510+
return createProxy(epWithPendingListeners, [...path, prop]);
500511
},
501512
set(_target, prop, rawValue) {
502513
throwIfProxyReleased(isProxyReleased);
503514
// FIXME: ES6 Proxy Handler `set` methods are supposed to return a
504515
// boolean. To show good will, we return true asynchronously ¯\_(ツ)_/¯
505516
const [value, transferables] = toWireValue(rawValue);
506517
return requestResponseMessage(
507-
ep,
508-
pendingListeners,
518+
epWithPendingListeners,
509519
{
510520
type: MessageType.SET,
511521
path: [...path, prop].map((p) => p.toString()),
@@ -518,18 +528,17 @@ function createProxy<T>(
518528
throwIfProxyReleased(isProxyReleased);
519529
const last = path[path.length - 1];
520530
if ((last as any) === createEndpoint) {
521-
return requestResponseMessage(ep, pendingListeners, {
531+
return requestResponseMessage(epWithPendingListeners, {
522532
type: MessageType.ENDPOINT,
523533
}).then(fromWireValue);
524534
}
525535
// We just pretend that `bind()` didn’t happen.
526536
if (last === "bind") {
527-
return createProxy(ep, pendingListeners, path.slice(0, -1));
537+
return createProxy(epWithPendingListeners, path.slice(0, -1));
528538
}
529539
const [argumentList, transferables] = processArguments(rawArgumentList);
530540
return requestResponseMessage(
531-
ep,
532-
pendingListeners,
541+
epWithPendingListeners,
533542
{
534543
type: MessageType.APPLY,
535544
path: path.map((p) => p.toString()),
@@ -542,8 +551,7 @@ function createProxy<T>(
542551
throwIfProxyReleased(isProxyReleased);
543552
const [argumentList, transferables] = processArguments(rawArgumentList);
544553
return requestResponseMessage(
545-
ep,
546-
pendingListeners,
554+
epWithPendingListeners,
547555
{
548556
type: MessageType.CONSTRUCT,
549557
path: path.map((p) => p.toString()),
@@ -553,7 +561,7 @@ function createProxy<T>(
553561
).then(fromWireValue);
554562
},
555563
});
556-
registerProxy(proxy, ep);
564+
registerProxy(proxy, epWithPendingListeners);
557565
return proxy as any;
558566
}
559567

@@ -622,11 +630,12 @@ function fromWireValue(value: WireValue): any {
622630
}
623631

624632
function requestResponseMessage(
625-
ep: Endpoint,
626-
pendingListeners: PendingListenersMap,
633+
epWithPendingListeners: EndpointWithPendingListeners,
627634
msg: Message,
628635
transfers?: Transferable[]
629636
): Promise<WireValue> {
637+
const ep = epWithPendingListeners.endpoint;
638+
const pendingListeners = epWithPendingListeners.pendingListeners;
630639
return new Promise((resolve) => {
631640
const id = generateUUID();
632641
pendingListeners.set(id, resolve);

tests/node/main.mjs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,17 @@ describe("node", () => {
2424
const otherProxy = Comlink.wrap(otherEp);
2525
expect(await otherProxy(20, 1)).to.equal(21);
2626
});
27+
28+
it("releaseProxy closes MessagePort created by createEndpoint", async function () {
29+
const proxy = Comlink.wrap(nodeEndpoint(this.worker));
30+
const otherEp = await proxy[Comlink.createEndpoint]();
31+
const otherProxy = Comlink.wrap(otherEp);
32+
expect(await otherProxy(20, 1)).to.equal(21);
33+
34+
await new Promise((resolve) => {
35+
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
36+
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
37+
});
38+
});
2739
});
2840
});

tests/worker.comlink.test.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,16 @@ describe("Comlink across workers", function () {
3333
const otherProxy = Comlink.wrap(otherEp);
3434
expect(await otherProxy(20, 1)).to.equal(21);
3535
});
36+
37+
it("releaseProxy closes MessagePort created by createEndpoint", async function () {
38+
const proxy = Comlink.wrap(this.worker);
39+
const otherEp = await proxy[Comlink.createEndpoint]();
40+
const otherProxy = Comlink.wrap(otherEp);
41+
expect(await otherProxy(20, 1)).to.equal(21);
42+
43+
await new Promise((resolve) => {
44+
otherEp.close = resolve; // Resolve the promise when the MessagePort is closed.
45+
otherProxy[Comlink.releaseProxy](); // Release the proxy, which should close the MessagePort.
46+
});
47+
});
3648
});

0 commit comments

Comments
 (0)