Source code for librosa.core.dtw

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Sequence Alignment with Dynamic Time Warping."""

import numpy as np
from numba import jit
import six
from scipy.spatial.distance import cdist
from ..util.exceptions import ParameterError

__all__ = ['dtw', 'fill_off_diagonal']


[docs]def fill_off_diagonal(x, radius, value=0): """Sets all cells of a matrix to a given ``value`` if they lie outside a constraint region. In this case, the constraint region is the Sakoe-Chiba band which runs with a fixed ``radius`` along the main diagonal. When ``x.shape[0] != x.shape[1]``, the radius will be expanded so that ``x[-1, -1] = 1`` always. ``x`` will be modified in place. Parameters ---------- x : np.ndarray [shape=(N, M)] Input matrix, will be modified in place. radius : float The band radius (1/2 of the width) will be ``int(radius*min(x.shape))``. value : int ``x[n, m] = value`` when ``(n, m)`` lies outside the band. Examples -------- >>> x = np.ones((8, 8)) >>> global_constraints(x, 0.25) >>> x array([[1, 1, 0, 0, 0, 0, 0, 0], [1, 1, 1, 0, 0, 0, 0, 0], [0, 1, 1, 1, 0, 0, 0, 0], [0, 0, 1, 1, 1, 0, 0, 0], [0, 0, 0, 1, 1, 1, 0, 0], [0, 0, 0, 0, 1, 1, 1, 0], [0, 0, 0, 0, 0, 1, 1, 1], [0, 0, 0, 0, 0, 0, 1, 1]]) >>> x = np.ones((8, 12)) >>> global_constraints(x, 0.25) >>> x array([[1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0], [1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0], [0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0], [0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0], [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0], [0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0], [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]]) """ nx, ny = x.shape # Calculate the radius in indices, rather than proportion radius = np.round(radius * np.min(x.shape)) nx, ny = x.shape offset = np.abs((x.shape[0] - x.shape[1])) if nx < ny: idx_u = np.triu_indices_from(x, k=radius + offset) idx_l = np.tril_indices_from(x, k=-radius) else: idx_u = np.triu_indices_from(x, k=radius) idx_l = np.tril_indices_from(x, k=-radius - offset) # modify input matrix x[idx_u] = value x[idx_l] = value
[docs]def dtw(X=None, Y=None, C=None, metric='euclidean', step_sizes_sigma=None, weights_add=None, weights_mul=None, subseq=False, backtrack=True, global_constraints=False, band_rad=0.25): '''Dynamic time warping (DTW). This function performs a DTW and path backtracking on two sequences. We follow the nomenclature and algorithmic approach as described in [1]. .. [1] Meinard Mueller Fundamentals of Music Processing — Audio, Analysis, Algorithms, Applications Springer Verlag, ISBN: 978-3-319-21944-8, 2015. Parameters ---------- X : np.ndarray [shape=(K, N)] audio feature matrix (e.g., chroma features) Y : np.ndarray [shape=(K, M)] audio feature matrix (e.g., chroma features) C : np.ndarray [shape=(N, M)] Precomputed distance matrix. If supplied, X and Y must not be supplied and ``metric`` will be ignored. metric : str Identifier for the cost-function as documented in `scipy.spatial.cdist()` step_sizes_sigma : np.ndarray [shape=[n, 2]] Specifies allowed step sizes as used by the dtw. weights_add : np.ndarray [shape=[n, ]] Additive weights to penalize certain step sizes. weights_mul : np.ndarray [shape=[n, ]] Multiplicative weights to penalize certain step sizes. subseq : binary Enable subsequence DTW, e.g., for retrieval tasks. backtrack : binary Enable backtracking in accumulated cost matrix. global_constraints : binary Applies global constraints to the cost matrix ``C`` (Sakoe-Chiba band). band_rad : float The Sakoe-Chiba band radius (1/2 of the width) will be ``int(radius*min(C.shape))``. Returns ------- D : np.ndarray [shape=(N,M)] accumulated cost matrix. D[N,M] is the total alignment cost. When doing subsequence DTW, D[N,:] indicates a matching function. wp : np.ndarray [shape=(N,2)] Warping path with index pairs. Each row of the array contains an index pair n,m). Only returned when ``backtrack`` is True. Raises ------ ParameterError If you are doing diagonal matching and Y is shorter than X or if an incompatible combination of X, Y, and C are supplied. If your input dimensions are incompatible. Examples -------- >>> import numpy as np >>> import matplotlib.pyplot as plt >>> y, sr = librosa.load(librosa.util.example_audio_file(), offset=10, duration=15) >>> X = librosa.feature.chroma_cens(y=y, sr=sr) >>> noise = np.random.rand(X.shape[0], 200) >>> Y = np.concatenate((noise, noise, X, noise), axis=1) >>> D, wp = librosa.dtw(X, Y, subseq=True) >>> plt.subplot(2, 1, 1) >>> librosa.display.specshow(D, x_axis='frames', y_axis='frames') >>> plt.title('Database excerpt') >>> plt.plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y') >>> plt.legend() >>> plt.subplot(2, 1, 2) >>> plt.plot(D[-1, :] / wp.shape[0]) >>> plt.xlim([0, Y.shape[1]]) >>> plt.ylim([0, 2]) >>> plt.title('Matching cost function') >>> plt.tight_layout() ''' # Default Parameters if step_sizes_sigma is None: step_sizes_sigma = np.array([[1, 1], [0, 1], [1, 0]]) if weights_add is None: weights_add = np.zeros(len(step_sizes_sigma)) if weights_mul is None: weights_mul = np.ones(len(step_sizes_sigma)) if len(step_sizes_sigma) != len(weights_add): raise ParameterError('len(weights_add) must be equal to len(step_sizes_sigma)') if len(step_sizes_sigma) != len(weights_mul): raise ParameterError('len(weights_mul) must be equal to len(step_sizes_sigma)') if C is None and (X is None or Y is None): raise ParameterError('If C is not supplied, both X and Y must be supplied') if C is not None and (X is not None or Y is not None): raise ParameterError('If C is supplied, both X and Y must not be supplied') # calculate pair-wise distances, unless already supplied. if C is None: # take care of dimensions X = np.atleast_2d(X) Y = np.atleast_2d(Y) try: C = cdist(X.T, Y.T, metric=metric) except ValueError as e: msg = ('scipy.spatial.distance.cdist returned an error.\n' 'Please provide your input in the form X.shape=(K, N) and Y.shape=(K, M).\n' '1-dimensional sequences should be reshaped to X.shape=(1, N) and Y.shape=(1, M).') six.reraise(ParameterError, ParameterError(msg)) # for subsequence matching: # if N > M, Y can be a subsequence of X if subseq and (X.shape[1] > Y.shape[1]): C = C.T C = np.atleast_2d(C) # if diagonal matching, Y has to be longer than X # (X simply cannot be contained in Y) if np.array_equal(step_sizes_sigma, np.array([[1, 1]])) and (C.shape[0] > C.shape[1]): raise ParameterError('For diagonal matching: Y.shape[1] >= X.shape[1] ' '(C.shape[1] >= C.shape[0])') max_0 = step_sizes_sigma[:, 0].max() max_1 = step_sizes_sigma[:, 1].max() if global_constraints: # Apply global constraints to the cost matrix fill_off_diagonal(C, band_rad, value=np.inf) # initialize whole matrix with infinity values D = np.ones(C.shape + np.array([max_0, max_1])) * np.inf # set starting point to C[0, 0] D[max_0, max_1] = C[0, 0] if subseq: D[max_0, max_1:] = C[0, :] # initialize step matrix with -1 # will be filled in calc_accu_cost() with indices from step_sizes_sigma D_steps = -1 * np.ones(D.shape, dtype=np.int) # calculate accumulated cost matrix D, D_steps = calc_accu_cost(C, D, D_steps, step_sizes_sigma, weights_mul, weights_add, max_0, max_1) # delete infinity rows and columns D = D[max_0:, max_1:] D_steps = D_steps[max_0:, max_1:] if backtrack: if subseq: # search for global minimum in last row of D-matrix wp_end_idx = np.argmin(D[-1, :]) + 1 wp = backtracking(D_steps[:, :wp_end_idx], step_sizes_sigma) else: # perform warping path backtracking wp = backtracking(D_steps, step_sizes_sigma) wp = np.asarray(wp, dtype=int) # since we transposed in the beginning, we have to adjust the index pairs back if subseq and (X.shape[1] > Y.shape[1]): wp = np.fliplr(wp) return D, wp else: return D
@jit(nopython=True) def calc_accu_cost(C, D, D_steps, step_sizes_sigma, weights_mul, weights_add, max_0, max_1): '''Calculate the accumulated cost matrix D. Use dynamic programming to calculate the accumulated costs. Parameters ---------- C : np.ndarray [shape=(N, M)] pre-computed cost matrix D : np.ndarray [shape=(N, M)] accumulated cost matrix D_steps : np.ndarray [shape=(N, M)] steps which were used for calculating D step_sizes_sigma : np.ndarray [shape=[n, 2]] Specifies allowed step sizes as used by the dtw. weights_add : np.ndarray [shape=[n, ]] Additive weights to penalize certain step sizes. weights_mul : np.ndarray [shape=[n, ]] Multiplicative weights to penalize certain step sizes. max_0 : int maximum number of steps in step_sizes_sigma in dim 0. max_1 : int maximum number of steps in step_sizes_sigma in dim 1. Returns ------- D : np.ndarray [shape=(N,M)] accumulated cost matrix. D[N,M] is the total alignment cost. When doing subsequence DTW, D[N,:] indicates a matching function. D_steps : np.ndarray [shape=(N,M)] steps which were used for calculating D. See Also -------- dtw ''' for cur_n in range(max_0, D.shape[0]): for cur_m in range(max_1, D.shape[1]): # accumulate costs for cur_step_idx, cur_w_add, cur_w_mul in zip(range(step_sizes_sigma.shape[0]), weights_add, weights_mul): cur_D = D[cur_n - step_sizes_sigma[cur_step_idx, 0], cur_m - step_sizes_sigma[cur_step_idx, 1]] cur_C = cur_w_mul * C[cur_n - max_0, cur_m - max_1] cur_C += cur_w_add cur_cost = cur_D + cur_C # check if cur_cost is smaller than the one stored in D if cur_cost < D[cur_n, cur_m]: D[cur_n, cur_m] = cur_cost # save step-index D_steps[cur_n, cur_m] = cur_step_idx return D, D_steps @jit(nopython=True) def backtracking(D_steps, step_sizes_sigma): '''Backtrack optimal warping path. Uses the saved step sizes from the cost accumulation step to backtrack the index pairs for an optimal warping path. Parameters ---------- D_steps : np.ndarray [shape=(N, M)] Saved indices of the used steps used in the calculation of D. step_sizes_sigma : np.ndarray [shape=[n, 2]] Specifies allowed step sizes as used by the dtw. Returns ------- wp : list [shape=(N,)] Warping path with index pairs. Each list entry contains an index pair (n,m) as a tuple See Also -------- dtw ''' wp = [] # Set starting point D(N,M) and append it to the path cur_idx = (D_steps.shape[0] - 1, D_steps.shape[1] - 1) wp.append((cur_idx[0], cur_idx[1])) # Loop backwards. # Stop criteria: # Setting it to (0, 0) does not work for the subsequence dtw, # so we only ask to reach the first row of the matrix. while cur_idx[0] > 0: cur_step_idx = D_steps[(cur_idx[0], cur_idx[1])] # save tuple with minimal acc. cost in path cur_idx = (cur_idx[0] - step_sizes_sigma[cur_step_idx][0], cur_idx[1] - step_sizes_sigma[cur_step_idx][1]) # append to warping path wp.append((cur_idx[0], cur_idx[1])) return wp