-
Notifications
You must be signed in to change notification settings - Fork 45
reuse processes using a process pool #195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d37d513
92921f9
756d770
5048a63
85477fe
833d66c
0777db2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,5 +6,5 @@ insert_final_newline = true | |
indent_style = space | ||
indent_size = 2 | ||
|
||
[{*.js,test/index.sh}] | ||
[{*.js}] | ||
indent_size = 4 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,17 @@ | ||
import { fork } from 'child_process'; | ||
import * as assert from 'assert'; | ||
import * as CircularJSON from 'circular-json'; | ||
import * as debug from 'debug'; | ||
import * as Mocha from 'mocha'; | ||
import { resolve as pathResolve } from 'path'; | ||
|
||
import ProcessPool from './process-pool'; | ||
import RunnerMain from './runner'; | ||
import TaskManager from './task-manager'; | ||
import { | ||
removeDebugArgs, | ||
subprocessParseReviver, | ||
} from './util'; | ||
|
||
import { DEBUG_SUBPROCESS, SUITE_OWN_OPTIONS } from '../config'; | ||
import { SUITE_OWN_OPTIONS } from '../config'; | ||
import { | ||
IRetriedTest, | ||
ISubprocessOutputMessage, | ||
|
@@ -24,6 +24,7 @@ import { | |
const debugLog = debug('mocha-parallel-tests'); | ||
|
||
export default class MochaWrapper extends Mocha { | ||
private pool = new ProcessPool(); | ||
private isTypescriptRunMode = false; | ||
private maxParallel: number | undefined; | ||
private requires: string[] = []; | ||
|
@@ -50,6 +51,7 @@ export default class MochaWrapper extends Mocha { | |
|
||
setMaxParallel(maxParallel: number) { | ||
this.maxParallel = maxParallel; | ||
this.pool.setMaxParallel(maxParallel); | ||
} | ||
|
||
enableExitMode() { | ||
|
@@ -136,6 +138,7 @@ export default class MochaWrapper extends Mocha { | |
}; | ||
|
||
runner.emitFinishEvents(done); | ||
this.pool.destroyAll(); | ||
}); | ||
|
||
return runner; | ||
|
@@ -162,89 +165,99 @@ export default class MochaWrapper extends Mocha { | |
} | ||
|
||
private spawnTestProcess(file: string): Promise<ISubprocessResult> { | ||
return new Promise((resolve) => { | ||
const nodeFlags: string[] = []; | ||
const extension = this.isTypescriptRunMode ? 'ts' : 'js'; | ||
const runnerPath = pathResolve(__dirname, `../subprocess/runner.${extension}`); | ||
return new Promise<ISubprocessResult>(async (resolve) => { | ||
const resolvedFilePath = pathResolve(file); | ||
|
||
const forkArgs: string[] = ['--test', resolvedFilePath]; | ||
const testOptions: {[key: string]: any} = { test: resolvedFilePath }; | ||
|
||
for (const option of SUITE_OWN_OPTIONS) { | ||
const propValue = this.suite[option](); | ||
// bail is undefined by default, we need to somehow pass its value to the subprocess | ||
forkArgs.push(`--${option}`, propValue === undefined ? false : propValue); | ||
testOptions[option] = propValue === undefined ? false : propValue; | ||
} | ||
|
||
for (const requirePath of this.requires) { | ||
forkArgs.push('--require', requirePath); | ||
testOptions.require = requirePath; | ||
} | ||
|
||
for (const compilerPath of this.compilers) { | ||
forkArgs.push('--compilers', compilerPath); | ||
} | ||
testOptions.compilers = this.compilers || []; | ||
|
||
if (this.options.delay) { | ||
forkArgs.push('--delay'); | ||
testOptions.delay = true; | ||
} | ||
|
||
if (this.options.grep) { | ||
forkArgs.push('--grep', this.options.grep.toString()); | ||
testOptions.grep = this.options.grep.toString(); | ||
} | ||
|
||
if (this.exitImmediately) { | ||
forkArgs.push('--exit'); | ||
testOptions.exit = true; | ||
} | ||
|
||
if (this.options.fullStackTrace) { | ||
forkArgs.push('--full-trace'); | ||
testOptions.fullStackTrace = true; | ||
} | ||
|
||
const test = fork(runnerPath, forkArgs, { | ||
// otherwise `--inspect-brk` and other params will be passed to subprocess | ||
execArgv: process.execArgv.filter(removeDebugArgs), | ||
stdio: ['ipc'], | ||
}); | ||
|
||
if (this.isTypescriptRunMode) { | ||
nodeFlags.push('--require', 'ts-node/register'); | ||
let test; | ||
try { | ||
test = await this.pool.getOrCreate(this.isTypescriptRunMode); | ||
} catch (e) { | ||
throw e; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks like you can re-write this piece of code without try..catch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Because that's essentially what you're doing here. If the exception occurs you just re-throw it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
debugLog('Process spawned. You can run it manually with this command:'); | ||
debugLog(`node ${nodeFlags.join(' ')} ${runnerPath} ${forkArgs.concat([DEBUG_SUBPROCESS.argument]).join(' ')}`); | ||
test.send(JSON.stringify({ type: 'start', testOptions })); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: it looks like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Switched to a simple handle. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You did but what's the idea of passing a string if your definition says that the |
||
|
||
const events: Array<ISubprocessOutputMessage | ISubprocessRunnerMessage> = []; | ||
let syncedSubprocessData: ISubprocessSyncedData | undefined; | ||
const startedAt = Date.now(); | ||
|
||
test.on('message', function onMessageHandler({ event, data }) { | ||
const clean = () => { | ||
test.removeListener('message', onMessageHandler); | ||
test.stdout.removeListener('data', onStdoutData); | ||
test.stderr.removeListener('data', onStderrData); | ||
test.removeListener('close', onClose); | ||
test.destroy(); | ||
}; | ||
|
||
const onMessageHandler = ({ event, data }: { event: string, data: any }) => { | ||
if (event === 'sync') { | ||
syncedSubprocessData = data; | ||
} else if (event === 'end') { | ||
clean(); | ||
resolve({ | ||
code: data.code || 0, | ||
events, | ||
execTime: Date.now() - startedAt, | ||
file, | ||
syncedSubprocessData, | ||
}); | ||
} else { | ||
assert(event); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like a premature optimization. Let's either add a test for it or get rid of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
events.push({ | ||
data, | ||
event, | ||
type: 'runner', | ||
}); | ||
} | ||
}); | ||
}; | ||
|
||
test.stdout.on('data', function onStdoutData(data: Buffer) { | ||
const onStdoutData = (data: Buffer) => { | ||
events.push({ | ||
data, | ||
event: undefined, | ||
type: 'stdout', | ||
}); | ||
}); | ||
}; | ||
|
||
test.stderr.on('data', function onStderrData(data: Buffer) { | ||
const onStderrData = (data: Buffer) => { | ||
events.push({ | ||
data, | ||
event: undefined, | ||
type: 'stderr', | ||
}); | ||
}); | ||
}; | ||
|
||
test.on('close', (code) => { | ||
const onClose = (code: number) => { | ||
debugLog(`Process for ${file} exited with code ${code}`); | ||
|
||
resolve({ | ||
|
@@ -254,7 +267,12 @@ export default class MochaWrapper extends Mocha { | |
file, | ||
syncedSubprocessData, | ||
}); | ||
}); | ||
}; | ||
|
||
test.on('message', onMessageHandler); | ||
test.stdout.on('data', onStdoutData); | ||
test.stderr.on('data', onStderrData); | ||
test.on('close', onClose); | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import { fork } from 'child_process'; | ||
import * as os from 'os'; | ||
import { resolve as pathResolve } from 'path'; | ||
import { removeDebugArgs } from './util'; | ||
|
||
type eventFn = (data: any) => void; | ||
|
||
interface IMochaProcess { | ||
send: (msg: any) => void; | ||
kill: () => void; | ||
destroy: () => void; | ||
on: (event: string, fn: eventFn) => void; | ||
removeListener: (event: string, eventFn) => void; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two methods look very similar to node EventEmitter, but they're doing different things. Could you please rename them to be more explicit? For instance "on" -> "addProcessListener" and "removeListener" -> "removeProsessListener". Another approach would be that IMochaProcess objects could be extending node EventEmitter but in this case, you'd have to re-implement all EventEmitter methods related to adding/removing listeners. |
||
stdout: NodeJS.ReadableStream; | ||
stderr: NodeJS.ReadableStream; | ||
} | ||
|
||
export default class ProcessPool { | ||
private maxParallel: number; | ||
private waitingList: Array<(proc: IMochaProcess) => void> = []; | ||
private unusedProcesses: IMochaProcess[] = []; | ||
private processes: IMochaProcess[] = []; | ||
|
||
constructor() { | ||
this.maxParallel = os.cpus().length; | ||
} | ||
|
||
setMaxParallel(n: number) { | ||
this.maxParallel = n; | ||
} | ||
|
||
async getOrCreate(isTypescriptRunMode: boolean): Promise<IMochaProcess> { | ||
const extension = isTypescriptRunMode ? 'ts' : 'js'; | ||
const runnerPath = pathResolve(__dirname, `../subprocess/runner.${extension}`); | ||
|
||
const lastUnusedProcess = this.unusedProcesses.pop(); | ||
if (lastUnusedProcess) { | ||
return lastUnusedProcess; | ||
} | ||
|
||
if (this.processes.length >= this.maxParallel) { | ||
const process: IMochaProcess = await new Promise<IMochaProcess>((resolve) => { | ||
this.waitingList.push(resolve); | ||
}); | ||
return process; | ||
} | ||
return this.create(runnerPath, { | ||
// otherwise `--inspect-brk` and other params will be passed to subprocess | ||
execArgv: process.execArgv.filter(removeDebugArgs), | ||
stdio: ['ipc'], | ||
}); | ||
} | ||
|
||
create(runnerPath: string, opt: object): IMochaProcess { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
const process = fork(runnerPath, [], opt); | ||
|
||
const handle = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not the best way to create a new object + it's not interface-checked. Could you extract this handle object creation into a separate file containing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also please make sure all methods belong to the prototype, not to the object being created ( |
||
destroy: () => { | ||
const nextOnWaitingList = this.waitingList.pop(); | ||
if (nextOnWaitingList) { | ||
nextOnWaitingList(handle); | ||
} else { | ||
this.unusedProcesses.push(handle); | ||
} | ||
}, | ||
kill: () => { | ||
process.kill(); | ||
}, | ||
on: (ev, fn) => { | ||
process.on(ev, fn); | ||
}, | ||
removeListener: (ev, fn) => { | ||
process.removeListener(ev, fn); | ||
}, | ||
send: (msg: any) => { | ||
process.send(msg); | ||
}, | ||
stderr: process.stderr, | ||
stdout: process.stdout, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the idea of passing standard streams to this handle? Is there a place where you re-define them? |
||
}; | ||
|
||
this.processes.push(handle); | ||
|
||
return handle; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more thing to consider here is the mocha There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do we do then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a question to you. It looks to me like we can re-use processes only when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @fabiosantoscode ^ any ideas? If no, let's enable this behaviour for |
||
|
||
destroyAll() { | ||
this.processes.forEach((proc: IMochaProcess) => { | ||
proc.kill(); | ||
}); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.