feat(@angular-devkit/benchmark): add capabilities to benchmark watch processes

This commit is contained in:
Alan Agius 2020-01-21 08:34:14 +01:00 committed by Douglas Parker
parent e72f97d188
commit 301bf18a04
15 changed files with 312 additions and 29 deletions

View File

@ -19,7 +19,7 @@ export declare const aggregateMetrics: (m1: Metric | AggregatedMetric, m2: Metri
export declare type BenchmarkReporter = (command: Command, groups: MetricGroup[]) => void; export declare type BenchmarkReporter = (command: Command, groups: MetricGroup[]) => void;
export declare type Capture = (process: MonitoredProcess) => Observable<MetricGroup>; export declare type Capture = (stats: Observable<AggregatedProcessStats>) => Observable<MetricGroup>;
export declare class Command { export declare class Command {
args: string[]; args: string[];
@ -40,7 +40,8 @@ export declare class LocalMonitoredProcess implements MonitoredProcess {
stats$: Observable<AggregatedProcessStats>; stats$: Observable<AggregatedProcessStats>;
stderr$: Observable<Buffer>; stderr$: Observable<Buffer>;
stdout$: Observable<Buffer>; stdout$: Observable<Buffer>;
constructor(command: Command); constructor(command: Command, useProcessTime?: boolean);
resetElapsedTimer(): void;
run(): Observable<number>; run(): Observable<number>;
} }
@ -89,3 +90,11 @@ export interface RunBenchmarkOptions {
reporters: BenchmarkReporter[]; reporters: BenchmarkReporter[];
retries?: number; retries?: number;
} }
export declare function runBenchmarkWatch({ command, captures, reporters, iterations, retries, logger, watchMatcher, watchTimeout, watchCommand, }: RunBenchmarkWatchOptions): Observable<MetricGroup[]>;
export interface RunBenchmarkWatchOptions extends RunBenchmarkOptions {
watchCommand: Command;
watchMatcher: string;
watchTimeout?: number;
}

View File

@ -45,6 +45,9 @@ ts_library(
"src/test/exit-code-one.js", "src/test/exit-code-one.js",
"src/test/fibonacci.js", "src/test/fibonacci.js",
"src/test/test-script.js", "src/test/test-script.js",
"src/test/watch-test-cmd.js",
"src/test/watch-test-file.txt",
"src/test/watch-test-script.js",
], ],
# @external_begin # @external_begin
tsconfig = "//:tsconfig-test.json", tsconfig = "//:tsconfig-test.json",

View File

@ -72,6 +72,21 @@ $ benchmark -- node fibonacci.js 40
[benchmark] Peak Memory usage: 22.34 MB (22.32, 22.34, 22.34, 22.35, 22.35) [benchmark] Peak Memory usage: 22.34 MB (22.32, 22.34, 22.34, 22.35, 22.35)
``` ```
## Example in watch mode
```
benchmark --verbose --watch-timeout=10000 --watch-matcher="Compiled successfully" --watch-script watch-script.js -- ng serve
[benchmark] Benchmarking process over 5 iterations, with up to 5 retries.
[benchmark] ng serve (at D:\sandbox\latest-project)
[benchmark] Process Stats
[benchmark] Elapsed Time: 470.40 ms (820.00, 557.00, 231.00, 509.00, 235.00)
[benchmark] Average Process usage: 2.00 process(es) (2.00, 2.00, 2.00, 2.00, 2.00)
[benchmark] Peak Process usage: 2.00 process(es) (2.00, 2.00, 2.00, 2.00, 2.00)
[benchmark] Average CPU usage: 33.77 % (31.27, 0.00, 101.70, 35.90, 0.00)
[benchmark] Peak CPU usage: 59.72 % (125.10, 0.00, 101.70, 71.80, 0.00)
[benchmark] Average Memory usage: 665.49 MB (619.84, 657.17, 669.47, 685.19, 695.76)
[benchmark] Peak Memory usage: 672.44 MB (639.87, 661.04, 669.47, 689.66, 702.14)
```
## API Usage ## API Usage

