Work at SourceForge, help us to make it a better place! We have an immediate need for a Support Technician in our San Francisco or Denver office.

Close

[a37b2b]: plugins / sched.py Maximize Restore History

Download this file

sched.py    284 lines (231 with data), 9.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
'''
This module is derived from the sched.py included with Python 2.6 . I have
modified it and am re-licensing it under the GPL v2, as is allowed by the PSF
license:
http://svn.python.org/view/python/trunk/LICENSE?revision=68142&view=markup
This software is licensed under the GPL (GNU General Public License) version 2
as it appears here: http://www.gnu.org/copyleft/gpl.html
It is also included with this archive as `gpl.txt <gpl.txt>`_.
'''
"""A generally useful event scheduler class.
Each instance of this class manages its own queue.
No multi-threading is implied; you are supposed to hack that
yourself, or use a single instance per application.
Each instance is parametrized with two functions, one that is
supposed to return the current time, one that is supposed to
implement a delay. You can implement real-time scheduling by
substituting time and sleep from built-in module time, or you can
implement simulated time by writing your own functions. This can
also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue. Time can be expressed as
integers or floating point numbers, as long as it is consistent.
Events are specified by tuples (time, priority, action, argument).
As in UNIX, lower priority numbers mean higher priority; in this
way the queue can be maintained as a priority queue. Execution of the
event means calling the action function, passing it the argument
sequence in "argument" (remember that in Python, multiple function
arguments are be packed in a sequence).
The action function may be an instance method so it
has another way to reference private data (besides global variables).
"""
# XXX The timefunc and delayfunc should have been defined as methods
# XXX so you can define new kinds of schedulers using subclassing
# XXX instead of having to define a module or class just to hold
# XXX the global state of your particular time and delay functions.
from __future__ import with_statement
import heapq
__all__ = ["scheduler"]
class Event(object):
attrs = 'time priority action argument'.split()
__slots__ = '_values', '_cancelled'
def __init__(self, time, priority, action, argument):
self._values = (time, priority, action, argument)
self._cancelled = 0
def __cmp__(self, other):
return cmp(self._values, other._values) or cmp(id(self), id(other))
def __iter__(self):
for i in self._values:
yield i
def __getattr__(self, key):
if key == '_cancelled':
return self._cancelled
return self._values[self.attrs.index(key)]
def _get_cancelled(self):
return self._cancelled
def _set_cancelled(self, value):
if not value:
if self._cancelled:
raise Exception("You can't un-cancel an event.")
return
self._cancelled = 1
cancelled = property(_get_cancelled, _set_cancelled)
del _get_cancelled, _set_cancelled
def __repr__(self):
return '%s(%s, %s, %s, %s)'%(self.__class__.__name__, self.time, self.priority, self.action, self.argument)
def LocalSynchronize(method):
def Call(self, *args, **kwargs):
# slight performance improvement over using FakeLock all the time for all
# method calls
if self.lock is FakeLock:
return method(self, *args, **kwargs)
else:
with self.lock:
return method(self, *args, **kwargs)
Call.__name__ = method.__name__
return Call
class FakeLock(object):
def __enter__(self):
return
def __exit__(self, type, value, traceback):
return
def __call__(self):
return self
FakeLock = FakeLock()
class scheduler(object):
def __init__(self, timefunc, delayfunc, locked=False):
"""Initialize a new instance, passing the time and delay
functions.
If the optional locked argument (defaulting to false) is true, this
scheduler will use a threading.RLock() instance to guarantee internal
consistancy.
"""
self._queue = []
self.timefunc = timefunc
self.delayfunc = delayfunc
self.cancelled = 0
if not locked:
self.lock = FakeLock
else:
import threading
self.lock = threading.RLock()
@LocalSynchronize
def enterabs(self, time, priority, action, argument):
"""Enter a new event in the queue at an absolute time.
Returns an ID for the event which can be used to remove it,
if necessary.
"""
event = Event(time, priority, action, argument)
heapq.heappush(self._queue, event)
return event # The ID
@LocalSynchronize
def enter(self, delay, priority, action, argument):
"""A variant that specifies the time as a relative time.
This is actually the more commonly used interface.
"""
time = self.timefunc() + delay
return self.enterabs(time, priority, action, argument)
@LocalSynchronize
def peek(self):
"""Will return the first item in the queue in a properly synchronized
manner.
An empty queue will raise an IndexError.
"""
return self._queue[0]
@LocalSynchronize
def cancel(self, event):
"""Remove an event from the queue.
This must be presented the ID as returned by enter().
"""
self.cancelled += 1
event.cancelled = 1
if self._ok_clear():
self._clear_cancelled()
@LocalSynchronize
def empty(self):
"""Check whether the queue is empty."""
return not self._queue
def _ok_clear(self):
"""Clear cancelled events when the queue is at least 128 events and more
than half of the events are cancelled, or clear regardless of the event
count when cancelled events are at least 7/8 of all events in the queue.
"""
lq = len(self._queue)
return (lq > 128 and self.cancelled > (lq>>1)) or (self.cancelled<<3 > lq*7)
@LocalSynchronize
def _clear_cancelled(self):
"""Clears all cancelled events and re-heapifies the schedule.
Note the following:
>>> a = sched.queue
>>> sched._clear_cancelled()
>>> b = sched.queue
After the above is executed, a and b may not be the same!
"""
self._queue[:] = [i for i in self._queue if not i.cancelled]
heapq.heapify(self._queue)
self.cancelled = 0
def run(self, anow=None):
"""Execute events until the queue is empty.
When there is a positive delay until the first event, the
delay function is called and the event is left in the queue;
otherwise, the event is removed from the queue and executed
(its action function is called, passing it the argument). If
the delay function returns prematurely, it is simply
restarted.
It is legal for both the delay function and the action
function to to modify the queue or to raise an exception;
exceptions are not caught but the scheduler's state remains
well-defined so run() may be called again.
A questionable hack is added to allow other threads to run:
just after an event is executed, a delay of 0 is executed, to
avoid monopolizing the CPU when other threads are also
runnable.
"""
# localize variable access to minimize overhead
# and to improve thread safety
q = self._queue
delayfunc = self.delayfunc
timefunc = self.timefunc
pop = heapq.heappop
push = heapq.heappush
peek = self.peek
lock = self.lock
if self._ok_clear():
self._clear_cancelled()
while q and ((anow is None) or (q[0].time <= anow)):
time, priority, action, argument = checked_event = q[0]
now = timefunc()
if now < time:
delayfunc(time - now)
else:
with lock:
event = pop(q)
# Verify that the event was not removed or altered
# by another thread after we last looked at q[0].
if event is checked_event:
if event.cancelled:
with lock:
self.cancelled -= 1
else:
action(*argument)
delayfunc(0) # Let other threads run
else:
with lock:
push(q, event)
@LocalSynchronize
def getqueue(self, now=None, copy=True, auto_clear=True):
"""An ordered list of upcoming events.
Events are Event objects with fields for:
time, priority, action, arguments
"""
# Use heapq to sort the queue rather than using 'sorted(self._queue)'.
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
if copy:
events = self._queue[:]
else:
events = self._queue
if auto_clear and self._ok_clear():
self._clear_cancelled()
out = []
pop = heapq.heappop
while events and ((now is None) or (events[0].time <= now)):
event = pop(events)
if not event.cancelled:
out.append(event)
elif not copy: # the event was cancelled, and this is not a copy
self.cancelled -= 1
return out
@property
@LocalSynchronize
def queue(self):
return self.getqueue()