Job Configuration

Klio-specific and user-specified custom job configuration.

job_config.allow_non_klio_messages BOOL

Allow this job to process free-form, non-KlioMessage messages.

Default: False

job_config.blocking BOOL

Wait for Dataflow job to finish before exiting.

Default: False

job_config.<additional_key> ANY

Arbitrary key-value pairs for any custom configuration specific to a job. These will be attached to the job_config attribute of the KlioConfig object.

See Accessing Configuration in a Job on how to access configuration within a pipeline or transform.
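
For example, custom keys sit directly under job_config alongside the Klio-recognized keys. The key names and values below are purely illustrative:

# <--snip-->
job_config:
  events:
    # <--snip-->
  my_model_path: gs://my-bucket/models/v1
  my_batch_size: 64
# <--snip-->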

job_config.events

Event inputs/outputs designate where to read/write KlioMessages.

The KlioMessage contains a unique identifier of some sort that refers to a unit of work (e.g. file IDs, track IDs, etc.). This unique identifier can then be used to look up the binary data as configured in job_config.data for the job to process. A job’s events can therefore be seen as “triggers” of work needing to be done on particular binary data.

Example:

name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: my-input-subscription
    outputs:
      - type: pubsub
        topic: my-output-topic

job_config.events.inputs[]

A list of input configurations that will be used to determine when and how to do work.

If more than one input is configured, please familiarize yourself with how multiple configured inputs are handled in Klio.
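
As a sketch, two inputs could be declared side by side; the subscription names here are hypothetical:

job_config:
  events:
    inputs:
      - type: pubsub
        subscription: my-first-input-subscription
      - type: pubsub
        subscription: my-second-input-subscription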

job_config.events.inputs[].type STR

Type of input the job is reading events from.

See Event Configuration for the supported configuration by type.

Streaming Options: pubsub, custom
Batch Options: file, avro, bigquery, custom

job_config.events.inputs[].<type_specific_config>

See Event Configuration for the supported configuration by type.

job_config.events.inputs[].skip_klio_read BOOL

Klio will automatically read from the configured input unless this value is set to True.

If all declared inputs are configured to skip Klio's automatic read, the Pipeline object will be given to the job's run.py::run function instead of a PCollection object. In this case, you must implement the reading behavior yourself (e.g. using a different Beam I/O transform).

Useful for implementing behavior different from the default, or for toggling off reading from one or more of multiple inputs.

Default: False
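
For example, a job that implements its own read could skip Klio's; the subscription name here is hypothetical:

job_config:
  events:
    inputs:
      - type: pubsub
        subscription: my-input-subscription
        skip_klio_read: True

With every declared input skipped like this, run.py::run receives the raw Pipeline object and is expected to apply its own read transform.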

job_config.events.outputs[]

A list of output configurations that Klio will use to signify that work has been completed.

Warning

Currently, only one event output configuration is supported in Klio out of the box.

If more than one output is required, set skip_klio_write of each output configuration to True.
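
A sketch of two outputs, each with skip_klio_write set so the pipeline can perform both writes itself; the topic names are hypothetical:

job_config:
  events:
    outputs:
      - type: pubsub
        topic: my-first-output-topic
        skip_klio_write: True
      - type: pubsub
        topic: my-second-output-topic
        skip_klio_write: True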

job_config.events.outputs[].type STR

Type of output the job is writing events to.

See Event Configuration for the supported configuration by type.

Streaming Options: pubsub, custom
Batch Options: file, bigquery, custom

job_config.events.outputs[].<type_specific_config>

See Event Configuration for the supported configuration by type.

job_config.events.outputs[].skip_klio_write BOOL

Klio will automatically write to the configured event output unless this value is set to True.

Useful for implementing behavior different from the default, for using multiple outputs, or for toggling off event output entirely.

Default: False

job_config.data

Data inputs/outputs point to where the binary data lives (typically GCS buckets) that KlioMessages generated by event inputs refer to.
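
Example (a sketch mirroring the event example above; the GCS locations and file suffixes are illustrative, and the type-specific fields are documented under Data Configuration):

job_config:
  data:
    inputs:
      - type: gcs
        location: gs://my-input-bucket/audio
        file_suffix: .ogg
    outputs:
      - type: gcs
        location: gs://my-output-bucket/features
        file_suffix: .npz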

job_config.data.inputs[]

A list of input configurations that Klio will use to look for data to be processed.

By default, Klio will drop a KlioMessage when input data for the corresponding element ID does not exist. Set skip_klio_existence_check to True to implement non-default behavior.

Note

Klio does not upload data automatically to the configured location. This must be done from within the pipeline.

Warning

Currently, only one data input configuration is supported in Klio out of the box.

If more than one input is required, set skip_klio_existence_check of each input configuration to True.

job_config.data.inputs[].type STR

Type of input the job is reading data from.

See Data Configuration for the supported configuration by type.

Options: gcs, custom

job_config.data.inputs[].<type_specific_config>

See Data Configuration for the supported configuration by type.

job_config.data.inputs[].ping BOOL

Set a global ping mode for KlioMessages.

When True, messages will not trigger the job's transforms; instead, they are sent directly to the configured event output.

If ping is set on an individual KlioMessage, whether True or False, that setting takes precedence over this global setting.

Default: False
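
For example, to run every KlioMessage in ping mode (the GCS fields are illustrative):

job_config:
  data:
    inputs:
      - type: gcs
        location: gs://my-input-bucket/audio
        file_suffix: .ogg
        ping: True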

job_config.data.inputs[].skip_klio_existence_check BOOL

Tell Klio to skip its default input data existence check. Set this to True when input data existence checks are not needed, or to implement behavior different than the default.

Read more about how Klio performs these data existence checks.

Default: False

job_config.data.outputs[]

A list of output configurations that Klio will use to look for data that has already been processed.

Note

Klio does not upload data automatically to the configured location. This must be done from within the pipeline.

Warning

Currently, only one data output configuration is supported in Klio out of the box.

If more than one output is required, set skip_klio_existence_check of each output configuration to True.

job_config.data.outputs[].type STR

Type of output the job is writing data to.

See Data Configuration for the supported configuration by type.

Options: gcs, custom

job_config.data.outputs[].<type_specific_config>

See Data Configuration for the supported configuration by type.

job_config.data.outputs[].force BOOL

Set a global force of KlioMessages if output data already exists.

When True, force mode will force the pipeline to process work when its corresponding output data already exists.

If force is set on an individual KlioMessage - whether True or False - that setting will be preferred over this global setting.

Default: False
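
For example, to globally force reprocessing (the GCS fields are illustrative):

job_config:
  data:
    outputs:
      - type: gcs
        location: gs://my-output-bucket/features
        file_suffix: .npz
        force: True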

job_config.data.outputs[].skip_klio_existence_check BOOL

Tell Klio to skip its default output data existence check. Set this to True when output data existence checks are not needed, or to implement behavior different than the default.

Read more about how Klio performs these data existence checks.

Default: False

job_config.metrics

With no additional configuration needed, metrics will be turned on and collected. The default client depends on the runner:

DataflowRunner: Native Apache Beam metrics client
DirectGKERunner: Shumway metrics client
DirectRunner: Python standard library logging

Caution

When running on Dataflow, in order for the Native metrics client to be able to report metrics to Stackdriver, the following experiment must be added to klio-job.yaml:

# <--snip-->
pipeline_options:
  experiments:
    - enable_stackdriver_agent_metrics
# <--snip-->

See documentation on metrics for information on how to emit metrics from a pipeline.

job_config.metrics.logger DICT | BOOL

Default metrics client on DirectRunner. To turn it off, set this key to False. To adjust its configuration, use the properties level and timer_unit.
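
For instance, a tuned logger configuration could look like this (the values chosen are illustrative):

job_config:
  metrics:
    logger:
      level: info
      timer_unit: ms

Setting logger: False instead turns the client off entirely.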

job_config.metrics.logger.level STR

Log level at which metrics are emitted.

Options: debug, info, warning, error, critical
Default: debug

job_config.metrics.logger.timer_unit STR

Globally set the default unit of time for timers.

Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds
Default: ns

job_config.metrics.native DICT | BOOL

Default metrics client on DataflowRunner. To turn it off, set this key to False.

job_config.metrics.native.timer_unit STR

Globally set the default unit of time for timers.

Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds
Default: ns

job_config.metrics.shumway DICT | BOOL

Default metrics client on DirectGKERunner. To turn it off, set this key to False.

job_config.metrics.shumway.timer_unit STR

Globally set the default unit of time for timers.

Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds
Default: ns