Klio performs basic input and output existence checks, but if your pipeline requires something more complex, you will have to implement your own and tell Klio to skip its default existence checks (see the job configuration documentation for input and output data existence checks).
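As a sketch of what that configuration might look like, here is a hypothetical `klio-job.yaml` fragment that turns off the default checks so the custom transforms below can take over. The exact field names and values (`skip_klio_existence_check`, the bucket paths, the file suffix) are assumptions for illustration; verify them against the job configuration documentation.

```yaml
# Hypothetical klio-job.yaml fragment (field names are assumptions --
# see the job configuration documentation for the authoritative schema)
job_config:
  data:
    inputs:
      - type: file
        location: gs://my-bucket/input
        file_suffix: ogg
        skip_klio_existence_check: True  # custom BonjourInputCheck handles this
    outputs:
      - type: file
        location: gs://my-bucket/output
        file_suffix: ogg
        skip_klio_existence_check: True  # custom BonjourOutputCheck handles this
```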
```python
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io.gcp import gcsio

from klio.transforms import decorators


class BonjourInputCheck(beam.DoFn):
    @decorators.set_klio_context
    def setup(self):
        self.client = gcsio.GcsIO()

    @decorators.handle_klio
    def process(self, data):
        element = data.element.decode("utf-8")
        ic = self._klio.config.job_config.data.inputs[0]
        subdirs = ("subdir1", "subdir2")
        inputs_exist = []
        for subdir in subdirs:
            path = f"{ic.location}/{subdir}/{element}.{ic.file_suffix}"
            exists = self.client.exists(path)
            inputs_exist.append(exists)

        if all(inputs_exist):
            yield data
        else:
            self._klio.logger.info(f"Skipping {element}: input data not found")
```
Tip
If you have a method that needs access to self._klio but does not handle a KlioMessage, use the @set_klio_context decorator to attach the Klio context to the self instance.
If you do so on the setup method, the self._klio context should then be available on all other methods once it’s unpickled onto the Dataflow workers.
You may also set it on the class’s __init__ method, but that makes the context only available on the driver (aka locally or on tingle when launching a job).
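The reason setup is the right hook can be seen with a small klio- and beam-free sketch (all names here are hypothetical, for illustration only): a DoFn is constructed on the driver, pickled, and unpickled on each worker before setup runs, so anything attached inside setup exists where the work actually happens.

```python
import pickle


class MyDoFn:
    def __init__(self):
        # Runs on the driver; anything set here must survive pickling
        # to reach the workers.
        self.configured_on_driver = True

    def setup(self):
        # Runs once per worker, *after* the DoFn has been unpickled there --
        # the right place to attach context (clients, loggers) that should
        # not or cannot travel through the pickle.
        self.context = "attached-on-worker"


# Simulate Dataflow shipping the DoFn from the driver to a worker:
fn = MyDoFn()
worker_fn = pickle.loads(pickle.dumps(fn))
worker_fn.setup()

assert worker_fn.configured_on_driver      # survived pickling
assert worker_fn.context == "attached-on-worker"  # attached on the "worker"
```

This mirrors why attaching context in __init__ only helps on the driver, while attaching it in setup makes it available on every worker.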
This is very similar to the example input check above, with the addition of Tagged Outputs. Beam allows multiple outputs from a single transform via “tagging”. Here, we tag output data as found or not_found. The tags are then used in run.py (below) to branch the pipeline’s logic.
```python
class BonjourOutputCheck(beam.DoFn):
    @decorators.set_klio_context
    def setup(self):
        self.client = gcsio.GcsIO()

    @decorators.handle_klio
    def process(self, data):
        element = data.element.decode("utf-8")
        oc = self._klio.config.job_config.data.outputs[0]
        subdirs = ("subdir1", "subdir2")
        outputs_exist = []
        for subdir in subdirs:
            path = f"{oc.location}/{subdir}/{element}.{oc.file_suffix}"
            exists = self.client.exists(path)
            outputs_exist.append(exists)

        if all(outputs_exist):
            yield pvalue.TaggedOutput("found", data)
        else:
            yield pvalue.TaggedOutput("not_found", data)
```
In combination with the KlioFilterForce helper transform, just add a few lines to string everything together.
Notice the use of with_outputs()
Since we used Tagged Outputs in our BonjourOutputCheck class, we need to invoke our transform using the with_outputs method of apache_beam.ParDo.
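To make the branching concrete, here is a klio- and beam-free sketch of what tagged outputs give you (all names hypothetical, for illustration only): a result object whose attributes are the per-tag collections, analogous to accessing `output_data.found` and `output_data.not_found` after calling with_outputs.

```python
from types import SimpleNamespace


def check_output(element, existing_outputs):
    # Mimics BonjourOutputCheck: tag each element by whether
    # its output already exists.
    tag = "found" if element in existing_outputs else "not_found"
    return tag, element


def with_outputs_mimic(tagged_pairs):
    # Mimics .with_outputs(): group tagged elements into
    # attribute-addressable collections.
    groups = {"found": [], "not_found": []}
    for tag, element in tagged_pairs:
        groups[tag].append(element)
    return SimpleNamespace(**groups)


existing_outputs = {"a", "c"}
output_data = with_outputs_mimic(
    check_output(e, existing_outputs) for e in ["a", "b", "c"]
)
# output_data.found -> ["a", "c"]; output_data.not_found -> ["b"]
```

In the real pipeline, Beam’s DoOutputsTuple plays the role of the namespace here: each tag becomes a separate PCollection you can pipe into a different branch.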
```python
from klio.transforms import helpers


def run(input_pcol, config):
    output_data = input_pcol | beam.ParDo(BonjourOutputCheck()).with_outputs()
    output_force = output_data.found | helpers.KlioFilterForce()
    to_input_check = (
        (output_data.not_found, output_force.process)
        | beam.Flatten()
    )
    to_process = to_input_check | beam.ParDo(BonjourInputCheck())
    # continue on with the job-related logic
    output_pcol = to_process | ...
    return output_pcol
```
The BonjourOutputCheck tags output as either found or not_found (via Tagged Outputs). We then handle the found output by passing it through the KlioFilterForce transform.
Then we flatten the not_found tagged output from BonjourOutputCheck and the process output from KlioFilterForce into one PCollection for easier handling.
Finally, we pipe the flattened PCollection into our BonjourInputCheck transform. This only yields back found data (and just logs when input data is not found). We can then pipe the to_process PCollection to the rest of our job’s logic.