#!/usr/bin/env python
"""Run an fdbserver test and post-process the trace files it produces.

The script launches ``<builddir>/bin/fdbserver`` once per test file, records
the return code of every invocation, then reads the ``trace.*.<format>`` files
written into the test's working directory.  Depending on the aggregation
policy, the (annotated) trace events are appended to a shared
``traces.<format>`` file.  The process exits with 0 when no failure was seen
and 1 otherwise.
"""

from argparse import ArgumentParser
import functools
import io
import json
import logging
import multiprocessing
import os
import random
import re
import shutil
import subprocess
import sys
import xml.sax
import xml.sax.handler
import xml.sax.saxutils

# NOTE: the project-local ``TestDirectory`` helper is imported inside
# run_simulation_test(), its only user, so that the trace parsers below can be
# imported without that module being on sys.path.

# A usable logger exists from import time on; init_logging() attaches the
# handlers.  (Previously this was None until init_logging() ran, which made the
# parser classes crash on their first error message when used as a library.)
_logger = logging.getLogger('TestRunner')


def init_logging(loglevel, logdir):
    """Attach a file handler and a console handler to the ``TestRunner`` logger.

    Args:
        loglevel: Threshold for the console handler (e.g. ``'INFO'``).
        logdir: Directory that receives ``run_test.log``; created if missing.
            The file handler always logs at DEBUG.
    """
    global _logger
    _logger = logging.getLogger('TestRunner')
    _logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(process)d - %(levelname)s - %(message)s')
    # exist_ok replaces a bare ``except: pass`` that also hid real errors
    # (e.g. permission problems) until FileHandler failed less clearly.
    os.makedirs(logdir, exist_ok=True)
    file_handler = logging.FileHandler(os.path.join(logdir, 'run_test.log'))
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(loglevel)
    stream_handler.setFormatter(formatter)
    _logger.addHandler(file_handler)
    _logger.addHandler(stream_handler)


