Helper transforms support data existence checks, ping mode, and forced regeneration of output. They can be imported from klio.transforms.helpers, and many are used by default in a Klio pipeline.
Built-in transforms are used by default in a Klio pipeline, but can be turned off if needed. See below for a flow diagram of how these built-in transforms are used in a pipeline.
For data IO types of gcs, Klio will perform the input and output existence checks for you. These checks are configured with the skip_klio_existence_check field on the data inputs and outputs. The field defaults to False in both places, so Klio runs the transforms KlioGcsCheckInputExists and KlioGcsCheckOutputExists automatically. If custom data existence checks are preferred, set these fields to True.
KlioGcsCheckInputExists and KlioGcsCheckOutputExists work by inspecting the data configuration for the data location field, job_config.data.(in|out)puts[].location, and the file suffix field, job_config.data.(in|out)puts[].file_suffix. For example, for an element that represents the track ID f00b4r, Klio would check for the existence of the path gs://foo-proj-input/example-streaming-parent-job-output/f00b4r.ogg.
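The path construction described above can be sketched in plain Python. This is only an illustration of how a location, an element, and a file suffix combine into one path; `build_data_path` is a hypothetical helper written for this example, not part of Klio, and Klio's own implementation may differ.

```python
def build_data_path(location: str, element: str, file_suffix: str = "") -> str:
    """Join a data location, an element ID, and a file suffix into one path.

    Hypothetical helper for illustration only; not a Klio API.
    """
    # Strip a trailing slash so the join never produces a double slash.
    base = location.rstrip("/")
    return f"{base}/{element}{file_suffix}"


path = build_data_path(
    location="gs://foo-proj-input/example-streaming-parent-job-output",
    element="f00b4r",
    file_suffix=".ogg",
)
print(path)  # gs://foo-proj-input/example-streaming-parent-job-output/f00b4r.ogg
```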
KlioGcsCheckInputExists is a Composite Transform that checks whether input data exists in GCS. The transform utilizes Tagged Outputs to label output as either not_found or found.
    import apache_beam as beam
    from klio.transforms.helpers import KlioGcsCheckInputExists


    class MyFilterInput(beam.PTransform):
        """Composite transform to filter input data"""

        def expand(self, pcoll):
            # Check if input data exists
            input_data = pcoll | "Input Exists Filter" >> KlioGcsCheckInputExists()

            # Do something with the data that does not exist
            # (MyTransform is a user-defined transform)
            _ = input_data.not_found | "Not Found Data Transform" >> MyTransform()

            # Do something with the data that does exist
            return input_data.found
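To picture what the found and not_found tags mean without running Beam, the following plain-Python sketch splits a list of paths into two labelled groups. It is a stand-in for the idea of tagged outputs only: `partition_by_existence` and the `exists` callable are hypothetical names introduced here, and the real transform operates on KlioMessages in a PCollection rather than on strings in a list.

```python
from typing import Callable, Dict, Iterable, List


def partition_by_existence(
    paths: Iterable[str], exists: Callable[[str], bool]
) -> Dict[str, List[str]]:
    """Group paths under a "found" or "not_found" tag.

    Hypothetical stand-in for tagged outputs; not a Klio or Beam API.
    """
    tagged: Dict[str, List[str]] = {"found": [], "not_found": []}
    for path in paths:
        # Each item lands in exactly one of the two tagged groups.
        tag = "found" if exists(path) else "not_found"
        tagged[tag].append(path)
    return tagged


# A set stands in for the storage backend in this sketch.
known_paths = {"gs://my-bucket/a.ogg"}
result = partition_by_existence(
    ["gs://my-bucket/a.ogg", "gs://my-bucket/b.ogg"],
    exists=lambda p: p in known_paths,
)
```

Downstream code can then treat `result["found"]` and `result["not_found"]` separately, much as the composite transform above branches on `input_data.found` and `input_data.not_found`.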
KlioGcsCheckOutputExists is a Composite Transform to check the output exists in GCS. The transform utilizes Tagged Outputs to label output as either not_found or found.
    import apache_beam as beam
    from klio.transforms.helpers import KlioGcsCheckOutputExists


    class MyGcsFilterOutput(beam.PTransform):
        """Composite transform to filter output data."""

        def expand(self, pcoll):
            # Check if output data exists
            output_exists = pcoll | "Output Exists Filter" >> KlioGcsCheckOutputExists()

            # Do something with output data that is found
            to_filter = output_exists.found | "Transform Found Data" >> MyTransformAlreadyFound()

            # Do something with the output data that is not found
            to_process = output_exists.not_found | "Data Not Found" >> MyTransformNotFound()

            # Return the data that still needs processing
            return to_process
KlioFilterPing
KlioFilterPing is a Composite Transform that tags each message according to whether or not it is in ping mode. The transform utilizes Tagged Outputs to label output as either pass_thru or process.
    import apache_beam as beam
    from klio.transforms.helpers import KlioFilterPing


    class MyGcsFilterToProcess(beam.PTransform):
        """Composite transform to filter PCollections for processing"""

        def expand(self, pcoll):
            ping_pcoll = pcoll | "Ping Filter" >> KlioFilterPing()

            # Handle any items that should just be sent to output directly
            _ = ping_pcoll.pass_thru | "Passthru Ping" >> MyPassThruTransform()

            # Process the remaining items
            out_pcoll = ping_pcoll.process | "Process Data" >> MyProcessTransform()
            return out_pcoll
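As a rough mental model of the ping filter, the plain-Python sketch below routes messages into the two tags. It assumes that a message in ping mode is the one tagged pass_thru, which is consistent with the example above but is an assumption here; the `Message` dataclass and `filter_ping` function are hypothetical and only imitate the behaviour outside of Beam.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class Message:
    """Hypothetical stand-in for a KlioMessage; not the real protobuf."""

    element: str
    ping: bool = False


def filter_ping(messages: Iterable[Message]) -> Dict[str, List[Message]]:
    """Tag ping-mode messages as pass_thru and all others as process."""
    tagged: Dict[str, List[Message]] = {"pass_thru": [], "process": []}
    for msg in messages:
        # Assumption: ping mode means "skip the work, pass the message along".
        tag = "pass_thru" if msg.ping else "process"
        tagged[tag].append(msg)
    return tagged


tagged = filter_ping([Message("a", ping=True), Message("b")])
```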
KlioFilterForce
KlioFilterForce is a Composite Transform that filters whether existing output should be force-processed. The transform first inspects whether a message has an explicit True or False set for force processing. If force mode is not set on the message, KlioFilterForce falls back to the job's global (pipeline-wide) configuration for forcing messages with already-existing output, which defaults to False. The transform utilizes Tagged Outputs to label output as either pass_thru or process.
    import apache_beam as beam
    from klio.transforms.helpers import (
        KlioFilterForce,
        KlioGcsCheckOutputExists,
        KlioWriteToEventOutput,
    )


    class KlioGcsFilterOutput(beam.PTransform):
        """Klio composite transform to filter output data."""

        def expand(self, pcoll):
            # Check if output data exists
            output_exists = pcoll | "Output Exists Filter" >> KlioGcsCheckOutputExists()

            # Filter if existing output should be force-processed
            output_force = output_exists.found | "Force Filter" >> KlioFilterForce()

            # Handle any items that should just be sent to output directly
            _ = output_force.pass_thru | "Passthru Found Output" >> KlioWriteToEventOutput()

            # Handle items that should be processed: output that was not
            # found, plus found output that should be force-processed
            to_process = (output_exists.not_found, output_force.process)
            return to_process | "Flatten to Process" >> beam.Flatten()
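The precedence between the message-level setting and the pipeline-wide setting can be summarised in a few lines of plain Python. This is a sketch of the decision rule as described above, not Klio's implementation; `force_tag` and its parameter names are hypothetical, with `None` standing for "force mode not set on the message".

```python
from typing import Optional


def force_tag(message_force: Optional[bool], global_force: bool = False) -> str:
    """Return the tag for a message whose output already exists.

    Hypothetical helper for illustration only; not a Klio API.
    """
    # An explicit True/False on the message wins; otherwise fall back to
    # the pipeline-wide setting, which defaults to False.
    force = global_force if message_force is None else message_force
    return "process" if force else "pass_thru"


examples = {
    "unset, global default": force_tag(None),
    "unset, global forced": force_tag(None, global_force=True),
    "message forces": force_tag(True),
    "message opts out": force_tag(False, global_force=True),
}
```

Note that an explicit False on the message overrides a global True in this sketch, which follows from the message being inspected first.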
KlioCheckRecipients
KlioUpdateAuditLog
KlioUpdateAuditLog is a Composite Transform that will update the audit log in the metadata of a KlioMessage with the current job’s KlioJob.
Note
This transform is automatically called unless the event input is configured to be skipped.
KlioTriggerUpstream
KlioTriggerUpstream is a Composite Transform that will trigger an upstream streaming job. This is particularly useful when input data does not exist.
Caution
Klio does not automatically trigger upstream jobs if input data does not exist. KlioTriggerUpstream must be used manually within a job’s pipeline definition (in run.py::run).
By default, Klio handles the input data existence check and only provides the run function in run.py a PCollection with KlioMessages of input data that has been found. In order to also have access to input not found, that default input data existence check must be turned off by setting skip_klio_existence_check to True. Then the input existence check must be invoked manually. See example run.py and klio-job.yaml files below.
    # Example run.py
    import apache_beam as beam
    from klio.transforms import helpers

    import transforms


    def run(input_pcol, config):
        # Use the default helper transform to do the default input check
        # in order to access the output tagged with `not_found`
        input_data = input_pcol | helpers.KlioGcsCheckInputExists()

        # Pipe the input data that was not found (using Tagged Outputs)
        # into `KlioTriggerUpstream` in order to update the KlioMessage
        # metadata, log it, then publish to the upstream job's topic
        _ = input_data.not_found | helpers.KlioTriggerUpstream(
            upstream_job_name="my-upstream-job",
            upstream_topic="projects/my-gcp-project/topics/upstream-topic-input",
            log_level="DEBUG",
        )

        # Pipe the found input PCollection into other transform(s) as needed
        output_pcol = input_data.found | beam.ParDo(transforms.MyTransform())
        return output_pcol
    # Example klio-job.yaml
    version: 2
    job_name: my-job
    pipeline_options:
      project: my-gcp-project
      # `KlioTriggerUpstream` only supports streaming jobs
      streaming: True
      # <-- snip -->
    job_config:
      events:
        inputs:
          - type: pubsub
            topic: projects/my-gcp-project/topics/upstream-topic-output
            subscription: projects/my-gcp-project/subscriptions/my-job-input
        # <-- snip -->
      data:
        inputs:
          - type: gcs
            location: gs://my-gcp-project/upstream-output-data
            file_suffix: .ogg
            # Be sure to skip Klio's default input existence check in
            # order to access the input data that was not found.
            skip_klio_existence_check: True
KlioWriteToEventOutput
KlioWriteToEventOutput is a Composite Transform to write to the configured event output. The transform is currently available for writing to file types and pubsub types.
    class KlioGcsFilterOutput(beam.PTransform):
        """Klio composite transform to filter output data."""

        def expand(self, pcoll):
            # Check if output data exists
            output_exists = pcoll | "Output Exists Filter" >> KlioGcsCheckOutputExists()

            # Filter if existing output should be force-processed
            output_force = output_exists.found | "Force Filter" >> KlioFilterForce()

            # Handle items that should be sent directly to output
            _ = output_force.pass_thru | "Passthru Found Output" >> KlioWriteToEventOutput()
KlioDrop
KlioDrop is a Composite Transform that will simply log and drop a KlioMessage.
    class KlioGcsFilterInput(beam.PTransform):
        """Klio composite transform to drop input data that is not found"""

        def expand(self, pcoll):
            # Check if input data exists
            input_data = pcoll | "Input Exists Filter" >> KlioGcsCheckInputExists()

            # Drop the KlioMessage if data does not exist
            _ = input_data.not_found | "Drop Not Found Data" >> KlioDrop()

            # Do something with the found input data
            return input_data.found
KlioDebugMessage
KlioDebugMessage is a Composite Transform that will log a KlioMessage at the given point in a pipeline. It can be used any number of times within a transform.
    from klio.transforms import helpers


    def run(in_pcol, config):
        return (
            in_pcol
            | "1st debug" >> helpers.KlioDebugMessage()
            | MyTransform()
            | "2nd debug" >> helpers.KlioDebugMessage(prefix="[MyTransform Output]")
            | MyOtherTransform()
            | "3rd debug" >> helpers.KlioDebugMessage(
                prefix="[MyOtherTransform Output]", log_level="ERROR"
            )
        )
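The effect of a debug step can be imitated in plain Python with a generator that logs each item and then yields it unchanged. This sketch assumes the transform passes messages through untouched, as its placement mid-pipeline in the example above suggests; `debug_items` is a hypothetical function, and only its `prefix` and `log_level` parameter names are borrowed from the example.

```python
import logging
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
logger = logging.getLogger("debug_sketch")


def debug_items(
    items: Iterable[T], prefix: str = "", log_level: str = "INFO"
) -> Iterator[T]:
    """Log every item at the requested level, then yield it unchanged.

    Hypothetical helper for illustration only; not a Klio API.
    """
    # Translate a level name such as "ERROR" into logging's numeric level.
    level = getattr(logging, log_level.upper())
    for item in items:
        logger.log(level, "%s %s", prefix, item)
        yield item


# Debug steps can be chained around other steps without changing the data.
stage_one = debug_items(["a", "b"], prefix="[input]")
stage_two = (item.upper() for item in stage_one)
output = list(debug_items(stage_two, prefix="[upper]", log_level="ERROR"))
```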
KlioSetTrace
KlioSetTrace is a Composite Transform that will insert a trace point (via pdb.set_trace()) at a given point in a pipeline.
    from klio.transforms import helpers


    def run(in_pcol, config):
        return in_pcol | helpers.KlioSetTrace() | MyTransform()
By default, Klio handles the input and output existence checks. However, Klio can also be configured to skip these checks if custom control is desired.
To add custom checks, define a new transform that will hold custom existence checking logic.
    # transforms.py file
    import apache_beam as beam


    class MyCustomInputExistenceDoFn(beam.DoFn):
        def process(self, item):
            # Custom existence checking logic goes here
            pass
The built-in Klio existence checks make use of Beam’s Tagged Outputs to output multiple PCollections from a single transform or “tag” values with helpful labels for use in the pipeline.
    # transforms.py file
    import enum

    import apache_beam as beam
    from apache_beam import pvalue


    class CustomDataExistState(enum.Enum):
        # Note these values can be anything - not limited to (not) found tags
        FOUND = "found"
        NOT_FOUND = "not_found"


    class MyCustomInputExistenceDoFn(beam.DoFn):
        def process(self, kmsg):
            item = kmsg.data.v2.element
            # Placeholder: replace with custom logic that checks `item`
            item_exists = True

            state = CustomDataExistState.FOUND
            if not item_exists:
                state = CustomDataExistState.NOT_FOUND

            yield pvalue.TaggedOutput(state.value, kmsg.SerializeToString())
The custom existence check transform can then be imported and used as part of a composite transform:
    # transforms.py file (MyCustomInputExistenceDoFn is defined in this file)
    import apache_beam as beam
    from klio.transforms.helpers import KlioDrop


    class MyCompositeTransform(beam.PTransform):
        """Composite transform to drop input data that is not found"""

        def expand(self, pcoll):
            # Check if input data exists. A DoFn must be wrapped in ParDo;
            # `with_outputs` exposes its tags as attributes on the result.
            input_data = pcoll | "Custom Input Exists Filter" >> beam.ParDo(
                MyCustomInputExistenceDoFn()
            ).with_outputs("found", "not_found")

            # Drop the KlioMessage if data does not exist
            _ = input_data.not_found | "Drop Not Found Data" >> KlioDrop()

            # Do something with the found input data
            return input_data.found
The composite transform can then be imported into the rest of the pipeline in the run.py file.
    # run.py file
    from transforms import MyCompositeTransform


    def run(in_pcol, config):
        out_pcol = in_pcol | MyCompositeTransform()
        return out_pcol
The diagram below shows the transforms that are invoked behind the scenes in every Klio pipeline. By setting the values skip_klio_read, skip_klio_write, and/or skip_klio_existence_check in a job’s klio-job.yaml, these transforms can be enabled or disabled.
Please see the thumbnails below for a visual explanation of how each configuration variable impacts each transform.