Helpers

class klio.transforms.helpers.KlioMessageCounter

Helper transform to count elements.

This transform will yield elements given to it, counting each one it sees.

Since this transform is a DoFn, it must be invoked via beam.ParDo. Some usage examples:

# Example composite transform
class MyCompositeTransform(beam.PTransform):
    def __init__(self, *args, **kwargs):
        self.transform = MyTransform(*args, **kwargs)
        self.ctr = KlioMessageCounter(
            suffix="processed", bind_transform="MyTransform"
        )

    def expand(self, pcoll):
        return (
            pcoll
            | "Process items" >> self.transform
            | "Count items processed" >> beam.ParDo(self.ctr)
        )

# Example pipeline segment
def run(input_pcol, config):
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()
    data_not_found = input_data.not_found | beam.ParDo(
        helpers.KlioMessageCounter(
            suffix="data-not-found",
            bind_transform="KlioGcsCheckInputExists",
        )
    )
    ...
Parameters
  • suffix (str) – suffix of the counter name. The full counter name will be kmsg-{suffix}.

  • bind_transform (str) – Name of transform to bind the counter to. This is used for Dataflow monitoring UI purposes, and can be set to a prior or following transform in a pipeline, or to itself.
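
The counter-name convention above can be illustrated with a minimal sketch (the helper function below is hypothetical, not part of klio):

```python
# Hypothetical helper illustrating the naming convention described above:
# the full counter name is "kmsg-" followed by the given suffix.
def full_counter_name(suffix):
    return "kmsg-{}".format(suffix)

full_counter_name("data-not-found")  # "kmsg-data-not-found"
```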

class klio.transforms.helpers.KlioGcsCheckInputExists

Klio transform to check input exists in GCS.

class klio.transforms.helpers.KlioGcsCheckOutputExists

Klio transform to check output exists in GCS.

class klio.transforms.helpers.KlioFilterPing

Klio transform to tag outputs depending on whether the job is in ping mode.

class klio.transforms.helpers.KlioFilterForce

Klio transform to tag outputs depending on whether the job is in force mode.

class klio.transforms.helpers.KlioWriteToEventOutput

Klio composite transform to write to the configured event output.

class klio.transforms.helpers.KlioDrop

Klio DoFn to log & drop a KlioMessage.

class klio.transforms.helpers.KlioCheckRecipients

Check if current job should handle a received v2 message.
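
The idea can be sketched in plain Python (the function and message shape below are illustrative only, not klio's actual KlioMessage schema):

```python
# Illustrative sketch only -- not klio's actual KlioMessage schema.
# A v2 message carries intended recipients in its metadata; a job
# handles it when recipients are unrestricted ("anyone") or when the
# job is explicitly listed.
def should_handle(job_name, recipients):
    if not recipients:  # empty list treated as "anyone"
        return True
    return job_name in recipients
```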

class klio.transforms.helpers.KlioUpdateAuditLog

Update a KlioMessage’s audit log to include current job.

class klio.transforms.helpers.KlioDebugMessage

Log KlioMessage.

Parameters
  • prefix (str) – logging prefix. Default: "DEBUG".

  • log_level (str or int) – The desired log level for the KlioMessage logs. See available log levels for what’s supported. Default: "INFO".
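
Since log_level accepts either a string or an int, resolving it might look like the following sketch (a hypothetical helper, assuming the standard library's logging level names; the real klio implementation may differ):

```python
import logging

# Hypothetical sketch of resolving a log_level parameter that may be
# a level name (str) or a numeric level (int).
def resolve_log_level(level):
    if isinstance(level, int):
        return level
    resolved = logging.getLevelName(level.upper())
    if not isinstance(resolved, int):
        raise ValueError("Unknown log level: {!r}".format(level))
    return resolved
```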

class klio.transforms.helpers.KlioSetTrace

Insert a Python debugger trace point.

class klio.transforms.helpers.KlioTriggerUpstream

Trigger upstream job from current job with a given KlioMessage.

This transform will update the intended recipients in the KlioMessage metadata in order to trigger a partial bottom-up execution of the overall graph of jobs. It will also generate an optional log message, then publish the KlioMessage to the upstream's Pub/Sub topic.

Caution

Klio does not automatically trigger upstream jobs if input data does not exist. This transform must be invoked manually within a job's pipeline definition (in run.py::run).

Note

To access input data that was not found, the automatic data existence check that Klio performs must be turned off by setting klio-job.yaml::job_config.data.inputs[].skip_klio_existence_check to True. The existence check must then be invoked manually. See the example run.py and klio-job.yaml files below.

Example usage:

import apache_beam as beam
from klio.transforms import helpers
import transforms

def run(input_pcol, config):
    # use the default helper transform to do the default input check
    # in order to access the output tagged with `not_found`
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()

    # Pipe the input data that was not found (using Tagged Outputs)
    # into `KlioTriggerUpstream` in order to update the KlioMessage
    # metadata, log it, then publish to the upstream's Pub/Sub topic
    _ = input_data.not_found | helpers.KlioTriggerUpstream(
        upstream_job_name="my-upstream-job",
        upstream_topic="projects/my-gcp-project/topics/upstream-topic",
        log_level="DEBUG",
    )

    # pipe the found input pcollection into transform(s) as needed
    output_pcol = input_data.found | beam.ParDo(transforms.MyTransform())
    return output_pcol
# Example klio-job.yaml
version: 2
job_name: my-job
pipeline_options:
  project: my-gcp-project
  # `KlioTriggerUpstream` only supports streaming jobs
  streaming: True
  # <-- snip -->
job_config:
  events:
    inputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/upstream-topic-output
        subscription: projects/my-gcp-project/subscriptions/my-job-in
    # <-- snip -->
  data:
    inputs:
      - type: gcs
        location: gs://my-gcp-project/upstream-output-data
        file_suffix: .ogg
        # Be sure to skip Klio's default input existence check in
        # order to access the input data that was not found.
        skip_klio_existence_check: True
Parameters
  • upstream_job_name (str) – Name of upstream job.

  • upstream_topic (str) – Pub/Sub topic for the upstream job, in the form of projects/<GCP_PROJECT>/topics/<TOPIC_NAME>.

  • log_level (str, int, or None) – The desired log level for the log message, or None if no logging is desired. See available log levels for what’s supported. Default: "INFO".

Raises

SystemExit – If the current job is not in streaming mode (set in klio-job.yaml::pipeline_options.streaming), if the provided log level is not recognized, or if the provided upstream topic is not in the correct form.
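
The topic-format check described above might look like the following sketch (the regex and function are hypothetical; the real validation inside KlioTriggerUpstream may differ):

```python
import re

# Hypothetical sketch of validating the upstream_topic format,
# projects/<GCP_PROJECT>/topics/<TOPIC_NAME>; raises SystemExit on a
# malformed topic, mirroring the documented behavior above.
_TOPIC_FORMAT = re.compile(r"^projects/[^/]+/topics/[^/]+$")

def validate_upstream_topic(topic):
    if not _TOPIC_FORMAT.match(topic):
        raise SystemExit(
            "Unrecognized upstream topic format: {!r}".format(topic)
        )
    return topic
```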