diff options
Diffstat (limited to 'execs/python/cpdbench_utils.py')
| -rw-r--r-- | execs/python/cpdbench_utils.py | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/execs/python/cpdbench_utils.py b/execs/python/cpdbench_utils.py new file mode 100644 index 00000000..cb074c69 --- /dev/null +++ b/execs/python/cpdbench_utils.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Utility functions for CPDBench. + +Author: Gertjan van den Burg +Copyright (c) 2020 - The Alan Turing Institute +License: See the LICENSE file. + +""" + +import copy +import hashlib +import json +import numpy as np +import socket +import sys + + +def md5sum(filename): + blocksize = 65536 + hasher = hashlib.md5() + with open(filename, "rb") as fp: + buf = fp.read(blocksize) + while len(buf) > 0: + hasher.update(buf) + buf = fp.read(blocksize) + return hasher.hexdigest() + + +def load_dataset(filename): + with open(filename, "r") as fp: + data = json.load(fp) + + if data["time"]["index"] != list(range(0, data["n_obs"])): + raise NotImplementedError( + "Time series with non-consecutive time axis are not yet supported." + ) + + mat = np.zeros((data["n_obs"], data["n_dim"])) + for j, series in enumerate(data["series"]): + mat[:, j] = series["raw"] + + # We normalize to avoid numerical errors. + mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat)) + + return data, mat + + +def prepare_result( + data, + data_filename, + status, + error, + params, + locations, + runtime, + script_filename, +): + out = {} + + # record the command that was used + out["command"] = " ".join(sys.argv) + + # save the script and the hash of the script as very rough versioning + out["script"] = script_filename + out["script_md5"] = md5sum(script_filename) + + # record the hostname + out["hostname"] = socket.gethostname() + + # record the dataset name and hash of the dataset + out["dataset"] = data["name"] + out["dataset_md5"] = md5sum(data_filename) + + # record the status of the detection and any potential error message + out["status"] = status + out["error"] = error + + # save the parameters that were used + out["parameters"] = params + + # save the detection results + out["result"] = {"cplocations": locations, "runtime": runtime} + + return out + + +def dump_output(output, filename=None): + """Save result to output file or write to stdout """ + if filename is None: + print(json.dumps(output, sort_keys=True, indent="\t")) + else: + with open(filename, "w") as fp: + json.dump(output, fp, sort_keys=True, indent="\t") + + +def make_param_dict(args, defaults): + params = copy.deepcopy(vars(args)) + del params["input"] + if "output" in params: + del params["output"] + params.update(defaults) + return params + + +def exit_with_error(data, args, parameters, error, script_filename): + status = "FAIL" + out = prepare_result( + data, + args.input, + status, + error, + parameters, + None, + None, + script_filename, + ) + dump_output(out, args.output) + raise SystemExit + +def exit_with_timeout(data, args, parameters, runtime, script_filename): + status = "TIMEOUT" + out = prepare_result( + data, + args.input, + status, + None, + parameters, + None, + runtime, + script_filename, + ) + dump_output(out, args.output) + raise SystemExit + + +def exit_success(data, args, parameters, locations, runtime, script_filename): + status = "SUCCESS" + error = None + out = prepare_result( + data, + args.input, + status, + error, + parameters, + locations, + runtime, + script_filename, + ) + dump_output(out, args.output) |
