Klio Message

The KlioMessage is protobuf data passed between transforms (and streaming jobs). Its data represents one unit of work for a transform.

The KlioMessage is found in klio.proto and can be imported via klio_core.proto.klio_pb2.
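As a minimal sketch, assuming the klio-core package is installed, a version 2 message for top-down execution could be built from the generated module roughly as follows. The helper name build_top_down_message is hypothetical; the field names are the ones described under Protobuf Definitions on this page.

```python
def build_top_down_message(element: bytes):
    """Build a v2 KlioMessage addressed to any recipient (hypothetical helper)."""
    # Imported lazily so that this module still loads where klio-core is absent.
    from klio_core.proto import klio_pb2

    message = klio_pb2.KlioMessage()
    message.version = klio_pb2.Version.V2
    # `element` is a bytes field that identifies the unit of work.
    message.data.element = element
    # `anyone` is an empty stub message; SetInParent() marks it as present.
    message.metadata.intended_recipients.anyone.SetInParent()
    return message
```

Calling SerializeToString() on the returned message would give the bytes to publish to a job's input topic.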

See the FAQs page for how to work with non-KlioMessages, how to publish KlioMessages from a non-Klio job, and using custom protobuf messages.

Processing Logic

When a running streaming job receives a KlioMessage, Klio first detects whether the job is in the message’s downstream path, which signifies either bottom-up or top-down execution. If the job is not in the downstream path and is not an intended recipient of the message, it should not process the message, and Klio drops the message without any further processing.

If the job should process the received message, then Klio looks to see if the message is in ping-mode. If it is, then the message is just passed through and published to the job’s output topic.

If the message is not in ping-mode, Klio then checks whether the output data for the message already exists. If it does, Klio checks whether the message itself or the pipeline is configured to force-reprocess already-generated output. If neither is, the message is just passed through and published to the job’s output topic.

If the output data does not exist, or force mode is turned on, then Klio checks to see if the input data for the message exists. If not, Klio will drop the message and log that it can’t process any further.

Note

Coming soon! The ability for Klio to automatically trigger parent jobs when input data doesn’t exist is in development.

If the input data does exist, then Klio will invoke the rest of the user’s pipeline as defined in run.py.

[Figure: Klio processing logic flow]

Message handling: Klio will either drop the message if it doesn’t need to or cannot process it, pass through the message directly to the output topic to avoid unnecessary work, or process the message if all conditions are met.
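The checks described in this section can be summarized as a small decision function. This is only an illustrative sketch in plain Python, not Klio's implementation: the Action enum, the decide function, and its boolean parameters are hypothetical names standing in for the checks Klio performs.

```python
from enum import Enum


class Action(Enum):
    DROP = "drop"
    PASS_THROUGH = "pass_through"
    PROCESS = "process"


def decide(is_recipient: bool, ping: bool, output_exists: bool,
           force: bool, input_exists: bool) -> Action:
    """Mirror the order of checks described above (hypothetical helper)."""
    # The job is neither in the downstream path nor an intended recipient.
    if not is_recipient:
        return Action.DROP
    # Ping-mode messages are published to the output topic untouched.
    if ping:
        return Action.PASS_THROUGH
    # Output already exists and neither the message nor the pipeline forces a rerun.
    if output_exists and not force:
        return Action.PASS_THROUGH
    # Without input data there is nothing to work on.
    if not input_exists:
        return Action.DROP
    return Action.PROCESS
```

Here force is assumed to be True when either the message or the pipeline configuration asks for reprocessing.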

Protobuf Definitions

KlioMessage

metadata

Metadata related to the message.

Required

data

Data of the message.

Required

version

Version of the message.

Type: Version
Required

KlioMessage.Metadata

downstream

Jobs by which the message must be processed. If empty, all jobs that receive the message will process it. If not empty, the job checks whether it is listed in downstream. If it is not, the message is ignored and no work is done.

Deprecated. Users should migrate to Metadata.intended_recipients.

Type: KlioJob
Repeated

visited

Jobs by which the message has already been processed. No jobs are repeated. When a message is in ping mode (by setting ping to True), this is used to log/visualize the DAG.

Type: KlioJob
Repeated

job_audit_log

Audit log for all jobs that the message has visited. This can be considered the audit trail for a message.

Repeated

ping

If True, no transformation work will be done for this message, and the message will be published to the job’s output topic(s). The job will log that it received the message. This is meant for debugging and/or visualizing the DAG.

Type: bool
Optional, default: False

force

If True, and if the output data already exists for the message, then the job will force the transform to run again.

Type: bool
Optional, default: False

intended_recipients

Jobs by which the message must be processed. Used to distinguish between top-down and bottom-up execution modes.

Required for v2
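As a rough illustration of two of the rules above, the sketch below models the deprecated downstream check and the "no jobs are repeated" rule for visited. For brevity it assumes jobs are identified by name only, whereas the real fields hold KlioJob messages; both function names are hypothetical.

```python
from typing import List


def should_process(job_name: str, downstream: List[str]) -> bool:
    """Deprecated `downstream` rule: an empty list means every job processes."""
    return not downstream or job_name in downstream


def mark_visited(visited: List[str], job_name: str) -> List[str]:
    """Return `visited` with the job recorded at most once."""
    return visited if job_name in visited else visited + [job_name]
```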

KlioMessage.Metadata.Recipients

One of the following attributes is required:

anyone

Current message is intended for any recipient, signifying top-down execution. Mutually exclusive with KlioMessage.Metadata.Recipients.limited.

limited

Current message is intended for the included recipients, signifying bottom-up execution. Mutually exclusive with KlioMessage.Metadata.Recipients.anyone.
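The mutual exclusivity of anyone and limited, and how each maps to an execution mode, can be modeled with plain dataclasses. These classes are hypothetical stand-ins written for illustration; they are not the generated protobuf classes, and job names are used in place of KlioJob messages.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Anyone:
    """Empty stub; its presence alone signals top-down execution."""


@dataclass
class Limited:
    recipients: List[str] = field(default_factory=list)
    trigger_children_of: Optional[str] = None


@dataclass
class Recipients:
    anyone: Optional[Anyone] = None
    limited: Optional[Limited] = None

    def __post_init__(self) -> None:
        # Exactly one of the two attributes must be set.
        if (self.anyone is None) == (self.limited is None):
            raise ValueError("set exactly one of `anyone` or `limited`")


def execution_mode(recipients: Recipients) -> str:
    """Map the attribute that is set to the execution mode it signifies."""
    return "top-down" if recipients.anyone is not None else "bottom-up"
```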

KlioMessage.Metadata.Recipients.Anyone

This is an empty “stub” message. Its presence is used to simply signify top-down execution.

KlioMessage.Metadata.Recipients.Limited

recipients

An array of KlioJobs. Only jobs included in recipients should process the message. A job that is not included should drop the message to avoid further processing.

Type: KlioJob
Repeated

trigger_children_of

When set to a particular job, it signifies that the message was originally in top-down execution mode across a graph of jobs, but a dependency was missing for the job assigned to trigger_children_of, therefore triggering bottom-up execution for a subset of the graph. Once dependencies are made available, the job triggering bottom-up execution for that subset should then return the message to top-down mode. This is done by re-assigning KlioMessage.Metadata.intended_recipients to Anyone.

Type: KlioJob
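One possible reading of how recipients and trigger_children_of interact is sketched below. It assumes jobs are identified by name, uses None to stand in for Anyone, and assumes the switch back to top-down happens once the job named in trigger_children_of has run; both function names are hypothetical.

```python
from typing import List, Optional, Tuple

# None stands in for `anyone` (top-down); a list stands in for `limited.recipients`.
RecipientList = Optional[List[str]]


def is_intended_recipient(job_name: str, limited: RecipientList) -> bool:
    """A job processes the message in top-down mode or when it is listed."""
    return limited is None or job_name in limited


def recipients_after_run(
    job_name: str,
    limited: RecipientList,
    trigger_children_of: Optional[str],
) -> Tuple[RecipientList, Optional[str]]:
    """Return the recipients to publish with once `job_name` has run."""
    # The job that triggered bottom-up execution hands the message back
    # to top-down mode by re-assigning the recipients to "anyone".
    if limited is not None and trigger_children_of == job_name:
        return None, None
    return limited, trigger_children_of
```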

KlioMessage.Data

element

The reference identifier that refers to a particular file on which the job will perform work.

Type: bytes
Required

payload

Data shared between transforms. It reflects what the previous transform in the pipeline returned/yielded (if that transform was decorated with the @handle_klio decorator). For the first transform in the pipeline after reading from event input, the payload will always be None.

See Transforms for how to make use of a message’s payload.

Type: bytes
Optional

entity_id

The reference identifier that refers to a particular file on which the job will perform work.

Deprecated. Users should migrate to data.element.

Type: bytes
Required
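While migrating from entity_id to element, a consumer may want to read whichever identifier is present. The sketch below shows one way to do that with a hypothetical Data stand-in; preferring element and falling back to entity_id is an assumption made for illustration, not documented Klio behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Data:
    """Plain-Python stand-in for KlioMessage.Data (not the generated class)."""
    element: bytes = b""
    payload: Optional[bytes] = None
    entity_id: bytes = b""  # deprecated in favor of `element`


def work_identifier(data: Data) -> bytes:
    """Prefer `element`; fall back to the deprecated `entity_id`."""
    identifier = data.element or data.entity_id
    if not identifier:
        raise ValueError("message carries neither element nor entity_id")
    return identifier
```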

KlioJob

Warning

KlioJob will be undergoing API changes for v2 of Klio.

job_name

Name of job (as configured in klio-job.yaml::job_name).

Type: string
Required

gcp_project

GCP project of job (as configured in klio-job.yaml::pipeline_options.project).

Type: string
Required for Dataflow

inputs

The job’s event & data input(s)

Marked for deprecation.

Repeated
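To make the mapping from klio-job.yaml to KlioJob fields concrete, the sketch below pulls job_name and gcp_project out of a configuration that is assumed to have already been parsed into a dictionary. The function name klio_job_fields is hypothetical, and the result is a plain dictionary rather than a KlioJob message.

```python
from typing import Any, Dict


def klio_job_fields(config: Dict[str, Any]) -> Dict[str, str]:
    """Collect the KlioJob fields that come from a parsed klio-job.yaml."""
    # job_name is required, so a missing key raises KeyError.
    fields = {"job_name": config["job_name"]}
    # gcp_project is only required for Dataflow, so it may be absent.
    project = config.get("pipeline_options", {}).get("project")
    if project:
        fields["gcp_project"] = project
    return fields
```

The resulting keys match the KlioJob field names, so the dictionary could be passed as keyword arguments when constructing the message.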

KlioJob.JobInput

Warning

KlioJob.JobInput has been marked for deprecation for v2.

topic

The job’s Pub/Sub input topic.

Type: string
Required

subscription

The job’s Pub/Sub input subscription.

Type: string
Optional

data_location

The location of the job’s input data in GCS.

Type: string
Optional

KlioJobAuditLogItem

timestamp

Timestamp of when the audit log item was created.

Type: google.protobuf.Timestamp
Required

klio_job

The KlioJob that is working on the message.

Type: KlioJob
Required
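The shape of an audit log entry can be illustrated with a small stand-in. The sketch assumes a timezone-aware datetime in place of google.protobuf.Timestamp and a job name in place of a full KlioJob; AuditLogItem and append_audit_item are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List


@dataclass(frozen=True)
class AuditLogItem:
    """Plain-Python stand-in for KlioJobAuditLogItem."""
    timestamp: datetime
    klio_job: str


def append_audit_item(log: List[AuditLogItem], job_name: str) -> None:
    """Record that `job_name` is working on the message, stamped in UTC."""
    log.append(AuditLogItem(datetime.now(timezone.utc), job_name))
```

Appending one item per job, in order, yields the audit trail described under job_audit_log.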

Version

UNKNOWN

No version set.

V1

Version 1 of KlioMessage.

V2

Version 2 of KlioMessage.