path: root/execs/python/cpdbench_utils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Utility functions for CPDBench.

Author: Gertjan van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.

"""

import copy
import hashlib
import json
import numpy as np
import socket
import sys


def md5sum(filename):
    """Compute the MD5 checksum of a given file"""
    blocksize = 65536
    hasher = hashlib.md5()
    with open(filename, "rb") as fp:
        buf = fp.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            buf = fp.read(blocksize)
    return hasher.hexdigest()


def load_dataset(filename):
    """ Load a CPDBench dataset """
    with open(filename, "r") as fp:
        data = json.load(fp)

    if data["time"]["index"] != list(range(0, data["n_obs"])):
        raise NotImplementedError(
            "Time series with non-consecutive time axis are not yet supported."
        )

    mat = np.zeros((data["n_obs"], data["n_dim"]))
    for j, series in enumerate(data["series"]):
        mat[:, j] = series["raw"]

    # We normalize to avoid numerical errors.
    mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat))

    return data, mat

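# A minimal sketch of the dataset format that load_dataset() expects, inferred
# from the fields accessed above; the values are illustrative only:
#
#     {
#         "name": "example",
#         "n_obs": 3,
#         "n_dim": 1,
#         "time": {"index": [0, 1, 2]},
#         "series": [{"raw": [0.5, 1.2, 0.8]}]
#     }
#
# Each entry of "series" holds one dimension; "raw" must contain n_obs values.
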

def prepare_result(
    data,
    data_filename,
    status,
    error,
    params,
    locations,
    runtime,
    script_filename,
):
    """Prepare the experiment output as a dictionary

    Parameters
    ----------
    data : dict
        The CPDBench dataset object

    data_filename : str
        Absolute path to the dataset file

    status : str
        Status of the experiment. Commonly used status codes are: SUCCESS if
        the experiment was successful, SKIP if the method was provided
        improper parameters, FAIL if the method failed for whatever reason,
        and TIMEOUT if the method ran too long.

    error : str
        If an error occurred, this field can be used to describe what went
        wrong.

    params : dict
        Dictionary of parameters provided to the method. It is good to be as
        complete as possible, so even default parameter values should be
        added to this field. This enhances reproducibility.

    locations : list
        Detected change point locations. Remember that change locations are 
        indices of time points and are 0-based (start counting at zero, thus 
        change locations are integers on the interval [0, T-1], including both 
        endpoints).

    runtime : float
        Runtime of the method. This should be computed as accurately as 
        possible, excluding any method-specific setup code.

    script_filename : str
        Path to the script of the method. This is hashed to enable rough 
        versioning.

    """
    out = {}

    # record the command that was used
    out["command"] = " ".join(sys.argv)

    # save the script and the hash of the script as very rough versioning
    out["script"] = script_filename
    out["script_md5"] = md5sum(script_filename)

    # record the hostname
    out["hostname"] = socket.gethostname()

    # record the dataset name and hash of the dataset
    out["dataset"] = data["name"]
    out["dataset_md5"] = md5sum(data_filename)

    # record the status of the detection and any potential error message
    out["status"] = status
    out["error"] = error

    # save the parameters that were used
    out["parameters"] = params

    # save the detection results
    out["result"] = {"cplocations": locations, "runtime": runtime}

    return out

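# For reference, prepare_result() yields a dictionary of the following shape
# (field values are illustrative, not real output):
#
#     {
#         "command": "python method.py -i ds.json",
#         "script": "method.py",
#         "script_md5": "...",
#         "hostname": "...",
#         "dataset": "example",
#         "dataset_md5": "...",
#         "status": "SUCCESS",
#         "error": None,
#         "parameters": {...},
#         "result": {"cplocations": [10, 42], "runtime": 0.153},
#     }
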

def dump_output(output, filename=None):
    """Save result to output file or write to stdout (json format)"""
    if filename is None:
        print(json.dumps(output, sort_keys=True, indent="\t"))
    else:
        with open(filename, "w") as fp:
            json.dump(output, fp, sort_keys=True, indent="\t")


def make_param_dict(args, defaults):
    """Create the parameter dict combining CLI arguments and defaults"""
    params = copy.deepcopy(vars(args))
    del params["input"]
    if "output" in params:
        del params["output"]
    params.update(defaults)
    return params

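# Illustrative example (the argument names are hypothetical): with
# args = Namespace(input="ds.json", output=None, pen=1.0) and
# defaults = {"model": "l2"}, this returns {"pen": 1.0, "model": "l2"}:
# the bookkeeping arguments "input" and "output" are dropped and the
# method's hard-coded defaults are recorded alongside the CLI arguments.
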

def exit_with_error(data, args, parameters, error, script_filename):
    """Exit and save result using the 'FAIL' exit status"""
    status = "FAIL"
    out = prepare_result(
        data,
        args.input,
        status,
        error,
        parameters,
        None,
        None,
        script_filename,
    )
    dump_output(out, args.output)
    raise SystemExit


def exit_with_timeout(data, args, parameters, runtime, script_filename):
    """Exit and save result using the 'TIMEOUT' exit status"""
    status = "TIMEOUT"
    out = prepare_result(
        data,
        args.input,
        status,
        None,
        parameters,
        None,
        runtime,
        script_filename,
    )
    dump_output(out, args.output)
    raise SystemExit


def exit_success(data, args, parameters, locations, runtime, script_filename):
    """Exit and save result using the 'SUCCESS' exit status"""
    status = "SUCCESS"
    error = None
    out = prepare_result(
        data,
        args.input,
        status,
        error,
        parameters,
        locations,
        runtime,
        script_filename,
    )
    dump_output(out, args.output)
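

# A rough sketch of how a method script might tie these utilities together.
# The argument parser and the detect_changepoints() function are assumptions
# for illustration; only the calls into this module mirror its actual API.
#
#     import argparse
#     import time
#
#     from cpdbench_utils import (
#         exit_success,
#         exit_with_error,
#         load_dataset,
#         make_param_dict,
#     )
#
#     parser = argparse.ArgumentParser()
#     parser.add_argument("-i", "--input", required=True)
#     parser.add_argument("-o", "--output")
#     args = parser.parse_args()
#
#     data, mat = load_dataset(args.input)
#     params = make_param_dict(args, defaults={})
#
#     start = time.time()
#     try:
#         locations = detect_changepoints(mat)  # hypothetical detector
#     except Exception as err:
#         exit_with_error(data, args, params, str(err), __file__)
#     runtime = time.time() - start
#
#     exit_success(data, args, params, locations, runtime, __file__)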