Klio collects some metrics by default. When running on Dataflow, these metrics will be automatically available in the job’s UI as custom counters, as well as custom metrics in Stackdriver Monitoring.
The following metrics are collected by default from Klio’s IO transforms:
| Name | Type | Description | Collecting Transform(s) |
| --- | --- | --- | --- |
| kmsg-read [1] | counter | KlioMessage read in as event input. | KlioReadFromPubSub, KlioReadFromBigQuery, KlioReadFromAvro, KlioReadFromText |
| kmsg-write [1] | counter | KlioMessage written out as event output. | KlioWriteToPubSub, KlioWriteToBigQuery, KlioWriteToAvro, KlioWriteToText |

[1] Collecting transform is automatically used in pipeline unless configured otherwise. See Built-in Transforms for more information on what is built-in within a Klio pipeline.
The following metrics are collected by default from Klio’s helper transforms:
| Name | Type | Description | Collecting Transform(s) |
| --- | --- | --- | --- |
| kmsg-data-found-input [2] | counter | KlioMessage with input data found. | KlioGcsCheckInputExists |
| kmsg-data-not-found-input [2] | counter | KlioMessage with input data not found. | KlioGcsCheckInputExists |
| kmsg-data-found-output [2] | counter | KlioMessage with output data found. | KlioGcsCheckOutputExists |
| kmsg-data-not-found-output [2] | counter | KlioMessage with output data not found. | KlioGcsCheckOutputExists |
| kmsg-process-ping [2] | counter | KlioMessage not in ping mode and will be processed. | KlioFilterPing |
| kmsg-skip-ping [2] | counter | KlioMessage in ping mode, and will skip processing to be passed through directly to the configured event output, if any. | KlioFilterPing |
| kmsg-process-force [2] | counter | KlioMessage in force mode, and will be processed even though its referenced output data exists. | KlioFilterForce |
| kmsg-skip-force [2] | counter | KlioMessage not in force mode, and will skip processing because its referenced output data already exists. | KlioFilterForce |
| kmsg-output | counter | KlioMessage written to event output specifically through the KlioWriteToEventOutput transform (excludes the final event output automatically handled by Klio with kmsg-write). | KlioWriteToEventOutput |
| kmsg-drop | counter | KlioMessage processed by the KlioDrop transform and will be dropped. | KlioDrop |
| kmsg-drop-not-recipient | counter | KlioMessage dropped because the message is not intended for the current job to handle. | KlioCheckRecipients |
| kmsg-debug | counter | KlioMessage processed by the KlioDebugMessage transform. | KlioDebugMessage |
| kmsg-trigger-upstream | counter | KlioMessage emitted to upstream’s event input via KlioTriggerUpstream. | KlioTriggerUpstream |

[2] Collecting transform is automatically used in pipeline unless configured otherwise.
Klio also collects transform-level metrics through many of the built-in decorators.
Note
These metrics are on the transform-level, not pipeline level. Therefore, each metric name plus the associated transform will count as one unique metric.
For example, if there are two transforms that use the @handle_klio decorator, then two sets of metrics (i.e. kmsg-received, kmsg-success, kmsg-drop-error, kmsg-timer) will be collected, one per transform.
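To make the counting concrete, here is a minimal sketch in plain Python (it does not import Klio). The transform names `TransformA` and `TransformB` are hypothetical; the metric names are the four mentioned in the note.

```python
from itertools import product

# Metric names collected by @handle_klio, as listed in the note above.
HANDLE_KLIO_METRICS = [
    "kmsg-received",
    "kmsg-success",
    "kmsg-drop-error",
    "kmsg-timer",
]

# Hypothetical transforms that both use the @handle_klio decorator.
transforms = ["TransformA", "TransformB"]

# Each (transform, metric name) pair counts as one unique metric.
unique_metrics = set(product(transforms, HANDLE_KLIO_METRICS))

print(len(unique_metrics))  # 8
for transform, name in sorted(unique_metrics):
    print(f"{transform}: {name}")
```

Two decorated transforms therefore yield eight distinct metrics, which is why filtering by transform is useful when graphing them.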
The following metrics are collected by default:

| Name | Type | Description | Collecting Decorator(s) |
| --- | --- | --- | --- |
| kmsg-received | counter | KlioMessage received by a transform (before processing begins). | @handle_klio, @serialize_klio_message |
| kmsg-success | counter | KlioMessage successfully processed by a transform. | @handle_klio, @serialize_klio_message |
| kmsg-drop-error | counter | KlioMessage dropped because of error during processing. This includes messages dropped from retries exhausted (kmsg-drop-retry-error) and messages timing out (kmsg-drop-timed-out). This does not include messages dropped via KlioDrop transform (kmsg-drop). | @handle_klio, @serialize_klio_message |
| kmsg-timer | timer | Time it takes to process KlioMessage. This includes messages that are processed successfully as well as messages that have been dropped because of error. | @handle_klio, @serialize_klio_message |
| kmsg-retry-attempt | counter | Number of retries for a given KlioMessage. | @retry |
| kmsg-drop-retry-error | counter | KlioMessage dropped from exhausting the number of configured retries. This number is included in kmsg-drop-error. | @retry |
| kmsg-drop-timed-out | counter | Processing timed out for a KlioMessage. This number is included in kmsg-drop-error. | @timeout |

The kmsg-timer metric measures in the unit configured in klio-job.yaml under job_config.metrics, in the following order of precedence:

1. .timer_unit
2. .native.timer_unit
3. .shumway.timer_unit
4. .logger.timer_unit

If nothing is set, then seconds will be used.
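The order of precedence for the kmsg-timer unit can be sketched as a simple lookup. This is an illustration only, not Klio's implementation: the function name `resolve_kmsg_timer_unit` is hypothetical, the input is assumed to be the already-parsed `job_config.metrics` mapping, and the first entry in the list is assumed to have the highest precedence.

```python
# Lookup order under job_config.metrics, highest precedence first (assumed).
TIMER_UNIT_PATHS = [
    ("timer_unit",),
    ("native", "timer_unit"),
    ("shumway", "timer_unit"),
    ("logger", "timer_unit"),
]


def resolve_kmsg_timer_unit(metrics_config):
    """Return the first timer unit found in precedence order, else "seconds"."""
    for path in TIMER_UNIT_PATHS:
        node = metrics_config
        for key in path:
            # A client may be set to a bare true/false instead of a mapping.
            if not isinstance(node, dict) or key not in node:
                node = None
                break
            node = node[key]
        if node is not None:
            return node
    return "seconds"


print(resolve_kmsg_timer_unit({"logger": {"timer_unit": "ms"}}))  # ms
print(
    resolve_kmsg_timer_unit(
        {"native": {"timer_unit": "s"}, "logger": {"timer_unit": "ms"}}
    )
)  # s
print(resolve_kmsg_timer_unit({"logger": False}))  # seconds
```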
When using Dataflow, metrics will be automatically emitted to Dataflow & Stackdriver monitoring.
For example, in Dataflow’s job UI, within the right-side column listing “Job Info”, a “Custom Counters” section should be visible (and will include any custom user metrics):
Metrics viewed under “Custom Counters” in Dataflow Job UI
All metrics, both these default metrics as well as custom user metrics, are also available in Stackdriver Monitoring.
For example, when creating a graph for a dashboard, select the resource type “Dataflow Job”, and then the desired metric to graph under “Metric”. Add a filter for a particular transform to avoid viewing a metric of the same name for all the transforms (particularly useful for metrics collected via decorators).
Klio metrics available in Stackdriver Monitoring Dashboards
Any custom user metrics defined in a job’s transforms should also be available to select under “Metric”.
Within a Klio transform, you are able to create metric objects during pipeline execution. Klio defaults to using Apache Beam’s metrics (referred to as “native” metrics within Klio), and additionally provides a metrics logger (via the standard library’s logging module).
    import apache_beam as beam

    from klio.transforms import decorators


    class LogKlioMessage(beam.DoFn):
        @decorators.set_klio_context
        def setup(self):
            # a simple counter
            self.entity_ctr = self._klio.metrics.counter("entity-counter")

            # a counter specific to this transform
            self.success_ctr = self._klio.metrics.counter(
                "success-counter", transform=self.__class__.__name__
            )

            # a counter with user-defined tags
            self.error_ctr = self._klio.metrics.counter(
                "error-counter", tags={"job-version": "v1"}
            )

            # a gauge for a specific transform with tags
            self.model_memory_gauge = self._klio.metrics.gauge(
                "model-memory-gauge",
                transform=self.__class__.__name__,
                tags={"units": "mb"},
            )

            # a simple timer (defaults to nanoseconds)
            self.process_latency = self._klio.metrics.timer("process-latency")

            # a timer specific to transform with tags, using milliseconds
            self.load_model_latency = self._klio.metrics.timer(
                "load-model-latency",
                transform=self.__class__.__name__,
                timer_unit="ms",
                tags={"units": "ms", "model_version": "2019-01-01"},
            )

            # use timer as a context manager
            # (`load` is a placeholder for your own model-loading function)
            with self.load_model_latency:
                self.model = load("my-model.pb")

            # some way to get the memory footprint
            model_memory = self.model.get_memory_usage()
            self.model_memory_gauge.set(model_memory)

        @decorators.handle_klio
        def process(self, item):
            self.entity_ctr.inc()  # increment counter
            self.process_latency.start()  # start timer directly

            try:
                # do the thing
                self._klio.logger.info("Hello, Klio!")
                self._klio.logger.info("Received element {}".format(item.element))
                self._klio.logger.info("Received payload {}".format(item.payload))

            # `SomeException` is a placeholder for the errors your code handles
            except SomeException:
                self.error_ctr.inc()  # increment counter
                # do another thing

            else:
                self.success_ctr.inc()  # increment counter

            # stop timer directly, before yield'ing
            self.process_latency.stop()

            yield item
Tip
Metrics objects should be created in the setup method of your transform.
With no additional configuration needed, metrics will be turned on and collected.
The default client depends on the runner:
In your klio-job.yaml, if you accept the default configuration, you do not need to add anything.
Setting no metrics configuration is the same as:
    job_config:
      metrics:
        native:
          # default timer unit in seconds
          timer_unit: s
        logger:  # default on for Direct Runner
          # level that metrics are emitted
          level: debug
          # default timer unit in nanoseconds
          timer_unit: ns
        shumway:
          # default timer unit in nanoseconds
          timer_unit: ns
The default configuration above is the same as setting metrics clients to True:
    job_config:
      metrics:
        logger: true
        shumway: true
The native client cannot be turned off. If you no longer want to emit metrics to Stackdriver from Dataflow, remove the enable_stackdriver_agent_metrics experiment from job_config.pipeline_options.experiments as detailed below.
To turn off/on a metrics client, set its value to false/true:
    job_config:
      metrics:
        logger: false
        shumway: false
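As a rough illustration of how these switches read, the sketch below works on an already-parsed `job_config.metrics` mapping. It is not Klio's own logic: the function name `enabled_metrics_clients` is hypothetical, and it assumes, following the statement above, that an unset client behaves like `true`. Runner-specific defaults are ignored here.

```python
def enabled_metrics_clients(metrics_config):
    """Return the names of the metrics clients that are switched on."""
    enabled = ["native"]  # per the text, native cannot be turned off
    for client in ("logger", "shumway"):
        # Unset, true, or a mapping of options all count as "on" here.
        if metrics_config.get(client, True) is not False:
            enabled.append(client)
    return enabled


print(enabled_metrics_clients({}))
# ['native', 'logger', 'shumway']
print(enabled_metrics_clients({"logger": False, "shumway": False}))
# ['native']
print(enabled_metrics_clients({"logger": {"level": "info"}, "shumway": False}))
# ['native', 'logger']
```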
For all three clients (native, shumway, and logger), the following configuration is available:

timer_unit
    Globally set the default unit of time for timers.
    Options: ns, nanoseconds, us, microseconds, ms, milliseconds, s, seconds.
    Default: ns
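The accepted spellings and what they mean for a measured value can be sketched as follows. This is an assumption-laden illustration rather than Klio code: the names `UNIT_ALIASES`, `NS_PER_UNIT`, and `convert_duration` are hypothetical, and the sketch assumes durations are first measured in nanoseconds.

```python
# Long and short spellings accepted for timer_unit, per the options above.
UNIT_ALIASES = {
    "ns": "ns", "nanoseconds": "ns",
    "us": "us", "microseconds": "us",
    "ms": "ms", "milliseconds": "ms",
    "s": "s", "seconds": "s",
}

# How many nanoseconds make up one of each unit.
NS_PER_UNIT = {"ns": 1, "us": 1_000, "ms": 1_000_000, "s": 1_000_000_000}


def convert_duration(duration_ns, timer_unit="ns"):
    """Convert a duration in nanoseconds to the configured timer unit."""
    try:
        unit = UNIT_ALIASES[timer_unit]
    except KeyError:
        raise ValueError(f"unsupported timer_unit: {timer_unit!r}") from None
    return duration_ns / NS_PER_UNIT[unit]


print(convert_duration(562_200, "ns"))            # 562200.0
print(convert_duration(562_200, "microseconds"))  # 562.2
print(convert_duration(1_500_000_000, "s"))       # 1.5
```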
For logger, the following additional configuration is available:

level
    Level at which metrics are emitted.
    Options: debug, info, warning, error, critical.
    Default: debug
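Since the logger client is built on the standard library's logging module, the level names above line up with the stdlib level constants. The sketch below shows that mapping only; the function `metrics_log_level` and the logger name `metrics-sketch` are hypothetical and not part of Klio.

```python
import logging

# Level names accepted for the logger client's level option, per the list above.
LEVELS = ("debug", "info", "warning", "error", "critical")


def metrics_log_level(level="debug"):
    """Map a configured level name to the stdlib logging constant."""
    if level not in LEVELS:
        raise ValueError(f"unsupported level: {level!r}")
    return getattr(logging, level.upper())


# Show everything from DEBUG upwards so the record below is visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("metrics-sketch")  # hypothetical logger name
logger.log(metrics_log_level("info"), "[my-counter] value: 1")
```

With the default of debug, metrics records only appear if the logging configuration of the job lets DEBUG records through.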
Klio follows Dropwizard’s metric types, in line with heroic services and Scio pipelines.
When creating metric objects, a name argument is required. Optional supported keyword arguments are transform=STR and tags=DICT. Every metric will have a tag key/value pair for metric_type.
Caution
Native metric objects do not support the tags argument due to limitations in the Beam metrics API. If given, tags will be ignored.
A counter is a simple integer that can only be incremented.
Usage examples:
    # a simple counter
    my_counter = self._klio.metrics.counter("my-counter")

    # a counter specific to a transform
    my_counter = self._klio.metrics.counter(
        "my-counter", transform=self.__class__.__name__
    )
    my_counter = self._klio.metrics.counter(
        "my-counter", transform="MyTransform"
    )

    # a counter with user-defined tags
    my_counter = self._klio.metrics.counter(
        "my-counter",
        tags={"model-version": "v1", "image-version": "v1beta1"},
    )

    # incrementing a counter
    my_counter.inc()
How it looks with the logger client:
    INFO:klio:Got entity id: d34db33f
    INFO:klio.metrics:[my-counter] value: 1 transform:'MyTransform' tags: {'model-version': 'v1', 'image-version': 'v1beta1', 'metric_type': 'counter'}
Hint
Neither the NativeMetricsClient nor the ShumwayMetricsClient logs anything.
Warning
With the native Beam metrics, when running on Dataflow, only Counter and Distribution type metrics are emitted to Dataflow’s monitoring. See documentation for more information.
A gauge is a simple integer that is set. It reflects a measurement at that point in time (e.g. memory usage, number of currently-open connections).
Usage examples:

    # a simple gauge
    my_gauge = self._klio.metrics.gauge("my-gauge")

    # a gauge specific to a transform
    my_gauge = self._klio.metrics.gauge(
        "my-gauge", transform=self.__class__.__name__
    )
    my_gauge = self._klio.metrics.gauge(
        "my-gauge", transform="MyTransform"
    )

    # a gauge with user-defined tags
    my_gauge = self._klio.metrics.gauge(
        "my-gauge",
        tags={
            "model-version": "v1",
            "image-version": "v1beta1",
            "units": "some-unit",
        },
    )

    # set a gauge
    my_gauge.set(42)

How it looks with the logger client:

    INFO:klio.metrics:[my-gauge] value: 42 transform: 'MyTransform' tags: {'units': 'some-unit', 'metric_type': 'gauge'}
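The difference between the two types described so far can be shown with small stand-in classes. These are not Klio's classes: `SketchCounter` and `SketchGauge` are hypothetical, and the optional `amount` parameter on `inc` is an assumption of this sketch (the examples above only call `inc()` without arguments).

```python
class SketchCounter:
    """Stand-in counter: the value only ever goes up."""

    def __init__(self, name):
        self.name = name
        self.value = 0

    def inc(self, amount=1):
        if amount < 0:
            raise ValueError("a counter can only be incremented")
        self.value += amount


class SketchGauge:
    """Stand-in gauge: each set() replaces the previous reading."""

    def __init__(self, name):
        self.name = name
        self.value = None

    def set(self, value):
        self.value = value


counter = SketchCounter("my-counter")
gauge = SketchGauge("my-gauge")
for reading in (40, 42, 41):
    counter.inc()
    gauge.set(reading)

print(counter.value)  # 3
print(gauge.value)    # 41
```

The counter accumulates across all three iterations, while the gauge only keeps the most recent reading.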
A timer is an integer reflecting the duration of an event (e.g. time to process an entity, response latency).
You can measure duration with a timer object in two ways: via the start/stop methods, or as a context manager (see examples below).
Timers default to measuring in nanoseconds (ns), but can be configured to measure in seconds (s), milliseconds (ms), or microseconds (us).
The unit can be set when creating the timer object (example below), or globally via configuration (see the available configuration above). Setting the unit on a specific timer will override the global configuration.
Usage examples:

    # a simple timer
    my_timer = self._klio.metrics.timer("my-timer")

    # a timer using seconds
    my_timer = self._klio.metrics.timer("my-timer", timer_unit="s")

    # a timer specific to a transform
    my_timer = self._klio.metrics.timer(
        "my-timer", transform=self.__class__.__name__
    )
    my_timer = self._klio.metrics.timer(
        "my-timer", transform="MyTransform"
    )

    # a timer with user-defined tags
    my_timer = self._klio.metrics.timer(
        "my-timer",
        tags={
            "model-version": "v1",
            "image-version": "v1beta1",
        },
    )

    # either start & stop a timer directly
    my_timer.start()
    # do the thing
    my_timer.stop()

    # or use it as a context manager
    with my_timer:
        ...  # do things

How it looks with the logger client:

    INFO:klio.metrics:[my-timer] value: 562200.0026050955 transform: 'HelloKlio' tags: {'metric_type': 'timer', 'unit': 'ns'}
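How one object can support both the start/stop style and the context-manager style is sketched below with a stand-in class. `SketchTimer` is hypothetical and is not Klio's timer; it assumes elapsed time is measured with `time.perf_counter_ns` and then converted to the chosen unit.

```python
import time

NS_PER_UNIT = {"ns": 1, "us": 1_000, "ms": 1_000_000, "s": 1_000_000_000}


class SketchTimer:
    """Stand-in timer usable via start()/stop() or as a context manager."""

    def __init__(self, name, timer_unit="ns"):
        self.name = name
        self.timer_unit = timer_unit
        self.value = None
        self._started_at = None

    def start(self):
        self._started_at = time.perf_counter_ns()

    def stop(self):
        if self._started_at is None:
            raise RuntimeError("stop() called before start()")
        elapsed_ns = time.perf_counter_ns() - self._started_at
        self._started_at = None
        self.value = elapsed_ns / NS_PER_UNIT[self.timer_unit]
        return self.value

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Stop even when the block raised, so failures are timed too.
        self.stop()
        return False  # never swallow the exception


timer = SketchTimer("my-timer", timer_unit="ms")

timer.start()
time.sleep(0.01)
timer.stop()
print(f"start/stop: {timer.value:.2f} ms")

with timer:
    time.sleep(0.01)
print(f"context manager: {timer.value:.2f} ms")
```

The context-manager form is harder to misuse because the timer is stopped even if the timed block raises.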
Unlike Scio pipelines and backend services, Klio cannot support certain metric types, such as histogram, meter, and deriving meter, due to technical limitations imposed by Dataflow. We will reinvestigate if/when those limitations are addressed.
In order for the Native metrics client to be able to report metrics to Stackdriver, the following experiment must be added to klio-job.yaml:
    # <--snip-->
    pipeline_options:
      experiments:
        - enable_stackdriver_agent_metrics
    # <--snip-->
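A quick pre-launch check for this setting could look like the sketch below. It is an illustration only: the function name `reports_to_stackdriver` is hypothetical, and the input is assumed to be the already-parsed `pipeline_options` mapping from klio-job.yaml.

```python
STACKDRIVER_EXPERIMENT = "enable_stackdriver_agent_metrics"


def reports_to_stackdriver(pipeline_options):
    """Return True if the Stackdriver experiment is listed in pipeline_options."""
    experiments = pipeline_options.get("experiments") or []
    return STACKDRIVER_EXPERIMENT in experiments


print(reports_to_stackdriver({"experiments": ["enable_stackdriver_agent_metrics"]}))  # True
print(reports_to_stackdriver({"experiments": []}))  # False
print(reports_to_stackdriver({}))  # False
```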
During the runtime of a pipeline, Klio’s native Beam metrics will automatically create and emit both default Klio metrics and user-defined metrics in Stackdriver Monitoring. Klio is not yet able to programmatically create dashboards in Stackdriver Monitoring, but this functionality is coming soon!
Follow the Stackdriver documentation on creating dashboards & charts for log-based metrics.
Gauges: Dataflow does not yet support gauge-type metrics for jobs (see docs).
Metrics between transforms: Because Dataflow does not yet support stateful processing for streaming Python pipelines (planned 2020), maintaining metrics between transforms of a pipeline cannot be supported (e.g. timing an entity across a whole pipeline of multiple transforms).