Source code for librosa.segment

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Temporal segmentation
=====================

Recurrence and self-similarity
------------------------------
.. autosummary::
    :toctree: generated/

    recurrence_matrix
    recurrence_to_lag
    lag_to_recurrence
    timelag_filter

Temporal clustering
-------------------
.. autosummary::
    :toctree: generated/

    agglomerative
    subsegment
"""

from decorator import decorator

import numpy as np
import scipy
import scipy.signal

import sklearn
import sklearn.cluster
import sklearn.feature_extraction
import sklearn.neighbors

from . import cache
from . import util
from .util.exceptions import ParameterError

__all__ = ['recurrence_matrix',
           'recurrence_to_lag',
           'lag_to_recurrence',
           'timelag_filter',
           'agglomerative',
           'subsegment']


[docs]@cache(level=30) def recurrence_matrix(data, k=None, width=1, metric='euclidean', sym=False, sparse=False, mode='connectivity', bandwidth=None, axis=-1): '''Compute a recurrence matrix from a data matrix. `rec[i, j]` is non-zero if (`data[:, i]`, `data[:, j]`) are k-nearest-neighbors and `|i - j| >= width` Parameters ---------- data : np.ndarray A feature matrix k : int > 0 [scalar] or None the number of nearest-neighbors for each sample Default: `k = 2 * ceil(sqrt(t - 2 * width + 1))`, or `k = 2` if `t <= 2 * width + 1` width : int >= 1 [scalar] only link neighbors `(data[:, i], data[:, j])` if `|i - j| >= width` metric : str Distance metric to use for nearest-neighbor calculation. See `sklearn.neighbors.NearestNeighbors` for details. sym : bool [scalar] set `sym=True` to only link mutual nearest-neighbors sparse : bool [scalar] if False, returns a dense type (ndarray) if True, returns a sparse type (scipy.sparse.csr_matrix) mode : str, {'connectivity', 'distance', 'affinity'} If 'connectivity', a binary connectivity matrix is produced. If 'distance', then a non-zero entry contains the distance between points. If 'affinity', then non-zero entries are mapped to `exp( - distance(i, j) / bandwidth)` where `bandwidth` is as specified below. bandwidth : None or float > 0 If using ``mode='affinity'``, this can be used to set the bandwidth on the affinity kernel. If no value is provided, it is set automatically to the median distance between furthest nearest neighbors. axis : int The axis along which to compute recurrence. By default, the last index (-1) is taken. Returns ------- rec : np.ndarray or scipy.sparse.csr_matrix, [shape=(t, t)] Recurrence matrix See Also -------- sklearn.neighbors.NearestNeighbors scipy.spatial.distance.cdist librosa.feature.stack_memory recurrence_to_lag Notes ----- This function caches at level 30. Examples -------- Find nearest neighbors in MFCC space >>> y, sr = librosa.load(librosa.util.example_audio_file()) >>> mfcc = librosa.feature.mfcc(y=y, sr=sr) >>> R = librosa.segment.recurrence_matrix(mfcc) Or fix the number of nearest neighbors to 5 >>> R = librosa.segment.recurrence_matrix(mfcc, k=5) Suppress neighbors within +- 7 samples >>> R = librosa.segment.recurrence_matrix(mfcc, width=7) Use cosine similarity instead of Euclidean distance >>> R = librosa.segment.recurrence_matrix(mfcc, metric='cosine') Require mutual nearest neighbors >>> R = librosa.segment.recurrence_matrix(mfcc, sym=True) Use an affinity matrix instead of binary connectivity >>> R_aff = librosa.segment.recurrence_matrix(mfcc, mode='affinity') Plot the feature and recurrence matrices >>> import matplotlib.pyplot as plt >>> plt.figure(figsize=(8, 4)) >>> plt.subplot(1, 2, 1) >>> librosa.display.specshow(R, x_axis='time', y_axis='time') >>> plt.title('Binary recurrence (symmetric)') >>> plt.subplot(1, 2, 2) >>> librosa.display.specshow(R_aff, x_axis='time', y_axis='time', ... cmap='magma_r') >>> plt.title('Affinity recurrence') >>> plt.tight_layout() ''' data = np.atleast_2d(data) # Swap observations to the first dimension and flatten the rest data = np.swapaxes(data, axis, 0) t = data.shape[0] data = data.reshape((t, -1)) if width < 1: raise ParameterError('width must be at least 1') if mode not in ['connectivity', 'distance', 'affinity']: raise ParameterError(("Invalid mode='{}'. Must be one of " "['connectivity', 'distance', " "'affinity']").format(mode)) if k is None: if t > 2 * width + 1: k = 2 * np.ceil(np.sqrt(t - 2 * width + 1)) else: k = 2 if bandwidth is not None: if bandwidth <= 0: raise ParameterError('Invalid bandwidth={}. ' 'Must be strictly positive.'.format(bandwidth)) k = int(k) # Build the neighbor search object try: knn = sklearn.neighbors.NearestNeighbors(n_neighbors=min(t-1, k + 2 * width), metric=metric, algorithm='auto') except ValueError: knn = sklearn.neighbors.NearestNeighbors(n_neighbors=min(t-1, k + 2 * width), metric=metric, algorithm='brute') knn.fit(data) # Get the knn graph if mode == 'affinity': kng_mode = 'distance' else: kng_mode = mode rec = knn.kneighbors_graph(mode=kng_mode).tolil() # Remove connections within width for diag in range(-width + 1, width): rec.setdiag(0, diag) # Retain only the top-k links per point for i in range(t): # Get the links from point i links = rec[i].nonzero()[1] # Order them ascending idx = links[np.argsort(rec[i, links].toarray())][0] # Everything past the kth closest gets squashed rec[i, idx[k:]] = 0 # symmetrize if sym: rec = rec.minimum(rec.T) rec = rec.tocsr() rec.eliminate_zeros() if mode == 'connectivity': rec = rec.astype(np.bool) elif mode == 'affinity': if bandwidth is None: bandwidth = np.median(rec.max(axis=1).data) rec.data[:] = np.exp(- rec.data / bandwidth) if not sparse: rec = rec.toarray() return rec
[docs]def recurrence_to_lag(rec, pad=True, axis=-1): '''Convert a recurrence matrix into a lag matrix. `lag[i, j] == rec[i+j, j]` Parameters ---------- rec : np.ndarray, or scipy.sparse.spmatrix [shape=(n, n)] A (binary) recurrence matrix, as returned by `recurrence_matrix` pad : bool If False, `lag` matrix is square, which is equivalent to assuming that the signal repeats itself indefinitely. If True, `lag` is padded with `n` zeros, which eliminates the assumption of repetition. axis : int The axis to keep as the `time` axis. The alternate axis will be converted to lag coordinates. Returns ------- lag : np.ndarray The recurrence matrix in (lag, time) (if `axis=1`) or (time, lag) (if `axis=0`) coordinates Raises ------ ParameterError : if `rec` is non-square See Also -------- recurrence_matrix lag_to_recurrence Examples -------- >>> y, sr = librosa.load(librosa.util.example_audio_file()) >>> mfccs = librosa.feature.mfcc(y=y, sr=sr) >>> recurrence = librosa.segment.recurrence_matrix(mfccs) >>> lag_pad = librosa.segment.recurrence_to_lag(recurrence, pad=True) >>> lag_nopad = librosa.segment.recurrence_to_lag(recurrence, pad=False) >>> import matplotlib.pyplot as plt >>> plt.figure(figsize=(8, 4)) >>> plt.subplot(1, 2, 1) >>> librosa.display.specshow(lag_pad, x_axis='time', y_axis='lag') >>> plt.title('Lag (zero-padded)') >>> plt.subplot(1, 2, 2) >>> librosa.display.specshow(lag_nopad, x_axis='time') >>> plt.title('Lag (no padding)') >>> plt.tight_layout() ''' axis = np.abs(axis) if rec.ndim != 2 or rec.shape[0] != rec.shape[1]: raise ParameterError('non-square recurrence matrix shape: ' '{}'.format(rec.shape)) sparse = scipy.sparse.issparse(rec) roll_ax = None if sparse: roll_ax = 1 - axis lag_format = rec.format if axis == 0: rec = rec.tocsc() elif axis in (-1, 1): rec = rec.tocsr() t = rec.shape[axis] if sparse: if pad: kron = np.asarray([[1, 0]]).swapaxes(axis, 0) lag = scipy.sparse.kron(kron.astype(rec.dtype), rec, format='lil') else: lag = scipy.sparse.lil_matrix(rec) else: if pad: padding = [(0, 0), (0, 0)] padding[(1-axis)] = (0, t) lag = np.pad(rec, padding, mode='constant') else: lag = rec.copy() idx_slice = [slice(None)] * lag.ndim for i in range(1, t): idx_slice[axis] = i lag[tuple(idx_slice)] = util.roll_sparse(lag[tuple(idx_slice)], -i, axis=roll_ax) if sparse: return lag.asformat(lag_format) else: return np.ascontiguousarray(lag.T).T
[docs]def lag_to_recurrence(lag, axis=-1): '''Convert a lag matrix into a recurrence matrix. Parameters ---------- lag : np.ndarray or scipy.sparse.spmatrix A lag matrix, as produced by `recurrence_to_lag` axis : int The axis corresponding to the time dimension. The alternate axis will be interpreted in lag coordinates. Returns ------- rec : np.ndarray or scipy.sparse.spmatrix [shape=(n, n)] A recurrence matrix in (time, time) coordinates For sparse matrices, format will match that of `lag`. Raises ------ ParameterError : if `lag` does not have the correct shape See Also -------- recurrence_to_lag Examples -------- >>> y, sr = librosa.load(librosa.util.example_audio_file()) >>> mfccs = librosa.feature.mfcc(y=y, sr=sr) >>> recurrence = librosa.segment.recurrence_matrix(mfccs) >>> lag_pad = librosa.segment.recurrence_to_lag(recurrence, pad=True) >>> lag_nopad = librosa.segment.recurrence_to_lag(recurrence, pad=False) >>> rec_pad = librosa.segment.lag_to_recurrence(lag_pad) >>> rec_nopad = librosa.segment.lag_to_recurrence(lag_nopad) >>> import matplotlib.pyplot as plt >>> plt.figure(figsize=(8, 4)) >>> plt.subplot(2, 2, 1) >>> librosa.display.specshow(lag_pad, x_axis='time', y_axis='lag') >>> plt.title('Lag (zero-padded)') >>> plt.subplot(2, 2, 2) >>> librosa.display.specshow(lag_nopad, x_axis='time', y_axis='time') >>> plt.title('Lag (no padding)') >>> plt.subplot(2, 2, 3) >>> librosa.display.specshow(rec_pad, x_axis='time', y_axis='time') >>> plt.title('Recurrence (with padding)') >>> plt.subplot(2, 2, 4) >>> librosa.display.specshow(rec_nopad, x_axis='time', y_axis='time') >>> plt.title('Recurrence (without padding)') >>> plt.tight_layout() ''' if axis not in [0, 1, -1]: raise ParameterError('Invalid target axis: {}'.format(axis)) axis = np.abs(axis) if lag.ndim != 2 or (lag.shape[0] != lag.shape[1] and lag.shape[1 - axis] != 2 * lag.shape[axis]): raise ParameterError('Invalid lag matrix shape: {}'.format(lag.shape)) # Since lag must be 2-dimensional, abs(axis) = axis t = lag.shape[axis] sparse = scipy.sparse.issparse(lag) if sparse: rec = scipy.sparse.lil_matrix(lag) roll_ax = 1 - axis else: rec = lag.copy() roll_ax = None idx_slice = [slice(None)] * lag.ndim for i in range(1, t): idx_slice[axis] = i rec[tuple(idx_slice)] = util.roll_sparse(lag[tuple(idx_slice)], i, axis=roll_ax) sub_slice = [slice(None)] * rec.ndim sub_slice[1 - axis] = slice(t) rec = rec[tuple(sub_slice)] if sparse: return rec.asformat(lag.format) else: return np.ascontiguousarray(rec.T).T
[docs]def timelag_filter(function, pad=True, index=0): '''Filtering in the time-lag domain. This is primarily useful for adapting image filters to operate on `recurrence_to_lag` output. Using `timelag_filter` is equivalent to the following sequence of operations: >>> data_tl = librosa.segment.recurrence_to_lag(data) >>> data_filtered_tl = function(data_tl) >>> data_filtered = librosa.segment.lag_to_recurrence(data_filtered_tl) Parameters ---------- function : callable The filtering function to wrap, e.g., `scipy.ndimage.median_filter` pad : bool Whether to zero-pad the structure feature matrix index : int >= 0 If `function` accepts input data as a positional argument, it should be indexed by `index` Returns ------- wrapped_function : callable A new filter function which applies in time-lag space rather than time-time space. Examples -------- Apply a 5-bin median filter to the diagonal of a recurrence matrix >>> y, sr = librosa.load(librosa.util.example_audio_file()) >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr) >>> rec = librosa.segment.recurrence_matrix(chroma) >>> from scipy.ndimage import median_filter >>> diagonal_median = librosa.segment.timelag_filter(median_filter) >>> rec_filtered = diagonal_median(rec, size=(1, 3), mode='mirror') Or with affinity weights >>> rec_aff = librosa.segment.recurrence_matrix(chroma, mode='affinity') >>> rec_aff_fil = diagonal_median(rec_aff, size=(1, 3), mode='mirror') >>> import matplotlib.pyplot as plt >>> plt.figure(figsize=(8,8)) >>> plt.subplot(2, 2, 1) >>> librosa.display.specshow(rec, y_axis='time') >>> plt.title('Raw recurrence matrix') >>> plt.subplot(2, 2, 2) >>> librosa.display.specshow(rec_filtered) >>> plt.title('Filtered recurrence matrix') >>> plt.subplot(2, 2, 3) >>> librosa.display.specshow(rec_aff, x_axis='time', y_axis='time', ... cmap='magma_r') >>> plt.title('Raw affinity matrix') >>> plt.subplot(2, 2, 4) >>> librosa.display.specshow(rec_aff_fil, x_axis='time', ... cmap='magma_r') >>> plt.title('Filtered affinity matrix') >>> plt.tight_layout() ''' def __my_filter(wrapped_f, *args, **kwargs): '''Decorator to wrap the filter''' # Map the input data into time-lag space args = list(args) args[index] = recurrence_to_lag(args[index], pad=pad) # Apply the filtering function result = wrapped_f(*args, **kwargs) # Map back into time-time and return return lag_to_recurrence(result) return decorator(__my_filter, function)
[docs]@cache(level=30) def subsegment(data, frames, n_segments=4, axis=-1): '''Sub-divide a segmentation by feature clustering. Given a set of frame boundaries (`frames`), and a data matrix (`data`), each successive interval defined by `frames` is partitioned into `n_segments` by constrained agglomerative clustering. .. note:: If an interval spans fewer than `n_segments` frames, then each frame becomes a sub-segment. Parameters ---------- data : np.ndarray Data matrix to use in clustering frames : np.ndarray [shape=(n_boundaries,)], dtype=int, non-negative] Array of beat or segment boundaries, as provided by `librosa.beat.beat_track`, `librosa.onset.onset_detect`, or `agglomerative`. n_segments : int > 0 Maximum number of frames to sub-divide each interval. axis : int Axis along which to apply the segmentation. By default, the last index (-1) is taken. Returns ------- boundaries : np.ndarray [shape=(n_subboundaries,)] List of sub-divided segment boundaries See Also -------- agglomerative : Temporal segmentation librosa.onset.onset_detect : Onset detection librosa.beat.beat_track : Beat tracking Notes ----- This function caches at level 30. Examples -------- Load audio, detect beat frames, and subdivide in twos by CQT >>> y, sr = librosa.load(librosa.util.example_audio_file(), duration=8) >>> tempo, beats = librosa.beat.beat_track(y=y, sr=sr, hop_length=512) >>> beat_times = librosa.frames_to_time(beats, sr=sr, hop_length=512) >>> cqt = np.abs(librosa.cqt(y, sr=sr, hop_length=512)) >>> subseg = librosa.segment.subsegment(cqt, beats, n_segments=2) >>> subseg_t = librosa.frames_to_time(subseg, sr=sr, hop_length=512) >>> subseg array([ 0, 2, 4, 21, 23, 26, 43, 55, 63, 72, 83, 97, 102, 111, 122, 137, 142, 153, 162, 180, 182, 185, 202, 210, 221, 231, 241, 256, 261, 271, 281, 296, 301, 310, 320, 339, 341, 344, 361, 368, 382, 389, 401, 416, 420, 430, 436, 451, 456, 465, 476, 489, 496, 503, 515, 527, 535, 544, 553, 558, 571, 578, 590, 607, 609, 638]) >>> import matplotlib.pyplot as plt >>> plt.figure() >>> librosa.display.specshow(librosa.amplitude_to_db(cqt, ... ref=np.max), ... y_axis='cqt_hz', x_axis='time') >>> lims = plt.gca().get_ylim() >>> plt.vlines(beat_times, lims[0], lims[1], color='lime', alpha=0.9, ... linewidth=2, label='Beats') >>> plt.vlines(subseg_t, lims[0], lims[1], color='linen', linestyle='--', ... linewidth=1.5, alpha=0.5, label='Sub-beats') >>> plt.legend(frameon=True, shadow=True) >>> plt.title('CQT + Beat and sub-beat markers') >>> plt.tight_layout() ''' frames = util.fix_frames(frames, x_min=0, x_max=data.shape[axis], pad=True) if n_segments < 1: raise ParameterError('n_segments must be a positive integer') boundaries = [] idx_slices = [slice(None)] * data.ndim for seg_start, seg_end in zip(frames[:-1], frames[1:]): idx_slices[axis] = slice(seg_start, seg_end) boundaries.extend(seg_start + agglomerative(data[idx_slices], min(seg_end - seg_start, n_segments), axis=axis)) return np.ascontiguousarray(boundaries)
[docs]def agglomerative(data, k, clusterer=None, axis=-1): """Bottom-up temporal segmentation. Use a temporally-constrained agglomerative clustering routine to partition `data` into `k` contiguous segments. Parameters ---------- data : np.ndarray data to cluster k : int > 0 [scalar] number of segments to produce clusterer : sklearn.cluster.AgglomerativeClustering, optional An optional AgglomerativeClustering object. If `None`, a constrained Ward object is instantiated. axis : int axis along which to cluster. By default, the last axis (-1) is chosen. Returns ------- boundaries : np.ndarray [shape=(k,)] left-boundaries (frame numbers) of detected segments. This will always include `0` as the first left-boundary. See Also -------- sklearn.cluster.AgglomerativeClustering Examples -------- Cluster by chroma similarity, break into 20 segments >>> y, sr = librosa.load(librosa.util.example_audio_file(), duration=15) >>> chroma = librosa.feature.chroma_cqt(y=y, sr=sr) >>> bounds = librosa.segment.agglomerative(chroma, 20) >>> bound_times = librosa.frames_to_time(bounds, sr=sr) >>> bound_times array([ 0. , 1.672, 2.322, 2.624, 3.251, 3.506, 4.18 , 5.387, 6.014, 6.293, 6.943, 7.198, 7.848, 9.033, 9.706, 9.961, 10.635, 10.89 , 11.54 , 12.539]) Plot the segmentation over the chromagram >>> import matplotlib.pyplot as plt >>> plt.figure() >>> librosa.display.specshow(chroma, y_axis='chroma', x_axis='time') >>> plt.vlines(bound_times, 0, chroma.shape[0], color='linen', linestyle='--', ... linewidth=2, alpha=0.9, label='Segment boundaries') >>> plt.axis('tight') >>> plt.legend(frameon=True, shadow=True) >>> plt.title('Power spectrogram') >>> plt.tight_layout() """ # Make sure we have at least two dimensions data = np.atleast_2d(data) # Swap data index to position 0 data = np.swapaxes(data, axis, 0) # Flatten the features n = data.shape[0] data = data.reshape((n, -1)) if clusterer is None: # Connect the temporal connectivity graph grid = sklearn.feature_extraction.image.grid_to_graph(n_x=n, n_y=1, n_z=1) # Instantiate the clustering object clusterer = sklearn.cluster.AgglomerativeClustering(n_clusters=k, connectivity=grid, memory=cache) # Fit the model clusterer.fit(data) # Find the change points from the labels boundaries = [0] boundaries.extend( list(1 + np.nonzero(np.diff(clusterer.labels_))[0].astype(int))) return np.asarray(boundaries)