~dricottone/filters

7b51a036bbb61fbe5c6a0467742a87b09f306732 — Dominic Ricottone 4 years ago 1a1eaed v1.0.0
Refactored into package, to use gap argument parser; Remade build system
13 files changed, 570 insertions(+), 202 deletions(-)

A .gitignore
A Makefile
M README.md
D filter.py
A filter/__init__.py
A filter/__main__.py
A filter/ab.py
A filter/cli.py
A filter/cli.toml
A filter/internals.py
A requirements.txt
A setup.py
D test.py
A .gitignore => .gitignore +7 -0
@@ 0,0 1,7 @@
**/__pycache__
**/__mypycache__
**/*.pyc
build
dist
*.egg-info


A Makefile => Makefile +29 -0
@@ 0,0 1,29 @@

unittest = unittest --color
#unittest = python -m unittest
unittest_discover = unittest --color --working-directory .
#unittest_discover = python -m unittest discover tests --top-level-directory . --start-directory
python = python3

clean:
	rm -rf **/__pycache__ **/__mypycache__ **/*.pyc build dist *.egg-info

test:
	$(python) -m py_compile filter/*.py
	$(unittest_discover) tests
	MYPY_CACHE_DIR=filter/__mypycache__ mypy -p filter

build:
	gap filter/cli.toml --no-debug-mode --output=filter/cli.py
	$(python) setup.py sdist bdist_wheel

unittest:
	$(unittest_discover) tests --verbose
	$(unittest) tests/generated_syntax_tests.py --verbose

install:
	pipx install --spec . filter

uninstall:
	pipx uninstall filter


M README.md => README.md +4 -2
@@ 1,7 1,9 @@
A filtering script
# filter

Command line data filtering

```sh
$ filter.py -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
$ filter -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
Testing on N=12
Using alpha=0.4000, beta=0.0000
Initial estimate of 160.0000, accelerating 1.0000 per time unit

D filter.py => filter.py +0 -148
@@ 1,148 0,0 @@
#!/usr/bin/env python3

# std library
import random

# custom library
from mylib.cli import ARGS, read_args_or_stdin, set_verbosity

# typing
from typing import *

def ab(data: List[float],
       *,
       alpha: float,
       beta: float,
       acceleration: Callable[[float], float] = lambda x: x,
       init_state: float = 0,
       init_velocity: float = 0,
       time: float = 1
      ) -> List[float]:
    """
    Alpha-beta filter - Filter out noise of measurements to estimate data.
    Takes as given:
      initial state
      initial velocity of state
      alpha and beta correction parameters

    Arguments:
      data          : measurements of state
      alpha         : correction to estimated state
      beta          : correction to estimated velocity of state
    Options:
      acceleration  : function f such that v1 <- f(v0) [Default: v1 <- v0]
      init_state    : initial state estimate [Default: 0]
      init_velocity : initial velocity of state [Default: 0]
      time          : the time unit [Default: 1]
    """
    estimates = list()
    x_last = init_state
    v_last = init_velocity

    for index, data_point in enumerate(data):
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )

        x_last = x_est
        v_last = v_est

        estimates.append(x_est)

    return estimates

# ^ filters
###############################################################################
# v internal functions

def noise(data: List[float]) -> List[float]:
    """
    Introduce random noise (r in [-1,1]) to a set of data points.
    """
    def _noise_iter(data: List[float]) -> Iterator[float]:
        for d in data:
            yield d + random.uniform(-1,1)
    return list(_noise_iter(data))

# ^ internal functions
###############################################################################
# v cli functions

def print_help():
    print("Usage: filter -m=METHOD DATA")

def get_data():
    data = list()
    for d in read_args_or_stdin(arg_number=-1, opt_number=-1,
                                opts=('f', 'files')):
        if not len(d):
            continue
        else:
            data.append( float(d) )
    return data

def report_est(data, estimations, alpha, beta, initial_state, initial_velocity):
    """
    Prints a report on the estimations to STDOUT.
    """
    print("Testing on N={}".format(len(data)))
    print("Using alpha={:.4f}, beta={:.4f}".format(alpha,beta))
    print("Initial estimate of {:.4f}, accelerating {:.4f} per time unit".format(initial_state,initial_velocity))
    print("  Actual:   Est.:  \n  ========  ========")
    for d,e in zip(data,estimations):
        print("  {:8.4f}  {:8.4f}".format(d,e))

def print_est(estimations):
    """
    Prints the estimations to STDOUT with 4 decimal places of accuracy.
    """
    for e in estimations:
        print("{:.4f}".format(e))

# ^ cli functions
###############################################################################

if __name__ == '__main__':
    import sys

    set_verbosity()
    if len(sys.argv) < 2 or ARGS.any('h', 'help'):
        print_help()
        sys.exit(0)

    method = ARGS.getany(('m', 'method'), number=1)
    try:
        _method = locals()[method]
    except: # if no method specified, or if bad method specified
        print(f'Invalid method {method}')
        print_help()
        sys.exit(1)

    data = get_data()
    if not data:
        print_help()
        sys.exit(1)

    alpha = ARGS.getany(('a', 'alpha'), .05, factory=float, number=1)
    beta = ARGS.getany(('b', 'beta'), .005, factory=float, number=1)
    initial = ARGS.getany(('i', 'initial'), 0, factory=float, number=1)
    delta = ARGS.getany(('d', 'delta'), 0, factory=float, number=1)
    report = ARGS.any('r', 'report')

    est = _method(data,
                  alpha=alpha,
                  beta=beta,
                  init_state=initial,
                  init_velocity=delta
                 )
    if report:
        report_est(data, est, alpha, beta, initial, delta)
    else:
        print_est(est)
    sys.exit(0)


A filter/__init__.py => filter/__init__.py +0 -0
A filter/__main__.py => filter/__main__.py +56 -0
@@ 0,0 1,56 @@
#!/usr/bin/env python3

import sys
import importlib

from . import cli
from . import internals

def main():
    _config, _positionals = cli.main(sys.argv[1:])

    _help = "help" in _config.keys()
    _method = _config.get("method", "")
    _files = _config.get("file", [])
    _files.extend(_positionals)

    _data = {
        "alpha": internals._try_get_float(_config, "alpha"),
        "beta": internals._try_get_float(_config, "beta"),
        "delta": internals._try_get_float(_config, "delta"),
        "initial": internals._try_get_float(_config, "initial"),
        "method": _method,
        "report": "report" in _config.keys(),
        "time": internals._try_get_float(_config, "time"),
    }

    if _method == "ab":
        from . import ab as implementation
    elif len(_method) > 0:
        # if some methodology given but not in above list
        internals._print_invalid_methodology(_method)
        internals._print_usage()
        sys.exit(1)

    if _help and len(_method) == 0:
        # requesting help
        internals._print_help()
        sys.exit(0)
    elif _help:
        # requesting help with a methodology
        sys.stdout.print(implementation.__doc__)
        sys.exit(0)
    elif len(_method) == 0:
        # not requesting help, but still no methodology
        internals._print_usage()
        sys.exit(1)

    _data["data_raw"] = internals._get_raw_data(_files)
    implementation.cli_wrapper(**_data)


    sys.exit(0)

if __name__ == "__main__":
    main()


A filter/ab.py => filter/ab.py +124 -0
@@ 0,0 1,124 @@
#!/usr/bin/env python3

"""filter -m=ab [OPTIONS] <DATA>
Alpha-beta filter - Filter out noise of measurements to estimate data.

Options:
  -a, --alpha  correction to estimated state [Default: 0.05]
  -b, --beta   correction to estimated velocity of state [Default: 0.005]
  -d, --delta  initial velocity [Default: 0]
  -i, --inital initial state [Default: 0]
  -t, --time   unit of time [Default: 1]

Currently, assumes the function of acceleration (i.e. `v1 <- f(v0)`) is
`v1 <- v0`.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys

from typing import Callable, List, Dict, Iterator


def cli_wrapper(**data: Dict):
    """Handler for the alpha-beta filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _alpha = data["alpha"] if data["alpha"] is not None else 0.05
    _beta = data["beta"] if data["beta"] is not None else 0.005
    _init_state = data["initial"] if data["initial"] is not None else 0
    _init_velocity = data["delta"] if data["delta"] is not None else 0
    _time = data["time"] if data["time"] is not None else 1.0
    _raw = data["data_raw"]

    _acceleration = lambda x: x #constant acceleration

    _filter = filter(
        _raw,
        _alpha,
        _beta,
        _acceleration,
        _init_state,
        _init_velocity,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_alpha, _beta, _init_state, _init_velocity)
        )
        for actual, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(actual, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))


def filter(
    data: List[float],
    alpha: float,
    beta: float,
    acceleration: Callable[[float], float],
    init_state: float,
    init_velocity: float,
    time: float,
) -> Iterator[float]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data           measurement from each time interval
      alpha          correction to estimated state
      beta           correction to estimated velocity
      acceleration   function of v1 <- v0
      init_state     initial estimate of state
      init_velocity  initial estimate of velocity
      time           time unit
    """
    x_last = init_state
    v_last = init_velocity
    for index, data_point in enumerate(data):
        #estimate given last values
        x_est = x_last + (time * v_last)
        v_est = acceleration(v_last)

        #correct for residual
        x_res = (data_point - x_est)
        x_est += (alpha * x_res)
        v_est += ( (beta * x_res) / time )

        x_last = x_est
        v_last = v_est

        yield x_est


def report_header(
    alpha: float,
    beta: float,
    init_state: float,
    init_velocity: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:
    ```
    Alpha-beta filter
      α=<alpha>,β=<beta>
      Initial estimate <init_state> changing <init_velocity> per time unit
    Actual:   Est.:
    ========  ========
    ```
    The estimates then should be printed alongside the raw measurements.
    """
    _msg = (
        "Alpha-beta filter",
        "  α={0:.4f}, β={1:.4f}".format(alpha, beta),
        "  Initial estimate: {0:.4f} changing {1:.4f} per time unit".format(
            init_state, init_velocity,
        ),
        "Actual:   Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"


A filter/cli.py => filter/cli.py +198 -0
@@ 0,0 1,198 @@
#!/usr/bin/env python3

import re

def main(arguments):
	config=dict()
	positional=[]
	pattern=re.compile(r"(?:-(?:a|b|d|f|h|x|i|m|r|t)|--(?:alpha|beta|delta|file|help|initial|method|report|time))(?:=.*)?$")
	consuming,needing,wanting=None,0,0
	attached_value=None
	while len(arguments) and arguments[0]!="--":
		if consuming is not None:
			if config[consuming] is None:
				config[consuming]=arguments.pop(0)
			else:
				config[consuming].append(arguments.pop(0))
			needing-=1
			wanting-=1
			if wanting==0:
				consuming,needing,wanting=None,0,0
		elif pattern.match(arguments[0]):
			option = arguments.pop(0).lstrip('-')
			if '=' in option:
				option,attached_value=option.split('=',1)
			if option=="alpha":
				if attached_value is not None:
					config["alpha"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["alpha"]=None
					consuming,needing,wanting="alpha",1,1
			elif option=="beta":
				if attached_value is not None:
					config["beta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["beta"]=None
					consuming,needing,wanting="beta",1,1
			elif option=="delta":
				if attached_value is not None:
					config["delta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["delta"]=None
					consuming,needing,wanting="delta",1,1
			elif option=="file":
				if attached_value is not None:
					config["file"]=[attached_value]
					consuming,needing,wanting="file",0,8
					attached_value=None
				else:
					config["file"]=[]
					consuming,needing,wanting="file",0,8
			elif option=="help":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="initial":
				if attached_value is not None:
					config["initial"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="method":
				if attached_value is not None:
					config["method"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["method"]=None
					consuming,needing,wanting="method",1,1
			elif option=="report":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "report"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="time":
				if attached_value is not None:
					config["time"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["time"]=None
					consuming,needing,wanting="time",1,1
			elif option=="a":
				if attached_value is not None:
					config["alpha"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["alpha"]=None
					consuming,needing,wanting="alpha",1,1
			elif option=="b":
				if attached_value is not None:
					config["beta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["beta"]=None
					consuming,needing,wanting="beta",1,1
			elif option=="d":
				if attached_value is not None:
					config["delta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["delta"]=None
					consuming,needing,wanting="delta",1,1
			elif option=="f":
				if attached_value is not None:
					config["file"]=[attached_value]
					consuming,needing,wanting="file",0,8
					attached_value=None
				else:
					config["file"]=[]
					consuming,needing,wanting="file",0,8
			elif option=="h":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="x":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="i":
				if attached_value is not None:
					config["initial"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="m":
				if attached_value is not None:
					config["method"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["method"]=None
					consuming,needing,wanting="method",1,1
			elif option=="r":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "report"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="t":
				if attached_value is not None:
					config["time"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["time"]=None
					consuming,needing,wanting="time",1,1
		else:
			positional.append(arguments.pop(0))
	if needing>0:
		message=(
			f'unexpected end while parsing "{consuming}"'
			f' (expected {needing} values)'
		)
		raise ValueError(message) from None
	for argument in arguments[1:]:
		positional.append(argument)
	return config,positional

if __name__=="__main__":
	import sys
	cfg,pos = main(sys.argv[1:])
	cfg = {k:v for k,v in cfg.items() if v is not None}
	if len(cfg):
		print("Options:")
		for k,v in cfg.items():
			print(f"{k:20} = {v}")
	if len(pos):
		print("Positional arguments:", ", ".join(pos))

A filter/cli.toml => filter/cli.toml +37 -0
@@ 0,0 1,37 @@
[alpha]
number = 1
alternatives = ['a']

[beta]
number = 1
alternatives = ['b']

[delta]
number = 1
alternatives = ['d']

[file]
minimum = 1
maximum = 9
alternatives = ['f']

[help]
number = 0
alternatives = ['h', 'x']

[initial]
number = 1
alternatives = ['i']

[method]
number = 1
alternatives = ['m']

[report]
number = 0
alternatives = ['r']

[time]
number = 1
alternatives = ['t']


A filter/internals.py => filter/internals.py +90 -0
@@ 0,0 1,90 @@
#!/usr/bin/env python3

import sys
import random

from typing import *

def _try_get_float(
    mapping: Dict,
    key: str,
    *,
    default: Optional[float] = None,
) -> Optional[float]:
    if key in mapping:
        return float(mapping[key])
    else:
        return default

def _read_stdin() -> Iterator[float]:
    try:
        for line in sys.stdin.readlines():
            line = line.strip()
            if len(line) == 0:
                continue
            try:
                yield float(line.strip())
            except:
                _print_invalid_data(line)
    except KeyboardInterrupt:
        sys.stdout.write("\n")

def _read_file(filename) -> Iterator[float]:
    try:
        with open(filename, 'r') as f:
            for line in f.readlines():
                line = line.strip()
                if len(line) == 0:
                    continue
                try:
                    yield float(line)
                except:
                    _print_invalid_data(line)
    except OSError:
        _print_invalid_file(filename)

def _get_raw_data(filenames: List[str]) -> List[float]:
    raw_data: List[float] = list()

    if len(filenames) == 0:
        raw_data.extend(list(_read_stdin()))
    else:
        for filename in filenames:
            if filename == '-':
                raw_data.extend(list(_read_stdin()))
            else:
                raw_data.extend(list(_read_file(filename)))

    return raw_data

def noise(data: List[float]) -> List[float]:
    """
    Introduce random noise (r in [-1,1]) to a set of data points.
    """
    def _noise_iter(data: List[float]) -> Iterator[float]:
        for d in data:
            yield d + random.uniform(-1,1)
    return list(_noise_iter(data))

def _print_help() -> None:
    _msg = "Usage: filter -m=METHOD DATA\n"
    sys.stdout.write(_msg)

def _print_usage() -> None:
    _msg = "Usage: filter -m=METHOD DATA\n"
    sys.stderr.write(_msg)

def _print_invalid_methodology(method: str) -> None:
    _msg = "{0}: Invalid methodology '{1}'\n".format(sys.argv[0], method)
    sys.stderr.write(_msg)

def _print_invalid_file(filename: str) -> None:
    _msg = "{0}: Invalid file '{1}'\n".format(sys.argv[0], filename)
    sys.stderr.write(_msg)

def _print_invalid_data(line: str) -> None:
    _msg = "{0}: Cannot convert '{1}' into numeric value\n".format(
        sys.argv[0], line,
    )
    sys.stderr.write(_msg)


A requirements.txt => requirements.txt +2 -0
@@ 0,0 1,2 @@
setuptools>=47.1.1


A setup.py => setup.py +23 -0
@@ 0,0 1,23 @@
#!/usr/bin/env python3

from setuptools import setup

long_description = None
with open('README.md', encoding='utf-8') as f:
    long_description = f.read()

setup(
    name="filter",
    packages=["filter"],
    version="1.0.0",
    license="GPL",
    description="Data filters",
    long_description=long_description,
    long_description_content_type='text/markdown',
    author="Dominic Ricottone",
    author_email="me@dominic-ricottone.com",
    url="git.dominic-ricottone.com/gap",
    entry_points={"console_scripts": ["filter = filter.__main__:main"]},
    python_requires=">=3.6",
)


D test.py => test.py +0 -52
@@ 1,52 0,0 @@
#!/usr/bin/env python3.7

# custom lib
from mylib.env import ENV, read_stdin
from mylib.cli import ARGS
from mylib.fs import File, exists

# typing
from typing import *


def read_args_or_stdin(*,
                       arg_number: int = 1,
                       opts: Tuple[str, ...] = ('f', 'files'),
                       opt_number: int = 1                    ) -> List[str]:
    """
    Input handler for well-behaved scripts.

    Checks for '-f' or '--file' command line options and N file names following
    them. Else check for the N first positional argument. Else assume STDIN. If
    '-' is found at any step, STDIN is used in place.

    Options:
      arg_number : Max num. of files from positional arguments  [Default: 1]
      opts       : File option flags [Default: 'f', 'file']
      opt_number : Max num. of files from file options [Default: 1]
    """
    # process cli
    fnames = ARGS.getany(opts, [], number=opt_number)
    positional = ARGS.positional()
    if not fnames and positional:
        fnames = positional[:arg_number]

    # build buffer
    if fnames:
        buffer = list()
        for fname in fnames:
            if fname == '-':
                buffer += read_stdin() # env.read_stdin
            elif exists(fname): # fs.exists
                buffer += File(fname).lines()
            else:
                pass # TODO: option for raising here
    else:
        buffer = read_stdin() # env.read_stdin

    return buffer

if __name__ == '__main__':
    lines = read_args_or_stdin()
    print('\n'.join(lines))