#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Utility functions for CPDBench.

Author: Gertjan van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.

"""

import copy
import hashlib
import json
import numpy as np
import socket
import sys


def md5sum(filename):
    """Compute the MD5 checksum of a file, reading it in blocks."""
    blocksize = 65536
    hasher = hashlib.md5()
    with open(filename, "rb") as fp:
        buf = fp.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            buf = fp.read(blocksize)
    return hasher.hexdigest()


def load_dataset(filename):
    """Load a CPDBench dataset from a JSON file."""
    with open(filename, "r") as fp:
        data = json.load(fp)

    if data["time"]["index"] != list(range(0, data["n_obs"])):
        raise NotImplementedError(
            "Time series with non-consecutive time axis are not yet supported."
        )

    mat = np.zeros((data["n_obs"], data["n_dim"]))
    for j, series in enumerate(data["series"]):
        mat[:, j] = series["raw"]

    # We normalize to avoid numerical errors.
    mat = (mat - np.nanmean(mat)) / np.sqrt(np.nanvar(mat))

    return data, mat
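

# Illustrative sketch, not part of the original module: a minimal dataset
# dictionary with only the fields that load_dataset() and prepare_result()
# read ("name", "n_obs", "n_dim", "time", "series").  The concrete name and
# values are hypothetical.  Written to disk as JSON, a file with this content
# would load as a (data, mat) pair with mat of shape (3, 1).
_EXAMPLE_DATASET = {
    "name": "example",
    "n_obs": 3,
    "n_dim": 1,
    "time": {"index": [0, 1, 2]},
    "series": [{"raw": [0.5, 1.5, 2.5]}],
}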


def prepare_result(
    data,
    data_filename,
    status,
    error,
    params,
    locations,
    runtime,
    script_filename,
):
    """Collect the detection results and run metadata in a single dictionary."""
    out = {}

    # record the command that was used
    out["command"] = " ".join(sys.argv)

    # save the script and the hash of the script as very rough versioning
    out["script"] = script_filename
    out["script_md5"] = md5sum(script_filename)

    # record the hostname
    out["hostname"] = socket.gethostname()

    # record the dataset name and hash of the dataset
    out["dataset"] = data["name"]
    out["dataset_md5"] = md5sum(data_filename)

    # record the status of the detection and any potential error message
    out["status"] = status
    out["error"] = error

    # save the parameters that were used
    out["parameters"] = params

    # save the detection results
    out["result"] = {"cplocations": locations, "runtime": runtime}

    return out


def dump_output(output, filename=None):
    """Save the result to the output file, or write it to stdout."""
    if filename is None:
        print(json.dumps(output, sort_keys=True, indent="\t"))
    else:
        with open(filename, "w") as fp:
            json.dump(output, fp, sort_keys=True, indent="\t")


def make_param_dict(args, defaults):
    """Create the parameter dictionary from the parsed arguments and defaults."""
    params = copy.deepcopy(vars(args))
    # The input and output filenames are not algorithm parameters.
    del params["input"]
    if "output" in params:
        del params["output"]
    # Keys in defaults take precedence over parsed arguments with the same name.
    params.update(defaults)
    return params
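

def _example_make_param_dict():
    # Illustrative sketch, not part of the original module.  The --penalty
    # argument and the "model" default are hypothetical; only --input and
    # --output mirror what the helpers above expect on the args namespace.
    # Because of params.update(defaults), a key in the defaults dictionary
    # takes precedence over a parsed argument with the same name.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", required=True)
    parser.add_argument("-o", "--output")
    parser.add_argument("--penalty", type=float, default=1.0)
    args = parser.parse_args(["-i", "dataset.json", "--penalty", "2.0"])
    params = make_param_dict(args, {"model": "rbf"})
    # params == {"penalty": 2.0, "model": "rbf"}
    return params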


def exit_with_error(data, args, parameters, error, script_filename):
    """Write the result for a failed run and exit."""
    status = "FAIL"
    out = prepare_result(
        data,
        args.input,
        status,
        error,
        parameters,
        None,
        None,
        script_filename,
    )
    dump_output(out, args.output)
    raise SystemExit


def exit_with_timeout(data, args, parameters, runtime, script_filename):
    """Write the result for a run that timed out and exit."""
    status = "TIMEOUT"
    out = prepare_result(
        data,
        args.input,
        status,
        None,
        parameters,
        None,
        runtime,
        script_filename,
    )
    dump_output(out, args.output)
    raise SystemExit


def exit_success(data, args, parameters, locations, runtime, script_filename):
    """Write the result for a successful run."""
    status = "SUCCESS"
    error = None
    out = prepare_result(
        data,
        args.input,
        status,
        error,
        parameters,
        locations,
        runtime,
        script_filename,
    )
    dump_output(out, args.output)
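

def _example_detector_script():
    # Illustrative sketch, not part of the original module: the control flow a
    # detector script could follow with the helpers above.  The argument
    # parser is hypothetical and the detection step is a placeholder; a real
    # script would call an actual change point detection routine there.
    import argparse
    import time

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", required=True)
    parser.add_argument("-o", "--output")
    args = parser.parse_args()

    data, mat = load_dataset(args.input)
    params = make_param_dict(args, defaults={})

    start = time.time()
    try:
        # Placeholder for a real call such as detect(mat, **params); here we
        # simply report no change points.
        locations = []
    except Exception as err:
        exit_with_error(data, args, params, str(err), __file__)
    runtime = time.time() - start

    exit_success(data, args, params, locations, runtime, __file__)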