From 7da91858ce7959f102a04bca2b2fcfd418e5a7a9 Mon Sep 17 00:00:00 2001 From: Florian Fischer Date: Wed, 15 May 2019 17:47:22 +0200 Subject: introduce server concept to Benchmark A benchmark object can specify a list of cmds to execute as "servers" in the member attribute server_cmds. Servers are started and terminated through Popen objects. This requires the server cmds to not daemonize so the server can be terminated through the Popen object. For each started server cmd a shutdown function is registered with atexit to terminate all servers even if a exception occurs. Use the new server concept in httpd and mysql benchmarks --- src/benchmark.py | 134 ++++++++++++++++++++++++++++++++++++------------ src/benchmarks/httpd.py | 70 +++++++------------------ src/benchmarks/mysql.py | 61 ++++------------------ 3 files changed, 129 insertions(+), 136 deletions(-) diff --git a/src/benchmark.py b/src/benchmark.py index b436c15..71e4577 100644 --- a/src/benchmark.py +++ b/src/benchmark.py @@ -1,3 +1,4 @@ +import atexit from collections import namedtuple import csv import itertools @@ -8,8 +9,10 @@ import os import pickle import shutil import subprocess +from time import sleep import src.globalvars +import src.util from src.util import * @@ -17,6 +20,7 @@ from src.util import * nan = np.NaN + class Benchmark (object): perf_allowed = None @@ -28,10 +32,29 @@ class Benchmark (object): "measure_cmd": "perf stat -x, -d", "cmd": "true", + "server_cmds": [], "allocators": src.globalvars.allocators, - "server_benchmark": False, } + @staticmethod + def terminate_subprocess(popen, timeout=5): + """Terminate or kill a Popen object""" + + # Skip already terminated subprocess + if popen.poll() is not None: + return + + print_info("Terminating subprocess", popen.args) + popen.terminate() + try: + print_info("Subprocess exited with ", popen.wait(timeout=timeout)) + except: + print_error("Killing subprocess ", server.args) + popen.kill() + popen.wait() + print_debug("Server Out:", popen.stdout) + print_debug("Server Err:", popen.stderr) + @staticmethod def scale_threads_for_cpus(factor, steps=None): cpus = multiprocessing.cpu_count() @@ -82,6 +105,7 @@ class Benchmark (object): print_debug("Creating benchmark", self.name) print_debug("Cmd:", self.cmd) + print_debug("Server Cmds:", self.server_cmds) print_debug("Args:", self.args) print_debug("Requirements:", self.requirements) print_debug("Results dictionary:", self.results) @@ -168,6 +192,50 @@ class Benchmark (object): if is_fixed: yield p + def start_servers(self, env=None, alloc_name="None", alloc={"cmd_prefix": ""}): + """Start Servers + + Servers are not allowed to deamonize because then they can't + be terminated with their Popen object.""" + self.servers = [] + + substitutions = {"alloc": alloc_name, + "perm": alloc_name, + "builddir": src.globalvars.builddir} + + substitutions.update(alloc) + + for server_cmd in self.server_cmds: + print_info("Starting Server for", alloc_name) + + server_cmd = src.util.prefix_cmd_with_abspath(server_cmd) + server_cmd = "{} {} {}".format(self.measure_cmd, + alloc["cmd_prefix"], + server_cmd) + + server_cmd = server_cmd.format(**substitutions) + print_debug(server_cmd) + + server = subprocess.Popen(server_cmd.split(), env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True) + + #TODO check if server is up + sleep(5) + + ret = server.poll() + if ret is not None: + raise Exception("Starting Server failed with exit code " + str(ret)) + # Register termination of the server + atexit.register(Benchmark.terminate_subprocess, popen=server) + self.servers.append(server) + + def shutdown_servers(self): + print_info("Shutting down servers") + for server in self.servers: + Benchmark.terminate_subprocess(server) + def run(self, runs=5): if runs < 1: return @@ -199,19 +267,23 @@ class Benchmark (object): print_status(run, ". run", sep='') i = 0 - for alloc_name, t in self.allocators.items(): + for alloc_name, alloc in self.allocators.items(): if alloc_name not in self.results: self.results[alloc_name] = {} env = dict(os.environ) env["LD_PRELOAD"] = env.get("LD_PRELOAD", "") env["LD_PRELOAD"] += " " + "build/print_status_on_exit.so" - env["LD_PRELOAD"] += " " + t["LD_PRELOAD"] + env["LD_PRELOAD"] += " " + alloc["LD_PRELOAD"] + + self.start_servers(alloc_name=alloc_name, alloc=alloc, env=env) + # Preallocator hook if hasattr(self, "preallocator_hook"): - self.preallocator_hook((alloc_name, t), run, env, + self.preallocator_hook((alloc_name, alloc), run, env, verbose=src.globalvars.verbosity) + # Run benchmark for alloc for perm in self.iterate_args(): i += 1 print_info0(i, "of", n, "\r", end='') @@ -220,25 +292,17 @@ class Benchmark (object): substitutions = {"run": run} substitutions.update(perm._asdict()) substitutions["perm"] = ("{}-"*(len(perm)-1) + "{}").format(*perm) - substitutions.update(t) + substitutions.update(alloc) actual_cmd = self.cmd.format(**substitutions) actual_env = None - if not self.server_benchmark: - # Find absolute path of executable - binary_end = actual_cmd.find(" ") - binary_end = None if binary_end == -1 else binary_end - cmd_start = len(actual_cmd) if binary_end == None else binary_end - - binary = subprocess.run(["whereis", actual_cmd[0:binary_end]], - stdout=subprocess.PIPE, - universal_newlines=True).stdout.split()[1] - + # Prepend cmd if we are not measuring servers + if self.server_cmds == []: + actual_cmd = src.util.prefix_cmd_with_abspath(actual_cmd) actual_cmd = "{} {} {}{}".format(self.measure_cmd, - t["cmd_prefix"], - binary, - actual_cmd[cmd_start:]) + alloc["cmd_prefix"], + actual_cmd) # substitute again actual_cmd = actual_cmd.format(**substitutions) @@ -264,7 +328,7 @@ class Benchmark (object): # parse and store results else: - if not self.server_benchmark: + if self.server_cmds == []: # Read VmHWM from status file. If our benchmark didn't fork # the first occurance of VmHWM is from our benchmark with open("status", "r") as f: @@ -273,18 +337,21 @@ class Benchmark (object): result["VmHWM"] = l.split()[1] break os.remove("status") - - # Parse perf output if available - if self.measure_cmd == self.defaults["measure_cmd"]: - csvreader = csv.reader(res.stderr.splitlines(), - delimiter=',') - for row in csvreader: - # Split of the user/kernel space info to be better portable - try: - result[row[2].split(":")[0]] = row[0] - except Exception as e: - print_warn("Exception", e, "occured on", row, "for", - alloc_name, "and", perm) + # TODO get VmHWM from servers + else: + pass + + # Parse perf output if available + if self.measure_cmd == self.defaults["measure_cmd"]: + csvreader = csv.reader(res.stderr.splitlines(), + delimiter=',') + for row in csvreader: + # Split of the user/kernel space info to be better portable + try: + result[row[2].split(":")[0]] = row[0] + except Exception as e: + print_warn("Exception", e, "occured on", row, "for", + alloc_name, "and", perm) if hasattr(self, "process_output"): self.process_output(result, res.stdout, res.stderr, @@ -299,9 +366,12 @@ class Benchmark (object): self.results[alloc_name][perm] = [] self.results[alloc_name][perm].append(result) + self.shutdown_servers() + if hasattr(self, "postallocator_hook"): - self.postallocator_hook((alloc_name, t), run, + self.postallocator_hook((alloc_name, alloc), run, verbose=src.globalvars.verbosity) + print() # Reset PATH diff --git a/src/benchmarks/httpd.py b/src/benchmarks/httpd.py index de59d4e..980a6cc 100644 --- a/src/benchmarks/httpd.py +++ b/src/benchmarks/httpd.py @@ -1,4 +1,3 @@ -import atexit import matplotlib.pyplot as plt import numpy as np import re @@ -8,55 +7,26 @@ from subprocess import PIPE import sys from time import sleep -from src.globalvars import builddir from src.benchmark import Benchmark from src.util import * -server_cmd = "{} -c {}/benchmarks/httpd/nginx/nginx.conf".format(shutil.which("nginx"), builddir).split() - class Benchmark_HTTPD(Benchmark): def __init__(self): self.name = "httpd" self.descrition = """TODO""" - self.args = {"nthreads": Benchmark.scale_threads_for_cpus(2)} - self.cmd = "ab -n 10000 -c {nthreads} localhost:8080/index.html" + self.args = {"nthreads": Benchmark.scale_threads_for_cpus(2), + "site": ["index.html", "index.php"]} + self.cmd = "ab -n 100 -c {nthreads} localhost:8080/{site}" self.measure_cmd = "" - self.server_benchmark = True + self.server_cmds = ["nginx -c {builddir}/benchmarks/httpd/etc/nginx/nginx.conf", + "php-fpm -c {builddir}/benchmarks/httpd/etc/php/php.ini -y {builddir}/benchmarks/httpd/etc/php/php-fpm.conf -F"] self.requirements = ["nginx", "ab"] - atexit.register(self.terminate_server) - super().__init__() - def terminate_server(self): - # check if nginx is running - if os.path.isfile(os.path.join(builddir, "benchmarks", self.name, "nginx", "nginx.pid")): - ret = subprocess.run(server_cmd + ["-s", "stop"], stdout=PIPE, - stderr=PIPE, universal_newlines=True) - - if ret.returncode != 0: - print_debug("Stdout:", ret.stdout) - print_debug("Stderr:", ret.stderr) - raise Exception("Stopping {} failed with {}".format(server_cmd[0], ret.returncode)) - - def preallocator_hook(self, allocator, run, env, verbose): - actual_cmd = allocator[1]["cmd_prefix"].split() + server_cmd - print_info("Starting server with:", actual_cmd) - - ret = subprocess.run(actual_cmd, stdout=PIPE, stderr=PIPE, env=env, - universal_newlines=True) - if ret.returncode != 0: - print_debug("Stdout:", ret.stdout) - print_debug("Stderr:", ret.stderr) - raise Exception("Starting {} for {} failed with {}".format(server_cmd[0], allocator[0], ret.returncode)) - - - def postallocator_hook(self, allocator, run, verbose): - self.terminate_server() - def process_output(self, result, stdout, stderr, allocator, perm, verbose): result["time"] = re.search("Time taken for tests:\s*(\d*\.\d*) seconds", stdout).group(1) result["requests"] = re.search("Requests per second:\s*(\d*\.\d*) .*", stdout).group(1) @@ -74,33 +44,29 @@ class Benchmark_HTTPD(Benchmark): self.calc_desc_statistics() # linear plot - self.plot_single_arg("{requests}", + self.plot_fixed_arg("{requests}", xlabel='"threads"', ylabel='"requests/s"', - title='"ab -n 10000 -c threads"') + autoticks=False, + filepostfix="requests", + title='"ab -n 10000 -c " + str(perm.nthreads)') # linear plot ref_alloc = list(allocators)[0] - self.plot_single_arg("{requests}", + self.plot_fixed_arg("{requests}", xlabel='"threads"', ylabel='"requests/s scaled at " + scale', - title='"ab -n 10000 -c threads (normalized)"', - filepostfix="norm", + title='"ab -n 10000 -c " + str(perm.nthreads) + " (normalized)"', + filepostfix="requests.norm", + autoticks=False, scale=ref_alloc) # bar plot - self.barplot_single_arg("{requests}", - xlabel='"threads"', - ylabel='"requests/s"', - filepostfix="b", - title='"ab -n 10000 -c threads"') + # self.barplot_fixed_arg("{requests}", + # xlabel='"threads"', + # ylabel='"requests/s"', + # filepostfix="b", + # title='"ab -n 10000 -c threads"') - # bar plot - self.barplot_single_arg("{requests}", - xlabel='"threads"', - ylabel='"requests/s scaled at " + scale', - title='"ab -n 10000 -c threads (normalized)"', - filepostfix="norm.b.", - scale=ref_alloc) httpd = Benchmark_HTTPD() diff --git a/src/benchmarks/mysql.py b/src/benchmarks/mysql.py index 752a4ea..fb9c2e7 100644 --- a/src/benchmarks/mysql.py +++ b/src/benchmarks/mysql.py @@ -1,4 +1,3 @@ -import atexit import copy import matplotlib.pyplot as plt import multiprocessing @@ -21,14 +20,12 @@ prepare_cmd = ("sysbench oltp_read_only --db-driver=mysql --mysql-user=root " "--mysql-socket=" + cwd + "/mysql_test/socket --tables=5 " "--table-size=1000000 prepare").split() -cmd = ("sysbench oltp_read_only --threads={nthreads} --time=60 --tables=5 " +cmd = ("sysbench oltp_read_only --threads={nthreads} --time=10 --tables=5 " "--db-driver=mysql --mysql-user=root --mysql-socket=" + cwd + "/mysql_test/socket run") -server_cmd = ("{0} -h {2}/mysql_test --socket={2}/mysql_test/socket " - "--max-connections={1} " - "--secure-file-priv=").format(shutil.which("mysqld"), - multiprocessing.cpu_count(), cwd).split() +server_cmd = ("mysqld -h {0}/mysql_test --socket={0}/mysql_test/socket " + "--max-connections={1} --secure-file-priv=").format(cwd, multiprocessing.cpu_count()) class Benchmark_MYSQL(Benchmark): @@ -38,49 +35,18 @@ class Benchmark_MYSQL(Benchmark): # mysqld fails with hoard somehow self.allocators = copy.copy(allocators) - if "hoard" in self.allocators: - del(self.allocators["hoard"]) + if "Hoard" in self.allocators: + del(self.allocators["Hoard"]) self.args = {"nthreads": Benchmark.scale_threads_for_cpus(1)} self.cmd = cmd + self.server_cmds = [server_cmd] self.measure_cmd = "" self.requirements = ["mysqld", "sysbench"] - atexit.register(self.terminate_server) - super().__init__() - def start_and_wait_for_server(self, cmd_prefix="", env=None): - actual_cmd = cmd_prefix.split() + server_cmd - print_info("Starting server with:", actual_cmd) - - self.server = subprocess.Popen(actual_cmd, stdout=PIPE, stderr=PIPE, - env=env, universal_newlines=True) - # TODO make sure server comes up ! - sleep(10) - if self.server.poll() is not None: - print_debug("cmd_prefix:", cmd_prefix, file=sys.stderr) - print_debug("Stderr:", self.server.stderr, file=sys.stderr) - return False - - return True - - def terminate_server(self): - if hasattr(self, "server"): - if self.server.poll() is None: - print_info("Terminating mysql server") - self.server.terminate() - - for i in range(0,10): - sleep(1) - if self.server.poll() is not None: - return - - print_info("Killing still running mysql server") - self.server.kill() - self.server.wait() - def prepare(self): super().prepare() @@ -106,8 +72,7 @@ class Benchmark_MYSQL(Benchmark): print_debug(p.stderr, file=sys.stderr) raise Exception("Creating test DB failed with:", p.returncode) - if not self.start_and_wait_for_server(): - raise Exception("Starting mysql server failed with {}".format(self.server.returncode)) + self.start_servers() # Create sbtest TABLE p = subprocess.run(("mysql -u root -S "+cwd+"/mysql_test/socket").split(" "), @@ -128,21 +93,13 @@ class Benchmark_MYSQL(Benchmark): self.terminate_server() raise Exception("Preparing test tables failed with:", p.returncode) - self.terminate_server() + self.shutdown_servers() def cleanup(self): if os.path.exists("mysql_test"): print_status("Delete mysqld directory") shutil.rmtree("mysql_test", ignore_errors=True) - def preallocator_hook(self, allocator, run, env, verbose): - if not self.start_and_wait_for_server(cmd_prefix=allocator[1]["cmd_prefix"], env=env): - print_debug(allocator[1]["cmd_prefix"], file=sys.stderr) - raise Exception("Starting mysql server for {} failed with".format(allocator[0], self.server.returncode)) - - def postallocator_hook(self, allocator, run, verbose): - self.terminate_server() - def process_output(self, result, stdout, stderr, allocator, perm, verbose): result["transactions"] = re.search("transactions:\s*(\d*)", stdout).group(1) result["queries"] = re.search("queries:\s*(\d*)", stdout).group(1) @@ -151,7 +108,7 @@ class Benchmark_MYSQL(Benchmark): result["avg"] = re.search("avg:\s*(\d*.\d*)", stdout).group(1) result["max"] = re.search("max:\s*(\d*.\d*)", stdout).group(1) - with open("/proc/"+str(self.server.pid)+"/status", "r") as f: + with open("/proc/"+str(self.servers[0].pid)+"/status", "r") as f: for l in f.readlines(): if l.startswith("VmHWM:"): result["rssmax"] = int(l.replace("VmHWM:", "").strip().split()[0]) -- cgit v1.2.3