~dricottone/filters

ref: 8479aea2db66090f2b0db01a1c42593975a3ba0e filters/filter/kalman.py -rw-r--r-- 5.4 KiB
8479aea2Dominic Ricottone Added rng package, convolve + kalman filters 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3

"""filter kalman [OPTIONS] DATA
Kalman filter - Filter out normally-distributed noise of measurements to
estimate data.

Options:
  -d, --delta     initial velocity of state per time unit [Default: 0]
  -i, --inital    initial estimate of state [Default: 0]
  -s, --sigma     initial std. deviation of state distribution [Default: 1]
  -v, --variance  variance of data measurements [Default: 1]

Currently assumed that acceleration is constant and that velocity is non-
variate.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import Callable, List, Dict, Iterator, Tuple

def cli_wrapper(**data: Dict):
    """Handler for the Kalman filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _raw = data["data_raw"]
    _variance = data["variance"] if data["variance"] is not None else 1
    _init_state_mu = data["initial_estimate"]
    _init_state_sigma = data["initial_std_deviation"]
    _init_velocity_mu = data["delta"] if data["delta"] is not None else 0

    _init_velocity_sigma = 0 #non-variate velocity
    _time = 1.0 #constant time unit
    _acceleration = lambda x: x #constant acceleration


    _filter = filter(
        _raw,
        _variance,
        _init_state_mu,
        _init_state_sigma,
        _init_velocity_mu,
        _init_velocity_sigma,
        _acceleration,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(
                _variance,
                _init_state_mu,
                _init_state_sigma,
                _init_velocity_mu,
                _init_velocity_sigma,
                _acceleration,
                _time,
            ),
        )
        for measured, filtered in zip(_raw, _filter):
            estimated, variance = filtered
            sys.stdout.write(
                "{0:8.4f}  {1:8.4f}  {2:8.4f}\n".format(
                    measured, estimated, variance,
                ),
            )
    else:
        for estimated, _ in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[Tuple[float, float]], Tuple[float,float]],
    time: float,
) -> Iterator[Tuple[float,float]]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data                 measurement from each time interval
      variance             variance of measurements
      init_state_mu        initial estimate of state
      init_state_sigma     std. deviation of state distribution
      init_velocity_mu     initial estimate of velocity
      init_velocity_sigma  std. deviation of velocity distribution
      acceleration         function of v1 <- v0
      time                 time unit
    """
    # normal distributions as tuples: (E(x), Var(x), )
    estimated = (init_state_mu, init_state_sigma**2, )
    velocity = (init_velocity_mu, init_velocity_sigma**2, )

    for measurement in data:
        estimated = _add(estimated, _multiply(velocity, (time,0,)))
        velocity = acceleration(velocity)

        estimated = _multiply(estimated, (measurement, variance, ))

        yield estimated

def report_header(
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[float], float],
    time: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:

    ```
    ```

    The estimates and variances then should be printed alongside the raw
    measurements.
    """
    _msg = (
        "Kalman filter",
        "Raw:      Est.:     Var.:",
        "========  ========  ========",
    )
    return "\n".join(_msg) + "\n"

def _add(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Add a normal distribution to another normal distribution or a constant.

    In the case of two normal distributions (X and Y), the resultant values
    are:
      E(X + Y) = E(X) + E(Y)
      Var(X + Y) = Var(X) + Var(Y)

    In the case of a normal distribution (X) and a constant (C), the resultant
    values are:
      E(X + C) = E(X) + C
      Var(X + C) = Var(X)
    """
    if y[1] == 0:
        z = (x[0]+y[0], x[1], )
    else:
        z = (x[0]+y[0], x[1]+y[1], )
    return z

def _multiply(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Multiply a normal distribution by another normal distribution or by a
    constant.

    Given two normal distributions (X and Y), the resultant values are:
      E(X * Y) = (Var(Y)E(X) + Var(X)E(Y)) / (Var(X) + Var(Y))
      Var(X * Y) = (Var(X)Var(Y)) / (Var(X) + Var(Y))

    Given a normal distribution (X) and a constant (C), the resultant values
    are:
      E(X * C) = E(X) * C
      Var(X * C) = Var(X) * (C^2)
    """
    #_print_normal_distribution(x, "X")
    #_print_normal_distribution(y, "Y")
    if y[1] == 0:
        z = ((x[0] * y[0]), (x[1] * y[0]**2), )
    else:
        _denom = x[1] + y[1]
        _mean = ((x[0] * y[1]) + (y[0] * x[1])) / _denom
        _var = (x[1] * y[1]) / _denom
        z = (_mean, _var, )
    #_print_normal_distribution(z, "Z")
    return z

def _print_normal_distribution(
    x: Tuple[float, float],
    name: str,
) -> None:
    sys.stdout.write("{0} = N({1},{2})\n".format(name, x[0], x[1]))