~dricottone/filters

ref: 1a1eaedc0203a3bb86b1ff5d41a4d8540930a4b0 filters/filter.py -rwxr-xr-x 4.2 KiB
1a1eaedc — dricottone Initial commit 5 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3

# std library
import random

# custom library
from mylib.cli import ARGS, read_args_or_stdin, set_verbosity

# typing
from typing import *

def ab(data: List[float],
       *,
       alpha: float,
       beta: float,
       acceleration: Callable[[float], float] = lambda x: x,
       init_state: float = 0,
       init_velocity: float = 0,
       time: float = 1
      ) -> List[float]:
    """
    Alpha-beta filter - Filter out noise of measurements to estimate data.
    Takes as given:
      initial state
      initial velocity of state
      alpha and beta correction parameters

    Arguments:
      data          : measurements of state
      alpha         : correction to estimated state
      beta          : correction to estimated velocity of state
    Options:
      acceleration  : function f such that v1 <- f(v0) [Default: v1 <- v0]
      init_state    : initial state estimate [Default: 0]
      init_velocity : initial velocity of state [Default: 0]
      time          : the time unit [Default: 1]
    """
    estimates = list()
    x_last = init_state
    v_last = init_velocity

    for index, data_point in enumerate(data):
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )

        x_last = x_est
        v_last = v_est

        estimates.append(x_est)

    return estimates

# ^ filters
###############################################################################
# v internal functions

def noise(data: List[float]) -> List[float]:
    """
    Introduce random noise (r in [-1,1]) to a set of data points.
    """
    def _noise_iter(data: List[float]) -> Iterator[float]:
        for d in data:
            yield d + random.uniform(-1,1)
    return list(_noise_iter(data))

# ^ internal functions
###############################################################################
# v cli functions

def print_help():
    print("Usage: filter -m=METHOD DATA")

def get_data():
    data = list()
    for d in read_args_or_stdin(arg_number=-1, opt_number=-1,
                                opts=('f', 'files')):
        if not len(d):
            continue
        else:
            data.append( float(d) )
    return data

def report_est(data, estimations, alpha, beta, initial_state, initial_velocity):
    """
    Prints a report on the estimations to STDOUT.
    """
    print("Testing on N={}".format(len(data)))
    print("Using alpha={:.4f}, beta={:.4f}".format(alpha,beta))
    print("Initial estimate of {:.4f}, accelerating {:.4f} per time unit".format(initial_state,initial_velocity))
    print("  Actual:   Est.:  \n  ========  ========")
    for d,e in zip(data,estimations):
        print("  {:8.4f}  {:8.4f}".format(d,e))

def print_est(estimations):
    """
    Prints the estimations to STDOUT with 4 decimal places of accuracy.
    """
    for e in estimations:
        print("{:.4f}".format(e))

# ^ cli functions
###############################################################################

if __name__ == '__main__':
    import sys

    set_verbosity()
    if len(sys.argv) < 2 or ARGS.any('h', 'help'):
        print_help()
        sys.exit(0)

    method = ARGS.getany(('m', 'method'), number=1)
    try:
        _method = locals()[method]
    except: # if no method specified, or if bad method specified
        print(f'Invalid method {method}')
        print_help()
        sys.exit(1)

    data = get_data()
    if not data:
        print_help()
        sys.exit(1)

    alpha = ARGS.getany(('a', 'alpha'), .05, factory=float, number=1)
    beta = ARGS.getany(('b', 'beta'), .005, factory=float, number=1)
    initial = ARGS.getany(('i', 'initial'), 0, factory=float, number=1)
    delta = ARGS.getany(('d', 'delta'), 0, factory=float, number=1)
    report = ARGS.any('r', 'report')

    est = _method(data,
                  alpha=alpha,
                  beta=beta,
                  init_state=initial,
                  init_velocity=delta
                 )
    if report:
        report_est(data, est, alpha, beta, initial, delta)
    else:
        print_est(est)
    sys.exit(0)