class LogParser:
    """Base class that reads one trace file and re-emits its events.

    Subclasses provide ``processLine`` (text line -> dict or None) and
    ``writeObject`` (dict -> serialized output via ``self.write``).

    Aggregation policies:
        ``'NONE'``   -- nothing is written.
        ``'ALL'``    -- everything is written straight to ``out``.
        ``'FAILED'`` -- output is held in an in-memory buffer and only copied
                        to ``out`` once ``fail()`` is called; afterwards writes
                        go directly to ``out``.
    """

    def __init__(self, basedir, name, infile, out, aggregationPolicy):
        """Store the configuration and set up the output sink.

        Args:
            basedir: Build directory; ``bin/fdbserver`` below it is handed to
                addr2line.
            name: Test name added to every event as ``testname`` (may be None).
            infile: Path of the trace file to read (None if only
                ``processReturnCodes`` is used).
            out: Writable text stream, or None for policy ``'NONE'``.
            aggregationPolicy: ``'NONE'``, ``'FAILED'`` or ``'ALL'``.
        """
        self.basedir = basedir
        self.name = name
        self.infile = infile
        self.backtraces = []
        self.result = True
        # One or more whitespace-separated hex addresses.  The previous
        # pattern required whitespace after *every* address and therefore
        # dropped the last frame of a backtrace.
        self.address_re = re.compile(r'0x[0-9a-f]+(?:\s+0x[0-9a-f]+)*')
        self.aggregationPolicy = aggregationPolicy
        # ``out`` is the real sink, ``outStream`` the pending buffer (only
        # used for 'FAILED').  These two were swapped before, so buffered
        # writes hit a misspelled method on the real file and fail() called
        # getvalue() on it.
        self.outStream = None
        if aggregationPolicy == 'NONE':
            self.out = None
        elif aggregationPolicy == 'ALL':
            self.out = out
        else:
            self.out = out
            self.outStream = io.StringIO()

    def write(self, txt):
        """Write ``txt`` to the sink, or buffer it, according to the policy."""
        if self.aggregationPolicy == 'NONE':
            return
        if self.aggregationPolicy == 'ALL' or not self.result:
            self.out.write(txt)
        else:
            self.outStream.write(txt)

    def fail(self):
        """Mark the result as failed and flush any buffered output.

        Safe to call more than once; the buffer is flushed only the first time.
        """
        self.result = False
        if self.aggregationPolicy == 'FAILED' and self.outStream is not None:
            self.out.write(self.outStream.getvalue())
            self.outStream = None

    def writeHeader(self):
        """Write a format-specific preamble (nothing by default)."""
        pass

    def writeFooter(self):
        """Write a format-specific epilogue (nothing by default)."""
        pass

    def processLine(self, line, linenr):
        """Parse one trace line into a dict, or return None to skip it."""
        raise NotImplementedError

    def writeObject(self, obj):
        """Serialize one event dict through ``self.write``."""
        raise NotImplementedError

    def applyAddr2line(self, obj):
        """Return a copy of ``obj`` with resolved backtrace lines added.

        The addresses found in ``obj['Backtrace']`` are passed to
        ``addr2line -e <basedir>/bin/fdbserver -C -f -i``; each output line is
        stored under ``line0000``, ``line0001``, ...  If addr2line cannot be
        run, fails, or prints undecodable bytes, ``obj`` itself is returned
        with ``FailedAddr2LineResolution`` set to ``'true'``.  If ``obj`` has
        no usable backtrace it is returned unchanged.
        """
        addresses = self.sanitizeBacktrace(obj)
        if addresses is None:
            return obj
        # Uses the instance's basedir; this used to read a module global that
        # only exists when the file runs as a script.
        fdbbin = os.path.join(self.basedir, 'bin', 'fdbserver')
        # Argument list instead of splitting a formatted string, so paths
        # containing spaces survive.
        command = ['addr2line', '-e', fdbbin, '-C', '-f', '-i'] + addresses.split()
        try:
            resolved = subprocess.check_output(command).splitlines()
            annotated = dict(obj)
            for i, line in enumerate(resolved):
                annotated['line%04d' % i] = line.decode('utf-8')
            return annotated
        except (subprocess.CalledProcessError, UnicodeDecodeError, OSError):
            # OSError covers a missing addr2line binary.
            obj['FailedAddr2LineResolution'] = 'true'
            return obj

    def sanitizeBacktrace(self, obj):
        """Return the address list from ``obj['Backtrace']`` or None.

        None is returned on non-Linux platforms, when there is no
        ``Backtrace`` entry, or when it contains no hex addresses.
        """
        if not sys.platform.startswith('linux'):
            return None
        raw_backtrace = obj.get('Backtrace', None)
        if raw_backtrace is None:
            return None
        match = self.address_re.search(raw_backtrace)
        if not match:
            return None
        return match.group(0)

    def processTraces(self):
        """Read ``self.infile`` line by line and re-emit every typed event.

        Lines for which ``processLine`` returns None, and objects without a
        ``Type`` key, are skipped.  An event whose ``Severity`` is ``'40'``
        marks the run as failed.  ``testname`` is added when a name was given,
        and backtraces are resolved through addr2line where possible.
        """
        # NOTE(review): the original kept a ``backtraces`` counter that was
        # compared against 0 but never incremented, so every backtrace was
        # resolved.  That effective behavior is kept; confirm whether only
        # the first backtrace was meant to be resolved.
        with open(self.infile) as f:
            for linenr, line in enumerate(f):
                obj = self.processLine(line, linenr)
                if not isinstance(obj, dict) or 'Type' not in obj:
                    continue
                # .get(): an event without Severity used to raise KeyError.
                if obj.get('Severity') == '40':
                    self.fail()
                if self.name is not None:
                    obj['testname'] = self.name
                if self.sanitizeBacktrace(obj) is not None:
                    obj = self.applyAddr2line(obj)
                self.writeObject(obj)

    def log_trace_parse_error(self, linenr, e):
        """Log a parse failure and return a replacement event describing it.

        Args:
            linenr: Zero-based number of the offending line.
            e: The exception (or SAX error object) describing the failure.
        """
        _logger.error("Process trace line file %s Failed", self.infile)
        _logger.error("Exception %s args: %s", type(e), e.args)
        _logger.error("Line: '%s'", linenr)
        # Keys and values below are read by downstream tooling; keep verbatim.
        obj = {}
        obj['Severity'] = "warning"
        obj['Type'] = "TestInfastructureLogLineGarbled"
        obj['isLastLine'] = "TestFailure"
        obj['TraceLine'] = linenr
        obj['File'] = self.infile
        return obj

    def processReturnCodes(self, return_codes):
        """Emit one event per executed command; non-zero codes fail the run.

        Args:
            return_codes: Mapping ``{command string: return code}``.
        """
        for command, return_code in return_codes.items():
            return_code_trace = {}
            if return_code != 0:
                return_code_trace['Severity'] = '40'
                return_code_trace['Type'] = 'TestFailure'
                self.fail()
            else:
                return_code_trace['Severity'] = '10'
                return_code_trace['Type'] = 'ReturnCode'
            return_code_trace['Command'] = command
            return_code_trace['ReturnCode'] = return_code
            return_code_trace['testname'] = self.name
            self.writeObject(return_code_trace)


