Package pylal :: Module low_latency_clustering
[hide private]
[frames] | [no frames]

Source Code for Module pylal.low_latency_clustering

  1  # Copyright (C) 2010  Leo Singer 
  2  # 
  3  # This program is free software; you can redistribute it and/or modify it 
  4  # under the terms of the GNU General Public License as published by the 
  5  # Free Software Foundation; either version 2 of the License, or (at your 
  6  # option) any later version. 
  7  # 
  8  # This program is distributed in the hope that it will be useful, but 
  9  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General 
 11  # Public License for more details. 
 12  # 
 13  # You should have received a copy of the GNU General Public License along 
 14  # with this program; if not, write to the Free Software Foundation, Inc., 
 15  # 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA. 
 16  # 
 17  """ 
 18  Algorithms for low latency clustering of event-like datasets. 
 19  """ 
 20  __author__ = "Leo Singer <leo.singer@ligo.org>" 
 21   
 22   
 23  import array 
class SlidingMaxHeap(object):
    """A max heap that retains the order in which samples were added to it.
    The history capacity grows dynamically by expanding to the smallest power of
    two required to hold the contents.

    This is essentially an ordinary max heap, storing values and their sample
    offsets from the originating stream, coupled with a dynamically sized ring
    buffer storing the heap ranks of each sample.

    This procedure should take about O(1) time per input sample on average."""

    # The ring buffer (self._history) stores heap ranks, indexed by
    # (sample offset) % capacity.  The heap (self._heap) stores tuples of the
    # form (value, offset, item).  Because samples are only ever appended and
    # dropped oldest-first, the offsets of the live samples always form the
    # contiguous range [self._offset - count, self._offset).

    def __init__(self, value = None):
        """Create an empty heap.  value, if given, is a function mapping an
        item to the quantity that is maximized; by default, items are
        compared directly."""
        if value is None:
            value = lambda x: x
        self._value = value
        self._history = array.array('I', (0, 0))
        self._offset = 0  # total number of samples ever appended
        self._heap = []

    def _drop_oldest(self):
        """Remove the oldest sample from the heap."""

        # Get heap index of oldest sample.
        old_index = self._history[(self._offset - len(self._heap)) % len(self._history)]

        # Get the heap index of the lower-right leaf of the heap.
        new_index = len(self._heap) - 1

        # _swap the lower-right leaf with the oldest sample, and restore the
        # heap invariant among the surviving elements.
        if new_index != old_index:
            self._swap(old_index, new_index)
            self._sift(old_index)

        # Remove the oldest sample, which is now at the lower-right leaf.
        self._heap.pop()

    def drop_while(self, predicate):
        """Drop samples in insertion order for as long as predicate(item)
        is true of the oldest retained sample."""
        while len(self._heap) > 0 and predicate(self._heap[self._history[(self._offset - len(self._heap)) % len(self._history)]][2]):
            self._drop_oldest()

    def append(self, x):
        """Add the item x to the heap, recording its insertion order."""
        # If adding this element would overwrite the tail of the ring buffer,
        # then expand the ring buffer.
        if len(self._heap) == len(self._history):
            capacity = len(self._history)
            count = len(self._heap)
            first = self._offset - count

            # Copy out the live ranks in insertion order, then transcribe them
            # into a fresh buffer of twice the capacity.  Copying by explicit
            # offset sidesteps the corner case where self._offset is an exact
            # multiple of the new capacity: the former slice-based
            # transcription assigned old_history[-0:] (the whole array) to
            # self._history[:0], growing and corrupting the ring buffer.
            live = [self._history[(first + i) % capacity] for i in range(count)]
            new_capacity = capacity << 1
            self._history = array.array('I', (0,) * new_capacity)
            for i in range(count):
                self._history[(first + i) % new_capacity] = live[i]

        # Add the new datum to the ring buffer and the heap.
        k = len(self._heap)
        self._history[self._offset % len(self._history)] = k
        self._heap.append( (self._value(x), self._offset, x) )
        self._offset += 1
        self._sift_up(k)

    @property
    def root(self):
        """Return the maximum element in the history (the 'root' of the binary
        tree that is the heap)."""
        return self._heap[0][2]

    @property
    def history(self):
        """Return the heap ranks of the live samples in insertion order."""
        capacity = len(self._history)
        count = len(self._heap)
        first = self._offset - count
        return array.array('I', (self._history[(first + i) % capacity] for i in range(count)))

    @property
    def count(self):
        """Number of samples currently retained."""
        return len(self._heap)

    def is_heap(self):
        """Return True if the heap invariant holds: no element's value exceeds
        its parent's.  Equal values are permitted -- requiring strict ordering
        would falsely reject heaps containing duplicate values, which
        _sift_up legitimately produces."""
        return all(self._cmp(i, (i - 1) // 2) <= 0 for i in range(1, len(self._heap)))

    def _swap(self, i, j):
        """Exchange heap entries i and j, updating their ring-buffer ranks."""
        offset_i = self._heap[i][1] % len(self._history)
        offset_j = self._heap[j][1] % len(self._history)
        self._history[offset_i], self._history[offset_j] = j, i
        self._heap[i], self._heap[j] = self._heap[j], self._heap[i]

    def _cmp(self, i, j):
        """Three-way comparison (-1, 0, or +1) of the values at heap indices
        i and j.  Spelled without the Python 2-only builtin cmp() so that the
        code runs identically under Python 2 and Python 3."""
        a = self._heap[i][0]
        b = self._heap[j][0]
        return (a > b) - (a < b)

    def _sift_up(self, i):
        """Move the element at heap index i toward the root until its parent
        is strictly greater."""
        while i > 0:
            j = (i - 1) // 2
            if self._cmp(i, j) == -1:
                break
            self._swap(i, j)
            i = j

    def _sift_down(self, i):
        """Move the element at heap index i toward the leaves until neither
        live child is greater.

        NOTE: the effective heap size here is len(self._heap) - 1.  The last
        slot holds the element that _drop_oldest is about to pop, so the
        'n - 1' bounds deliberately exclude it from the comparisons."""
        n = len(self._heap)
        while True:
            j = 2 * i + 1  # left child
            k = j + 1      # right child
            if k < n - 1:
                # Both children live: swap with the larger child if it
                # exceeds the current element.
                if self._cmp(i, j) == -1:
                    if self._cmp(j, k) == -1:
                        self._swap(i, k)
                        i = k
                    else:
                        self._swap(i, j)
                        i = j
                elif self._cmp(i, k) == -1:
                    self._swap(i, k)
                    i = k
                else:
                    break
            elif j < n - 1 and self._cmp(i, j) == -1:
                # Only the left child is live.
                self._swap(i, j)
                i = j
            else:
                break

    def _sift(self, i):
        """Restore the heap invariant for the element at index i, moving it in
        whichever direction is required."""
        if i == 0:
            self._sift_down(i)
        else:
            parent = (i - 1) // 2
            # Tuple comparison: value ties are broken by offset, which is
            # unique, so the raw items themselves are never compared.
            if self._heap[i] > self._heap[parent]:
                self._sift_up(i)
            else:
                self._sift_down(i)
class ClusterBuilder(object):
    """Provides one-dimensional time symmetric clustering. Every input that is
    greater than all samples that come before or after it within a certain window
    is reported.

    key is a function that, when applied to the items being clustered, yields a
    monotonically increasing quantity that is treated as the 'time' of the item.

    value is a function that, when applied to the items being clustered, is the
    quantity that is maximized -- for example, SNR.

    Note: This implementation is tuned for event-like datasets in which sample
    times are widely and irregularly spaced. For dense, regularly sampled data,
    there is a conceptually identical but practically much simpler algorithm.

    The cluster() and flush() routines both return generators. Both may be
    called multiple times. History is retained between calls."""

    def __init__(self, key, value, window):
        self._key = key
        self._window = window
        self._double_window = 2 * window
        # The heap holds (key, item) pairs; rank them by the item's value.
        self._heap = SlidingMaxHeap(lambda pair: value(pair[1]))
        self._min_key = None

    def flush(self, max_key):
        """Proceed with clustering items in the history under the assumption that
        there are no further items with a key less than max_key. This method
        is used internally, but the user may call this as well if information
        about the absence of triggers is available independently from the
        triggers. This may be useful in low latency applications."""
        if self._min_key is None:
            # First call: nothing buffered yet, just record the horizon.
            self._min_key = max_key
            return

        window = self._window
        double_window = self._double_window
        min_key = self._min_key

        if max_key < min_key:
            raise ValueError("Input keys are not increasing monotonically.")

        while max_key - min_key >= double_window:
            if self._heap.count == 0:
                # Nothing buffered: just advance the lower horizon.
                min_key = max_key - window
                continue
            peak_key, peak = self._heap.root
            if peak_key > max_key - window:
                # Maximum lies within one window of the upper horizon; a
                # later sample could still dominate it, so do not report yet.
                min_key = peak_key - window
            elif peak_key < min_key + window:
                # Maximum lies within one window of the lower horizon; it is
                # not reported.
                min_key = peak_key
            else:
                # Maximum dominates its full window on both sides: report it.
                yield peak
                min_key = peak_key
            # Discard everything at or before the new lower horizon.
            self._heap.drop_while(lambda pair: pair[0] <= min_key)

        self._min_key = min_key

    def cluster(self, iterable):
        """Generate the clustered items of iterable, retaining history across
        calls."""
        key = self._key

        for item in iterable:
            item_key = key(item)
            for reported in self.flush(item_key):
                yield reported
            self._heap.append((item_key, item))
def clustered(iterable, key, value, window):
    """Utility function for clustering an iterable without having to instantiate
    a ClusterBuilder object."""
    builder = ClusterBuilder(key, value, window)
    for reported in builder.cluster(iterable):
        yield reported