Diffstat (limited to 'execs/python/cpdbench_utils.py')
-rw-r--r--  execs/python/cpdbench_utils.py  152
1 file changed, 152 insertions(+), 0 deletions(-)
diff --git a/execs/python/cpdbench_utils.py b/execs/python/cpdbench_utils.py
new file mode 100644
index 00000000..cb074c69
--- /dev/null
+++ b/execs/python/cpdbench_utils.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Utility functions for CPDBench.
+
+Author: Gertjan van den Burg
+Copyright (c) 2020 - The Alan Turing Institute
+License: See the LICENSE file.
+
+"""
+
+import copy
+import hashlib
+import json
+import socket
+import sys
+
+import numpy as np
+
+
+def md5sum(filename):
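+    """Compute the MD5 hex digest of a file, reading it in 64 KiB blocks."""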
+ blocksize = 65536
+ hasher = hashlib.md5()
+ with open(filename, "rb") as fp:
+ buf = fp.read(blocksize)
+ while len(buf) > 0:
+ hasher.update(buf)
+ buf = fp.read(blocksize)
+ return hasher.hexdigest()
+
+
+def load_dataset(filename):
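+    """Load a CPDBench JSON dataset.
+
+    Returns the parsed JSON data and an (n_obs, n_dim) matrix of the
+    normalized observations.
+    """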
+ with open(filename, "r") as fp:
+ data = json.load(fp)
+
+ if data["time"]["index"] != list(range(0, data["n_obs"])):
+ raise NotImplementedError(
+ "Time series with non-consecutive time axis are not yet supported."
+ )
+
+ mat = np.zeros((data["n_obs"], data["n_dim"]))
+ for j, series in enumerate(data["series"]):
+ mat[:, j] = series["raw"]
+
+    # Normalize to zero mean and unit variance (NaN-aware) to avoid
+    # numerical errors.
+ mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat))
+
+ return data, mat
+
+
+def prepare_result(
+ data,
+ data_filename,
+ status,
+ error,
+ params,
+ locations,
+ runtime,
+ script_filename,
+):
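+    """Collect run metadata and the detection result in a single dict."""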
+ out = {}
+
+ # record the command that was used
+ out["command"] = " ".join(sys.argv)
+
+ # save the script and the hash of the script as very rough versioning
+ out["script"] = script_filename
+ out["script_md5"] = md5sum(script_filename)
+
+ # record the hostname
+ out["hostname"] = socket.gethostname()
+
+ # record the dataset name and hash of the dataset
+ out["dataset"] = data["name"]
+ out["dataset_md5"] = md5sum(data_filename)
+
+ # record the status of the detection and any potential error message
+ out["status"] = status
+ out["error"] = error
+
+ # save the parameters that were used
+ out["parameters"] = params
+
+ # save the detection results
+ out["result"] = {"cplocations": locations, "runtime": runtime}
+
+ return out
+
+
+def dump_output(output, filename=None):
+ """Save result to output file or write to stdout """
+ if filename is None:
+ print(json.dumps(output, sort_keys=True, indent="\t"))
+ else:
+ with open(filename, "w") as fp:
+ json.dump(output, fp, sort_keys=True, indent="\t")
+
+
+def make_param_dict(args, defaults):
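+    """Build the parameter dict from parsed arguments and method defaults."""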
+ params = copy.deepcopy(vars(args))
+ del params["input"]
+ if "output" in params:
+ del params["output"]
+ params.update(defaults)
+ return params
+
+
+def exit_with_error(data, args, parameters, error, script_filename):
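+    """Write a result with FAIL status and the error message, then exit."""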
+ status = "FAIL"
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ error,
+ parameters,
+ None,
+ None,
+ script_filename,
+ )
+ dump_output(out, args.output)
+ raise SystemExit
+
+
+def exit_with_timeout(data, args, parameters, runtime, script_filename):
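+    """Write a result with TIMEOUT status and the runtime, then exit."""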
+ status = "TIMEOUT"
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ None,
+ parameters,
+ None,
+ runtime,
+ script_filename,
+ )
+ dump_output(out, args.output)
+ raise SystemExit
+
+
+def exit_success(data, args, parameters, locations, runtime, script_filename):
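+    """Write a result with SUCCESS status and the detected locations."""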
+ status = "SUCCESS"
+ error = None
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ error,
+ parameters,
+ locations,
+ runtime,
+ script_filename,
+ )
+ dump_output(out, args.output)
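+
+
+# A minimal usage sketch from a hypothetical detector script; the argument
+# parser and the placeholder "detection" below are illustrative assumptions,
+# not part of this module:
+#
+#     import argparse
+#     import time
+#
+#     parser = argparse.ArgumentParser()
+#     parser.add_argument("-i", "--input", required=True)
+#     parser.add_argument("-o", "--output")
+#     args = parser.parse_args()
+#
+#     data, mat = load_dataset(args.input)  # mat feeds the real detector
+#     params = make_param_dict(args, defaults={})
+#     start = time.time()
+#     locations = [0, data["n_obs"]]  # placeholder for detected locations
+#     runtime = time.time() - start
+#     exit_success(data, args, params, locations, runtime, __file__)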