class JSONParser(LogParser):
    """Parser for trace files that hold one JSON object per line."""

    def processLine(self, line, linenr):
        """Decode ``line`` as JSON.

        On malformed input the problem is logged and the replacement event
        from ``log_trace_parse_error`` is returned (it used to be built and
        then dropped, unlike in ``XMLParser``).
        """
        try:
            return json.loads(line)
        except ValueError as e:  # json.JSONDecodeError derives from ValueError
            return self.log_trace_parse_error(linenr, e)

    def writeObject(self, obj):
        """Write ``obj`` as a single JSON line."""
        self.write(json.dumps(obj))
        self.write('\n')


class XMLParser(LogParser):
    """Parser for trace files that hold one XML ``Event`` element per line.

    NOTE(review): the reviewed copy of this class had lost its angle-bracket
    markup (header/footer/element literals and the middle of processLine).
    The literals below were restored during review; ``Event`` is grounded in
    XMLHandler, but confirm the prolog and the ``Trace`` root element against
    what the trace consumers expect.
    """

    class XMLHandler(xml.sax.handler.ContentHandler):
        """Collects the attributes of ``Event`` elements into ``result``."""

        def __init__(self):
            super().__init__()
            self.result = {}

        def startElement(self, name, attrs):
            if name != 'Event':
                return
            for key, value in attrs.items():
                self.result[key] = value

    class XMLErrorHandler(xml.sax.handler.ErrorHandler):
        """Records SAX diagnostics instead of raising them."""

        def __init__(self):
            self.errors = []
            self.fatalErrors = []
            self.warnings = []

        def error(self, exception):
            self.errors.append(exception)

        def fatalError(self, exception):
            # Was ``self.fatalError.append`` (the bound method, not the
            # list), which raised AttributeError on every malformed line.
            self.fatalErrors.append(exception)

        def warning(self, exception):
            self.warnings.append(exception)

    def writeHeader(self):
        """Write the XML prolog and open the root element."""
        self.write('<?xml version="1.0"?>\n<Trace>\n')

    def writeFooter(self):
        """Close the root element."""
        self.write("</Trace>")

    def writeObject(self, obj):
        """Write ``obj`` as one self-closing ``Event`` element per line.

        Values are converted with ``str`` and escaped/quoted with
        ``xml.sax.saxutils.quoteattr`` so that ``&``, ``<`` and quotes in a
        value cannot break the markup.
        """
        self.write('<Event')
        for key, value in obj.items():
            self.write(' {}={}'.format(key, xml.sax.saxutils.quoteattr(str(value))))
        self.write('/>\n')

    def processLine(self, line, linenr):
        """Parse one line as XML and return the ``Event`` attributes.

        Returns None for skipped lines, the attribute dict for a parsed line,
        or the replacement event from ``log_trace_parse_error`` when the SAX
        parser reports a fatal error.
        """
        # NOTE(review): ``linenr`` is zero-based in processTraces(), so this
        # skips three lines although the comment speaks of two -- the first
        # event line may be dropped.  Left unchanged; confirm the file layout.
        if linenr < 3:
            # the first two lines don't need to be parsed
            return None
        if line.startswith('</'):
            # a closing element carries no event
            return None
        handler = XMLParser.XMLHandler()
        errorHandler = XMLParser.XMLErrorHandler()
        xml.sax.parseString(line.encode('utf-8'), handler, errorHandler)
        if len(errorHandler.fatalErrors) > 0:
            return self.log_trace_parse_error(linenr, errorHandler.fatalErrors[0])
        return handler.result


