~dricottone/filters

ref: 7b51a036bbb61fbe5c6a0467742a87b09f306732 filters/filter/ab.py -rw-r--r-- 3.4 KiB
7b51a036Dominic Ricottone Refactored into package, to use gap argument parser; Remade build system 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3

"""filter -m=ab [OPTIONS] <DATA>
Alpha-beta filter - Filter out noise of measurements to estimate data.

Options:
  -a, --alpha  correction to estimated state [Default: 0.05]
  -b, --beta   correction to estimated velocity of state [Default: 0.005]
  -d, --delta  initial velocity [Default: 0]
  -i, --inital initial state [Default: 0]
  -t, --time   unit of time [Default: 1]

Currently, assumes the function of acceleration (i.e. `v1 <- f(v0)`) is
`v1 <- v0`.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys

from typing import Callable, List, Dict, Iterator


def cli_wrapper(**data: Dict):
    """Handler for the alpha-beta filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _alpha = data["alpha"] if data["alpha"] is not None else 0.05
    _beta = data["beta"] if data["beta"] is not None else 0.005
    _init_state = data["initial"] if data["initial"] is not None else 0
    _init_velocity = data["delta"] if data["delta"] is not None else 0
    _time = data["time"] if data["time"] is not None else 1.0
    _raw = data["data_raw"]

    _acceleration = lambda x: x #constant acceleration

    _filter = filter(
        _raw,
        _alpha,
        _beta,
        _acceleration,
        _init_state,
        _init_velocity,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_alpha, _beta, _init_state, _init_velocity)
        )
        for actual, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(actual, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))


def filter(
    data: List[float],
    alpha: float,
    beta: float,
    acceleration: Callable[[float], float],
    init_state: float,
    init_velocity: float,
    time: float,
) -> Iterator[float]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data           measurement from each time interval
      alpha          correction to estimated state
      beta           correction to estimated velocity
      acceleration   function of v1 <- v0
      init_state     initial estimate of state
      init_velocity  initial estimate of velocity
      time           time unit
    """
    x_last = init_state
    v_last = init_velocity
    for index, data_point in enumerate(data):
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )

        x_last = x_est
        v_last = v_est

        yield x_est


def report_header(
    alpha: float,
    beta: float,
    init_state: float,
    init_velocity: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:
    ```
    Alpha-beta filter
      α=<alpha>,β=<beta>
      Initial estimate <init_state> changing <init_velocity> per time unit
    Actual:   Est.:
    ========  ========
    ```
    The estimates then should be printed alongside the raw measurements.
    """
    _msg = (
        "Alpha-beta filter",
        "  α={0:.4f}, β={1:.4f}".format(alpha, beta),
        "  Initial estimate: {0:.4f} changing {1:.4f} per time unit".format(
            init_state, init_velocity,
        ),
        "Actual:   Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"