A Klio job’s configuration is defined in klio-job.yaml.
Tip
Klio defaults to reading a YAML file named klio-job.yaml for configuration.
Point the klio command to a different configuration filename via the -c/--config-file flag, e.g.
$ klio job run --config-file klio-job-staging.yaml
A klio-job.yaml file has the following top-level keys:
version (INT)
The configuration version. Supported values: 1, 2.

Warning
Version 1 has been deprecated and removed from Klio as of klio-cli version 1.0.0, klio version 0.2.0, and klio-exec version 0.2.0.

job_name (STR)
The job's name. This should be unique within a particular GCP project; different projects can have a job with the same name. Required.

pipeline_options (DICT)
Configuration under pipeline_options maps directly to Beam and Dataflow options. See Beam Pipeline Options for some commonly-used options, but any option that Beam and Dataflow support is also supported here. More information can be found in the official Beam docs.

job_config (DICT)
User-specified custom and Klio-specific job configuration. See Job Configuration for supported Klio configuration as well as where to specify any additional custom configuration needed for a job.
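Putting these keys together, a minimal klio-job.yaml sketch might look like the following (the project, runner, and my_custom_config values are illustrative placeholders, not required settings):

# klio-job.yaml (minimal sketch; values are placeholders)
version: 2
job_name: my-job
pipeline_options:
  project: my-gcp-project
  runner: DirectRunner
job_config:
  my_custom_config: some_value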
A Klio job has access to its configuration in a couple of ways: via the job’s pipeline definition, and within the transforms themselves.
In a Klio job’s run.py file where the pipeline is defined, the run function will be provided with the KlioContext.config object. For example:
# in run.py
def run(in_pcol, config):
    my_custom_config = config.job_config.my_custom_config
    out_pcol = in_pcol | MyTransform(my_custom_config)
    return out_pcol
Important
When using a runner other than DirectRunner, if access to configuration is needed within a transform's logic, use the approach described below. This is because the config object itself is not picklable.
A class-based transform is instantiated when the pipeline is launched; the instance is then pickled and shipped to the remote worker (e.g. a Dataflow worker), where it is unpickled before executing. Therefore, any instance variable defined in a transform's __init__ method that is not a picklable object will not be accessible to the rest of the class when the worker executes the transform's logic.
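For illustration, a transform written like the following sketch (MyBrokenTransform is a hypothetical name) would work on DirectRunner but fail once the transform has to be pickled for a remote worker:

# in transforms.py (anti-pattern sketch)
import apache_beam as beam


class MyBrokenTransform(beam.DoFn):
    def __init__(self, config):
        # Storing the config object on the instance means it must be
        # pickled along with the transform; since config is not
        # picklable, this breaks on remote runners such as Dataflow.
        self.config = config

    def process(self, item):
        my_custom_config = self.config.job_config.my_custom_config
        ...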
While the job’s configuration is automatically provided to a job’s run.py::run function, if access to configuration is needed for a transform’s logic, then use the provided Utilities.
# in transforms.py
import apache_beam as beam

from klio.transforms import decorators


class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        my_custom_config = self._klio.config.job_config.my_custom_config
        ...
Klio supports templating klio-job.yaml as well as overriding the configuration’s values via the CLI.
To template any key or value in klio-job.yaml, declare it with ${...}, for example:
# klio-job.yaml snippet
job_name: my-job
job_config:
  a_key: ${my_template_value}
  ${a_template_key}: some_static_value
Then, use the --template flag (supported on most klio job subcommands – see CLI documentation for which commands support the flag) to provide the content for the templated variables:
# command accepts multiple `--template` flags
$ klio job run \
    --template my_template_value=foo \
    --template a_template_key=some_variable_key
Within your Klio job, you’ll have access to these configuration values just like normal:
# in transforms.py
import apache_beam as beam

from klio.transforms import decorators


class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        my_templated_value = self._klio.config.job_config.a_key
        # my_templated_value == 'foo'
        ...
Similar to templating, Klio supports overriding the values of keys during runtime. While templating can be used for both keys and values in klio-job.yaml, overriding is limited to just values.
For example, a snippet of a Klio job’s configuration might look like:
# klio-job.yaml snippet
job_name: my-job
job_config:
  a_key: a_default_value
  b_key: another_default_value
  c_key: some_other_default_value
Then, use the --override flag (supported on most klio job subcommands – see CLI documentation for which commands support the flag) to override the desired keys:
# command accepts multiple `--override` flags
$ klio job run \
    --override job_config.a_key=a_new_value \
    --override job_config.b_key=b_new_value
Attention
Be sure to provide the full path to the desired key to change. In this example, a_key is nested under job_config, so the full path is job_config.a_key.
# in transforms.py
import apache_beam as beam

from klio.transforms import decorators


class MyTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, item):
        a_value = self._klio.config.job_config.a_key
        # a_value == 'a_new_value'
        c_value = self._klio.config.job_config.c_key
        # c_value == 'some_other_default_value'
        ...
In order to override a value for a data/event input or output, you must specify a name in its configuration, and then use the name in the path of the override:
job_config:
  events:
    inputs:
      - type: pubsub
        name: my_input
        topic: my/pubsub/topic
        subscription: my/pubsub/subscription
$ klio job run \
    --override job_config.events.inputs.my_input.topic=/my/other/pubsub/topic
Case:
Runner: DirectRunner
Events: Consume KlioMessage events from a Google Pub/Sub subscription; write KlioMessage events to a Google Pub/Sub topic.
Data: Read input binary data from a GCS bucket; write output binary data to a GCS bucket.
version: 2
job_name: streaming-example
pipeline_options:
  streaming: True
  project: my-gcp-project
  worker_harness_container_image: streaming-example-job-image
  runner: DirectRunner
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: projects/my-gcp-project/subscriptions/my-input-subscription
        skip_klio_read: False
    outputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/my-output-topics
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False
Runner: DataflowRunner
version: 2
job_name: streaming-dataflow-example
pipeline_options:
  streaming: True
  project: my-gcp-project
  worker_harness_container_image: streaming-dataflow-example-job-image
  experiments:
    - enable_stackdriver_agent_metrics
    - beam_fn_api
  region: europe-west1
  staging_location: gs://streaming-dataflow-example-bucket/staging
  num_workers: 2
  autoscaling_algorithm: NONE
  disk_size_gb: 50
  worker_disk_type: pd-ssd
  worker_machine_type: n1-highmem-2
  runner: DataflowRunner
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: projects/my-gcp-project/subscriptions/my-input-subscription
        skip_klio_read: False
    outputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/my-output-topics
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://streaming-dataflow-example-bucket/my-input-folder
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://streaming-dataflow-example-bucket/my-output-folder
        file_suffix: .wav
        skip_klio_existence_check: False
Events: Generate KlioMessage events from a local file; write KlioMessage events to a local file.
version: 2
job_name: batch-example
pipeline_options:
  streaming: False
  worker_harness_container_image: batch-example-job-image
  runner: DirectRunner
job_config:
  events:
    inputs:
      - type: file
        location: ./batch_track_ids.txt
        skip_klio_read: False
    outputs:
      - type: file
        location: ./batch_track_ids_output
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://batch-example-job/my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://batch-example-job/my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False
Dependencies: Use Python packaging for dependency management instead of packaging with Docker to run on the workers.
version: 2
job_name: batch-example
pipeline_options:
  streaming: False
  worker_harness_container_image: batch-example-job-image
  project: my-gcp-project
  experiments:
    - enable_stackdriver_agent_metrics
  region: europe-west1
  staging_location: gs://streaming-dataflow-example-bucket/staging
  num_workers: 2
  autoscaling_algorithm: NONE
  disk_size_gb: 50
  worker_disk_type: pd-ssd
  worker_machine_type: n1-highmem-2
  runner: DataflowRunner
  setup_file: setup.py
job_config:
  events:
    inputs:
      - type: file
        location: gs://batch-example-job/event-input/batch_track_ids.txt
        skip_klio_read: False
    outputs:
      - type: file
        location: gs://batch-example-job/event-output/batch_track_ids_output
        skip_klio_write: False
  data:
    inputs:
      - type: gcs
        location: gs://batch-example-job/my-input-bucket
        file_suffix: .ogg
        skip_klio_existence_check: False
    outputs:
      - type: gcs
        location: gs://batch-example-job/my-output-bucket
        file_suffix: .wav
        skip_klio_existence_check: False