klio.utils Subpackage

General utilities for managing a pipeline.

class klio.utils.ThreadLimiter(max_thread_count=ThreadLimit.DEFAULT, name=None)

A context manager to limit active threads for a block of code.

Threads aren’t limited directly; threads are limited indirectly by maintaining a semaphore (an atomic counter).
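As a rough illustration of the semaphore idea, here is a minimal sketch built on Python's threading.BoundedSemaphore. The class name SemaphoreLimiter is hypothetical and this is not klio's implementation. It only shows how a counter that blocks at zero caps how many threads can run a block at once.

```python
import threading


class SemaphoreLimiter:
    """Hypothetical minimal limiter (not klio's code): caps concurrent entries."""

    def __init__(self, max_count):
        # BoundedSemaphore starts its counter at max_count and raises
        # ValueError if released more times than it was acquired.
        self._semaphore = threading.BoundedSemaphore(max_count)

    def acquire(self):
        # Blocks while the counter is 0, then decrements it.
        self._semaphore.acquire()

    def release(self):
        # Increments the counter, waking one blocked acquirer if any.
        self._semaphore.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
        return False  # do not swallow exceptions raised inside the block


limiter = SemaphoreLimiter(2)
with limiter:
    # At most two threads can be inside this block at the same time.
    pass
```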

Example usage:

import multiprocessing

from klio.utils import ThreadLimit, ThreadLimiter

# No arg to use the default limit (CPU count of host/worker):
thread_limiter = ThreadLimiter()

# Then use it as a context manager:
with thread_limiter:
    ...

# Or explicitly call acquire and release (release in a finally block
# so the slot is returned even if the work raises):
thread_limiter.acquire()
try:
    ...
finally:
    thread_limiter.release()

# Explicitly use the default limit:
thread_limiter = ThreadLimiter(max_thread_count=ThreadLimit.DEFAULT)

# Set your own limit:
thread_limiter = ThreadLimiter(max_thread_count=2)

# Dynamically set the limit using a function
limit_func = lambda: multiprocessing.cpu_count() * 4
thread_limiter = ThreadLimiter(max_thread_count=limit_func)

# Turn off thread limiting
thread_limiter = ThreadLimiter(max_thread_count=ThreadLimit.NONE)

Parameters
  • max_thread_count (int, callable, ThreadLimit) – number of threads to make available to the limiter, or a callable that returns an int. Values must be greater than or equal to 0. Set to ThreadLimit.NONE for no thread limits.

  • name (str) – Name of this particular limiter. Defaults to the object ID via id(self).
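The three accepted forms of max_thread_count could be normalized into a single number along these lines. This is a hypothetical sketch. resolve_limit is not part of klio, the ThreadLimit class is a local stand-in using the documented values, and returning None to mean "no limit" is an assumption made for illustration.

```python
import enum
import multiprocessing


class ThreadLimit(enum.Enum):
    # Local stand-in mirroring the documented members; not imported from klio.
    NONE = 0
    DEFAULT = 1


def resolve_limit(max_thread_count):
    """Hypothetical helper: return an int limit, or None for no limit."""
    if max_thread_count is ThreadLimit.NONE:
        return None
    if max_thread_count is ThreadLimit.DEFAULT:
        # Documented default: CPU count of the worker machine.
        return multiprocessing.cpu_count()
    if callable(max_thread_count):
        # Callables are evaluated to get the actual limit.
        max_thread_count = max_thread_count()
    if (
        isinstance(max_thread_count, bool)
        or not isinstance(max_thread_count, int)
        or max_thread_count < 0
    ):
        raise ValueError(
            "max_thread_count must be an int >= 0, got {!r}".format(max_thread_count)
        )
    return max_thread_count


print(resolve_limit(2))
print(resolve_limit(lambda: multiprocessing.cpu_count() * 4))
print(resolve_limit(ThreadLimit.NONE))
```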

acquire()

Acquire a semaphore (a thread).

Acquiring a semaphore will activate an available thread in Beam’s multi-threaded environment.

If no semaphores are available, the method will block until one is released via ThreadLimiter.release().
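The blocking behaviour can be observed with a plain threading.BoundedSemaphore standing in for a limiter with max_thread_count=2. This stand-in is an assumption, and the sketch does not use klio. Eight worker threads compete for two slots, and the recorded peak concurrency never exceeds two.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a limiter with max_thread_count=2 (not klio's ThreadLimiter).
semaphore = threading.BoundedSemaphore(2)
lock = threading.Lock()
active = 0
peak = 0


def work(_):
    global active, peak
    semaphore.acquire()  # blocks while two workers already hold a slot
    try:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate I/O-bound work
        with lock:
            active -= 1
    finally:
        semaphore.release()  # hand the slot to the next blocked worker


with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(work, range(8)))

print(peak)  # never more than 2
```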

release()

Release a semaphore (a thread).

Raises

ValueError – if the semaphore is released too many times (more than the value of ThreadLimiter._semaphore._value).
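The over-release error matches how Python's threading.BoundedSemaphore behaves. Whether ThreadLimiter wraps that class is an assumption, since only the ValueError is documented here. A sketch:

```python
import threading

# Assumption: a BoundedSemaphore like the one a limiter might wrap.
semaphore = threading.BoundedSemaphore(1)
semaphore.acquire()
semaphore.release()  # balanced: the counter is back at its initial value

raised = False
try:
    semaphore.release()  # one release too many
except ValueError as err:
    raised = True
    print("ValueError:", err)
```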

class klio.utils.ThreadLimit

enum.Enum of constants for ThreadLimiter.

Example usage:

from klio import utils

thread_limiter = utils.ThreadLimiter(
    max_thread_count=utils.ThreadLimit.DEFAULT
)
NONE = 0

Do not limit the number of threads used.

DEFAULT = 1

Default thread limit (CPU count of worker machine via multiprocessing.cpu_count()).
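Because ThreadLimit is a plain enum.Enum, its members are not interchangeable with their integer values in comparisons. The class below is a local stand-in with the documented members, not an import from klio:

```python
import enum


class ThreadLimit(enum.Enum):
    # Local stand-in mirroring the documented members and values.
    NONE = 0
    DEFAULT = 1


print(ThreadLimit.NONE.value)  # 0
print(ThreadLimit(1))          # ThreadLimit.DEFAULT (lookup by value)
print(ThreadLimit.NONE == 0)   # False: plain Enum members are not ints
```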