The following example takes the vocal separation tutorial from librosa and adapts it to a Klio pipeline.
Refer to the full example
This example only highlights a Klio job’s transforms.py and run.py, but a fully functional Klio job requires other files as well.
For the full example, please refer to the code in the examples directory.
GetMagnitude
GetMagnitude uses librosa.magphase() to compute the magnitude and phase of the STFT; only the magnitude (the spectrogram) is used later in the pipeline.
```python
class GetMagnitude(beam.DoFn):
    """Get the magnitude of a song given its STFT."""

    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary(load_with_numpy=True)
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Computing the magnitude spectrogram for {}".format(element)
        )
        stft = item.payload
        spectrogram, phase = librosa.magphase(stft)
        yield pvalue.TaggedOutput("phase", phase)
        yield pvalue.TaggedOutput("spectrogram", spectrogram)
```
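To make the magnitude/phase split concrete, here is a stdlib-only sketch of the element-wise factorization that librosa.magphase() performs: a complex STFT value D splits into a magnitude S = |D| and a unit-modulus phase P such that D = S · P. The helper name magphase_sketch is hypothetical and works on a plain list of complex numbers rather than a numpy.ndarray; giving zero-magnitude bins a phase of 1 is an assumption of this sketch.

```python
def magphase_sketch(stft_column):
    """Split complex STFT values into magnitudes and unit-modulus phases.

    Hypothetical helper for illustration only; it is not librosa's code.
    """
    magnitudes = []
    phases = []
    for value in stft_column:
        magnitude = abs(value)
        magnitudes.append(magnitude)
        # Assumption: a zero-magnitude bin gets phase 1 so magnitude * phase == value.
        phases.append(value / magnitude if magnitude else 1 + 0j)
    return magnitudes, phases


column = [3 + 4j, -2j, 0j]
spectrogram, phase = magphase_sketch(column)
# Only `spectrogram` (the magnitudes) is needed downstream.
print(spectrogram)  # [5.0, 2.0, 0.0]
```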
Using Klio decorators
Notice that we’re making use of two decorators: @tfm_decorators.handle_klio and @audio_decorators.handle_binary.
@tfm_decorators.handle_klio will handle the required de/serialization of incoming and outgoing KlioMessages as well as attach the KlioContext object to the transform’s instance, making it accessible via self._klio.
Since the payload of the incoming KlioMessage is expected to be a serialized numpy.ndarray, the @audio_decorators.handle_binary decorator takes that payload and deserializes it into a numpy.ndarray object. Here we pass load_with_numpy=True to tell @audio_decorators.handle_binary to load the payload with numpy instead of the default pickle.load(), which gives us better memory performance.
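The decorator pattern described above can be sketched with the standard library alone. The toy handle_message decorator below is hypothetical and is not how klio implements @tfm_decorators.handle_klio or @audio_decorators.handle_binary: it uses JSON as a stand-in for the KlioMessage protobuf and pickle for the payload, and wraps a generator-style process method so that it receives an already-deserialized item and its yielded results are re-serialized.

```python
import functools
import json
import pickle
from types import SimpleNamespace


def handle_message(process):
    """Toy decorator: deserialize the input, call `process`, reserialize outputs.

    Hypothetical stand-in for Klio's decorators; JSON plays the role of the
    KlioMessage protobuf and pickle plays the role of the payload serializer.
    """

    @functools.wraps(process)
    def wrapper(self, raw_message):
        envelope = json.loads(raw_message)
        item = SimpleNamespace(
            element=envelope["element"],
            payload=pickle.loads(bytes.fromhex(envelope["payload"])),
        )
        for result in process(self, item):
            # Wrap each yielded value back into an envelope for the same element.
            yield json.dumps(
                {"element": item.element, "payload": pickle.dumps(result).hex()}
            ).encode("utf-8")

    return wrapper


class DoubleValues:
    @handle_message
    def process(self, item):
        # `item.payload` has already been deserialized by the decorator.
        yield [2 * x for x in item.payload]


incoming = json.dumps(
    {"element": "song.ogg", "payload": pickle.dumps([1, 2, 3]).hex()}
).encode("utf-8")
outgoing = list(DoubleValues().process(incoming))
```

Keeping de/serialization in the decorator leaves process() focused on the actual computation, which is the convenience the Klio decorators provide.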
Using tagged outputs
The two yield statements in GetMagnitude make use of Apache Beam’s support for tagged outputs. In the run function of our run.py file, we use the tagged outputs to work only with what we need: the spectrogram, not the phase. Tagged outputs can also be used for branching within a pipeline (for example, if we wanted to do something different with the phase value).
FilterNearestNeighbors
FilterNearestNeighbors uses librosa.decompose.nn_filter() to filter the spectrogram from GetMagnitude by its nearest neighbors. From librosa’s tutorial:
We’ll compare frames using cosine similarity, and aggregate similar frames by taking their (per-frequency) median value. To avoid being biased by local continuity, we constrain similar frames to be separated by at least 2 seconds. This suppresses sparse/non-repetitive deviations from the average spectrum, and works well to discard vocal elements.
```python
class FilterNearestNeighbors(beam.DoFn):
    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Filtering nearest neighbors for {}".format(element)
        )
        spectrogram = item.payload
        nn_filter = librosa.decompose.nn_filter(
            spectrogram,
            aggregate=np.median,
            metric="cosine",
            width=int(librosa.time_to_frames(2)),
        )

        # The output of the filter shouldn't be greater than the input
        # if we assume signals are additive. Taking the pointwise minimum
        # with the input spectrum forces this.
        nn_filter = np.minimum(spectrogram, nn_filter)
        yield nn_filter
```
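The quoted description can be illustrated with a simplified, stdlib-only sketch: for each frame, pick the k most cosine-similar frames that are at least width frames away, take their per-frequency median, and clamp the result to the input (the np.minimum step above). This is not librosa's implementation: nn_median_filter and its k parameter are hypothetical (librosa chooses the neighbor count itself through librosa.segment.recurrence_matrix), and leaving a frame unchanged when it has no eligible neighbors is an assumption. The sketch also shows where width=int(librosa.time_to_frames(2)) comes from, assuming time_to_frames falls back on librosa's defaults of sr=22050 and hop_length=512.

```python
import math
import statistics

# Assumed librosa defaults used by time_to_frames when sr/hop_length are omitted.
DEFAULT_SR = 22050
DEFAULT_HOP_LENGTH = 512
width_frames = int(2 * DEFAULT_SR) // DEFAULT_HOP_LENGTH  # 2 seconds -> 86 frames


def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def nn_median_filter(frames, width, k):
    """Simplified nearest-neighbor median filter over spectrogram frames.

    `frames` is a list of columns, each a list of per-frequency magnitudes.
    Hypothetical helper for illustration; not librosa.decompose.nn_filter.
    """
    filtered = []
    for t, frame in enumerate(frames):
        # Only frames at least `width` steps away may act as neighbors.
        candidates = [s for s in range(len(frames)) if abs(s - t) >= width]
        if not candidates:
            filtered.append(list(frame))  # assumption: no neighbors -> unchanged
            continue
        candidates.sort(key=lambda s: cosine_similarity(frame, frames[s]), reverse=True)
        neighbors = candidates[:k]
        median = [
            statistics.median(frames[s][f] for s in neighbors)
            for f in range(len(frame))
        ]
        # Clamp to the input, mirroring np.minimum(spectrogram, nn_filter).
        filtered.append([min(x, m) for x, m in zip(frame, median)])
    return filtered


frames = [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0], [1.0, 0.1]]
result = nn_median_filter(frames, width=2, k=2)
```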
Using the same decorators slightly differently
GetSoftMask
Given the output of GetMagnitude and FilterNearestNeighbors, generate masks using librosa.util.softmask():
```python
class GetSoftMask(beam.DoFn):
    def __init__(self, margin=1, power=2):
        self.margin = margin
        self.power = power

    @tfm_decorators.set_klio_context
    def process(self, item):
        key, data = item
        first_data = data["first"][0]
        second_data = data["second"][0]
        full_data = data["full"][0]

        first = _unpickle_from_klio_message(first_data)
        second = _unpickle_from_klio_message(second_data)
        full = _unpickle_from_klio_message(full_data)

        self._klio.logger.debug("Getting softmask for {}".format(key))
        mask = librosa.util.softmask(
            first, self.margin * second, power=self.power
        )
        ret = mask * full
        yield _dump_to_klio_message(key, ret)
```
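For a finite power p, librosa documents the soft mask as X^p / (X^p + X_ref^p). The stdlib sketch below applies that formula element-wise to a toy one-frame, two-bin spectrogram to show how GetSoftMask's margin shapes the two masks built later in run.py. softmask_sketch is a hypothetical helper, normalizing by max(x, x_ref) is a numerical-stability choice of this sketch, and the numbers are made up for illustration.

```python
def softmask_sketch(x, x_ref, power=2.0):
    """Soft mask X**p / (X**p + X_ref**p) for one bin (hypothetical helper)."""
    scale = max(x, x_ref)
    if scale == 0:
        return 0.0  # both inputs silent: this sketch masks the bin out
    # Dividing by the larger value keeps the powers from overflowing.
    mask = (x / scale) ** power
    ref = (x_ref / scale) ** power
    return mask / (mask + ref)


full = [10.0, 10.0]       # toy magnitude spectrogram (one frame, two bins)
nn_filter = [8.0, 2.0]    # toy nearest-neighbor filter output
net = [f - n for f, n in zip(full, nn_filter)]  # what subtract_filter_from_full computes

# GetSoftMask(margin=2) with first=nn_filter, second=net
background = [softmask_sketch(n, 2 * d) * f for n, d, f in zip(nn_filter, net, full)]
# GetSoftMask(margin=10) with first=net, second=nn_filter
foreground = [softmask_sketch(d, 10 * n) * f for n, d, f in zip(nn_filter, net, full)]
```

A larger margin makes a mask stricter: with margin=10, a bin has to be much louder in the first input than in the second before much of it passes through.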
The following transforms are simple functions to be used with apache_beam.Map (see Defining a Klio Pipeline below for how they’re used).
create_key_from_element is used with apache_beam.Map to turn a PCollection of KlioMessages into a PCollection of (key, KlioMessage) pairs, where the key is the data.element of the KlioMessage. This lets us pair up outputs for the same audio file with apache_beam.CoGroupByKey.
```python
def create_key_from_element(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return (kmsg.data.element, item)
```
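The next helper transform, subtract_filter_from_full, takes the (key, grouped data) pair produced by apache_beam.CoGroupByKey, where the grouped data holds both the full spectrogram and the nearest-neighbors filtered spectrogram, and calculates the difference between the two. It returns a (key, KlioMessage) pair whose payload is the pickled difference, which is later used to calculate the masks.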
```python
def subtract_filter_from_full(key_pair):
    key, pair_data = key_pair
    full = _unpickle_from_klio_message(pair_data["full"][0])
    nn_filter = _unpickle_from_klio_message(pair_data["nnfilter"][0])

    net = full - nn_filter
    payload = pickle.dumps(net)
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    kmsg.data.payload = payload

    return (key, kmsg.SerializeToString())
```
The following two functions are not transforms, just utility functions that are used in a few of the transforms above.
_unpickle_from_klio_message takes raw bytes, parses them into a KlioMessage, and returns the unpickled data.payload of that KlioMessage.
```python
def _unpickle_from_klio_message(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return pickle.loads(kmsg.data.payload)
```
The next helper function, _dump_to_klio_message, creates a KlioMessage given a key and payload data. It also serializes the payload data with numpy.save() before returning the KlioMessage as raw bytes.
```python
def _dump_to_klio_message(key, payload):
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    out = io.BytesIO()
    np.save(out, payload)
    kmsg.data.payload = out.getvalue()
    return kmsg.SerializeToString()
```
Full transforms.py example
```python
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Implement custom transforms using utilities for Klio-fication.
"""

import io
import pickle

import apache_beam as beam
from apache_beam import pvalue

import librosa
import numpy as np

from klio_audio import decorators as audio_decorators
from klio_core.proto import klio_pb2
from klio.transforms import decorators as tfm_decorators


####
# Helper funcs for handling klio & numpy de/serialization when working
# with pcolls that are grouped by key
####
def _unpickle_from_klio_message(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return pickle.loads(kmsg.data.payload)


def _dump_to_klio_message(key, payload):
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    out = io.BytesIO()
    np.save(out, payload)
    kmsg.data.payload = out.getvalue()
    return kmsg.SerializeToString()


#####
# Transforms
#####
class GetMagnitude(beam.DoFn):
    """Get the magnitude of a song given its STFT."""

    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary(load_with_numpy=True)
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Computing the magnitude spectrogram for {}".format(element)
        )
        stft = item.payload
        spectrogram, phase = librosa.magphase(stft)
        yield pvalue.TaggedOutput("phase", phase)
        yield pvalue.TaggedOutput("spectrogram", spectrogram)


class FilterNearestNeighbors(beam.DoFn):
    @tfm_decorators.handle_klio
    @audio_decorators.handle_binary
    def process(self, item):
        element = item.element.decode("utf-8")
        self._klio.logger.debug(
            "Filtering nearest neighbors for {}".format(element)
        )
        spectrogram = item.payload
        nn_filter = librosa.decompose.nn_filter(
            spectrogram,
            aggregate=np.median,
            metric="cosine",
            width=int(librosa.time_to_frames(2)),
        )

        # The output of the filter shouldn't be greater than the input
        # if we assume signals are additive. Taking the pointwise minimum
        # with the input spectrum forces this.
        nn_filter = np.minimum(spectrogram, nn_filter)
        yield nn_filter


class GetSoftMask(beam.DoFn):
    def __init__(self, margin=1, power=2):
        self.margin = margin
        self.power = power

    @tfm_decorators.set_klio_context
    def process(self, item):
        key, data = item
        first_data = data["first"][0]
        second_data = data["second"][0]
        full_data = data["full"][0]

        first = _unpickle_from_klio_message(first_data)
        second = _unpickle_from_klio_message(second_data)
        full = _unpickle_from_klio_message(full_data)

        self._klio.logger.debug("Getting softmask for {}".format(key))
        mask = librosa.util.softmask(
            first, self.margin * second, power=self.power
        )
        ret = mask * full
        yield _dump_to_klio_message(key, ret)


def create_key_from_element(item):
    kmsg = klio_pb2.KlioMessage()
    kmsg.ParseFromString(item)
    return (kmsg.data.element, item)


# key_pair looks like
# (element, {"full": [<serialized numpy array>],
#            "nnfilter": [<serialized numpy array>]})
def subtract_filter_from_full(key_pair):
    key, pair_data = key_pair
    full = _unpickle_from_klio_message(pair_data["full"][0])
    nn_filter = _unpickle_from_klio_message(pair_data["nnfilter"][0])

    net = full - nn_filter
    payload = pickle.dumps(net)
    kmsg = klio_pb2.KlioMessage()
    kmsg.data.element = key
    kmsg.data.payload = payload

    return (key, kmsg.SerializeToString())
```
Below is a walk-through on the actual construction of the Klio-ified pipeline.
In the run.py module, the Klio pipeline is constructed within the run function. Klio takes care of reading from the configured event input and passes in the PCollection of KlioMessages.
First, our imports, including importing our audio transforms from above:
```python
import apache_beam as beam

from klio_audio.transforms import io as aio
from klio_audio.transforms import audio

from audio_spectrograms import transforms
```
We then begin constructing our pipeline using a few helper transforms provided by the klio-audio package:

- GcsLoadBinary to download the audio from GCS into memory;
- LoadAudio, a wrapper transform around librosa.load(), to load 5 seconds of the audio (starting 10 seconds in) as an np.ndarray; then
- GetSTFT to compute the STFT.
```python
def run(in_pcol, job_config):
    # load 5 seconds of audio and get STFT
    stft = (
        in_pcol
        | aio.GcsLoadBinary()
        | audio.LoadAudio(offset=10, duration=5)
        | audio.GetSTFT()
    )
```
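As a rough sanity check on the sizes involved, the arithmetic below estimates the shape of each STFT. It assumes that LoadAudio and GetSTFT fall back on librosa's defaults (sr=22050 for librosa.load(); n_fft=2048, hop_length=512 and center=True for librosa.stft()) and that the source file is at least 15 seconds long. These defaults are assumptions about the klio-audio wrappers, not something this example configures, and estimate_stft_shape is a hypothetical helper.

```python
# Assumed librosa defaults; the klio-audio transforms may be configured differently.
SAMPLE_RATE = 22050
N_FFT = 2048
HOP_LENGTH = 512


def estimate_stft_shape(duration_s, sr=SAMPLE_RATE, n_fft=N_FFT, hop_length=HOP_LENGTH):
    """Return (n_samples, (n_frequency_bins, n_frames)) for a centered STFT."""
    n_samples = int(duration_s * sr)
    n_bins = 1 + n_fft // 2
    # With center=True the signal is padded so that frames = 1 + samples // hop.
    n_frames = 1 + n_samples // hop_length
    return n_samples, (n_bins, n_frames)


samples, shape = estimate_stft_shape(5)  # LoadAudio(offset=10, duration=5)
print(samples, shape)  # 110250 (1025, 216)
```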
We then use the computed STFT to get the magnitude of the audio with our GetMagnitude transform:
```python
    magnitude = (
        stft
        | "Get magnitude" >> beam.ParDo(transforms.GetMagnitude()).with_outputs()
    )
```
Next we need to map the spectrogram result to a key (the KlioMessage.data.element) so we can group all results by key.
```python
    magnitude_key = (
        magnitude.spectrogram
        | "element to spec" >> beam.Map(transforms.create_key_from_element)
    )
```
Notice that instead of magnitude, we’re piping magnitude.spectrogram into beam.Map(transforms.create_key_from_element). This is because the GetMagnitude() transform has two outputs, both of which are “tagged”. Beam allows a transform to return more than one output by tagging them: each tag passed to apache_beam.pvalue.TaggedOutput in the yield statements of GetMagnitude() becomes an attribute on the object returned by .with_outputs(), giving access to the PCollection for that tag.
```
pvalue.TaggedOutput("spectrogram") -> magnitude.spectrogram
pvalue.TaggedOutput("phase")       -> magnitude.phase
```
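The tag-to-attribute routing can be mimicked in plain Python. In the sketch below, TaggedOutput is a hypothetical namedtuple standing in for apache_beam.pvalue.TaggedOutput, and run_with_outputs is a hypothetical stand-in for beam.ParDo(...).with_outputs(): it collects each yielded value into a per-tag list and exposes those lists as attributes. In real Beam the attributes are PCollections on the returned DoOutputsTuple rather than Python lists, and untagged yields go to a main output, which this sketch does not model.

```python
from collections import defaultdict, namedtuple
from types import SimpleNamespace

# Hypothetical stand-in for apache_beam.pvalue.TaggedOutput(tag, value).
TaggedOutput = namedtuple("TaggedOutput", ["tag", "value"])


def run_with_outputs(process, elements):
    """Route every yielded TaggedOutput into a list named after its tag."""
    outputs = defaultdict(list)
    for element in elements:
        for result in process(element):
            outputs[result.tag].append(result.value)
    return SimpleNamespace(**outputs)


def get_magnitude(element):
    # Placeholder strings stand in for the real phase and spectrogram arrays.
    yield TaggedOutput("phase", "phase of " + element)
    yield TaggedOutput("spectrogram", "spectrogram of " + element)


magnitude = run_with_outputs(get_magnitude, ["a.ogg", "b.ogg"])
print(magnitude.spectrogram)  # ['spectrogram of a.ogg', 'spectrogram of b.ogg']
```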
Now, we generate the nearest neighbors filter with the spectrogram and also map its result to a key (KlioMessage.data.element).
```python
    nn_filter = (
        magnitude.spectrogram
        | "Get nn filter" >> beam.ParDo(transforms.FilterNearestNeighbors())
        | "element to filter" >> beam.Map(transforms.create_key_from_element)
    )
```
Group the magnitude spectrogram together with its nearest-neighbors filter by key.
```python
    merge = (
        {"full": magnitude_key, "nnfilter": nn_filter}
        | "merge" >> beam.CoGroupByKey()
    )
```
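Conceptually, beam.CoGroupByKey() applied to a dict of keyed PCollections yields one (key, {tag: [values]}) pair per key, which is why the downstream code indexes pair_data["full"][0]. The pure-Python sketch below mimics that shape for in-memory lists; co_group_by_key is a hypothetical helper rather than Beam's implementation, and the byte strings are placeholders for serialized KlioMessages.

```python
from collections import defaultdict


def co_group_by_key(keyed_inputs):
    """Group (key, value) pairs from several tagged inputs by key.

    Returns a list of (key, {tag: [values]}) pairs, sorted by key.
    Hypothetical helper for illustration; not Beam's implementation.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in keyed_inputs})
    for tag, pairs in keyed_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return sorted(grouped.items())


magnitude_key = [("a.ogg", b"full-a"), ("b.ogg", b"full-b")]
nn_filter = [("b.ogg", b"nn-b"), ("a.ogg", b"nn-a")]
merge = co_group_by_key({"full": magnitude_key, "nnfilter": nn_filter})
key, pair_data = merge[0]
print(key, pair_data["full"][0], pair_data["nnfilter"][0])  # a.ogg b'full-a' b'nn-a'
```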
Then use the grouped PCollection to find the difference between the full spectrogram and the filtered one.
```python
    net = merge | beam.Map(transforms.subtract_filter_from_full)
```
Create a mask with our GetSoftMask transform.
```python
    first_mask = (
        {"first": nn_filter, "second": net, "full": magnitude_key}
        | "first mask group" >> beam.CoGroupByKey()
        | "first mask" >> beam.ParDo(transforms.GetSoftMask(margin=2))
    )
```
And another mask, essentially the inverse of the first: the filtered spectrogram and the net difference swap places, and a larger margin is used.
```python
    second_mask = (
        {"first": net, "second": nn_filter, "full": magnitude_key}
        | "second mask group" >> beam.CoGroupByKey()
        | "second mask" >> beam.ParDo(transforms.GetSoftMask(margin=10))
    )
```
Then generate three sets of output files. First, the plot of the magnitude spectrogram:
```python
    magnitude_out = (
        magnitude.spectrogram
        | "full spec" >> audio.GetSpec()
        | "plot full spec" >> audio.SpecToPlot(
            title="Full Spectrogram for {element}", y_axis="log"
        )
        | "save full" >> aio.GcsUploadPlot(suffix="-full")
    )
```
Second, the spectrogram plot of the first mask:
```python
    background_out = (
        first_mask
        | "background spec" >> audio.GetSpec()
        | "plot background spec" >> audio.SpecToPlot(
            title="Background Spectrogram for {element}", y_axis="log"
        )
        | "save background" >> aio.GcsUploadPlot(suffix="-background")
    )
```
And third, the spectrogram plot of the second mask:
```python
    foreground_out = (
        second_mask
        | "foreground spec" >> audio.GetSpec()
        | "plot foreground spec" >> audio.SpecToPlot(
            title="Foreground Spectrogram for {element}", y_axis="log"
        )
        | "save foreground" >> aio.GcsUploadPlot(suffix="-foreground")
    )
```
Finally, we need to flatten the output into a single PCollection of KlioMessages (as Klio is not yet able to handle multiple output PCollections to multiple event outputs), as well as remove duplicate KlioMessages since there are now three “forks” of PCollections: the full spectrogram, the background mask, and the foreground mask.
```python
    out_pcol = (
        (magnitude_out, background_out, foreground_out)
        | "flatten output paths" >> beam.Flatten()
        | "remove dups" >> beam.Distinct()
    )
    return out_pcol
```
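The effect of the last two steps can be sketched with in-memory lists: flattening concatenates the three branches, and de-duplication keeps one copy of each message. Assuming each branch emits the same KlioMessage bytes for a given audio file, the three copies collapse into one. flatten_and_distinct is a hypothetical helper; unlike this sketch, beam.Distinct() makes no guarantee about output order.

```python
def flatten_and_distinct(*branches):
    """Concatenate branches, then drop repeated messages (first occurrence wins)."""
    seen = set()
    unique = []
    for branch in branches:          # like beam.Flatten(): merge the branches
        for message in branch:
            if message not in seen:  # like beam.Distinct(): drop duplicates
                seen.add(message)
                unique.append(message)
    return unique


magnitude_out = [b"msg:a.ogg", b"msg:b.ogg"]
background_out = [b"msg:a.ogg", b"msg:b.ogg"]
foreground_out = [b"msg:b.ogg", b"msg:a.ogg"]
out = flatten_and_distinct(magnitude_out, background_out, foreground_out)
print(out)  # [b'msg:a.ogg', b'msg:b.ogg']
```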
Full run.py example
```python
# Copyright 2020 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Construction of the Klio-ified pipeline.
"""

import logging

import apache_beam as beam

from klio_audio.transforms import io as aio
from klio_audio.transforms import audio

from audio_spectrograms import transforms


logging.getLogger("klio").setLevel(logging.DEBUG)


def run(in_pcol, job_config):
    # load 5 seconds of audio and get STFT
    stft = (
        in_pcol
        | aio.GcsLoadBinary()
        | audio.LoadAudio(offset=10, duration=5)
        | audio.GetSTFT()
    )

    # get magnitude of audio
    magnitude = (
        stft
        | "Get magnitude" >> beam.ParDo(transforms.GetMagnitude()).with_outputs()
    )

    # map the result to a key (the KlioMessage element)
    # so we can group all results by key
    magnitude_key = (
        magnitude.spectrogram
        | "element to spec" >> beam.Map(transforms.create_key_from_element)
    )

    # get nearest neighbors and map the result to a key (the KlioMessage element)
    nn_filter = (
        magnitude.spectrogram
        | "Get nn filter" >> beam.ParDo(transforms.FilterNearestNeighbors())
        | "element to filter" >> beam.Map(transforms.create_key_from_element)
    )

    # map together the full magnitude with its filter by key (the KlioMessage element)
    merge = (
        {"full": magnitude_key, "nnfilter": nn_filter}
        | "merge" >> beam.CoGroupByKey()
    )

    # calc the difference between full magnitude and the filter
    net = merge | beam.Map(transforms.subtract_filter_from_full)

    # create a mask from the filter minus the difference of full & filter
    first_mask = (
        {"first": nn_filter, "second": net, "full": magnitude_key}
        | "first mask group" >> beam.CoGroupByKey()
        | "first mask" >> beam.ParDo(transforms.GetSoftMask(margin=2))
    )

    # create another mask from the difference of full & filter minus the filter
    second_mask = (
        {"first": net, "second": nn_filter, "full": magnitude_key}
        | "second mask group" >> beam.CoGroupByKey()
        | "second mask" >> beam.ParDo(transforms.GetSoftMask(margin=10))
    )

    # plot the full magnitude spectrogram
    magnitude_out = (
        magnitude.spectrogram
        | "full spec" >> audio.GetSpec()
        | "plot full spec" >> audio.SpecToPlot(
            title="Full Spectrogram for {element}", y_axis="log"
        )
        | "save full" >> aio.GcsUploadPlot(suffix="-full")
    )

    # plot the first mask (background) spectrogram
    background_out = (
        first_mask
        | "background spec" >> audio.GetSpec()
        | "plot background spec" >> audio.SpecToPlot(
            title="Background Spectrogram for {element}", y_axis="log"
        )
        | "save background" >> aio.GcsUploadPlot(suffix="-background")
    )

    # plot the second mask (foreground) spectrogram
    foreground_out = (
        second_mask
        | "foreground spec" >> audio.GetSpec()
        | "plot foreground spec" >> audio.SpecToPlot(
            title="Foreground Spectrogram for {element}", y_axis="log"
        )
        | "save foreground" >> aio.GcsUploadPlot(suffix="-foreground")
    )

    # flatten all outputs into one PCollection, then remove duplicates
    out_pcol = (
        (magnitude_out, background_out, foreground_out)
        | "flatten output paths" >> beam.Flatten()
        | "remove dups" >> beam.Distinct()
    )
    return out_pcol
```