Source code for vqf.pyvqf

# SPDX-FileCopyrightText: 2021 Daniel Laidig <laidig@control.tu-berlin.de>
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

import copy
import dataclasses
import math
from dataclasses import dataclass, field
from typing import TypedDict, Any, overload, cast

import numpy as np


class PyVQFBatchResults9D(TypedDict):
    quat6D: np.ndarray
    quat9D: np.ndarray
    delta: np.ndarray
    bias: np.ndarray
    biasSigma: np.ndarray
    restDetected: np.ndarray
    magDistDetected: np.ndarray


class PyVQFBatchResults6D(TypedDict):
    quat6D: np.ndarray
    bias: np.ndarray
    biasSigma: np.ndarray
    restDetected: np.ndarray


PyVQFBatchResults = PyVQFBatchResults9D | PyVQFBatchResults6D


class PyVQFStateDict(TypedDict):
    gyrQuat: np.ndarray
    accQuat: np.ndarray
    delta: float

    restDetected: bool
    magDistDetected: bool

    lastAccLp: np.ndarray
    accLpState: np.ndarray
    lastAccCorrAngularRate: float

    kMagInit: float
    lastMagDisAngle: float
    lastMagCorrAngularRate: float

    bias: np.ndarray
    biasP: np.ndarray

    motionBiasEstRLpState: np.ndarray
    motionBiasEstBiasLpState: np.ndarray

    restLastSquaredDeviations: np.ndarray
    restT: float
    restLastGyrLp: np.ndarray
    restGyrLpState: np.ndarray
    restLastAccLp: np.ndarray
    restAccLpState: np.ndarray

    magRefNorm: float
    magRefDip: float
    magUndisturbedT: float
    magRejectT: float
    magCandidateNorm: float
    magCandidateDip: float
    magCandidateT: float
    magNormDip: np.ndarray
    magNormDipLpState: np.ndarray


EPS = np.finfo(float).eps


@dataclass
class PyVQFParams:
    tauAcc: float = 3.0
    tauMag: float = 9.0
    motionBiasEstEnabled: bool = True
    restBiasEstEnabled: bool = True
    magDistRejectionEnabled: bool = True
    biasSigmaInit: float = 0.5
    biasForgettingTime: float = 100.0
    biasClip: float = 2.0
    biasSigmaMotion: float = 0.1
    biasVerticalForgettingFactor: float = 0.0001
    biasSigmaRest: float = 0.03
    restMinT: float = 1.5
    restFilterTau: float = 0.5
    restThGyr: float = 2.0
    restThAcc: float = 0.5
    magCurrentTau: float = 0.05
    magRefTau: float = 20.0
    magNormTh: float = 0.1
    magDipTh: float = 10.0
    magNewTime: float = 20.0
    magNewFirstTime: float = 5.0
    magNewMinGyr: float = 20.0
    magMinUndisturbedTime: float = 0.5
    magMaxRejectionTime: float = 60.0
    magRejectionFactor: float = 2.0


@dataclass
class PyVQFState:
    gyrQuat: np.ndarray = field(default_factory=lambda: np.array([1, 0, 0, 0], float))
    accQuat: np.ndarray = field(default_factory=lambda: np.array([1, 0, 0, 0], float))
    delta: float = 0.0

    restDetected: bool = False
    magDistDetected: bool = True

    lastAccLp: np.ndarray = field(default_factory=lambda: np.zeros(3, float))
    accLpState: np.ndarray = field(default_factory=lambda: np.full((2, 3), np.nan, float))
    lastAccCorrAngularRate: float = 0.0

    kMagInit: float = 1.0
    lastMagDisAngle: float = 0.0
    lastMagCorrAngularRate: float = 0.0

    bias: np.ndarray = field(default_factory=lambda: np.zeros(3, float))
    biasP: np.ndarray = field(default_factory=lambda: np.full((3, 3), np.nan, float))

    motionBiasEstRLpState: np.ndarray = field(default_factory=lambda: np.full((2, 9), np.nan, float))
    motionBiasEstBiasLpState: np.ndarray = field(default_factory=lambda: np.full((2, 2), np.nan, float))

    restLastSquaredDeviations: np.ndarray = field(default_factory=lambda: np.zeros(2, float))
    restT: float = 0.0
    restLastGyrLp: np.ndarray = field(default_factory=lambda: np.zeros(3, float))
    restGyrLpState: np.ndarray = field(default_factory=lambda: np.full((2, 3), np.nan, float))
    restLastAccLp: np.ndarray = field(default_factory=lambda: np.zeros(3, float))
    restAccLpState: np.ndarray = field(default_factory=lambda: np.full((2, 3), np.nan, float))

    magRefNorm: float = 0.0
    magRefDip: float = 0.0
    magUndisturbedT: float = 0.0
    magRejectT: float = -1.0
    magCandidateNorm: float = -1.0
    magCandidateDip: float = 0.0
    magCandidateT: float = 0.0
    magNormDip: np.ndarray = field(default_factory=lambda: np.zeros(2, float))
    magNormDipLpState: np.ndarray = field(default_factory=lambda: np.full((2, 2), np.nan, float))


@dataclass
class PyVQFCoefficients:
    gyrTs: float
    accTs: float
    magTs: float

    accLpB: np.ndarray = field(default_factory=lambda: np.full(3, np.nan, float))
    accLpA: np.ndarray = field(default_factory=lambda: np.full(2, np.nan, float))

    kMag: float = -1.0

    biasP0: float = -1.0
    biasV: float = -1.0
    biasMotionW: float = -1.0
    biasVerticalW: float = -1.0
    biasRestW: float = -1.0

    restGyrLpB: np.ndarray = field(default_factory=lambda: np.full(3, np.nan, float))
    restGyrLpA: np.ndarray = field(default_factory=lambda: np.full(2, np.nan, float))
    restAccLpB: np.ndarray = field(default_factory=lambda: np.full(3, np.nan, float))
    restAccLpA: np.ndarray = field(default_factory=lambda: np.full(2, np.nan, float))

    kMagRef: float = -1.0
    magNormDipLpB: np.ndarray = field(default_factory=lambda: np.full(3, np.nan, float))
    magNormDipLpA: np.ndarray = field(default_factory=lambda: np.full(2, np.nan, float))


