From 7df0d5b7a4aff34553eac7520b810bb08c7db17a Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Mon, 13 Jan 2025 17:09:31 +0200 Subject: [PATCH] feat(process): improve typesense node configuration and startup - Add `NodeConfig` interface for typesense node configuration - Add `initNode` method to validate node configuration - Make `snapshotPath` optional with default value - Add option to disable multi-node mode in `startProcess` - Simplify process error handling and cleanup --- benchmark/src/services/typesense-process.ts | 70 ++++++++++++++++----- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/benchmark/src/services/typesense-process.ts b/benchmark/src/services/typesense-process.ts index aebaed62..9ac1fd60 100644 --- a/benchmark/src/services/typesense-process.ts +++ b/benchmark/src/services/typesense-process.ts @@ -21,6 +21,12 @@ import { isStringifiable } from "@/utils/stringifiable"; export const DEFAULT_IP_ADDRESS = "192.168.2.25"; +export interface NodeConfig { + grpc: number; + http: number; + dataDir: string; +} + export class TypesenseProcessController extends EventEmitter { process: ChildProcess; exitCode: number | null = null; @@ -90,6 +96,7 @@ export class TypesenseProcessController extends EventEmitter { export class TypesenseProcessManager { public processes = new Map(); private readonly ipAddress: string; + private readonly snapshotPath: string; constructor( private readonly spinner: Ora, @@ -97,10 +104,12 @@ export class TypesenseProcessManager { private readonly apiKey: string, private readonly workingDirectory: string, private readonly fsService: FilesystemService, - private readonly snapshotPath: string, + snapshotPath?: string, ipAddress?: string, ) { this.ipAddress = ipAddress ?? DEFAULT_IP_ADDRESS; + this.snapshotPath = + snapshotPath ?? path.join(this.workingDirectory, "snapshots"); } get getSnapshotPath() { @@ -238,12 +247,36 @@ export class TypesenseProcessManager { return ResultAsync.fromPromise(models.create(model), toErrorWithMessage); } - startProcess(options: { - grpc: number; - http: number; - dataDir: string; - }): ResultAsync { - const { grpc, http, dataDir } = options; + initNode( + dataDir: string, + port: number, + ): ResultAsync { + return this.fsService.exists(dataDir).andThen((exists) => { + if (!exists) { + return errAsync({ message: `${dataDir} does not exist` }); + } + + const portObj = Object.values(TypesenseProcessManager.nodeToPortMap).find( + (ports) => ports.http === port, + ); + + if (!portObj) { + return errAsync({ message: `${port} is not a valid port` }); + } + + return okAsync({ + dataDir: dataDir, + grpc: portObj.grpc, + http: portObj.http, + }); + }); + } + + startProcess( + node: NodeConfig, + options?: { multiNode?: false }, + ): ResultAsync { + const { grpc, http, dataDir } = node; this.spinner.start(`Starting Typesense process on node ${http}\n`); return this.fsService @@ -265,7 +298,12 @@ export class TypesenseProcessManager { }); } - const args = [ + const multiNodeArgs = [ + `--nodes`, + path.join(this.workingDirectory, "nodes"), + ]; + + const baseArgs = [ `--data-dir=${dataDir}`, `--api-key=${this.apiKey}`, `--api-port`, @@ -276,10 +314,13 @@ export class TypesenseProcessManager { `${this.ipAddress}`, `--peering-port`, `${grpc}`, - `--nodes`, - path.join(this.workingDirectory, "nodes"), ]; + const args = + options?.multiNode !== false ? + multiNodeArgs.concat(baseArgs) + : baseArgs; + const execaOptions: ExecaOptions = { cwd: this.workingDirectory, stdio: "pipe", @@ -298,14 +339,11 @@ export class TypesenseProcessManager { const typesenseProcess = execa(this.binaryPath, args, execaOptions); - typesenseProcess.on("error", (error) => { - logger.error(`[Node ${http}] Process error: ${error.message}`); + typesenseProcess.on("error", () => { + this.processes.delete(http); }); - typesenseProcess.on("exit", (code, signal) => { - logger.info( - `[Node ${http}] Process exited with code=${code} signal=${signal}`, - ); + typesenseProcess.on("exit", () => { this.processes.delete(http); });