1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Algorithms for low latency clustering of event-like datasets.
19 """
20 __author__ = "Leo Singer <leo.singer@ligo.org>"
21
22
23 import array
class SlidingMaxHeap(object):
    """A max heap that retains the order in which samples were added to it.
    The history capacity grows dynamically by expanding to the smallest power of
    two required to hold the contents.

    This is essentially an ordinary max heap, storing values and their sample
    offsets from the originating stream, coupled with a dynamically sized ring
    buffer storing the heap ranks of each sample.

    This procedure should take about O(1) time per input sample on average."""

    def __init__(self, value=None):
        """Create an empty sliding max heap.

        value is an optional function mapping a stored item to the quantity
        being maximized; by default the item itself is compared."""
        if value is None:
            value = lambda x: x
        self._value = value
        # ring buffer mapping (sample offset % capacity) -> heap rank;
        # capacity is always a power of two, starting at 2
        self._history = array.array('I', (0, 0))
        # total number of samples ever appended (monotonic)
        self._offset = 0
        # heap of (value(x), sample offset, x) triples; max element at rank 0
        self._heap = []

    def _drop_oldest(self):
        """Remove the oldest retained sample from the heap."""
        # heap rank of the oldest retained sample
        old_index = self._history[(self._offset - len(self._heap)) % len(self._history)]

        # rank of the last element in the heap array
        new_index = len(self._heap) - 1

        # Park the doomed element at the end of the array, then restore the
        # heap invariant at its old position.  _sift_down deliberately never
        # examines the last slot (its bounds use n - 1), so the doomed
        # element cannot migrate back before it is popped.
        if new_index != old_index:
            self._swap(old_index, new_index)
            self._sift(old_index)

        self._heap.pop()

    def drop_while(self, predicate):
        """Drop samples, oldest first, while predicate(item) is true."""
        while len(self._heap) > 0 and predicate(self._heap[self._history[(self._offset - len(self._heap)) % len(self._history)]][2]):
            self._drop_oldest()

    def append(self, x):
        """Add the item x to the heap, growing the ring buffer if full."""
        if len(self._heap) == len(self._history):
            # unwrap the ring buffer into chronological (oldest-first) order
            end_index = self._offset % len(self._history)
            begin_index = end_index - len(self._heap)

            if begin_index >= 0:
                old_history = self._history[begin_index:end_index]
            else:
                old_history = self._history[begin_index:] + self._history[:end_index]

            # double the capacity (next power of two)
            new_capacity = len(self._history) << 1
            self._history = array.array('I', (0,) * new_capacity)

            # re-wrap the saved ranks at the new capacity
            end_index = self._offset % len(self._history)
            begin_index = end_index - len(self._heap)
            if begin_index >= 0:
                self._history[begin_index:end_index] = old_history
            else:
                self._history[begin_index:] = old_history[:-begin_index]
                self._history[:end_index] = old_history[-end_index:]

        # record the new sample's heap rank, push it, restore the invariant
        k = len(self._heap)
        self._history[self._offset % len(self._history)] = k
        self._heap.append( (self._value(x), self._offset, x) )
        self._offset += 1
        self._sift_up(k)

    @property
    def root(self):
        """Return the maximum element in the history (the 'root' of the binary
        tree that is the heap)."""
        return self._heap[0][2]

    @property
    def history(self):
        """Return the heap ranks of the retained samples, oldest first."""
        end_index = self._offset % len(self._history)
        begin_index = end_index - len(self._heap)

        if begin_index >= 0:
            old_history = self._history[begin_index:end_index]
        else:
            old_history = self._history[begin_index % len(self._history):] + self._history[:end_index]

        return old_history

    @property
    def count(self):
        """Return the number of samples currently retained."""
        return len(self._heap)

    def _heap_ok(self):
        """Diagnostic: True if every element is strictly less than its parent."""
        # // is required: a float index would raise TypeError under Python 3
        return all(self._cmp(i, (i - 1) // 2) == -1 for i in range(1, len(self._heap)))

    def _swap(self, i, j):
        """Exchange the heap elements at ranks i and j, updating the ring."""
        offset_i = self._heap[i][1] % len(self._history)
        offset_j = self._heap[j][1] % len(self._history)
        self._history[offset_i], self._history[offset_j] = j, i
        self._heap[i], self._heap[j] = self._heap[j], self._heap[i]

    def _cmp(self, i, j):
        """Three-way comparison (-1, 0, 1) of the values at ranks i and j."""
        # Python 3 removed the builtin cmp(); this expression is equivalent.
        a = self._heap[i][0]
        b = self._heap[j][0]
        return (a > b) - (a < b)

    def _sift_up(self, i):
        """Bubble the element at rank i up toward the root."""
        while i > 0:
            j = (i - 1) // 2
            if self._cmp(i, j) == -1:
                break
            self._swap(i, j)
            i = j

    def _sift_down(self, i):
        """Bubble the element at rank i down toward the leaves.

        NOTE: the child bounds use n - 1, not n, so the LAST array slot is
        never considered; _drop_oldest relies on this to park the element
        being deleted there before popping it."""
        n = len(self._heap)
        while True:
            j = 2 * i + 1
            k = j + 1
            if k < n - 1:
                # both children in play: if the parent is smaller, swap
                # with the larger of the two children
                if self._cmp(i, j) == -1:
                    if self._cmp(j, k) == -1:
                        self._swap(i, k)
                        i = k
                    else:
                        self._swap(i, j)
                        i = j
                elif self._cmp(i, k) == -1:
                    self._swap(i, k)
                    i = k
                else:
                    break
            elif j < n - 1 and self._cmp(i, j) == -1:
                # only the left child is in play
                self._swap(i, j)
                i = j
            else:
                break

    def _sift(self, i):
        """Restore the heap invariant at rank i, moving up or down as needed."""
        if i == 0:
            self._sift_down(i)
        else:
            parent = (i - 1) // 2
            # tuple comparison: ties on value are broken by the unique
            # sample offset, so items themselves are never compared
            if self._heap[i] > self._heap[parent]:
                self._sift_up(i)
            else:
                self._sift_down(i)
class ClusterBuilder(object):
    """Provides one-dimensional time symmetric clustering. Every input that is
    greater than all samples that come before or after it within a certain window
    is reported.

    key is a function that, when applied to the items being clustered, yields a
    monotonically increasing quantity that is treated as the 'time' of the item.

    value is a function that, when applied to the items being clustered, is the
    quantity that is maximized -- for example, SNR.

    Note: This implementation is tuned for event-like datasets in which sample
    times are widely and irregularly spaced. For dense, regularly sampled data,
    there is a conceptually identical but practically much simpler algorithm.

    The cluster() and flush() routines both return generators. Both may be
    called multiple times. History is retained between calls."""

    def __init__(self, key, value, window):
        self._key = key
        self._delta = window
        self._2delta = 2 * window
        # heap entries are (key, item) pairs; the quantity maximized is
        # value(item)
        self._heap = SlidingMaxHeap(lambda x: value(x[1]))
        # keys below this bound have been fully processed; None until the
        # first flush
        self._min_key = None

    def flush(self, max_key):
        """Proceed with clustering items in the history under the assumption that
        there are no further items with a key less than max_key. This method
        is used internally, but the user may call this as well if information
        about the absence of triggers is available independently from the
        triggers. This may be useful in low latency applications.

        Returns a generator of clustered items, in increasing key order.
        Raises ValueError if max_key is less than a previously supplied key."""
        if self._min_key is None:
            self._min_key = max_key
            return

        _2delta = self._2delta
        _delta = self._delta
        min_key = self._min_key

        if max_key < min_key:
            raise ValueError("Input keys are not increasing monotonically.")

        while max_key - min_key >= _2delta:
            if self._heap.count > 0:
                root_key, root = self._heap.root
                if root_key > max_key - _delta:
                    # future, still-unseen items could top the current
                    # maximum; wait for more data
                    min_key = root_key - _delta
                elif root_key < min_key + _delta:
                    # part of the maximum's preceding window overlaps data
                    # already discarded; cannot certify it, so drop it
                    min_key = root_key
                else:
                    # certified maximum over its +/- delta neighborhood
                    yield root
                    min_key = root_key
                self._heap.drop_while(lambda x: x[0] <= min_key)
            else:
                min_key = max_key - _delta

        self._min_key = min_key

    def cluster(self, iterable):
        """Generate clustered items from iterable; history is retained
        between calls."""
        key = self._key

        for item in iterable:
            max_key = key(item)
            for clustered_item in self.flush(max_key):
                yield clustered_item
            self._heap.append((max_key, item))
244
def clustered(iterable, key, value, window):
    """Utility function for clustering an iterable without having to instantiate
    a ClusterBuilder object.

    Returns a generator of the items that are maxima over their
    +/- window neighborhoods, in increasing key order."""
    cluster_builder = ClusterBuilder(key, value, window)
    for clustered_item in cluster_builder.cluster(iterable):
        yield clustered_item
251