[docs] class PyVQF: """A Versatile Quaternion-based Filter for IMU Orientation Estimation. This class implements the orientation estimation filter described in the following publication: D. Laidig and T. Seel. "VQF: Highly Accurate IMU Orientation Estimation with Bias Estimation and Magnetic Disturbance Rejection." Information Fusion 2023, 91, 187--204. `doi:10.1016/j.inffus.2022.10.014 <https://doi.org/10.1016/j.inffus.2022.10.014>`_. [Accepted manuscript available at `arXiv:2203.17024 <https://arxiv.org/abs/2203.17024>`_.] The filter can perform simultaneous 6D (magnetometer-free) and 9D (gyr+acc+mag) sensor fusion and can also be used without magnetometer data. It performs rest detection, gyroscope bias estimation during rest and motion, and magnetic disturbance detection and rejection. Different sampling rates for gyroscopes, accelerometers, and magnetometers are supported as well. While in most cases, the defaults will be reasonable, the algorithm can be influenced via a number of tuning parameters. To use this class for online (sample-by-sample) processing, 1. create a instance of the class and provide the sampling time and, optionally, parameters 2. for every sample, call one of the update functions to feed the algorithm with IMU data 3. access the estimation results with :meth:`getQuat6D`, :meth:`getQuat9D` and the other getter methods. If the full data is available in numpy arrays, you can use :meth:`updateBatch`. This class is a pure Python implementation of the algorithm that only depends on the Python standard library and `numpy <https://numpy.org/>`_. Note that the wrapper :class:`vqf.VQF` for the C++ implementation :cpp:class:`VQF` is much faster than this pure Python implementation. Depending on use case and programming language of choice, the following alternatives might be useful: +------------------------+----------------------------+--------------------------+---------------------------+ | | Full Version | Basic Version | Offline Version | | | | | | +========================+============================+==========================+===========================+ | **C++** | :cpp:class:`VQF` | :cpp:class:`BasicVQF` | :cpp:func:`offlineVQF` | +------------------------+----------------------------+--------------------------+---------------------------+ | **Python/C++ (fast)** | :py:class:`vqf.VQF` | :py:class:`vqf.BasicVQF` | :py:meth:`vqf.offlineVQF` | +------------------------+----------------------------+--------------------------+---------------------------+ | **Pure Python (slow)** | **vqf.PyVQF (this class)** | -- | -- | +------------------------+----------------------------+--------------------------+---------------------------+ | **Pure Matlab (slow)** | :mat:class:`VQF.m <VQF>` | -- | -- | +------------------------+----------------------------+--------------------------+---------------------------+ In the most common case (using the default parameters and all data being sampled with the same frequency, create the class like this: .. code-block:: from vqf import PyVQF vqf = PyVQF(0.01) # 0.01 s sampling time, i.e. 100 Hz Example code to create an object with magnetic disturbance rejection disabled (use the `**-operator <https://docs.python.org/3/tutorial/controlflow.html#unpacking-argument-lists>`_ to pass parameters from a dict): .. code-block:: from vqf import PyVQF vqf = PyVQF(0.01, magDistRejectionEnabled=False) # 0.01 s sampling time, i.e. 100 Hz To use this class as a replacement for the basic version BasicVQF, pass the following parameters: .. code-block:: from vqf import PyVQF vqf = PyVQF(0.01, motionBiasEstEnabled=False, restBiasEstEnabled=False, magDistRejectionEnabled=False) See :cpp:struct:`VQFParams` for a detailed description of all parameters. """ def __init__(self, gyrTs: float, accTs: float = -1.0, magTs: float = -1.0, **params: Any) -> None: """ :param gyrTs: sampling time of the gyroscope measurements in seconds :param accTs: sampling time of the accelerometer measurements in seconds (the value of `gyrTs` is used if set to -1) :param magTs: sampling time of the magnetometer measurements in seconds (the value of `gyrTs` is used if set to -1) :param (params): optional parameters to override the defaults (see :cpp:struct:`VQFParams` for a full list and detailed descriptions) """ accTs = accTs if accTs > 0 else gyrTs magTs = magTs if magTs > 0 else gyrTs self._params = PyVQFParams(**params) self._state = PyVQFState() self._coeffs = PyVQFCoefficients(gyrTs=gyrTs, accTs=accTs, magTs=magTs) self._setup()
[docs] def updateGyr(self, gyr: np.ndarray) -> None: """Performs gyroscope update step. It is only necessary to call this function directly if gyroscope, accelerometers and magnetometers have different sampling rates. Otherwise, simply use :meth:`update`. :param gyr: gyroscope measurement in rad/s -- numpy array with shape (3,) :return: None """ assert gyr.shape == (3,) # rest detection if self._params.restBiasEstEnabled or self._params.magDistRejectionEnabled: gyrLp = self.filterVec(gyr, self._params.restFilterTau, self._coeffs.gyrTs, self._coeffs.restGyrLpB, self._coeffs.restGyrLpA, self._state.restGyrLpState) deviation = gyr - gyrLp squaredDeviation = deviation.dot(deviation) biasClip = self._params.biasClip*np.pi/180.0 if squaredDeviation >= (self._params.restThGyr*np.pi/180.0)**2 or np.max(np.abs(gyrLp)) > biasClip: self._state.restT = 0.0 self._state.restDetected = False self._state.restLastGyrLp = gyrLp self._state.restLastSquaredDeviations[0] = squaredDeviation # remove estimated gyro bias gyrNoBias = gyr - self._state.bias # gyroscope prediction step gyrNorm = math.sqrt(gyrNoBias.dot(gyrNoBias)) angle = gyrNorm * self._coeffs.gyrTs if gyrNorm > EPS: c = np.cos(angle/2) s = np.sin(angle/2)/gyrNorm gyrStepQuat = np.array([c, s*gyrNoBias[0], s*gyrNoBias[1], s*gyrNoBias[2]], float) self._state.gyrQuat = self.quatMultiply(self._state.gyrQuat, gyrStepQuat) self.normalize(self._state.gyrQuat)
[docs] def updateAcc(self, acc: np.ndarray) -> None: """Performs accelerometer update step. It is only necessary to call this function directly if gyroscope, accelerometers and magnetometers have different sampling rates. Otherwise, simply use :meth:`update`. Should be called after :meth:`updateGyr` and before :meth:`updateMag`. :param acc: accelerometer measurement in m/s² -- numpy array with shape (3,) :return: None """ assert acc.shape == (3,) # ignore [0 0 0] samples if acc[0] == 0.0 and acc[1] == 0.0 and acc[2] == 0.0: return accTs = self._coeffs.accTs # rest detection if self._params.restBiasEstEnabled: accLp = self.filterVec(acc, self._params.restFilterTau, accTs, self._coeffs.restAccLpB, self._coeffs.restAccLpA, self._state.restAccLpState) deviation = acc - accLp squaredDeviation = deviation.dot(deviation) if squaredDeviation >= self._params.restThAcc**2: self._state.restT = 0.0 self._state.restDetected = False else: self._state.restT += accTs if self._state.restT >= self._params.restMinT: self._state.restDetected = True self._state.restLastAccLp = accLp self._state.restLastSquaredDeviations[1] = squaredDeviation # filter acc in inertial frame accEarth = self.quatRotate(self._state.gyrQuat, acc) self._state.lastAccLp = self.filterVec(accEarth, self._params.tauAcc, accTs, self._coeffs.accLpB, self._coeffs.accLpA, self._state.accLpState) # transform to 6D earth frame and normalize accEarth = self.quatRotate(self._state.accQuat, self._state.lastAccLp) # a = self._state.accQuat # b = self.state.lastAccLp # accEarth = self.quatRotate(a, b) self.normalize(accEarth) # inclination correction q_w = math.sqrt((accEarth[2]+1)/2) if q_w > 1e-6: accCorrQuat = np.array([q_w, 0.5*accEarth[1]/q_w, -0.5*accEarth[0]/q_w, 0], float) else: accCorrQuat = np.array([0, 1, 0, 0], float) self._state.accQuat = self.quatMultiply(accCorrQuat, self._state.accQuat) self.normalize(self._state.accQuat) # calculate correction angular rate to facilitate debugging self._state.lastAccCorrAngularRate = math.acos(accEarth[2])/self._coeffs.accTs # bias estimation if self._params.motionBiasEstEnabled or self._params.restBiasEstEnabled: biasClip = self._params.biasClip*np.pi/180.0 bias = self._state.bias # get rotation matrix corresponding to accGyrQuat accGyrQuat = self.getQuat6D() R = np.array([ 1 - 2*accGyrQuat[2]**2 - 2*accGyrQuat[3]**2, # r11 2*(accGyrQuat[2]*accGyrQuat[1] - accGyrQuat[0]*accGyrQuat[3]), # r12 2*(accGyrQuat[0]*accGyrQuat[2] + accGyrQuat[3]*accGyrQuat[1]), # r13 2*(accGyrQuat[0]*accGyrQuat[3] + accGyrQuat[2]*accGyrQuat[1]), # r21 1 - 2*accGyrQuat[1]**2 - 2*accGyrQuat[3]**2, # r22 2*(accGyrQuat[2]*accGyrQuat[3] - accGyrQuat[1]*accGyrQuat[0]), # r23 2*(accGyrQuat[3]*accGyrQuat[1] - accGyrQuat[0]*accGyrQuat[2]), # r31 2*(accGyrQuat[0]*accGyrQuat[1] + accGyrQuat[3]*accGyrQuat[2]), # r32 1 - 2*accGyrQuat[1]**2 - 2*accGyrQuat[2]**2, # r33 ], float) # calculate R*b_hat (only the x and y component, as z is not needed) biasLp = np.array([ R[0]*bias[0] + R[1]*bias[1] + R[2]*bias[2], R[3]*bias[0] + R[4]*bias[1] + R[5]*bias[2], ], float) # low-pass filter R and R*b_hat R = self.filterVec(R, self._params.tauAcc, accTs, self._coeffs.accLpB, self._coeffs.accLpA, self._state.motionBiasEstRLpState) biasLp = self.filterVec(biasLp, self._params.tauAcc, accTs, self._coeffs.accLpB, self._coeffs.accLpA, self._state.motionBiasEstBiasLpState) # set measurement error and covariance for the respective Kalman filter update if self._state.restDetected and self._params.restBiasEstEnabled: e = self._state.restLastGyrLp - bias R = np.eye(3) w = np.full(3, self._coeffs.biasRestW) elif self._params.motionBiasEstEnabled: e = np.array([ -accEarth[1]/accTs + biasLp[0] - R[0]*bias[0] - R[1]*bias[1] - R[2]*bias[2], accEarth[0]/accTs + biasLp[1] - R[3]*bias[0] - R[4]*bias[1] - R[5]*bias[2], - R[6]*bias[0] - R[7]*bias[1] - R[8]*bias[2], ], float) R.shape = (3, 3) w = np.array([self._coeffs.biasMotionW, self._coeffs.biasMotionW, self._coeffs.biasVerticalW], float) else: w = None e = None # Kalman filter update # step 1: P = P + V (also increase covariance if there is no measurement update!) if self._state.biasP[0, 0] < self._coeffs.biasP0: self._state.biasP[0, 0] += self._coeffs.biasV if self._state.biasP[1, 1] < self._coeffs.biasP0: self._state.biasP[1, 1] += self._coeffs.biasV if self._state.biasP[2, 2] < self._coeffs.biasP0: self._state.biasP[2, 2] += self._coeffs.biasV if w is not None and e is not None: # clip disagreement to -2..2 °/s # (this also effectively limits the harm done by the first inclination correction step) e = np.clip(e, -biasClip, biasClip) # step 2: K = P R^T inv(W + R P R^T) K = self._state.biasP @ R.T @ np.linalg.inv(np.diag(w) + R @ self._state.biasP @ R.T) # step 3: bias = bias + K (y - R bias) = bias + K e bias += K @ e # step 4: P = P - K R P self._state.biasP -= K @ R @ self._state.biasP # clip bias estimate to -2..2 °/s bias[:] = np.clip(bias, -biasClip, biasClip)
[docs] def updateMag(self, mag: np.ndarray) -> None: """Performs magnetometer update step. It is only necessary to call this function directly if gyroscope, accelerometers and magnetometers have different sampling rates. Otherwise, simply use :meth:`update`. Should be called after :meth:`updateAcc`. :param mag: magnetometer measurement in arbitrary units -- numpy array with shape (3,) :return: None """ assert mag.shape == (3,) # ignore [0 0 0] samples if mag[0] == 0.0 and mag[1] == 0.0 and mag[2] == 0.0: return magTs = self._coeffs.magTs # bring magnetometer measurement into 6D earth frame magEarth = self.quatRotate(self.getQuat6D(), mag) if self._params.magDistRejectionEnabled: magNormDip = self._state.magNormDip magNormDip[0] = math.sqrt(magEarth.dot(magEarth)) magNormDip[1] = -math.asin(magEarth[2]/magNormDip[0]) if self._params.magCurrentTau > 0: magNormDip[:] = self.filterVec(magNormDip, self._params.magCurrentTau, magTs, self._coeffs.magNormDipLpB, self._coeffs.magNormDipLpA, self._state.magNormDipLpState) # magnetic disturbance detection if abs(magNormDip[0] - self._state.magRefNorm) < self._params.magNormTh*self._state.magRefNorm and \ abs(magNormDip[1] - self._state.magRefDip) < self._params.magDipTh*np.pi/180.0: self._state.magUndisturbedT += magTs if self._state.magUndisturbedT >= self._params.magMinUndisturbedTime: self._state.magDistDetected = False self._state.magRefNorm += self._coeffs.kMagRef*(magNormDip[0] - self._state.magRefNorm) self._state.magRefDip += self._coeffs.kMagRef*(magNormDip[1] - self._state.magRefDip) else: self._state.magUndisturbedT = 0.0 self._state.magDistDetected = True # new magnetic field acceptance if abs(magNormDip[0] - self._state.magCandidateNorm) < self._params.magNormTh*self._state.magCandidateNorm \ and abs(magNormDip[1] - self._state.magCandidateDip) < self._params.magDipTh*np.pi/180.0: gyrNorm = math.sqrt(self._state.restLastGyrLp.dot(self._state.restLastGyrLp)) if gyrNorm >= self._params.magNewMinGyr*np.pi/180.0: self._state.magCandidateT += magTs self._state.magCandidateNorm += self._coeffs.kMagRef*(magNormDip[0] - self._state.magCandidateNorm) self._state.magCandidateDip += self._coeffs.kMagRef*(magNormDip[1] - self._state.magCandidateDip) if self._state.magDistDetected and (self._state.magCandidateT >= self._params.magNewTime or ( self._state.magRefNorm == 0.0 and self._state.magCandidateT >= self._params.magNewFirstTime)): self._state.magRefNorm = self._state.magCandidateNorm self._state.magRefDip = self._state.magCandidateDip self._state.magDistDetected = False self._state.magUndisturbedT = self._params.magMinUndisturbedTime else: self._state.magCandidateT = 0.0 self._state.magCandidateNorm = magNormDip[0] self._state.magCandidateDip = magNormDip[1] # calculate disagreement angle based on current magnetometer measurement self._state.lastMagDisAngle = math.atan2(magEarth[0], magEarth[1]) - self._state.delta # make sure the disagreement angle is in the range [-pi, pi] if self._state.lastMagDisAngle > np.pi: self._state.lastMagDisAngle -= 2*np.pi elif self._state.lastMagDisAngle < -np.pi: self._state.lastMagDisAngle += 2*np.pi k = self._coeffs.kMag if self._params.magDistRejectionEnabled: # magnetic disturbance rejection if self._state.magDistDetected: if self._state.magRejectT <= self._params.magMaxRejectionTime: self._state.magRejectT += magTs k = 0 else: k /= self._params.magRejectionFactor else: self._state.magRejectT = max(self._state.magRejectT - self._params.magRejectionFactor*magTs, 0.0) # ensure fast initial convergence if self._state.kMagInit != 0.0: # make sure that the gain k is at least 1/N, N=1,2,3,... in the first few samples if k < self._state.kMagInit: k = self._state.kMagInit # iterative expression to calculate 1/N self._state.kMagInit = self._state.kMagInit/(self._state.kMagInit+1) # disable if t > tauMag if self._state.kMagInit*self._params.tauMag < self._coeffs.magTs: self._state.kMagInit = 0.0 # first-order filter step self._state.delta += k*self._state.lastMagDisAngle # calculate correction angular rate to facilitate debugging self._state.lastMagCorrAngularRate = k*self._state.lastMagDisAngle/self._coeffs.magTs # make sure delta is in the range [-pi, pi] if self._state.delta > np.pi: self._state.delta -= 2*np.pi elif self._state.delta < -np.pi: self._state.delta += 2*np.pi
[docs] def update(self, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarray | None = None) -> None: """Performs filter update step for one sample. :param gyr: gyr gyroscope measurement in rad/s -- numpy array with shape (3,) :param acc: acc accelerometer measurement in m/s² -- numpy array with shape (3,) :param mag: optional mag magnetometer measurement in arbitrary units -- numpy array with shape (3,) :return: None """ self.updateGyr(gyr) self.updateAcc(acc) if mag is not None: self.updateMag(mag)
@overload def updateBatch(self, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarray) -> PyVQFBatchResults9D: ... @overload def updateBatch(self, gyr: np.ndarray, acc: np.ndarray, mag: None = None) -> PyVQFBatchResults6D: ...
[docs] def updateBatch(self, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarray | None = None) -> PyVQFBatchResults: """Performs batch update for multiple samples at once. In order to use this function, all input data must have the same sampling rate and be provided as a numpy array. The output is a dictionary containing - **quat6D** -- the 6D quaternion -- numpy array with shape (N, 4) - **bias** -- gyroscope bias estimate in rad/s -- numpy array with shape (N, 3) - **biasSigma** -- uncertainty of gyroscope bias estimate in rad/s -- numpy array with shape (N,) - **restDetected** -- rest detection state -- boolean numpy array with shape (N,) in all cases and if magnetometer data is provided additionally - **quat9D** -- the 9D quaternion -- numpy array with shape (N, 4) - **delta** -- heading difference angle between 6D and 9D quaternion in rad -- numpy array with shape (N,) - **magDistDetected** -- magnetic disturbance detection state -- boolean numpy array with shape (N,) :param gyr: gyroscope measurement in rad/s -- numpy array with shape (N,3) :param acc: accelerometer measurement in m/s² -- numpy array with shape (N,3) :param mag: optional magnetometer measurement in arbitrary units -- numpy array with shape (N,3) :return: dict with entries as described above """ N = gyr.shape[0] assert acc.shape == gyr.shape assert gyr.shape == (N, 3) out6D = np.empty((N, 4)) outBias = np.empty((N, 3)) outBiasSigma = np.empty((N,)) outRest = np.empty(N, dtype=bool) if mag is not None: assert mag.shape == gyr.shape out9D = np.empty((N, 4)) outDelta = np.empty((N,)) outMagDist = np.empty(N, dtype=bool) for i in range(N): self.update(gyr[i], acc[i], mag[i]) out6D[i] = self.getQuat6D() out9D[i] = self.getQuat9D() outDelta[i] = self._state.delta outBias[i], outBiasSigma[i] = self.getBiasEstimate() outRest[i] = self._state.restDetected outMagDist[i] = self._state.magDistDetected return {'quat6D': out6D, 'quat9D': out9D, 'delta': outDelta, 'bias': outBias, 'biasSigma': outBiasSigma, 'restDetected': outRest, 'magDistDetected': outMagDist} else: for i in range(N): self.update(gyr[i], acc[i]) out6D[i] = self.getQuat6D() outBias[i], outBiasSigma[i] = self.getBiasEstimate() outRest[i] = self._state.restDetected return {'quat6D': out6D, 'bias': outBias, 'biasSigma': outBiasSigma, 'restDetected': outRest}
[docs] def getQuat3D(self) -> np.ndarray: r"""Returns the angular velocity strapdown integration quaternion :math:`^{\mathcal{S}_i}_{\mathcal{I}_i}\mathbf{q}`. :return: quaternion as numpy array with shape (4,) """ return self._state.gyrQuat.copy()
[docs] def getQuat6D(self) -> np.ndarray: r"""Returns the 6D (magnetometer-free) orientation quaternion :math:`^{\mathcal{S}_i}_{\mathcal{E}_i}\mathbf{q}`. :return: quaternion as numpy array with shape (4,) """ return self.quatMultiply(self._state.accQuat, self._state.gyrQuat)
[docs] def getQuat9D(self) -> np.ndarray: r"""Returns the 9D (with magnetometers) orientation quaternion :math:`^{\mathcal{S}_i}_{\mathcal{E}}\mathbf{q}`. :return: quaternion as numpy array with shape (4,) """ return self.quatApplyDelta(self.quatMultiply(self._state.accQuat, self._state.gyrQuat), self._state.delta)
[docs] def getDelta(self) -> float: r""" Returns the heading difference :math:`\delta` between :math:`\mathcal{E}_i` and :math:`\mathcal{E}`. :math:`^{\mathcal{E}_i}_{\mathcal{E}}\mathbf{q} = \begin{bmatrix}\cos\frac{\delta}{2} & 0 & 0 & \sin\frac{\delta}{2}\end{bmatrix}^T`. :return: delta angle in rad (:cpp:member:`VQFState::delta`) """ return self._state.delta
[docs] def getBiasEstimate(self) -> tuple[np.ndarray, float]: """Returns the current gyroscope bias estimate and the uncertainty. The returned standard deviation sigma represents the estimation uncertainty in the worst direction and is based on an upper bound of the largest eigenvalue of the covariance matrix. :return: gyroscope bias estimate (rad/s) as (3,) numpy array and standard deviation sigma of the estimation uncertainty (rad/s) """ # use largest absolute row sum as upper bound estimate for largest eigenvalue (Gershgorin circle theorem) # and clip output to biasSigmaInit P = min(np.max(np.sum(np.abs(self._state.biasP), axis=1)), self._coeffs.biasP0) sigma = np.sqrt(P)*np.pi/100.0/180.0 return self._state.bias.copy(), sigma
[docs] def setBiasEstimate(self, bias: np.ndarray, sigma: float) -> None: """Sets the current gyroscope bias estimate and the uncertainty. If a value for the uncertainty sigma is given, the covariance matrix is set to a corresponding scaled identity matrix. :param bias: gyroscope bias estimate (rad/s) :param sigma: standard deviation of the estimation uncertainty (rad/s) - set to -1 (default) in order to not change the estimation covariance matrix """ assert bias.shape == (3,) self._state.bias[:] = bias if sigma > 0: self._state.biasP = (sigma*180.0*100.0/np.pi)**2 * np.eye(3)
[docs] def getRestDetected(self) -> bool: """Returns true if rest was detected.""" return self._state.restDetected
[docs] def getMagDistDetected(self) -> bool: """Returns true if a disturbed magnetic field was detected.""" return self._state.magDistDetected
[docs] def getRelativeRestDeviations(self) -> np.ndarray: """Returns the relative deviations used in rest detection. Looking at those values can be useful to understand how rest detection is working and which thresholds are suitable. The output array is filled with the last values for gyroscope and accelerometer, relative to the threshold. In order for rest to be detected, both values must stay below 1. :return: relative rest deviations as (2,) numpy array """ return np.array([ np.sqrt(self._state.restLastSquaredDeviations[0]) / (self._params.restThGyr * np.pi / 180.0), np.sqrt(self._state.restLastSquaredDeviations[1]) / self._params.restThAcc, ], float)
[docs] def getMagRefNorm(self) -> float: """Returns the norm of the currently accepted magnetic field reference.""" return self._state.magRefNorm
[docs] def getMagRefDip(self) -> float: """Returns the dip angle of the currently accepted magnetic field reference.""" return self._state.magRefDip
[docs] def setMagRef(self, norm: float, dip: float) -> None: """Overwrites the current magnetic field reference. :param norm: norm of the magnetic field reference :param dip: dip angle of the magnetic field reference """ self._state.magRefNorm = norm self._state.magRefDip = dip
[docs] def setTauAcc(self, tauAcc: float) -> None: r"""Sets the time constant for accelerometer low-pass filtering. For more details, see :cpp:member:`VQFParams::tauAcc`. :param tauAcc: time constant :math:`\tau_\mathrm{acc}` in seconds """ if self._params.tauAcc == tauAcc: return self._params.tauAcc = tauAcc newB, newA = self.filterCoeffs(self._params.tauAcc, self._coeffs.accTs) self.filterAdaptStateForCoeffChange(self._state.lastAccLp, self._coeffs.accLpB, self._coeffs.accLpA, newB, newA, self._state.accLpState) # For R and biasLP, the last value is not saved in the state. # Since b0 is small (at reasonable settings), the last output is close to state[0]. self.filterAdaptStateForCoeffChange(self._state.motionBiasEstRLpState[0].copy(), self._coeffs.accLpB, self._coeffs.accLpA, newB, newA, self._state.motionBiasEstRLpState) self.filterAdaptStateForCoeffChange(self._state.motionBiasEstBiasLpState[0].copy(), self._coeffs.accLpB, self._coeffs.accLpA, newB, newA, self._state.motionBiasEstBiasLpState) self._coeffs.accLpB = newB self._coeffs.accLpA = newA
[docs] def setTauMag(self, tauMag: float) -> None: r"""Sets the time constant for the magnetometer update. For more details, see :cpp:member:`VQFParams::tauMag`. :param tauMag: time constant :math:`\tau_\mathrm{mag}` in seconds """ self._params.tauMag = tauMag self._coeffs.kMag = self.gainFromTau(self._params.tauMag, self._coeffs.magTs)
[docs] def setMotionBiasEstEnabled(self, enabled: bool) -> None: """Enables/disabled gyroscope bias estimation during motion.""" if self._params.motionBiasEstEnabled == enabled: return self._params.motionBiasEstEnabled = enabled self._state.motionBiasEstRLpState = np.full((2, 9), np.nan, float) self._state.motionBiasEstBiasLpState = np.full((2, 2), np.nan, float)
[docs] def setRestBiasEstEnabled(self, enabled: bool) -> None: """Enables/disables rest detection and bias estimation during rest.""" if self._params.restBiasEstEnabled == enabled: return self._params.restBiasEstEnabled = enabled self._state.restDetected = False self._state.restLastSquaredDeviations = np.zeros(2, float) self._state.restT = 0.0 self._state.restLastGyrLp = np.zeros(3, float) self._state.restGyrLpState = np.full((2, 3), np.nan, float) self._state.restLastAccLp = np.zeros(3, float) self._state.restAccLpState = np.full((2, 3), np.nan, float)
[docs] def setMagDistRejectionEnabled(self, enabled: bool) -> None: """Enables/disables magnetic disturbance detection and rejection.""" if self._params.magDistRejectionEnabled == enabled: return self._params.magDistRejectionEnabled = enabled self._state.magDistDetected = True self._state.magRefNorm = 0.0 self._state.magRefDip = 0.0 self._state.magUndisturbedT = 0.0 self._state.magRejectT = self._params.magMaxRejectionTime self._state.magCandidateNorm = -1.0 self._state.magCandidateDip = 0.0 self._state.magCandidateT = 0.0 self._state.magNormDip = np.zeros(2, float) self._state.magNormDipLpState = np.full((2, 2), np.nan, float)
[docs] def setRestDetectionThresholds(self, thGyr: float, thAcc: float) -> None: """Sets the current thresholds for rest detection. :param thGyr: new value for :cpp:member:`VQFParams::restThGyr` :param thAcc: new value for :cpp:member:`VQFParams::restThAcc` """ self._params.restThGyr = thGyr self._params.restThAcc = thAcc
@property def params(self) -> PyVQFParams: """Read-only property to access the current parameters. :return: dataclass with fields corresponding to :cpp:struct:`VQFParams` """ return copy.deepcopy(self._params) @property def coeffs(self) -> PyVQFCoefficients: """Read-only property to access the coefficients used by the algorithm. :return: dataclass with fields corresponding to :cpp:struct:`VQFCoefficients` """ return copy.deepcopy(self._coeffs) @property def state(self) -> PyVQFStateDict: """Property to access the current state. This property can be written to in order to set a completely arbitrary filter state, which is intended for debugging purposes. However, note that the returned dict is a copy of the state and changing elements of this dict will not influence the actual state. In order to modify the state, access the state, change some elements and then replace the whole state with the modified copy, e.g. .. code-block:: # does not work: v.state['delta'] = 0 state = vqf.state state['delta'] = 0 vqf.state = state :return: dict with entries corresponding to :cpp:struct:`VQFState` """ return cast(PyVQFStateDict, dataclasses.asdict(self._state)) @state.setter def state(self, state: PyVQFStateDict) -> None: assert state.keys() == {f.name for f in dataclasses.fields(PyVQFState)} for k in state: assert isinstance(state[k], type(getattr(self._state, k))) if isinstance(state[k], np.ndarray): assert state[k].dtype == getattr(self._state, k).dtype assert state[k].shape == getattr(self._state, k).shape self._state = PyVQFState(**copy.deepcopy(state))
[docs] def resetState(self) -> None: """Resets the state to the default values at initialization. Resetting the state is equivalent to creating a new instance of this class. """ self._state = PyVQFState() self._state.biasP = self._coeffs.biasP0*np.eye(3) self._state.magRejectT = self._params.magMaxRejectionTime
[docs] @staticmethod def quatMultiply(q1: np.ndarray, q2: np.ndarray) -> np.ndarray: r"""Performs quaternion multiplication (:math:`\mathbf{q}_\mathrm{out} = \mathbf{q}_1 \otimes \mathbf{q}_2`). :param q1: input quaternion 1 -- numpy array with shape (4,) :param q2: input quaternion 2 -- numpy array with shape (4,) :return: output quaternion -- numpy array with shape (4,) """ assert q1.shape == (4,) assert q2.shape == (4,) q10, q11, q12, q13 = q1.tolist() q20, q21, q22, q23 = q2.tolist() w = q10 * q20 - q11 * q21 - q12 * q22 - q13 * q23 x = q10 * q21 + q11 * q20 + q12 * q23 - q13 * q22 y = q10 * q22 - q11 * q23 + q12 * q20 + q13 * q21 z = q10 * q23 + q11 * q22 - q12 * q21 + q13 * q20 return np.array([w, x, y, z], float)
[docs] @staticmethod def quatConj(q: np.ndarray) -> np.ndarray: r"""Calculates the quaternion conjugate (:math:`\mathbf{q}_\mathrm{out} = \mathbf{q}^*`). :param q: input quaternion -- numpy array with shape (4,) :return: output quaternion -- numpy array with shape (4,) """ assert q.shape == (4,) return np.array([q[0], -q[1], -q[2], -q[3]], float)
[docs] @staticmethod def quatApplyDelta(q: np.ndarray, delta: float) -> np.ndarray: r""" Applies a heading rotation by the angle delta (in rad) to a quaternion. :math:`\mathbf{q}_\mathrm{out} = \begin{bmatrix}\cos\frac{\delta}{2} & 0 & 0 & \sin\frac{\delta}{2}\end{bmatrix} \otimes \mathbf{q}` :param q: input quaternion -- numpy array with shape (4,) :param delta: heading rotation angle in rad :return: output quaternion -- numpy array with shape (4,) """ assert q.shape == (4,) c = np.cos(delta/2) s = np.sin(delta/2) w = c * q[0] - s * q[3] x = c * q[1] - s * q[2] y = c * q[2] + s * q[1] z = c * q[3] + s * q[0] return np.array([w, x, y, z], float)
[docs] @staticmethod def quatRotate(q: np.ndarray, v: np.ndarray) -> np.ndarray: r"""Rotates a vector with a given quaternion. :math:`\begin{bmatrix}0 & \mathbf{v}_\mathrm{out}\end{bmatrix} = \mathbf{q} \otimes \begin{bmatrix}0 & \mathbf{v}\end{bmatrix} \otimes \mathbf{q}^*` :param q: input quaternion -- numpy array with shape (4,) :param v: input vector -- numpy array with shape (3,) :return: output vector -- numpy array with shape (3,) """ assert q.shape == (4,) assert v.shape == (3,) q0, q1, q2, q3 = q.tolist() v0, v1, v2 = v.tolist() x = (1 - 2*q2*q2 - 2*q3*q3)*v0 + 2*v1*(q2*q1 - q0*q3) + 2*v2*(q0*q2 + q3*q1) y = 2*v0*(q0*q3 + q2*q1) + v1*(1 - 2*q1*q1 - 2*q3*q3) + 2*v2*(q2*q3 - q1*q0) z = 2*v0*(q3*q1 - q0*q2) + 2*v1*(q0*q1 + q3*q2) + v2*(1 - 2*q1*q1 - 2*q2*q2) return np.array([x, y, z], float)
[docs] @staticmethod def normalize(vec: np.ndarray) -> None: """Normalizes a vector in-place. :param vec: vector -- one-dimensional numpy array that will be normalized in-place :return: None """ norm = math.sqrt(vec.dot(vec)) if norm != 0.0: vec /= norm
[docs] @staticmethod def gainFromTau(tau: float, Ts: float) -> float: r"""Calculates the gain for a first-order low-pass filter from the 1/e time constant. :math:`k = 1 - \exp\left(-\frac{T_\mathrm{s}}{\tau}\right)` The cutoff frequency of the resulting filter is :math:`f_\mathrm{c} = \frac{1}{2\pi\tau}`. :param tau: time constant :math:`\tau` in seconds - use -1 to disable update (:math:`k=0`) or 0 to obtain unfiltered values (:math:`k=1`) :param Ts: sampling time :math:`T_\mathrm{s}` in seconds :return: filter gain *k* """ assert Ts > 0 if tau < 0: return 0.0 # k=0 for negative tau (disable update) elif tau == 0.0: return 1.0 # k=1 for tau=0 else: return 1 - np.exp(-Ts/tau) # fc = 1/(2*pi*tau)
[docs] @staticmethod def filterCoeffs(tau: float, Ts: float) -> tuple[np.ndarray, np.ndarray]: r"""Calculates coefficients for a second-order Butterworth low-pass filter. The filter is parametrized via the time constant of the dampened, non-oscillating part of step response and the resulting cutoff frequency is :math:`f_\mathrm{c} = \frac{\sqrt{2}}{2\pi\tau}`. When :math:`\tau < \frac{T_\mathrm{s}}{2}` (which corresponds to :math:`f_\mathrm{c}` exceeding 90 % of the Nyquist frequency), a direct passthrough fallback is used to prevent instability. :param tau: time constant :math:`\tau` in seconds :param Ts: sampling time :math:`T_\mathrm{s}` in seconds :return: numerator coefficients b as (3,) numpy array, denominator coefficients a (without :math:`a_0=1`) as (2,) numpy array """ assert tau > 0 assert Ts > 0 # disable filter and use direct passthrough when tau < Ts/2 to avoid instability # (this corresponds to fc exceeding 90% of the Nyquist frequency) if tau < Ts/2: return np.array([1.0, 0.0, 0.0], float), np.array([0.0, 0.0], float) # second order Butterworth filter based on https://stackoverflow.com/a/52764064 fc = math.sqrt(2) / (2.0 * math.pi * tau) # time constant of dampened, non-oscillating part of step response C = math.tan(math.pi*fc*Ts) D = C**2 + math.sqrt(2)*C + 1 b0 = C*C/D b1 = 2*b0 b2 = b0 # a0 = 1.0 a1 = 2*(C**2-1)/D a2 = (1-math.sqrt(2)*C+C**2)/D return np.array([b0, b1, b2], float), np.array([a1, a2], float)
[docs] @staticmethod def filterInitialState(x0: float, b: np.ndarray, a: np.ndarray) -> np.ndarray: r"""Calculates the initial filter state for a given steady-state value. :param x0: steady state value :param b: numerator coefficients :param a: denominator coefficients (without :math:`a_0=1`) :return: filter state -- numpy array with shape (2,) """ assert b.shape == (3,) assert a.shape == (2,) # initial state for steady state (equivalent to scipy.signal.lfilter_zi, obtained by setting y=x=x0 in the # filter update equation) return np.array([ x0*(1 - b[0]), x0*(b[2] - a[1]) ], float)
[docs] @staticmethod def filterAdaptStateForCoeffChange(last_y: np.ndarray, b_old: np.ndarray, a_old: np.ndarray, b_new: np.ndarray, a_new: np.ndarray, state: np.ndarray) -> None: r"""Adjusts the filter state when changing coefficients. This function assumes that the filter is currently in a steady state, i.e. the last input values and the last output values are all equal. Based on this, the filter state is adjusted to new filter coefficients so that the output does not jump. :param last_y: last filter output values -- numpy array with shape (N,) :param b_old: previous numerator coefficients -- numpy array with shape (3,) :param a_old: previous denominator coefficients (without :math:`a_0=1`) -- numpy array with shape (2,) :param b_new: new numerator coefficients -- numpy array with shape (3,) :param a_new: new denominator coefficients (without :math:`a_0=1`) -- numpy array with shape (2,) :param state: filter state -- numpy array with shape (N*2,), will be modified :return: None """ N = last_y.shape[0] assert last_y.shape == (N,) assert b_old.shape == (3,) assert a_old.shape == (2,) assert b_new.shape == (3,) assert a_new.shape == (2,) assert state.shape == (2, N) if math.isnan(state[0, 0]): return state[0] = state[0] + (b_old[0] - b_new[0])*last_y state[1] = state[1] + (b_old[1] - b_new[1] - a_old[0] + a_new[0])*last_y
[docs] @staticmethod def filterStep(x: np.ndarray, b: np.ndarray, a: np.ndarray, state: np.ndarray) -> np.ndarray: r"""Performs a filter step. Note: Unlike the C++ implementation, this function is vectorized and can process multiple values at once. :param x: input values -- numpy array with shape (N,) :param b: numerator coefficients -- numpy array with shape (3,) :param a: denominator coefficients (without :math:`a_0=1`) -- numpy array with shape (2,) :param state: filter state -- numpy array with shape (2, N), will be modified :return: filtered values -- numpy array with shape (N,) """ x = np.asarray(x) N = x.shape[0] # this function is vectorized, unlike the C++ version assert x.shape == (N,) assert b.shape == (3,) assert a.shape == (2,) assert state.shape == (2, N) # difference equations based on scipy.signal.lfilter documentation # assumes that a0 == 1.0 y = b[0] * x + state[0] state[0] = b[1] * x - a[0] * y + state[1] state[1] = b[2] * x - a[1] * y return y
[docs] @staticmethod def filterVec(x: np.ndarray, tau: float, Ts: float, b: np.ndarray, a: np.ndarray, state: np.ndarray) -> np.ndarray: r"""Performs filter step for vector-valued signal with averaging-based initialization. During the first :math:`\tau` seconds, the filter output is the mean of the previous samples. At :math:`t=\tau`, the initial conditions for the low-pass filter are calculated based on the current mean value and from then on, regular filtering with the rational transfer function described by the coefficients b and a is performed. :param x: input values -- numpy array with shape (N,) :param tau: filter time constant \:math:`\tau` in seconds (used for initialization) :param Ts: sampling time :math:`T_\mathrm{s}` in seconds (used for initialization) :param b: numerator coefficients -- numpy array with shape (3,) :param a: denominator coefficients (without :math:`a_0=1`) -- numpy array with shape (2,) :param state: filter state -- numpy array with shape (2, N), will be modified :return: filtered values -- numpy array with shape (N,) """ N = x.shape[0] assert N >= 2 assert x.shape == (N,) assert b.shape == (3,) assert a.shape == (2,) assert state.shape == (2, N) # to avoid depending on a single sample, average the first samples (for duration tau) # and then use this average to calculate the filter initial state if math.isnan(state[0, 0]): # initialization phase if math.isnan(state[0, 1]): # first sample state[0, 1] = 0 # state[0, 1] is used to store the sample count state[1, :] = 0 # state[1, :] is used to store the sum state[0, 1] += 1 state[1, :] += x out = state[1]/state[0, 1] if state[0, 1]*Ts >= tau: for i in range(N): state[:, i] = PyVQF.filterInitialState(out[i], b, a) return out return PyVQF.filterStep(x, b, a, state)
def _setup(self) -> None: """Calculates coefficients based on parameters and sampling rates.""" coeffs = self._coeffs params = self._params assert coeffs.gyrTs > 0 assert coeffs.accTs > 0 assert coeffs.magTs > 0 coeffs.accLpB, coeffs.accLpA = self.filterCoeffs(params.tauAcc, coeffs.accTs) coeffs.kMag = self.gainFromTau(params.tauMag, coeffs.magTs) coeffs.biasP0 = (params.biasSigmaInit*100.0)**2 # the system noise increases the variance from 0 to (0.1 °/s)^2 in biasForgettingTime seconds coeffs.biasV = (0.1*100.0)**2*coeffs.accTs/params.biasForgettingTime pMotion = (params.biasSigmaMotion*100.0)**2 coeffs.biasMotionW = pMotion**2 / coeffs.biasV + pMotion coeffs.biasVerticalW = coeffs.biasMotionW / max(params.biasVerticalForgettingFactor, 1e-10) pRest = (params.biasSigmaRest*100.0)**2 coeffs.biasRestW = pRest**2 / coeffs.biasV + pRest coeffs.restGyrLpB, coeffs.restGyrLpA = self.filterCoeffs(params.restFilterTau, coeffs.gyrTs) coeffs.restAccLpB, coeffs.restAccLpA = self.filterCoeffs(params.restFilterTau, coeffs.accTs) coeffs.kMagRef = self.gainFromTau(params.magRefTau, coeffs.magTs) if params.magCurrentTau > 0: coeffs.magNormDipLpB, coeffs.magNormDipLpA = self.filterCoeffs(params.magCurrentTau, coeffs.magTs) self.resetState()