klio.transforms.helpers.KlioMessageCounter
Helper transform to count elements.
This transform will yield elements given to it, counting each one it sees.
Since it is a DoFn transform, it needs to be invoked via beam.ParDo when used. Some usage examples:
```python
# Example composite transform
class MyCompositeTransform(beam.PTransform):
    def __init__(self, *args, **kwargs):
        self.transform = MyTransform(*args, **kwargs)
        self.ctr = KlioMessageCounter(
            suffix="processed", bind_transform="MyTransform"
        )

    def expand(self, pcoll):
        return (
            pcoll
            | "Process items" >> self.transform
            | "Count items processed" >> beam.ParDo(self.ctr)
        )


# Example pipeline segment
def run(input_pcol, config):
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()
    data_not_found = input_data.not_found | beam.ParDo(
        helpers.KlioMessageCounter(
            suffix="data-not-found",
            bind_transform="KlioGcsCheckInputExists",
        )
    )
    ...
```
suffix (str) – suffix of the counter name. The full counter name will be kmsg-{suffix}.
bind_transform (str) – Name of transform to bind the counter to. This is used for Dataflow monitoring UI purposes, and can be set to a prior or following transform in a pipeline, or to itself.
KlioGcsCheckInputExists
Klio transform to check input exists in GCS.
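For instance, the check yields tagged outputs (`found` and `not_found`, as used in the examples elsewhere on this page). A minimal sketch of branching on them, assuming a user-defined `transforms.MyTransform` and that `KlioDrop` is invoked via `beam.ParDo` like other DoFn helpers:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    # Tag each KlioMessage by whether its input data exists in GCS
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()

    # Log and drop messages whose input data is missing
    _ = input_data.not_found | beam.ParDo(helpers.KlioDrop())

    # Process only messages whose input data exists
    return input_data.found | beam.ParDo(transforms.MyTransform())
```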
KlioGcsCheckOutputExists
Klio transform to check output exists in GCS.
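A sketch of a common pattern built on this check, skipping work whose output already exists; the `found`/`not_found` tag names mirror the input-existence check and are an assumption here:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    # Tag each KlioMessage by whether its output already exists in GCS
    output_data = input_pcol | helpers.KlioGcsCheckOutputExists()

    # Output already exists: pass the message through untouched
    already_done = output_data.found

    # Output missing: actually do the work
    processed = output_data.not_found | beam.ParDo(transforms.MyTransform())

    return (already_done, processed) | beam.Flatten()
```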
KlioFilterPing
Klio transform to tag outputs if in ping mode or not.
KlioFilterForce
Klio transform to tag outputs if in force mode or not.
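The two filter transforms above are typically chained after the existence checks. The sketch below assumes both yield `process` and `pass_thru` tagged outputs (verify the tag names against your Klio version) and a user-defined `transforms.MyTransform`:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    # Separate messages in ping mode from those to actually process
    ping = input_pcol | helpers.KlioFilterPing()

    # Of the processable messages, check whether output already exists
    output_data = ping.process | helpers.KlioGcsCheckOutputExists()

    # Re-process existing output only when force mode is set
    force = output_data.found | helpers.KlioFilterForce()

    processed = (
        (output_data.not_found, force.process)
        | beam.Flatten()
        | beam.ParDo(transforms.MyTransform())
    )

    # Pass pinged and non-forced messages through untouched
    return (ping.pass_thru, force.pass_thru, processed) | beam.Flatten()
```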
KlioWriteToEventOutput
Klio composite transform to write to the configured event output.
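Since this is a composite PTransform, it can be applied directly without `beam.ParDo`. A sketch, assuming a custom pipeline that wants to publish results explicitly:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    output_pcol = input_pcol | beam.ParDo(transforms.MyTransform())

    # Publish each processed KlioMessage to the job's configured
    # event output (e.g. a Pub/Sub topic from klio-job.yaml)
    _ = output_pcol | helpers.KlioWriteToEventOutput()
    return output_pcol
```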
KlioDrop
Klio DoFn to log & drop a KlioMessage.
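For example, ending a branch of the pipeline by logging and dropping messages whose input data was not found; wrapping the DoFn in `beam.ParDo` follows the counter example above:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()

    # Log and drop messages with no input data; this branch ends here
    _ = input_data.not_found | "Drop not found" >> beam.ParDo(helpers.KlioDrop())

    return input_data.found | beam.ParDo(transforms.MyTransform())
```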
KlioCheckRecipients
Check whether the current job should handle a received v2 message.
KlioUpdateAuditLog
Update a KlioMessage's audit log to include the current job.
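These two transforms are usually applied at the top of a pipeline, before any user transforms. A sketch, assuming both are DoFns invoked via `beam.ParDo` and a user-defined `transforms.MyTransform`:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    # Keep only v2 messages intended for this job, then record this
    # job in each surviving message's audit log
    to_process = (
        input_pcol
        | "Check recipients" >> beam.ParDo(helpers.KlioCheckRecipients())
        | "Update audit log" >> beam.ParDo(helpers.KlioUpdateAuditLog())
    )
    return to_process | beam.ParDo(transforms.MyTransform())
```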
KlioDebugMessage
Log a KlioMessage.
prefix (str) – logging prefix. Default: "DEBUG".
log_level (str or int) – The desired log level for the KlioMessage logs. See available log levels for what’s supported. Default: "INFO".
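For instance, logging every message before it reaches a transform; the prefix string below is illustrative, and the transform is assumed to be a DoFn invoked via `beam.ParDo`:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    return (
        input_pcol
        # Log each incoming KlioMessage before processing it
        | "Debug incoming" >> beam.ParDo(
            helpers.KlioDebugMessage(prefix="[pre-process]", log_level="DEBUG")
        )
        | beam.ParDo(transforms.MyTransform())
    )
```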
KlioSetTrace
Insert a Python debugger trace point.
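A sketch of dropping into the debugger just before a suspect transform; this is only sensible when running locally (e.g. with DirectRunner), and assumes the helper is a DoFn invoked via `beam.ParDo`:

```python
import apache_beam as beam

from klio.transforms import helpers

import transforms  # assumed user module defining MyTransform


def run(input_pcol, config):
    return (
        input_pcol
        # Pause in the Python debugger for each message passing through
        | "Break here" >> beam.ParDo(helpers.KlioSetTrace())
        | beam.ParDo(transforms.MyTransform())
    )
```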
KlioTriggerUpstream
Trigger upstream job from current job with a given KlioMessage.
This transform updates the intended recipients in the KlioMessage metadata in order to trigger a partial bottom-up execution of the overall graph of jobs. It also generates an optional log message, then publishes the KlioMessage to the upstream job's Pub/Sub topic.
Caution
Klio does not automatically trigger upstream jobs when input data does not exist. KlioTriggerUpstream must be invoked explicitly within a job's pipeline definition (in run.py::run).
Note
To access input data that was not found, Klio's automatic data existence check must be turned off by setting klio-job.yaml::job_config.data.inputs[].skip_klio_existence_check to True. The existence check must then be invoked manually. See the example run.py and klio-job.yaml files below.
Example usage:
```python
import apache_beam as beam

from klio.transforms import helpers

import transforms


def run(input_pcol, config):
    # Use the default helper transform to do the default input check
    # in order to access the output tagged with `not_found`
    input_data = input_pcol | helpers.KlioGcsCheckInputExists()

    # Pipe the input data that was not found (using Tagged Outputs)
    # into `KlioTriggerUpstream` in order to update the KlioMessage
    # metadata, log it, then publish to the upstream's Pub/Sub topic
    _ = input_data.not_found | helpers.KlioTriggerUpstream(
        upstream_job_name="my-upstream-job",
        upstream_topic="projects/my-gcp-project/topics/upstream-topic",
        log_level="DEBUG",
    )

    # Pipe the found input pcollection into transform(s) as needed
    output_pcol = input_data.found | beam.ParDo(transforms.MyTransform())
    return output_pcol
```
```yaml
# Example klio-job.yaml
version: 2
job_name: my-job
pipeline_options:
  project: my-gcp-project
  # `KlioTriggerUpstream` only supports streaming jobs
  streaming: True
  # <-- snip -->
job_config:
  events:
    inputs:
      - type: pubsub
        topic: projects/my-gcp-project/topics/upstream-topic-output
        subscription: projects/my-gcp-project/subscriptions/my-job-in
  # <-- snip -->
  data:
    inputs:
      - type: gcs
        location: gs://my-gcp-project/upstream-output-data
        file_suffix: .ogg
        # Be sure to skip Klio's default input existence check in
        # order to access the input data that was not found.
        skip_klio_existence_check: True
```
upstream_job_name (str) – Name of upstream job.
upstream_topic (str) – Pub/Sub topic for the upstream job, in the form projects/<GCP_PROJECT>/topics/<TOPIC_NAME>.
log_level (str, int, or None) – The desired log level for the log message, or None if no logging is desired. See available log levels for what's supported. Default: "INFO".
SystemExit – If the current job is not in streaming mode (set in klio-job.yaml::pipeline_options.streaming), if the provided log level is not recognized, or if the provided upstream topic is not in the correct form.