View File

@ -13,7 +13,7 @@ import { cumulativeMovingAverage, max } from './utils';
export const defaultStatsCapture: Capture = ( export const defaultStatsCapture: Capture = (
process: MonitoredProcess, stats: Observable<AggregatedProcessStats>,
): Observable<MetricGroup> => { ): Observable<MetricGroup> => {
type Accumulator = { type Accumulator = {
elapsed: number, elapsed: number,
@ -34,7 +34,7 @@ export const defaultStatsCapture: Capture = (
peakMemory: 0, peakMemory: 0,
}; };
return process.stats$.pipe( return stats.pipe(
reduce<AggregatedProcessStats, Accumulator>((acc, val, idx) => ({ reduce<AggregatedProcessStats, Accumulator>((acc, val, idx) => ({
elapsed: val.elapsed, elapsed: val.elapsed,
avgProcesses: cumulativeMovingAverage(acc.avgProcesses, val.processes, idx), avgProcesses: cumulativeMovingAverage(acc.avgProcesses, val.processes, idx),

View File

@ -8,7 +8,7 @@
import { Observable } from 'rxjs'; import { Observable } from 'rxjs';
import { toArray } from 'rxjs/operators'; import { toArray } from 'rxjs/operators';
import { defaultStatsCapture } from './default-stats-capture'; import { defaultStatsCapture } from './default-stats-capture';
import { AggregatedProcessStats, MonitoredProcess } from './interfaces'; import { AggregatedProcessStats } from './interfaces';
describe('defaultStatsCapture', () => { describe('defaultStatsCapture', () => {
@ -29,9 +29,8 @@ describe('defaultStatsCapture', () => {
}); });
obs.complete(); obs.complete();
}); });
const process = { stats$ } as {} as MonitoredProcess;
const res = await defaultStatsCapture(process).pipe(toArray()).toPromise(); const res = await defaultStatsCapture(stats$).pipe(toArray()).toPromise();
expect(res).toEqual([{ expect(res).toEqual([{
name: 'Process Stats', name: 'Process Stats',
metrics: [ metrics: [

View File

@ -12,5 +12,6 @@ export * from './default-reporter';
export * from './default-stats-capture'; export * from './default-stats-capture';
export * from './monitored-process'; export * from './monitored-process';
export * from './run-benchmark'; export * from './run-benchmark';
export * from './run-benchmark-watch';
export * from './utils'; export * from './utils';
export * from './main'; export * from './main';

View File

@ -51,7 +51,8 @@ export interface MetricGroup {
metrics: (Metric | AggregatedMetric)[]; metrics: (Metric | AggregatedMetric)[];
} }
export type Capture = (process: MonitoredProcess) => Observable<MetricGroup>; export type Capture = (stats: Observable<AggregatedProcessStats>) => Observable<MetricGroup>;
// TODO: might need to allow reporters to say they are finished. // TODO: might need to allow reporters to say they are finished.
export type BenchmarkReporter = (command: Command, groups: MetricGroup[]) => void; export type BenchmarkReporter = (command: Command, groups: MetricGroup[]) => void;

View File

@ -16,6 +16,7 @@ import { Command } from '../src/command';
import { defaultReporter } from '../src/default-reporter'; import { defaultReporter } from '../src/default-reporter';
import { defaultStatsCapture } from '../src/default-stats-capture'; import { defaultStatsCapture } from '../src/default-stats-capture';
import { runBenchmark } from '../src/run-benchmark'; import { runBenchmark } from '../src/run-benchmark';
import { runBenchmarkWatch } from './run-benchmark-watch';
export interface MainOptions { export interface MainOptions {
@ -47,6 +48,9 @@ export async function main({
--output-file File to output benchmark log to. --output-file File to output benchmark log to.
--overwrite-output-file If the output file should be overwritten rather than appended to. --overwrite-output-file If the output file should be overwritten rather than appended to.
--prefix Logging prefix. --prefix Logging prefix.
--watch-matcher Text to match in stdout to mark an iteration complete.
--watch-timeout The maximum time in 'ms' to wait for the text specified in the matcher to be matched. Default is 10000.
--watch-script Script to run before each watch iteration.
Example: Example:
benchmark --iterations=3 -- node my-script.js benchmark --iterations=3 -- node my-script.js
@ -63,12 +67,19 @@ export async function main({
'output-file': string | null; 'output-file': string | null;
cwd: string; cwd: string;
prefix: string; prefix: string;
'watch-timeout': number;
'watch-matcher'?: string;
'watch-script'?: string;
'--': string[] | null; '--': string[] | null;
} }
// Parse the command line. // Parse the command line.
const argv = minimist(args, { const argv = minimist(args, {
boolean: ['help', 'verbose', 'overwrite-output-file'], boolean: ['help', 'verbose', 'overwrite-output-file'],
string: [
'watch-matcher',
'watch-script',
],
default: { default: {
'exit-code': 0, 'exit-code': 0,
'iterations': 5, 'iterations': 5,
@ -76,6 +87,7 @@ export async function main({
'output-file': null, 'output-file': null,
'cwd': process.cwd(), 'cwd': process.cwd(),
'prefix': '[benchmark]', 'prefix': '[benchmark]',
'watch-timeout': 10000,
}, },
'--': true, '--': true,
}) as {} as BenchmarkCliArgv; }) as {} as BenchmarkCliArgv;
@ -127,6 +139,29 @@ export async function main({
const commandArgv = argv['--']; const commandArgv = argv['--'];
const {
'watch-timeout': watchTimeout,
'watch-matcher': watchMatcher,
'watch-script': watchScript,
'exit-code': exitCode,
'output-file': outFile,
iterations,
retries,
} = argv;
// Exit early if we can't find the command to benchmark.
if (watchMatcher && !watchScript) {
logger.fatal(`Cannot use --watch-matcher without specifying --watch-script.`);
return 1;
}
if (!watchMatcher && watchScript) {
logger.fatal(`Cannot use --watch-script without specifying --watch-matcher.`);
return 1;
}
// Exit early if we can't find the command to benchmark. // Exit early if we can't find the command to benchmark.
if (!commandArgv || !Array.isArray(argv['--']) || (argv['--'] as Array<string>).length < 1) { if (!commandArgv || !Array.isArray(argv['--']) || (argv['--'] as Array<string>).length < 1) {
logger.fatal(`Missing command, see benchmark --help for help.`); logger.fatal(`Missing command, see benchmark --help for help.`);
@ -135,32 +170,42 @@ export async function main({
} }
// Setup file logging. // Setup file logging.
if (argv['output-file'] !== null) { if (outFile !== null) {
if (argv['overwrite-output-file']) { if (argv['overwrite-output-file']) {
writeFileSync(argv['output-file'] as string, ''); writeFileSync(outFile, '');
} }
logger.pipe(filter(entry => (entry.level != 'debug' || argv['verbose']))) logger.pipe(filter(entry => (entry.level != 'debug' || argv['verbose'])))
.subscribe(entry => appendFileSync(argv['output-file'] as string, `${entry.message}\n`)); .subscribe(entry => appendFileSync(outFile, `${entry.message}\n`));
} }
// Run benchmark on given command, capturing stats and reporting them. // Run benchmark on given command, capturing stats and reporting them.
const exitCode = argv['exit-code'];
const cmd = commandArgv[0]; const cmd = commandArgv[0];
const cmdArgs = commandArgv.slice(1); const cmdArgs = commandArgv.slice(1);
const command = new Command(cmd, cmdArgs, argv['cwd'], exitCode); const command = new Command(cmd, cmdArgs, argv['cwd'], exitCode);
const captures = [defaultStatsCapture]; const captures = [defaultStatsCapture];
const reporters = [defaultReporter(logger)]; const reporters = [defaultReporter(logger)];
const iterations = argv['iterations'];
const retries = argv['retries'];
logger.info(`Benchmarking process over ${iterations} iterations, with up to ${retries} retries.`); logger.info(`Benchmarking process over ${iterations} iterations, with up to ${retries} retries.`);
logger.info(` ${command.toString()}`); logger.info(` ${command.toString()}`);
let res;
try { try {
res = await runBenchmark( let res$;
{ command, captures, reporters, iterations, retries, logger }, if (watchMatcher && watchScript) {
).pipe(toArray()).toPromise(); res$ = runBenchmarkWatch({
command, captures, reporters, iterations, retries, logger,
watchCommand: new Command('node', [watchScript]), watchMatcher, watchTimeout,
});
} else {
res$ = runBenchmark(
{ command, captures, reporters, iterations, retries, logger },
);
}
const res = await res$.pipe(toArray()).toPromise();
if (res.length === 0) {
return 1;
}
} catch (error) { } catch (error) {
if (error.message) { if (error.message) {
logger.fatal(error.message); logger.fatal(error.message);
@ -171,10 +216,6 @@ export async function main({
return 1; return 1;
} }
if (res.length === 0) {
return 1;
}
return 0; return 0;
} }

View File

@ -8,6 +8,7 @@
import { existsSync, readFileSync, unlinkSync, writeFileSync } from 'fs'; import { existsSync, readFileSync, unlinkSync, writeFileSync } from 'fs';
import { basename, dirname, join } from 'path'; import { basename, dirname, join } from 'path';
import { main } from './main'; import { main } from './main';
// tslint:disable-next-line:no-implicit-dependencies // tslint:disable-next-line:no-implicit-dependencies
const temp = require('temp'); const temp = require('temp');
@ -23,9 +24,11 @@ class MockWriteStream {
} }
} }
describe('benchmark binary', () => { fdescribe('benchmark binary', () => {
const benchmarkScript = require.resolve(join(__dirname, './test/fibonacci.js')); const benchmarkScript = require.resolve(join(__dirname, './test/fibonacci.js'));
const exitCodeOneScript = require.resolve(join(__dirname, './test/exit-code-one.js')); const exitCodeOneScript = require.resolve(join(__dirname, './test/exit-code-one.js'));
const benchmarkWatchScript = require.resolve(join(__dirname, './test/watch-test-cmd.js'));
const watchTriggerScript = require.resolve(join(__dirname, './test/watch-test-script.js'));
const outputFileRoot = temp.mkdirSync('benchmark-binary-spec-'); const outputFileRoot = temp.mkdirSync('benchmark-binary-spec-');
const outputFile = join(outputFileRoot, 'output.log'); const outputFile = join(outputFileRoot, 'output.log');
let stdout: MockWriteStream, stderr: MockWriteStream; let stdout: MockWriteStream, stderr: MockWriteStream;
@ -142,4 +145,51 @@ describe('benchmark binary', () => {
stdout.lines.forEach(line => expect(line).toMatch(/^\[abc\]/)); stdout.lines.forEach(line => expect(line).toMatch(/^\[abc\]/));
expect(res).toEqual(0); expect(res).toEqual(0);
}); });
it('uses watch-script and watch-matcher', async () => {
const args = [
'--watch-matcher',
'Complete',
'--watch-script',
watchTriggerScript,
'--',
'node',
benchmarkWatchScript,
];
const res = await main({ args, stdout, stderr });
expect(stdout.lines).toContain('[benchmark] Process Stats\n');
expect(res).toEqual(0);
}, 30000);
it('should not fail with exit code', async () => {
const args = [
'--watch-matcher',
'Complete',
'--watch-script',
watchTriggerScript,
'--',
'node',
exitCodeOneScript,
];
const res = await main({ args, stdout, stderr });
expect(stderr.lines).toContain('[benchmark] Maximum number of retries (5) for command was exceeded.\n');
expect(res).toEqual(1);
});
it('should error when watch-timeout is exceeded', async () => {
const args = [
'--watch-timeout',
'20',
'--watch-matcher',
'Wrong Match',
'--watch-script',
watchTriggerScript,
'--',
'node',
benchmarkWatchScript,
];
const res = await main({ args, stdout, stderr });
expect(stderr.lines).toContain('[benchmark] Timeout has occurred\n');
expect(res).toEqual(1);
});
}); });

View File

@ -31,14 +31,21 @@ export class LocalMonitoredProcess implements MonitoredProcess {
stats$: Observable<AggregatedProcessStats> = this.stats.asObservable(); stats$: Observable<AggregatedProcessStats> = this.stats.asObservable();
stdout$: Observable<Buffer> = this.stdout.asObservable(); stdout$: Observable<Buffer> = this.stdout.asObservable();
stderr$: Observable<Buffer> = this.stderr.asObservable(); stderr$: Observable<Buffer> = this.stderr.asObservable();
private elapsedTimer: number;
constructor(private command: Command) { } constructor(
private command: Command,
private useProcessTime = true,
) { }
run(): Observable<number> { run(): Observable<number> {
return new Observable(obs => { return new Observable(obs => {
const { cmd, cwd, args } = this.command; const { cmd, cwd, args } = this.command;
const spawnOptions: SpawnOptions = { cwd }; const spawnOptions: SpawnOptions = { cwd, shell: true };
if (!this.useProcessTime) {
this.resetElapsedTimer();
}
// Spawn the process. // Spawn the process.
const childProcess = spawn(cmd, args, spawnOptions); const childProcess = spawn(cmd, args, spawnOptions);
@ -63,7 +70,14 @@ export class LocalMonitoredProcess implements MonitoredProcess {
} }
return { return {
processes, cpu, memory, pid, ppid, ctime, elapsed, timestamp, processes,
cpu,
memory,
pid,
ppid,
ctime,
elapsed: this.useProcessTime ? elapsed : (Date.now() - this.elapsedTimer),
timestamp,
} as AggregatedProcessStats; } as AggregatedProcessStats;
}), }),
tap(stats => this.stats.next(stats)), tap(stats => this.stats.next(stats)),
@ -100,7 +114,15 @@ export class LocalMonitoredProcess implements MonitoredProcess {
processExitCb = killChildProcess; processExitCb = killChildProcess;
// Cleanup on unsubscription. // Cleanup on unsubscription.
return () => childProcess.kill(); return killChildProcess;
}); });
} }
resetElapsedTimer() {
if (this.useProcessTime) {
throw new Error(`Cannot reset elapsed timer when using process time. Set 'useProcessTime' to false.`);
}
this.elapsedTimer = Date.now();
}
} }

View File

@ -0,0 +1,134 @@
/**
* @license
* Copyright Google Inc. All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/
import { BaseException, logging } from '@angular-devkit/core';
import { spawnSync } from 'child_process';
import { Observable, combineLatest, forkJoin, of, throwError } from 'rxjs';
import {
concatMap,
filter,
first,
reduce,
repeat,
retryWhen,
startWith,
take,
takeUntil,
tap,
throwIfEmpty,
timeout,
} from 'rxjs/operators';
import { Command } from './command';
import { MetricGroup } from './interfaces';
import { LocalMonitoredProcess } from './monitored-process';
import { MaximumRetriesExceeded, RunBenchmarkOptions } from './run-benchmark';
import { aggregateMetricGroups } from './utils';
export interface RunBenchmarkWatchOptions extends RunBenchmarkOptions {
watchMatcher: string;
watchTimeout?: number;
watchCommand: Command;
}
export function runBenchmarkWatch({
command, captures, reporters = [], iterations = 5, retries = 5, logger = new logging.NullLogger(),
watchMatcher, watchTimeout = 10000, watchCommand,
}: RunBenchmarkWatchOptions): Observable<MetricGroup[]> {
let successfulRuns = 0;
let failedRuns = 0;
const debugPrefix = () => `Run #${successfulRuns + 1}:`;
// Run the process and captures, wait for both to finish, and average out the metrics.
const monitoredProcess = new LocalMonitoredProcess(command, false);
const processFailed = new BaseException('Wrong exit code.');
// Gather stats until the stdout contains the matched text.
const stats$ = monitoredProcess.stats$.pipe(
takeUntil(monitoredProcess.stdout$.pipe(
first(stdout => stdout.toString().includes(watchMatcher)),
timeout(watchTimeout),
)),
);
return combineLatest([
monitoredProcess.run().pipe(
// In watch mode typically the run will not emit an exit code when it's running sucessfully.
startWith(undefined),
concatMap(processExitCode => {
if (processExitCode !== undefined && processExitCode != command.expectedExitCode) {
logger.debug(`${debugPrefix()} exited with ${processExitCode} but `
+ `${command.expectedExitCode} was expected`);
return throwError(processFailed);
}
return of(processExitCode);
}),
),
monitoredProcess.stdout$.pipe(
filter(stdout => stdout.includes(watchMatcher)),
timeout(watchTimeout),
take(1),
),
])
.pipe(
concatMap(() => {
const { cmd, cwd, args } = watchCommand;
failedRuns = 0;
return of(null)
.pipe(
tap(() => {
const { status, error } = spawnSync(cmd, args, { cwd });
monitoredProcess.resetElapsedTimer();
if (status != command.expectedExitCode) {
logger.debug(`${debugPrefix()} exited with ${status}\n${error?.message}`);
throw processFailed;
}
// Reset fail counter for this iteration.
failedRuns = 0;
}),
tap(() => logger.debug(`${debugPrefix()} starting`)),
concatMap(() => forkJoin(captures.map(capture => capture(stats$)))),
throwIfEmpty(() => new Error('Nothing was captured')),
tap(() => logger.debug(`${debugPrefix()} finished successfully`)),
tap(() => successfulRuns++),
repeat(iterations),
retryWhen(errors => errors
.pipe(concatMap(val => {
// Check if we're still within the retry threshold.
failedRuns++;
return failedRuns < retries ? of(val) : throwError(val);
})),
),
);
}),
retryWhen(errors => errors
.pipe(concatMap(val => {
// Check if we're still within the retry threshold.
failedRuns++;
if (failedRuns < retries) {
return of(val);
}
return throwError(
val === processFailed ?
new MaximumRetriesExceeded(retries) :
val,
);
})),
),
take(iterations),
reduce((acc, val) => acc.map((_, idx) => aggregateMetricGroups(acc[idx], val[idx]))),
tap(groups => reporters.forEach(reporter => reporter(command, groups))),
);
}

View File

@ -43,13 +43,13 @@ export function runBenchmark({
// Run the process and captures, wait for both to finish, and average out the metrics. // Run the process and captures, wait for both to finish, and average out the metrics.
return new Observable(obs => { return new Observable(obs => {
const monitoredProcess = new LocalMonitoredProcess(command); const monitoredProcess = new LocalMonitoredProcess(command);
const metric$ = captures.map(capture => capture(monitoredProcess)); const metric$ = captures.map(capture => capture(monitoredProcess.stats$));
obs.next([monitoredProcess, ...metric$]); obs.next([monitoredProcess, ...metric$]);
}).pipe( }).pipe(
tap(() => logger.debug(`${debugPrefix()} starting`)), tap(() => logger.debug(`${debugPrefix()} starting`)),
concatMap(([monitoredProcess, ...metric$]) => forkJoin(monitoredProcess.run(), ...metric$)), concatMap(([monitoredProcess, ...metric$]) => forkJoin(monitoredProcess.run(), ...metric$)),
throwIfEmpty(() => new Error('Nothing was captured')), throwIfEmpty(() => new Error('Nothing was captured')),
concatMap((results) => { concatMap(results => {
const [processExitCode, ...metrics] = results; const [processExitCode, ...metrics] = results;
if ((processExitCode as number) != command.expectedExitCode) { if ((processExitCode as number) != command.expectedExitCode) {

View File

@ -0,0 +1,4 @@
const { watchFile } = require('fs');
console.log('Complete');
watchFile(require.resolve('./watch-test-file.txt'), () => console.log('Complete'));

View File

@ -0,0 +1,4 @@
const { closeSync, openSync } = require('fs');
// touch file
closeSync(openSync(require.resolve('./watch-test-file.txt'), 'w'));