# -*- coding: utf-8 -*-
"""
Evaluation metrics

Author: G.J.J. van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.
"""

def true_positives(T, X, margin=5):
    """Compute the set of true positives without double counting.

    Each ground-truth location tau in T is greedily matched to the closest
    prediction in X within the margin; a matched prediction is removed so
    it cannot be counted twice.
    """
    # work on a copy so we don't modify the caller's collection
    X = set(X)
    TP = set()
    for tau in T:
        close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin]
        close.sort()
        if not close:
            continue
        dist, xstar = close[0]
        TP.add(tau)
        X.remove(xstar)
    return TP
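
# A hypothetical doctest-style sanity check (not part of the original
# module): with the default margin of 5, prediction 8 is matched to tau=10
# (it is closer than 5) and prediction 20 to tau=20, so both true change
# points are recovered even though prediction 5 stays unmatched.
#
# >>> sorted(true_positives({10, 20}, {5, 8, 20}))
# [10, 20]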

def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False):
    """Compute the F-measure based on human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    alpha : value for the F-measure, alpha=0.5 gives the F1-measure
    return_PR : whether to return precision and recall too

    Remember that all CP locations are 0-based!
    """
    # ensure 0 is in all the sets
    Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)}
    for Tk in Tks.values():
        Tk.add(0)

    X = set(predictions)
    X.add(0)

    # union of the change points of all annotators
    Tstar = set(tau for Tk in Tks.values() for tau in Tk)

    K = len(Tks)

    # precision is measured against the union of all annotations
    P = len(true_positives(Tstar, X, margin=margin)) / len(X)

    # recall is averaged over the individual annotators
    TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks}
    R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks)

    F = P * R / (alpha * R + (1 - alpha) * P)
    if return_PR:
        return F, P, R
    return F
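
# A hypothetical doctest-style example (not in the original module): both
# annotators' change points are recovered exactly, so precision, recall,
# and hence the F-measure are all 1.
#
# >>> f_measure({1: [10, 20], 2: [10]}, [10, 20])
# 1.0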

def overlap(A, B):
    """Return the overlap (i.e. Jaccard index) of two sets."""
    return len(A.intersection(B)) / len(A.union(B))
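
# Illustrative example (not in the original module): the two segments share
# two of six distinct elements, so the Jaccard index is 2 / 6 = 1 / 3.
#
# >>> overlap({0, 1, 2, 3}, {2, 3, 4, 5})
# 0.3333333333333333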

def partition_from_cps(locations, n_obs):
    """Return a list of sets that give a partition of the set [0, T-1], as
    defined by the change point locations.

    >>> partition_from_cps([], 5)
    [{0, 1, 2, 3, 4}]
    >>> partition_from_cps([3, 5], 8)
    [{0, 1, 2}, {3, 4}, {5, 6, 7}]
    >>> partition_from_cps([1, 2, 7], 8)
    [{0}, {1}, {2, 3, 4, 5, 6}, {7}]
    >>> partition_from_cps([0, 4], 6)
    [{0, 1, 2, 3}, {4, 5}]
    """
    T = n_obs
    partition = []
    current = set()

    all_cps = iter(sorted(set(locations)))
    cp = next(all_cps, None)
    for i in range(T):
        if i == cp:
            if current:
                partition.append(current)
            current = set()
            cp = next(all_cps, None)
        current.add(i)
    partition.append(current)
    return partition
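
# A further hypothetical example (not in the original docstring): duplicate
# locations are deduplicated by the sorted(set(...)) call, so repeated
# change points do not create empty segments.
#
# >>> partition_from_cps([3, 3, 5], 8)
# [{0, 1, 2}, {3, 4}, {5, 6, 7}]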

def cover_single(Sprime, S):
    """Compute the covering of a segmentation S by a segmentation Sprime.

    This follows equation (8) in Arbelaez et al. (2010).
    """
    T = sum(map(len, Sprime))
    assert T == sum(map(len, S))
    C = 0
    for R in S:
        C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime)
    C /= T
    return C
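
# Worked example (hypothetical, not in the original module): covering a
# two-segment ground truth with a single all-covering segment. Each ground
# truth segment of length 3 has Jaccard overlap 3/6 with the lone predicted
# segment, so C = (3 * 0.5 + 3 * 0.5) / 6 = 0.5.
#
# >>> cover_single([{0, 1, 2, 3, 4, 5}], [{0, 1, 2}, {3, 4, 5}])
# 0.5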

def covering(annotations, predictions, n_obs):
    """Compute the average segmentation covering against the human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    n_obs : number of observations in the series
    """
    Ak = {
        k + 1: partition_from_cps(annotations[uid], n_obs)
        for k, uid in enumerate(annotations)
    }
    pX = partition_from_cps(predictions, n_obs)

    Cs = [cover_single(pX, Ak[k]) for k in Ak]
    return sum(Cs) / len(Cs)
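
if __name__ == "__main__":
    # A minimal usage sketch (an addition, not part of the original module):
    # two annotators, the second of whom marks an extra change point at 20.
    annotations = {"u1": [10], "u2": [10, 20]}
    predictions = [10, 20]

    # Both annotated locations are recovered, so F = P = R = 1.
    print(f_measure(annotations, predictions))  # 1.0

    # The covering is 1.0 against "u2" and 2/3 against "u1" (the predicted
    # segments {10..19} and {20..29} each overlap {10..29} by one half),
    # giving an average of 5/6.
    print(covering(annotations, predictions, n_obs=30))  # 0.8333...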