aboutsummaryrefslogtreecommitdiff
path: root/analysis/scripts/metrics.py
diff options
context:
space:
mode:
Diffstat (limited to 'analysis/scripts/metrics.py')
-rw-r--r--analysis/scripts/metrics.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/analysis/scripts/metrics.py b/analysis/scripts/metrics.py
new file mode 100644
index 00000000..932fbb7c
--- /dev/null
+++ b/analysis/scripts/metrics.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+
+"""
+Evaluation metrics
+
+Author: G.J.J. van den Burg
+Copyright (c) 2020 - The Alan Turing Institute
+License: See the LICENSE file.
+
+"""
+
+
+def true_positives(T, X, margin=5):
+ """Compute true positives without double counting
+ """
+ # make a copy so we don't affect the caller
+ X = set(list(X))
+ TP = set()
+ for tau in T:
+ close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin]
+ close.sort()
+ if not close:
+ continue
+ dist, xstar = close[0]
+ TP.add(tau)
+ X.remove(xstar)
+ return TP
+
+
+def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False):
+ """Compute the F-measure based on human annotations.
+
+ annotations : dict from user_id to iterable of CP locations
+ predictions : iterable of predicted CP locations
+ alpha : value for the F-measure, alpha=0.5 gives the F1-measure
+ return_PR : whether to return precision and recall too
+
+ Remember that all CP locations are 0-based!
+
+ """
+ # ensure 0 is in all the sets
+ Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)}
+ for Tk in Tks.values():
+ Tk.add(0)
+
+ X = set(predictions)
+ X.add(0)
+
+ Tstar = [tau for tau in Tk for Tk in Tks.values()]
+ Tstar = set(Tstar)
+
+ K = len(Tks)
+
+ P = len(true_positives(Tstar, X, margin=margin)) / len(X)
+
+ TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks}
+ R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks)
+
+ F = P * R / (alpha * R + (1 - alpha) * P)
+ if return_PR:
+ return F, P, R
+ return F
+
+
+def overlap(A, B):
+ """ Return the overlap (i.e. Jaccard index) of two sets """
+ return len(A.intersection(B)) / len(A.union(B))
+
+
+def partition_from_cps(locations, n_obs):
+ """ Return a list of sets that give a partition of the set [0, T-1], as
+ defined by the change point locations.
+
+ >>> partition_from_cps([], 5)
+ [{0, 1, 2, 3, 4}]
+ >>> partition_from_cps([3, 5], 8)
+ [{0, 1, 2}, {3, 4}, {5, 6, 7}]
+ >>> partition_from_cps([1,2,7], 8)
+ [{0}, {1}, {2, 3, 4, 5, 6}, {7}]
+ >>> partition_from_cps([0, 4], 6)
+ [{0, 1, 2, 3}, {4, 5}]
+ """
+ T = n_obs
+ partition = []
+ current = set()
+
+ all_cps = iter(sorted(set(locations)))
+ cp = next(all_cps, None)
+ for i in range(T):
+ if i == cp:
+ if current:
+ partition.append(current)
+ current = set()
+ cp = next(all_cps, None)
+ current.add(i)
+ partition.append(current)
+ return partition
+
+
+def cover_single(Sprime, S):
+ """Compute the covering of a segmentation S by a segmentation Sprime.
+
+ This follows equation (8) in Arbaleaz, 2010.
+ """
+ T = sum(map(len, Sprime))
+ assert T == sum(map(len, S))
+ C = 0
+ for R in S:
+ C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime)
+ C /= T
+ return C
+
+
+def covering(annotations, predictions, n_obs):
+ """Compute the average segmentation covering against the human annotations.
+
+ annotations : dict from user_id to iterable of CP locations
+ predictions : iterable of predicted Cp locations
+ n_obs : number of observations in the series
+
+ """
+ Ak = {
+ k + 1: partition_from_cps(annotations[uid], n_obs)
+ for k, uid in enumerate(annotations)
+ }
+ pX = partition_from_cps(predictions, n_obs)
+
+ Cs = [cover_single(pX, Ak[k]) for k in Ak]
+ return sum(Cs) / len(Cs)