Decorators

@klio.transforms.decorators.handle_klio(max_thread_count=None, thread_limiter=None)

Serialize & deserialize incoming PCollections as a KlioMessage.

Behind the scenes, this generates a KlioContext, de/serializes the incoming PCollection as a KlioMessage, and manages thread concurrency.

Default Thread Concurrency Management

By default, the @handle_klio decorator limits the number of active threads a decorated transform can use. The default maximum number of active threads is the number of CPUs on the worker machine.

See examples below on how to adjust this behavior.

Learn more about Klio’s thread concurrency management in the User Guide.
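The default limit described above behaves like a counting semaphore sized to the CPU count. The sketch below illustrates that idea with a hypothetical SimpleThreadLimiter built on threading.BoundedSemaphore. It is not Klio's ThreadLimiter, and its constructor only mirrors the kinds of max_thread_count values used in the examples below (an int, a callable, or the CPU-count default). Two functions share one limiter, so together they never run more than two calls at once.

```python
import functools
import os
import threading
import time


class SimpleThreadLimiter:
    """Hypothetical limiter (not Klio's ThreadLimiter): caps concurrently running calls."""

    def __init__(self, max_thread_count=None):
        # Default to one slot per CPU, mirroring the documented default.
        if max_thread_count is None:
            max_thread_count = os.cpu_count() or 1
        # Accept a callable, e.g. `lambda: 2 * os.cpu_count()`.
        if callable(max_thread_count):
            max_thread_count = max_thread_count()
        self.max_thread_count = max_thread_count
        self._slots = threading.BoundedSemaphore(max_thread_count)

    def limit(self, func):
        """Decorator: wait for a free slot, run `func`, then release the slot."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self._slots:
                return func(*args, **kwargs)
        return wrapper


class PeakTracker:
    """Records the highest number of calls that were running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def work(self, seconds=0.05):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(seconds)
        with self._lock:
            self.active -= 1


# One limiter shared by two functions: together they never exceed 2 active calls.
shared_limiter = SimpleThreadLimiter(max_thread_count=2)
tracker = PeakTracker()


@shared_limiter.limit
def first_map_func(item):
    tracker.work()
    return item


@shared_limiter.limit
def second_map_func(item):
    tracker.work()
    return item


threads = [
    threading.Thread(target=func, args=(i,))
    for i in range(5)
    for func in (first_map_func, second_map_func)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(tracker.peak)  # at most 2
```

Sharing one limiter object, rather than giving each function its own, is what makes the cap apply across both functions.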

If decorating a class method, the KlioContext will be attached to the class instance as self._klio.

If decorating a function, KlioContext will be provided as the first argument.

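import apache_beam as beam
from klio.transforms.decorators import handle_klio

# Function: KlioContext is passed in as the first argument
@handle_klio
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")

# DoFn method: KlioContext is available as self._klio
class MyDoFn(beam.DoFn):
    @handle_klio
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )

# Composite transform: KlioContext is available as self._klio
class MyComposite(beam.PTransform):
    @handle_klio
    def expand(self, pcoll):
        kms_config = self._klio.config.job_config.kms_config
        # MyKMSTransform stands in for your own transform
        return pcoll | MyKMSTransform(**kms_config)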

To adjust the maximum number of threads a decorated transform can use:

import multiprocessing

from klio import utils as klio_utils
from klio.transforms.decorators import handle_klio

# Set the limit to 4 threads
@handle_klio(max_thread_count=4)
def my_map_func(ctx, item):
    ...

# Set the limit to 2x CPU count
@handle_klio(max_thread_count=lambda: 2 * multiprocessing.cpu_count())
def my_map_func(ctx, item):
    ...

# Turn off any thread limits
@handle_klio(max_thread_count=klio_utils.ThreadLimit.NONE)
def my_map_func(ctx, item):
    ...

# Explicitly set the limit to Klio's default
@handle_klio(max_thread_count=klio_utils.ThreadLimit.DEFAULT)
def my_map_func(ctx, item):
    ...

# Share thread limits between multiple transforms
global_thread_limiter = klio_utils.ThreadLimiter(max_thread_count=4)

@handle_klio(thread_limiter=global_thread_limiter)
def first_map_func(ctx, item):
    ...

@handle_klio(thread_limiter=global_thread_limiter)
def second_map_func(ctx, item):
    ...
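
Parameters
  • max_thread_count (int, callable, or klio_utils.ThreadLimit) – Maximum number of threads the decorated transform can have active at once. Accepts an int, a callable that returns an int, ThreadLimit.NONE to turn off thread limits, or ThreadLimit.DEFAULT for Klio's default. Default: the number of CPUs on the worker machine.

  • thread_limiter (klio_utils.ThreadLimiter) – A ThreadLimiter instance to use for the decorated transform. Pass the same instance to several transforms to share a single thread limit between them.
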
@klio.transforms.decorators.timeout(seconds, exception=None, exception_message=None)

Run the decorated method/function with a timeout in a separate process.

If being used with @retry, order is important depending on the desired effect. Decorators listed closer to the function definition are applied first. If @timeout is listed above @retry, @retry wraps the function first and @timeout wraps the entire retry loop, so the configured timeout can cancel the function even if the retries have not yet been exhausted. In this case, be careful with the delay argument for the @retry decorator: the timeout period includes every retry's delay. Conversely, if @retry is listed above @timeout, each attempt gets its own timeout, and retries will continue until exhausted even if an attempt times out.
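To see which decorator wraps which, the sketch below stacks two deliberately simplified, hypothetical stand-ins: sketch_retry, which re-calls a function, and sketch_timeout, which only records each timed window it would open and never cancels anything. Neither is Klio's implementation. With the timeout listed above the retry, one window covers all three attempts; with the order reversed, each attempt gets its own window.

```python
import functools


def sketch_retry(tries):
    """Simplified retry: call `func` up to `tries` times until it stops raising."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == tries:
                        raise
        return wrapper
    return decorator


def sketch_timeout(windows):
    """Simplified 'timeout': only records each timed window it would open."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            windows.append(func.__name__)
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Timeout listed above retry: one window covers all three attempts.
windows_a, attempts_a = [], []

@sketch_timeout(windows_a)
@sketch_retry(tries=3)
def job_a():
    attempts_a.append("attempt")
    if len(attempts_a) < 3:
        raise ValueError("transient failure")
    return "ok"


# Retry listed above timeout: every attempt gets its own window.
windows_b, attempts_b = [], []

@sketch_retry(tries=3)
@sketch_timeout(windows_b)
def job_b():
    attempts_b.append("attempt")
    if len(attempts_b) < 3:
        raise ValueError("transient failure")
    return "ok"


result_a, result_b = job_a(), job_b()
print(len(windows_a), len(windows_b))  # 1 3
```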

If being used with another Klio decorator like @handle_klio, list @timeout below the other Klio decorator, so that it sits closer to the method/function definition.

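import apache_beam as beam
from klio.transforms.decorators import handle_klio, timeout


class MyTimeoutException(Exception):
    """Custom exception raised when a timeout occurs."""


@handle_klio
@timeout(seconds=5)
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")

class MyDoFn(beam.DoFn):
    @handle_klio
    @timeout(seconds=5, exception=MyTimeoutException)
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )

# @timeout also works on functions that use no other Klio decorators
@timeout(
    seconds=5,
    exception=MyTimeoutException,
    exception_message="I got a timeout!"
)
def my_nonklio_map_func(item):
    print(f"Received {item}!")
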
Parameters
  • seconds (float) – The timeout period in seconds. Must be greater than 0.

  • exception (Exception) – The Exception that will be raised if a timeout occurs. Default: KlioTimeoutError.

  • exception_message (str) – Custom exception message. Default: Function '{function}' timed out after {seconds} seconds.
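One way to run a function in a separate process with a deadline is sketched below, assuming a POSIX system (it uses the "fork" start method). toy_timeout and ToyTimeoutError are hypothetical names, not Klio's API; the sketch only mirrors the documented parameters: seconds must be greater than 0, exception defaults to a timeout error class, and the default message follows the format listed above.

```python
import functools
import multiprocessing
import time


class ToyTimeoutError(Exception):
    """Hypothetical stand-in for KlioTimeoutError."""


def toy_timeout(seconds, exception=ToyTimeoutError, exception_message=None):
    """Run the decorated function in a child process and stop it after `seconds`."""
    if seconds <= 0:
        raise ValueError("seconds must be greater than 0")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The "fork" start method avoids pickling `func`; it is POSIX-only.
            ctx = multiprocessing.get_context("fork")
            results = ctx.Queue()

            def target():
                try:
                    results.put(("ok", func(*args, **kwargs)))
                except Exception as exc:  # hand the error back to the parent
                    results.put(("error", exc))

            process = ctx.Process(target=target)
            process.start()
            process.join(seconds)
            if process.is_alive():
                # Deadline passed: kill the child and raise the configured error.
                process.terminate()
                process.join()
                message = exception_message or (
                    f"Function '{func.__name__}' timed out after {seconds} seconds."
                )
                raise exception(message)
            status, value = results.get(timeout=1)
            if status == "error":
                raise value
            return value
        return wrapper
    return decorator


@toy_timeout(seconds=2)
def fast_double(x):
    return x * 2


@toy_timeout(seconds=0.5)
def slow_task():
    time.sleep(5)
    return "never returned"
```

Running the work in a child process is what makes cancellation possible here: a stuck thread cannot be killed from outside, but a process can be terminated.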

@klio.transforms.decorators.retry(tries=-1, delay=0, exception=None, raise_exception=None, exception_message=None)

Retry a decorated method/function on failure.

If being used with @timeout, order is important depending on the desired effect. Decorators listed closer to the function definition are applied first. If @timeout is listed above @retry, @retry wraps the function first and @timeout wraps the entire retry loop, so the configured timeout can cancel the function even if the retries have not yet been exhausted. In this case, be careful with the delay argument for the @retry decorator: the timeout period includes every retry's delay. Conversely, if @retry is listed above @timeout, each attempt gets its own timeout, and retries will continue until exhausted even if an attempt times out.

If being used with a Klio decorator like @handle_klio, list @retry below the other Klio decorator, so that it sits closer to the method/function definition.

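import apache_beam as beam
from klio.transforms.decorators import handle_klio, retry


class MyExceptionToCatch(Exception):
    """Exception that should trigger a retry."""


class MyExceptionToRaise(Exception):
    """Exception raised once all retries are exhausted."""


@handle_klio
@retry()
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")

class MyDoFn(beam.DoFn):
    @handle_klio
    @retry(tries=3, exception=MyExceptionToCatch)
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )

# @retry also works on functions that use no other Klio decorators
@retry(
    tries=3,
    exception=MyExceptionToCatch,
    raise_exception=MyExceptionToRaise,
    exception_message="All retries have been exhausted!"
)
def my_nonklio_map_func(item):
    print(f"Received {item}!")
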
Parameters
  • tries (int) – Maximum number of attempts. Default: -1 (infinite)

  • delay (int or float) – Delay between attempts in seconds. Default: 0

  • exception (Exception or tuple(Exception)) – An Exception or tuple of exceptions to catch and retry on. Defaults to Exception.

  • raise_exception (Exception) – The Exception to raise once configured retries are exhausted. Defaults to KlioRetriesExhausted.

  • exception_message (str) – Custom message for raise_exception. Default: Function '{}' has reached the maximum {} retries. Last exception: {}
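The retry parameters above can be read as a simple loop. The sketch below is a hypothetical toy_retry, not Klio's @retry: tries=-1 loops forever, delay sleeps between attempts, exception may be a tuple of classes to retry on, and raise_exception replaces the last error once the attempts run out. Exceptions outside exception propagate immediately without a retry.

```python
import functools
import time


class ToyRetriesExhausted(Exception):
    """Hypothetical stand-in for KlioRetriesExhausted."""


def toy_retry(tries=-1, delay=0, exception=Exception,
              raise_exception=ToyRetriesExhausted, exception_message=None):
    """Simplified retry sketch mirroring the documented parameters."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                # `exception` may be one class or a tuple of classes;
                # anything else propagates immediately.
                except exception as exc:
                    if tries != -1 and attempt >= tries:  # -1 means retry forever
                        message = exception_message or (
                            f"Function '{func.__name__}' has reached the maximum "
                            f"{tries} retries. Last exception: {exc!r}"
                        )
                        raise raise_exception(message) from exc
                    time.sleep(delay)
        return wrapper
    return decorator


calls = []

@toy_retry(tries=3, delay=0.01, exception=(ValueError, KeyError))
def flaky(item):
    calls.append(item)
    if len(calls) < 3:
        raise ValueError("transient failure")
    return f"processed {item}"


result = flaky("abc")  # succeeds on the third attempt
```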

@klio.transforms.decorators.set_klio_context

Set KlioContext on the class instance as self._klio.

Use @handle_klio instead if KlioMessage de/serialization is also needed.

Use @inject_klio_context if using on a function rather than a class method.

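import apache_beam as beam
from klio.transforms.decorators import set_klio_context
class MyComposite(beam.PTransform):
    @set_klio_context
    def expand(self, pcoll):
        self._klio.logger.info(f"Received {pcoll}")
        return pcoll | beam.Map(lambda element: element)
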
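A minimal sketch of what setting a context on the instance can look like, assuming the context is created lazily on the first call and reused afterwards. ToyContext and toy_set_context are hypothetical; only the self._klio attribute name comes from the examples in this section.

```python
import functools


class ToyContext:
    """Hypothetical stand-in for KlioContext: just collects log messages."""

    def __init__(self):
        self.messages = []

    def log(self, message):
        self.messages.append(message)


def toy_set_context(method):
    """Ensure `self._klio` holds a context before the decorated method runs."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Create the context on first use, then reuse it on later calls.
        if getattr(self, "_klio", None) is None:
            self._klio = ToyContext()
        return method(self, *args, **kwargs)
    return wrapper


class MyComposite:
    @toy_set_context
    def expand(self, element):
        self._klio.log(f"Received {element}")
        return element


composite = MyComposite()
composite.expand("a")
composite.expand("b")
print(composite._klio.messages)  # ['Received a', 'Received b']
```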
@klio.transforms.decorators.inject_klio_context

Provide KlioContext as the first argument to a decorated function, or as the first argument after self for a decorated method.

Use @handle_klio instead if KlioMessage de/serialization is also needed.

If not needing KlioMessage de/serialization, consider @set_klio_context if using on a class method (rather than a function) to set the _klio attribute on self.

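import apache_beam as beam
from klio.transforms.decorators import inject_klio_context

@inject_klio_context
def my_map_func(ctx, element):
    ctx.logger.info(f"Received {element}")

# On a method, KlioContext is the first argument after self
class MyDoFn(beam.DoFn):
    @inject_klio_context
    def process(self, ctx, element):
        ctx.logger.info(f"Received {element}")
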
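The placement of the context argument can be sketched as follows. toy_inject_context, ToyContext, and SHARED_CONTEXT are hypothetical, and detecting methods by a first parameter named self is an assumption of this sketch rather than necessarily how Klio does it.

```python
import functools
import inspect


class ToyContext:
    """Hypothetical stand-in for KlioContext: just collects log messages."""

    def __init__(self):
        self.messages = []

    def log(self, message):
        self.messages.append(message)


SHARED_CONTEXT = ToyContext()


def toy_inject_context(func):
    """Pass a context as the first argument, or right after `self` for methods."""
    # Sketch assumption: a method is recognized by a first parameter named `self`.
    params = list(inspect.signature(func).parameters)
    takes_self = bool(params) and params[0] == "self"

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if takes_self:
            instance, *rest = args
            return func(instance, SHARED_CONTEXT, *rest, **kwargs)
        return func(SHARED_CONTEXT, *args, **kwargs)
    return wrapper


@toy_inject_context
def my_map_func(ctx, element):
    ctx.log(f"Received {element}")
    return element


class MyDoFn:
    @toy_inject_context
    def process(self, ctx, element):
        ctx.log(f"Processed {element}")
        return element
```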
@klio.transforms.decorators.serialize_klio_message

Serialize/deserialize incoming PCollections as a KlioMessage.

This decorator needs access to a KlioContext object. If the context is not already available on the object (i.e. self of a DoFn instance), provide it with @inject_klio_context or @set_klio_context. Alternatively, use @handle_klio, which handles both the KlioContext and KlioMessage de/serialization.
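Why the context matters can be sketched with a toy decorator that decodes the incoming payload before calling the function, re-encodes the result afterwards, and uses the context to report payloads it cannot parse. The sketch assumes JSON bytes in place of a real KlioMessage and passes the context explicitly as the first argument instead of relying on @inject_klio_context; toy_serialize_message and ToyContext are hypothetical.

```python
import functools
import json


class ToyContext:
    """Hypothetical stand-in for KlioContext: just collects log messages."""

    def __init__(self):
        self.messages = []

    def log(self, message):
        self.messages.append(message)


def toy_serialize_message(func):
    """Decode the payload, call `func(ctx, message)`, and re-encode its result."""
    @functools.wraps(func)
    def wrapper(ctx, raw, *args, **kwargs):
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            # Without a context there would be nowhere to report this.
            ctx.log(f"Dropping unparseable payload: {raw!r}")
            return None
        result = func(ctx, message, *args, **kwargs)
        return json.dumps(result).encode("utf-8")
    return wrapper


@toy_serialize_message
def my_map_func(ctx, message):
    ctx.log(f"Received {message['element']}")
    message["seen"] = True
    return message


ctx = ToyContext()
encoded = my_map_func(ctx, b'{"element": "abc"}')
dropped = my_map_func(ctx, b"not json")
```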

@klio.transforms.decorators.profile

Decorator to mark a function/method for profiling. This is used in conjunction with the klio job profile commands to selectively profile parts of your pipeline. This decorator can be added to any function or method, but when used with other Klio decorators such as @handle_klio, it must be listed last, directly above the function or method definition.

When running/testing a job normally and not profiling, this decorator has no effect.

import apache_beam as beam
from klio.transforms.decorators import handle_klio, profile

@handle_klio
@profile
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")

class MyDoFn(beam.DoFn):
    @handle_klio
    @profile
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )

# @profile also works on functions that use no other Klio decorators
@profile
def my_nonklio_map_func(item):
    print(f"Received {item}!")
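The "no effect unless profiling" behavior can be sketched with a decorator that returns the original function untouched when profiling is off and wraps it with the standard-library cProfile when it is on. make_profile_decorator and its enabled flag are hypothetical; how Klio actually switches profiling on through klio job profile is not shown here.

```python
import cProfile
import functools
import io
import pstats


def make_profile_decorator(enabled, reports):
    """Build a hypothetical @profile: a no-op unless profiling is enabled."""
    def profile(func):
        if not enabled:
            return func  # normal runs: hand back the original function untouched

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            try:
                return profiler.runcall(func, *args, **kwargs)
            finally:
                # Keep a short text report of the five most expensive calls.
                stream = io.StringIO()
                stats = pstats.Stats(profiler, stream=stream)
                stats.sort_stats("cumulative").print_stats(5)
                reports.append(stream.getvalue())
        return wrapper
    return profile


def square(x):
    return x * x


reports = []
normal_profile = make_profile_decorator(enabled=False, reports=reports)
profiling_profile = make_profile_decorator(enabled=True, reports=reports)

plain_square = normal_profile(square)
profiled_square = profiling_profile(square)
result = profiled_square(7)
```

Returning the undecorated function when profiling is off means normal runs pay no wrapper overhead at all.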