The publisher of KlioMessages does not need to be a Klio job, nor does it need to be written in Python. It only needs the KlioMessage proto definition compiled for the language you plan to publish messages from.
Construct the KlioMessage:
    from google.cloud import pubsub
    from klio_core.proto import klio_pb2

    klio_message = klio_pb2.KlioMessage()
    klio_message.data.element = b"s0me-f1le-1D"
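The example above assigns a bytes literal to `data.element`. When the identifier starts out as a `str`, it has to be encoded before assignment. The following is a minimal sketch, assuming UTF-8 is what the receiving job expects; `to_element` is a hypothetical helper and not part of Klio.

```python
def to_element(entity_id):
    """Return entity_id as bytes suitable for `klio_message.data.element`."""
    if isinstance(entity_id, bytes):
        return entity_id
    if isinstance(entity_id, str):
        # Assumption: the receiving job decodes elements as UTF-8.
        return entity_id.encode("utf-8")
    raise TypeError(
        f"expected str or bytes, got {type(entity_id).__name__}"
    )


# Hypothetical usage with the message constructed above:
# klio_message.data.element = to_element("s0me-f1le-1D")
```

Raising on any other type keeps a mistyped ID from being silently coerced into an unexpected element value.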
Add the intended recipients to the KlioMessage, depending on whether the message should be processed in Top-Down Execution or Bottom-Up Execution mode.
If the desired execution mode is top-down, update the recipients to anyone:
    klio_message.metadata.intended_recipients.anyone.SetInParent()
Or if the desired execution mode is bottom-up, update the recipients to include the receiving job:
    this_klio_job = klio_pb2.KlioJob()
    # Needs to match `klio-job.yaml::job_name`
    this_klio_job.job_name = "my-job-name"
    # If running on Dataflow, this needs to
    # match `klio-job.yaml::pipeline_options.project`
    this_klio_job.gcp_project = "my-gcp-project"
    klio_message.metadata.intended_recipients.limited.recipients.extend(
        [this_klio_job]
    )
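The two recipient options above can be folded into one function so the calling code picks the mode by either passing jobs or not. This is only a sketch: `set_intended_recipients` is a hypothetical name, and it assumes the message exposes the same `metadata.intended_recipients` fields used in the snippets above.

```python
def set_intended_recipients(klio_message, recipient_jobs=None):
    """Mark a message as top-down (no jobs given) or bottom-up.

    `recipient_jobs` is an optional iterable of `klio_pb2.KlioJob`
    objects; this helper is illustrative and not part of Klio.
    """
    recipients = klio_message.metadata.intended_recipients
    jobs = list(recipient_jobs or [])
    if jobs:
        # Bottom-up: list the receiving job(s) explicitly.
        recipients.limited.recipients.extend(jobs)
    else:
        # Top-down: set the `anyone` field with no contents.
        recipients.anyone.SetInParent()
    return klio_message


# Hypothetical usage:
# set_intended_recipients(klio_message)                   # top-down
# set_intended_recipients(klio_message, [this_klio_job])  # bottom-up
```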
Finally, serialize the KlioMessage into bytes and publish:
    # serialize to a bytestring
    to_pubsub = klio_message.SerializeToString()

    # this would be what is in
    # `klio-job.yaml::job_config.events[].inputs[].topic`
    JOB_INPUT_TOPIC = "projects/$YOUR_PROJECT/topics/$YOUR_TOPIC"

    client = pubsub.PublisherClient()
    future = client.publish(topic=JOB_INPUT_TOPIC, data=to_pubsub)
    # publish() is asynchronous; wait for the publish to complete
    future.result()
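The serialize-and-publish step can be wrapped in a small function that also waits for the publish to finish, which is useful in short-lived scripts. This is a sketch under stated assumptions: `build_topic_path` and `publish_klio_message` are hypothetical helpers, and `client` is assumed to behave like `pubsub.PublisherClient`, whose `publish` returns a future.

```python
def build_topic_path(project, topic):
    """Return a topic name in the `projects/.../topics/...` form."""
    if not project or not topic:
        raise ValueError("both project and topic are required")
    return f"projects/{project}/topics/{topic}"


def publish_klio_message(client, topic_path, klio_message, timeout=30.0):
    """Serialize klio_message, publish it, and return the result.

    `client` is expected to behave like `pubsub.PublisherClient`.
    """
    payload = klio_message.SerializeToString()
    future = client.publish(topic=topic_path, data=payload)
    # Block until the publish completes, or raise if it fails.
    return future.result(timeout=timeout)


# Hypothetical usage:
# topic = build_topic_path("my-gcp-project", "my-input-topic")
# publish_klio_message(pubsub.PublisherClient(), topic, klio_message)
```

Passing the client in as an argument keeps the helper easy to exercise with a stand-in object, without touching a real Pub/Sub topic.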