mirror of
https://github.com/github/codeql-action.git
synced 2025-12-27 01:30:10 +08:00
249 lines
5.8 KiB
JavaScript
249 lines
5.8 KiB
JavaScript
import {EventEmitter, on} from 'node:events';
|
|
import process from 'node:process';
|
|
import {workerData, parentPort, threadId} from 'node:worker_threads';
|
|
|
|
import pkg from '../pkg.cjs';
|
|
|
|
// Used to forward messages received over the `parentPort` and any direct ports
|
|
// to test workers. Every subscription adds a listener, so do not enforce any
|
|
// maximums.
|
|
const events = new EventEmitter().setMaxListeners(0);
|
|
const emitMessage = message => {
|
|
// Wait for a turn of the event loop, to allow new subscriptions to be
|
|
// set up in response to the previous message.
|
|
setImmediate(() => events.emit('message', message));
|
|
};
|
|
|
|
// Map of active test workers, used in receiveMessages() to get a reference to
|
|
// the TestWorker instance, and relevant release functions.
|
|
const activeTestWorkers = new Map();
|
|
|
|
const internalMessagePort = Symbol('Internal MessagePort');
|
|
|
|
class TestWorker {
|
|
constructor(id, file, port) {
|
|
this.id = id;
|
|
this.file = file;
|
|
this[internalMessagePort] = port;
|
|
}
|
|
|
|
teardown(fn) {
|
|
let done = false;
|
|
const teardownFn = async () => {
|
|
if (done) {
|
|
return;
|
|
}
|
|
|
|
done = true;
|
|
if (activeTestWorkers.has(this.id)) {
|
|
activeTestWorkers.get(this.id).teardownFns.delete(teardownFn);
|
|
}
|
|
|
|
await fn();
|
|
};
|
|
|
|
activeTestWorkers.get(this.id).teardownFns.add(teardownFn);
|
|
|
|
return teardownFn;
|
|
}
|
|
|
|
publish(data) {
|
|
return publishMessage(this, data);
|
|
}
|
|
|
|
async * subscribe() {
|
|
yield * receiveMessages(this);
|
|
}
|
|
}
|
|
|
|
class ReceivedMessage {
|
|
constructor(testWorker, id, data) {
|
|
this.testWorker = testWorker;
|
|
this.id = id;
|
|
this.data = data;
|
|
}
|
|
|
|
reply(data) {
|
|
return publishMessage(this.testWorker, data, this.id);
|
|
}
|
|
}
|
|
|
|
// Ensure that, no matter how often it's received, we have a stable message
|
|
// object.
|
|
const messageCache = new WeakMap();
|
|
|
|
async function * receiveMessages(fromTestWorker, replyTo) {
|
|
for await (const [message] of on(events, 'message')) {
|
|
if (fromTestWorker !== undefined) {
|
|
if (message.type === 'deregister-test-worker' && message.id === fromTestWorker.id) {
|
|
return;
|
|
}
|
|
|
|
if (message.type === 'message' && message.testWorkerId !== fromTestWorker.id) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (message.type !== 'message') {
|
|
continue;
|
|
}
|
|
|
|
if (replyTo === undefined && message.replyTo !== undefined) {
|
|
continue;
|
|
}
|
|
|
|
if (replyTo !== undefined && message.replyTo !== replyTo) {
|
|
continue;
|
|
}
|
|
|
|
const active = activeTestWorkers.get(message.testWorkerId);
|
|
// It is possible for a message to have been buffering for so long — perhaps
|
|
// due to the caller waiting before iterating to the next message — that the
|
|
// test worker has been deregistered. Ignore such messages.
|
|
//
|
|
// (This is really hard to write a test for, however!)
|
|
if (active === undefined) {
|
|
continue;
|
|
}
|
|
|
|
let received = messageCache.get(message);
|
|
if (received === undefined) {
|
|
received = new ReceivedMessage(active.instance, message.messageId, message.data);
|
|
messageCache.set(message, received);
|
|
}
|
|
|
|
yield received;
|
|
}
|
|
}
|
|
|
|
let messageCounter = 0;
|
|
const messageIdPrefix = `${threadId}/message`;
|
|
const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`;
|
|
|
|
function publishMessage(testWorker, data, replyTo) {
|
|
const id = nextMessageId();
|
|
testWorker[internalMessagePort].postMessage({
|
|
type: 'message',
|
|
messageId: id,
|
|
data,
|
|
replyTo,
|
|
});
|
|
|
|
return {
|
|
id,
|
|
async * replies() {
|
|
yield * receiveMessages(testWorker, id);
|
|
},
|
|
};
|
|
}
|
|
|
|
function broadcastMessage(data) {
|
|
const id = nextMessageId();
|
|
for (const trackedWorker of activeTestWorkers.values()) {
|
|
trackedWorker.instance[internalMessagePort].postMessage({
|
|
type: 'message',
|
|
messageId: id,
|
|
data,
|
|
});
|
|
}
|
|
|
|
return {
|
|
id,
|
|
async * replies() {
|
|
yield * receiveMessages(undefined, id);
|
|
},
|
|
};
|
|
}
|
|
|
|
async function loadFactory() {
|
|
const {default: factory} = await import(workerData.filename);
|
|
return factory;
|
|
}
|
|
|
|
let signalAvailable = () => {
|
|
parentPort.postMessage({type: 'available'});
|
|
signalAvailable = () => {};
|
|
};
|
|
|
|
let fatal;
|
|
try {
|
|
const factory = await loadFactory(workerData.filename);
|
|
if (typeof factory !== 'function') {
|
|
throw new TypeError(`Missing default factory function export for shared worker plugin at ${workerData.filename}`);
|
|
}
|
|
|
|
factory({
|
|
negotiateProtocol(supported) {
|
|
if (!supported.includes('ava-4')) {
|
|
fatal = new Error(`This version of AVA (${pkg.version}) is not compatible with shared worker plugin at ${workerData.filename}`);
|
|
throw fatal;
|
|
}
|
|
|
|
const produceTestWorker = instance => events.emit('testWorker', instance);
|
|
|
|
parentPort.on('message', async message => {
|
|
if (message.type === 'register-test-worker') {
|
|
const {id, file, port} = message;
|
|
const instance = new TestWorker(id, file, port);
|
|
|
|
activeTestWorkers.set(id, {instance, teardownFns: new Set()});
|
|
|
|
produceTestWorker(instance);
|
|
port.on('message', message => emitMessage({testWorkerId: id, ...message}));
|
|
}
|
|
|
|
if (message.type === 'deregister-test-worker') {
|
|
const {id} = message;
|
|
const {teardownFns} = activeTestWorkers.get(id);
|
|
activeTestWorkers.delete(id);
|
|
|
|
// Run possibly asynchronous release functions serially, in reverse
|
|
// order. Any error will crash the worker.
|
|
for await (const fn of [...teardownFns].reverse()) {
|
|
await fn();
|
|
}
|
|
|
|
parentPort.postMessage({
|
|
type: 'deregistered-test-worker',
|
|
id,
|
|
});
|
|
|
|
emitMessage(message);
|
|
}
|
|
});
|
|
|
|
return {
|
|
initialData: workerData.initialData,
|
|
protocol: 'ava-4',
|
|
|
|
ready() {
|
|
signalAvailable();
|
|
return this;
|
|
},
|
|
|
|
broadcast(data) {
|
|
return broadcastMessage(data);
|
|
},
|
|
|
|
async * subscribe() {
|
|
yield * receiveMessages();
|
|
},
|
|
|
|
async * testWorkers() {
|
|
for await (const [worker] of on(events, 'testWorker')) {
|
|
yield worker;
|
|
}
|
|
},
|
|
};
|
|
},
|
|
});
|
|
} catch (error) {
|
|
fatal = fatal ?? error;
|
|
} finally {
|
|
if (fatal !== undefined) {
|
|
process.nextTick(() => {
|
|
throw fatal;
|
|
});
|
|
}
|
|
}
|