# -*- coding: utf-8 -*- """ Evaluation metrics Author: G.J.J. van den Burg Copyright (c) 2020 - The Alan Turing Institute License: See the LICENSE file. """ def true_positives(T, X, margin=5): """Compute true positives without double counting """ # make a copy so we don't affect the caller X = set(list(X)) TP = set() for tau in T: close = [(abs(tau - x), x) for x in X if abs(tau - x) <= margin] close.sort() if not close: continue dist, xstar = close[0] TP.add(tau) X.remove(xstar) return TP def f_measure(annotations, predictions, margin=5, alpha=0.5, return_PR=False): """Compute the F-measure based on human annotations. annotations : dict from user_id to iterable of CP locations predictions : iterable of predicted CP locations alpha : value for the F-measure, alpha=0.5 gives the F1-measure return_PR : whether to return precision and recall too Remember that all CP locations are 0-based! """ # ensure 0 is in all the sets Tks = {k + 1: set(annotations[uid]) for k, uid in enumerate(annotations)} for Tk in Tks.values(): Tk.add(0) X = set(predictions) X.add(0) Tstar = [tau for tau in Tk for Tk in Tks.values()] Tstar = set(Tstar) K = len(Tks) P = len(true_positives(Tstar, X, margin=margin)) / len(X) TPk = {k: true_positives(Tks[k], X, margin=margin) for k in Tks} R = 1 / K * sum(len(TPk[k]) / len(Tks[k]) for k in Tks) F = P * R / (alpha * R + (1 - alpha) * P) if return_PR: return F, P, R return F def overlap(A, B): """ Return the overlap (i.e. Jaccard index) of two sets """ return len(A.intersection(B)) / len(A.union(B)) def partition_from_cps(locations, n_obs): """ Return a list of sets that give a partition of the set [0, T-1], as defined by the change point locations. >>> partition_from_cps([], 5) [{0, 1, 2, 3, 4}] >>> partition_from_cps([3, 5], 8) [{0, 1, 2}, {3, 4}, {5, 6, 7}] >>> partition_from_cps([1,2,7], 8) [{0}, {1}, {2, 3, 4, 5, 6}, {7}] >>> partition_from_cps([0, 4], 6) [{0, 1, 2, 3}, {4, 5}] """ T = n_obs partition = [] current = set() all_cps = iter(sorted(set(locations))) cp = next(all_cps, None) for i in range(T): if i == cp: if current: partition.append(current) current = set() cp = next(all_cps, None) current.add(i) partition.append(current) return partition def cover_single(Sprime, S): """Compute the covering of a segmentation S by a segmentation Sprime. This follows equation (8) in Arbaleaz, 2010. """ T = sum(map(len, Sprime)) assert T == sum(map(len, S)) C = 0 for R in S: C += len(R) * max(overlap(R, Rprime) for Rprime in Sprime) C /= T return C def covering(annotations, predictions, n_obs): """Compute the average segmentation covering against the human annotations. annotations : dict from user_id to iterable of CP locations predictions : iterable of predicted Cp locations n_obs : number of observations in the series """ Ak = { k + 1: partition_from_cps(annotations[uid], n_obs) for k, uid in enumerate(annotations) } pX = partition_from_cps(predictions, n_obs) Cs = [cover_single(pX, Ak[k]) for k in Ak] return sum(Cs) / len(Cs)