From 8174a918ea3b7cb216bf7ea98cfdc10661b5c37d Mon Sep 17 00:00:00 2001
From: Florian Fischer
Date: Wed, 6 May 2020 16:56:32 +0200
Subject: make the whole project more python idiomatic

* rename src directory to allocbench
* make global variable names UPPERCASE
* format a lot of code using yapf
* use lowercase ld_preload and ld_library_path as Allocator members
* name expected Errors 'err' and don't raise a new Exception
* disable some pylint messages
---
 src/benchmark.py | 632 -------------------------------------------------------
 1 file changed, 632 deletions(-)
 delete mode 100644 src/benchmark.py

diff --git a/src/benchmark.py b/src/benchmark.py
deleted file mode 100644
index 2fb2606..0000000
--- a/src/benchmark.py
+++ /dev/null
@@ -1,632 +0,0 @@
# Copyright 2018-2020 Florian Fischer
#
# This file is part of allocbench.
#
# allocbench is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# allocbench is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with allocbench. If not, see <https://www.gnu.org/licenses/>.
"""Benchmark class providing a general purpose implementation of required methods"""

import atexit
from collections import namedtuple
import copy
import csv
import errno
import itertools
import multiprocessing
import os
import subprocess
from time import sleep
import traceback

import numpy as np

import src.facter
import src.globalvars
import src.util
from src.util import print_status, print_error, print_warn
from src.util import print_info0, print_info, print_debug


class Benchmark:
    """Default implementation of most methods allocbench expects from a benchmark"""

    # class member to remember if we are allowed to use perf
    perf_allowed = None

    cmd = "false"
    args = {}
    measure_cmd_csv = False
    measure_cmd = "perf stat -x, -d"
    servers = []
    allocators = copy.deepcopy(src.globalvars.allocators)

    @staticmethod
    def terminate_subprocess(proc, timeout=5):
        """Terminate or kill a Popen object"""

        # Skip already terminated subprocesses
        if proc.poll() is not None:
            return proc.communicate()

        print_info("Terminating subprocess", proc.args)
        proc.terminate()
        try:
            outs, errs = proc.communicate(timeout=timeout)
            print_info("Subprocess exited with", proc.returncode)
        except subprocess.TimeoutExpired:
            print_error("Killing subprocess", proc.args)
            proc.kill()
            outs, errs = proc.communicate()

        print_debug("Server Out:", outs)
        print_debug("Server Err:", errs)
        return outs, errs

    @staticmethod
    def scale_threads_for_cpus(factor=1, min_threads=1, steps=10):
        """Helper to scale thread counts to the available execution units

        Return a list of numbers between min_threads and
        multiprocessing.cpu_count() * factor with len <= steps."""
        max_threads = int(multiprocessing.cpu_count() * factor)

        if steps > max_threads - min_threads + 1:
            return list(range(min_threads, max_threads + 1))

        nthreads = []
        divider = 2
        while True:
            spacing = max_threads // divider
            entries = max_threads // spacing
            if entries > steps - 1:
                return sorted(set([min_threads] + nthreads + [max_threads]))

            nthreads = [(i + 1) * spacing for i in range(entries)]
            divider *= 2

    @staticmethod
    def is_perf_allowed():
        """Raise an exception if using perf is not allowed on this system"""
        if Benchmark.perf_allowed is None:
            print_info("Check if you are allowed to use perf ...")
            res = subprocess.run(["perf", "stat", "ls"],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 universal_newlines=True)

            if res.returncode != 0:
                print_error(f"Test perf run failed with exit status: {res.returncode}")
                print_debug(res.stderr)
                Benchmark.perf_allowed = False
            else:
                Benchmark.perf_allowed = True

        if not Benchmark.perf_allowed:
            raise Exception("You don't have the needed permissions to use perf")

    def __str__(self):
        return self.name

    def __init__(self, name):
        """Initialize a benchmark with default members if they aren't set already"""
        self.name = name

        # Set result_dir
        if not hasattr(self, "result_dir"):
            self.result_dir = os.path.abspath(
                os.path.join(src.globalvars.resdir or "", self.name))
        # Set build_dir
        if not hasattr(self, "build_dir"):
            self.build_dir = os.path.abspath(
                os.path.join(src.globalvars.builddir, "benchmarks", self.name))

        self.Perm = namedtuple("Perm", self.args.keys())

        default_results = {
            "args": self.args,
            "allocators": self.allocators,
            "facts": {
                "libcs": {},
                "versions": {}
            }
        }
        default_results.update({alloc: {} for alloc in self.allocators})

        if not hasattr(self, "results"):
            self.results = default_results
        # Add missing default entries to self.results
        else:
            for key, default in default_results.items():
                if key not in self.results:
                    self.results[key] = default

        if self.servers:
            self.results["servers"] = {}

        if not hasattr(self, "requirements"):
            self.requirements = []

        print_debug("Creating benchmark", self.name)
        print_debug("Cmd:", self.cmd)
        print_debug("Args:", self.args)
        print_debug("Servers:", self.servers)
        print_debug("Requirements:", self.requirements)
        print_debug("Results dictionary:", self.results)
        print_debug("Results directory:", self.result_dir)
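To make the defaults above concrete, here is a minimal sketch of a benchmark built on this class; the class name, command, and requirement are invented for illustration:

from src.benchmark import Benchmark


class BenchmarkExample(Benchmark):  # hypothetical benchmark
    """Sketch of a concrete benchmark relying on the defaults above"""

    def __init__(self):
        # {nthreads} is substituted for every permutation yielded by iterate_args()
        self.cmd = "example_bench {nthreads}"  # invented command
        self.args = {"nthreads": Benchmark.scale_threads_for_cpus(factor=2)}
        self.requirements = ["example_bench"]  # invented requirement
        super().__init__("example_bench")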
    def save(self, path=None):
        """Save benchmark results to a json file"""
        import json
        if not path:
            path = self.name + ".json"
        elif os.path.isdir(path):
            path = os.path.join(path, self.name + ".json")

        print_info(f"Saving results to: {path}")
        # JSON can't handle namedtuples so convert the dicts keyed by
        # namedtuples into lists of (dict, value) pairs.
        save_data = {}
        save_data.update(self.results)
        save_data["stats"] = {}
        for allocator in self.results["allocators"]:
            # Skip allocators without measurements
            if self.results[allocator] == {}:
                continue

            measures = []
            stats = []
            for ntuple in self.iterate_args(args=self.results["args"]):
                measures.append((ntuple._asdict(),
                                 self.results[allocator][ntuple]))

                if "stats" in self.results:
                    stats.append((ntuple._asdict(),
                                  self.results["stats"][allocator][ntuple]))

            save_data[allocator] = measures
            if "stats" in self.results:
                save_data["stats"][allocator] = stats

        with open(path, "w") as f:
            json.dump(save_data, f)

    def load(self, path=None):
        """Load benchmark results from file"""
        if not path:
            filename = self.name
        elif os.path.isdir(path):
            filename = os.path.join(path, self.name)
        else:
            filename = os.path.splitext(path)[0]

        if os.path.exists(filename + ".json"):
            import json
            filename += ".json"
            with open(filename, "r") as f:
                self.results = json.load(f)
        elif os.path.exists(filename + ".save"):
            import pickle
            filename += ".save"
            with open(filename, "rb") as f:
                self.results = pickle.load(f)
        else:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
                                    filename)

        print_info(f"Loading results from: {filename}")

        # Rebuild the namedtuple keys
        for allocator in self.results["allocators"]:
            d = {}
            for perm, measures in self.results[allocator]:
                d[self.Perm(**perm)] = measures
            self.results[allocator] = d

            d = {}
            if "stats" in self.results:
                for perm, value in self.results["stats"][allocator]:
                    d[self.Perm(**perm)] = value
                self.results["stats"][allocator] = d

        # add possibly missing statistics
        if "stats" not in self.results:
            self.calc_desc_statistics()
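The namedtuple round trip that save() and load() perform can be shown in isolation. A self-contained sketch with made-up Perm fields:

import json
from collections import namedtuple

Perm = namedtuple("Perm", ["nthreads", "size"])  # made-up argument fields

# Dicts keyed by namedtuples are not JSON serializable, so store
# (perm._asdict(), value) pairs and rebuild the keys on load.
results = {Perm(1, 64): [0.5], Perm(2, 64): [0.3]}
encoded = json.dumps([(perm._asdict(), value) for perm, value in results.items()])
decoded = {Perm(**perm): value for perm, value in json.loads(encoded)}
assert decoded == results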
    def prepare(self):
        """Default prepare implementation raising an error if a requirement is not found"""
        os.environ["PATH"] += f"{os.pathsep}{src.globalvars.builddir}/benchmarks/{self.name}"

        for requirement in self.requirements:
            exe = src.util.find_cmd(requirement)
            if exe is not None:
                self.results["facts"]["libcs"][requirement] = src.facter.libc_ver(
                    executable=exe)
            else:
                raise Exception(f"Requirement: {requirement} not found")

    def iterate_args(self, args=None, fixed=None):
        """Iterator over each possible combination of args

        Parameters
        ----------
        args : dict, optional, default=None
            Dictionary of arguments and iterables with their possible values.
            If not provided defaults to :rc:`self.args`

        fixed : dict, optional, default=None
            Mapping of arguments to one of their values. The yielded result
            contains this value. If not provided defaults to :rc:`{}`.

        Returns
        -------
        perm : :rc:`self.Perm`
            A namedtuple containing one permutation of the benchmark's arguments.

        Examples
        --------
        args = {"a1": [1, 2], "a2": ["foo", "bar"]}

        self.iterate_args(args=args) yields [(1, "foo"), (1, "bar"), (2, "foo"), (2, "bar")]
        self.iterate_args(args, {"a2": "bar"}) yields [(1, "bar"), (2, "bar")]
        self.iterate_args(args, {"a1": 2, "a2": "foo"}) yields [(2, "foo")]"""
        if not args:
            args = self.args
        if not fixed:
            fixed = {}

        for perm in itertools.product(*[args[k] for k in args]):
            perm = self.Perm(*perm)
            p_dict = perm._asdict()
            is_fixed = True
            for arg in fixed:
                if p_dict[arg] != fixed[arg]:
                    is_fixed = False
                    break

            if is_fixed:
                yield perm

    def prepare_argv(self, cmd, env={}, alloc={}, substitutions={}, prepend=True):
        """Prepare a complete argv list for benchmarking"""
        argv = []
        if prepend:
            if "cmd_prefix" in alloc:
                prefix_argv = alloc["cmd_prefix"].format(**substitutions).split()
                argv.extend(prefix_argv)
                # add exec wrapper so that a possible prefixed loader can execute shell scripts
                argv.append(f"{src.globalvars.builddir}/exec")

            if self.measure_cmd != "":
                measure_argv = self.measure_cmd.format(**substitutions)
                measure_argv = src.util.prefix_cmd_with_abspath(measure_argv).split()
                argv.extend(measure_argv)

                argv.append(f"{src.globalvars.builddir}/exec")

            ld_preload = f"{src.globalvars.builddir}/print_status_on_exit.so"
            ld_preload += f" {src.globalvars.builddir}/sig_handlers.so"

            if "LD_PRELOAD" in env or alloc.get("LD_PRELOAD", ""):
                ld_preload += f" {alloc.get('LD_PRELOAD', '')}"
                ld_preload += " " + env.get('LD_PRELOAD', '')

            argv.extend(["-p", ld_preload])

            if "LD_LIBRARY_PATH" in env or alloc.get("LD_LIBRARY_PATH", ""):
                argv.extend([
                    "-l",
                    f"{alloc.get('LD_LIBRARY_PATH', '')} {env.get('LD_LIBRARY_PATH', '')}"
                ])

        cmd_argv = cmd.format(**substitutions)
        cmd_argv = src.util.prefix_cmd_with_abspath(cmd_argv).split()

        argv.extend(cmd_argv)

        return argv
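The core of iterate_args() is an itertools.product over the argument values plus the fixed filter. A standalone sketch of that logic, using the docstring's example arguments:

import itertools
from collections import namedtuple

# Cartesian product of all argument values, optionally filtered down to
# the permutations matching some fixed values.
args = {"a1": [1, 2], "a2": ["foo", "bar"]}
Perm = namedtuple("Perm", args.keys())

def iterate_args(fixed=None):
    fixed = fixed or {}
    for values in itertools.product(*args.values()):
        perm = Perm(*values)
        if all(getattr(perm, key) == val for key, val in fixed.items()):
            yield perm

print(list(iterate_args()))
# [Perm(a1=1, a2='foo'), Perm(a1=1, a2='bar'), Perm(a1=2, a2='foo'), Perm(a1=2, a2='bar')]
print(list(iterate_args({"a2": "bar"})))
# [Perm(a1=1, a2='bar'), Perm(a1=2, a2='bar')]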
    def start_servers(self, env={}, alloc_name="None", alloc={"cmd_prefix": ""}):
        """Start servers

        Servers are not allowed to daemonize because then they can't
        be terminated using their Popen object."""

        substitutions = {
            "alloc": alloc_name,
            "perm": alloc_name,
            "builddir": src.globalvars.builddir
        }

        substitutions.update(self.__dict__)
        substitutions.update(alloc)

        for server in self.servers:
            server_name = server.get("name", "Server")
            print_info(f"Starting {server_name} for {alloc_name}")

            argv = self.prepare_argv(server["cmd"], env, alloc, substitutions)
            print_debug(argv)

            proc = subprocess.Popen(argv, env=env,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    universal_newlines=True)

            # TODO: check if the server is up
            sleep(5)

            ret = proc.poll()
            if ret is not None:
                print_debug("Stdout:", proc.stdout.read())
                print_debug("Stderr:", proc.stderr.read())
                raise Exception(f"Starting {server_name} failed with exit code: {ret}")
            server["popen"] = proc
            # Register termination of the server
            atexit.register(Benchmark.shutdown_server, self=self, server=server)

            self.results["servers"].setdefault(
                alloc_name, {
                    s["name"]: {"stdout": [], "stderr": [], "returncode": []}
                    for s in self.servers
                })

            if "prepare_cmds" not in server:
                continue

            print_info(f"Preparing {server_name}")
            for prep_cmd in server["prepare_cmds"]:
                prep_cmd = prep_cmd.format(**substitutions)
                print_debug(prep_cmd)

                proc = subprocess.run(prep_cmd.split(), universal_newlines=True,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)

                print_debug("Stdout:", proc.stdout)
                print_debug("Stderr:", proc.stderr)

    def shutdown_server(self, server):
        """Terminate a started server running its shutdown_cmds in advance"""
        if server["popen"].poll() is None:
            server_name = server.get("name", "Server")
            print_info(f"Shutting down {server_name}")

            substitutions = {}
            substitutions.update(self.__dict__)
            substitutions.update(server)

            if "shutdown_cmds" in server:
                for shutdown_cmd in server["shutdown_cmds"]:
                    shutdown_cmd = shutdown_cmd.format(**substitutions)
                    print_debug(shutdown_cmd)

                    proc = subprocess.run(shutdown_cmd.split(),
                                          universal_newlines=True,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.PIPE)

                    print_debug("Stdout:", proc.stdout)
                    print_debug("Stderr:", proc.stderr)

                # wait for server termination
                sleep(5)

            outs, errs = Benchmark.terminate_subprocess(server["popen"])
        else:
            outs = server["popen"].stdout.read()
            errs = server["popen"].stderr.read()

        server["stdout"] = outs
        server["stderr"] = errs

    def shutdown_servers(self):
        """Terminate all started servers"""
        print_info("Shutting down servers")
        for server in self.servers:
            self.shutdown_server(server)
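For reference, a hypothetical server description in the shape these methods expect; every name and command below is invented, only the keys mirror what start_servers() and shutdown_server() read and write:

# All names and commands are illustrative placeholders.
server = {
    "name": "exampled",
    "cmd": "exampled --port 4242",  # formatted with the usual substitutions
    "prepare_cmds": ["example-cli --port 4242 warmup"],
    "shutdown_cmds": ["example-cli --port 4242 shutdown"],
}
# start_servers() adds server["popen"]; shutdown_server() fills
# server["stdout"] and server["stderr"] after termination.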
    def run(self, runs=3):
        """Generic run implementation"""

        # check if we are allowed to use perf
        if self.measure_cmd.startswith("perf"):
            Benchmark.is_perf_allowed()

        # add benchmark dir to PATH
        os.environ["PATH"] += f"{os.pathsep}{src.globalvars.builddir}/benchmarks/{self.name}"

        # save one valid result to expand invalid results
        valid_result = {}

        self.results["facts"]["runs"] = runs

        n = len(list(self.iterate_args())) * len(self.allocators)
        for run in range(1, runs + 1):
            print_status(run, ". run", sep='')

            i = 0
            for alloc_name, alloc in self.allocators.items():
                if alloc_name not in self.results:
                    self.results[alloc_name] = {}

                skip = False
                try:
                    self.start_servers(alloc_name=alloc_name, alloc=alloc,
                                       env=os.environ)
                except Exception as e:
                    print_debug(traceback.format_exc())
                    print_error(e)
                    print_error("Skipping", alloc_name)
                    skip = True

                # Preallocator hook
                if hasattr(self, "preallocator_hook"):
                    self.preallocator_hook((alloc_name, alloc), run, os.environ)

                # Run the benchmark for alloc
                for perm in self.iterate_args():
                    i += 1

                    # create a new result entry
                    if perm not in self.results[alloc_name]:
                        self.results[alloc_name][perm] = []

                    # starting the server failed -> add an empty result and continue
                    if skip:
                        self.results[alloc_name][perm].append({})
                        continue

                    print_info0(i, "of", n, "\r", end='')

                    # Available substitutions in cmd
                    substitutions = {"run": run, "alloc": alloc_name}
                    substitutions.update(self.__dict__)
                    substitutions.update(alloc)
                    if perm:
                        substitutions.update(perm._asdict())
                        substitutions["perm"] = "-".join([str(v) for v in perm])
                    else:
                        substitutions["perm"] = ""

                    # we measure the cmd -> prepare it accordingly
                    if not self.servers:
                        argv = self.prepare_argv(self.cmd, os.environ, alloc,
                                                 substitutions)
                    # we measure the server -> run cmd as it is
                    else:
                        argv = self.prepare_argv(self.cmd,
                                                 substitutions=substitutions,
                                                 prepend=False)

                    cwd = os.getcwd()
                    if hasattr(self, "run_dir"):
                        run_dir = self.run_dir.format(**substitutions)
                        os.chdir(run_dir)
                        print_debug("\nChange cwd to:", run_dir)

                    print_debug("\nCmd:", argv)
                    res = subprocess.run(argv, stderr=subprocess.PIPE,
                                         stdout=subprocess.PIPE,
                                         universal_newlines=True)

                    result = {}

                    if (res.returncode != 0 or "ERROR: ld.so" in res.stderr
                            or "Segmentation fault" in res.stderr
                            or "Aborted" in res.stderr):
                        print()
                        print_debug("Stdout:\n" + res.stdout)
                        print_debug("Stderr:\n" + res.stderr)
                        if res.returncode != 0:
                            print_error(f"{argv} failed with exit code {res.returncode} for {alloc_name}")
                        elif "ERROR: ld.so" in res.stderr:
                            print_error(f"Preloading of {alloc['LD_PRELOAD']} failed for {alloc_name}")
                        else:
                            print_error(f"{argv} terminated abnormally")

                    # parse and store results
                    else:
                        if not self.servers:
                            if os.path.isfile("status"):
                                # Read VmHWM from the status file. If our benchmark
                                # didn't fork, the first occurrence of VmHWM is from
                                # our benchmark.
                                with open("status", "r") as f:
                                    for line in f.readlines():
                                        if line.startswith("VmHWM:"):
                                            result["VmHWM"] = line.split()[1]
                                            break

                                os.remove("status")

                            # parse perf output if available
                            if self.measure_cmd == Benchmark.measure_cmd or self.measure_cmd_csv:
                                csvreader = csv.reader(res.stderr.splitlines(),
                                                       delimiter=',')
                                for row in csvreader:
                                    # Split off the user/kernel space info to be better portable
                                    try:
                                        result[row[2].split(":")[0]] = row[0]
                                    except Exception as e:
                                        print_warn("Exception", e, "occurred on", row,
                                                   "for", alloc_name, "and", perm)
                        else:
                            result["server_status"] = []
                            for server in self.servers:
                                with open(f"/proc/{server['popen'].pid}/status", "r") as f:
                                    server_status = f.read()
                                result["server_status"].append(server_status)

                                for line in server_status.splitlines():
                                    if line.startswith("VmHWM:"):
                                        result[f"{server.get('name', 'Server')}_vmhwm"] = line.split()[1]
                                        break

                        if hasattr(self, "process_output"):
                            self.process_output(result, res.stdout, res.stderr,
                                                alloc_name, perm)

                        # save a valid result so we can expand invalid ones
                        if not valid_result:
                            valid_result = result

                    self.results[alloc_name][perm].append(result)

                    if os.getcwd() != cwd:
                        os.chdir(cwd)

                if self.servers and not skip:
                    self.shutdown_servers()

                    for server in self.servers:
                        server_result = self.results["servers"][alloc_name][server["name"]]
                        server_result["stdout"].append(server["stdout"])
                        server_result["stderr"].append(server["stderr"])
                        server_result["returncode"].append(server["popen"].returncode)

                if hasattr(self, "postallocator_hook"):
                    self.postallocator_hook((alloc_name, alloc), run)

            print()

        # reset PATH
        os.environ["PATH"] = os.environ["PATH"].replace(
            f"{os.pathsep}{src.globalvars.builddir}/benchmarks/{self.name}", "")

        # expand invalid results
        if valid_result:
            for allocator in self.allocators:
                for perm in self.iterate_args():
                    for i, measure in enumerate(self.results[allocator][perm]):
                        if measure == {}:
                            self.results[allocator][perm][i] = {
                                key: np.NaN for key in valid_result
                            }

        self.calc_desc_statistics()

    def calc_desc_statistics(self):
        """Calculate descriptive statistics for each datapoint"""
        allocs = self.results["allocators"]
        self.results["stats"] = {}
        for alloc in allocs:
            # Skip allocators without measurements
            if self.results[alloc] == {}:
                continue

            self.results["stats"][alloc] = {}

            for perm in self.iterate_args(args=self.results["args"]):
                stats = {
                    s: {}
                    for s in [
                        "min", "max", "mean", "median", "std", "std_perc",
                        "lower_quartile", "upper_quartile", "lower_whisker",
                        "upper_whisker", "outliers"
                    ]
                }
                for dp in self.results[alloc][perm][0]:
                    try:
                        data = [float(m[dp]) for m in self.results[alloc][perm]]
                    except (TypeError, ValueError):
                        continue
                    stats["min"][dp] = np.min(data)
                    stats["max"][dp] = np.max(data)
                    stats["mean"][dp] = np.mean(data)
                    stats["median"][dp] = np.median(data)
                    stats["std"][dp] = np.std(data, ddof=1)
                    stats["std_perc"][dp] = stats["std"][dp] / stats["mean"][dp]
                    stats["lower_quartile"][dp], stats["upper_quartile"][dp] = (
                        np.percentile(data, [25, 75]))
                    trimmed_range = stats["upper_quartile"][dp] - stats["lower_quartile"][dp]
                    stats["lower_whisker"][dp] = stats["lower_quartile"][dp] - trimmed_range
                    stats["upper_whisker"][dp] = stats["upper_quartile"][dp] + trimmed_range
                    outliers = []
                    for d in data:
                        if d > stats["upper_whisker"][dp] or d < stats["lower_whisker"][dp]:
                            outliers.append(d)
                    stats["outliers"][dp] = outliers

                self.results["stats"][alloc][perm] = stats
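Note that the whisker rule above uses one inter-quartile range, not the conventional 1.5x. A standalone numpy check of that outlier logic:

import numpy as np

# Quartiles +/- the inter-quartile range; everything outside counts as an outlier.
data = [3.0, 3.1, 2.9, 3.0, 3.2, 9.5]
lower_q, upper_q = np.percentile(data, [25, 75])
iqr = upper_q - lower_q
lower_whisker, upper_whisker = lower_q - iqr, upper_q + iqr
outliers = [d for d in data if d < lower_whisker or d > upper_whisker]
print(outliers)  # -> [9.5]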