def _matching_files(directory, pattern):
    """Return the full paths of entries in ``directory`` whose name matches."""
    return [os.path.join(directory, entry)
            for entry in os.listdir(directory)
            if pattern.match(entry) is not None]


def get_traces(d, log_format):
    """List the trace files in ``d`` and in ``d/testrunner`` (if present).

    A trace file is any entry named ``trace.<anything>.<log_format>``.

    Args:
        d: Directory to scan.
        log_format: File extension without the dot (``'xml'`` or ``'json'``).

    Returns:
        List of paths, in ``os.listdir`` order, top-level directory first.
    """
    # re.escape keeps the extension literal should it ever contain
    # regex metacharacters.
    pattern = re.compile('^trace\\..*\\.{}$'.format(re.escape(log_format)))
    traces = _matching_files(d, pattern)
    testrunner_dir = os.path.join(d, 'testrunner')
    if os.path.isdir(testrunner_dir):
        traces += _matching_files(testrunner_dir, pattern)
    return traces


def process_traces(basedir, testname, path, out, aggregationPolicy, log_format, return_codes,
                   cmake_seed):
    """Emit return-code events, then post-process every trace file in ``path``.

    Args:
        basedir: Build directory (passed on to the parsers).
        testname: Name added to every event.
        path: Directory containing the trace files.
        out: Output stream, or None when ``aggregationPolicy`` is ``'NONE'``.
        aggregationPolicy: ``'NONE'``, ``'FAILED'`` or ``'ALL'``.
        log_format: ``'json'`` selects JSONParser, anything else XMLParser.
        return_codes: Mapping ``{command string: return code}``.
        cmake_seed: Seed recorded in a final ``CMakeSEED`` event.

    Returns:
        True if no return code and no trace event signalled a failure.
    """
    parser_cls = JSONParser if log_format == 'json' else XMLParser
    parser = parser_cls(basedir, testname, None, out, aggregationPolicy)
    parser.processReturnCodes(return_codes)
    res = parser.result
    for trace in get_traces(path, log_format):
        parser = parser_cls(basedir, testname, trace, out, aggregationPolicy)
        if not res:
            # A failure was already seen: let this parser write through
            # immediately instead of buffering.
            parser.fail()
        parser.processTraces()
        res = res and parser.result
    # Written through whichever parser was created last.
    parser.writeObject({'CMakeSEED': str(cmake_seed)})
    return res


