Mode: streaming
Consuming KlioMessages from Google Pub/Sub is supported for all runners.
Example configuration for Google Pub/Sub:
name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    inputs:
      - type: pubsub
        topic: my-parent-output-topic
        subscription: my-input-subscription
job_config.events.inputs
[].type
Value: pubsub | Runner: Dataflow, Direct | Required
[].topic STR
The Google Pub/Sub topic to which the job will subscribe when started.
KlioMessages can be published to this topic via klio message publish or from a parent Klio job.
While optional, a topic is required if KlioMessages are published via klio message publish, or if no subscription is specified.
Warning
When only a topic is specified, KlioMessages will be lost between jobs because Beam creates a temporary subscription on startup.
Note
Multiple jobs can subscribe to the same topic.
If each job has a unique subscription to the topic, they will each receive the same KlioMessage published to that topic. This is useful for multiple dependencies on a parent job, or setting up staging/canary environments.
If each job uses the same subscription to the topic, only one job will process any given KlioMessage.
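For instance, a minimal sketch (topic and subscription names are illustrative) of two downstream jobs that each receive every KlioMessage published to the same parent topic, because each declares its own subscription:

# Job A's configuration
job_config:
  events:
    inputs:
      - type: pubsub
        topic: my-parent-output-topic
        subscription: job-a-subscription

# Job B's configuration
job_config:
  events:
    inputs:
      - type: pubsub
        topic: my-parent-output-topic
        subscription: job-b-subscription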
[].subscription STR
The Google Pub/Sub subscription (correlated to the topic, if configured) from which Klio will read.
A subscription is required if a topic is not specified.
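For example, a sketch (names are illustrative) of a job that consumes from a pre-existing subscription without declaring its topic:

name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    inputs:
      - type: pubsub
        subscription: my-existing-subscription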
[].skip_klio_read BOOL
Inherited from global event input config.
Mode: batch
Consuming KlioMessages from Google BigQuery is supported for all runners.
Klio allows the user to specify events using either queries or columns from a table. Tables can be specified either through a table reference in the table field (e.g. 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE') or by filling in the fields for project, dataset, and table. If the columns representing the event input are not specified through the columns field, all columns from the table are returned. Note that the user cannot specify both a query and a table (with its associated project, dataset, and columns); they are mutually exclusive.
Example configuration for Google BigQuery, by specifying a table:
name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        columns:
          - entity_id
        dataset: bigquery_dataset
        project: gcp_project
        table: bigquery_table
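Equivalently, a sketch (using the same illustrative names) that supplies the full table reference through the table field alone:

name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        columns:
          - entity_id
        table: gcp_project:bigquery_dataset.bigquery_table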
Example configuration for Google BigQuery, by specifying a query in the BigQuery dialect:
name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        query: |
          SELECT * FROM [gcp_project.bigquery_dataset.bigquery_table]
[].type
Value: bq | Runner: Dataflow, Direct | Required
[].columns[]
Not applicable if query is specified.
A list of strings specifying the columns from which to read events. If only one column is provided, the value will be returned as a bytestring. If no columns are specified (meaning that all columns are selected), or if more than one column is specified, the results, including the column names, will be serialized to a JSON bytestring, e.g. '{"field1": "foo", "field2": "bar"}'.
[].columns[].<column> STR
A column name in the table or query result used to build the event input from.
[].table STR
Name of the table to use for event input.
The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _.
If dataset and query arguments are not specified, then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
[].dataset STR
Name of the event input table’s dataset.
Ignored if the table reference is fully specified by the table argument or if a query is specified.
[].project STR
Name of the event input table’s project.
[].query STR
Query string supplying the columns. Mutually exclusive with specifying the table field.
[].validate BOOL
If True, various checks will be done when source gets initialized (e.g., is table present?). This should be True for most scenarios in order to catch errors as early as possible (pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step. Defaults to True.
[].coder STR
A string representing the import path to a coder for the table rows if serialized to disk.
If not specified, then the default coder is RowAsDictJsonCoder, which will interpret every line in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable.
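As a sketch, a custom coder is referenced by its import path; my_package.coders.MyRowCoder below is a hypothetical placeholder:

job_config:
  events:
    inputs:
      - type: bq
        table: gcp_project:bigquery_dataset.bigquery_table
        # Hypothetical import path to a custom coder
        coder: my_package.coders.MyRowCoder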
[].use_standard_sql BOOL
Specifies whether to use BigQuery’s standard SQL dialect for the query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.
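For example, a sketch of a query written in standard SQL (note the backtick-quoted table reference rather than the legacy bracketed form; names are illustrative):

name: my-cool-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: bq
        use_standard_sql: True
        query: |
          SELECT entity_id
          FROM `gcp_project.bigquery_dataset.bigquery_table`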
[].flatten_results BOOL
Flattens all nested and repeated fields in the query results. The default value is True.
[].kms_key STR
Optional Cloud KMS key name for use when creating new tables.
Mode: batch
Consuming KlioMessages from files hosted on Google Cloud Storage is supported for all runners. Each line of a file is converted to KlioMessage.data.element.
Attention
Reading from local files is only supported on Direct Runner.
Example configuration for reading local files:
name: my-cool-job
job_config:
  events:
    inputs:
      - type: file
        location: ./input-ids.txt
Example configuration for reading files from Google Cloud Storage:
name: my-cool-job
job_config:
  events:
    inputs:
      - type: file
        location: gs://my-event-input-bucket/input-ids.txt
[].type
Value: file | Runner: Dataflow, Direct | Required
KlioReadFromAvro reads records from the provided avro location. If the records' schema includes an element field, the KlioMessage data.element will be set to its value. If there is no element field on the records read from the avro, the entire record will be cast to bytes and stored in the KlioMessage data.element field.
Example configuration for reading elements from a directory of avro files:
name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: avro
        location: gs://my-bucket/
Example configuration for reading elements from a specific avro file:
name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    inputs:
      - type: avro
        location: gs://my-bucket
        file_pattern: my_specific_file.avro
[].type
Value: avro | Runner: Dataflow, Direct | Required
[].location STR
The file path to read from as a local file path or a GCS gs:// path. The path can contain glob characters (*, ?, and [...] sets).
Local files are only supported for Direct Runner.
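For instance, a sketch (bucket and paths are illustrative) that reads every .txt file under a prefix via a glob:

job_config:
  events:
    inputs:
      - type: file
        location: gs://my-event-input-bucket/ids/*.txt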
[].file_pattern STR
Pattern of file name(s) to read. This field is optional if job_config.events.inputs[].location is provided.
If both job_config.events.inputs[].location and job_config.events.inputs[].file_pattern are provided, the two fields will be joined to find files matching file_pattern within the provided location path.
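A sketch combining the two fields (paths are illustrative):

job_config:
  events:
    inputs:
      - type: avro
        # Joined to read gs://my-bucket/avro-subdirectory/my_specific_file.avro
        location: gs://my-bucket/avro-subdirectory
        file_pattern: my_specific_file.avro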
[].min_bundle_size INT
Minimum size of bundles that should be generated when splitting this source into bundles. See apache_beam.io.filebasedsource.FileBasedSource for more details.
Default is 0.
[].compression_type STR
Used to handle compressed input files. Typical value is CompressionTypes.AUTO, in which case the underlying file_path’s extension will be used to detect the compression.
Default is apache_beam.io.filesystem.CompressionTypes.AUTO.
[].strip_trailing_newlines BOOL
Indicates whether this source should remove the newline char in each line it reads before decoding that line.
Default is True.
[].validate BOOL
Flag to verify that the files exist during pipeline creation time.
[].skip_header_lines INT
Number of header lines to skip. Same number is skipped from each source file. Must be 0 or higher. Large number of skipped lines might impact performance.
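For example, a sketch (path is illustrative) that skips a single header row in each matched file:

job_config:
  events:
    inputs:
      - type: file
        location: gs://my-event-input-bucket/ids/*.csv
        skip_header_lines: 1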
[].coder STR
Coder used to decode each line.
Defaults to apache_beam.coders.coders.StrUtf8Coder.
Mode: streaming or batch
Example configuration for a custom event input that is not supported by Klio:
name: my-cool-job
job_config:
  events:
    inputs:
      - type: custom
        some_key: some_value
[].type
Value: custom | Runner: Dataflow, Direct | Required
[].skip_klio_read BOOL
Inherited from global event input config. This will be set to True automatically.
[].<custom-key> ANY
Any arbitrary key-value pairs for custom event input configuration specific to a job.
Publishing KlioMessages to Google Pub/Sub is supported for all runners.
name: my-cool-job
pipeline_options:
  streaming: True
job_config:
  events:
    outputs:
      - type: pubsub
        topic: my-output-topic
job_config.events.outputs
[].type
Value: pubsub | Runner: Dataflow, Direct | Required
[].topic STR
The topic to which this job will publish once it has finished processing. Unless skip_klio_write is True, Klio will automatically write KlioMessages to this topic, signifying that work is completed.
[].skip_klio_write BOOL
Inherited from global event output config.
KlioWriteToAvro writes records to the provided local or GCS location.
The avro schema defaults to a bytes field keyed as element. The transform takes the incoming record, parses it to a KlioMessage, then sets the KlioMessage field data.element as the value of the avro schema field element. The transform can also be used to write avro files locally when testing on DirectRunner.
Example configuration for writing elements to a local avro file or an avro file stored in GCS:
name: my-cool-batch-job
pipeline_options:
  streaming: False
job_config:
  events:
    outputs:
      - type: avro
        location: gs://my-bucket/avro-subdirectory
[].type
Value: avro | Runner: Dataflow, Direct | Required
[].location STR
Location of local or GCS avro file(s) to write to. This field is optional if job_config.events.outputs[].file_path_prefix is provided.
[].file_path_prefix STR
Pattern of file name(s) to write. This field is optional if job_config.events.outputs[].location is provided.
[].codec STR
Codec to use for block-level compression. Default value is 'deflate'.
[].file_name_suffix STR
Suffix of files to write.
[].num_shards INT
Number of shards to use when writing files.
[].shard_name_template STR
Template for file shard names.
[].mime_type STR
Mime type of written files. Defaults to 'application/x-avro'.
Publishing KlioMessages to Google BigQuery is supported for all runners.
The payload from KlioMessage.data is used to create a row in the designated BigQuery table. The payload's value must be JSON-serializable, since Klio loads the payload data as JSON before writing to BigQuery.
Example configuration for Google BigQuery:
name: my-cool-job
job_config:
  events:
    outputs:
      - type: bq
        project: my-project
        dataset: my-dataset
        table: my-table
        schema: |
          {
            "fields": [
              {
                "name": "file_name",
                "type": "STRING",
                "mode": "REQUIRED"
              },
              {
                "name": "file_size_bytes",
                "type": "INTEGER",
                "mode": "NULLABLE"
              }
            ]
          }
[].type
Value: bq | Runner: Dataflow, Direct | Required
[].project STR
The ID of the project containing this table, or None if the table reference is specified entirely by the table argument.
[].dataset STR
The ID of the dataset containing this table, or None if the table reference is specified entirely by the table argument.
[].table STR
The ID of the table. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If the dataset argument is None, then the table argument must contain the entire table reference, specified as 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
[].schema DICT
The schema to be used if the BigQuery table to write has to be created. This should be specified as the string representing a dictionary in the form of:
{ "fields": [ { "name": "<field name>", # required "type": "<TYPE>", # required "mode": "<MODE>", # required "description": "<DESCRIPTION>" # optional }, ... ] }
Refer to the BigQuery schema documentation for more information on defining a schema.
[].create_disposition STR
A string describing what happens if the table does not exist. Possible values are:
"CREATE_IF_NEEDED": create if does not exist.
"CREATE_IF_NEEDED"
"CREATE_NEVER": fail the write if does not exist.
"CREATE_NEVER"
Refer to BigQuery create disposition documentation for more information.
Defaults to "CREATE_IF_NEEDED".
[].write_disposition STR
A string describing what happens if the table already contains data. Possible values are:
"WRITE_TRUNCATE": delete existing rows.
"WRITE_APPEND": add to existing rows.
"WRITE_EMPTY": fail the write if the table is not empty.
Refer to BigQuery write disposition documentation for more information.
Defaults to "WRITE_APPEND".
Caution
For streaming pipelines, "WRITE_TRUNCATE" cannot be used; a ValueError will be raised.
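For instance, a sketch of a batch job that recreates the table contents on each run (names are illustrative; recall that "WRITE_TRUNCATE" is unavailable for streaming pipelines):

name: my-cool-job
job_config:
  events:
    outputs:
      - type: bq
        project: my-project
        dataset: my-dataset
        table: my-table
        create_disposition: CREATE_IF_NEEDED
        write_disposition: WRITE_TRUNCATE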
Writing KlioMessages to files hosted on Google Cloud Storage is supported for all runners. Each KlioMessage.data.element is written as a line in the output file(s).
Writing to local files is only supported on Direct Runner.
Example configuration for writing to local files:
name: my-cool-job
job_config:
  events:
    outputs:
      - type: file
        location: ./output-ids
        file_name_suffix: .txt
Example configuration for writing files to Google Cloud Storage:
name: my-cool-job
job_config:
  events:
    outputs:
      - type: file
        location: gs://my-event-input-bucket/output-ids
        file_name_suffix: .txt
[].type
Value: file | Runner: Dataflow, Direct | Required
[].location STR
The file path to write to, as a local file path or a GCS gs:// path. The files written will begin with this location as a prefix, followed by a shard identifier (see num_shards), and end in a common extension, if given by file_name_suffix. In most cases, only this argument is specified, and num_shards, shard_name_template, and file_name_suffix use default values.
[].file_name_suffix STR
Suffix for the files written; can be used to set the desired file extension.
Defaults to "".
[].append_trailing_newlines BOOL
Indicates whether this sink should write an additional newline character after writing each element.
Defaults to True.
[].num_shards INT
The number of files (shards) used for output. If not set, the runner will decide on the optimal number of shards.
Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.
[].shard_name_template STR
A template string containing placeholders for the shard number and shard count. Currently only '' and '-SSSSS-of-NNNNN' are supported patterns. When constructing a filename for a particular shard number, the upper-case letters S and N are replaced with the 0-padded shard number and shard count, respectively. This argument can be '', in which case it behaves as if num_shards was set to 1 and only one file will be generated.
The default pattern is '-SSSSS-of-NNNNN'.
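For example, a sketch (names are illustrative) that forces a single output file, producing ./output-ids.txt rather than ./output-ids-00000-of-00001.txt:

job_config:
  events:
    outputs:
      - type: file
        location: ./output-ids
        file_name_suffix: .txt
        num_shards: 1
        shard_name_template: ''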
[].coder STR
Coder used to encode each line.
Defaults to apache_beam.coders.coders.ToBytesCoder.
[].compression_type STR
Used to handle compressed output files. Typical value is apache_beam.io.filesystem.CompressionTypes.AUTO, in which case the final file path's extension (as determined by location, file_name_suffix, num_shards, and shard_name_template) will be used to detect the compression.
Defaults to apache_beam.io.filesystem.CompressionTypes.AUTO.
[].header STR
String to write at beginning of file as a header. If not None and append_trailing_newlines is set to True, \n will be added.
Defaults to None.
[].footer STR
String to write at the end of file as a footer. If not None and append_trailing_newlines is set to True, \n will be added.
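A sketch (values are illustrative) writing a header line and a footer line around the output elements:

job_config:
  events:
    outputs:
      - type: file
        location: ./output-ids
        file_name_suffix: .txt
        header: entity_id
        footer: end-of-file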
Example configuration for a custom event output that is not supported by Klio:

name: my-cool-job
job_config:
  events:
    outputs:
      - type: custom
        some_key: some_value
[].type
Value: custom | Runner: Dataflow, Direct | Required
[].skip_klio_write BOOL
Inherited from global event output config. This will be set to True automatically.
[].<custom-key> ANY
Any arbitrary key-value pairs for custom event output configuration specific to a job.