path: root/execs/python
author     Gertjan van den Burg <gertjanvandenburg@gmail.com>  2020-03-12 14:33:57 +0000
committer  Gertjan van den Burg <gertjanvandenburg@gmail.com>  2020-03-12 14:33:57 +0000
commit     7ef8f6e58990fc069cccc71ed6564e8c639ea4fc (patch)
tree       9e7662a34b7d0c1f1c5d9faf6d7d6ea8672f6410 /execs/python
download   TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.tar.gz
           TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.zip
initial commit
Diffstat (limited to 'execs/python')
m---------  execs/python/bocpdms                 0
-rw-r--r--  execs/python/cpdbench_bocpdms.py   199
-rw-r--r--  execs/python/cpdbench_rbocpdms.py  220
-rw-r--r--  execs/python/cpdbench_utils.py     152
m---------  execs/python/rbocpdms                0
5 files changed, 571 insertions, 0 deletions
diff --git a/execs/python/bocpdms b/execs/python/bocpdms
new file mode 160000
+Subproject commit 26d3cf0bbb688dc99cb9b3ee335b97c1d09b586
diff --git a/execs/python/cpdbench_bocpdms.py b/execs/python/cpdbench_bocpdms.py
new file mode 100644
index 00000000..1f95faf8
--- /dev/null
+++ b/execs/python/cpdbench_bocpdms.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Wrapper for BOCPDMS in CPDBench.
+
+Author: G.J.J. van den Burg
+Date: 2019-10-02
+License: See the LICENSE file.
+Copyright: 2019, The Alan Turing Institute
+
+"""
+
+import argparse
+import numpy as np
+import time
+
+from bocpdms import CpModel, BVARNIG, Detector
+from multiprocessing import Process, Manager
+
+from cpdbench_utils import (
+ load_dataset,
+ make_param_dict,
+ exit_with_error,
+ exit_with_timeout,
+ exit_success,
+)
+
+# Uncomment the following line to make numpy raise on overflow errors
+# np.seterr(over="raise")
+
+TIMEOUT = 60 * 30 # 30 minutes
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(description="Wrapper for BOCPDMS")
+ parser.add_argument(
+ "-i", "--input", help="path to the input data file", required=True
+ )
+ parser.add_argument("-o", "--output", help="path to the output file")
+ parser.add_argument(
+ "--intensity",
+ help="parameter for the hazard function",
+ type=float,
+ required=True,
+ )
+ parser.add_argument(
+ "--prior-a", help="initial value of a", type=float, required=True
+ )
+ parser.add_argument(
+ "--prior-b", help="initial value of b", type=float, required=True
+ )
+ parser.add_argument(
+ "--threshold", help="threshold to apply", type=int, default=0
+ )
+ parser.add_argument("--use-timeout", action="store_true")
+
+ return parser.parse_args()
+
+
+def wrapper(args, return_dict, **kwargs):
+ detector = run_bocpdms(*args, **kwargs)
+ return_dict["detector"] = detector
+
+
+def wrap_with_timeout(args, kwargs, limit):
+ manager = Manager()
+ return_dict = manager.dict()
+
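+    # Run the detection in a separate process so that a hung or slow run can
+    # be terminated after the time limit; the child process reports its
+    # result back through the shared manager dict.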
+ p = Process(target=wrapper, args=(args, return_dict), kwargs=kwargs)
+ p.start()
+ p.join(limit)
+ if p.is_alive():
+ p.terminate()
+ return None, "timeout"
+ if "detector" in return_dict:
+ return return_dict["detector"], "success"
+ return None, "fail"
+
+
+def run_bocpdms(mat, params):
+ """Set up and run BOCPDMS
+ """
+
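+    # Build one BVAR model per lag length in [lower_AR, upper_AR]; the
+    # detector performs Bayesian model averaging over this model universe,
+    # with a uniform model prior (constructed below).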
+ AR_models = []
+ for lag in range(params["lower_AR"], params["upper_AR"] + 1):
+ AR_models.append(
+ BVARNIG(
+ prior_a=params["prior_a"],
+ prior_b=params["prior_b"],
+ S1=params["S1"],
+ S2=params["S2"],
+ prior_mean_scale=params["prior_mean_scale"],
+ prior_var_scale=params["prior_var_scale"],
+ intercept_grouping=params["intercept_grouping"],
+ nbh_sequence=[0] * lag,
+ restriction_sequence=[0] * lag,
+ hyperparameter_optimization="online",
+ )
+ )
+
+ cp_model = CpModel(params["intensity"])
+
+ model_universe = np.array(AR_models)
+    model_prior = np.array([1 / len(AR_models) for _ in AR_models])
+
+ detector = Detector(
+ data=mat,
+ model_universe=model_universe,
+ model_prior=model_prior,
+ cp_model=cp_model,
+ S1=params["S1"],
+ S2=params["S2"],
+ T=mat.shape[0],
+ store_rl=True,
+ store_mrl=True,
+ trim_type="keep_K",
+ threshold=params["threshold"],
+ save_performance_indicators=True,
+ generalized_bayes_rld="kullback_leibler",
+ # alpha_param_learning="individual", # not sure if used
+ # alpha_param=0.01, # not sure if used
+ # alpha_param_opt_t=30, # not sure if used
+ # alpha_rld_learning=True, # not sure if used
+ loss_der_rld_learning="squared_loss",
+ loss_param_learning="squared_loss",
+ )
+ detector.run()
+
+ return detector
+
+
+def main():
+ args = parse_args()
+
+ data, mat = load_dataset(args.input)
+
+    # Setting S1 to the dimensionality of the data follows the
+    # 30portfolio_ICML18.py script.
+ defaults = {
+ "S1": mat.shape[1],
+ "S2": 1,
+ "intercept_grouping": None,
+ "prior_mean_scale": 0, # data is standardized
+ "prior_var_scale": 1, # data is standardized
+ }
+
+ # pick the lag lengths based on the paragraph below the proof of Theorem 1,
+ # using C = 1, as in ``30portfolio_ICML18.py``.
+ T = mat.shape[0]
+ Lmin = 1
+ Lmax = int(pow(T / np.log(T), 0.25) + 1)
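+    # Illustration of this formula: for T = 1000, np.log(T) is about 6.91,
+    # so Lmax = int(pow(1000 / 6.91, 0.25) + 1) = int(4.47) = 4.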
+ defaults["lower_AR"] = Lmin
+ defaults["upper_AR"] = Lmax
+
+ parameters = make_param_dict(args, defaults)
+
+ start_time = time.time()
+
+ error = None
+ status = "fail" # if not overwritten, it must have failed
+ try:
+ if args.use_timeout:
+ detector, status = wrap_with_timeout(
+ (mat, parameters), {}, TIMEOUT
+ )
+ else:
+ detector = run_bocpdms(mat, parameters)
+ status = "success"
+ except Exception as err:
+ error = repr(err)
+
+ stop_time = time.time()
+ runtime = stop_time - start_time
+
+ if status == "timeout":
+ exit_with_timeout(data, args, parameters, runtime, __file__)
+
+    if error is not None or status == "fail":
+ exit_with_error(data, args, parameters, error, __file__)
+
+ # According to the Nile unit test, the MAP change points are in
+ # detector.CPs[-2], with time indices in the first of the two-element
+ # vectors.
+ locations = [x[0] for x in detector.CPs[-2]]
+
+ # Based on the fact that time_range in plot_raw_TS of the EvaluationTool
+    # starts from 1, and that CP_loc in that same function is ensured to lie
+    # in time_range, we conclude that the change point locations are 1-based.
+ # We want 0-based, so subtract 1 from each point.
+ locations = [loc - 1 for loc in locations]
+
+ # convert to Python ints
+ locations = [int(loc) for loc in locations]
+
+ exit_success(data, args, parameters, locations, runtime, __file__)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/execs/python/cpdbench_rbocpdms.py b/execs/python/cpdbench_rbocpdms.py
new file mode 100644
index 00000000..53c5e6c2
--- /dev/null
+++ b/execs/python/cpdbench_rbocpdms.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Wrapper for RBOCPDMS in CPDBench.
+
+Author: G.J.J. van den Burg
+Date: 2019-10-03
+License: See the LICENSE file.
+Copyright: 2019, The Alan Turing Institute
+
+"""
+
+import argparse
+import numpy as np
+import time
+
+from rbocpdms import CpModel, BVARNIGDPD, Detector
+from multiprocessing import Process, Manager
+
+from cpdbench_utils import (
+ load_dataset,
+ make_param_dict,
+ exit_with_error,
+ exit_with_timeout,
+ exit_success,
+)
+
+TIMEOUT = 60 * 30 # 30 minutes
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="Wrapper for RBOCPDMS")
+ parser.add_argument(
+ "-i", "--input", help="path to the input data file", required=True
+ )
+ parser.add_argument("-o", "--output", help="path to the output file")
+ parser.add_argument(
+ "--intensity", help="parameter for the hazard function", type=float
+ )
+ parser.add_argument(
+ "--prior-a", help="initial value of a", type=float, required=True
+ )
+ parser.add_argument(
+ "--prior-b", help="initial value of b", type=float, required=True
+ )
+ parser.add_argument(
+ "--threshold", help="threshold to apply", type=int, default=0
+ )
+ parser.add_argument(
+ "--alpha-param", help="alpha parameter", type=float, default=0.5
+ )
+ parser.add_argument(
+ "--alpha-rld", help="alpha rld parameter", type=float, default=0.0
+ )
+ parser.add_argument("--use-timeout", action="store_true")
+ parser.add_argument(
+ "--timeout", type=int, help="timeout in minutes", default=30
+ )
+ return parser.parse_args()
+
+
+def wrapper(args, return_dict, **kwargs):
+ detector = run_rbocpdms(*args, **kwargs)
+ return_dict["detector"] = detector
+
+
+def wrap_with_timeout(args, kwargs, limit):
+ manager = Manager()
+ return_dict = manager.dict()
+
+ p = Process(target=wrapper, args=(args, return_dict), kwargs=kwargs)
+ p.start()
+ p.join(limit)
+    if p.is_alive():
+        p.terminate()
+        return None, "timeout"
+    if "detector" in return_dict:
+        return return_dict["detector"], "success"
+    return None, "fail"
+
+
+def run_rbocpdms(mat, params):
+ """Set up and run RBOCPDMS
+ """
+ S1 = params["S1"]
+ S2 = params["S2"]
+
+ # we use "DPD" from the well log example, as that seems to be the robust
+ # version.
+ model_universe = [
+ BVARNIGDPD(
+ prior_a=params["prior_a"],
+ prior_b=params["prior_b"],
+ S1=S1,
+ S2=S2,
+ alpha_param=params["alpha_param"],
+ prior_mean_beta=params["prior_mean_beta"],
+ prior_var_beta=params["prior_var_beta"],
+ prior_mean_scale=params["prior_mean_scale"],
+ prior_var_scale=params["prior_var_scale"],
+ general_nbh_sequence=[[[]]] * S1 * S2,
+ general_nbh_restriction_sequence=[[0]],
+ general_nbh_coupling="weak coupling",
+ hyperparameter_optimization="online",
+ VB_window_size=params["VB_window_size"],
+ full_opt_thinning=params["full_opt_thinning"],
+ SGD_batch_size=params["SGD_batch_size"],
+ anchor_batch_size_SCSG=params["anchor_batch_size_SCSG"],
+ anchor_batch_size_SVRG=None,
+ first_full_opt=params["first_full_opt"],
+ )
+ ]
+
+ model_universe = np.array(model_universe)
+ model_prior = np.array([1 / len(model_universe)] * len(model_universe))
+
+ cp_model = CpModel(params["intensity"])
+
+ detector = Detector(
+ data=mat,
+ model_universe=model_universe,
+ model_prior=model_prior,
+ cp_model=cp_model,
+ S1=params["S1"],
+ S2=params["S2"],
+ T=mat.shape[0],
+ store_rl=True,
+ store_mrl=True,
+ trim_type="keep_K",
+ threshold=params["threshold"],
+ save_performance_indicators=True,
+ generalized_bayes_rld=params["rld_DPD"],
+ alpha_param_learning="individual",
+ alpha_param=params["alpha_param"],
+ alpha_param_opt_t=100,
+ alpha_rld=params["alpha_rld"],
+ alpha_rld_learning=True,
+ loss_der_rld_learning=params["loss_der_rld_learning"],
+ )
+ detector.run()
+
+ return detector
+
+
+def main():
+ args = parse_args()
+
+ data, mat = load_dataset(args.input)
+
+    # Setting S1 to the dimensionality of the data follows the
+    # 30portfolio_ICML18.py script; other settings are mostly taken from the
+    # well log example.
+ defaults = {
+ "S1": mat.shape[1],
+ "S2": 1,
+ "SGD_batch_size": 10,
+ "VB_window_size": 360,
+ "anchor_batch_size_SCSG": 25,
+ "first_full_opt": 10,
+ "full_opt_thinning": 20,
+ "intercept_grouping": None,
+ "loss_der_rld_learning": "absolute_loss",
+ "prior_mean_beta": None,
+ "prior_mean_scale": 0, # data has been standardized
+ "prior_var_beta": None,
+ "prior_var_scale": 1.0, # data has been standardized
+ "rld_DPD": "power_divergence", # this ensures doubly robust
+ }
+
+ parameters = make_param_dict(args, defaults)
+
+ start_time = time.time()
+
+    error = None
+    status = "fail"  # if not overwritten, it must have failed
+ try:
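+        # --use-timeout enforces the fixed TIMEOUT constant; otherwise the
+        # --timeout argument (default 30 minutes) applies, so a time limit is
+        # effectively always used unless --timeout 0 is passed.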
+ if args.use_timeout:
+ detector, status = wrap_with_timeout(
+ (mat, parameters), {}, TIMEOUT
+ )
+ elif args.timeout:
+ detector, status = wrap_with_timeout(
+ (mat, parameters), {}, args.timeout * 60
+ )
+ else:
+ detector = run_rbocpdms(mat, parameters)
+ status = "success"
+ except Exception as err:
+ error = repr(err)
+
+ stop_time = time.time()
+ runtime = stop_time - start_time
+
+ if status == "timeout":
+ exit_with_timeout(data, args, parameters, runtime, __file__)
+
+    if error is not None or status == "fail":
+ exit_with_error(data, args, parameters, error, __file__)
+
+ # According to the Nile unit test, the MAP change points are in
+ # detector.CPs[-2], with time indices in the first of the two-element
+ # vectors.
+ locations = [x[0] for x in detector.CPs[-2]]
+
+ # Based on the fact that time_range in plot_raw_TS of the EvaluationTool
+    # starts from 1, and that CP_loc in that same function is ensured to lie
+    # in time_range, we conclude that the change point locations are 1-based.
+ # We want 0-based, so subtract 1 from each point.
+ locations = [loc - 1 for loc in locations]
+
+ # convert to Python ints
+ locations = [int(loc) for loc in locations]
+
+ exit_success(data, args, parameters, locations, runtime, __file__)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/execs/python/cpdbench_utils.py b/execs/python/cpdbench_utils.py
new file mode 100644
index 00000000..cb074c69
--- /dev/null
+++ b/execs/python/cpdbench_utils.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Utility functions for CPDBench.
+
+Author: Gertjan van den Burg
+Copyright (c) 2020 - The Alan Turing Institute
+License: See the LICENSE file.
+
+"""
+
+import copy
+import hashlib
+import json
+import numpy as np
+import socket
+import sys
+
+
+def md5sum(filename):
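+    """Compute the MD5 hex digest of a file, reading it in 64 KiB blocks."""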
+ blocksize = 65536
+ hasher = hashlib.md5()
+ with open(filename, "rb") as fp:
+ buf = fp.read(blocksize)
+ while len(buf) > 0:
+ hasher.update(buf)
+ buf = fp.read(blocksize)
+ return hasher.hexdigest()
+
+
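+# For reference, a minimal input file that load_dataset accepts could be
+# built as follows (only the keys used by these utilities are shown; the
+# values themselves are purely illustrative):
+#
+#     import json
+#     demo = {
+#         "name": "demo",
+#         "n_obs": 4,
+#         "n_dim": 1,
+#         "time": {"index": [0, 1, 2, 3]},
+#         "series": [{"raw": [0.5, 0.4, 3.1, 3.0]}],
+#     }
+#     with open("demo.json", "w") as fp:
+#         json.dump(demo, fp)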
+def load_dataset(filename):
+ with open(filename, "r") as fp:
+ data = json.load(fp)
+
+ if data["time"]["index"] != list(range(0, data["n_obs"])):
+ raise NotImplementedError(
+            "Time series with a non-consecutive time axis are not yet supported."
+ )
+
+ mat = np.zeros((data["n_obs"], data["n_dim"]))
+ for j, series in enumerate(data["series"]):
+ mat[:, j] = series["raw"]
+
+ # We normalize to avoid numerical errors.
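+    # Note that this uses the global mean and variance of the full matrix
+    # (np.nanmean/np.nanvar without an axis argument), not per-column values.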
+ mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat))
+
+ return data, mat
+
+
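+# The dictionary assembled below is what ends up in the JSON result file,
+# roughly (abbreviated, with illustrative values):
+#
+#     {
+#         "command": "cpdbench_bocpdms.py -i demo.json ...",
+#         "script": ..., "script_md5": ...,
+#         "hostname": ...,
+#         "dataset": "demo", "dataset_md5": ...,
+#         "status": "SUCCESS", "error": null,
+#         "parameters": {...},
+#         "result": {"cplocations": [42], "runtime": 1.23}
+#     }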
+def prepare_result(
+ data,
+ data_filename,
+ status,
+ error,
+ params,
+ locations,
+ runtime,
+ script_filename,
+):
+ out = {}
+
+ # record the command that was used
+ out["command"] = " ".join(sys.argv)
+
+ # save the script and the hash of the script as very rough versioning
+ out["script"] = script_filename
+ out["script_md5"] = md5sum(script_filename)
+
+ # record the hostname
+ out["hostname"] = socket.gethostname()
+
+ # record the dataset name and hash of the dataset
+ out["dataset"] = data["name"]
+ out["dataset_md5"] = md5sum(data_filename)
+
+ # record the status of the detection and any potential error message
+ out["status"] = status
+ out["error"] = error
+
+ # save the parameters that were used
+ out["parameters"] = params
+
+ # save the detection results
+ out["result"] = {"cplocations": locations, "runtime": runtime}
+
+ return out
+
+
+def dump_output(output, filename=None):
+    """Save the result to the output file, or write it to stdout if no
+    filename is given."""
+ if filename is None:
+ print(json.dumps(output, sort_keys=True, indent="\t"))
+ else:
+ with open(filename, "w") as fp:
+ json.dump(output, fp, sort_keys=True, indent="\t")
+
+
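+# Merge the command line arguments (minus the input/output paths) with the
+# algorithm defaults; on a key collision the value from `defaults` wins,
+# because of the final update().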
+def make_param_dict(args, defaults):
+ params = copy.deepcopy(vars(args))
+ del params["input"]
+ if "output" in params:
+ del params["output"]
+ params.update(defaults)
+ return params
+
+
+def exit_with_error(data, args, parameters, error, script_filename):
+ status = "FAIL"
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ error,
+ parameters,
+ None,
+ None,
+ script_filename,
+ )
+ dump_output(out, args.output)
+ raise SystemExit
+
+
+def exit_with_timeout(data, args, parameters, runtime, script_filename):
+ status = "TIMEOUT"
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ None,
+ parameters,
+ None,
+ runtime,
+ script_filename,
+ )
+ dump_output(out, args.output)
+ raise SystemExit
+
+
+def exit_success(data, args, parameters, locations, runtime, script_filename):
+ status = "SUCCESS"
+ error = None
+ out = prepare_result(
+ data,
+ args.input,
+ status,
+ error,
+ parameters,
+ locations,
+ runtime,
+ script_filename,
+ )
+ dump_output(out, args.output)
diff --git a/execs/python/rbocpdms b/execs/python/rbocpdms
new file mode 160000
+Subproject commit 1ffd72aa94fa43c2cb588bdab478c401ebfe1ca