| author | Gertjan van den Burg <gertjanvandenburg@gmail.com> | 2020-03-12 14:33:57 +0000 |
|---|---|---|
| committer | Gertjan van den Burg <gertjanvandenburg@gmail.com> | 2020-03-12 14:33:57 +0000 |
| commit | 7ef8f6e58990fc069cccc71ed6564e8c639ea4fc (patch) | |
| tree | 9e7662a34b7d0c1f1c5d9faf6d7d6ea8672f6410 /execs/python | |
| download | TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.tar.gz TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.zip | |
initial commit
Diffstat (limited to 'execs/python')
| mode | path | insertions |
|---|---|---|
| m--------- | execs/python/bocpdms | 0 |
| -rw-r--r-- | execs/python/cpdbench_bocpdms.py | 199 |
| -rw-r--r-- | execs/python/cpdbench_rbocpdms.py | 220 |
| -rw-r--r-- | execs/python/cpdbench_utils.py | 152 |
| m--------- | execs/python/rbocpdms | 0 |
5 files changed, 571 insertions, 0 deletions
diff --git a/execs/python/bocpdms b/execs/python/bocpdms
new file mode 160000
+Subproject 26d3cf0bbb688dc99cb9b3ee335b97c1d09b586
diff --git a/execs/python/cpdbench_bocpdms.py b/execs/python/cpdbench_bocpdms.py
new file mode 100644
index 00000000..1f95faf8
--- /dev/null
+++ b/execs/python/cpdbench_bocpdms.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Wrapper for BOCPDMS in CPDBench.
+
+Author: G.J.J. van den Burg
+Date: 2019-10-02
+License: See the LICENSE file.
+Copyright: 2019, The Alan Turing Institute
+
+"""
+
+import argparse
+import numpy as np
+import time
+
+from bocpdms import CpModel, BVARNIG, Detector
+from multiprocessing import Process, Manager
+
+from cpdbench_utils import (
+    load_dataset,
+    make_param_dict,
+    exit_with_error,
+    exit_with_timeout,
+    exit_success,
+)
+
+# Ensure overflow errors are raised
+# np.seterr(over="raise")
+
+TIMEOUT = 60 * 30  # 30 minutes
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="Wrapper for BOCPDMS")
+    parser.add_argument(
+        "-i", "--input", help="path to the input data file", required=True
+    )
+    parser.add_argument("-o", "--output", help="path to the output file")
+    parser.add_argument(
+        "--intensity",
+        help="parameter for the hazard function",
+        type=float,
+        required=True,
+    )
+    parser.add_argument(
+        "--prior-a", help="initial value of a", type=float, required=True
+    )
+    parser.add_argument(
+        "--prior-b", help="initial value of b", type=float, required=True
+    )
+    parser.add_argument(
+        "--threshold", help="threshold to apply", type=int, default=0
+    )
+    parser.add_argument("--use-timeout", action="store_true")
+
+    return parser.parse_args()
+
+
+def wrapper(args, return_dict, **kwargs):
+    detector = run_bocpdms(*args, **kwargs)
+    return_dict["detector"] = detector
+
+
+def wrap_with_timeout(args, kwargs, limit):
+    manager = Manager()
+    return_dict = manager.dict()
+
+    p = Process(target=wrapper, args=(args, return_dict), kwargs=kwargs)
+    p.start()
+    p.join(limit)
+    if p.is_alive():
+        p.terminate()
+        return None, "timeout"
+    if "detector" in return_dict:
+        return return_dict["detector"], "success"
+    return None, "fail"
+
+
+def run_bocpdms(mat, params):
+    """Set up and run BOCPDMS
+    """
+
+    AR_models = []
+    for lag in range(params["lower_AR"], params["upper_AR"] + 1):
+        AR_models.append(
+            BVARNIG(
+                prior_a=params["prior_a"],
+                prior_b=params["prior_b"],
+                S1=params["S1"],
+                S2=params["S2"],
+                prior_mean_scale=params["prior_mean_scale"],
+                prior_var_scale=params["prior_var_scale"],
+                intercept_grouping=params["intercept_grouping"],
+                nbh_sequence=[0] * lag,
+                restriction_sequence=[0] * lag,
+                hyperparameter_optimization="online",
+            )
+        )
+
+    cp_model = CpModel(params["intensity"])
+
+    model_universe = np.array(AR_models)
+    model_prior = np.array([1 / len(AR_models) for m in AR_models])
+
+    detector = Detector(
+        data=mat,
+        model_universe=model_universe,
+        model_prior=model_prior,
+        cp_model=cp_model,
+        S1=params["S1"],
+        S2=params["S2"],
+        T=mat.shape[0],
+        store_rl=True,
+        store_mrl=True,
+        trim_type="keep_K",
+        threshold=params["threshold"],
+        save_performance_indicators=True,
+        generalized_bayes_rld="kullback_leibler",
+        # alpha_param_learning="individual",  # not sure if used
+        # alpha_param=0.01,  # not sure if used
+        # alpha_param_opt_t=30,  # not sure if used
+        # alpha_rld_learning=True,  # not sure if used
+        loss_der_rld_learning="squared_loss",
+        loss_param_learning="squared_loss",
+    )
+    detector.run()
+
+    return detector
+
+
+def main():
+    args = parse_args()
+
+    data, mat = load_dataset(args.input)
+
+    # setting S1 as the dimensionality follows the 30portfolio_ICML18.py
+    # script.
+    defaults = {
+        "S1": mat.shape[1],
+        "S2": 1,
+        "intercept_grouping": None,
+        "prior_mean_scale": 0,  # data is standardized
+        "prior_var_scale": 1,  # data is standardized
+    }
+
+    # pick the lag lengths based on the paragraph below the proof of
+    # Theorem 1, using C = 1, as in ``30portfolio_ICML18.py``.
+    T = mat.shape[0]
+    Lmin = 1
+    Lmax = int(pow(T / np.log(T), 0.25) + 1)
+    defaults["lower_AR"] = Lmin
+    defaults["upper_AR"] = Lmax
+
+    parameters = make_param_dict(args, defaults)
+
+    start_time = time.time()
+
+    error = None
+    status = "fail"  # if not overwritten, it must have failed
+    try:
+        if args.use_timeout:
+            detector, status = wrap_with_timeout(
+                (mat, parameters), {}, TIMEOUT
+            )
+        else:
+            detector = run_bocpdms(mat, parameters)
+            status = "success"
+    except Exception as err:
+        error = repr(err)
+
+    stop_time = time.time()
+    runtime = stop_time - start_time
+
+    if status == "timeout":
+        exit_with_timeout(data, args, parameters, runtime, __file__)
+
+    if error is not None or status == "fail":
+        exit_with_error(data, args, parameters, error, __file__)
+
+    # According to the Nile unit test, the MAP change points are in
+    # detector.CPs[-2], with time indices in the first of the two-element
+    # vectors.
+    locations = [x[0] for x in detector.CPs[-2]]
+
+    # Based on the fact that time_range in plot_raw_TS of the EvaluationTool
+    # starts from 1, and the fact that CP_loc in that same function is
+    # ensured to be in time_range, we infer that the change point locations
+    # are 1-based. We want 0-based, so subtract 1 from each point.
+    locations = [loc - 1 for loc in locations]
+
+    # convert to Python ints
+    locations = [int(loc) for loc in locations]
+
+    exit_success(data, args, parameters, locations, runtime, __file__)
+
+
+if __name__ == "__main__":
+    main()
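The lag-length rule in `main()` above caps the autoregressive order at roughly (T / log T)^(1/4). As a quick illustration of the values it produces (the series length here is hypothetical, not from any benchmark dataset):

```python
import numpy as np

# Same rule as in cpdbench_bocpdms.py, for a hypothetical series of T = 1000
T = 1000
Lmax = int(pow(T / np.log(T), 0.25) + 1)
print(Lmax)  # prints 4, so BVARNIG models with lags 1 through 4 are built
```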
diff --git a/execs/python/cpdbench_rbocpdms.py b/execs/python/cpdbench_rbocpdms.py
new file mode 100644
index 00000000..53c5e6c2
--- /dev/null
+++ b/execs/python/cpdbench_rbocpdms.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Wrapper for RBOCPDMS in CPDBench.
+
+Author: G.J.J. van den Burg
+Date: 2019-10-03
+License: See the LICENSE file.
+Copyright: 2019, The Alan Turing Institute
+
+"""
+
+import argparse
+import numpy as np
+import time
+
+from rbocpdms import CpModel, BVARNIGDPD, Detector
+from multiprocessing import Process, Manager
+
+from cpdbench_utils import (
+    load_dataset,
+    make_param_dict,
+    exit_with_error,
+    exit_with_timeout,
+    exit_success,
+)
+
+TIMEOUT = 60 * 30  # 30 minutes
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="Wrapper for RBOCPDMS")
+    parser.add_argument(
+        "-i", "--input", help="path to the input data file", required=True
+    )
+    parser.add_argument("-o", "--output", help="path to the output file")
+    parser.add_argument(
+        "--intensity", help="parameter for the hazard function", type=float
+    )
+    parser.add_argument(
+        "--prior-a", help="initial value of a", type=float, required=True
+    )
+    parser.add_argument(
+        "--prior-b", help="initial value of b", type=float, required=True
+    )
+    parser.add_argument(
+        "--threshold", help="threshold to apply", type=int, default=0
+    )
+    parser.add_argument(
+        "--alpha-param", help="alpha parameter", type=float, default=0.5
+    )
+    parser.add_argument(
+        "--alpha-rld", help="alpha rld parameter", type=float, default=0.0
+    )
+    parser.add_argument("--use-timeout", action="store_true")
+    parser.add_argument(
+        "--timeout", type=int, help="timeout in minutes", default=30
+    )
+    return parser.parse_args()
+
+
+def wrapper(args, return_dict, **kwargs):
+    detector = run_rbocpdms(*args, **kwargs)
+    return_dict["detector"] = detector
+
+
+def wrap_with_timeout(args, kwargs, limit):
+    manager = Manager()
+    return_dict = manager.dict()
+
+    p = Process(target=wrapper, args=(args, return_dict), kwargs=kwargs)
+    p.start()
+    p.join(limit)
+    if p.is_alive():
+        p.terminate()
+        status = "timeout"
+        return None, status
+    if "detector" in return_dict:
+        status = "success"
+        return return_dict["detector"], status
+    status = "fail"
+    return None, status
+
+
+def run_rbocpdms(mat, params):
+    """Set up and run RBOCPDMS
+    """
+    S1 = params["S1"]
+    S2 = params["S2"]
+
+    # we use "DPD" from the well log example, as that seems to be the robust
+    # version.
+    model_universe = [
+        BVARNIGDPD(
+            prior_a=params["prior_a"],
+            prior_b=params["prior_b"],
+            S1=S1,
+            S2=S2,
+            alpha_param=params["alpha_param"],
+            prior_mean_beta=params["prior_mean_beta"],
+            prior_var_beta=params["prior_var_beta"],
+            prior_mean_scale=params["prior_mean_scale"],
+            prior_var_scale=params["prior_var_scale"],
+            general_nbh_sequence=[[[]]] * S1 * S2,
+            general_nbh_restriction_sequence=[[0]],
+            general_nbh_coupling="weak coupling",
+            hyperparameter_optimization="online",
+            VB_window_size=params["VB_window_size"],
+            full_opt_thinning=params["full_opt_thinning"],
+            SGD_batch_size=params["SGD_batch_size"],
+            anchor_batch_size_SCSG=params["anchor_batch_size_SCSG"],
+            anchor_batch_size_SVRG=None,
+            first_full_opt=params["first_full_opt"],
+        )
+    ]
+
+    model_universe = np.array(model_universe)
+    model_prior = np.array([1 / len(model_universe)] * len(model_universe))
+
+    cp_model = CpModel(params["intensity"])
+
+    detector = Detector(
+        data=mat,
+        model_universe=model_universe,
+        model_prior=model_prior,
+        cp_model=cp_model,
+        S1=params["S1"],
+        S2=params["S2"],
+        T=mat.shape[0],
+        store_rl=True,
+        store_mrl=True,
+        trim_type="keep_K",
+        threshold=params["threshold"],
+        save_performance_indicators=True,
+        generalized_bayes_rld=params["rld_DPD"],
+        alpha_param_learning="individual",
+        alpha_param=params["alpha_param"],
+        alpha_param_opt_t=100,
+        alpha_rld=params["alpha_rld"],
+        alpha_rld_learning=True,
+        loss_der_rld_learning=params["loss_der_rld_learning"],
+    )
+    detector.run()
+
+    return detector
+
+
+def main():
+    args = parse_args()
+
+    data, mat = load_dataset(args.input)
+
+    # setting S1 as the dimensionality follows the 30portfolio_ICML18.py
+    # script; other settings are mostly taken from the well log example.
+    defaults = {
+        "S1": mat.shape[1],
+        "S2": 1,
+        "SGD_batch_size": 10,
+        "VB_window_size": 360,
+        "anchor_batch_size_SCSG": 25,
+        "first_full_opt": 10,
+        "full_opt_thinning": 20,
+        "intercept_grouping": None,
+        "loss_der_rld_learning": "absolute_loss",
+        "prior_mean_beta": None,
+        "prior_mean_scale": 0,  # data has been standardized
+        "prior_var_beta": None,
+        "prior_var_scale": 1.0,  # data has been standardized
+        "rld_DPD": "power_divergence",  # this ensures doubly robust
+    }
+
+    parameters = make_param_dict(args, defaults)
+
+    start_time = time.time()
+
+    error, status = None, "fail"  # status stays "fail" unless overwritten
+    try:
+        if args.use_timeout:
+            detector, status = wrap_with_timeout(
+                (mat, parameters), {}, TIMEOUT
+            )
+        elif args.timeout:  # --timeout defaults to 30, so this is the usual path
+            detector, status = wrap_with_timeout(
+                (mat, parameters), {}, args.timeout * 60
+            )
+        else:
+            detector = run_rbocpdms(mat, parameters)
+            status = "success"
+    except Exception as err:
+        error = repr(err)
+
+    stop_time = time.time()
+    runtime = stop_time - start_time
+
+    if status == "timeout":
+        exit_with_timeout(data, args, parameters, runtime, __file__)
+
+    if error is not None or status == "fail":
+        exit_with_error(data, args, parameters, error, __file__)
+
+    # According to the Nile unit test, the MAP change points are in
+    # detector.CPs[-2], with time indices in the first of the two-element
+    # vectors.
+    locations = [x[0] for x in detector.CPs[-2]]
+
+    # Based on the fact that time_range in plot_raw_TS of the EvaluationTool
+    # starts from 1, and the fact that CP_loc in that same function is
+    # ensured to be in time_range, we infer that the change point locations
+    # are 1-based. We want 0-based, so subtract 1 from each point.
+    locations = [loc - 1 for loc in locations]
+
+    # convert to Python ints
+    locations = [int(loc) for loc in locations]
+
+    exit_success(data, args, parameters, locations, runtime, __file__)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/execs/python/cpdbench_utils.py b/execs/python/cpdbench_utils.py
new file mode 100644
index 00000000..cb074c69
--- /dev/null
+++ b/execs/python/cpdbench_utils.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Utility functions for CPDBench.
+
+Author: Gertjan van den Burg
+Copyright (c) 2020 - The Alan Turing Institute
+License: See the LICENSE file.
+
+"""
+
+import copy
+import hashlib
+import json
+import numpy as np
+import socket
+import sys
+
+
+def md5sum(filename):
+    blocksize = 65536
+    hasher = hashlib.md5()
+    with open(filename, "rb") as fp:
+        buf = fp.read(blocksize)
+        while len(buf) > 0:
+            hasher.update(buf)
+            buf = fp.read(blocksize)
+    return hasher.hexdigest()
+
+
+def load_dataset(filename):
+    with open(filename, "r") as fp:
+        data = json.load(fp)
+
+    if data["time"]["index"] != list(range(0, data["n_obs"])):
+        raise NotImplementedError(
+            "Time series with non-consecutive time axis are not yet supported."
+        )
+
+    mat = np.zeros((data["n_obs"], data["n_dim"]))
+    for j, series in enumerate(data["series"]):
+        mat[:, j] = series["raw"]
+
+    # We normalize to avoid numerical errors.
+    mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat))
+
+    return data, mat
+
+
+def prepare_result(
+    data,
+    data_filename,
+    status,
+    error,
+    params,
+    locations,
+    runtime,
+    script_filename,
+):
+    out = {}
+
+    # record the command that was used
+    out["command"] = " ".join(sys.argv)
+
+    # save the script and the hash of the script as very rough versioning
+    out["script"] = script_filename
+    out["script_md5"] = md5sum(script_filename)
+
+    # record the hostname
+    out["hostname"] = socket.gethostname()
+
+    # record the dataset name and hash of the dataset
+    out["dataset"] = data["name"]
+    out["dataset_md5"] = md5sum(data_filename)
+
+    # record the status of the detection and any potential error message
+    out["status"] = status
+    out["error"] = error
+
+    # save the parameters that were used
+    out["parameters"] = params
+
+    # save the detection results
+    out["result"] = {"cplocations": locations, "runtime": runtime}
+
+    return out
+
+
+def dump_output(output, filename=None):
+    """Save the result to the output file, or write it to stdout"""
+    if filename is None:
+        print(json.dumps(output, sort_keys=True, indent="\t"))
+    else:
+        with open(filename, "w") as fp:
+            json.dump(output, fp, sort_keys=True, indent="\t")
+
+
+def make_param_dict(args, defaults):
+    params = copy.deepcopy(vars(args))
+    del params["input"]
+    if "output" in params:
+        del params["output"]
+    params.update(defaults)
+    return params
+
+
+def exit_with_error(data, args, parameters, error, script_filename):
+    status = "FAIL"
+    out = prepare_result(
+        data,
+        args.input,
+        status,
+        error,
+        parameters,
+        None,
+        None,
+        script_filename,
+    )
+    dump_output(out, args.output)
+    raise SystemExit
+
+
+def exit_with_timeout(data, args, parameters, runtime, script_filename):
+    status = "TIMEOUT"
+    out = prepare_result(
+        data,
+        args.input,
+        status,
+        None,
+        parameters,
+        None,
+        runtime,
+        script_filename,
+    )
+    dump_output(out, args.output)
+    raise SystemExit
+
+
+def exit_success(data, args, parameters, locations, runtime, script_filename):
+    status = "SUCCESS"
+    error = None
+    out = prepare_result(
+        data,
+        args.input,
+        status,
+        error,
+        parameters,
+        locations,
+        runtime,
+        script_filename,
+    )
+    dump_output(out, args.output)
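Putting `prepare_result` and `dump_output` together, a successful run writes a JSON record shaped roughly like the sketch below (keys sorted, tab-indented). Every value here is illustrative and the parameter dict is abridged; nothing is taken from a real run:

```json
{
	"command": "cpdbench_bocpdms.py -i example.json --intensity 100 --prior-a 1.0 --prior-b 1.0",
	"dataset": "example",
	"dataset_md5": "…",
	"error": null,
	"hostname": "benchmark-host",
	"parameters": {"S1": 1, "S2": 1, "intensity": 100.0, "prior_a": 1.0, "prior_b": 1.0, "threshold": 0},
	"result": {
		"cplocations": [257, 512],
		"runtime": 12.3
	},
	"script": "cpdbench_bocpdms.py",
	"script_md5": "…",
	"status": "SUCCESS"
}
```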
diff --git a/execs/python/rbocpdms b/execs/python/rbocpdms
new file mode 160000
+Subproject 1ffd72aa94fa43c2cb588bdab478c401ebfe1ca
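For reference, a minimal input file that passes the checks in `load_dataset` can be sketched as follows. The schema (`name`, `n_obs`, `n_dim`, `time.index`, `series[].raw`) is inferred only from the code in this commit; the filename, series values, and command-line parameters are hypothetical:

```python
import json

import numpy as np

# Hypothetical example: a univariate series of 200 points with a mean
# shift halfway through, stored in the format load_dataset() expects.
n_obs, n_dim = 200, 1
values = np.concatenate([np.random.randn(100), np.random.randn(100) + 3.0])
dataset = {
    "name": "example",
    "n_obs": n_obs,
    "n_dim": n_dim,
    "time": {"index": list(range(n_obs))},
    "series": [{"raw": values.tolist()}],
}
with open("example.json", "w") as fp:
    json.dump(dataset, fp)

# The BOCPDMS wrapper could then be invoked as (parameter values illustrative):
#   python cpdbench_bocpdms.py -i example.json -o result.json \
#       --intensity 100 --prior-a 1.0 --prior-b 1.0
```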
