Custom Data Existence Checks

Klio performs basic input and output existence checks. If your pipeline requires something more complex, implement your own checks and tell Klio to skip its defaults (see the job configuration documentation for input and output data existence checks).

Custom Input Data Check

import apache_beam as beam

from apache_beam import pvalue
from apache_beam.io.gcp import gcsio

from klio.transforms import decorators


class BonjourInputCheck(beam.DoFn):
    @decorators.set_klio_context
    def setup(self):
        self.client = gcsio.GcsIO()

    @decorators.handle_klio
    def process(self, data):
        element = data.element.decode("utf-8")

        ic = self._klio.config.job_config.data.inputs[0]
        subdirs = ("subdir1", "subdir2")

        inputs_exists = []

        for subdir in subdirs:
            path = f"{ic.location}/{subdir}/{element}.{ic.file_suffix}"
            exists = self.client.exists(path)
            inputs_exists.append(exists)

        if all(inputs_exists):
            yield data
        else:
            self._klio.logger.info(f"Skipping {element}: input data not found")
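
The path-building and all-or-nothing logic in process can be exercised without GCS access. The following is a minimal sketch and not part of Klio: FakeGcsClient, all_inputs_exist, and the bucket paths are hypothetical stand-ins, and SimpleNamespace mimics only the location and file_suffix attributes of the input config.

```python
from types import SimpleNamespace


class FakeGcsClient:
    """Hypothetical stand-in for a GCS client; only mimics `exists`."""

    def __init__(self, known_paths):
        self.known_paths = set(known_paths)

    def exists(self, path):
        return path in self.known_paths


def all_inputs_exist(client, input_config, element, subdirs=("subdir1", "subdir2")):
    """Return True only if the element's file exists in every subdirectory."""
    paths = [
        f"{input_config.location}/{subdir}/{element}.{input_config.file_suffix}"
        for subdir in subdirs
    ]
    return all(client.exists(path) for path in paths)


# Hypothetical config values and paths, for illustration only.
ic = SimpleNamespace(location="gs://example-bucket/input", file_suffix="ogg")
client = FakeGcsClient(
    [
        "gs://example-bucket/input/subdir1/track1.ogg",
        "gs://example-bucket/input/subdir2/track1.ogg",
        "gs://example-bucket/input/subdir1/track2.ogg",
    ]
)

complete = all_inputs_exist(client, ic, "track1")  # present in both subdirs
partial = all_inputs_exist(client, ic, "track2")   # missing from subdir2
```

Keeping the check in a plain function like this is one way to unit test it separately from the DoFn. The DoFn itself would still need the real client on the workers.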


Tip

If you have a method that needs access to self._klio but does not handle a KlioMessage, use the @set_klio_context decorator to attach the Klio context to the self instance.

If you do so on the setup method, the self._klio context should then be available on all other methods once it’s unpickled onto the Dataflow workers.

You may also set it on the class’s __init__ method, but that makes the context only available on the driver (aka locally or on tingle when launching a job).

Custom Output Data Check

This is very similar to the example input check above, with the addition of Tagged Outputs. Beam allows multiple outputs from a single transform via “tagging”. Here, we tag each message as found or not_found, depending on whether its output data already exists. These tags are then used in run.py (below) to branch the pipeline's logic.

class BonjourOutputCheck(beam.DoFn):
    @decorators.set_klio_context
    def setup(self):
        self.client = gcsio.GcsIO()

    @decorators.handle_klio
    def process(self, data):
        element = data.element.decode("utf-8")

        oc = self._klio.config.job_config.data.outputs[0]
        subdirs = ("subdir1", "subdir2")

        outputs_exist = []

        for subdir in subdirs:
            path = f"{oc.location}/{subdir}/{element}.{oc.file_suffix}"
            exists = self.client.exists(path)
            outputs_exist.append(exists)

        if all(outputs_exist):
            yield pvalue.TaggedOutput("found", data)
        else:
            yield pvalue.TaggedOutput("not_found", data)
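
To make the tagging behavior concrete, here is a plain-Python model of it. This is a sketch under stated assumptions rather than Beam code: Tagged is a hypothetical stand-in for pvalue.TaggedOutput, collect_by_tag only approximates how tagged results are grouped, and the paths are made up.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for pvalue.TaggedOutput: a (tag, value) pair.
Tagged = namedtuple("Tagged", ["tag", "value"])


def check_outputs(element, existing_paths, location, file_suffix,
                  subdirs=("subdir1", "subdir2")):
    """Yield the element tagged "found" only if every expected output exists."""
    paths = [f"{location}/{subdir}/{element}.{file_suffix}" for subdir in subdirs]
    tag = "found" if all(path in existing_paths for path in paths) else "not_found"
    yield Tagged(tag, element)


def collect_by_tag(tagged_items):
    """Group values by tag, roughly modeling one collection per tag."""
    grouped = defaultdict(list)
    for item in tagged_items:
        grouped[item.tag].append(item.value)
    return dict(grouped)


# Made-up set of existing output paths, for illustration only.
existing = {
    "gs://example-bucket/output/subdir1/track1.wav",
    "gs://example-bucket/output/subdir2/track1.wav",
    "gs://example-bucket/output/subdir1/track2.wav",
}

tagged = []
for element in ("track1", "track2", "track3"):
    tagged.extend(
        check_outputs(element, existing, "gs://example-bucket/output", "wav")
    )

outputs = collect_by_tag(tagged)
```

Note that a partially written output (track2 here) counts as not_found, so the element is sent on for processing rather than treated as done.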

Using Custom Existence Checks in your Pipeline

In combination with the KlioFilterForce helper transform, just add a few lines to string everything together.

Notice the use of with_outputs()

Since we used Tagged Outputs in our BonjourOutputCheck class, we need to invoke our transform using the with_outputs method of apache_beam.ParDo.

import apache_beam as beam

from klio.transforms import helpers

def run(input_pcol, config):
    output_data = input_pcol | beam.ParDo(BonjourOutputCheck()).with_outputs()

    output_force = output_data.found | helpers.KlioFilterForce()

    to_input_check = (
        (output_data.not_found, output_force.process)
        | beam.Flatten()
    )
    to_process = to_input_check | beam.ParDo(BonjourInputCheck())

    # continue on with the job-related logic
    output_pcol = to_process | ...
    return output_pcol

The BonjourOutputCheck transform tags each message as either found or not_found (via Tagged Outputs). We then handle the found output by passing it through the KlioFilterForce transform.

Then we flatten the not_found tagged output from BonjourOutputCheck and the process output from KlioFilterForce into one PCollection for easier handling.

Finally, we pipe the flattened PCollection into our BonjourInputCheck transform. It yields only the messages whose input data exists, and just logs when input data is not found. We can then pipe the to_process PCollection to the rest of our job’s logic.
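
The branching in run can be summarized as a small decision function. This is a hedged sketch, not Klio code: route_message and its boolean parameters are hypothetical, and it assumes KlioFilterForce sends a message to its process output only when the message is being forced, and passes it through otherwise.

```python
def route_message(output_exists, force, input_exists):
    """Model where a message ends up in the run() flow above.

    Assumption: KlioFilterForce emits to `process` only when `force` is
    set; otherwise the message is passed through without being processed.
    """
    if output_exists and not force:
        return "pass_thru"  # output already exists and nothing forces a rerun
    if not input_exists:
        return "dropped"  # BonjourInputCheck logs and yields nothing
    return "process"  # continues on to the job-related logic


# Enumerate every combination as a small decision table.
table = {
    (output_exists, force, input_exists): route_message(
        output_exists, force, input_exists
    )
    for output_exists in (False, True)
    for force in (False, True)
    for input_exists in (False, True)
}
```

Under these assumptions, the force flag only matters when the output already exists. A message with missing output always proceeds to the input check.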