Job Configuration

A Klio job’s configuration is defined in klio-job.yaml.

Tip

Klio defaults to reading a YAML file named klio-job.yaml for configuration.

Point the klio command to a different configuration filename via the -c/--config-file flag, e.g.

$ klio job run --config-file klio-job-staging.yaml

File Structure

A klio-job.yaml file has the following top-level keys:

version INT
Version of Klio job.

Options: 1, 2.
Default: 2.
Optional

Warning

Version 1 has been deprecated and removed from Klio as of klio-cli version 1.0.0, klio version 0.2.0, and klio-exec version 0.2.0.

job_name STR

The job’s name. This should be unique within a particular GCP project; different projects can each have a job with the same name.

Required

pipeline_options DICT

Configuration under pipeline_options maps directly to Beam and Dataflow options. See Beam Pipeline Options for some commonly-used options, but any option that Beam and Dataflow support is also supported here. More information can be found in the official Beam docs.

Required
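
These options are written as keys under pipeline_options rather than passed to Beam as command-line flags. A minimal sketch, using a few standard Beam/Dataflow options that also appear in the examples below:

# klio-job.yaml snippet
pipeline_options:
  runner: DataflowRunner   # corresponds to Beam's --runner flag
  project: my-gcp-project  # corresponds to Dataflow's --project flag
  region: europe-west1     # corresponds to Dataflow's --region flag
  streaming: True          # corresponds to Beam's --streaming flag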

job_config DICT

User-specified custom and Klio-specific job configuration. See Job Configuration for the supported Klio configuration, as well as for where to specify any additional custom configuration a job needs.

Required
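
For example, a job’s custom keys live under job_config alongside the Klio-specific events and data configuration. A minimal sketch (my_custom_config is an illustrative custom key, matching its usage later on this page):

# klio-job.yaml snippet
job_config:
  # Klio-specific configuration
  events:
    inputs:
      - type: pubsub
        subscription: projects/my-gcp-project/subscriptions/my-input-subscription
  # custom, user-defined configuration
  my_custom_config: some-value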

Further Configuration Options

See Beam Pipeline Options and Job Configuration for detailed documentation of the options available under pipeline_options and job_config, respectively.

Accessing Configuration in a Job

A Klio job can access its configuration in two ways: via the job’s pipeline definition, and within the transforms themselves.

Pipeline

In a Klio job’s run.py file where the pipeline is defined, the run function will be provided with the KlioContext.config object. For example:

# in run.py

def run(in_pcol, config):
    my_custom_config = config.job_config.my_custom_config

    out_pcol = in_pcol | MyTransform(my_custom_config)

    return out_pcol

Important

When using a runner other than DirectRunner, if access to configuration is needed within a transform’s logic, use the approach described below. This is because the config object itself is not picklable.

Class-based transforms are instantiated when the pipeline is launched, then pickled and unpickled onto the remote worker (e.g. a Dataflow worker). Therefore, any instance variable defined in a transform’s __init__ method that is not a picklable object will not be accessible to the rest of the class when the worker executes the transform’s logic.
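
For illustration, a sketch of the problematic pattern, where a hypothetical MyBrokenTransform stores the un-picklable config object on itself in __init__:

# in transforms.py -- anti-pattern sketch; breaks on remote runners

import apache_beam as beam

class MyBrokenTransform(beam.DoFn):
    def __init__(self, config):
        # the config object is not picklable, so pickling this
        # instance for shipping to a remote worker (e.g. a Dataflow
        # worker) will fail
        self._config = config

    def process(self, item):
        my_custom_config = self._config.job_config.my_custom_config
        ...

Instead, access configuration from within the transform’s logic via the decorators described in the next section.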

Transforms

While the job’s configuration is automatically provided to a job’s run.py::run function, transforms that need access to configuration within their logic should use the provided Utilities.

# in transforms.py

import apache_beam as beam
from klio.transforms import decorators

class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        my_custom_config = self._klio.config.job_config.my_custom_config
        ...

Templating and Overriding a Job’s Configuration

Klio supports templating klio-job.yaml as well as overriding the configuration’s values via the CLI.

Templating Keys and Values

To template any key or value in klio-job.yaml, declare it with ${...}, for example:

# klio-job.yaml snippet
job_name: my-job
job_config:
  a_key: ${my_template_value}
  ${a_template_key}: some_static_value

Then, use the --template flag (supported on most klio job subcommands; see the CLI documentation for which commands support the flag) to provide the content for the templated variables:

# command accepts multiple `--template` flags
$ klio job run \
  --template my_template_value=foo \
  --template a_template_key=some_variable_key
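
With the flags above, the rendered configuration would effectively be:

# effective klio-job.yaml snippet after templating
job_name: my-job
job_config:
  a_key: foo
  some_variable_key: some_static_value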

Within your Klio job, you’ll have access to these configuration values just like normal:

# in transforms.py

import apache_beam as beam
from klio.transforms import decorators

class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        my_templated_value = self._klio.config.job_config.a_key
        # my_templated_value == 'foo'
        ...

Overriding Values

Similar to templating, Klio supports overriding the values of keys at runtime. While templating can be used for both keys and values in klio-job.yaml, overriding is limited to values only.

For example, a snippet of a Klio job’s configuration might look like:

# klio-job.yaml snippet
job_name: my-job
job_config:
  a_key: a_default_value
  b_key: another_default_value
  c_key: some_other_default_value

Then, use the --override flag (supported on most klio job subcommands; see the CLI documentation for which commands support the flag) to override the desired keys:

# command accepts multiple `--override` flags
$ klio job run \
  --override job_config.a_key=a_new_value \
  --override job_config.b_key=b_new_value
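
With the flags above, the configuration would effectively become:

# effective klio-job.yaml snippet after overriding
job_name: my-job
job_config:
  a_key: a_new_value
  b_key: b_new_value
  c_key: some_other_default_value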

Attention

Be sure to provide the full path to the desired key to change. In this example, a_key is nested under job_config, so the full path is job_config.a_key.

Within your Klio job, you’ll have access to these configuration values just like normal:

# in transforms.py

import apache_beam as beam
from klio.transforms import decorators

class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        a_value = self._klio.config.job_config.a_key
        # a_value == 'a_new_value'
        c_value = self._klio.config.job_config.c_key
        # c_value == 'some_other_default_value'
        ...

Attention

In order to override a value for a data/event input or output, you must specify a name in its configuration, and then use the name in the path of the override:

job_config:
  events:
    inputs:
      - type: pubsub
        name: my_input
        topic: my/pubsub/topic
        subscription: my/pubsub/subscription
$ klio job run \
  --override job_config.events.inputs.my_input.topic=/my/other/pubsub/topic
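
The override above would effectively yield:

# effective klio-job.yaml snippet after overriding
job_config:
  events:
    inputs:
      - type: pubsub
        name: my_input
        topic: /my/other/pubsub/topic
        subscription: my/pubsub/subscription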

Examples

Streaming

Case:

  • Runner: DirectRunner

  • Events: Consume KlioMessage events from a Google Pub/Sub subscription; write KlioMessage events to a Google Pub/Sub topic.

  • Data: Read input binary data from a GCS bucket; write output binary data to a GCS bucket.

version: 2
job_name: streaming-example
pipeline_options:
  streaming: True
  project: my-gcp-project
  worker_harness_container_image: streaming-example-job-image
  runner: DirectRunner
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: projects/my-gcp-project/subscriptions/my-input-subscription
        skip_klio_read: False
    outputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/my-output-topics
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False

Case:

  • Runner: DataflowRunner

  • Events: Consume KlioMessage events from a Google Pub/Sub subscription; write KlioMessage events to a Google Pub/Sub topic.

  • Data: Read input binary data from a GCS bucket; write output binary data to a GCS bucket.

version: 2
job_name: streaming-dataflow-example
pipeline_options:
  streaming: True
  project: my-gcp-project
  worker_harness_container_image: streaming-dataflow-example-job-image
  experiments:
    - enable_stackdriver_agent_metrics
    - beam_fn_api
  region: europe-west1
  staging_location: gs://streaming-dataflow-example-bucket/staging
  num_workers: 2
  autoscaling_algorithm: NONE
  disk_size_gb: 50
  worker_disk_type: pd-ssd
  worker_machine_type: n1-highmem-2
  runner: DataflowRunner
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: projects/my-gcp-project/subscriptions/my-input-subscription
        skip_klio_read: False
    outputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/my-output-topics
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://streaming-dataflow-example-bucket/my-input-folder
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://streaming-dataflow-example-bucket/my-output-folder
        file_suffix: .wav
        skip_klio_existence_check: False

Batch

Case:

  • Runner: DirectRunner

  • Events: Generate KlioMessage events from a local file; write KlioMessage events to a local file.

  • Data: Read input binary data from a GCS bucket; write output binary data to a GCS bucket.

version: 2
job_name: batch-example
pipeline_options:
  streaming: False
  worker_harness_container_image: batch-example-job-image
  runner: DirectRunner
job_config:
  events:
    inputs:
      - type: file
        location: ./batch_track_ids.txt
        skip_klio_read: False
    outputs:
      - type: file
        location: ./batch_track_ids_output
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://batch-example-job/my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://batch-example-job/my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False

Case:

  • Runner: DataflowRunner

  • Events: Generate KlioMessage events from a file in GCS; write KlioMessage events to a file in GCS.

  • Data: Read input binary data from a GCS bucket; write output binary data to a GCS bucket.

  • Use Python packaging (a setup.py file) for dependency management instead of packaging with Docker to run on the workers.

version: 2
job_name: batch-dataflow-example
pipeline_options:
  streaming: False
  project: my-gcp-project
  experiments:
    - enable_stackdriver_agent_metrics
  region: europe-west1
  staging_location: gs://batch-dataflow-example-bucket/staging
  num_workers: 2
  autoscaling_algorithm: NONE
  disk_size_gb: 50
  worker_disk_type: pd-ssd
  worker_machine_type: n1-highmem-2
  runner: DataflowRunner
  setup_file: setup.py
job_config:
  events:
    inputs:
      - type: file
        location: gs://batch-example-job/event-input/batch_track_ids.txt
        skip_klio_read: False
    outputs:
      - type: file
        location: gs://batch-example-job/event-output/batch_track_ids_output
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://batch-example-job/my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://batch-example-job/my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False