@klio.transforms.decorators.handle_klio
Serialize & deserialize incoming PCollections as a KlioMessage.
Behind the scenes, this generates a KlioContext, de/serializes the incoming PCollection as a KlioMessage, and manages thread concurrency.
Default Thread Concurrency Management
By default, the @handle_klio decorator limits the number of active threads a decorated transform can use. The default maximum number of active threads is the number of CPUs on the worker machine.
See examples below on how to adjust this behavior.
Learn more about Klio’s thread concurrency management in the User Guide.
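Conceptually, capping active threads works like a bounded semaphore: each call takes a slot before running and gives it back afterwards. The sketch below uses only the standard library and is not Klio's implementation. `SketchThreadLimiter` and `limit_threads` are hypothetical names. Passing one limiter instance to several functions makes them share a single budget, which is the idea behind the `thread_limiter` argument.

```python
import functools
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SketchThreadLimiter:
    """Hypothetical limiter: caps how many wrapped calls run at once."""

    def __init__(self, max_thread_count=None):
        # Mirror the documented default: fall back to the machine's CPU count.
        self.max_thread_count = max_thread_count or os.cpu_count() or 1
        self._slots = threading.BoundedSemaphore(self.max_thread_count)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0  # highest number of calls seen running concurrently

    def __enter__(self):
        self._slots.acquire()  # blocks while every slot is taken
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc_info):
        with self._lock:
            self.active -= 1
        self._slots.release()
        return False  # never swallow exceptions


def limit_threads(limiter):
    """Hypothetical decorator: run every call of the function inside `limiter`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with limiter:
                return func(*args, **kwargs)
        return wrapper
    return decorator


# Two functions sharing one limiter share one budget of 2 concurrent calls.
shared = SketchThreadLimiter(max_thread_count=2)


@limit_threads(shared)
def first(x):
    time.sleep(0.05)
    return x


@limit_threads(shared)
def second(x):
    time.sleep(0.05)
    return -x


with ThreadPoolExecutor(max_workers=8) as pool:
    futures = [pool.submit(first, i) for i in range(4)]
    futures += [pool.submit(second, i) for i in range(4)]
    results = [f.result() for f in futures]
```

Even though the pool offers 8 worker threads, at most 2 calls across both functions run at the same time.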
If decorating a class method, the KlioContext will be attached to the class instance as self._klio.
If decorating a function, the KlioContext will be provided as the first argument.
```python
import apache_beam as beam
from klio.transforms.decorators import handle_klio


@handle_klio
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")


class MyDoFn(beam.DoFn):
    @handle_klio
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )


class MyComposite(beam.PTransform):
    @handle_klio
    def expand(self, pcoll):
        kms_config = self._klio.config.job_config.kms_config
        # MyKMSTransform is a placeholder for your own transform
        return pcoll | MyKMSTransform(**kms_config)
```
To adjust the maximum threads a decorated transform uses:
```python
import multiprocessing

from klio import utils as klio_utils
from klio.transforms.decorators import handle_klio


# Set the limit to 4 threads
@handle_klio(max_thread_count=4)
def my_map_func(ctx, item):
    ...


# Set the limit to 2x CPU count
@handle_klio(max_thread_count=lambda: 2 * multiprocessing.cpu_count())
def my_map_func(ctx, item):
    ...


# Turn off any thread limits
@handle_klio(max_thread_count=klio_utils.ThreadLimit.NONE)
def my_map_func(ctx, item):
    ...


# Explicitly set the limit to Klio's default
@handle_klio(max_thread_count=klio_utils.ThreadLimit.DEFAULT)
def my_map_func(ctx, item):
    ...


# Share thread limits between multiple transforms
global_thread_limiter = klio_utils.ThreadLimiter(max_thread_count=4)


@handle_klio(thread_limiter=global_thread_limiter)
def first_map_func(ctx, item):
    ...


@handle_klio(thread_limiter=global_thread_limiter)
def second_map_func(ctx, item):
    ...
```
max_thread_count (int, callable, klio.utils.ThreadLimit) – number of threads to make available to the decorated function, or a callable() that returns an int. Set to klio.utils.ThreadLimit.NONE for no thread limits. Defaults to klio.utils.ThreadLimit.DEFAULT (worker CPU count) if thread_limiter is not provided. Mutually exclusive with thread_limiter argument.
thread_limiter (klio.utils.ThreadLimiter) – the ThreadLimiter instance that the decorator should use instead of creating its own. Defaults to None. Mutually exclusive with max_thread_count.
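The accepted forms of max_thread_count can be summarized as a small resolution function. This is a hypothetical sketch, not Klio's code. `SketchThreadLimit` stands in for klio.utils.ThreadLimit and `resolve_max_threads` is an invented name. It does not model the mutual exclusion with thread_limiter.

```python
import enum
import os


class SketchThreadLimit(enum.Enum):
    """Hypothetical stand-in for klio.utils.ThreadLimit."""
    NONE = "none"
    DEFAULT = "default"


def resolve_max_threads(max_thread_count=SketchThreadLimit.DEFAULT):
    """Turn a max_thread_count value into an int, or None for "no limit"."""
    if max_thread_count is SketchThreadLimit.NONE:
        return None  # no thread limit at all
    if max_thread_count is SketchThreadLimit.DEFAULT:
        return os.cpu_count() or 1  # worker CPU count
    if callable(max_thread_count):
        # Evaluated lazily, e.g. lambda: 2 * multiprocessing.cpu_count()
        max_thread_count = max_thread_count()
    if not isinstance(max_thread_count, int) or max_thread_count < 1:
        raise ValueError(f"Invalid max_thread_count: {max_thread_count!r}")
    return max_thread_count
```

Accepting a callable lets the limit be computed when the decorator needs it instead of at import time.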
timeout
Run the decorated method/function with a timeout in a separate process.
If being used with @retry, order is important depending on the desired effect. If @timeout is listed above @retry, the timeout wraps the entire retry loop, so the configured timeout can cancel the function even if the retries have not yet been exhausted. In this case, be careful with the delay argument of the @retry decorator: the timeout period includes any delay between retries. Conversely, if @retry is listed above @timeout, each attempt runs under its own timeout, and retries continue until exhausted even if an attempt times out.
If being used with another Klio decorator like @handle_klio, list the @timeout decorator after (below) the other Klio decorator, as in the examples below.
```python
import apache_beam as beam
from klio.transforms.decorators import handle_klio, timeout


class MyTimeoutException(Exception):
    """Example custom exception raised on timeout."""


@handle_klio
@timeout(seconds=5)
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")


class MyDoFn(beam.DoFn):
    @handle_klio
    @timeout(seconds=5, exception=MyTimeoutException)
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )


@timeout(
    seconds=5,
    exception=MyTimeoutException,
    exception_message="I got a timeout!",
)
def my_nonklio_map_func(item):
    print(f"Received {item}!")
```
seconds (float) – The timeout period in seconds. Must be greater than 0.
exception (Exception) – The Exception that will be raised if a timeout occurs. Default: KlioTimeoutError.
exception_message (str) – Custom exception message. Default: Function '{function}' timed out after {seconds} seconds.
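The placement rules for @timeout and @retry follow from how Python stacks decorators. Decorators are applied from the bottom up, so the one written on top becomes the outermost wrapper and sees every call made by the ones beneath it. This plain-Python sketch makes the nesting visible. It uses a hypothetical `tag` decorator rather than any Klio API.

```python
import functools


def tag(name, log):
    """Hypothetical decorator that logs when its wrapper is entered and exited."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            log.append(f"enter {name}")
            try:
                return func(*args, **kwargs)
            finally:
                log.append(f"exit {name}")
        return wrapper
    return decorator


calls = []


@tag("top", calls)     # written first: becomes the outermost wrapper
@tag("bottom", calls)  # written last: applied first, wraps work() directly
def work():
    calls.append("work")


work()
# Equivalent to tag("top", calls)(tag("bottom", calls)(work))
print(calls)  # ['enter top', 'enter bottom', 'work', 'exit bottom', 'exit top']
```

Substituting @timeout for "top" and @retry for "bottom" gives a timeout that spans every retry. Swapping them gives one timeout per attempt.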
retry
Retry a decorated method/function on failure.
If being used with @timeout, order is important depending on the desired effect. If @timeout is listed above @retry, the timeout wraps the entire retry loop, so the configured timeout can cancel the function even if the retries have not yet been exhausted. In this case, be careful with the delay argument of the @retry decorator: the timeout period includes any delay between retries. Conversely, if @retry is listed above @timeout, each attempt runs under its own timeout, and retries continue until exhausted even if an attempt times out.
If being used with a Klio decorator like @handle_klio, list the @retry decorator after (below) the other Klio decorator, as in the examples below.
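```python
import apache_beam as beam
from klio.transforms.decorators import handle_klio, retry


class MyExceptionToCatch(Exception):
    """Example exception that should trigger a retry."""


class MyExceptionToRaise(Exception):
    """Example exception raised once retries are exhausted."""


@handle_klio
@retry()
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")


class MyDoFn(beam.DoFn):
    @handle_klio
    @retry(tries=3, exception=MyExceptionToCatch)
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )


@retry(
    tries=3,
    exception=MyExceptionToCatch,
    raise_exception=MyExceptionToRaise,
    exception_message="All retries have been exhausted!",
)
def my_nonklio_map_func(item):
    print(f"Received {item}!")
```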
tries (int) – Maximum number of attempts. Default: -1 (infinite)
delay (int or float) – Delay between attempts in seconds. Default: 0
exception (Exception or tuple(Exception)) – An Exception or tuple of exceptions to catch and retry on. Defaults to Exception.
raise_exception (Exception) – The Exception to raise once configured retries are exhausted. Defaults to KlioRetriesExhausted.
exception_message (str) – Custom message for raise_exception. Default: Function '{}' has reached the maximum {} retries. Last exception: {}
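The retry parameters describe a fairly standard retry loop. The following is a simplified, synchronous sketch of those semantics. It is not Klio's implementation. `sketch_retry` and `SketchRetriesExhausted` are hypothetical stand-ins for @retry and KlioRetriesExhausted. The message template is the documented default.

```python
import functools
import time


class SketchRetriesExhausted(Exception):
    """Hypothetical stand-in for KlioRetriesExhausted."""


DEFAULT_MESSAGE = "Function '{}' has reached the maximum {} retries. Last exception: {}"


def sketch_retry(tries=-1, delay=0, exception=Exception,
                 raise_exception=SketchRetriesExhausted,
                 exception_message=DEFAULT_MESSAGE):
    """Simplified retry decorator modeled on the documented parameters."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except exception as exc:  # a class or a tuple of classes
                    # A negative `tries` means "retry forever"; other
                    # exception types propagate immediately.
                    if 0 <= tries <= attempt:
                        raise raise_exception(
                            exception_message.format(func.__name__, tries, exc)
                        ) from exc
                    time.sleep(delay)
        return wrapper
    return decorator


attempts = []


@sketch_retry(tries=3, exception=ValueError)
def flaky():
    attempts.append("try")
    if len(attempts) < 3:
        raise ValueError("not yet")
    return "ok"


result = flaky()  # succeeds on the third attempt


@sketch_retry(tries=2, exception=(KeyError, ValueError))
def always_fails():
    raise ValueError("boom")


try:
    always_fails()
except SketchRetriesExhausted as err:
    message = str(err)
```

Chaining with `from exc` keeps the last underlying exception visible in the traceback.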
set_klio_context
Attach a KlioContext to the class instance (as self._klio).
Use @handle_klio instead if KlioMessage de/serialization is also needed.
Use @inject_klio_context if using on a function rather than a class method.
```python
import apache_beam as beam
from klio.transforms.decorators import set_klio_context


class MyComposite(beam.PTransform):
    @set_klio_context
    def expand(self, element):
        self._klio.logger.info(f"Received {element}")
```
inject_klio_context
Provide a KlioContext as the first argument to a decorated function, or as the first argument after self for a decorated method.
If KlioMessage de/serialization is not needed and you are decorating a class method rather than a function, consider @set_klio_context, which sets the _klio attribute on self.
```python
import apache_beam as beam
from klio.transforms.decorators import inject_klio_context


@inject_klio_context
def my_map_func(ctx, element):
    ctx.logger.info(f"Received {element}")


class MyDoFn(beam.DoFn):
    @inject_klio_context
    def process(self, ctx, element):
        ctx.logger.info(f"Received {element}")
```
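The two context decorators can be contrasted with a plain-Python sketch. It is not Klio's implementation. `SketchContext`, `inject_context` and `attach_context` are hypothetical analogues of KlioContext, @inject_klio_context and @set_klio_context. `inject_context` only handles plain functions. The real decorator also supports methods, where the context comes after self. Creating one context per decorated function, as done here, is an assumption made for illustration.

```python
import functools


class SketchContext:
    """Hypothetical stand-in for KlioContext; just collects log lines."""

    def __init__(self):
        self.messages = []

    def log(self, msg):
        self.messages.append(msg)


def inject_context(func):
    """Like @inject_klio_context, for plain functions: pass a context first."""
    ctx = SketchContext()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(ctx, *args, **kwargs)

    wrapper.ctx = ctx  # exposed only so this example can inspect it
    return wrapper


def attach_context(method):
    """Like @set_klio_context: make a context available as self._klio."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if not hasattr(self, "_klio"):
            self._klio = SketchContext()
        return method(self, *args, **kwargs)

    return wrapper


@inject_context
def my_map_func(ctx, element):
    ctx.log(f"Received {element}")
    return element


class MyDoFn:  # a plain class here; in a pipeline this would be a beam.DoFn
    @attach_context
    def process(self, element):
        self._klio.log(f"Received {element}")
        return element


my_map_func("a")
dofn = MyDoFn()
dofn.process("b")
```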
serialize_klio_message
Serialize/deserialize incoming PCollections as a KlioMessage.
This decorator needs access to a KlioContext object. If one is not already available on the object (i.e. on self of a DoFn instance), provide it with @inject_klio_context or @set_klio_context. Alternatively, use @handle_klio, which handles both KlioContext and KlioMessage de/serialization.
profile
Decorator to mark a function/method for profiling. This is used in conjunction with the klio job profile commands to selectively profile parts of your pipeline. This decorator can be added to any function or method. When it is used with other Klio decorators such as @handle_klio, it must be listed last, directly above the function definition.
When running/testing a job normally and not profiling, this decorator has no effect.
```python
import apache_beam as beam
from klio.transforms.decorators import handle_klio, profile


@handle_klio
@profile
def my_map_func(ctx, item):
    ctx.logger.info(f"Received {item.element} with {item.payload}")


class MyDoFn(beam.DoFn):
    @handle_klio
    @profile
    def process(self, item):
        self._klio.logger.info(
            f"Received {item.element} with {item.payload}"
        )


@profile
def my_nonklio_map_func(item):
    print(f"Received {item}!")
```
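A decorator that only marks a function can be as simple as recording the function and returning it unchanged. The sketch below is hypothetical. `PROFILE_REGISTRY` and `profile_marker` are invented names, and the sketch does not describe how the klio job profile commands actually find decorated functions. It only illustrates why such a marker has no effect during normal runs.

```python
PROFILE_REGISTRY = []  # functions a profiling run could later look up


def profile_marker(func):
    """Record `func` as a profiling target and return it unchanged."""
    PROFILE_REGISTRY.append(func)
    return func  # no wrapper, so normal calls behave exactly as before


@profile_marker
def my_nonklio_map_func(item):
    return f"Received {item}!"
```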