def run_simulation_test(basedir, options):
    """Run fdbserver for every test file and post-process the results.

    Args:
        basedir: Build directory containing ``bin/fdbserver``.
        options: Parsed command-line options (see ``main``).

    Returns:
        True if every command returned 0 and no trace reported a failure.
    """
    # Project-local helper; imported here because this function is its only
    # user (see the note next to the module imports).
    from TestDirectory import TestDirectory

    fdbserver = os.path.join(basedir, 'bin', 'fdbserver')
    pargs = [fdbserver, '-r', options.testtype]
    if options.testtype == 'test':
        # Read from ``options`` -- this used to reach for the global ``args``
        # that only exists when the file runs as a script.  Falls back to
        # ``basedir`` when no build directory was given.
        builddir = options.builddir if options.builddir is not None else basedir
        pargs += ['-C', os.path.join(builddir, 'fdb.cluster')]
    else:
        pargs += ['-S', 'random']
    td = TestDirectory(basedir)
    if options.buggify:
        pargs += ['-b', 'on']
    pargs += ['--trace_format', options.log_format]
    test_dir = td.get_current_test_dir()
    if options.seed is not None:
        pargs.append('-s')
        seed = int(options.seed, 0)  # base 0: accepts decimal, 0x..., 0o...
        if options.test_number:
            idx = int(options.test_number)
            seed = ((seed + idx) % (2**32 - 2)) + 1
        pargs.append("{}".format(seed))
    wd = os.path.join(test_dir, 'test_{}'.format(options.name.replace('/', '_')))
    os.mkdir(wd)

    return_codes = {}  # {command: return_code}
    first = True
    for testfile in options.testfile:
        tmp = list(pargs)
        # With several test files and an old binary given, the first file is
        # run with the old binary; every later run gets '-R'.
        if first and options.old_binary is not None and len(options.testfile) > 1:
            _logger.info("Run old binary at {}".format(options.old_binary))
            tmp[0] = options.old_binary
        if not first:
            tmp.append('-R')
        first = False
        tmp += ['-f', testfile]
        command = ' '.join(tmp)
        _logger.info("COMMAND: {}".format(command))
        returncode = subprocess.call(tmp, stdout=sys.stdout, stderr=sys.stderr, cwd=wd)
        return_codes[command] = returncode
        if returncode != 0:
            break

    outfile = os.path.join(test_dir, 'traces.{}'.format(options.log_format))
    if options.aggregate_traces == 'NONE':
        res = process_traces(basedir, options.name, wd, None, 'NONE',
                             options.log_format, return_codes, options.seed)
    else:
        with open(outfile, 'a') as f:
            # os.lockf locks starting at the current file position, so the
            # position is restored before unlocking to release the same
            # region.  try/finally guarantees the unlock on errors.
            os.lockf(f.fileno(), os.F_LOCK, 0)
            pos = f.tell()
            try:
                res = process_traces(basedir, options.name, wd, f, options.aggregate_traces,
                                     options.log_format, return_codes, options.seed)
            finally:
                f.seek(pos)
                os.lockf(f.fileno(), os.F_ULOCK, 0)

    if options.keep_logs == 'NONE' or (options.keep_logs == 'FAILED' and res):
        print("Deleting old logs in {}".format(wd))
        for trace in get_traces(wd, options.log_format):
            os.remove(trace)
    if options.keep_simdirs == 'NONE' or (options.keep_simdirs == 'FAILED' and res):
        simdir = os.path.join(wd, 'simfdb')
        print("Delete {}".format(simdir))
        # The directory may not exist for every run; rmtree would raise.
        if os.path.isdir(simdir):
            shutil.rmtree(simdir)
    if not os.listdir(wd):
        print("Delete {} - empty".format(wd))
        os.rmdir(wd)
    return res


def main():
    """Parse the command line, run the test and exit with 0 (pass) or 1."""
    testtypes = ['simulation', 'test']
    parser = ArgumentParser(description='Run a test preprocess trace')
    parser.add_argument('-b', '--builddir', help='Path to build directory')
    parser.add_argument('-s', '--sourcedir', help='Path to source directory')
    parser.add_argument('-n', '--name', help='Name of the test')
    parser.add_argument('-t', '--testtype', choices=testtypes,
                        default='simulation',
                        help='The type of test to run, choices are [{}]'.format(
                            ', '.join(testtypes)))
    parser.add_argument('-B', '--buggify', action='store_true',
                        help='Enable buggify')
    parser.add_argument('--logdir', default='logs',
                        help='Directory for logs')
    parser.add_argument('-l', '--loglevel',
                        choices=['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'],
                        default='INFO')
    parser.add_argument('-x', '--seed', required=False, default=None,
                        help='The seed to use for this test')
    parser.add_argument('-N', '--test-number', required=False, default=None,
                        help='A unique number for this test (for seed generation)')
    parser.add_argument('-F', '--log-format', required=False, default='xml',
                        choices=['xml', 'json'], help='Log format (json or xml)')
    parser.add_argument('-O', '--old-binary', required=False, default=None,
                        help='Path to the old binary to use for upgrade tests')
    parser.add_argument('--aggregate-traces', default='NONE',
                        choices=['NONE', 'FAILED', 'ALL'])
    parser.add_argument('--keep-logs', default='FAILED',
                        choices=['NONE', 'FAILED', 'ALL'])
    parser.add_argument('--keep-simdirs', default='NONE',
                        choices=['NONE', 'FAILED', 'ALL'])
    parser.add_argument('testfile', nargs="+", help='The tests to run')
    args = parser.parse_args()
    init_logging(args.loglevel, args.logdir)
    basedir = args.builddir if args.builddir is not None else os.getcwd()
    res = run_simulation_test(basedir, args)
    sys.exit(0 if res else 1)


if __name__ == '__main__':
    main()