From 988835d024ab64a0642f3a4f46bc3e1be4f60d87 Mon Sep 17 00:00:00 2001 From: Hans Larsen Date: Thu, 7 Feb 2019 11:34:10 -0800 Subject: [PATCH] feat(@angular-devkit/core): add a reuse JobStrategy It allows a job to be reused if it's still running. This includes redirecting the inputs to the new job. --- .../core/src/experimental/jobs/strategy.ts | 70 ++++++++++++++++++- .../src/experimental/jobs/strategy_spec.ts | 67 ++++++++++++++++++ 2 files changed, 134 insertions(+), 3 deletions(-) diff --git a/packages/angular_devkit/core/src/experimental/jobs/strategy.ts b/packages/angular_devkit/core/src/experimental/jobs/strategy.ts index 146ba6b6de..d706afdbb1 100644 --- a/packages/angular_devkit/core/src/experimental/jobs/strategy.ts +++ b/packages/angular_devkit/core/src/experimental/jobs/strategy.ts @@ -5,10 +5,16 @@ * Use of this source code is governed by an MIT-style license that can be * found in the LICENSE file at https://angular.io/license */ -import { Observable, concat, of } from 'rxjs'; -import { ignoreElements, share, shareReplay } from 'rxjs/operators'; +import { Observable, Subject, concat, of } from 'rxjs'; +import { finalize, ignoreElements, share, shareReplay, tap } from 'rxjs/operators'; import { JsonValue } from '../../json'; -import { JobDescription, JobHandler, JobHandlerContext, JobOutboundMessage } from './api'; +import { + JobDescription, + JobHandler, + JobHandlerContext, JobInboundMessage, + JobOutboundMessage, + JobOutboundMessageKind, +} from './api'; const stableStringify = require('fast-json-stable-stringify'); @@ -49,6 +55,64 @@ export namespace strategy { } + /** + * Creates a JobStrategy that will always reuse a running job, and restart it if the job ended. + * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it + * is. + */ + export function reuse< + A extends JsonValue = JsonValue, + I extends JsonValue = JsonValue, + O extends JsonValue = JsonValue, + >(replayMessages = false): JobStrategy { + let inboundBus = new Subject>(); + let runContext: JobHandlerContext | null = null; + let run: Observable> | null = null; + let state: JobOutboundMessage | null = null; + + return (handler, options) => { + const newHandler = (argument: A, context: JobHandlerContext) => { + // Forward inputs. + const subscription = context.inboundBus.subscribe(inboundBus); + + if (run) { + return concat( + // Update state. + of(state), + run, + ).pipe( + finalize(() => subscription.unsubscribe()), + ); + } + + run = handler(argument, { ...context, inboundBus: inboundBus.asObservable() }).pipe( + tap( + message => { + if (message.kind == JobOutboundMessageKind.Start + || message.kind == JobOutboundMessageKind.OnReady + || message.kind == JobOutboundMessageKind.End) { + state = message; + } + }, + undefined, + () => { + subscription.unsubscribe(); + inboundBus = new Subject>(); + run = null; + }, + ), + replayMessages ? shareReplay() : share(), + ); + runContext = context; + + return run; + }; + + return Object.assign(newHandler, handler, options || {}); + }; + } + + /** * Creates a JobStrategy that will reuse a running job if the argument matches. * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it diff --git a/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts b/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts index 3fbef7e689..b0b575bec7 100644 --- a/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts +++ b/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts @@ -131,6 +131,73 @@ describe('strategy.serialize()', () => { }); }); +describe('strategy.reuse()', () => { + let registry: SimpleJobRegistry; + let scheduler: SimpleScheduler; + + beforeEach(() => { + registry = new SimpleJobRegistry(); + scheduler = new SimpleScheduler(registry); + }); + + it('works', async () => { + let started = 0; + let finished = 0; + + registry.register(strategy.reuse()(createJobHandler((input: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(input.reduce((a, c) => a + c, 0)); + }, 10), + ); + })), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + name: 'add', + }); + + const job1 = await scheduler.schedule('add', [1, 2, 3, 4]); + const job2 = await scheduler.schedule('add', []); + expect(started).toBe(0); + expect(finished).toBe(0); + + job1.output.subscribe(); + expect(started).toBe(1); + expect(finished).toBe(0); + + job2.output.subscribe(); + expect(started).toBe(1); // job2 is reusing job1. + expect(finished).toBe(0); + + let result = await job1.output.toPromise(); + expect(result).toBe(10); + expect(started).toBe(1); + expect(finished).toBe(1); + expect(job1.state).toBe(JobState.Ended); + expect(job2.state).toBe(JobState.Ended); + + const job3 = await scheduler.schedule('add', [1, 2, 3, 4, 5]); + const job4 = await scheduler.schedule('add', []); + job3.output.subscribe(); + expect(started).toBe(2); + expect(finished).toBe(1); + + job4.output.subscribe(); + expect(started).toBe(2); // job4 is reusing job3. + expect(finished).toBe(1); + + result = await job3.output.toPromise(); + expect(result).toBe(15); + expect(started).toBe(2); + expect(finished).toBe(2); + expect(job3.state).toBe(JobState.Ended); + expect(job4.state).toBe(JobState.Ended); + }); +}); + describe('strategy.memoize()', () => { let registry: SimpleJobRegistry; let scheduler: SimpleScheduler;