feat(benchmark): add k6 benchmarking service

- Add `K6Benchmarks` class for managing performance tests
- Implement search and indexing benchmark methods
- Add environment variable configuration for k6 tests
- Add collection schema for benchmark dataset
- Add logging and error handling for benchmark execution
This commit is contained in:
Fanis Tharropoulos 2025-01-13 17:11:12 +02:00
parent 7df0d5b7a4
commit 3cec7a2cd8
No known key found for this signature in database

@ -0,0 +1,214 @@
import path from "path";
import type { TypesenseProcessManager } from "@/services/typesense-process";
import type { ErrorWithMessage } from "@/utils/error";
import type { IDockerComposeResult } from "docker-compose";
import type { Ora } from "ora";
import { run } from "docker-compose";
import { errAsync, okAsync, ResultAsync } from "neverthrow";
import { toErrorWithMessage } from "@/utils/error";
import { logger, LogLevel } from "@/utils/logger";
import { findRoot } from "@/utils/package-info";
interface BaseK6Env {
API_KEY: string;
PORT: number;
COLLECTION_NAME: string;
HOST: string;
COMMIT_HASH: string;
}
export interface IndexK6Env extends BaseK6Env {
BATCH_SIZE: number;
}
export interface SearchK6Env extends BaseK6Env {
DURATION: string;
}
export type K6Env = IndexK6Env & SearchK6Env;
interface LoadTestConfig {
batchSize: number;
duration: string;
apiKey: string;
port: number;
commitHash: string;
typesenseProcessManager: TypesenseProcessManager;
spinner: Ora;
}
export class K6Benchmarks {
private readonly config: LoadTestConfig;
private readonly isInCi: boolean;
private static readonly COLLECTION_NAME = "songs";
public static readonly REQUIRED_SERVICES = ["grafana", "influxdb"];
constructor(config: LoadTestConfig) {
this.config = config;
this.isInCi = Boolean(process.env.CI) || false;
}
public performSearchBenchmark(): ResultAsync<void, ErrorWithMessage> {
return this.getSearchBenchmarkPath()
.andThen((path) => this.executeK6Benchmark({ name: "search", scriptPath: path }))
.map(() => {
this.config.spinner.succeed("Search benchmark complete");
});
}
public performIndexingBenchmark(): ResultAsync<void, ErrorWithMessage> {
return this.getIndexingBenchmarkPath().andThen((path) => {
return this.createBenchmarkCollection()
.andThen(() => this.executeK6Benchmark({ name: "indexing", scriptPath: path }))
.map(() => {
this.config.spinner.succeed("Indexing benchmark complete");
});
});
}
private executeK6Benchmark(options: { scriptPath: string; name: string }): ResultAsync<void, ErrorWithMessage> {
const { scriptPath, name } = options;
const envVarString = this.buildK6EnvironmentVars();
this.config.spinner.start(`Running ${name} benchmark\n`);
const command = [
"run",
envVarString,
"--compatibility-mode=experimental_enhanced",
scriptPath,
logger.getLevel() <= LogLevel.DEBUG ? "--quiet" : "",
].join(" ");
const errors: string[] = [];
const warnings: string[] = [];
return ResultAsync.fromPromise(
run("k6", command, {
commandOptions: ["--no-deps", "--remove-orphans"],
log: logger.getLevel() === LogLevel.DEBUG,
cwd: findRoot(process.cwd()),
callback: (chunk) => this.processLogChunk(chunk, errors, warnings),
}),
toErrorWithMessage,
).andThen((result) => this.handleK6Result(result, errors, warnings));
}
private processLogChunk(chunk: Buffer, errors: string[], warnings: string[]): void {
const log = chunk.toString();
if (log.includes("level=error")) {
errors.push(log.trim());
}
if (log.includes("level=warn")) {
warnings.push(log.trim());
}
}
private handleK6Result(
result: IDockerComposeResult,
errors: string[],
warnings: string[],
): ResultAsync<void, ErrorWithMessage> {
// k6 output format is "checks.........................: 100.00% ✓ 21888 ✗ 0"
// First trim any leading/trailing whitespace
const cleanOutput = result.out.trim();
// Find the checks line specifically
const checksLine = cleanOutput.split("\n").find((line) => line.trim().startsWith("checks"));
if (!checksLine) {
return errAsync({
message: "Could not find checks line in output",
});
}
// Extract the pass rate from the checks line
const checkMatch = /([0-9.]+)%/.exec(checksLine);
const checksPassRate = parseFloat(checkMatch?.[1] ?? "0");
const hasCompleteOutput = result.out.includes("status") && result.out.includes("vus_max");
if (!hasCompleteOutput) {
logger.warn("Incomplete k6 output detected");
return errAsync({
message: "Incomplete k6 output - unable to determine test results",
});
}
this.config.spinner.stop();
logger.info(`Checks pass rate: ${checksPassRate}%`);
if (errors.length > 0) {
logger.error(`Errors: \n\n${errors.join("\n")}`);
}
if (warnings.length > 0) {
logger.warn(`Warnings: \n\n${warnings.join("\n")}`);
}
if (checksPassRate < 100) {
return errAsync({
message: `k6 tests failed - ${checksPassRate}% checks passed`,
});
}
this.config.spinner.succeed("Benchmark complete");
return okAsync(undefined);
}
private buildK6EnvironmentVars(): string {
const envVarMap = {
API_KEY: this.config.apiKey,
DURATION: this.config.duration,
BATCH_SIZE: this.config.batchSize,
COLLECTION_NAME: K6Benchmarks.COLLECTION_NAME,
PORT: this.config.port,
HOST: this.isInCi ? "typesense" : "host.docker.internal",
COMMIT_HASH: this.config.commitHash,
};
return Object.entries(envVarMap)
.map(([key, value]) => `-e ${key}=${value}`)
.join(" ");
}
private createBenchmarkCollection(): ResultAsync<void, ErrorWithMessage> {
this.config.spinner.start("Creating benchmark collection");
const process = this.config.typesenseProcessManager.processes.get(this.config.port);
if (!process) {
return errAsync({
message: `Process not found for port ${this.config.port}`,
});
}
return this.config.typesenseProcessManager
.createCollection(process, {
name: K6Benchmarks.COLLECTION_NAME,
fields: [
{ name: "album_name", type: "string" },
{ name: "country", type: "string", facet: true },
{ name: "genres", type: "string[]", facet: true },
{ name: "primary_artist_name", type: "string", facet: true },
{ name: "release_date", type: "int64" },
{ name: "release_decade", type: "string", facet: true },
{ name: "release_group_types", type: "string[]", facet: true },
{ name: "title", type: "string" },
{ name: "track_id", type: "string" },
{ name: "urls", type: "object[]", optional: true },
],
enable_nested_fields: true,
})
.map(() => {
this.config.spinner.succeed("Benchmark collection created");
});
}
private getSearchBenchmarkPath(): ResultAsync<string, ErrorWithMessage> {
return okAsync(path.join("/app", "src", "benchmarks", "search.ts"));
}
private getIndexingBenchmarkPath(): ResultAsync<string, ErrorWithMessage> {
return okAsync(path.join("/app", "src", "benchmarks", "index.ts"));
}
}