Direct runner on GKE

Klio jobs can now run on an additional runner, DirectGKERunner.

Under the hood, when using the new DirectGKERunner with a Klio job, Klio creates a GKE deployment. When GKE starts the containers for the deployment, it runs the Klio job in DirectRunner mode (just like you would locally with klio job run --direct-runner).

The difference between DirectGKERunner and DirectRunner is that the latter automatically acknowledges messages it reads from the Pub/Sub queue before running them through the defined pipeline. On DirectGKERunner, this logic has been adapted to acknowledge a message only once it is considered done running through the pipeline (i.e. it successfully completed the pipeline, was filtered out, or was dropped due to an error in processing). Without this change, if a container gets OOM-killed or the job is otherwise interrupted, the failing or in-progress messages would not return to the Pub/Sub queue for redelivery and would be lost.
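The effect of acknowledging late rather than early can be illustrated with a small in-memory simulation. This is only a sketch: FakeSubscription, Interrupted, process, and run are hypothetical names invented for the illustration and are not part of Klio or of the Pub/Sub client library. The redelivery of unacknowledged messages is modelled by a simple method call.

```python
from collections import deque


class FakeSubscription:
    """In-memory stand-in for a Pub/Sub subscription (hypothetical, not a real client)."""

    def __init__(self, messages):
        self._pending = deque(messages)  # not yet delivered
        self._unacked = []               # delivered but not yet acknowledged

    def pull(self):
        msg = self._pending.popleft()
        self._unacked.append(msg)
        return msg

    def ack(self, msg):
        self._unacked.remove(msg)

    def redeliver_unacked(self):
        """Model unacknowledged messages returning to the queue after a crash."""
        self._pending.extend(self._unacked)
        self._unacked.clear()

    def remaining(self):
        return list(self._pending)


class Interrupted(Exception):
    """Stands in for an OOM kill or another abrupt interruption."""


def process(msg):
    # The worker is interrupted while handling message "b".
    if msg == "b":
        raise Interrupted(msg)


def run(sub, ack_early):
    """Consume messages until the queue is empty or the worker is interrupted."""
    done = []
    try:
        while sub.remaining():
            msg = sub.pull()
            if ack_early:
                sub.ack(msg)  # acknowledged before processing
            process(msg)
            if not ack_early:
                sub.ack(msg)  # acknowledged only after processing finished
            done.append(msg)
    except Interrupted:
        pass
    sub.redeliver_unacked()
    return done, sub.remaining()


early = run(FakeSubscription(["a", "b", "c"]), ack_early=True)
late = run(FakeSubscription(["a", "b", "c"]), ack_early=False)
print(early)  # (['a'], ['c'])       "b" was acked before the crash and is lost
print(late)   # (['a'], ['c', 'b'])  "b" is still available for redelivery
```

With early acknowledgement the interrupted message never comes back; with late acknowledgement it stays on the queue for another attempt.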

Using the DirectGKERunner for a Klio job comes with inherent limitations:

  • The Klio job must:
    • be a streaming job (and therefore read event input from a Pub/Sub subscription or topic)

    • not need any aggregation transforms (e.g. GroupByKey, CombinePerKey, Distinct, etc). Reshuffle & Flatten are also not supported.

    • not process non-KlioMessages (klio-job.yaml::job_config.allow_non_klio_messages)

    • be deployed manually (i.e. via klio job run [OPTIONS]).
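Two of the limitations above can be checked against a parsed job configuration before deploying. The following is a minimal sketch, not a Klio feature: direct_gke_violations is a hypothetical helper, and the pipeline_options.streaming key is an assumption about the layout of klio-job.yaml (only job_config.allow_non_klio_messages is named in the list above). Aggregation transforms and the deployment method cannot be seen from the configuration and are not checked.

```python
def direct_gke_violations(config):
    """Return reasons a parsed klio-job.yaml dict conflicts with the listed limits."""
    pipeline_options = config.get("pipeline_options", {})
    job_config = config.get("job_config", {})
    problems = []
    # Assumed key: `pipeline_options.streaming` marks a streaming job.
    if not pipeline_options.get("streaming", False):
        problems.append("job must be a streaming job")
    # Key taken from the limitation list above.
    if job_config.get("allow_non_klio_messages", False):
        problems.append("job must not process non-KlioMessages")
    return problems


ok_config = {
    "pipeline_options": {"streaming": True},
    "job_config": {"allow_non_klio_messages": False},
}
bad_config = {
    "pipeline_options": {"streaming": False},
    "job_config": {"allow_non_klio_messages": True},
}
print(direct_gke_violations(ok_config))
print(direct_gke_violations(bad_config))
```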

Logs can be viewed via kubectl commands. This release also ensures that required and best-practices labels are set, which helps with cost association and debugging. Other enabled labels include “deployed by”, “klio cli version”, and any custom labels configured in the Klio job config.

Klio now supports configuration fields to enable the use of shumway, a library for sending metrics to an FFWD agent.


Default metrics have been added for the following transform classes, with the name of the metric in parentheses:

  • IO transforms:
    • KlioReadFromText (kmsg-read)

    • KlioReadFromBigQuery (kmsg-read)

    • KlioReadFromAvro (kmsg-read)

    • KlioWriteToText (kmsg-write)

    • KlioWriteToBigQuery (kmsg-write)

    • KlioWriteToAvro (kmsg-write)

    • KlioReadFromPubSub (kmsg-read)

    • KlioWriteToPubSub (kmsg-write)

  • Helper Transforms:
    • KlioGcsCheckInputExists (kmsg-data-found-input, kmsg-data-not-found-input)

    • KlioGcsCheckOutputExists (kmsg-data-found-output, kmsg-data-not-found-output)

    • KlioFilterPing (kmsg-process-ping, kmsg-skip-ping)

    • KlioFilterForce (kmsg-process-force, kmsg-skip-force)

    • KlioWriteToEventOutput (kmsg-output)

    • KlioDrop (kmsg-drop)

    • KlioCheckRecipients (kmsg-drop-not-recipient)

    • KlioDebugMessage (kmsg-debug)

    • KlioTriggerUpstream (kmsg-trigger-upstream)

  • Decorators:
    • @retry (kmsg-retry-attempt, kmsg-drop-retry-error)

    • @timeout (kmsg-drop-timed-out)

    • @handle_klio (counters: kmsg-received, kmsg-drop-error, kmsg-success; timer: kmsg-timer)

    • @serialize_klio_message (same metrics as @handle_klio)

These metrics also show up in Stackdriver monitoring when configured to use Stackdriver and are available to select in Metrics Explorer when building a dashboard.
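To make the counter and timer names listed for @handle_klio concrete, here is a minimal sketch of a decorator that records them. It is an illustration only and not Klio's implementation: handle_with_metrics, counters, timings, and transform are hypothetical names, the metrics are kept in plain Python containers rather than sent to a metrics backend, and recording the timer for failed messages as well as successful ones is an assumption.

```python
import time
from collections import Counter
from functools import wraps

counters = Counter()  # hypothetical in-memory counter store
timings = []          # hypothetical in-memory timer store


def handle_with_metrics(func):
    """Wrap a per-message function and record the metrics named above."""

    @wraps(func)
    def wrapper(message):
        counters["kmsg-received"] += 1
        start = time.perf_counter()
        try:
            result = func(message)
        except Exception:
            # The message is dropped instead of crashing the worker.
            counters["kmsg-drop-error"] += 1
            return None
        else:
            counters["kmsg-success"] += 1
            return result
        finally:
            # Assumption: time is recorded whether or not processing succeeded.
            timings.append(("kmsg-timer", time.perf_counter() - start))

    return wrapper


@handle_with_metrics
def transform(message):
    if not message:
        raise ValueError("empty message")
    return message.upper()


results = [transform(m) for m in ["a", "", "b"]]
print(results)         # ['A', None, 'B']
print(dict(counters))  # {'kmsg-received': 3, 'kmsg-success': 2, 'kmsg-drop-error': 1}
```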

A user-facing metrics interface for shumway has been added for cases where log-based metrics are not feasible.




  • Support for deploying, stopping, and deleting jobs with DirectGKERunner

  • Support for setting required and best-practices labels for Kubernetes deployments as well as user-set labels

  • Enable job test command to use the --config-file option


  • Fixed bug with writing of klio-job-run-effective.yaml for klio job profile commands

  • Fixed bug with klio message publish not working on google-cloud-pubsub > 2.3.0



  • Add KlioReadFromPubSub and KlioWriteToPubSub IO transforms.

  • Add default metrics to be collected in Klio’s IO transforms.

  • Add default metrics to be collected in Klio’s helper transforms.

  • Add default metrics to be collected in Klio’s decorators.

  • Add support for using Beam’s metrics API directly.

  • Add DirectGKERunner, which runs the direct runner on GKE with added logic to only ack a Pub/Sub message when:
    • the pipeline successfully ran, but before any write to event output happens (if any),
    • the message is skipped because output data already exists,
    • the message is dropped because input data does not exist, or
    • the message is dropped because it was not the intended recipient.

  • Add metrics interface for shumway


  • KlioTriggerUpstream no longer raises a pickling error when trying to log.


  • The Beam metrics client will always be used, no matter the configured runner.

  • Marked Klio’s support for Stackdriver log-based metrics for deprecation and eventual removal.



  • Add KlioRunner.DIRECT_GKE_RUNNER to variables to support DirectGKERunner



  • Add support for calling DirectGKERunner when set in job configuration.


  • Use newly-added KlioReadFromPubSub and KlioWriteToPubSub transforms instead of native Beam’s transforms.


  • Limit version of line_profiler as the latest introduced breaking API changes.

  • Limit google-cloud-pubsub dependency