Event Configuration

Event Inputs


Google Pub/Sub

Mode: streaming

Consuming KlioMessages from Google Pub/Sub is supported for all runners.

Example configuration for Google Pub/Sub:

name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    inputs:
      - type: pubsub
        topic: my-parent-output-topic
        subscription: my-input-subscription

job_config.events.inputs[].type

Value: pubsub

Runner: Dataflow, Direct
job_config.events.inputs[].topic STR

The Google Pub/Sub topic to which the job will subscribe when started.

KlioMessages can be published to this topic via klio message publish or from a parent Klio job.

While optional, if KlioMessages are published via klio message publish, a topic is required. A topic is also required if no subscription is specified.

Runner: Dataflow, Direct


Warning: When only a topic is specified, Beam creates a new temporary subscription each time the job starts, so KlioMessages published while the job is not running (for example, between restarts) are lost.


Multiple jobs can subscribe to the same topic.

If each job has a unique subscription to the topic, they will each receive the same KlioMessage published to that topic. This is useful for multiple dependencies on a parent job, or setting up staging/canary environments.

If each job uses the same subscription to the topic, only one job will process any given KlioMessage.

job_config.events.inputs[].subscription STR

The Google Pub/Sub subscription (correlated to the topic, if configured) from which Klio will read.

A subscription is required if a topic is not specified.

Runner: Dataflow, Direct


Multiple jobs can subscribe to the same topic.

If each job has a unique subscription to the topic, they will each receive the same KlioMessage published to that topic. This is useful for multiple dependencies on a parent job, or setting up staging/canary environments.

If each job uses the same subscription to the topic, only one job will process any given KlioMessage.
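
The topic and subscription requirements described above can be summarized as a small check. This is a minimal sketch, not Klio's own validation: `check_pubsub_input` is a hypothetical helper, and the event input is assumed to be a plain dictionary that uses the same keys as the configuration.

```python
def check_pubsub_input(event_input):
    """Return a list of notes about a Pub/Sub event input mapping.

    Hypothetical helper for illustration only; it is not part of Klio.
    """
    notes = []
    topic = event_input.get("topic")
    subscription = event_input.get("subscription")

    # At least one of the two fields has to be present.
    if not topic and not subscription:
        notes.append("error: set 'topic', 'subscription', or both")

    # With only a topic, a temporary subscription is created on startup,
    # so messages published while the job is down are not retained.
    if topic and not subscription:
        notes.append("warning: topic only, messages may be lost between runs")

    return notes
```

An input with both fields set returns an empty list; an input with only a topic returns the warning.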

job_config.events.inputs[].skip_klio_read BOOL

Inherited from global event input config.

Google BigQuery

Mode: Batch

Consuming KlioMessages from Google BigQuery is supported for all runners.

Klio allows the user to specify events using either a query or columns from a table. A table can be specified either through a table reference in the table field (e.g. 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE') or by filling in the project, dataset, and table fields. If the columns representing the event input are not specified through the columns field, all columns from the table are returned. A query and a table (with its associated project, dataset, and columns) are mutually exclusive; the user cannot specify both.

Example configuration for Google BigQuery, by specifying a table:

name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        columns:
          - entity_id
        dataset: bigquery_dataset
        project: gcp_project
        table: bigquery_table

Example configuration for Google BigQuery, by specifying a query in the BigQuery dialect:

name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        query: |
          SELECT * FROM [gcp_project.bigquery_dataset.bigquery_table]

job_config.events.inputs[].type

Value: bq

Runner: Dataflow, Direct

job_config.events.inputs[].columns

Not applicable if query is specified.

A list of strings specifying the columns to read events from. If only one column is provided, its value is returned as a bytestring. If no columns are specified (meaning that all columns are selected), or if more than one column is specified, the results, including the column names, are serialized to a JSON bytestring, e.g. '{"field1": "foo", "field2": "bar"}'.
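
The two output shapes described above can be illustrated with a short sketch. It only mimics the documented behavior and is not Klio's implementation; `serialize_row` is a hypothetical name, and a row is assumed to be a dictionary of column name to value.

```python
import json


def serialize_row(row, columns=None):
    """Mimic the documented event shape for one BigQuery row (illustrative).

    row: dict mapping column name to value.
    columns: optional list of column names; None or empty means all columns.
    """
    if columns and len(columns) == 1:
        # A single column: only the value is returned, as a bytestring.
        return str(row[columns[0]]).encode("utf-8")

    # Zero or several columns: names and values are serialized together.
    selected = row if not columns else {name: row[name] for name in columns}
    return json.dumps(selected).encode("utf-8")
```

With `columns=["entity_id"]` the result is just the encoded value; with no columns the whole row is serialized as JSON.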

Runner: Dataflow, Direct
job_config.events.inputs[].columns[].<column> STR

A column name in the table or query result from which the event input is built.

Runner: Dataflow, Direct
job_config.events.inputs[].table STR

Name of the table to use for event input.

The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _.

If dataset and query arguments are not specified, then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
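
As an illustration of the two accepted reference forms, the sketch below splits a table reference into its parts. `parse_table_reference` is a hypothetical helper written for this example, not a Klio or Beam function.

```python
def parse_table_reference(table, dataset=None, project=None):
    """Return (project, dataset, table) for a BigQuery table reference.

    Hypothetical helper for illustration. When dataset is given, table is
    treated as a bare table name; otherwise table must be 'DATASET.TABLE'
    or 'PROJECT:DATASET.TABLE'.
    """
    if dataset is not None:
        return project, dataset, table

    # An optional project comes before the first colon.
    if ":" in table:
        project, table = table.split(":", 1)

    if "." not in table:
        raise ValueError(
            "expected 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE', got %r" % table
        )
    dataset, table = table.split(".", 1)
    return project, dataset, table
```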

Runner: Dataflow, Direct
Required when using project, dataset, table, and columns to specify event inputs
job_config.events.inputs[].dataset STR

Name of the event input table’s dataset.

Ignored if the table reference is fully specified by the table argument or if a query is specified.

Runner: Dataflow, Direct
Required when using project, dataset, table, and columns to specify event inputs and table reference does not include dataset
job_config.events.inputs[].project STR

Name of the event input table’s project.

Ignored if the table reference is fully specified by the table argument or if a query is specified.

Runner: Dataflow, Direct
Required when using project, dataset, table, and columns to specify event inputs and table reference does not include project
job_config.events.inputs[].query STR

Query string supplying the columns. Mutually exclusive with specifying the table field.
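
The mutual exclusivity of query and table can be expressed as a small check. This is a sketch under the assumption that the event input is available as a dictionary; `bq_input_mode` is a hypothetical name and does not reflect how Klio validates configuration internally.

```python
def bq_input_mode(event_input):
    """Return 'query' or 'table' for a BigQuery event input mapping.

    Hypothetical helper for illustration; raises ValueError when the
    mapping mixes both styles or specifies neither.
    """
    has_query = bool(event_input.get("query"))
    table_keys = [
        key
        for key in ("table", "dataset", "project", "columns")
        if event_input.get(key)
    ]

    if has_query and table_keys:
        raise ValueError(
            "'query' cannot be combined with: " + ", ".join(table_keys)
        )
    if not has_query and not event_input.get("table"):
        raise ValueError("either 'query' or 'table' must be set")

    return "query" if has_query else "table"
```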

Runner: Dataflow, Direct
Required if project, dataset, table and columns were not used to specify event inputs
job_config.events.inputs[].validate BOOL

If True, various checks will be done when source gets initialized (e.g., is table present?). This should be True for most scenarios in order to catch errors as early as possible (pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step. Defaults to True.

Runner: Dataflow, Direct
job_config.events.inputs[].coder STR

A string representing the import path to a coder for the table rows if serialized to disk.

If not specified, then the default coder is RowAsDictJsonCoder, which will interpret every line in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable.

Runner: Dataflow, Direct
job_config.events.inputs[].use_standard_sql BOOL

Specifies whether to use BigQuery’s standard SQL dialect for the query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.

Runner: Dataflow, Direct
job_config.events.inputs[].flatten_results BOOL

Flattens all nested and repeated fields in the query results. The default value is True.

Runner: Dataflow, Direct
job_config.events.inputs[].kms_key STR

Optional Cloud KMS key name for use when creating new tables.

Runner: Dataflow, Direct
job_config.events.inputs[].skip_klio_read BOOL

Inherited from global event input config.

Text Files

Mode: batch

Consuming KlioMessages from files hosted on Google Cloud Storage is supported for all runners. Each line of a file is converted to KlioMessage.data.element.


Reading from local files is only supported on Direct Runner.

Example configuration for reading local files:

name: my-cool-job
job_config:
  events:
    inputs:
      - type: file
        location: ./input-ids.txt

Example configuration for reading files from Google Cloud Storage:

name: my-cool-job
job_config:
  events:
    inputs:
      - type: file
        location: gs://my-event-input-bucket/input-ids.txt

job_config.events.inputs[].type

Value: file

Runner: Dataflow, Direct


Avro

Mode: Batch

KlioReadFromAvro reads records from the provided avro location. If the records’ schema includes an element field, the KlioMessage data.element is set to its value. If the records have no element field, the entire record is cast to bytes and stored in the KlioMessage data.element field.
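
The element-selection rule above can be sketched as follows. This is not KlioReadFromAvro's code: `element_from_record` is a hypothetical function, records are assumed to be dictionaries, and the JSON encoding used for whole records is an assumption made only for illustration, since this section does not say how Klio casts a record to bytes.

```python
import json


def element_from_record(record):
    """Pick the bytes that would become data.element (illustrative only)."""
    if "element" in record:
        value = record["element"]
        # Pass bytes through; encode anything else as UTF-8 text.
        return value if isinstance(value, bytes) else str(value).encode("utf-8")

    # No 'element' field: fall back to the whole record.
    # ASSUMPTION: JSON is used here only to have a concrete byte encoding.
    return json.dumps(record, sort_keys=True).encode("utf-8")
```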

Example configuration for reading elements from a directory of avro files:

name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: avro
        location: gs://my-bucket/

Example configuration for reading elements from a specific avro file:

name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: avro
        location: gs://my-bucket
        file_pattern: my_specific_file.avro

job_config.events.inputs[].type

Value: avro

Runner: Dataflow, Direct
job_config.events.inputs[].location STR

The file path to read from as a local file path or a GCS gs:// path. The path can contain glob characters (*, ?, and [...] sets).


Local files are only supported for Direct Runner.

Runner: Dataflow, Direct
job_config.events.inputs[].file_pattern STR

Pattern of file name(s) to read. This field is optional if job_config.events.inputs[].location is provided.

Runner: Dataflow, Direct


If both job_config.events.inputs[].location and job_config.events.inputs[].file_pattern are provided, the two fields are joined to find files matching file_pattern under the provided location path.
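
A rough sketch of how the two fields could combine is shown below. The exact joining logic inside Klio is not documented here, so `build_read_pattern` is a hypothetical helper that assumes a simple POSIX-style join, which also works for gs:// paths.

```python
import fnmatch
import posixpath


def build_read_pattern(location, file_pattern=None):
    """Join location and file_pattern into one path pattern (illustrative)."""
    if not file_pattern:
        return location
    # posixpath.join inserts a single '/' whether or not location ends in one.
    return posixpath.join(location, file_pattern)


pattern = build_read_pattern("gs://my-bucket", "*.avro")
# Glob-style matching against a candidate file name.
matches = fnmatch.fnmatchcase("gs://my-bucket/my_specific_file.avro", pattern)
```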

job_config.events.inputs[].min_bundle_size INT

Minimum size of bundles that should be generated when splitting this source into bundles. See apache_beam.io.filebasedsource.FileBasedSource for more details.

Default is 0.

Runner: Dataflow, Direct
job_config.events.inputs[].compression_type STR

Used to handle compressed input files. Typical value is CompressionTypes.AUTO, in which case the underlying file_path’s extension will be used to detect the compression.

Default is apache_beam.io.filesystem.CompressionTypes.AUTO.

Runner: Dataflow, Direct
job_config.events.inputs[].strip_trailing_newlines BOOL

Indicates whether this source should remove the newline char in each line it reads before decoding that line.

Default is True.

Runner: Dataflow, Direct
job_config.events.inputs[].validate BOOL

Flag to verify that the files exist during the pipeline creation time.

Default is True.

Runner: Dataflow, Direct
job_config.events.inputs[].skip_header_lines INT

Number of header lines to skip. The same number of lines is skipped from each source file. Must be 0 or higher. Skipping a large number of lines might impact performance.

Default is 0.

Runner: Dataflow, Direct
job_config.events.inputs[].coder STR

Coder used to decode each line.

Defaults to apache_beam.coders.coders.StrUtf8Coder.

Runner: Dataflow, Direct
job_config.events.inputs[].skip_klio_read BOOL

Inherited from global event input config.


Custom

Mode: streaming or batch

Example configuration for a custom event input that is not supported by Klio:

name: my-cool-job
job_config:
  events:
    inputs:
      - type: custom
        some_key: some_value

job_config.events.inputs[].type

Value: custom

Runner: Dataflow, Direct
job_config.events.inputs[].skip_klio_read BOOL

Inherited from global event input config. This will be set to True automatically.

job_config.events.inputs[].<custom-key> ANY

Any arbitrary key-value pairs for custom event input configuration specific to a job.

Event Outputs


Google Pub/Sub

Mode: streaming

Publishing KlioMessages to Google Pub/Sub is supported for all runners.

Example configuration for Google Pub/Sub:

name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    outputs:
      - type: pubsub
        topic: my-output-topic

job_config.events.outputs[].type

Value: pubsub

Runner: Dataflow, Direct
job_config.events.outputs[].topic STR

The topic that this job will publish to once it has finished processing. Unless skip_klio_write is True, Klio will automatically write KlioMessages to this topic signifying work is completed.

Runner: Dataflow, Direct
job_config.events.outputs[].skip_klio_write BOOL

Inherited from global event output config.


Avro

Mode: Batch

KlioWriteToAvro writes records to the provided local or GCS location.

The avro schema defaults to a bytes field keyed as element. The transform takes the incoming record, parses it to a KlioMessage, then sets the KlioMessage field data.element as the value of the avro schema field element. The transform can also be used to write avro files locally when testing on DirectRunner.

Example configuration for writing elements to a local avro file or an avro file stored in GCS:

name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    outputs:
      - type: avro
        location: gs://my-bucket/avro-subdirectory

job_config.events.outputs[].type

Value: avro

Runner: Dataflow, Direct
job_config.events.outputs[].location STR

Location of local or GCS avro file(s) to write to. This field is optional if job_config.events.outputs[].file_path_prefix is provided.

Runner: Dataflow, Direct
job_config.events.outputs[].file_pattern STR

Pattern of file name(s) to write. This field is optional if job_config.events.outputs[].location is provided.

Runner: Dataflow, Direct
job_config.events.outputs[].codec STR

Codec to use for block-level compression. Default value is 'deflate'.

Runner: Dataflow, Direct
job_config.events.outputs[].file_name_suffix STR

Suffix of files to write.

Runner: Dataflow, Direct
job_config.events.outputs[].num_shards INT

Number of shards to use when writing files.

Runner: Dataflow, Direct
job_config.events.outputs[].shard_name_template STR

Template for file shard names.

Runner: Dataflow, Direct
job_config.events.outputs[].mime_type STR

MIME type of written files. Defaults to 'application/x-avro'.

Runner: Dataflow, Direct
job_config.events.outputs[].skip_klio_write BOOL

Inherited from global event output config.

Google BigQuery

Mode: Batch

Publishing KlioMessages to Google BigQuery is supported for all runners.

The payload from KlioMessage.data is used to create a row in the designated BigQuery table. The payload must be JSON-serializable, since Klio loads the payload data as JSON before writing to BigQuery.
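
The JSON requirement can be illustrated with a short sketch. `payload_to_row` is a hypothetical helper, not Klio's transform; it only shows that a payload must decode to a JSON object whose keys line up with the table's columns. The field names are taken from the example schema in this section.

```python
import json


def payload_to_row(payload):
    """Decode a JSON payload (bytes or str) into a row dictionary.

    Hypothetical helper for illustration only.
    """
    row = json.loads(payload)
    if not isinstance(row, dict):
        raise ValueError("payload must decode to a JSON object")
    return row


row = payload_to_row(b'{"file_name": "a.wav", "file_size_bytes": 1024}')
```

A payload that is not valid JSON raises an error from `json.loads`, which is the situation the requirement above is meant to prevent.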

Example configuration for Google BigQuery:

name: my-cool-job
job_config:
  events:
    outputs:
      - type: bq
        project: my-project
        dataset: my-dataset
        table: my-table
        schema: |
          {
            "fields": [
              {
                "name": "file_name",
                "type": "STRING",
                "mode": "REQUIRED"
              },
              {
                "name": "file_size_bytes",
                "type": "INTEGER",
                "mode": "NULLABLE"
              }
            ]
          }

job_config.events.outputs[].type

Value: bq

Runner: Dataflow, Direct
job_config.events.outputs[].project STR

The ID of the project containing this table or None if the table reference is specified entirely by the table argument.

Runner: Dataflow, Direct
job_config.events.outputs[].dataset STR

The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

Runner: Dataflow, Direct
job_config.events.outputs[].table STR

The ID of the table. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If dataset argument is None then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.

Runner: Dataflow, Direct
job_config.events.outputs[].schema DICT

The schema to be used if the BigQuery table to write has to be created. This should be specified as the string representing a dictionary in the form of:

    {
        "fields": [
            {
                "name": "<field name>",  # required
                "type": "<TYPE>",  # required
                "mode": "<MODE>",  # required
                "description": "<DESCRIPTION>"  # optional
            }, ...
        ]
    }

Refer to the BigQuery schema documentation for more information on defining a schema.

Runner: Dataflow, Direct
job_config.events.outputs[].create_disposition STR

A string describing what happens if the table does not exist. Possible values are:

  • "CREATE_IF_NEEDED": create the table if it does not exist.

  • "CREATE_NEVER": fail the write if the table does not exist.

Refer to BigQuery create disposition documentation for more information.

Defaults to "CREATE_IF_NEEDED".

Runner: Dataflow, Direct
job_config.events.outputs[].write_disposition STR

A string describing what happens if the table already has some data. Possible values are:

  • "WRITE_TRUNCATE": delete existing rows.

  • "WRITE_APPEND": add to existing rows.

  • "WRITE_EMPTY": fail the write if the table is not empty.

Refer to BigQuery write disposition documentation for more information.

Defaults to "WRITE_APPEND".


For streaming pipelines, "WRITE_TRUNCATE" cannot be used; a ValueError will be raised.
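
The restriction above amounts to a check like the following. This is only a sketch: `check_write_disposition` is a hypothetical function, and where the real check lives (Klio or Beam) is not stated in this section.

```python
VALID_WRITE_DISPOSITIONS = ("WRITE_TRUNCATE", "WRITE_APPEND", "WRITE_EMPTY")


def check_write_disposition(write_disposition="WRITE_APPEND", streaming=False):
    """Validate a write disposition for a batch or streaming pipeline.

    Hypothetical helper for illustration; the default mirrors the
    documented default of "WRITE_APPEND".
    """
    if write_disposition not in VALID_WRITE_DISPOSITIONS:
        raise ValueError("unknown write disposition: %r" % write_disposition)
    if streaming and write_disposition == "WRITE_TRUNCATE":
        raise ValueError("WRITE_TRUNCATE cannot be used in streaming pipelines")
    return write_disposition
```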

Runner: Dataflow, Direct

Text Files

Mode: batch

Writing KlioMessages to files hosted on Google Cloud Storage is supported for all runners. Each KlioMessage.data.element is written as one line of a file.


Writing to local files is only supported on Direct Runner.

Example configuration for writing to local files:

name: my-cool-job
job_config:
  events:
    outputs:
      - type: file
        location: ./output-ids
        file_name_suffix: .txt

Example configuration for writing files to Google Cloud Storage:

name: my-cool-job
job_config:
  events:
    outputs:
      - type: file
        location: gs://my-event-input-bucket/output-ids
        file_name_suffix: .txt

job_config.events.outputs[].type

Value: file

Runner: Dataflow, Direct
job_config.events.outputs[].location STR

The file path to write to as a local file path or a GCS gs:// path. The files written will begin with this location as a prefix, followed by a shard identifier (see num_shards), and end in a common extension, if given by file_name_suffix. In most cases, only this argument is specified and num_shards, shard_name_template, and file_name_suffix use default values.


Local files are only supported for Direct Runner.

Runner: Dataflow, Direct
job_config.events.outputs[].file_name_suffix STR

Suffix for the files written. Can be used to define desired file extension.

Defaults to "".

Runner: Dataflow, Direct
job_config.events.outputs[].append_trailing_newlines BOOL

Indicate whether this sink should write an additional newline char after writing each element.

Defaults to True.

Runner: Dataflow, Direct
job_config.events.outputs[].num_shards INT

The number of files (shards) used for output. If not set, the runner will decide on the optimal number of shards.


Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.

Runner: Dataflow, Direct
job_config.events.outputs[].shard_name_template STR

A template string containing placeholders for the shard number and shard count. Currently only '' and '-SSSSS-of-NNNNN' are supported patterns. When constructing a filename for a particular shard number, the upper-case letters S and N are replaced with the 0-padded shard number and shard count respectively. This argument can be '' in which case it behaves as if num_shards was set to 1 and only one file will be generated.

The default pattern used is '-SSSSS-of-NNNNN'.
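
To make the naming scheme concrete, the sketch below builds an output file name from location, shard_name_template, and file_name_suffix. It imitates the documented behavior and is not Beam's implementation; `output_file_name` is a hypothetical helper.

```python
import re


def output_file_name(location, shard_num, num_shards,
                     shard_name_template="-SSSSS-of-NNNNN",
                     file_name_suffix=""):
    """Build the file name for one shard (illustrative only)."""

    def fill(match):
        run = match.group(0)
        # Runs of 'S' take the shard number, runs of 'N' the shard count,
        # both zero-padded to the length of the run.
        value = shard_num if run[0] == "S" else num_shards
        return str(value).zfill(len(run))

    shard_part = re.sub(r"S+|N+", fill, shard_name_template)
    return location + shard_part + file_name_suffix


first = output_file_name("./output-ids", 0, 3, file_name_suffix=".txt")
single = output_file_name("./output-ids", 0, 1, shard_name_template="",
                          file_name_suffix=".txt")
```

With the default template, the first of three shards is named `./output-ids-00000-of-00003.txt`; with an empty template the single output file is `./output-ids.txt`.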

Runner: Dataflow, Direct
job_config.events.outputs[].coder STR

Coder used to encode each line.

Defaults to apache_beam.coders.coders.ToBytesCoder.

Runner: Dataflow, Direct
job_config.events.outputs[].compression_type STR

Used to handle compressed output files. Typical value is apache_beam.io.filesystem.CompressionTypes.AUTO, in which case the final file path’s extension (as determined by location, file_name_suffix, num_shards and shard_name_template) will be used to detect the compression.

Defaults to apache_beam.io.filesystem.CompressionTypes.AUTO.

Runner: Dataflow, Direct
job_config.events.outputs[].header STR

String to write at beginning of file as a header. If not None and append_trailing_newlines is set to True, \n will be added.

Defaults to None.

Runner: Dataflow, Direct
job_config.events.outputs[].footer STR

String to write at the end of file as a footer. If not None and append_trailing_newlines is set to True, \n will be added.

Defaults to None.

Runner: Dataflow, Direct
job_config.events.outputs[].skip_klio_write BOOL

Inherited from global event output config.


Custom

Mode: streaming or batch

Example configuration for a custom event output that is not supported by Klio:

name: my-cool-job
job_config:
  events:
    outputs:
      - type: custom
        some_key: some_value

job_config.events.outputs[].type

Value: custom

Runner: Dataflow, Direct
Required
job_config.events.outputs[].skip_klio_write BOOL

Inherited from global event output config. This will be set to True automatically.

job_config.events.outputs[].<custom-key> ANY

Any arbitrary key-value pairs for custom event output configuration specific to a job.