Source code for samplerate.lowlevel

"""Lowlevel wrappers around libsamplerate.

The docstrings of the `src_*` functions are adapted from the libsamplerate
header file.
"""
import os as _os
import sys as _sys
from ctypes.util import find_library as _find_library

import numpy as _np

# Locate and load libsamplerate
from samplerate._src import ffi
lib_basename = 'libsamplerate'
lib_filename = _find_library('samplerate')

if _os.environ.get('READTHEDOCS') == 'True':
    # Mock minimum C API for Read the Docs
    class MockLib(object):
        @classmethod
        def src_get_version(cls):
            return ffi.new('char[]', 'libsamplerate-0.1.9 (c) ...')
    lib_filename = 'mock'
    _lib = MockLib()
elif lib_filename is None:
    if _sys.platform == 'darwin':
        lib_filename = '{}.dylib'.format(lib_basename)
    elif _sys.platform == 'win32':
        from platform import architecture
        lib_filename = '{}-{}.dll'.format(lib_basename, architecture()[0])
    else:
        raise OSError('{} not found'.format(lib_basename))
    lib_filename = _os.path.join(
        _os.path.dirname(_os.path.abspath(__file__)), '_samplerate_data',
        lib_filename)
    _lib = ffi.dlopen(lib_filename)
else:
    _lib = ffi.dlopen(lib_filename)


def _check_data(data):
    """Check whether `data` is a valid input/output for libsamplerate.

    Returns
    -------
    num_frames
        Number of frames in `data`.
    channels
        Number of channels in `data`.

    Raises
    ------
        ValueError: If invalid data is supplied.
    """
    if not (data.dtype == _np.float32 and data.flags.c_contiguous):
        raise ValueError('supplied data must be float32 and C contiguous')
    if data.ndim == 2:
        num_frames, channels = data.shape
    elif data.ndim == 1:
        num_frames, channels = data.size, 1
    else:
        raise ValueError('rank > 2 not supported')
    return num_frames, channels


[docs]def src_strerror(error): """Convert the error number into a string.""" return ffi.string(_lib.src_strerror(error)).decode()
[docs]def src_get_name(converter_type): """Return the name of the converter given by `converter_type`.""" return ffi.string(_lib.src_get_name(converter_type)).decode()
[docs]def src_get_description(converter_type): """Return the description of the converter given by `converter_type`.""" return ffi.string(_lib.src_get_description(converter_type)).decode()
[docs]def src_get_version(): """Return the version string of libsamplerate.""" return ffi.string(_lib.src_get_version()).decode()
[docs]def src_simple(input_data, output_data, ratio, converter_type, channels): """Perform a single conversion from an input buffer to an output buffer. Simple interface for performing a single conversion from input buffer to output buffer at a fixed conversion ratio. Simple interface does not require initialisation as it can only operate on a single buffer worth of audio. """ input_frames, _ = _check_data(input_data) output_frames, _ = _check_data(output_data) data = ffi.new('SRC_DATA*') data.input_frames = input_frames data.output_frames = output_frames data.src_ratio = ratio data.data_in = ffi.cast('float*', ffi.from_buffer(input_data)) data.data_out = ffi.cast('float*', ffi.from_buffer(output_data)) error = _lib.src_simple(data, converter_type, channels) return error, data.input_frames_used, data.output_frames_gen
[docs]def src_new(converter_type, channels): """Initialise a new sample rate converter. Parameters ---------- converter_type : int Converter to be used. channels : int Number of channels. Returns ------- state An anonymous pointer to the internal state of the converter. error : int Error code. """ error = ffi.new('int*') state = _lib.src_new(converter_type, channels, error) return state, error[0]
[docs]def src_delete(state): """Release `state`. Cleanup all internal allocations. """ _lib.src_delete(state)
[docs]def src_process(state, input_data, output_data, ratio, end_of_input=0): """Standard processing function. Returns non zero on error. """ input_frames, _ = _check_data(input_data) output_frames, _ = _check_data(output_data) data = ffi.new('SRC_DATA*') data.input_frames = input_frames data.output_frames = output_frames data.src_ratio = ratio data.data_in = ffi.cast('float*', ffi.from_buffer(input_data)) data.data_out = ffi.cast('float*', ffi.from_buffer(output_data)) data.end_of_input = end_of_input error = _lib.src_process(state, data) return error, data.input_frames_used, data.output_frames_gen
[docs]def src_error(state): """Return an error number.""" return _lib.src_error(state) if state else None
[docs]def src_reset(state): """Reset the internal SRC state. Does not modify the quality settings. Does not free any memory allocations. Returns non zero on error. """ return _lib.src_reset(state) if state else None
[docs]def src_set_ratio(state, new_ratio): """Set a new SRC ratio. This allows step responses in the conversion ratio. Returns non zero on error. """ return _lib.src_set_ratio(state, new_ratio) if state else None
[docs]def src_is_valid_ratio(ratio): """Return `True` if ratio is a valid conversion ratio, `False` otherwise. """ return bool(_lib.src_is_valid_ratio(ratio))
@ffi.callback('src_callback_t') def _src_input_callback(cb_data, data): """Internal callback function to be used with the callback API. Pulls the Python callback function from the handle contained in `cb_data` and calls it to fetch frames. Frames are converted to the format required by the API (float, interleaved channels). A reference to these data is kept internally. Returns ------- frames : int The number of frames supplied. """ cb_data = ffi.from_handle(cb_data) ret = cb_data['callback']() if ret is None: cb_data['last_input'] = None return 0 # No frames supplied input_data = _np.require(ret, requirements='C', dtype=_np.float32) input_frames, channels = _check_data(input_data) # Check whether the correct number of channels is supplied by user. if cb_data['channels'] != channels: raise ValueError('Invalid number of channels in callback.') # Store a reference of the input data to ensure it is still alive when # accessed by libsamplerate. cb_data['last_input'] = input_data data[0] = ffi.cast('float*', ffi.from_buffer(input_data)) return input_frames
[docs]def src_callback_new(callback, converter_type, channels): """Initialisation for the callback based API. Parameters ---------- callback : function Called whenever new frames are to be read. Must return a NumPy array of shape (num_frames, channels). converter_type : int Converter to be used. channels : int Number of channels. Returns ------- state An anonymous pointer to the internal state of the converter. handle A CFFI handle to the callback data. error : int Error code. """ cb_data = {'callback': callback, 'channels': channels} handle = ffi.new_handle(cb_data) error = ffi.new('int*') state = _lib.src_callback_new(_src_input_callback, converter_type, channels, error, handle) if state == ffi.NULL: return None, handle, error[0] return state, handle, error[0]
[docs]def src_callback_read(state, ratio, frames, data): """Read up to `frames` worth of data using the callback API. Returns ------- frames : int Number of frames read or -1 on error. """ data_ptr = ffi.cast('float*f', ffi.from_buffer(data)) return _lib.src_callback_read(state, ratio, frames, data_ptr)
__libsamplerate_version__ = src_get_version() if __libsamplerate_version__.startswith(lib_basename): __libsamplerate_version__ = __libsamplerate_version__[len( lib_basename) + 1:__libsamplerate_version__.find(' ')]