I/O

class klio.transforms.io.KlioReadFromText

Read from a local or GCS file with each new line as a KlioMessage.data.element.

__init__(file_pattern=None, min_bundle_size=0, compression_type='auto', strip_trailing_newlines=True, coder=StrUtf8Coder, validate=True, skip_header_lines=0, **kwargs)

Initialize the ReadFromText transform.

Parameters
  • file_pattern (str) – The file path to read from as a local file path or a GCS gs:// path. The path can contain glob characters (*, ?, and [...] sets).

  • min_bundle_size (int) – Minimum size of bundles that should be generated when splitting this source into bundles. See FileBasedSource for more details.

  • compression_type (str) – Used to handle compressed input files. Typical value is CompressionTypes.AUTO, in which case the underlying file_path’s extension will be used to detect the compression.

  • strip_trailing_newlines (bool) – Indicates whether this source should remove the newline char in each line it reads before decoding that line.

  • validate (bool) – Flag to verify that the files exist at pipeline creation time.

  • skip_header_lines (int) – Number of header lines to skip. Same number is skipped from each source file. Must be 0 or higher. Large number of skipped lines might impact performance.

  • coder (Coder) – Coder used to decode each line.
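
A minimal usage sketch (the GCS path is a placeholder, and HelloKlio stands in for a user-defined DoFn, mirroring the Avro example later in this section):

import apache_beam as beam

from klio.transforms import io as io_transforms

import transforms  # user-defined module containing a HelloKlio DoFn


def run(pipeline, config):
    return (
        pipeline
        | io_transforms.KlioReadFromText(file_pattern="gs://my-bucket/input/*.txt")
        | beam.ParDo(transforms.HelloKlio())
    )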

class klio.transforms.io.KlioReadFromBigQuery

Read data from BigQuery.

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. File format is Avro by default.

Parameters
  • table (str, callable, ValueProvider) – The ID of the table, or a callable that returns it. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If dataset argument is None then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. If it’s a callable, it must receive one argument representing an element to be written to BigQuery, and return a TableReference, or a string table name as specified above.

  • dataset (str) – The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

  • project (str) – The ID of the project containing this table.

  • klio_message_columns (list) –

    A list of fields (str) that should be assigned to KlioMessage.data.element.

    Note

    If more than one field is provided, the results, including the column names, will be serialized to JSON before being assigned to KlioMessage.data.element (e.g. '{"field1": "foo", "field2": "bar"}'). If only one field is provided, just the value will be assigned to KlioMessage.data.element.

  • query (str, ValueProvider) – A query to be used instead of arguments table, dataset, and project.

  • validate (bool) – If True, various checks will be done when source gets initialized (e.g., is table present?). This should be True for most scenarios in order to catch errors as early as possible (pipeline construction instead of pipeline execution). It should be False if the table is created during pipeline execution by a previous step.

  • coder (Coder) – The coder for the table rows. If None, then the default coder is _JsonToDictCoder, which will interpret every row as a JSON serialized dictionary.

  • use_standard_sql (bool) – Specifies whether to use BigQuery’s standard SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery’s updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs.

  • flatten_results (bool) – Flattens all nested and repeated fields in the query results. The default value is True.

  • kms_key (str) – Optional Cloud KMS key name for use when creating new temporary tables.

  • gcs_location (str, ValueProvider) – The name of the Google Cloud Storage bucket where the extracted table should be written as a string or a ValueProvider. If None, then the temp_location parameter is used.

  • bigquery_job_labels (dict) – A dictionary with string labels to be passed to BigQuery export and query jobs created by this transform. See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration

  • use_json_exports (bool) –

    By default, this transform works by exporting BigQuery data into Avro files, and reading those files. With this parameter, the transform will instead export to JSON files. JSON files are slower to read due to their larger size. When using JSON exports, the BigQuery types for DATE, DATETIME, TIME, and TIMESTAMP will be exported as strings.

    This behavior is consistent with BigQuerySource. When using Avro exports, these fields will be exported as native Python types (datetime.date, datetime.datetime, datetime.time, and datetime.datetime, respectively). Avro exports are recommended. To learn more about BigQuery types and time-related type representations, see: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types. To learn more about type conversions between BigQuery and Avro, see: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
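
A minimal usage sketch, assuming a hypothetical table my-project:my_dataset.my_table with an entity_id column; because only one field is listed in klio_message_columns, its bare value is assigned to KlioMessage.data.element:

import apache_beam as beam

from klio.transforms import io as io_transforms

import transforms  # user-defined module containing a HelloKlio DoFn


def run(pipeline, config):
    return (
        pipeline
        | io_transforms.KlioReadFromBigQuery(
            table="my_table",
            dataset="my_dataset",
            project="my-project",
            klio_message_columns=["entity_id"],
        )
        | beam.ParDo(transforms.HelloKlio())
    )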

class klio.transforms.io.KlioReadFromAvro

Read avro from a local directory or GCS bucket.

Data from avro is dumped into JSON and assigned to KlioMessage.data.element.

KlioReadFromAvro is the default read for the event input config type avro. However, KlioReadFromAvro can also be called explicitly in a pipeline.

Example pipeline reading in elements from an avro file:

def run(pipeline, config):
    # DIRECTORY_TO_AVRO and transforms.HelloKlio are defined by the user's job.
    initial_data_path = os.path.join(DIRECTORY_TO_AVRO, "twitter.avro")
    pipeline | io_transforms.KlioReadFromAvro(
        file_pattern=initial_data_path
    ) | beam.ParDo(transforms.HelloKlio())
Parameters
  • file_pattern (str) – The file glob to read.

  • location (str) – Local or GCS path of file(s) to read.

  • min_bundle_size (int) – The minimum size, in bytes, to be considered when splitting the input into bundles.

  • validate (bool) – Flag to verify that the files exist at pipeline creation time.

class klio.transforms.io.KlioWriteToText

Write to a local or GCS file with each new line as KlioMessage.data.element.

__init__(*args, **kwargs)

Initialize a WriteToText transform.

Parameters
  • file_path_prefix (str) – The file path to write to. The files written will begin with this prefix, followed by a shard identifier (see num_shards), and end in a common extension, if given by file_name_suffix. In most cases, only this argument is specified and num_shards, shard_name_template, and file_name_suffix use default values.

  • file_name_suffix (str) – Suffix for the files written.

  • append_trailing_newlines (bool) – Indicates whether this sink should write an additional newline char after writing each element.

  • num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.

  • shard_name_template (str) – A template string containing placeholders for the shard number and shard count. Currently only '' and '-SSSSS-of-NNNNN' are patterns accepted by the service. When constructing a filename for a particular shard number, the upper-case letters S and N are replaced with the 0-padded shard number and shard count respectively. This argument can be '' in which case it behaves as if num_shards was set to 1 and only one file will be generated. The default pattern used is '-SSSSS-of-NNNNN'.

  • coder (Coder) – Coder used to encode each line.

  • compression_type (str) – Used to handle compressed output files. Typical value is CompressionTypes.AUTO, in which case the final file path’s extension (as determined by file_path_prefix, file_name_suffix, num_shards and shard_name_template) will be used to detect the compression.

  • header (str) – String to write at beginning of file as a header. If not None and append_trailing_newlines is set, \n will be added.

  • footer (str) – String to write at the end of file as a footer. If not None and append_trailing_newlines is set, \n will be added.
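
A minimal usage sketch (the output prefix is a placeholder); each processed KlioMessage.data.element is written as one line under the given prefix:

from klio.transforms import io as io_transforms


def run(input_pcol, config):
    return (
        input_pcol
        | io_transforms.KlioWriteToText(
            file_path_prefix="gs://my-bucket/output/results",
            file_name_suffix=".txt",
        )
    )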

class klio.transforms.io.KlioWriteToBigQuery

Writes to BigQuery table with each row as KlioMessage.data.element.

__init__(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', kms_key=None, batch_size=None, max_file_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, method=None, insert_retry_strategy=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, triggering_frequency=None, validate=True, temp_file_format=None, ignore_insert_ids=False)

Initialize a WriteToBigQuery transform.

Parameters
  • table (str, callable, ValueProvider) – The ID of the table, or a callable that returns it. The ID must contain only letters a-z, A-Z, numbers 0-9, or underscores _. If dataset argument is None then the table argument must contain the entire table reference specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. If it’s a callable, it must receive one argument representing an element to be written to BigQuery, and return a TableReference, or a string table name as specified above. Multiple destinations are only supported on Batch pipelines at the moment.

  • dataset (str) – The ID of the dataset containing this table or None if the table reference is specified entirely by the table argument.

  • project (str) – The ID of the project containing this table or None if the table reference is specified entirely by the table argument.

  • schema (str, dict, ValueProvider, callable) – The schema to be used if the BigQuery table to write to has to be created. This can be specified as a TableSchema object, a ValueProvider that holds a JSON string or a Python dictionary, the JSON string or dictionary itself, or a single string of the form 'field1:type1,field2:type2,field3:type3' that defines a comma-separated list of fields. Here 'type' should specify the BigQuery type of the field. Single-string schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode will always be set to 'NULLABLE'). If a callable, it should receive a destination (in the form of a TableReference or a string) and return a str, dict, or TableSchema. One may also pass SCHEMA_AUTODETECT here when using JSON-based file loads, and BigQuery will try to infer the schema for the files being loaded.

  • create_disposition (BigQueryDisposition) –

    A string describing what happens if the table does not exist. Possible values are:

    • BigQueryDisposition.CREATE_IF_NEEDED: create the table if it does not exist.

    • BigQueryDisposition.CREATE_NEVER: fail the write if the table does not exist.

  • write_disposition (BigQueryDisposition) –

    A string describing what happens if the table already has some data. Possible values are:

    • BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.

    • BigQueryDisposition.WRITE_APPEND: add to existing rows.

    • BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.

    For streaming pipelines, WRITE_TRUNCATE cannot be used.

  • kms_key (str) – Optional Cloud KMS key name for use when creating new tables.

  • batch_size (int) – Number of rows to be written to BQ per streaming API insert. The default is 500.

  • test_client – Override the default bigquery client used for testing.

  • max_file_size (int) – The maximum size for a file to be written and then loaded into BigQuery. The default value is 4TB, which is 80% of the limit of 5TB for BigQuery to load any file.

  • max_files_per_bundle (int) – The maximum number of files to be concurrently written by a worker. The default here is 20. Larger values will allow writing to multiple destinations without having to reshard - but they increase the memory burden on the workers.

  • custom_gcs_temp_location (str) – A GCS location to store files to be used for file loads into BigQuery. By default, this will use the pipeline’s temp_location, but for pipelines whose temp_location is not appropriate for BQ File Loads, users should pass a specific one.

  • method – The method to use to write to BigQuery. It may be STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.

  • insert_retry_strategy –

    The strategy to use when retrying streaming inserts into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs. Default is to retry always. This means that whenever there are rows that fail to be inserted to BigQuery, they will be retried indefinitely. Other retry strategy settings will produce a deadletter PCollection as output. Appropriate values are:

    • RetryStrategy.RETRY_ALWAYS: retry all rows if there are any kind of errors. Note that this will hold your pipeline back if there are errors until you cancel or update it.

    • RetryStrategy.RETRY_NEVER: rows with errors will not be retried. Instead they will be output to a dead letter queue under the ‘FailedRows’ tag.

    • RetryStrategy.RETRY_ON_TRANSIENT_ERROR: retry rows with transient errors (e.g. timeouts). Rows with permanent errors will be output to dead letter queue under ‘FailedRows’ tag.

  • additional_bq_parameters (callable) – A function that returns a dictionary with additional parameters to pass to BQ when creating / loading data into a table. These can be ‘timePartitioning’, ‘clustering’, etc. They are passed directly to the job load configuration. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load

  • table_side_inputs (tuple) – A tuple with AsSideInput PCollections to be passed to the table callable (if one is provided).

  • schema_side_inputs – A tuple with AsSideInput PCollections to be passed to the schema callable (if one is provided).

  • triggering_frequency (int) – Every triggering_frequency duration, a BigQuery load job will be triggered for all the data written since the last load job. BigQuery has limits on how many load jobs can be triggered per day, so be careful not to set this duration too low, or you may exceed daily quota. Often this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery quota. See https://cloud.google.com/bigquery/quota-policy for more information about BigQuery quotas.

  • validate – Indicates whether to perform validation checks on inputs. This parameter is primarily used for testing.

  • temp_file_format – The format to use for file loads into BigQuery. The options are NEWLINE_DELIMITED_JSON or AVRO, with NEWLINE_DELIMITED_JSON being used by default. For advantages and limitations of the two formats, see https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro and https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.

  • ignore_insert_ids – When using the STREAMING_INSERTS method to write data to BigQuery, insert_ids are a feature of BigQuery that support deduplication of events. If your use case is not sensitive to duplication of data inserted to BigQuery, set ignore_insert_ids to True to increase the throughput for BQ writing. See: https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication
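
A minimal usage sketch, assuming a hypothetical destination table with a single STRING field; the schema shown is illustrative only, since how elements map onto rows depends on how upstream transforms shape KlioMessage.data.element:

from klio.transforms import io as io_transforms


def run(input_pcol, config):
    return (
        input_pcol
        | io_transforms.KlioWriteToBigQuery(
            table="my_table",
            dataset="my_dataset",
            project="my-project",
            schema="element:STRING",
            write_disposition="WRITE_APPEND",
        )
    )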

class klio.transforms.io.KlioWriteToAvro

Write avro to a local directory or GCS bucket.

KlioMessage.data.element data is parsed out and written in Avro format.

KlioWriteToAvro is the default write for the event output config type avro. However, KlioWriteToAvro can also be called explicitly in a pipeline.

Example pipeline for writing elements to an avro file:

def run(input_pcol, config):
    # HelloKlio is the user-defined DoFn from the job's transforms module.
    output_gcs_location = "gs://test-gcs-location"
    return (
        input_pcol
        | beam.ParDo(HelloKlio())
        | transforms.io.KlioWriteToAvro(location=output_gcs_location)
    )
Parameters
  • file_path_prefix (str) – The file path to write to.

  • location (str) – Local or GCS path to write to.

  • schema (str) – The schema to use, as returned by avro.schema.parse.

  • codec (str) – The codec to use for block-level compression. Defaults to ‘deflate’.

  • file_name_suffix (str) – Suffix for the files written.

  • num_shards (int) – The number of files (shards) used for output.

  • shard_name_template (str) – Template string for the shard number and shard count.

  • mime_type (str) – The MIME type to use for the produced files. Defaults to “application/x-avro”.

exception klio.transforms.io.KlioMissingConfiguration

Required configuration is missing.