The KlioMessage is protobuf data passed between transforms (and streaming jobs). Its data represents one unit of work for a transform.
The KlioMessage is found in klio.proto and can be imported via klio_core.proto.klio_pb2.
See the FAQs page for how to work with non-KlioMessages, how to publish KlioMessages from a non-Klio job, and using custom protobuf messages.
When a running streaming job receives a KlioMessage, Klio first checks whether the job is in the message’s downstream path, which signifies either bottom-up or top-down execution. If the job is not in the downstream path and is not an intended recipient of the message, it should not process the message, and Klio drops it without further processing.
If the job should process the received message, then Klio looks to see if the message is in ping-mode. If it is, then the message is just passed through and published to the job’s output topic.
If the message is not in ping-mode, Klio then checks whether the output data for the message already exists. If it does, Klio checks whether the message itself or the pipeline is configured to force-reprocess already-generated output. If not, the message is just passed through and published to the job’s output topic.
If the output data does not exist, or force mode is turned on, then Klio checks to see if the input data for the message exists. If not, Klio will drop the message and log that it can’t process any further.
Note
Coming soon! The ability for Klio to automatically trigger parent jobs when input data doesn’t exist is in development.
If the input data does exist, then Klio will invoke the rest of the user’s pipeline as defined in run.py.
Message handling: Klio will either drop the message if it doesn’t need to or cannot process it, pass through the message directly to the output topic to avoid unnecessary work, or process the message if all conditions are met.
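The handling flow described above can be sketched as a single decision function. This is a minimal illustration, not Klio’s implementation: `Action`, `MessageState`, and `decide` are hypothetical names, and each check Klio performs (recipient detection, data existence) is reduced to a boolean supplied by the caller.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    DROP = "drop"
    PASS_THROUGH = "pass_through"
    PROCESS = "process"


@dataclass
class MessageState:
    """Hypothetical summary of the facts checked for one received message."""

    job_is_recipient: bool  # job is in the downstream path / an intended recipient
    ping: bool              # message is in ping mode
    force: bool             # message or pipeline is set to force-reprocess
    output_exists: bool     # output data for the message already exists
    input_exists: bool      # input data for the message exists


def decide(state: MessageState) -> Action:
    """Return what should happen to the message, following the order above."""
    if not state.job_is_recipient:
        return Action.DROP
    if state.ping:
        return Action.PASS_THROUGH
    if state.output_exists and not state.force:
        return Action.PASS_THROUGH
    if not state.input_exists:
        return Action.DROP
    return Action.PROCESS
```

Ordering matters in this sketch: ping mode is checked before any data-existence check, so a ping message is passed through even when input data is missing.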
metadata
Metadata related to the message.
data
Data of the message.
version
Version of the message.
KlioMessage.Metadata
downstream
Jobs by which the message must be processed. If empty, all jobs that receive the message will process it. If not empty, a job checks whether it is itself listed in downstream. If it is not, the job ignores the message and does no work.
Deprecated. Users should migrate to Metadata.intended_recipients.
visited
Jobs by which the message has already been processed. No jobs are repeated. When a message is in ping mode (by setting ping to True), this is used to log/visualize the DAG.
job_audit_log
Audit log for all jobs that the message has visited. This can be considered the audit trail for a message.
ping
If True, then no transformation work will be done for this message, and the message will be published to the job’s output topic(s). The job will log about the received message. This is meant for debugging and/or visualizing the DAG.
bool
False
force
If True, and if the output data already exists for the message, then the job will force the transform to run again.
intended_recipients
Jobs by which the message must be processed. Used to distinguish between top-down and bottom-up execution modes.
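As an illustration of the `visited` field described above ("no jobs are repeated"), here is a minimal sketch using plain dataclasses as stand-ins for the protobuf messages. `Job`, `Metadata`, and `mark_visited` are hypothetical names for this example only; the generated `klio_pb2` classes are not used here, and how Klio itself updates `visited` is not shown in this document.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Job:
    """Hypothetical stand-in for KlioJob (identifying fields only)."""

    job_name: str
    gcp_project: str


@dataclass
class Metadata:
    """Hypothetical stand-in for a subset of KlioMessage.Metadata."""

    visited: List[Job] = field(default_factory=list)
    ping: bool = False
    force: bool = False


def mark_visited(metadata: Metadata, job: Job) -> None:
    """Record that `job` handled the message without repeating entries."""
    if job not in metadata.visited:
        metadata.visited.append(job)


# A ping message passing through two jobs, one of them twice.
meta = Metadata(ping=True)
for name in ("job-a", "job-b", "job-a"):
    mark_visited(meta, Job(job_name=name, gcp_project="example-project"))
visited_names = [job.job_name for job in meta.visited]
```

Keeping `visited` ordered and free of duplicates is what makes it usable for logging or visualizing the DAG in ping mode.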
KlioMessage.Metadata.Recipients
One of the following attributes is required:
anyone
Current message is intended for any recipient, signifying top-down execution. Mutually exclusive with KlioMessage.Metadata.Recipients.limited.
limited
Current message is intended for the included recipients, signifying bottom-up execution. Mutually exclusive with KlioMessage.Metadata.Recipients.anyone.
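The mutual exclusivity of `anyone` and `limited`, and the execution mode each implies, can be sketched as follows. This is an assumption-laden illustration: `Recipients` here is a plain dataclass rather than the generated protobuf class, `anyone` is modeled as a boolean instead of an empty message, and `execution_mode` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Recipients:
    """Hypothetical stand-in for KlioMessage.Metadata.Recipients."""

    anyone: bool = False                        # stands in for the empty Anyone message
    limited: Optional[Tuple[str, ...]] = None   # job names; stands in for Limited

    def __post_init__(self) -> None:
        # Exactly one of the two attributes must be set.
        if self.anyone == (self.limited is not None):
            raise ValueError("exactly one of `anyone` or `limited` must be set")


def execution_mode(recipients: Recipients) -> str:
    """Map the attribute that is set to the execution mode it signifies."""
    return "top-down" if recipients.anyone else "bottom-up"
```

For example, `execution_mode(Recipients(anyone=True))` yields `"top-down"`, while `Recipients(limited=("job-b",))` yields `"bottom-up"`.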
KlioMessage.Metadata.Recipients.Anyone
This is an empty “stub” message. Its presence is used to simply signify top-down execution.
KlioMessage.Metadata.Recipients.Limited
recipients
An array of KlioJobs. Only jobs included in recipients should process the message. Otherwise, the job should just drop the message to avoid further processing.
trigger_children_of
When set to a particular job, it signifies that the message was originally in top-down execution mode across a graph of jobs, but a dependency was missing for the job assigned to trigger_children_of, therefore triggering bottom-up execution for a subset of the graph. Once dependencies are made available, the job triggering bottom-up execution for that subset should then return the message to top-down mode. This is done by re-assigning KlioMessage.Metadata.intended_recipients to Anyone.
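The interplay between `recipients` and `trigger_children_of` can be sketched as below. The names `Limited`, `IntendedRecipients`, `should_process`, and `return_to_top_down` are hypothetical stand-ins for this example, jobs are identified by name only, and `limited is None` stands in for `Anyone`. The sketch also assumes the caller invokes `return_to_top_down` only after the missing dependency has been produced; it does not model how Klio detects that.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Limited:
    """Hypothetical stand-in for KlioMessage.Metadata.Recipients.Limited."""

    recipients: List[str] = field(default_factory=list)
    trigger_children_of: Optional[str] = None


@dataclass
class IntendedRecipients:
    """Hypothetical stand-in; `limited is None` represents Anyone."""

    limited: Optional[Limited] = None


def should_process(intended: IntendedRecipients, job_name: str) -> bool:
    """Top-down: every job processes. Bottom-up: only listed jobs do."""
    if intended.limited is None:
        return True
    return job_name in intended.limited.recipients


def return_to_top_down(intended: IntendedRecipients, job_name: str) -> bool:
    """Reset to Anyone if `job_name` is the job that triggered bottom-up mode."""
    limited = intended.limited
    if limited is not None and limited.trigger_children_of == job_name:
        intended.limited = None
        return True
    return False
```

After `return_to_top_down` succeeds, `should_process` returns `True` for every job again, which mirrors re-assigning `intended_recipients` to `Anyone`.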
KlioMessage.Data
element
The reference identifier that refers to a particular file on which the job will perform work.
bytes
payload
Data shared between transforms. It reflects what the previous transform in the pipeline returned/yielded (if that transform was decorated with the @handle_klio decorator). For the first transform in the pipeline after reading from event input, the payload will always be None.
None
See Transforms for how to make use of a message’s payload.
entity_id
Deprecated. Users should migrate to data.element.
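Because `element` is typed as bytes, a transform that treats it as a textual file identifier has to decode it. The sketch below assumes the identifier is UTF-8 text, which this document does not state; `Data` is a plain dataclass stand-in for `KlioMessage.Data`, and `element_as_text` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Data:
    """Hypothetical stand-in for KlioMessage.Data (non-deprecated fields)."""

    element: bytes
    payload: Optional[bytes] = None  # None for the first transform after event input


def element_as_text(data: Data) -> str:
    """Decode the reference identifier, assuming it is UTF-8 encoded text."""
    return data.element.decode("utf-8")


first = Data(element=b"track-0001")
file_id = element_as_text(first)
```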
KlioJob
Warning
KlioJob will be undergoing API changes for v2 of Klio.
job_name
Name of job (as configured in klio-job.yaml::job_name).
string
gcp_project
GCP project of job (as configured in klio-job.yaml::pipeline_options.project).
inputs
The job’s event & data input(s)
Marked for deprecation.
KlioJob.JobInput
KlioJob.JobInput has been marked for deprecation for v2.
topic
The job’s Pub/Sub input topic.
subscription
The job’s Pub/Sub input subscription.
data_location
The GCS location of the job’s input data.
KlioJobAuditLogItem
timestamp
Timestamp of when the audit log item was created.
google.protobuf.Timestamp
klio_job
The KlioJob that is working on the message.
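An audit trail built from such items might look like the following sketch. `AuditLogItem` and `append_audit_item` are hypothetical names, jobs are represented by name only, and a timezone-aware `datetime` stands in for `google.protobuf.Timestamp`; this is not how Klio itself records audit entries.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List


def _utc_now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class AuditLogItem:
    """Hypothetical stand-in for KlioJobAuditLogItem."""

    klio_job: str                                         # name of the job handling the message
    timestamp: datetime = field(default_factory=_utc_now)  # creation time, in UTC


def append_audit_item(log: List[AuditLogItem], job_name: str) -> AuditLogItem:
    """Append a new item for `job_name` and return it."""
    item = AuditLogItem(klio_job=job_name)
    log.append(item)
    return item


audit_log: List[AuditLogItem] = []
append_audit_item(audit_log, "job-a")
append_audit_item(audit_log, "job-b")
trail = [item.klio_job for item in audit_log]
```

Unlike `visited`, the log in this sketch keeps one entry per visit, so a job that handles the message twice would appear twice.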
Version
UNKNOWN
No version set.
V1
Version 1 of KlioMessage.
V2
Version 2 of KlioMessage.