~dricottone/filters

ref: f62c909a49e4a579a0b74b9b6632c25e127b3668 filters/filter/ab.py -rw-r--r-- 3.7 KiB
f62c909aDominic Ricottone Version 1.0.1; Cleaned up documentation 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3

"""filter ab [OPTIONS] DATA
Alpha-beta filter - Filter out noise of measurements to estimate data.

Options:
  -a N, --alpha N  correction to estimated state [Default: 0.05]
  -b N, --beta N   correction to estimated velocity of state [Default: 0.005]
  -d N, --delta N  initial velocity of state per time unit [Default: 0]
  -i M N,          initial state; M is the estimate and N is the std. deviation
    --initial M N    [Note: std. deviation unused] [Default: 0 0]

Currently assumed that acceleration is 0 and the time unit is 1.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import Callable, List, Dict, Iterator

def cli_wrapper(**data: Dict):
    """Handler for the alpha-beta filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _raw = data["data_raw"]
    _alpha = data["alpha"] if data["alpha"] is not None else 0.05
    _beta = data["beta"] if data["beta"] is not None else 0.005
    _init_state = data["initial_estimate"]
    _init_velocity = data["delta"] if data["delta"] is not None else 0

    _time = 1.0 #constant time unit
    _acceleration = lambda x: x #constant acceleration

    _filter = filter(
        _raw,
        _alpha,
        _beta,
        _init_state,
        _init_velocity,
        _acceleration,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(
                _alpha,
                _beta,
                _init_state,
                _init_velocity,
                _acceleration,
                _time,
            )
        )
        for measured, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(measured, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    alpha: float,
    beta: float,
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
    time: float,
) -> Iterator[float]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data           measurement from each time interval
      alpha          correction to estimated state
      beta           correction to estimated velocity
      acceleration   function of v1 <- v0
      init_state     initial estimate of state
      init_velocity  initial estimate of velocity
      time           time unit
    """
    last_estimated = init_state
    last_velocity = init_velocity
    for data_point in data:
        #estimate given last values
        estimated = last_estimated + (time * last_velocity)
        velocity = acceleration(last_velocity)

        #correct for residual
        residual = (data_point - estimated)
        estimated += (alpha * residual)
        velocity += ( (beta * residual) / time )

        last_estimated = estimated
        last_velocity = velocity

        yield estimated

def report_header(
    alpha: float,
    beta: float,
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
    time: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:
    ```
    Alpha-beta filter
      α=<alpha>,β=<beta>
      Initial estimate: <init_state> changing <init_velocity> per time unit
    Raw:      Est.:
    ========  ========
    ```
    The estimates then should be printed alongside the raw measurements.
    """
    _msg = (
        "Alpha-beta filter",
        "  α={0:.4f}, β={1:.4f}".format(alpha, beta),
        "  Initial estimate: {0:.4f} changing {1:.4f} per time unit".format(
            init_state, init_velocity,
        ),
        "Raw:      Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"