diff options
| author | Gertjan van den Burg <gertjanvandenburg@gmail.com> | 2020-03-12 14:33:57 +0000 |
|---|---|---|
| committer | Gertjan van den Burg <gertjanvandenburg@gmail.com> | 2020-03-12 14:33:57 +0000 |
| commit | 7ef8f6e58990fc069cccc71ed6564e8c639ea4fc (patch) | |
| tree | 9e7662a34b7d0c1f1c5d9faf6d7d6ea8672f6410 /analysis/scripts/metrics.py | |
| download | TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.tar.gz TCPDBench-7ef8f6e58990fc069cccc71ed6564e8c639ea4fc.zip | |
initial commit
Diffstat (limited to 'analysis/scripts/metrics.py')
| -rw-r--r-- | analysis/scripts/metrics.py | 129 |
1 file changed, 129 insertions, 0 deletions
# -*- coding: utf-8 -*-

"""
Evaluation metrics

Author: G.J.J. van den Burg
Copyright (c) 2020 - The Alan Turing Institute
License: See the LICENSE file.

"""


def true_positives(T, X, margin=5):
    """Compute true positives without double counting.

    Each true change point in ``T`` is matched to at most one prediction
    in ``X``; a prediction matches when it lies within ``margin`` of the
    true location, and a matched prediction is removed so it cannot be
    counted again for another true change point.

    T : iterable of true change point locations
    X : iterable of predicted change point locations
    margin : maximal absolute distance for a prediction to count as a match

    Returns the set of true change points that were matched.
    """
    # make a copy so we don't affect the caller
    X = set(X)
    TP = set()
    for tau in T:
        # candidate predictions within the margin, sorted closest-first
        close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin]
        close.sort()
        if not close:
            continue
        dist, xstar = close[0]
        TP.add(tau)
        # remove the matched prediction so it can't be double counted
        X.remove(xstar)
    return TP


def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False):
    """Compute the F-measure based on human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    margin : maximal distance for a prediction to match an annotation
    alpha : value for the F-measure, alpha=0.5 gives the F1-measure
    return_PR : whether to return precision and recall too

    Remember that all CP locations are 0-based!

    Precision is measured against the union of all annotators' change
    points; recall is averaged over the individual annotators.
    """
    # ensure 0 is in all the sets (the implicit change point at the start
    # of the series is always present)
    Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)}
    for Tk in Tks.values():
        Tk.add(0)

    X = set(predictions)
    X.add(0)

    # Union of all annotators' change points.
    # BUGFIX: the original comprehension had its clauses in the wrong
    # order ([tau for tau in Tk for Tk in Tks.values()]); its outer
    # iterable was the leaked loop variable ``Tk``, so Tstar silently
    # became only the *last* annotator's set instead of the union.
    Tstar = {tau for Tk in Tks.values() for tau in Tk}

    K = len(Tks)

    # Precision: fraction of predictions close to some annotated CP.
    # Safe division: X always contains 0.
    P = len(true_positives(Tstar, X, margin=margin)) / len(X)

    # Recall: average over annotators of the fraction of their CPs found.
    TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks}
    R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks)

    # Weighted harmonic mean of P and R. The denominator cannot vanish:
    # 0 is in every set, so P > 0 and R > 0.
    F = P * R / (alpha * R + (1 - alpha) * P)
    if return_PR:
        return F, P, R
    return F


def overlap(A, B):
    """Return the overlap (i.e. Jaccard index) of two sets."""
    return len(A.intersection(B)) / len(A.union(B))


def partition_from_cps(locations, n_obs):
    """Return a list of sets that give a partition of the set [0, T-1], as
    defined by the change point locations.

    >>> partition_from_cps([], 5)
    [{0, 1, 2, 3, 4}]
    >>> partition_from_cps([3, 5], 8)
    [{0, 1, 2}, {3, 4}, {5, 6, 7}]
    >>> partition_from_cps([1,2,7], 8)
    [{0}, {1}, {2, 3, 4, 5, 6}, {7}]
    >>> partition_from_cps([0, 4], 6)
    [{0, 1, 2, 3}, {4, 5}]
    """
    T = n_obs
    partition = []
    current = set()

    # iterate change points in sorted order, ignoring duplicates
    all_cps = iter(sorted(set(locations)))
    cp = next(all_cps, None)
    for i in range(T):
        if i == cp:
            # close the running segment, unless we are at the very first
            # observation and it is still empty (a CP at 0 is implicit)
            if current:
                partition.append(current)
                current = set()
            cp = next(all_cps, None)
        current.add(i)
    partition.append(current)
    return partition


def cover_single(Sprime, S):
    """Compute the covering of a segmentation S by a segmentation Sprime.

    This follows equation (8) in Arbelaez et al., 2010.
    """
    T = sum(map(len, Sprime))
    # both segmentations must partition the same number of observations
    assert T == sum(map(len, S))
    C = 0
    for R in S:
        # each ground-truth segment is credited with its best-overlapping
        # segment from the candidate segmentation
        C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime)
    C /= T
    return C


def covering(annotations, predictions, n_obs):
    """Compute the average segmentation covering against the human annotations.

    annotations : dict from user_id to iterable of CP locations
    predictions : iterable of predicted CP locations
    n_obs : number of observations in the series

    Returns the covering score averaged over all annotators.
    """
    Ak = {
        k + 1: partition_from_cps(annotations[uid], n_obs)
        for k, uid in enumerate(annotations)
    }
    pX = partition_from_cps(predictions, n_obs)

    Cs = [cover_single(pX, Ak[k]) for k in Ak]
    return sum(Cs) / len(Cs)
