"""multi-producer, multi-consumer queue with items optionally in "buckets";
only allows one item from a given bucket to be obtained at a time.  A subclass
of the standard Python library queue.

$Id: bucketqueue.py,v 1.1.1.1 2004/10/10 23:37:05 poster Exp $"""

from time import time as _time, sleep as _sleep

# The builtin ``set`` replaces the ``sets`` module where it exists; fall back
# to ``sets.Set`` on interpreters that predate the builtin.
try:
    _Set = set
except NameError:
    import sets
    _Set = sets.Set

# The low-level lock module is called ``thread`` on older interpreters and
# ``_thread`` on newer ones; ``dummy_thread`` is the thread-less stand-in.
try:
    import thread
except ImportError:
    try:
        import _thread as thread
    except ImportError:
        import dummy_thread as thread

try:
    from Queue import Empty, Full, Queue
except ImportError:
    from queue import Empty, Full, Queue

__all__ = ['BucketQueue']


class BucketQueue(Queue):
    """A queue that only allows a single item from a given bucket to be
    obtained at a time.

    Within a bucket, order is guaranteed.  Between unblocked buckets, order
    is also honored.

    A thread that obtains a bucketed item via get() holds that bucket until
    it calls get() again or the bucket is released explicitly with
    releaseBucket() / releaseBucketByThreadId().  Items put without a bucket
    are never blocked.
    """

    def __init__(self, maxsize=0):
        """Create the queue; 'maxsize' <= 0 means the size is unbounded."""
        # in Queue, fsema presumably stands for (not) full semaphore, and
        # esema stands for (not) empty semaphore.  For us, esema will mean
        # "primed" or "available" semaphore.
        Queue.__init__(self, maxsize)
        # Only some Queue implementations create these two semaphores.  This
        # class drives both itself, so allocate them when the base class did
        # not.  esema starts out held because a new queue is not primed.
        if not hasattr(self, 'esema'):
            self.esema = thread.allocate_lock()
            self.esema.acquire()
        if not hasattr(self, 'fsema'):
            self.fsema = thread.allocate_lock()
        self._bucketsema = {}  # bucket name to bucket semaphore
        self._threadbucket = {}  # thread id to bucket name

    def primed(self):
        """Return True if the queue is primed, False otherwise (not
        reliable!).

        "Primed" means some item could be obtained right now by a thread
        that does not currently hold a bucket.
        """
        self.mutex.acquire()
        try:
            return self._primed()
        finally:
            self.mutex.release()

    def available(self, id=None):
        """Return True if there is currently an item available for this
        particular thread.

        'id' is a thread id; it defaults to the calling thread.  The bucket
        held by that thread (if any) counts as unblocked, because get()
        releases it before looking for the next item.
        """
        if id is None:
            id = thread.get_ident()
        self.mutex.acquire()
        try:
            return self._primed(self._threadbucket.get(id))
        finally:
            self.mutex.release()

    def makeBucket(self, name, silent=False):
        """Create the bucket 'name'.

        Raise ValueError if 'name' is None, or if the bucket already exists
        and 'silent' is false.  With 'silent' true an existing bucket is
        left untouched.
        """
        if name is None:
            raise ValueError("Bucket name cannot be None")
        if name in self._bucketsema:
            if not silent:
                raise ValueError("Bucket already exists", name)
        else:
            self._bucketsema[name] = thread.allocate_lock()

    def _acquireSema(self, sema, block, timeout, error):
        """Acquire 'sema' following the block/timeout rules of put and get.

        Raise 'error' if the semaphore could not be obtained immediately
        (non-blocking) or within 'timeout' seconds, and ValueError if
        'block' is true and 'timeout' is negative.
        """
        if not block:
            if not sema.acquire(0):
                raise error
        elif timeout is None:
            # blocking, w/o timeout, i.e. forever
            sema.acquire()
        elif timeout >= 0:
            # waiting max. 'timeout' seconds.
            # this code snippet is from threading.py: _Event.wait():
            # Balancing act:  We can't afford a pure busy loop, so we
            # have to sleep; but if we sleep the whole timeout time,
            # we'll be unresponsive.  The scheme here sleeps very
            # little at first, longer as time goes on, but never longer
            # than 20 times per second (or the timeout time remaining).
            delay = 0.0005  # 500 us -> initial delay of 1 ms
            endtime = _time() + timeout
            while True:
                if sema.acquire(0):
                    break
                remaining = endtime - _time()
                if remaining <= 0:  # time is over
                    raise error
                delay = min(delay * 2, remaining, .05)
                _sleep(delay)  # reduce CPU usage by using a sleep
        else:
            raise ValueError("'timeout' must be a positive number")

    def put(self, item, block=True, timeout=None, bucket=None):
        """Put an item into the given bucket of the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free
        slot is immediately available, else raise the Full exception
        ('timeout' is ignored in that case).

        'bucket' must be None (no bucket) or a name previously registered
        with makeBucket; otherwise ValueError is raised.
        """
        if bucket is not None and self._bucketsema.get(bucket) is None:
            raise ValueError("bucket does not exist; create with makeBucket",
                             bucket)
        self._acquireSema(self.fsema, block, timeout, Full)
        self.mutex.acquire()
        release_fsema = True
        try:
            was_primed = self._primed()
            self._put((item, bucket))
            # If we fail before here, the empty state has
            # not changed, so we can skip the release of esema
            if not was_primed and self._primed():
                self.esema.release()
            # If we fail before here, the queue can not be full, so
            # release_fsema remains True
            release_fsema = not self._full()
        finally:
            # Catching system level exceptions here (RecursionDepth,
            # OutOfMemory, etc) - so do as little as possible in terms
            # of Python calls.
            if release_fsema:
                self.fsema.release()
            self.mutex.release()

    def put_nowait(self, item, bucket=None):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False, bucket=bucket)

    def releaseBucketByThreadId(self, id):
        """Release the bucket held by the thread with the given id, if any.

        Return the released bucket name, or None if nothing was released.
        """
        return self._releaseBucket(id=id)

    def releaseBucket(self, bucket=None):
        """Release bucket.  If bucket is None, release the bucket for this
        thread, if any.

        Return the released bucket name, or None if nothing was released.
        """
        return self._releaseBucket(bucket)

    def _releaseBucket(self, bucket=None, id=None):
        """Release a bucket chosen by name or by the thread holding it.

        With 'bucket' None, the bucket held by thread 'id' (default: the
        calling thread) is released.  With 'bucket' given, it is released
        only if some thread is recorded as holding it.  Return the released
        bucket name, or None if nothing was released.
        """
        self.mutex.acquire()
        try:
            if bucket is None:
                if id is None:
                    id = thread.get_ident()
                bucket = self._threadbucket.pop(id, None)
            else:
                # Iterate over a snapshot so the deletion cannot disturb
                # the iteration.
                for tid, b in list(self._threadbucket.items()):
                    if b == bucket:
                        del self._threadbucket[tid]
                        break
                else:
                    # nobody holds this bucket: nothing to release
                    bucket = None
            if bucket is not None:
                was_primed = self._primed()
                self._bucketsema[bucket].release()
                # Releasing the bucket may have made an item obtainable.
                if not was_primed and self._primed():
                    self.esema.release()
        finally:
            self.mutex.release()
        return bucket

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is a
        positive number, it blocks at most 'timeout' seconds and raises the
        Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Any bucket held by the calling thread is released first.  If the
        returned item belongs to a bucket, the calling thread holds that
        bucket afterwards.
        """
        self.releaseBucket()
        # XXX raise Unavailable instead of Empty if not primed?
        self._acquireSema(self.esema, block, timeout, Empty)
        self.mutex.acquire()
        release_esema = True
        try:
            was_full = self._full()
            ix, bucket, bucketlock, item = self._findNext()
            acquired = False
            try:
                if bucket is not None:
                    bucketlock.acquire()
                    acquired = True
                    self._threadbucket[thread.get_ident()] = bucket
                self._removeIx(ix)
            except:
                # we need to clean up, then let the error propagate
                if acquired:
                    bucketlock.release()
                self._threadbucket.pop(thread.get_ident(), None)
                raise
            # If we fail before here, the full state has
            # not changed, so we can skip the release of fsema
            if was_full:
                self.fsema.release()
            # Failure means empty state also unchanged - release_esema
            # remains True.
            release_esema = self._primed()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    # Override if desired (but others are more useful, below).  These are
    # only called with the appropriate locks held.

    def _findNext(self, current=None):
        """Return (ix, bucket, bucket lock, item) for the next open item,
        where ix is the value that _removeIx needs to remove the item from
        the queue.

        Items without a bucket are always open; for them both bucket and
        bucket lock are None.  A bucketed item is open if its bucket lock
        is not held, or if its bucket is 'current'.  Raise LookupError if
        no next item is available.
        """
        # Buckets already found blocked: later items of the same bucket are
        # skipped, which keeps the order within a bucket intact.
        buckets = _Set()
        for ix, bucket, item in self._enumerateItems():
            if bucket is None:
                return ix, bucket, None, item
            if bucket not in buckets:
                lock = self._bucketsema[bucket]
                if current == bucket or not lock.locked():
                    return ix, bucket, lock, item
                buckets.add(bucket)
        raise LookupError("No next value available")

    def _primed(self, current=None):
        """Return True if _findNext(current) would find an item."""
        try:
            self._findNext(current)
        except LookupError:
            return False
        else:
            return True

    # Override these methods to implement other queue organizations
    # These will only be called with appropriate locks held

    def _full(self):
        """Return True if the queue is bounded and holds maxsize items.

        Defined here because not every Queue implementation provides it.
        """
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def _enumerateItems(self):
        """enumerate ix, bucket name, and item of each item in the queue in
        preferred order (defaults to FIFO).  ix is whatever value that
        _removeIx needs to remove the item from the queue."""
        for ix, (item, bucket) in enumerate(self.queue):
            yield ix, bucket, item

    def _removeIx(self, ix):
        """Given an ix of an item provided by _enumerateItems or _findNext,
        remove the associated entry from the queue"""
        del self.queue[ix]

    # _get, from the original Queue implementation, is not used: see
    # _findNext, _enumerateItems, and _removeIx for methods that perform
    # elements of the original task of _get.
    def _get(self):
        pass