klio.utils
General utilities for managing a pipeline.
klio.utils.ThreadLimiter
A context manager to limit active threads for a block of code.
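Threads aren’t limited directly. Instead, the limiter maintains a semaphore (an atomic counter). Code acquires the semaphore before running and releases it afterward, so the number of blocks running at once never exceeds the limit.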
Example usage:
    import multiprocessing

    from klio.utils import ThreadLimit, ThreadLimiter

    # No arg to use the default limit (CPU count of host/worker):
    thread_limiter = ThreadLimiter()

    # Then use it as a context manager:
    with thread_limiter:
        ...

    # Or explicitly call acquire and release (release in `finally` so it
    # always runs, even if the block raises):
    thread_limiter.acquire()
    try:
        ...
    finally:
        thread_limiter.release()

    # Explicitly use the default limit:
    thread_limiter = ThreadLimiter(max_thread_count=ThreadLimit.DEFAULT)

    # Set your own limit:
    thread_limiter = ThreadLimiter(max_thread_count=2)

    # Dynamically set the limit using a function:
    limit_func = lambda: multiprocessing.cpu_count() * 4
    thread_limiter = ThreadLimiter(max_thread_count=limit_func)

    # Turn off thread limiting:
    thread_limiter = ThreadLimiter(max_thread_count=ThreadLimit.NONE)
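To illustrate the semaphore mechanism described above, here is a minimal sketch of a limiter built only on the standard library. `SimpleThreadLimiter` is a hypothetical name, not part of klio, and backing it with `threading.BoundedSemaphore` is an assumption about how such a limiter can be built; the real `ThreadLimiter` may differ in its details. Eight threads share a limit of 2, and the sketch records the peak number running inside the limited block at once.

```python
import threading
import time


class SimpleThreadLimiter:
    """Hypothetical stand-in for ThreadLimiter (not klio's code): caps concurrency with a semaphore."""

    def __init__(self, max_thread_count):
        # The semaphore starts with max_thread_count permits.
        self._semaphore = threading.BoundedSemaphore(max_thread_count)

    def acquire(self):
        # Blocks while every permit is held by another thread.
        self._semaphore.acquire()

    def release(self):
        # Returns a permit; BoundedSemaphore raises ValueError on over-release.
        self._semaphore.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
        return False  # never suppress exceptions from the block


limiter = SimpleThreadLimiter(max_thread_count=2)
active = 0  # threads currently inside the limited block
peak = 0  # highest value `active` reached
counter_lock = threading.Lock()


def work():
    global active, peak
    with limiter:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate work while holding a permit
        with counter_lock:
            active -= 1


threads = [threading.Thread(target=work) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never more than 2
```

All eight threads start immediately, but the semaphore admits at most two of them into the limited block at any moment.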
Parameters:
max_thread_count (int, callable, ThreadLimit) – Number of threads to make available to the limiter, or a callable that returns an int. Values must be greater than or equal to 0. Set to ThreadLimit.NONE for no thread limit.
name (str) – Name of this particular limiter. Defaults to the object’s ID, via id(self).
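The rules for max_thread_count (an int, or a callable that returns an int, with a value of at least 0) can be sketched as a small normalization helper. `resolve_max_thread_count` is hypothetical and is not klio’s API. Raising `ValueError` for a negative value and `TypeError` for a non-int value are assumptions about how the documented constraint might be enforced.

```python
import multiprocessing


def resolve_max_thread_count(max_thread_count):
    """Hypothetical helper: turn an int or a zero-argument callable into a validated int."""
    # A callable is invoked to obtain the limit, e.g. lambda: cpu_count() * 4.
    value = max_thread_count() if callable(max_thread_count) else max_thread_count
    # bool is a subclass of int, so reject it explicitly.
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError(f"expected an int, got {type(value).__name__}")
    if value < 0:
        raise ValueError(f"max_thread_count must be >= 0, got {value}")
    return value


print(resolve_max_thread_count(2))  # 2
print(resolve_max_thread_count(lambda: multiprocessing.cpu_count() * 4))
```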
acquire
Acquire a semaphore (a thread).
Acquiring a semaphore lets one more thread in Beam’s multi-threaded environment run the limited block of code.
If no semaphores are available, the method will block until one is released via ThreadLimiter.release().
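The standard library’s `threading.Semaphore` shows the blocking behavior described above. This sketch treats it as an assumed analogue of the limiter’s internal semaphore. The timed acquire is used only so that the demonstration terminates, and it does not imply that `ThreadLimiter.acquire` accepts a timeout argument.

```python
import threading

semaphore = threading.Semaphore(1)  # one permit, comparable to max_thread_count=1

semaphore.acquire()  # the first caller takes the only permit

# A second acquire cannot succeed while the permit is held;
# timeout=0.1 makes it give up instead of blocking forever.
got_second = semaphore.acquire(timeout=0.1)
print(got_second)  # False

# Releasing from another thread lets a blocked acquire proceed.
releaser = threading.Timer(0.05, semaphore.release)
releaser.start()
got_after_release = semaphore.acquire()  # blocks until the timer releases
releaser.join()
print(got_after_release)  # True

semaphore.release()  # hand the permit back
```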
release
Release a semaphore (a thread).
Raises:
ValueError – If the semaphore is released too many times (more than the value of ThreadLimiter._semaphore._value).
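The ValueError described above matches the behavior of the standard library’s `threading.BoundedSemaphore`, which refuses to let its counter rise above its initial value. This sketch assumes the limiter is backed by a bounded semaphore. It exercises the stdlib class directly, not `ThreadLimiter` itself.

```python
import threading

semaphore = threading.BoundedSemaphore(2)  # two permits, comparable to max_thread_count=2

semaphore.acquire()
semaphore.release()  # balanced: back to 2 permits

try:
    semaphore.release()  # one release too many
    over_release_error = None
except ValueError as exc:
    over_release_error = exc

print(type(over_release_error).__name__)  # ValueError
```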
ThreadLimit
An enum.Enum of constants for configuring ThreadLimiter.
    from klio import utils

    thread_limiter = utils.ThreadLimiter(
        max_thread_count=utils.ThreadLimit.DEFAULT
    )
NONE
Do not limit the number of threads used.
DEFAULT
Default thread limit (CPU count of worker machine via multiprocessing.cpu_count()).
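The two constants can be mirrored with a local enum.Enum to show how each might translate into a concrete limit. `ThreadLimitSketch` and `effective_limit` are hypothetical stand-ins and are not klio’s implementation. Returning `None` for `NONE` to mean "no cap at all" is an assumed convention.

```python
import enum
import multiprocessing


class ThreadLimitSketch(enum.Enum):
    """Hypothetical mirror of klio.utils.ThreadLimit."""

    NONE = enum.auto()  # do not limit threads
    DEFAULT = enum.auto()  # limit to the worker's CPU count


def effective_limit(limit):
    """Map an enum member to a thread cap; None means unlimited (assumed convention)."""
    if limit is ThreadLimitSketch.NONE:
        return None
    if limit is ThreadLimitSketch.DEFAULT:
        return multiprocessing.cpu_count()
    raise ValueError(f"unknown limit: {limit!r}")


print(effective_limit(ThreadLimitSketch.NONE))  # None
print(effective_limit(ThreadLimitSketch.DEFAULT))  # the host's CPU count
```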