~dricottone/filters

1a1eaedc0203a3bb86b1ff5d41a4d8540930a4b0 — dricottone 5 years ago
Initial commit
6 files changed, 257 insertions(+), 0 deletions(-)

A README.md
A data/1_10
A data/1_81
A data/weights
A filter.py
A test.py
A  => README.md +23 -0
@@ 1,23 @@
A filtering script

```sh
$ filter.py -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
Testing on N=12
Using alpha=0.4000, beta=0.0000
Initial estimate of 160.0000, accelerating 1.0000 per time unit
  Actual:   Est.:
  ========  ========
  158.0000  159.8000
  164.2000  162.1600
  160.3000  162.0160
  159.9000  161.7696
  162.1000  162.5018
  164.6000  163.9411
  169.6000  166.8046
  167.4000  167.6428
  166.4000  167.7457
  171.0000  169.6474
  171.2000  170.8684
  172.6000  172.1611
```


A  => data/1_10 +11 -0
@@ 1,11 @@
1
2
3
4
5
6
7
8
9
10


A  => data/1_81 +10 -0
@@ 1,10 @@
1
4
9
16
25
36
49
64
81


A  => data/weights +13 -0
@@ 1,13 @@
158.0
164.2
160.3
159.9
162.1
164.6
169.6
167.4
166.4
171.0
171.2
172.6


A  => filter.py +148 -0
@@ 1,148 @@
#!/usr/bin/env python3

# std library
import random

# custom library
from mylib.cli import ARGS, read_args_or_stdin, set_verbosity

# typing
from typing import *

def ab(data: List[float],
       *,
       alpha: float,
       beta: float,
       acceleration: Callable[[float], float] = lambda x: x,
       init_state: float = 0,
       init_velocity: float = 0,
       time: float = 1
      ) -> List[float]:
    """
    Alpha-beta filter - Filter out noise of measurements to estimate data.
    Takes as given:
      initial state
      initial velocity of state
      alpha and beta correction parameters

    Arguments:
      data          : measurements of state
      alpha         : correction to estimated state
      beta          : correction to estimated velocity of state
    Options:
      acceleration  : function f such that v1 <- f(v0) [Default: v1 <- v0]
      init_state    : initial state estimate [Default: 0]
      init_velocity : initial velocity of state [Default: 0]
      time          : the time unit [Default: 1]
    """
    estimates = list()
    x_last = init_state
    v_last = init_velocity

    for index, data_point in enumerate(data):
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )

        x_last = x_est
        v_last = v_est

        estimates.append(x_est)

    return estimates

# ^ filters
###############################################################################
# v internal functions

def noise(data: List[float]) -> List[float]:
    """
    Introduce random noise (r in [-1,1]) to a set of data points.
    """
    def _noise_iter(data: List[float]) -> Iterator[float]:
        for d in data:
            yield d + random.uniform(-1,1)
    return list(_noise_iter(data))

# ^ internal functions
###############################################################################
# v cli functions

def print_help():
    print("Usage: filter -m=METHOD DATA")

def get_data():
    data = list()
    for d in read_args_or_stdin(arg_number=-1, opt_number=-1,
                                opts=('f', 'files')):
        if not len(d):
            continue
        else:
            data.append( float(d) )
    return data

def report_est(data, estimations, alpha, beta, initial_state, initial_velocity):
    """
    Prints a report on the estimations to STDOUT.
    """
    print("Testing on N={}".format(len(data)))
    print("Using alpha={:.4f}, beta={:.4f}".format(alpha,beta))
    print("Initial estimate of {:.4f}, accelerating {:.4f} per time unit".format(initial_state,initial_velocity))
    print("  Actual:   Est.:  \n  ========  ========")
    for d,e in zip(data,estimations):
        print("  {:8.4f}  {:8.4f}".format(d,e))

def print_est(estimations):
    """
    Prints the estimations to STDOUT with 4 decimal places of accuracy.
    """
    for e in estimations:
        print("{:.4f}".format(e))

# ^ cli functions
###############################################################################

if __name__ == '__main__':
    import sys

    set_verbosity()
    if len(sys.argv) < 2 or ARGS.any('h', 'help'):
        print_help()
        sys.exit(0)

    method = ARGS.getany(('m', 'method'), number=1)
    try:
        _method = locals()[method]
    except: # if no method specified, or if bad method specified
        print(f'Invalid method {method}')
        print_help()
        sys.exit(1)

    data = get_data()
    if not data:
        print_help()
        sys.exit(1)

    alpha = ARGS.getany(('a', 'alpha'), .05, factory=float, number=1)
    beta = ARGS.getany(('b', 'beta'), .005, factory=float, number=1)
    initial = ARGS.getany(('i', 'initial'), 0, factory=float, number=1)
    delta = ARGS.getany(('d', 'delta'), 0, factory=float, number=1)
    report = ARGS.any('r', 'report')

    est = _method(data,
                  alpha=alpha,
                  beta=beta,
                  init_state=initial,
                  init_velocity=delta
                 )
    if report:
        report_est(data, est, alpha, beta, initial, delta)
    else:
        print_est(est)
    sys.exit(0)


A  => test.py +52 -0
@@ 1,52 @@
#!/usr/bin/env python3.7

# custom lib
from mylib.env import ENV, read_stdin
from mylib.cli import ARGS
from mylib.fs import File, exists

# typing
from typing import *


def read_args_or_stdin(*,
                       arg_number: int = 1,
                       opts: Tuple[str, ...] = ('f', 'files'),
                       opt_number: int = 1                    ) -> List[str]:
    """
    Input handler for well-behaved scripts.

    Checks for '-f' or '--file' command line options and N file names following
    them. Else check for the N first positional argument. Else assume STDIN. If
    '-' is found at any step, STDIN is used in place.

    Options:
      arg_number : Max num. of files from positional arguments  [Default: 1]
      opts       : File option flags [Default: 'f', 'file']
      opt_number : Max num. of files from file options [Default: 1]
    """
    # process cli
    fnames = ARGS.getany(opts, [], number=opt_number)
    positional = ARGS.positional()
    if not fnames and positional:
        fnames = positional[:arg_number]

    # build buffer
    if fnames:
        buffer = list()
        for fname in fnames:
            if fname == '-':
                buffer += read_stdin() # env.read_stdin
            elif exists(fname): # fs.exists
                buffer += File(fname).lines()
            else:
                pass # TODO: option for raising here
    else:
        buffer = read_stdin() # env.read_stdin

    return buffer

if __name__ == '__main__':
    lines = read_args_or_stdin()
    print('\n'.join(lines))