Source code for opsoro.stoppable_thread

import threading
import time


class StoppableThread(threading.Thread):
    """
    Thread class with a stop() method.

    The thread itself has to check regularly for the stopped() condition
    (or use sleep(), which returns early once a stop was requested).

    The thread is started as soon as it is created, so start() must not be
    called again on an instance.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the thread and start it immediately.

        All positional and keyword arguments are forwarded unchanged to
        threading.Thread.__init__ (e.g. ``target``, ``args``, ``name``).
        """
        super(StoppableThread, self).__init__(*args, **kwargs)

        # The stop event must exist *before* the thread starts running,
        # otherwise run() could call stopped()/sleep() before the attribute
        # has been assigned.
        # It is deliberately not named `_stop`: CPython 3 versions of
        # threading.Thread define an internal `_stop()` method used by
        # join()/is_alive(), and shadowing it with an Event breaks them.
        self._stop_event = threading.Event()

        # Start thread as soon as it is created
        super(StoppableThread, self).start()

    def stop(self):
        """Request the thread to stop; the thread must cooperate."""
        self._stop_event.set()

    def stopped(self):
        """Return True if stop() has been called, False otherwise."""
        return self._stop_event.is_set()

    def sleep(self, secs):
        """
        Sleep for up to ``secs`` seconds, returning early when stopped.

        Returns immediately if the thread is already stopped, and wakes up
        as soon as stop() is called during the sleep.

        :param secs: maximum time to sleep in seconds; values <= 0 return
                     immediately.
        """
        # Event.wait() blocks until the event is set or the timeout expires,
        # so no manual polling loop is needed. Clamping to 0 keeps a
        # non-numeric `secs` from being read as "wait forever" (a None
        # timeout would block until stop() is called).
        self._stop_event.wait(max(0.0, secs))