I/O

class klio.transforms.io.KlioReadFromText

Read from a local or GCS file, with each line emitted as KlioMessage.data.element.

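Example pipeline reading newline-delimited elements from a text file. This is a minimal sketch modeled on the KlioReadFromAvro example below: the file path and the HelloKlio transform are illustrative, and KlioReadFromText is assumed to take the same arguments as beam.io.ReadFromText.

def run(pipeline, config):
    # Each line of the file becomes one KlioMessage.data.element.
    return (
        pipeline
        | io_transforms.KlioReadFromText("gs://my-bucket/elements.txt")
        | beam.ParDo(transforms.HelloKlio())
    )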

class klio.transforms.io.KlioReadFromBigQuery

Read data from BigQuery.

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. The file format is Avro by default.

Parameters
  • table (str, callable, ValueProvider) – The ID of the table, or a callable that returns it. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If the dataset argument is None, then the table argument must contain the entire table reference, specified as either 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. If it's a callable, it must receive one argument representing an element, and return a TableReference or a string table name as specified above.

  • dataset (str) – The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

  • project (str) – The ID of the project containing this table.

  • klio_message_columns (list) –

    A list of fields (str) that should be assigned to KlioMessage.data.element.

    Note

    If more than one field is provided, the results, including the column names, will be serialized to JSON before being assigned to KlioMessage.data.element (e.g. '{"field1": "foo", "field2": "bar"}'). If only one field is provided, just the value will be assigned to KlioMessage.data.element. See the example sketch after this parameter list.

  • query (str, ValueProvider) – A query to be used instead of arguments table, dataset, and project.

  • validate (bool) – If True, various checks will be done when the source gets initialized (e.g., is the table present?). This should be True for most scenarios in order to catch errors as early as possible (pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step.

  • coder (Coder) – The coder for the table rows. If None, then the default coder is _JsonToDictCoder, which will interpret every row as a JSON serialized dictionary.

  • use_standard_sql (bool) – Specifies whether to use BigQuery’s standard SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.

  • flatten_results (bool) – Flattens all nested and repeated fields in the query results. The default value is True.

  • kms_key (str) – Optional Cloud KMS key name for use when creating new temporary tables.

  • gcs_location (str, ValueProvider) – The name of the Google Cloud Storage bucket, as a string or a ValueProvider, where the extracted table should be written. If None, then the temp_location parameter is used.

  • bigquery_job_labels (dict) – A dictionary with string labels to be passed to BigQuery export and query jobs created by this transform. See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration

  • use_json_exports (bool) –

    By default, this transform works by exporting BigQuery data into Avro files, and reading those files. With this parameter, the transform will instead export to JSON files. JSON files are slower to read due to their larger size. When using JSON exports, the BigQuery types for DATE, DATETIME, TIME, and TIMESTAMP will be exported as strings.

    This behavior is consistent with BigQuerySource. When using Avro exports, these fields will be exported as native Python types (datetime.date, datetime.datetime, datetime.time, and datetime.datetime, respectively). Avro exports are recommended. To learn more about BigQuery types and time-related type representations, see: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types. To learn more about type conversions between BigQuery and Avro, see: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
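
Example pipeline reading from a table, with one column assigned to KlioMessage.data.element. This is a minimal sketch: the project, dataset, table, and column names are hypothetical, and HelloKlio stands in for the job's own transform.

def run(pipeline, config):
    return (
        pipeline
        | io_transforms.KlioReadFromBigQuery(
            table="my_table",  # hypothetical table in my_dataset
            dataset="my_dataset",
            project="my-gcp-project",
            # A single column, so its value alone becomes
            # KlioMessage.data.element (no JSON serialization).
            klio_message_columns=["entity_id"],
        )
        | beam.ParDo(transforms.HelloKlio())
    )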

class klio.transforms.io.KlioReadFromAvro

Read Avro from a local directory or GCS bucket.

Data from Avro is dumped into JSON and assigned to KlioMessage.data.element.

KlioReadFromAvro is the default read for event input config type avro. However, KlioReadFromAvro can also be called explicitly in a pipeline.

Example pipeline reading in elements from an Avro file:

def run(pipeline, config):
    # Each Avro record is read in and assigned to KlioMessage.data.element.
    initial_data_path = os.path.join(DIRECTORY_TO_AVRO, "twitter.avro")
    return (
        pipeline
        | io_transforms.KlioReadFromAvro(file_pattern=initial_data_path)
        | beam.ParDo(transforms.HelloKlio())
    )
Parameters
  • file_pattern (str) – The file glob to read.

  • location (str) – Local or GCS path of file(s) to read.

  • min_bundle_size (int) – The minimum size, in bytes, to be considered when splitting the input into bundles.

  • validate (bool) – Flag to verify that the files exist during pipeline creation time.

class klio.transforms.io.KlioReadFromPubSub

Read from a Google Pub/Sub topic or subscription.

Parameters
  • topic (str) – Cloud Pub/Sub topic in the form projects/<project>/topics/<topic>. If provided, subscription must be None.

  • subscription (str) – Existing Cloud Pub/Sub subscription to use in the form projects/<project>/subscriptions/<subscription>. If not specified, a temporary subscription will be created from the specified topic. If provided, topic must be None.

  • id_label (str) – The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.

  • with_attributes (bool) – With True, output elements will be PubsubMessage objects. With False, output elements will be of type bytes (message data only). Defaults to False.

  • timestamp_attribute (str) – Message value to use as element timestamp. If None, uses message publishing time as the timestamp. Timestamp values should be in one of two formats: (1) A numerical value representing the number of milliseconds since the Unix epoch. (2) A string in RFC 3339 format, UTC timezone. Example: 2015-10-29T23:41:41.123Z. The sub-second component of the timestamp is optional, and digits beyond the first three (i.e., time units smaller than milliseconds) may be ignored.
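
Example pipeline consuming from an existing subscription. This is a minimal sketch; the subscription path is hypothetical.

def run(pipeline, config):
    return (
        pipeline
        | io_transforms.KlioReadFromPubSub(
            subscription="projects/my-gcp-project/subscriptions/my-input-sub",
        )
        | beam.ParDo(transforms.HelloKlio())
    )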

class klio.transforms.io.KlioWriteToText

Write to a local or GCS file, with each KlioMessage.data.element written as a new line.

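Example pipeline writing each processed element out as a line of text. This is a minimal sketch: the output path is illustrative, and KlioWriteToText is assumed to take the same arguments as beam.io.WriteToText.

def run(input_pcol, config):
    return (
        input_pcol
        | beam.ParDo(transforms.HelloKlio())
        # Each KlioMessage.data.element is written as one line.
        | io_transforms.KlioWriteToText("gs://my-bucket/output/elements")
    )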

class klio.transforms.io.KlioWriteToBigQuery

Write to a BigQuery table, with each KlioMessage.data.element written as a row.

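Example pipeline writing processed elements to a table. This is a minimal sketch assuming KlioWriteToBigQuery accepts the same table, dataset, project, and schema arguments as beam.io.WriteToBigQuery; all names are hypothetical.

def run(input_pcol, config):
    return (
        input_pcol
        | beam.ParDo(transforms.HelloKlio())
        | io_transforms.KlioWriteToBigQuery(
            table="my_table",
            dataset="my_dataset",
            project="my-gcp-project",
            schema="entity_id:STRING",  # hypothetical single-column schema
        )
    )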

class klio.transforms.io.KlioWriteToAvro

Write Avro to a local directory or GCS bucket.

Data in KlioMessage.data.element is parsed out and dumped into Avro format.

KlioWriteToAvro is the default write for event output config type avro. However, KlioWriteToAvro can also be called explicitly in a pipeline.

Example pipeline for writing elements to an Avro file:

def run(input_pcol, config):
    output_gcs_location = "gs://test-gcs-location"
    return (
        input_pcol
        | beam.ParDo(HelloKlio())
        | transforms.io.KlioWriteToAvro(location=output_gcs_location)
    )
Parameters
  • file_path_prefix (str) – The file path to write to.

  • location (str) – Local or GCS path to write to.

  • schema (str) – The schema to use, as returned by avro.schema.parse.

  • codec (str) – The codec to use for block-level compression. Defaults to ‘deflate’.

  • file_name_suffix (str) – Suffix for the files written.

  • num_shards (int) – The number of files (shards) used for output.

  • shard_name_template (str) – Template string for the shard number and count.

  • mime_type (str) – The MIME type to use for the produced files. Defaults to “application/x-avro”.

class klio.transforms.io.KlioWriteToPubSub

Write to a Google Pub/Sub topic.

Parameters
  • topic (str) – Cloud Pub/Sub topic in the form projects/<project>/topics/<topic>.

  • with_attributes (bool) – With True, input elements will be PubsubMessage objects. With False, input elements will be of type bytes (message data only). Defaults to False.

  • id_label (str) – If set, will set an attribute for each Cloud Pub/Sub message with the given name and a unique value. This attribute can then be used in a ReadFromPubSub PTransform to deduplicate messages.

  • timestamp_attribute (str) – If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message’s publish time as the value.
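
Example pipeline publishing processed messages to a topic. This is a minimal sketch; the topic path is hypothetical.

def run(input_pcol, config):
    return (
        input_pcol
        | beam.ParDo(transforms.HelloKlio())
        | io_transforms.KlioWriteToPubSub(
            topic="projects/my-gcp-project/topics/my-output-topic",
        )
    )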

exception klio.transforms.io.KlioMissingConfiguration

Required configuration is missing.