KEP 2: Thread Management

Author

Lynn Root

Type

Standards Track

Status

Final

Created

2020-11-10

Abstract

This KEP proposes a default thread management model within a Klio job. Currently, Apache Beam does not have a mechanism to limit the number of threads that a pipeline uses to execute its transforms. The proposed approach provides a way for users to limit the number of threads used for a specific transform or section of code, with the default set to the number of CPUs on the worker machine.

Motivation and Scope

Currently, when running memory-intensive Beam pipelines on Dataflow (not applicable to the Direct Runner with default options – explained below; unknown if applicable to other runners), workers will be OOM (out-of-memory) killed by the underlying runner infrastructure. This is because Beam uses an unbounded pool of threads to process elements as they are read, spawning more and more threads and using more and more memory (see Detailed Description for more background on the problem).

The goal of this KEP is to add thread management to the parts of the Beam execution model where Klio is able to control threads (see Not in Scope for where it cannot). This will provide support and sane defaults for thread-based concurrency in a Klio job.

To that end, this KEP proposes to expose thread management via:

  1. the existing @handle_klio decorator;

  2. a context manager.

Both of these approaches can be seen in the following example (see Usage and Impact for further examples of the proposed interface):
from klio.transforms import decorators

@decorators.handle_klio(max_thread_count=2)
def my_heavy_weight_transform(ctx, item):
    ...

from klio.utils import ThreadLimiter
thread_limiter = ThreadLimiter(max_thread_count=2)
with thread_limiter:
    ...

Note

Originally, a third approach was suggested, @limit_threads. This has been dropped - see Discussion for more information.

Each transform that uses the @handle_klio decorator or the thread limiter context manager will have its own “pseudo-pool” of threads it is allowed to use. When using the decorator, the number of threads given to a transform equates to the number of elements that the transform can process at any given time. Therefore, if all threads in an allotted pool are in use, the transform will be blocked [1] from processing a new element until an in-process element completes or errors out. See Implementation for further understanding.

While limiting the threads for one transform does not directly limit the threads of another transform, if a transform that is limited to fewer threads precedes a transform with a higher thread limit – or no limit at all – then the latter transform may be indirectly affected and unable to use all of its available threads.

The proposed thread management does not affect jobs using the direct runner unless the option direct_running_mode='multi_threading' is used in the job’s pipeline_options. By default, pipelines run using the direct runner are single-threaded.
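For reference, the multi-threaded direct runner mode where these limits do take effect corresponds to the following Beam pipeline options (a sketch using Beam's DirectRunner options directly; Klio users would typically set the equivalent values under pipeline_options in klio-job.yaml):

from apache_beam.options.pipeline_options import PipelineOptions

# The direct runner is single-threaded by default; thread limiting only
# has an effect once it is switched into multi-threaded mode.
options = PipelineOptions(
    runner="DirectRunner",
    direct_running_mode="multi_threading",
    direct_num_workers=4,  # illustrative value
)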

Not in Scope

Klio is unable to limit thread concurrency when reading from/writing to Google Cloud Platform-specific services (e.g. Pub/Sub, BigQuery). This is because the implementation of these I/O transforms – specifically, ones that inherit from NativeSource – are closed source.

Also not in scope is the ability to set a job-wide default thread limit (or to turn limits off entirely). This could be possible, but would increase the implementation complexity of this feature as it is difficult to get access to the job’s KlioConfig object when needed.

Usage and Impact

The proposed API is meant to blend into the current API Klio users are familiar with, while also allowing for the fine-grained control and flexibility that our advanced users might need. Users who do nothing will still benefit from the default thread concurrency management.

There are two proposed ways to limit active worker threads:

@handle_klio Decorator

Unlike our approach of offering separate decorators for different functionalities (@timeout, @retry, etc.), this proposal hooks into the existing @handle_klio decorator. This allows thread concurrency to be managed by default.

from klio.transforms import decorators

# No args - uses the default number of threads (CPU count)
@decorators.handle_klio
def my_medium_weight_transform(ctx, item):
    ...

# Adjust the limit on the number of threads for this particular transform.
# Overrides Klio's default (CPU count)
@decorators.handle_klio(max_thread_count=1)
def my_heavy_weight_transform(ctx, item):
    ...

# Works the same when applied to DoFn transforms
class MyTransform(beam.DoFn):
    @decorators.handle_klio(max_thread_count=1)
    def process(self, item):
        ...

# Turn off threading limits entirely
@decorators.handle_klio(max_thread_count=None)
def my_light_weight_transform(ctx, item):
    ...

To use one threadpool limit across multiple transforms, create a ThreadLimiter instance to pass into the @handle_klio decorator:

# Provide an instance of ThreadLimiter to the decorators
# to use one pool of threads across multiple transforms
from klio import utils

global_thread_limiter = utils.ThreadLimiter(max_thread_count=4)


@decorators.handle_klio(thread_limiter=global_thread_limiter)
def my_transform(ctx, item):
    ...

@decorators.handle_klio(thread_limiter=global_thread_limiter)
def my_other_transform(ctx, item):
    ...

@decorators.limit_threads(thread_limiter=global_thread_limiter)
def my_non_klio_transform(item):
    ...

Two mutually-exclusive arguments would be supported: max_thread_count – an integer representing the max number of threads to allow for the decorated function/method; and thread_limiter – an instance of klio.utils.ThreadLimiter.

If no argument is provided, then Klio will default to the number of CPUs on the worker (unless the thread_limiter argument is given). Similarly, to explicitly set the max_thread_count to its default (which is worker machine-dependent), users can use the constant klio.utils.ThreadLimiter.DEFAULT_LIMIT.
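For completeness, explicitly requesting the default limit might look like the following (a short sketch assuming the DEFAULT_LIMIT constant proposed in this KEP):

from klio.transforms import decorators
from klio.utils import ThreadLimiter

# Explicitly use the default, CPU-count-based limit
@decorators.handle_klio(max_thread_count=ThreadLimiter.DEFAULT_LIMIT)
def my_default_limited_transform(ctx, item):
    ...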

If the user does not want to limit the thread count, then max_thread_count should be set to None.

ThreadLimiter Context Manager

A context manager provides the user with even more fine-grained control. It allows users to limit threads within a portion of a function/method, as opposed to the full function/method. It also provides the user a mechanism to limit threads across transforms.

# Limit threads with a context manager
from klio import utils
thread_limiter = utils.ThreadLimiter(max_thread_count=2)
with thread_limiter:
    ...


# Limit threads across multiple transforms by defining one limiter
from klio import utils
from klio.transforms import decorators

global_thread_limiter = utils.ThreadLimiter(max_thread_count=2)

@decorators.handle_klio(max_thread_count=None)
def foo():
    with global_thread_limiter:
        ...

@decorators.handle_klio(max_thread_count=None)
def bar():
    with global_thread_limiter:
        ...

One argument would be supported for the ThreadLimiter class: an integer representing the max number of threads (max_thread_count) to allow for the context.

If no integer is provided then it will default to the number of CPUs on the worker. Similarly, to explicitly set the max_thread_count to its default (which is worker machine-dependent), users can use the constant klio.utils.ThreadLimiter.DEFAULT_LIMIT.

The @handle_klio decorator uses the ThreadLimiter context manager under the hood.
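As an illustration only (the names and structure below are assumptions, not Klio's actual source, and the KlioMessage handling the real decorator performs is omitted), the delegation could look roughly like this:

import functools

from klio.utils import ThreadLimiter


def handle_klio_sketch(max_thread_count=ThreadLimiter.DEFAULT_LIMIT,
                       thread_limiter=None):
    # Reuse a caller-supplied limiter; otherwise build one per transform.
    # max_thread_count=None means no limiting at all.
    if thread_limiter is None and max_thread_count is not None:
        thread_limiter = ThreadLimiter(max_thread_count=max_thread_count)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if thread_limiter is None:
                return func(*args, **kwargs)
            # Element processing starts only once a thread "slot" is free.
            with thread_limiter:
                return func(*args, **kwargs)
        return wrapper

    return decorator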

@limit_threads Decorator

No longer implementing

This feature has been dropped – see Discussion.

A separate, new decorator would be added to klio.transforms.decorators in case users do not want to “klio-ify” a particular function or method. This is in line with our offering of separate decorators for different functionalities (@timeout, @retry, @profile).

from klio.transforms import decorators

# Manage threads for a transform that does not use `@handle_klio`.
# No argument in the decorator will default to Klio's default (CPU count)
@decorators.limit_threads
def my_other_non_klio_transform(item):
    ...

# Provide an integer to override Klio's default
@decorators.limit_threads(2)
def my_other_non_klio_transform(item):
    ...

# Explicitly use with keyword argument
@decorators.limit_threads(max_thread_count=1)
def my_other_non_klio_transform(item):
    ...

# Works the same when applied to DoFn transforms
class MyTransform(beam.DoFn):
    @decorators.limit_threads(1)
    def some_func(self, item):
        ...

To use one threadpool limit across multiple transforms, create a ThreadLimiter instance to pass into the @limit_threads decorator:

# Provide an instance of ThreadLimiter to the decorators
# to use one pool of threads across multiple transforms
from klio import utils
global_thread_limiter = utils.ThreadLimiter(max_thread_count=2)

@decorators.handle_klio(thread_limiter=global_thread_limiter)
def my_transform(ctx, item):
    ...

@decorators.limit_threads(thread_limiter=global_thread_limiter)
def my_other_transform(item):
    ...

Two mutually-exclusive arguments would be supported: max_thread_count – an integer representing the max number of threads to allow for the decorated function/method; and thread_limiter – an instance of klio.utils.ThreadLimiter.

If no integer is provided then it will default to the number of CPUs on the worker (unless the thread_limiter argument is given).

If the user does not want to limit the thread count for a non-klio-ified transform, then the user should remove the decorator. This is not the same behavior as @handle_klio (described above) where max_thread_count should be set to None to not limit threads.

Backward Compatibility

There are no backward incompatible changes proposed in this KEP.

However, users may discover their Klio jobs under-performing while using the default concurrency settings since threading will be limited. This can be addressed by adjusting max_thread_count, or by turning off thread limiting in the @handle_klio decorator (setting max_thread_count to None).

Any user on a Beam version older than 2.18 who makes use of pipeline_options.experiments[].worker_threads=N or pipeline_options.max_num_worker_threads=N in their klio-job.yaml will need to make sure they set the desired number of threads within the @handle_klio decorator; otherwise, they may experience the problem this KEP is meant to address.

Regardless, the implementation should include documentation for users facing this issue – perhaps as an FAQ alongside the decorator and context manager documentation.

Detailed Description

As of Apache Beam SDK v2.18, the Beam developers made changes [2] that introduced a different concurrency model. Instead of managing a restricted-size threadpool within Beam, Beam now uses an unbounded threadpool, effectively pushing the concurrency management to Python itself (e.g. if a thread is free, reuse it; otherwise, spawn a new thread). If you remember the experiment worker_threads=N, it is no longer supported.

For a streaming job, every time a Pub/Sub message comes in, Beam spawns a new thread to process it (unless there’s a thread already free). This is a problem because messages simply pile up in memory, each on its own thread, competing for (memory) resources. As well, each thread takes time to start up, adding to latency. So in general, the more threads a program has, the slower it is, even when there is not enough work for every available thread.

This particular problem isn’t necessarily an issue for “regular” textual data, where the rate of processing a message should be relatively similar to the rate of pulling it off the queue. But it becomes a problem when a transform is memory-intensive and slower than the rate of consuming from the input. This will eventually OOM-kill the workers and put the unfinished Pub/Sub items back on the queue. And – if the runner is configured to do so – new workers will spin up only to eventually be OOM-killed again.

Since understanding this problem requires deep knowledge of Klio, Apache Beam, and the designated runner, we should help abstract this away for Klio users by providing configurable thread concurrency management by default. To help educate users, the implementation should include logging to make it obvious that concurrency is being managed, as well as prose and API documentation explaining the problem, how it manifests, and what Klio does to help.

Implementation

The proposed changes have been implemented in this draft pull request. This PR defines a new, public context manager in a new klio.utils module. It also updates the @handle_klio decorator with two new keyword arguments (max_thread_count and thread_limiter), and uses the context manager to execute the decorated function.

The context manager uses a bounded semaphore to manage the number of threads used. Since Beam will create threads for new elements as it reads them in [3], the use of a bounded semaphore limits the number of threads concurrently executing the designated code. We use a bounded semaphore instead of a lock because a lock allows only one holder at a time and is meant to guard a single resource, like a file, whereas a semaphore admits a configurable number of threads. A semaphore is also used instead of a threadpool because we don’t need to create new threads, just manage the ones Beam already creates.
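A minimal sketch of the semaphore-based approach, assuming a CPU-count default (the class and attribute names here are illustrative, not Klio's actual implementation):

import os
import threading


class ThreadLimiterSketch:
    """Admit at most ``max_thread_count`` threads into the guarded block."""

    def __init__(self, max_thread_count=None):
        # In this sketch, no value falls back to the worker's CPU count.
        if max_thread_count is None:
            max_thread_count = os.cpu_count() or 1
        self._semaphore = threading.BoundedSemaphore(max_thread_count)

    def __enter__(self):
        # Blocks when the allotted "pseudo-pool" is exhausted, creating
        # the backpressure described in footnote [1].
        self._semaphore.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release, even if the guarded code raised.
        self._semaphore.release()
        return False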

Alternatives

We could continue with the current situation and not manage the threads at all.

We could also provide only the context manager with no decorator support, forcing users to opt in and therefore understand what’s going on under the hood. Similarly, we could default to “off” (no thread concurrency management), again forcing users to opt in.

We could default the number of threads to 12, the original default count before Beam removed support for limiting worker threads [2]. However, 12 was considered too high for our current users, who would often set it to 1 or 2.

Discussion

  * There are concerns that having decorators for individual purposes (thread management, retries, timeouts, etc.) will get very confusing when the order of the decorators matters. While outside the scope of this KEP, there is interest in moving these extra individual decorators into the @handle_klio decorator; this should be a separate KEP.

  * Ideally, Klio should prevent users from setting thread limiting in both the @handle_klio decorator and the individual @limit_threads decorator. Since it’s extremely difficult (if not impossible) to detect from within one decorator whether other decorators are applied, it was suggested to add a static check to klio audit. However, given the concern in the first bullet, the proposal has been updated to not implement the @limit_threads decorator right now.

  * A suggestion was made to improve the user interface for turning off threading versus using the default, since the distinction is not clear. Setting max_thread_count to None instead of False makes more sense when turning off thread limiting. A way to explicitly set max_thread_count to its default will also be included, perhaps something like ThreadLimiter.DEFAULT_LIMIT.

  * There are concerns about exposing too many configuration-like values to the user, particularly alongside the ability to also configure the machine type. While this is a valid concern, we need to add these “knobs” because they were once available in the underlying infrastructure but no longer are. Without the ability to configure exactly how many threads to use for a transform/pipeline, users will no longer be able to use Klio.

References and Footnotes

[1]

The blocking point in the proposed implementation is within the decorators before the decorated function/method is called, or at the point where the context manager starts. That is to say, the backpressure is within the scope of the transform being limited, not affecting other transforms.

[2]

Patch where thread concurrency management is removed for Apache Beam v2.18.

[3]

Technically, Python will first try to reuse existing idle threads in the unbounded threadpool created by Beam. But because a Klio job will take longer to process an element than read an element, it’s more likely that a new thread will be created for a given new element as existing threads will be in use.