/**
 * @license
 * Copyright Google Inc. All Rights Reserved.
 *
 * Use of this source code is governed by an MIT-style license that can be
 * found in the LICENSE file at https://angular.io/license
 */
import { SpawnOptions, spawn } from 'child_process';
import * as pidusage from 'pidusage';
import { Observable, Subject, from, timer } from 'rxjs';
import { concatMap, map, onErrorResumeNext, tap } from 'rxjs/operators';
import { Command } from './command';
import { AggregatedProcessStats, MonitoredProcess } from './interfaces';
const pidtree = require('pidtree');
const treeKill = require('tree-kill');


// Cleanup when the parent process exits.
// A single module-level callback is kept, so only the most recently started
// process is registered for cleanup at any given time.
const defaultProcessExitCb = () => { };
let processExitCb = defaultProcessExitCb;
process.on('exit', () => {
  processExitCb();
  processExitCb = defaultProcessExitCb;
});

/**
 * Runs a `Command` in a local shell and exposes its output and periodically
 * sampled resource usage as observables.
 */
export class LocalMonitoredProcess implements MonitoredProcess {
  private stats = new Subject<AggregatedProcessStats>();
  private stdout = new Subject<Buffer>();
  private stderr = new Subject<Buffer>();
  // Interval, in milliseconds, between two stats samples.
  private pollingRate = 100;
  // Wall-clock reference (ms since epoch) used when `useProcessTime` is false.
  private elapsedTimer = 0;
  stats$: Observable<AggregatedProcessStats> = this.stats.asObservable();
  stdout$: Observable<Buffer> = this.stdout.asObservable();
  stderr$: Observable<Buffer> = this.stderr.asObservable();

  /**
   * @param command The command (cmd, cwd, args) to spawn.
   * @param useProcessTime When true, `elapsed` in the emitted stats is the value
   *   reported by pidusage for the spawned process; when false it is the time
   *   since the last `resetElapsedTimer()` call (or since `run()` started).
   */
  constructor(
    private command: Command,
    private useProcessTime = true,
  ) { }

  /**
   * Spawns the command and starts polling stats for its process tree.
   *
   * @returns A cold observable that emits the exit code and completes when the
   *   child process exits, or errors if the process reports an error.
   *   Unsubscribing kills the child process tree.
   */
  run(): Observable<number> {
    return new Observable(obs => {
      const { cmd, cwd, args } = this.command;
      const spawnOptions: SpawnOptions = { cwd, shell: true };

      if (!this.useProcessTime) {
        this.resetElapsedTimer();
      }

      // Spawn the process.
      const childProcess = spawn(cmd, args, spawnOptions);

      // Emit output and stats.
      childProcess.stdout.on('data', (data: Buffer) => this.stdout.next(data));
      childProcess.stderr.on('data', (data: Buffer) => this.stderr.next(data));
      const statsSubs = timer(0, this.pollingRate).pipe(
        concatMap(() => from(pidtree(childProcess.pid, { root: true }) as Promise<number[]>)),
        concatMap((pids: number[]) => from(pidusage(pids, { maxage: 5 * this.pollingRate }))),
        map(statsByProcess => {
          // Ignore the spawned shell in the total process number.
          const shellPid = childProcess.pid.toString();
          const pids = Object.keys(statsByProcess).filter(pid => pid !== shellPid);
          const processes = pids.length;

          // We want most stats from the parent process.
          const { pid, ppid, ctime, elapsed, timestamp } = statsByProcess[childProcess.pid];

          // CPU and memory should be aggregated.
          let cpu = 0;
          let memory = 0;
          for (const descendantPid of pids) {
            cpu += statsByProcess[descendantPid].cpu;
            memory += statsByProcess[descendantPid].memory;
          }

          const stats: AggregatedProcessStats = {
            processes, cpu, memory, pid, ppid, ctime,
            elapsed: this.useProcessTime ? elapsed : (Date.now() - this.elapsedTimer),
            timestamp,
          };

          return stats;
        }),
        tap(stats => this.stats.next(stats)),
        // Stats gathering is best-effort: a failure while sampling ends the
        // polling quietly instead of failing the run.
        onErrorResumeNext(),
      ).subscribe();

      // Process event handling.

      // Killing processes cross platform can be hard, treeKill helps.
      const killChildProcess = () => {
        if (childProcess && childProcess.pid) {
          treeKill(childProcess.pid, 'SIGTERM');
        }
      };

      // Convert process exit codes and errors into observable events.
      // NOTE: this is also registered directly as the 'exit' listener, which
      // Node calls with (code, signal). A signal name therefore arrives in the
      // `error` slot, and a signal-terminated child errors the observable.
      const handleChildProcessExit = (code?: number, error?: Error) => {
        // Stop gathering stats and complete subjects.
        statsSubs.unsubscribe();
        this.stats.complete();
        this.stdout.complete();
        this.stderr.complete();

        // Kill hanging child processes and emit error/exit code.
        killChildProcess();
        if (error) {
          obs.error(error);

          // Nothing can be delivered after an error notification.
          return;
        }
        obs.next(code);
        obs.complete();
      };
      childProcess.once('exit', handleChildProcessExit);
      childProcess.once('error', (err) => handleChildProcessExit(1, err));
      processExitCb = killChildProcess;

      // Cleanup on unsubscription.
      return killChildProcess;
    });
  }

  /**
   * Restarts the wall-clock reference used for the `elapsed` stat.
   *
   * @throws Error if the instance was created with `useProcessTime` set to true.
   */
  resetElapsedTimer() {
    if (this.useProcessTime) {
      throw new Error(`Cannot reset elapsed timer when using process time. Set 'useProcessTime' to false.`);
    }

    this.elapsedTimer = Date.now();
  }
}