KlioMessage
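Custom protobuf messages are implicitly supported, but they require a small amount of manual work. Two parts need changes or custom code: the publisher, which wraps the custom message in a KlioMessage, and the consuming job, which deserializes it.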
The process is very similar to publishing a KlioMessage from a non-Klio job.
First, construct and serialize the custom protobuf message:
from google.cloud import pubsub  # used when publishing (Steps 2 and 3)
from klio_core.proto import klio_pb2

# import your own protobuf definition
from my_proto import my_proto_pb2

# Build your own message based on your custom proto
custom_proto_msg = my_proto_pb2.MyCustomProtoMessage()
custom_proto_msg.data = "some relevant data"

# Serialize to a bytestring
custom_proto_msg_serialized = custom_proto_msg.SerializeToString()
Next, construct the KlioMessage with the serialized custom protobuf message.
klio_message = klio_pb2.KlioMessage()

# Assign the custom proto data to either element OR payload, not both.

# Option 1: element
# Use element when you DO NOT have an otherwise unique
# identifier that refers to the data to be processed.
klio_message.data.element = custom_proto_msg_serialized

# Option 2: payload
# Use payload when you DO have a unique identifier that
# refers to the data to be processed.
# klio_message.data.payload = custom_proto_msg_serialized
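The choice between the two options can be captured in a small helper. This is a sketch under assumptions: attach_custom_proto is a hypothetical function, not part of klio_core, and for option 2 it assumes the unique identifier itself is stored in data.element, which fits the existence-check note below but is not spelled out in this section. It also assumes element is a bytes field (option 1 assigns serialized bytes to it), so string identifiers are encoded to UTF-8 first.

```python
from typing import Any, Optional, Union


def attach_custom_proto(
    klio_message: Any,
    serialized: bytes,
    unique_id: Optional[Union[str, bytes]] = None,
) -> Any:
    """Hypothetical helper: put a serialized custom proto on a KlioMessage.

    Works with klio_pb2.KlioMessage() or any object exposing
    data.element and data.payload.
    """
    if not isinstance(serialized, bytes):
        raise TypeError("serialized must be bytes, e.g. from SerializeToString()")

    if unique_id is None:
        # Option 1: no unique identifier, so the proto itself is the element.
        klio_message.data.element = serialized
    else:
        # Option 2 (assumed convention): the identifier goes in element,
        # the serialized proto goes in payload.
        if isinstance(unique_id, str):
            unique_id = unique_id.encode("utf-8")
        klio_message.data.element = unique_id
        klio_message.data.payload = serialized
    return klio_message


# Example (not executed here; "track-1234" is a made-up identifier):
#
#   klio_message = attach_custom_proto(
#       klio_pb2.KlioMessage(), custom_proto_msg_serialized, unique_id="track-1234"
#   )
```

Keeping the decision in one place makes it harder to accidentally populate both fields, which the options above treat as mutually exclusive.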
Then follow Step 2 and Step 3 of "Can I publish KlioMessages from a non-Klio job to a Klio job?"
Attention
If the serialized custom protobuf message is assigned to klio_message.data.element as option 1 above outlines (instead of klio_message.data.payload), then Klio's default existence checks for input and output data will need to be turned off, and you will need to implement your own checks if they are needed.
Jobs consuming custom protobuf messages will need to handle their deserialization:
# transforms.py
import apache_beam as beam

from klio.transforms import decorators

# import your own protobuf definition
from my_proto import my_proto_pb2


class YourTransform(beam.DoFn):
    @decorators.handle_klio
    def process(self, data):
        custom_msg = my_proto_pb2.MyCustomProtoMessage()

        # Deserialize from bytestring into custom proto message object.
        # Read from the same field the publisher wrote to.

        # Option 1: deserialize from element
        custom_msg.ParseFromString(data.element)

        # Option 2: deserialize from payload
        # custom_msg.ParseFromString(data.payload)

        # <-- rest of transform logic -->

        yield data
No configuration changes are needed.
The above example yields the original data value that it received. If the job needs to pass state between transforms, and that state is a custom protobuf message, be sure to re-serialize the custom_msg object to bytes (for example, with custom_msg.SerializeToString()) before attaching it to data.
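A minimal sketch of that read-modify-write pattern follows. read_custom_proto and write_custom_proto are hypothetical helpers, not part of Klio's API. They rely only on the element/payload fields of the data object and on the standard protobuf ParseFromString and SerializeToString methods, and they default to payload on the assumption that state between transforms travels there.

```python
from typing import Any, Type, TypeVar

# Any generated protobuf message class, e.g. my_proto_pb2.MyCustomProtoMessage.
M = TypeVar("M")

_FIELDS = ("element", "payload")


def read_custom_proto(data: Any, message_cls: Type[M], field: str = "payload") -> M:
    """Hypothetical helper: parse a custom proto out of data.element or data.payload."""
    if field not in _FIELDS:
        raise ValueError("field must be 'element' or 'payload', got %r" % field)
    msg = message_cls()
    # Read from the same field the publisher (or upstream transform) wrote to.
    msg.ParseFromString(getattr(data, field))
    return msg


def write_custom_proto(data: Any, msg: Any, field: str = "payload") -> Any:
    """Hypothetical helper: re-serialize a custom proto back onto the data object."""
    if field not in _FIELDS:
        raise ValueError("field must be 'element' or 'payload', got %r" % field)
    # Store the serialized bytes, never the proto object itself.
    setattr(data, field, msg.SerializeToString())
    return data


# Example use inside a Klio transform (not executed here):
#
#   @decorators.handle_klio
#   def process(self, data):
#       custom_msg = read_custom_proto(data, my_proto_pb2.MyCustomProtoMessage)
#       custom_msg.data = "updated state"  # the data field from the earlier example
#       yield write_custom_proto(data, custom_msg)
```

Passing the field name explicitly keeps the reader and writer symmetric, so a transform that consumes option 1 messages can call both helpers with field="element".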