Vocal Separation & Spectrograms

The following example takes the vocal separation tutorial from librosa and adapts it to a Klio pipeline.

Refer to the full example

This example only highlights a Klio job’s transforms.py and run.py, but a fully functional Klio job requires other files as well.

For the full example, please refer to the code in the examples directory.

Klio-ified Transforms

Helper Transforms

The following transforms are simple functions to be used with apache_beam.Map (see Defining a Klio Pipeline below for how they're used).

create_key_from_element takes a PCollection of KlioMessages and transforms it into a PCollection of (key, KlioMessage) pairings, where the key is the data.element of the KlioMessage. This will be helpful when we need to pair up outputs for the same audio file with apache_beam.CoGroupByKey.

def create_key_from_element(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return (kmsg.data.element, item)
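
As a quick illustration (not part of the example job itself), here is roughly how this helper behaves on a hand-built, serialized KlioMessage; the track ID below is a hypothetical value:

from klio_core.proto import klio_pb2

# Build and serialize a KlioMessage by hand (hypothetical element value).
msg = klio_pb2.KlioMessage()
msg.data.element = b"some-track-id"
raw = msg.SerializeToString()

# The helper keys the raw bytes by the message's data.element.
key, original = create_key_from_element(raw)
assert key == b"some-track-id"
assert original == raw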

The next helper transform, subtract_filter_from_full, takes a key paired with the grouped output of apache_beam.CoGroupByKey and calculates the difference between the full spectrogram and the nearest-neighbor-filtered spectrogram. This will be used to help calculate the masks.

def subtract_filter_from_full(key_pair):
    key, pair_data = key_pair
    full = _unpickle_from_klio_message(pair_data["full"][0])
    nn_filter = _unpickle_from_klio_message(pair_data["nnfilter"][0])

    net = full - nn_filter
    payload = pickle.dumps(net)
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    kmsg.data.payload = payload

    return (key, kmsg.SerializeToString())

Helper Functions for Transforms

These last two functions are not transforms, just utility functions that are used in a few transforms.

_unpickle_from_klio_message takes raw bytes, parses the bytes into a KlioMessage, and returns the unpickled data.payload of the KlioMessage.

def _unpickle_from_klio_message(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return pickle.loads(kmsg.data.payload)

The next helper function, _dump_to_klio_message, creates a KlioMessage given a key and payload data. It also serializes the payload data with numpy.save() before returning the KlioMessage as raw bytes.

def _dump_to_klio_message(key, payload):
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    out = io.BytesIO()
    np.save(out, payload)
    kmsg.data.payload = out.getvalue()
    return kmsg.SerializeToString()
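
As a sanity check (again, not part of the job itself), a payload written by _dump_to_klio_message can be read back with numpy.load, since it was serialized with numpy.save(); the array and track ID below are hypothetical:

import io

import numpy as np

from klio_core.proto import klio_pb2

# Hypothetical key and payload, purely for illustration.
arr = np.arange(6, dtype=np.float32).reshape(2, 3)
raw = _dump_to_klio_message(b"some-track-id", arr)

# Parse the serialized KlioMessage and load the numpy payload back.
kmsg = klio_pb2.KlioMessage()
kmsg.ParseFromString(raw)
restored = np.load(io.BytesIO(kmsg.data.payload))
assert np.array_equal(restored, arr)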

Full transforms.py Example


# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Implement custom transforms using utilities for Klio-fication.
"""

import io
import os
import pickle

import apache_beam as beam
from apache_beam import pvalue

import librosa
import numpy as np

from klio_audio import decorators as audio_decorators
from klio_core.proto import klio_pb2
from klio.transforms import decorators as tfm_decorators



####
# Helper funcs for handling klio & numpy de/serialization when working
# with pcolls that are grouped by key
####
def _unpickle_from_klio_message(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return pickle.loads(kmsg.data.payload)


def _dump_to_klio_message(key, payload):
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    out = io.BytesIO()
    np.save(out, payload)
    kmsg.data.payload = out.getvalue()
    return kmsg.SerializeToString()


#####
# Transforms
#####
class GetMagnitude(beam.DoFn):
    """Get the magnitude of a song given its STFT."""

    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary(load_with_numpy=True)
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Computing the magnitude spectrogram for {}".format(element)
        )
        stft = item.payload
        spectrogram, phase = librosa.magphase(stft)
        yield pvalue.TaggedOutput("phase", phase)
        yield pvalue.TaggedOutput("spectrogram", spectrogram)


class FilterNearestNeighbors(beam.DoFn):
    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Filtering nearest neighbors for {}".format(element)
        )
        spectrogram = item.payload
        nn_filter = librosa.decompose.nn_filter(
            spectrogram,
            aggregate=np.median,
            metric="cosine",
            width=int(librosa.time_to_frames(2)),
        )

        # The output of the filter shouldn't be greater than the input
        # if we assume signals are additive.  Taking the pointwise minimum
        # with the input spectrum forces this.
        nn_filter = np.minimum(spectrogram, nn_filter)
        yield nn_filter


class GetSoftMask(beam.DoFn):
    def __init__(self, margin=1, power=2):
        self.margin = margin
        self.power = power

    @tfm_decorators.set_klio_context
    def process(self, item):
        key, data = item
        first_data = data["first"][0]
        second_data = data["second"][0]
        full_data = data["full"][0]

        first = _unpickle_from_klio_message(first_data)
        second = _unpickle_from_klio_message(second_data)
        full = _unpickle_from_klio_message(full_data)

        self._klio.logger.debug("Getting softmask for {}".format(key))
        mask = librosa.util.softmask(
            first, self.margin * second, power=self.power
        )
        ret = mask * full
        yield _dump_to_klio_message(key, ret)


def create_key_from_element(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return (kmsg.data.element, item)


# key_pair looks like
# (element, {"full": [<serialized KlioMessage>],
#  "nnfilter": [<serialized KlioMessage>]})
def subtract_filter_from_full(key_pair):
    key, pair_data = key_pair
    full = _unpickle_from_klio_message(pair_data["full"][0])
    nn_filter = _unpickle_from_klio_message(pair_data["nnfilter"][0])

    net = full - nn_filter
    payload = pickle.dumps(net)
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    kmsg.data.payload = payload

    return (key, kmsg.SerializeToString())

Defining a Klio Pipeline

Below is a walk-through of the construction of the Klio-ified pipeline.

In the run.py module, the Klio pipeline is constructed within the run function. Klio takes care of reading from the configured event input and passes in the PCollection of KlioMessages.

First, our imports, including the audio transforms module from above:

import apache_beam as beam

from klio_audio.transforms import io as aio
from klio_audio.transforms import audio

from audio_spectrograms import transforms

We then begin constructing our pipeline by using a few helper transforms provided by the klio-audio package:

  1. GcsLoadBinary to download audio from GCS into memory;

  2. LoadAudio - a wrapper transform around librosa.load() - to load 5 seconds of the audio as a np.ndarray; then

  3. GetSTFT to compute the short-time Fourier transform (STFT) of the loaded audio.

def run(in_pcol, job_config):
    # load 5 seconds of audio and get STFT
    stft = (
        in_pcol
        | aio.GcsLoadBinary()
        | audio.LoadAudio(offset=10, duration=5)
        | audio.GetSTFT()
    )

We then use the computed STFT to get the magnitude of the audio with our GetMagnitude transform:

    magnitude = (
        stft | "Get magnitude" >> beam.ParDo(transforms.GetMagnitude()).with_outputs()
    )

Next we need to map the spectrogram result to a key (which will be KlioMessage.data.element) so we can group all results by key.

    magnitude_key = (
        magnitude.spectrogram
        | "element to spec" >> beam.Map(transforms.create_key_from_element)
    )

Using tagged outputs

Notice how instead of magnitude, we’re piping magnitude.spectrogram into the transform beam.Map(transforms.create_key_from_element). This is because the GetMagnitude() transform has two outputs, both of which are “tagged”. Beam allows a transform to return more than one output by way of tagging. The tags passed to apache_beam.pvalue.TaggedOutput in the yield statements of GetMagnitude() become attributes on the object that .with_outputs() returns:

pvalue.TaggedOutput("spectrogram") -> magnitude.spectrogram
pvalue.TaggedOutput("phase") -> magnitude.phase

Now, we generate the nearest-neighbor filter from the spectrogram and also map its result to a key (KlioMessage.data.element).

    nn_filter = (
        magnitude.spectrogram
        | "Get nn filter" >> beam.ParDo(transforms.FilterNearestNeighbors())
        | "element to filter" >> beam.Map(transforms.create_key_from_element)
    )

Group together the magnitude spectrogram with its nearest-neighbor filter by key.

    merge = (
        {"full": magnitude_key, "nnfilter": nn_filter}
        | "merge" >> beam.CoGroupByKey()
    )
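
To make the shape of the grouped elements concrete, here is a small standalone CoGroupByKey sketch (illustrative only, with made-up keys and values); each element of merge in our pipeline has the same (key, {"full": [...], "nnfilter": [...]}) structure, where the lists hold serialized KlioMessages.

import apache_beam as beam

with beam.Pipeline() as p:
    full = p | "create full" >> beam.Create([(b"some-track-id", b"full-bytes")])
    nn_filter = p | "create filter" >> beam.Create([(b"some-track-id", b"filter-bytes")])
    merged = (
        {"full": full, "nnfilter": nn_filter}
        | beam.CoGroupByKey()
        # Each element looks roughly like:
        # (b"some-track-id", {"full": [b"full-bytes"], "nnfilter": [b"filter-bytes"]})
        | beam.Map(print)
    )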

Take the grouped PCollection and compute the difference between the full spectrogram and the filtered one.

    net = merge | beam.Map(transforms.subtract_filter_from_full)

Create a mask with our GetSoftMask transform.

    first_mask = (
        {"first": nn_filter, "second": net, "full": magnitude_key}
        | "first mask group" >> beam.CoGroupByKey()
        | "first mask" >> beam.ParDo(transforms.GetSoftMask(margin=2))
    )

And another mask - essentially the inverse of what we just did:

    second_mask = (
        {"first": net, "second": nn_filter, "full": magnitude_key}
        | "second mask group" >> beam.CoGroupByKey()
        | "second mask" >> beam.ParDo(transforms.GetSoftMask(margin=10))
    )

Then generate three sets of output files. First, the plot of the magnitude spectrogram:

    magnitude_out = (
        magnitude.spectrogram
        | "full spec" >> audio.GetSpec()
        | "plot full spec" >> audio.SpecToPlot(title="Full Spectrogam for {element}", y_axis="log")
        | "save full" >> aio.GcsUploadPlot(suffix="-full")
    )

Second, the spectrogram plot of the first mask:

    background_out = (
        first_mask
        | "background spec" >> audio.GetSpec()
        | "plot background spec" >> audio.SpecToPlot(title="Background Spectrogam for {element}", y_axis="log")
        | "save background" >> aio.GcsUploadPlot(suffix="-background")
    )

And third, the spectrogram plot of the second mask:

    foreground_out = (
        second_mask
        | "foreground spec" >> audio.GetSpec()
        | "plot forground spec" >> audio.SpecToPlot(title="Foreground Spectrogam for {element}", y_axis="log")
        | "save foreground" >> aio.GcsUploadPlot(suffix="-foreground")
    )

Finally, we need to flatten the output into a single PCollection of KlioMessages, since Klio is not yet able to handle multiple output PCollections going to multiple event outputs. We also remove duplicate KlioMessages, since there are now three “forks” of PCollections: the full spectrogram, the background mask, and the foreground mask.

    out_pcol = (
        (magnitude_out, background_out, foreground_out)
        | "flatten output paths" >> beam.Flatten()
        | "remove dups" >> beam.Distinct()
    )
    return out_pcol

Full run.py Example


# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Construction of the Klio-ified pipeline.
"""

import logging

import apache_beam as beam

from klio_audio.transforms import io as aio
from klio_audio.transforms import audio

from audio_spectrograms import transforms


logging.getLogger("klio").setLevel(logging.DEBUG)


def run(in_pcol, job_config):
    # load 5 seconds of audio and get STFT
    stft = (
        in_pcol
        | aio.GcsLoadBinary()
        | audio.LoadAudio(offset=10, duration=5)
        | audio.GetSTFT()
    )
    # get magnitude of audio
    magnitude = (
        stft | "Get magnitude" >> beam.ParDo(transforms.GetMagnitude()).with_outputs()
    )
    # map the result to a key (the KlioMessage element)
    # so we can group all results by key
    magnitude_key = (
        magnitude.spectrogram
        | "element to spec" >> beam.Map(transforms.create_key_from_element)
    )
    # get nearest neighbors and map the result to a key (the KlioMessage element)
    nn_filter = (
        magnitude.spectrogram
        | "Get nn filter" >> beam.ParDo(transforms.FilterNearestNeighbors())
        | "element to filter" >> beam.Map(transforms.create_key_from_element)
    )
    # map together the full magnitude with its filter by key  (the KlioMessage element)
    merge = (
        {"full": magnitude_key, "nnfilter": nn_filter}
        | "merge" >> beam.CoGroupByKey()
    )
    # calc the difference between full magnitude and the filter
    net = merge | beam.Map(transforms.subtract_filter_from_full)
    # create a mask from the filter minus the difference of full & filter
    first_mask = (
        {"first": nn_filter, "second": net, "full": magnitude_key}
        | "first mask group" >> beam.CoGroupByKey()
        | "first mask" >> beam.ParDo(transforms.GetSoftMask(margin=2))
    )
    # create another mask from the difference of full & filter minus the filter
    second_mask = (
        {"first": net, "second": nn_filter, "full": magnitude_key}
        | "second mask group" >> beam.CoGroupByKey()
        | "second mask" >> beam.ParDo(transforms.GetSoftMask(margin=10))
    )
    # plot the full magnitude spectrogram
    magnitude_out = (
        magnitude.spectrogram
        | "full spec" >> audio.GetSpec()
        | "plot full spec" >> audio.SpecToPlot(title="Full Spectrogam for {element}", y_axis="log")
        | "save full" >> aio.GcsUploadPlot(suffix="-full")
    )
    # plot the first mask (background) spectrogram
    background_out = (
        first_mask
        | "background spec" >> audio.GetSpec()
        | "plot background spec" >> audio.SpecToPlot(title="Background Spectrogam for {element}", y_axis="log")
        | "save background" >> aio.GcsUploadPlot(suffix="-background")
    )
    # plot the second mask (foreground) spectrogram
    foreground_out = (
        second_mask
        | "foreground spec" >> audio.GetSpec()
        | "plot forground spec" >> audio.SpecToPlot(title="Foreground Spectrogam for {element}", y_axis="log")
        | "save foreground" >> aio.GcsUploadPlot(suffix="-foreground")
    )
    # flatten all outputs into one PCollection, then remove duplicates
    out_pcol = (
        (magnitude_out, background_out, foreground_out)
        | "flatten output paths" >> beam.Flatten()
        | "remove dups" >> beam.Distinct()
    )
    return out_pcol