~dricottone/filters

d79aed132890f93f5a4db83aa0a4704676a3c828 — Dominic Ricottone 4 years ago eb78014
Refactored algorithms + improved documentation
8 files changed, 225 insertions(+), 141 deletions(-)

M Makefile
M README.md
M filter/__main__.py
M filter/ab.py
M filter/cli.py
M filter/cli.toml
M filter/internals.py
M setup.py
M Makefile => Makefile +9 -6
@@ 6,24 6,27 @@ unittest_discover = unittest --color --working-directory .
python = python3

clean:
	rm -rf **/__pycache__ **/__mypycache__ **/*.pyc build dist *.egg-info
	rm -rf **/__pycache__ **/__mypycache__ **/*.pyc build dist filters.egg-info

test:
	$(python) -m py_compile filter/*.py
	$(unittest_discover) tests
	MYPY_CACHE_DIR=filter/__mypycache__ mypy -p filter
	$(python) -m py_compile filter/*.py rng/*.py
	#$(unittest_discover) tests

build:
	gap filter/cli.toml --no-debug-mode --output=filter/cli.py
	gap rng/cli.toml --no-debug-mode --output=rng/cli.py
	$(python) setup.py sdist bdist_wheel

unittest:
	$(unittest_discover) tests --verbose
	$(unittest) tests/generated_syntax_tests.py --verbose

reinstall: uninstall install

install:
	pipx install --spec . filter
	pipx install .

uninstall:
	pipx uninstall filter
	pipx uninstall filters



M README.md => README.md +43 -18
@@ 1,25 1,50 @@
# filter
# filters

Command line data filtering
## Commands

### filter

```
filter METHOD [OPTIONS]
```

Command line data filtering.

Available `METHOD`s are: `ab` and `convolve`. Use `filter METHOD --help` to see available options.



### rng

```
rng DISTRIBUTION [OPTIONS]
```

Command line data generation.

Available `DISTRIBUTION`s are: `normal`, `uniform`, and `notrandom`. Use `rng DISTRIBUTION --help` to see available options.



## Example

```sh
$filter -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
$ rng uniform --mu 100 --delta 5 --offset 10 \
  | filter ab --report --delta 5 --initial 100
Alpha-beta filter
  α=0.4000, β=0.0000
  Initial estimate: 160.0000 changing 1.0000 per time unit
Actual:   Est.:
  α=0.0500, β=0.0050
  Initial estimate: 100.0000 changing 5.0000 per time unit
Raw:      Est.:
========  ========
158.0000  159.8000
164.2000  162.1600
160.3000  162.0160
159.9000  161.7696
162.1000  162.5018
164.6000  163.9411
169.6000  166.8046
167.4000  167.6428
166.4000  167.7457
171.0000  169.6474
171.2000  170.8684
172.6000  172.1611
109.8960  105.2448
 99.0558  109.7086
119.8484  114.9356
115.0693  119.6868
119.0251  124.3752
126.9214  129.1972
127.7748  133.8095
133.2530  138.4348
145.2687  143.4037
136.5438  147.6973
```


M filter/__main__.py => filter/__main__.py +36 -18
@@ 8,41 8,59 @@ from . import internals
def main():
    _config, _positionals = cli.main(sys.argv[1:])

    _method = _config.get("method", "")
    if "version" in _config.keys():
        internals._print_version()
        sys.exit(0)
    elif "list-methodologies" in _config.keys():
        internals._print_methodologies("ab", "convolve")
        sys.exit(0)
    elif "methodology" in _config.keys():
        _method = _config.get("methodology", "")
    elif len(_positionals) > 0:
        _method = _positionals.pop(0)
    elif "help" in _config.keys():
        internals._print_help()
        sys.exit(0)
    else:
        internals._print_usage()
        sys.exit(1)

    _init_estimate = 0
    _init_deviation = 1
    if "initial" in _config.keys():
        _initial = internals._try_get_list_float(_config, "initial")
        if len(_initial) > 1:
            _init_estimate = _initial[0]
            _init_deviation = _initial[1]
        elif len(_initial) == 1:
            _init_estimate = _initial[0]

    _data = {
        "alpha": internals._try_get_float(_config, "alpha"),
        "beta": internals._try_get_float(_config, "beta"),
        "delta": internals._try_get_float(_config, "delta"),
        "initial": internals._try_get_float(_config, "initial"),
        "initial_estimate": _init_estimate,
        "initial_std_deviation": _init_deviation,
        "kernel": internals._try_get_list_float(_config, "kernel"),
        "method": _method,
        "report": "report" in _config.keys(),
        "time": internals._try_get_float(_config, "time"),
        "variance": internals._try_get_float(_config, "variance"),
    }

    if "version" in _config.keys():
        internals._print_version()
        sys.exit(0)

    if _method == "ab":
        from . import ab as implementation
    elif _method == "kalman":
        from . import kalman as implementation
    elif _method == "convolve":
        from . import convolve as implementation
    elif len(_method) > 0:
        # if some methodology given but not in above list
        internals._print_invalid_methodology(_method)
        internals._print_usage()
        sys.exit(1)

    if "help" in _config.keys() and "method" not in _config.keys():
        # requesting help
        internals._print_help()
        sys.exit(0)
    elif "help" in _config.keys():
        # requesting help with a methodology
    if "help" in _config.keys():
        sys.stdout.write(implementation.__doc__)
        sys.exit(0)
    elif "method" not in _config.keys():
        # not requesting help, but still no methodology
        internals._print_usage()
        sys.exit(1)

    _files = _config.get("file", [])
    _files.extend(_positionals)

M filter/ab.py => filter/ab.py +39 -35
@@ 1,67 1,70 @@
#!/usr/bin/env python3

"""filter -m=ab [OPTIONS] <DATA>
"""filter ab [OPTIONS] DATA
Alpha-beta filter - Filter out noise of measurements to estimate data.

Options:
  -a, --alpha  correction to estimated state [Default: 0.05]
  -b, --beta   correction to estimated velocity of state [Default: 0.005]
  -d, --delta  initial velocity [Default: 0]
  -i, --inital initial state [Default: 0]
  -t, --time   unit of time [Default: 1]

Currently, assumes the function of acceleration (i.e. `v1 <- f(v0)`) is
`v1 <- v0`.
  -a N, --alpha N  correction to estimated state [Default: 0.05]
  -b N, --beta N   correction to estimated velocity of state [Default: 0.005]
  -d N, --delta N  initial velocity of state per time unit [Default: 0]
  -i M N,          initial state; M is the estimate and N is the std. deviation
    --initial M N    [Note: std. deviation unused] [Default: 0 0]

Currently assumed that acceleration is constant and that the time unit is 1.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys

from typing import Callable, List, Dict, Iterator


def cli_wrapper(**data: Dict):
    """Handler for the alpha-beta filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _raw = data["data_raw"]
    _alpha = data["alpha"] if data["alpha"] is not None else 0.05
    _beta = data["beta"] if data["beta"] is not None else 0.005
    _init_state = data["initial"] if data["initial"] is not None else 0
    _init_state = data["initial_estimate"]
    _init_velocity = data["delta"] if data["delta"] is not None else 0
    _time = data["time"] if data["time"] is not None else 1.0
    _raw = data["data_raw"]

    _time = 1.0 #constant time unit
    _acceleration = lambda x: x #constant acceleration

    _filter = filter(
        _raw,
        _alpha,
        _beta,
        _acceleration,
        _init_state,
        _init_velocity,
        _acceleration,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_alpha, _beta, _init_state, _init_velocity)
            report_header(
                _alpha,
                _beta,
                _init_state,
                _init_velocity,
                _acceleration,
                _time,
            )
        )
        for actual, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(actual, estimated))
        for measured, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(measured, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))


def filter(
    data: List[float],
    alpha: float,
    beta: float,
    acceleration: Callable[[float], float],
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
    time: float,
) -> Iterator[float]:
    """Iterate over data, passing it through an alpha-beta filter.


@@ 75,29 78,30 @@ def filter(
      init_velocity  initial estimate of velocity
      time           time unit
    """
    x_last = init_state
    v_last = init_velocity
    for index, data_point in enumerate(data):
    last_estimated = init_state
    last_velocity = init_velocity
    for data_point in data:
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)
        estimated = last_estimated + (time * last_velocity)
        velocity = acceleration(last_velocity)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )
        residual = (data_point - estimated)
        estimated += (alpha * residual)
        velocity += ( (beta * residual) / time )

        x_last = x_est
        v_last = v_est

        yield x_est
        last_estimated = estimated
        last_velocity = velocity

        yield estimated

def report_header(
    alpha: float,
    beta: float,
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
    time: float,
) -> str:
    """Draw a report header summarizing the filter.



@@ 105,8 109,8 @@ def report_header(
    ```
    Alpha-beta filter
      α=<alpha>,β=<beta>
      Initial estimate <init_state> changing <init_velocity> per time unit
    Actual:   Est.:
      Initial estimate: <init_state> changing <init_velocity> per time unit
    Raw:      Est.:
    ========  ========
    ```
    The estimates then should be printed alongside the raw measurements.


@@ 117,7 121,7 @@ def report_header(
        "  Initial estimate: {0:.4f} changing {1:.4f} per time unit".format(
            init_state, init_velocity,
        ),
        "Actual:   Est.:",
        "Raw:      Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"

M filter/cli.py => filter/cli.py +44 -36
@@ 5,7 5,7 @@ import re
def main(arguments):
	config=dict()
	positional=[]
	pattern=re.compile(r"(?:-(?:a|b|d|f|h|x|i|m|r|t|v|V)|--(?:alpha|beta|delta|file|help|initial|method|report|time|version))(?:=.*)?$")
	pattern=re.compile(r"(?:-(?:a|b|d|f|h|x|i|k|r|v|V)|--(?:alpha|beta|delta|file|help|initial|kernel|list-methodologies|methodology|report|variance|version))(?:=.*)?$")
	consuming,needing,wanting=None,0,0
	attached_value=None
	while len(arguments) and arguments[0]!="--":


@@ 53,7 53,7 @@ def main(arguments):
					attached_value=None
				else:
					config["file"]=[]
					consuming,needing,wanting="file",0,8
					consuming,needing,wanting="file",1,9
			elif option=="help":
				if attached_value is not None:
					message=(


@@ 64,20 64,36 @@ def main(arguments):
				config["help"]=True
			elif option=="initial":
				if attached_value is not None:
					config["initial"]=attached_value
					config["initial"]=[attached_value]
					consuming,needing,wanting="initial",0,1
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="method":
					config["initial"]=[]
					consuming,needing,wanting="initial",1,2
			elif option=="kernel":
				if attached_value is not None:
					config["method"]=attached_value
					config["kernel"]=[attached_value]
					consuming,needing,wanting="kernel",0,8
					attached_value=None
				else:
					config["kernel"]=[]
					consuming,needing,wanting="kernel",1,9
			elif option=="list-methodologies":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "list-methodologies"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["list-methodologies"]=True
			elif option=="methodology":
				if attached_value is not None:
					config["methodology"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["method"]=None
					consuming,needing,wanting="method",1,1
					config["methodology"]=None
					consuming,needing,wanting="methodology",1,1
			elif option=="report":
				if attached_value is not None:
					message=(


@@ 86,14 102,14 @@ def main(arguments):
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="time":
			elif option=="variance":
				if attached_value is not None:
					config["time"]=attached_value
					config["variance"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["time"]=None
					consuming,needing,wanting="time",1,1
					config["variance"]=None
					consuming,needing,wanting="variance",1,1
			elif option=="version":
				if attached_value is not None:
					message=(


@@ 133,7 149,7 @@ def main(arguments):
					attached_value=None
				else:
					config["file"]=[]
					consuming,needing,wanting="file",0,8
					consuming,needing,wanting="file",1,9
			elif option=="h":
				if attached_value is not None:
					message=(


@@ 152,20 168,20 @@ def main(arguments):
				config["help"]=True
			elif option=="i":
				if attached_value is not None:
					config["initial"]=attached_value
					config["initial"]=[attached_value]
					consuming,needing,wanting="initial",0,1
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="m":
					config["initial"]=[]
					consuming,needing,wanting="initial",1,2
			elif option=="k":
				if attached_value is not None:
					config["method"]=attached_value
					config["kernel"]=[attached_value]
					consuming,needing,wanting="kernel",0,8
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["method"]=None
					consuming,needing,wanting="method",1,1
					config["kernel"]=[]
					consuming,needing,wanting="kernel",1,9
			elif option=="r":
				if attached_value is not None:
					message=(


@@ 174,22 190,14 @@ def main(arguments):
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="t":
			elif option=="v":
				if attached_value is not None:
					config["time"]=attached_value
					config["variance"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["time"]=None
					consuming,needing,wanting="time",1,1
			elif option=="v":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "version"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["version"]=True
					config["variance"]=None
					consuming,needing,wanting="variance",1,1
			elif option=="V":
				if attached_value is not None:
					message=(

M filter/cli.toml => filter/cli.toml +14 -6
@@ 20,22 20,30 @@ number = 0
alternatives = ['h', 'x']

[initial]
number = 1
minimum = 1
maximum = 2
alternatives = ['i']

[method]
[kernel]
minimum = 1
maximum = 9
alternatives = ['k']

[list-methodologies]
number = 0

[methodology]
number = 1
alternatives = ['m']

[report]
number = 0
alternatives = ['r']

[time]
[variance]
number = 1
alternatives = ['t']
alternatives = ['v']

[version]
number = 0
alternatives = ['v', 'V']
alternatives = ['V']


M filter/internals.py => filter/internals.py +35 -17
@@ 1,10 1,10 @@
#!/usr/bin/env python3

import sys
import random

from typing import *

VERSION = (1,0,1,)

def _try_get_float(
    mapping: Dict,
    key: str,


@@ 16,6 16,23 @@ def _try_get_float(
    else:
        return default

def _try_get_list_float(
    mapping: Dict,
    key: str,
    *,
    default: Optional[List[float]] = None,
) -> Optional[List[float]]:
    if key in mapping:
        _list = list()
        for value in mapping[key]:
            try:
                _list.append(float(value))
            except:
                _print_invalid_data(value)
        return _list
    else:
        return default

def _read_stdin() -> Iterator[float]:
    try:
        for line in sys.stdin.readlines():


@@ 57,30 74,31 @@ def _get_raw_data(filenames: List[str]) -> List[float]:

    return raw_data

def noise(data: List[float]) -> List[float]:
    """
    Return a new list in which every data point has been shifted by its own
    uniform random offset r in [-1,1].
    """
    # One draw per point, taken in input order.
    return [point + random.uniform(-1,1) for point in data]

def _print_help() -> None:
    _msg = "Usage: filter -m=METHOD DATA\n"
    _msg = "Usage: filter METHOD [OPTIONS] DATA\n"
    sys.stdout.write(_msg)

def _print_version() -> None:
    _msg = "gap 1.0.0\n"
    _msg = "filter {0}\n".format(".".join(str(v) for v in VERSION))
    sys.stdout.write(_msg)

def _print_methodologies(*method: str) -> None:
    _msg = "Valid methodologies: {0}\n".format(", ".join(method))
    sys.stdout.write(_msg)

def _print_usage() -> None:
    _msg = "Usage: filter -m=METHOD DATA\n"
    sys.stderr.write(_msg)
    _msg = (
        "Usage: filter METHOD [OPTIONS] DATA",
        "Try `filter --list-methodologies` and `filter METHOD --help`",
    )
    sys.stderr.write("\n".join(_msg) + "\n")

def _print_invalid_methodology(method: str) -> None:
    _msg = "{0}: Invalid methodology '{1}'\n".format(sys.argv[0], method)
    sys.stderr.write(_msg)
    _msg = (
        "{0}: Invalid methodology '{1}'\n".format(sys.argv[0], method),
        "Try `filter --list-methodologies`",
    )
    sys.stderr.write("\n".join(_msg) + "\n")

def _print_invalid_file(filename: str) -> None:
    _msg = "{0}: Invalid file '{1}'\n".format(sys.argv[0], filename)

M setup.py => setup.py +5 -5
@@ 7,17 7,17 @@ with open('README.md', encoding='utf-8') as f:
    long_description = f.read()

setup(
    name="filter",
    packages=["filter"],
    version="1.0.0",
    name="filters",
    packages=["filter", "rng"],
    version="1.0.1",
    license="GPL",
    description="Data filters",
    long_description=long_description,
    long_description_content_type='text/markdown',
    author="Dominic Ricottone",
    author_email="me@dominic-ricottone.com",
    url="git.dominic-ricottone.com/gap",
    entry_points={"console_scripts": ["filter = filter.__main__:main"]},
    url="git.dominic-ricottone.com/filters",
    entry_points={"console_scripts": ["filter = filter.__main__:main", "rng = rng.__main__:main"]},
    python_requires=">=3.6",
)