~dricottone/filters

ref: 5c3cae6369ce5153c5e6470a9a4e4c092cec8ee1 filters/filter/kalman.py -rw-r--r-- 6.0 KiB
5c3cae63Dominic Ricottone Added type hint 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#!/usr/bin/env python3

"""filter kalman [OPTIONS] DATA
Kalman filter - Filter out normally-distributed noise of measurements to
estimate data.

Options:
  -d, --delta     initial velocity of state per time unit [Default: 0]
  -i, --inital    initial estimate of state [Default: 0]
  -s, --sigma     initial std. deviation of state distribution [Default: 1]
  -v, --variance  variance of data measurements [Default: 1]

Currently assumed that acceleration is 0, velocity is non-variate, and the time
unit is 1.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import Callable, List, Dict, Iterator, Tuple

def cli_wrapper(**data: Dict):
    """Handler for the Kalman filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _raw = data["data_raw"]
    _variance = data["variance"] if data["variance"] is not None else 1
    _init_state_mu = data["initial_estimate"]
    _init_state_sigma = data["initial_std_deviation"]
    _init_velocity_mu = data["delta"] if data["delta"] is not None else 0

    _init_velocity_sigma = 0 #non-variate velocity
    _time = 1.0 #constant time unit
    _acceleration = lambda x: x #constant acceleration


    _filter = filter(
        _raw,
        _variance,
        _init_state_mu,
        _init_state_sigma,
        _init_velocity_mu,
        _init_velocity_sigma,
        _acceleration,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(
                _variance,
                _init_state_mu,
                _init_state_sigma,
                _init_velocity_mu,
                _init_velocity_sigma,
                _acceleration,
                _time,
            ),
        )
        for measured, filtered in zip(_raw, _filter):
            estimated, variance = filtered
            sys.stdout.write(
                "{0:8.4f}  {1:8.4f}  {2:8.4f}\n".format(
                    measured, estimated, variance,
                ),
            )
    else:
        for estimated, _ in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[Tuple[float, float]], Tuple[float,float]],
    time: float,
) -> Iterator[Tuple[float,float]]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data                 measurement from each time interval
      variance             variance of measurements
      init_state_mu        initial estimate of state
      init_state_sigma     std. deviation of state distribution
      init_velocity_mu     initial estimate of velocity
      init_velocity_sigma  std. deviation of velocity distribution
      acceleration         function of v1 <- v0
      time                 time unit
    """
    # normal distributions as tuples: (E(x), Var(x), )
    estimated = (init_state_mu, init_state_sigma**2, )
    velocity = (init_velocity_mu, init_velocity_sigma**2, )

    for measurement in data:
        estimated = _add(estimated, _multiply(velocity, (time,0,)))
        velocity = acceleration(velocity)

        estimated = _multiply(estimated, (measurement, variance, ))

        yield estimated

def report_header(
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[float], float],
    time: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:

    ```
    Kalman filter
      Distribution of estimated initial state: N(<mu>,<sigma>)
      Distribution of expected change per time unit: N(<mu>,<sigma>)
      Variance of measurements: <variance>
    Raw:      Est.:     Var.:
    ========  ========  ========
    ```

    The estimates and variances then should be printed alongside the raw
    measurements.
    """
    _msg = (
        "Kalman filter",
        "  Distribution of estimated initial state: N({0},{1}²)".format(
            init_state_mu, init_state_sigma,
        ),
        "  Distribution of expected change per time unit: N({0},{1}²)".format(
            init_velocity_mu, init_velocity_sigma,
        ),
        "  Variance of measurements: {0}".format(variance),
        "Raw:      Est.:     Var.:",
        "========  ========  ========",
    )
    return "\n".join(_msg) + "\n"

def _add(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Add a normal distribution to another normal distribution or a constant.

    In the case of two normal distributions (X and Y), the resultant values
    are:
      E(X + Y) = E(X) + E(Y)
      Var(X + Y) = Var(X) + Var(Y)

    In the case of a normal distribution (X) and a constant (C), the resultant
    values are:
      E(X + C) = E(X) + C
      Var(X + C) = Var(X)
    """
    if y[1] == 0:
        z = (x[0]+y[0], x[1], )
    else:
        z = (x[0]+y[0], x[1]+y[1], )
    return z

def _multiply(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Multiply a normal distribution by another normal distribution or by a
    constant.

    Given two normal distributions (X and Y), the resultant values are:
      E(X * Y) = (Var(Y)E(X) + Var(X)E(Y)) / (Var(X) + Var(Y))
      Var(X * Y) = (Var(X)Var(Y)) / (Var(X) + Var(Y))

    Given a normal distribution (X) and a constant (C), the resultant values
    are:
      E(X * C) = E(X) * C
      Var(X * C) = Var(X) * (C^2)
    """
    #_print_normal_distribution(x, "X")
    #_print_normal_distribution(y, "Y")
    if y[1] == 0:
        z = ((x[0] * y[0]), (x[1] * y[0]**2), )
    else:
        _denom = x[1] + y[1]
        _mean = ((x[0] * y[1]) + (y[0] * x[1])) / _denom
        _var = (x[1] * y[1]) / _denom
        z = (_mean, _var, )
    #_print_normal_distribution(z, "Z")
    return z

def _print_normal_distribution(
    x: Tuple[float, float],
    name: str,
) -> None:
    sys.stdout.write("{0} = N({1},{2})\n".format(name, x[0], x[1]))