
Default Metrics Provided

Coming soon!

Custom User Metrics

Within a Klio transform, you are able to create metric objects during pipeline execution. Right now, Klio provides a metrics logger (via the standard library’s logging module), and Stackdriver log-based metrics. The Stackdriver log-based metrics client is an overlay of the metrics logger where it creates a user-defined metric within Stackdriver per Python metric object created. It is otherwise the same in terms of logging an event for measurement.


Please familiarize yourself with the limitations detailed below.

Quickstart Example

import apache_beam as beam

from klio.transforms import decorators

class LogKlioMessage(beam.DoFn):
    def __init__(self):
        self.model = None

        # a simple counter
        self.entity_ctr = self._klio.metrics.counter("entity-counter")

        # a counter specific to this transform
        self.success_ctr = self._klio.metrics.counter(
            "success-counter", transform=self.__class__.__name__
        # a counter with user-defined tags
        self.error_ctr = self._klio.metrics.counter(
            "error-counter", tags={"job-version": "v1"}

        # a gauge for a specific transform with tags
        self.model_memory_gauge = self._klio.metrics.gauge(
            tags={"units": "mb"},

        # a simple timer (defaults to nanoseconds)
        self.process_latency = self._klio.metrics.timer("process-latency")

        # a timer specific to transform with tags, using milliseconds
        self.load_model_latency = self._klio.metrics.timer(
            tags={"units": "ms", "model_version": "2019-01-01"},

    def setup(self):
        # use timer as a context manager
        with self.load_model_latency:
            self.model = load("my-model.pb")

        # some way get the memory footprint
        model_memory = self.model.get_memory_usage()

    def process(self, item):  # increment counter
        self.process_latency.start()  # start timer directly

        # do the thing
  "Hello, Klio!")
  "Received element {}".format(item.element))
  "Received payload {}".format(item.payload))

        except SomeException as e:
    # increment counter
            # do another thing
    # increment counter

        # stop counter directly, before yield'ing

        yield item


Metrics objects should be created in the __init__ method or the setup method of your transform.

Stackdriver Required Setup

Access Control

Your default service account for the project must have at least Logs Configuration Writer permission in order to create metrics based off of logs.

Create Dashboard

During the runtime of a pipeline, Klio will automatically create or reuse the user-defined metrics in Stackdriver Logging. Klio is not yet able to programmatically create dashboards in Stackdriver Monitoring, but this functionality is coming soon!

Follow the Stackdriver documentation on creating dashboards & charts for log-based metrics.


With no additional configuration needed, metrics will be turned on and collected.

The default client depends on the runner:

Dataflow: Stackdriver Log-based Metric Client
Direct: Standard Library Log Metric Client

Default Configuration

In your klio-job.yaml, if you accept the default configuration, you do not need to add anything.

Setting no metrics configuration is the same as:

    logger:  # default on for Direct Runner
      # level that metrics are emitted
      level: debug
      # default timer unit in nanoseconds
      timer_unit: ns
    stackdriver_logger:  # default on for Dataflow
      # level that metrics are emitted
      level: debug
      # default timer unit in nanoseconds
      timer_unit: ns

The default configuration above is the same as setting metrics clients to True:

    logger: true
    stackdriver_logger: true

To turn off/on a metrics client, set its value to false/true:

    stackdriver_logger: false


While on Dataflow, setting logger to False will have no effect when stackdriver_logger is still turned on.


While using the Direct runner, turning on stackdriver_logger will have no effect.

This is because Stackdriver log-based metrics requires logs to be sent to Stackdriver while the Direct runner sends logs to stdout/stderr.

Available Configuration

For both logger and stackdriver_logger, the following configuration is available:


Level at which metrics are emitted.

Options: debug, info, warning, error, critical.

Default: debug


Globally set the default unit of time for timers.

Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds.

Default: ns

Metric Types

Klio follows Dropwizard’s metric types , in line with heroic services and Scio pipelines.

When creating/instantiated metric objects, a name argument is required. Optional supported keyword arguments are transform=STR and tags=DICT. Every metric will have a tag key/value pair for metric_type.


Metrics objects should be created in the __init__ method or the setup method of your transform.


A simple integer that can only be incremented.

Usage examples:

# a simple counter
my_counter = self._klio.metrics.counter("my-counter")

# a counter specific to a transform
my_counter = self._klio.metrics.counter(
  "my-counter", transform=self.__class__.__name__
my_counter = self._klio.metrics.counter(
  "my-counter", transform="MyTransform"

# a counter with user-defined tags
my_counter = self._klio.metrics.counter(
  tags={"model-version": "v1", "image-version": "v1beta1"},

# incrementing a counter

How it looks:

INFO:klio:Got entity id: d34db33f
INFO:klio.metrics:[my-counter] value: 1 transform:'MyTransform' tags: {'model-version': 'v1', 'image-version': 'v1beta1', 'metric_type': 'counter'}



At the moment, the Stackdriver log-based metrics client within Klio can only support counter-type metrics. You may still create gauge-type & timer-type metrics, but those will only show up in logs, not on Stackdriver.

A simple integer that is set. It reflects a measurement at that point in time (i.e. memory usage, number of currently-open connections).

Usage examples:

# a simple gauge
my_gauge = self._klio.metrics.gauge("my-gauge")

# a gauge specific to a transform
my_gauge = self._klio.metrics.gauge(
  "my-gauge", transform=self.__class__.__name__
my_gauge = self._klio.metrics.gauge(
  "my-gauge", transform="MyTransform"

# a gauge with user-defined tags
my_gauge = self._klio.metrics.gauge(
    "model-version": "v1",
    "image-version": "v1beta1",
    "units": "some-unit",

# set a gauge

How it looks:

INFO:klio.metrics:[my-gauge] value: 42 transform: 'MyTransform' tags: {'units': 'some-unit', 'metric_type': 'gauge'}



At the moment, the Stackdriver log-based metrics client within Klio can only support counter-type metrics. You may still create gauge-type & timer-type metrics, but those will only show up in logs, not on Stackdriver.

An integer reflected a duration of an event (i.e. time to process an entity, response latency).

You can measure duration with a timer object in two ways: via the start/stop methods, or as a context manager (see examples below).


Timers default to measuring in nanoseconds (ns), but can be configured to measure in seconds (s), milliseconds (ms), or microseconds (us).

This can be done within timer object creation, (example below), or globally via configuration (see [available configuration](#available-configuration)). Setting the unit on a specific timer will override the global configuration.

Usage Examples:

# a simple timer
my_timer = self._klio.metrics.timer("my-timer")

# a timer using seconds
my_timer = self._klio.metrics.timer("my-timer", timer_unit="s")

# a timer specific to a transform
my_timer = self._klio.metrics.timer(
  "my-timer", transform=self.__class__.__name__
my_timer = self._klio.metrics.timer(
  "my-timer", transform="MyTransform"

# a timer with user-defined tags
my_timer = self._klio.metrics.timer(
    "model-version": "v1",
    "image-version": "v1beta1",

# either start & stop a timer directly
# do the thing

# or use it as a context manager
with my_timer:
  # do things

How it looks:

INFO:klio.metrics:[my-timer] value: 562200.0026050955 transform: 'HelloKlio' tags: {'metric_type': 'timer', 'unit': 'ns'}

Unsupported Types

Unlike Scio pipelines and backend services, Klio cannot support certain metric types, like histogram, meter, and deriving meter due to technical limitations imposed by Dataflow. We will reinvestigate if/when those limitations are addressed.


Gauge & timer support in Stackdriver: Klio does not yet support gauges or timers for log-based metrics in Stackdriver (they will still be logged to Stackdriver Logging, though). Right now, Klio only relies on Stackdriver’s construct of counters. In the future, Klio may support gauges and/or timers through distribution-type metrics. Users are free to experiment with creating distribution metrics by hand based off the logs.

Metrics between transforms: Because Dataflow does not yet support stateful processing for streaming Python pipelines (planned 2020), maintaining metrics between transforms of a pipeline can not be supported (i.e. timing an entity across a whole pipeline of multiple transforms.

Stackdriver metrics for historical logs: In Stackdriver, metrics based off of logs will be tracked after the metric is created. Stackdriver will ignore any previous log lines before the metric is made.