Metrics

Default Metrics Provided

Klio collects some metrics by default. When running on Dataflow, these metrics will be automatically available in the job’s UI as custom counters, as well as custom metrics in Stackdriver monitoring.

Metrics Collected

IO Transforms

The following metrics are collected by default from Klio’s IO transforms:

| Name | Type | Description | Collecting Transform(s) |
| --- | --- | --- | --- |
| `kmsg-read` | counter | KlioMessage read in as event input. | [1] |
| `kmsg-write` | counter | KlioMessage written out as event output. | [1] |

[1] Collecting transform is automatically used in pipeline unless configured otherwise. See Built-in Transforms for more information on what is built-in within a Klio pipeline.

Helper Transforms

The following metrics are collected by default from Klio’s helper transforms:

| Name | Type | Description | Collecting Transform(s) |
| --- | --- | --- | --- |
| `kmsg-data-found-input` [2] | counter | KlioMessage with input data found. | KlioGcsCheckInputExists |
| `kmsg-data-not-found-input` [2] | counter | KlioMessage with input data not found. | KlioGcsCheckInputExists |
| `kmsg-data-found-output` [2] | counter | KlioMessage with output data found. | KlioGcsCheckOutputExists |
| `kmsg-data-not-found-output` [2] | counter | KlioMessage with output data not found. | KlioGcsCheckOutputExists |
| `kmsg-process-ping` [2] | counter | KlioMessage not in ping mode; will be processed. | KlioFilterPing |
| `kmsg-skip-ping` [2] | counter | KlioMessage in ping mode; will skip processing and be passed through directly to the configured event output, if any. | KlioFilterPing |
| `kmsg-process-force` [2] | counter | KlioMessage in force mode; will be processed even though its referenced output data exists. | KlioFilterForce |
| `kmsg-skip-force` [2] | counter | KlioMessage not in force mode; will skip processing because its referenced output data already exists. | KlioFilterForce |
| `kmsg-output` | counter | KlioMessage written to event output specifically through the KlioWriteToEventOutput transform (excludes the final event output automatically handled by Klio with `kmsg-write`). | KlioWriteToEventOutput |
| `kmsg-drop` | counter | KlioMessage processed by the KlioDrop transform; will be dropped. | KlioDrop |
| `kmsg-drop-not-recipient` | counter | KlioMessage dropped because the message is not intended for the current job to handle. | KlioCheckRecipients |
| `kmsg-debug` | counter | KlioMessage processed by the KlioDebugMessage transform. | KlioDebugMessage |
| `kmsg-trigger-upstream` | counter | KlioMessage emitted to upstream's event input via KlioTriggerUpstream. | KlioTriggerUpstream |

[2] Collecting transform is automatically used in pipeline unless configured otherwise. See Built-in Transforms for more information on what is built-in within a Klio pipeline.

Decorators

Klio also collects transform-level metrics through many of the built-in decorators.

Note

These metrics are collected at the transform level, not the pipeline level. Therefore, each metric name plus its associated transform counts as one unique metric.

For example, if there are two transforms that use the @handle_klio decorator, then two sets of metrics (i.e. kmsg-received, kmsg-success, kmsg-drop-error, kmsg-timer) will be collected, one per transform.
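Conceptually, each (transform, metric name) pair is tracked independently. The following is a minimal sketch of that behavior using a plain dictionary; it illustrates the idea only and is not Klio's actual implementation:

```python
from collections import defaultdict

# Hypothetical registry: one counter per (transform, metric name) pair,
# mirroring how transform-level metrics stay independent of each other.
registry = defaultdict(int)

def inc(transform, name):
    """Increment the counter unique to this transform + metric name."""
    registry[(transform, name)] += 1

# Two transforms emitting the same metric name yield two distinct metrics.
inc("TransformA", "kmsg-received")
inc("TransformA", "kmsg-received")
inc("TransformB", "kmsg-received")

assert registry[("TransformA", "kmsg-received")] == 2
assert registry[("TransformB", "kmsg-received")] == 1
```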

The following metrics are collected by default:

| Name | Type | Description | Collecting Decorator(s) |
| --- | --- | --- | --- |
| `kmsg-received` | counter | KlioMessage received by a transform (before processing begins). | `@handle_klio` |
| `kmsg-success` | counter | KlioMessage successfully processed by a transform. | `@handle_klio` |
| `kmsg-drop-error` | counter | KlioMessage dropped because of an error during processing. This includes messages dropped after exhausting retries (`kmsg-drop-retry-error`) and messages timing out (`kmsg-drop-timed-out`). This does not include messages dropped via the KlioDrop transform (`kmsg-drop`). | `@handle_klio` |
| `kmsg-timer` | timer | Time it takes to process a KlioMessage. This includes messages that are processed successfully as well as messages that have been dropped because of an error. | `@handle_klio` |
| `kmsg-retry-attempt` | counter | Number of retries for a given KlioMessage. | `@retry` |
| `kmsg-drop-retry-error` | counter | KlioMessage dropped after exhausting the configured number of retries. This number is included in `kmsg-drop-error`. | `@retry` |
| `kmsg-drop-timed-out` | counter | Processing timed out for a KlioMessage. This number is included in `kmsg-drop-error`. | `@timeout` |

The `kmsg-timer` metric defaults to measuring in units as configured in klio-job.yaml under job_config.metrics, in the following order of precedence:

  1. .timer_unit

  2. .stackdriver_logger.timer_unit

  3. .logger.timer_unit

  4. If nothing is set, then seconds will be used.
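The timer-unit precedence rules can be sketched as a small lookup over the job_config.metrics dictionary. The helper below is illustrative only (the function name and dict-based config shape are assumptions, not Klio's API):

```python
def resolve_timer_unit(metrics):
    """Resolve the kmsg-timer unit from a job_config.metrics mapping,
    following the documented order of precedence."""
    # 1. job_config.metrics.timer_unit
    if metrics.get("timer_unit"):
        return metrics["timer_unit"]
    # 2. job_config.metrics.stackdriver_logger.timer_unit
    sd = metrics.get("stackdriver_logger")
    if isinstance(sd, dict) and sd.get("timer_unit"):
        return sd["timer_unit"]
    # 3. job_config.metrics.logger.timer_unit
    logger = metrics.get("logger")
    if isinstance(logger, dict) and logger.get("timer_unit"):
        return logger["timer_unit"]
    # 4. fall back to seconds
    return "s"

assert resolve_timer_unit({}) == "s"
assert resolve_timer_unit({"logger": {"timer_unit": "ms"}}) == "ms"
assert resolve_timer_unit({"timer_unit": "us", "logger": {"timer_unit": "ms"}}) == "us"
```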

Viewing Emitted Metrics

When using Dataflow, metrics will be automatically emitted to Dataflow & Stackdriver monitoring.

For example, in Dataflow’s job UI, within the right-side column listing “Job Info”, a “Custom Counters” section should be visible (and will include any custom user metrics):

Metrics viewed under “Custom Counters” in Dataflow Job UI

All metrics, both these default metrics as well as custom user metrics, are also available in Stackdriver Monitoring.

For example, when creating a graph for a dashboard, select the resource type “Dataflow Job”, and then the desired metric to graph under “Metric”. Add a filter for a particular transform to avoid viewing a metric of the same name for all the transforms (particularly useful for metrics collected via decorators).

Klio metrics available in Stackdriver Monitoring Dashboards

Any custom user metrics defined in a job’s transforms should also be available to select under “Metric”.

Custom User Metrics

Within a Klio transform, you are able to create metric objects during pipeline execution. Klio defaults to using Apache Beam’s metrics (referred to as “native” metrics within Klio), and additionally provides a metrics logger (via the standard library’s logging module).

Deprecated: Stackdriver Log-based Metrics

Klio’s support for Stackdriver log-based metrics has been deprecated since version 21.3.0 and will be removed in a future release. Instead, Klio now provides a NativeMetricsClient that will automatically create and emit metrics to Dataflow Monitoring when the job runs on Dataflow via Beam’s metrics API.

Klio uses this native metrics client automatically, so no migration changes are needed.

Quickstart Example

import apache_beam as beam

from klio.transforms import decorators

class LogKlioMessage(beam.DoFn):
    @decorators.set_klio_context
    def setup(self):
        # a simple counter
        self.entity_ctr = self._klio.metrics.counter("entity-counter")

        # a counter specific to this transform
        self.success_ctr = self._klio.metrics.counter(
            "success-counter", transform=self.__class__.__name__
        )
        # a counter with user-defined tags
        self.error_ctr = self._klio.metrics.counter(
            "error-counter", tags={"job-version": "v1"}
        )

        # a gauge for a specific transform with tags
        self.model_memory_gauge = self._klio.metrics.gauge(
            "model-memory-gauge",
            transform=self.__class__.__name__,
            tags={"units": "mb"},
        )

        # a simple timer (defaults to nanoseconds)
        self.process_latency = self._klio.metrics.timer("process-latency")

        # a timer specific to transform with tags, using milliseconds
        self.load_model_latency = self._klio.metrics.timer(
            "load-model-latency",
            transform=self.__class__.__name__,
            tags={"units": "ms", "model_version": "2019-01-01"},
        )

        # use timer as a context manager
        with self.load_model_latency:
            self.model = load("my-model.pb")

        # some way to get the model's memory footprint (illustrative)
        model_memory = self.model.get_memory_usage()
        self.model_memory_gauge.set(model_memory)

    @decorators.handle_klio
    def process(self, item):
        self.entity_ctr.inc()  # increment counter
        self.process_latency.start()  # start timer directly

        try:
            # do the thing
            self._klio.logger.info("Hello, Klio!")
            self._klio.logger.info("Received element {}".format(item.element))
            self._klio.logger.info("Received payload {}".format(item.payload))

        except SomeException as e:
            self.error_ctr.inc()  # increment counter
            # do another thing
        else:
            self.success_ctr.inc()  # increment counter

        # stop timer directly, before yielding
        self.process_latency.stop()

        yield item

Tip

Metrics objects should be created in the setup method of your transform.

Configuration

Metrics are turned on and collected by default; no additional configuration is needed.

The default client depends on the runner:

* Dataflow: Stackdriver Log-based Metric Client
* Direct: Standard Library Log Metric Client

Default Configuration

In your klio-job.yaml, if you accept the default configuration, you do not need to add anything.

Setting no metrics configuration is the same as:

job_config:
  metrics:
    native:
      # default timer unit in seconds
      timer_unit: s
    logger:  # default on for Direct Runner
      # level that metrics are emitted
      level: debug
      # default timer unit in nanoseconds
      timer_unit: ns
    stackdriver_logger:
      # level that metrics are emitted
      level: debug
      # default timer unit in nanoseconds
      timer_unit: ns

The default configuration above is the same as setting metrics clients to True:

job_config:
  metrics:
    logger: true
    stackdriver_logger: true

Note

The native client cannot be turned off.

To turn off/on a metrics client, set its value to false/true:

job_config:
  metrics:
    stackdriver_logger: false

Note

When running on Dataflow, setting logger to false will have no effect while stackdriver_logger is still turned on.

Note

While using the Direct runner, turning on stackdriver_logger will have no effect.

This is because Stackdriver log-based metrics require logs to be sent to Stackdriver, while the Direct runner sends logs to stdout/stderr.

Available Configuration

For all three clients, native, logger and stackdriver_logger, the following configuration is available:

timer_unit

Globally set the default unit of time for timers.

Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds.

Default: ns (note that the native client defaults to s, as shown in the default configuration above)
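The supported units map onto simple nanosecond conversion factors. A sketch of converting a raw nanosecond measurement to a configured unit (the factors are standard SI prefixes, not taken from Klio's source):

```python
# Nanoseconds per unit, covering the documented timer_unit options.
NS_PER_UNIT = {
    "ns": 1, "nanoseconds": 1,
    "us": 1_000, "microseconds": 1_000,
    "ms": 1_000_000, "milliseconds": 1_000_000,
    "s": 1_000_000_000, "seconds": 1_000_000_000,
}

def convert(elapsed_ns, timer_unit):
    """Convert a raw nanosecond measurement to the configured unit."""
    return elapsed_ns / NS_PER_UNIT[timer_unit]

assert convert(1_500_000_000, "s") == 1.5
assert convert(2_000_000, "ms") == 2.0
```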

For both logger and stackdriver_logger, the following additional configuration is available:

level

Level at which metrics are emitted.

Options: debug, info, warning, error, critical.

Default: debug

Metric Types

Klio follows Dropwizard’s metric types, in line with Heroic services and Scio pipelines.

When creating/instantiating metric objects, a name argument is required. Optional supported keyword arguments are transform=STR and tags=DICT. Every metric will have a tag key/value pair for metric_type.

Note

Metrics objects should be created in the setup method of your transform.

Caution

Native metric objects do not support the tags argument due to limitations in the Beam metrics API. If given, tags will be ignored.

Counters

A simple integer that can only be incremented.

Usage examples:

# a simple counter
my_counter = self._klio.metrics.counter("my-counter")

# a counter specific to a transform
my_counter = self._klio.metrics.counter(
  "my-counter", transform=self.__class__.__name__
)
my_counter = self._klio.metrics.counter(
  "my-counter", transform="MyTransform"
)

# a counter with user-defined tags
my_counter = self._klio.metrics.counter(
  "my-counter",
  tags={"model-version": "v1", "image-version": "v1beta1"},
)

# incrementing a counter
my_counter.inc()

How it looks with the logger client:

INFO:klio:Got entity id: d34db33f
INFO:klio.metrics:[my-counter] value: 1 transform:'MyTransform' tags: {'model-version': 'v1', 'image-version': 'v1beta1', 'metric_type': 'counter'}

Hint

The NativeMetricsClient will not log anything.

Gauges

Warning

At the moment, the Stackdriver log-based metrics client within Klio can only support counter-type metrics. You may still create gauge-type & timer-type metrics, but those will only show up in logs, not on Stackdriver.

Warning

With the native Beam metrics, when running on Dataflow, only Counter and Distribution type metrics are emitted to Dataflow’s monitoring. See documentation for more information.

A simple integer that is set. It reflects a measurement at that point in time (e.g. memory usage, number of currently open connections).

Usage examples:

# a simple gauge
my_gauge = self._klio.metrics.gauge("my-gauge")

# a gauge specific to a transform
my_gauge = self._klio.metrics.gauge(
  "my-gauge", transform=self.__class__.__name__
)
my_gauge = self._klio.metrics.gauge(
  "my-gauge", transform="MyTransform"
)

# a gauge with user-defined tags
my_gauge = self._klio.metrics.gauge(
  "my-gauge",
  tags={
    "model-version": "v1",
    "image-version": "v1beta1",
    "units": "some-unit",
  },
)

# set a gauge
my_gauge.set(42)

How it looks with the logger client:

INFO:klio.metrics:[my-gauge] value: 42 transform: 'MyTransform' tags: {'units': 'some-unit', 'metric_type': 'gauge'}

Hint

The NativeMetricsClient will not log anything.

Timers

Warning

At the moment, the Stackdriver log-based metrics client within Klio can only support counter-type metrics. You may still create gauge-type & timer-type metrics, but those will only show up in logs, not on Stackdriver.

An integer reflecting the duration of an event (e.g. time to process an entity, response latency).

You can measure duration with a timer object in two ways: via the start/stop methods, or as a context manager (see examples below).

Note

Timers default to measuring in nanoseconds (ns), but can be configured to measure in seconds (s), milliseconds (ms), or microseconds (us).

This can be done within timer object creation, (example below), or globally via configuration (see [available configuration](#available-configuration)). Setting the unit on a specific timer will override the global configuration.

Usage Examples:

# a simple timer
my_timer = self._klio.metrics.timer("my-timer")

# a timer using seconds
my_timer = self._klio.metrics.timer("my-timer", timer_unit="s")

# a timer specific to a transform
my_timer = self._klio.metrics.timer(
  "my-timer", transform=self.__class__.__name__
)
my_timer = self._klio.metrics.timer(
  "my-timer", transform="MyTransform"
)

# a timer with user-defined tags
my_timer = self._klio.metrics.timer(
  "my-timer",
  tags={
    "model-version": "v1",
    "image-version": "v1beta1",
  },
)

# either start & stop a timer directly
my_timer.start()
# do the thing
my_timer.stop()

# or use it as a context manager
with my_timer:
  ...  # do things

How it looks with the logger client:

INFO:klio.metrics:[my-timer] value: 562200.0026050955 transform: 'HelloKlio' tags: {'metric_type': 'timer', 'unit': 'ns'}

Hint

The NativeMetricsClient will not log anything.
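A timer's start/stop and context-manager behavior can be approximated with the standard library. This is an illustrative stand-in built on time.perf_counter_ns, not Klio's implementation:

```python
import time

class SketchTimer:
    """Minimal start/stop + context-manager timer measuring in nanoseconds."""

    def __init__(self, name):
        self.name = name
        self.value = None   # last measured duration, in ns
        self._start = None

    def start(self):
        self._start = time.perf_counter_ns()

    def stop(self):
        self.value = time.perf_counter_ns() - self._start

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        self.stop()
        return False  # never swallow exceptions

# start/stop directly, or use as a context manager
with SketchTimer("my-timer") as t:
    time.sleep(0.01)  # stand-in for real work

assert t.value > 0
```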

Unsupported Types

Unlike Scio pipelines and backend services, Klio cannot support certain metric types, like histogram, meter, and deriving meter due to technical limitations imposed by Dataflow. We will reinvestigate if/when those limitations are addressed.

Stackdriver Required Setup

Caution

Support for Stackdriver log-based metrics has been marked for deprecation. See above for more information.

Access Control

Your default service account for the project must have at least Logs Configuration Writer permission in order to create metrics based off of logs.

Create Dashboard

During the runtime of a pipeline, Klio will automatically create or reuse the user-defined metrics in Stackdriver Logging. Klio is not yet able to programmatically create dashboards in Stackdriver Monitoring, but this functionality is coming soon!

Follow the Stackdriver documentation on creating dashboards & charts for log-based metrics.

Limitations

Gauge & timer support in Stackdriver: Klio does not yet support gauges or timers for log-based metrics in Stackdriver (they will still be logged to Stackdriver Logging, though). Right now, Klio only relies on Stackdriver’s construct of counters. In the future, Klio may support gauges and/or timers through distribution-type metrics. Users are free to experiment with creating distribution metrics by hand based off the logs.

Metrics between transforms: Because Dataflow does not yet support stateful processing for streaming Python pipelines (planned 2020), maintaining metrics between transforms of a pipeline cannot be supported (e.g. timing an entity across a whole pipeline of multiple transforms).

Stackdriver metrics for historical logs: In Stackdriver, metrics based off of logs are only tracked after the metric is created; Stackdriver will ignore any log lines emitted before the metric was made.