State can be passed from one transform to the next within a pipeline using the data.payload (bytes) attribute. What one transform yields/returns will turn into the data.payload value for the next transform, with three exceptions:
data.payload
bytes
the yielded/returned value is equal to the data argument given to the transform;
data
the yielded/returned value is None; or
None
the transform raises an exception, therefore dropping the message.
An illustrative example:
# transforms.py import json import apache_beam as beam from klio.transforms import decorators class GutenTagKlio(beam.DoFn): # This example will call some client (e.g. a database) # and passes the response to the next transform @decorators.handle_klio def process(self, data): element = data.element.decode("utf-8") self._klio.logger.info(f"Received element: {element}") # <-- some logic --> response = self.some_client.query(element) yield response @decorators.handle_klio def filter_response(ctx, data): # Example that filters out KlioMessages according to the # message's payload (using the response data from GutenTagKlio) element = data.element.decode("utf-8") ctx.logger.info(f"Received element: {element}") response = data.payload.decode("utf-8") foo_resp_data = response.get("foo") # Only return an element of a PCollection that has the desired # response data needed if foo_resp_data: # Return value should be bytes or a str. foo_resp_str = json.dumps(foo_resp_data) return foo_resp_str.encode("utf-8") # For messages that do not have "foo_resp_data", they will get # silently dropped by Klio/Beam with an (implicit or explicit) # `return None`, so you may want to log any messages that were # filtered out for your own maintenance needs. ctx.logger.info( f"Dropping {element} because no 'foo' in response data" ) class PrivetKlio(beam.DoFn): # Continue to process any message that had "foo" in the response # data (and will be available in the message payload). @decorators.handle_klio def process(self, data): element = data.element.decode("utf-8") self._klio.logger.info(f"Received element: {element}") foo_resp = data.payload.decode("utf-8") foo_resp_data = json.loads(foo_resp) # <-- logic --> # Yielding (or if a transform function, returning) `None` # or an object that is equal to `data` argument given to # this process method will actually clear out the # message's payload for the next transform. # If the next transform needs the same payload data, then # explicitly yield/return the same payload data (in this # case, `data.payload`). yield data