A .gitignore => .gitignore +7 -0
@@ 0,0 1,7 @@
+**/__pycache__
+**/__mypycache__
+**/*.pyc
+build
+dist
+*.egg-info
+
A Makefile => Makefile +29 -0
@@ 0,0 1,29 @@
+
+unittest = unittest --color
+#unittest = python -m unittest
+unittest_discover = unittest --color --working-directory .
+#unittest_discover = python -m unittest discover tests --top-level-directory . --start-directory
+python = python3
+
+clean:
+ rm -rf **/__pycache__ **/__mypycache__ **/*.pyc build dist *.egg-info
+
+test:
+ $(python) -m py_compile filter/*.py
+ $(unittest_discover) tests
+ MYPY_CACHE_DIR=filter/__mypycache__ mypy -p filter
+
+build:
+ gap filter/cli.toml --no-debug-mode --output=filter/cli.py
+ $(python) setup.py sdist bdist_wheel
+
+unittest:
+ $(unittest_discover) tests --verbose
+ $(unittest) tests/generated_syntax_tests.py --verbose
+
+install:
+ pipx install --spec . filter
+
+uninstall:
+ pipx uninstall filter
+
M README.md => README.md +4 -2
@@ 1,7 1,9 @@
-A filtering script
+# filter
+
+Command line data filtering
```sh
-$ filter.py -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
+$ filter -m=ab data/weights -a=.4 -b=0 -i=160 -d=1 -r
Testing on N=12
Using alpha=0.4000, beta=0.0000
Initial estimate of 160.0000, accelerating 1.0000 per time unit
D filter.py => filter.py +0 -148
@@ 1,148 0,0 @@
-#!/usr/bin/env python3
-
-# std library
-import random
-
-# custom library
-from mylib.cli import ARGS, read_args_or_stdin, set_verbosity
-
-# typing
-from typing import *
-
-def ab(data: List[float],
- *,
- alpha: float,
- beta: float,
- acceleration: Callable[[float], float] = lambda x: x,
- init_state: float = 0,
- init_velocity: float = 0,
- time: float = 1
- ) -> List[float]:
- """
- Alpha-beta filter - Filter out noise of measurements to estimate data.
- Takes as given:
- initial state
- initial velocity of state
- alpha and beta correction parameters
-
- Arguments:
- data : measurements of state
- alpha : correction to estimated state
- beta : correction to estimated velocity of state
- Options:
- acceleration : function f such that v1 <- f(v0) [Default: v1 <- v0]
- init_state : initial state estimate [Default: 0]
- init_velocity : initial velocity of state [Default: 0]
- time : the time unit [Default: 1]
- """
- estimates = list()
- x_last = init_state
- v_last = init_velocity
-
- for index, data_point in enumerate(data):
- #estimate given last values
- x_est = x_last + (time * v_last)
- v_est = acceleration(v_last)
-
- #correct for residual
- x_res = (data_point - x_est)
- x_est += (alpha * x_res)
- v_est += ( (beta * x_res) / time )
-
- x_last = x_est
- v_last = v_est
-
- estimates.append(x_est)
-
- return estimates
-
-# ^ filters
-###############################################################################
-# v internal functions
-
-def noise(data: List[float]) -> List[float]:
- """
- Introduce random noise (r in [-1,1]) to a set of data points.
- """
- def _noise_iter(data: List[float]) -> Iterator[float]:
- for d in data:
- yield d + random.uniform(-1,1)
- return list(_noise_iter(data))
-
-# ^ internal functions
-###############################################################################
-# v cli functions
-
-def print_help():
- print("Usage: filter -m=METHOD DATA")
-
-def get_data():
- data = list()
- for d in read_args_or_stdin(arg_number=-1, opt_number=-1,
- opts=('f', 'files')):
- if not len(d):
- continue
- else:
- data.append( float(d) )
- return data
-
-def report_est(data, estimations, alpha, beta, initial_state, initial_velocity):
- """
- Prints a report on the estimations to STDOUT.
- """
- print("Testing on N={}".format(len(data)))
- print("Using alpha={:.4f}, beta={:.4f}".format(alpha,beta))
- print("Initial estimate of {:.4f}, accelerating {:.4f} per time unit".format(initial_state,initial_velocity))
- print(" Actual: Est.: \n ======== ========")
- for d,e in zip(data,estimations):
- print(" {:8.4f} {:8.4f}".format(d,e))
-
-def print_est(estimations):
- """
- Prints the estimations to STDOUT with 4 decimal places of accuracy.
- """
- for e in estimations:
- print("{:.4f}".format(e))
-
-# ^ cli functions
-###############################################################################
-
-if __name__ == '__main__':
- import sys
-
- set_verbosity()
- if len(sys.argv) < 2 or ARGS.any('h', 'help'):
- print_help()
- sys.exit(0)
-
- method = ARGS.getany(('m', 'method'), number=1)
- try:
- _method = locals()[method]
- except: # if no method specified, or if bad method specified
- print(f'Invalid method {method}')
- print_help()
- sys.exit(1)
-
- data = get_data()
- if not data:
- print_help()
- sys.exit(1)
-
- alpha = ARGS.getany(('a', 'alpha'), .05, factory=float, number=1)
- beta = ARGS.getany(('b', 'beta'), .005, factory=float, number=1)
- initial = ARGS.getany(('i', 'initial'), 0, factory=float, number=1)
- delta = ARGS.getany(('d', 'delta'), 0, factory=float, number=1)
- report = ARGS.any('r', 'report')
-
- est = _method(data,
- alpha=alpha,
- beta=beta,
- init_state=initial,
- init_velocity=delta
- )
- if report:
- report_est(data, est, alpha, beta, initial, delta)
- else:
- print_est(est)
- sys.exit(0)
-
A filter/__init__.py => filter/__init__.py +0 -0
A filter/__main__.py => filter/__main__.py +56 -0
@@ 0,0 1,56 @@
+#!/usr/bin/env python3
+
+import sys
+import importlib
+
+from . import cli
+from . import internals
+
+def main():
+ _config, _positionals = cli.main(sys.argv[1:])
+
+ _help = "help" in _config.keys()
+ _method = _config.get("method", "")
+ _files = _config.get("file", [])
+ _files.extend(_positionals)
+
+ _data = {
+ "alpha": internals._try_get_float(_config, "alpha"),
+ "beta": internals._try_get_float(_config, "beta"),
+ "delta": internals._try_get_float(_config, "delta"),
+ "initial": internals._try_get_float(_config, "initial"),
+ "method": _method,
+ "report": "report" in _config.keys(),
+ "time": internals._try_get_float(_config, "time"),
+ }
+
+ if _method == "ab":
+ from . import ab as implementation
+ elif len(_method) > 0:
+ # if some methodology given but not in above list
+ internals._print_invalid_methodology(_method)
+ internals._print_usage()
+ sys.exit(1)
+
+ if _help and len(_method) == 0:
+ # requesting help
+ internals._print_help()
+ sys.exit(0)
+ elif _help:
+ # requesting help with a methodology
+ sys.stdout.print(implementation.__doc__)
+ sys.exit(0)
+ elif len(_method) == 0:
+ # not requesting help, but still no methodology
+ internals._print_usage()
+ sys.exit(1)
+
+ _data["data_raw"] = internals._get_raw_data(_files)
+ implementation.cli_wrapper(**_data)
+
+
+ sys.exit(0)
+
+if __name__ == "__main__":
+ main()
+
A filter/ab.py => filter/ab.py +124 -0
@@ 0,0 1,124 @@
+#!/usr/bin/env python3
+
+"""filter -m=ab [OPTIONS] <DATA>
+Alpha-beta filter - Filter out noise of measurements to estimate data.
+
+Options:
+ -a, --alpha correction to estimated state [Default: 0.05]
+ -b, --beta correction to estimated velocity of state [Default: 0.005]
+ -d, --delta initial velocity [Default: 0]
+ -i, --inital initial state [Default: 0]
+ -t, --time unit of time [Default: 1]
+
+Currently, assumes the function of acceleration (i.e. `v1 <- f(v0)`) is
+`v1 <- v0`.
+"""
+
+__all__ = ['cli_wrapper', 'filter', 'report']
+
+import sys
+
+from typing import Callable, List, Dict, Iterator
+
+
+def cli_wrapper(**data: Dict):
+ """Handler for the alpha-beta filter. Checks and cleans given options,
+ and performs optional reporting.
+ """
+ _alpha = data["alpha"] if data["alpha"] is not None else 0.05
+ _beta = data["beta"] if data["beta"] is not None else 0.005
+ _init_state = data["initial"] if data["initial"] is not None else 0
+ _init_velocity = data["delta"] if data["delta"] is not None else 0
+ _time = data["time"] if data["time"] is not None else 1.0
+ _raw = data["data_raw"]
+
+ _acceleration = lambda x: x #constant acceleration
+
+ _filter = filter(
+ _raw,
+ _alpha,
+ _beta,
+ _acceleration,
+ _init_state,
+ _init_velocity,
+ _time,
+ )
+
+ if data["report"]:
+ sys.stdout.write(
+ report_header(_alpha, _beta, _init_state, _init_velocity)
+ )
+ for actual, estimated in zip(_raw, _filter):
+ sys.stdout.write("{0:8.4f} {1:8.4f}\n".format(actual, estimated))
+ else:
+ for estimated in _filter:
+ sys.stdout.write("{0:.4f}\n".format(estimated))
+
+
+def filter(
+ data: List[float],
+ alpha: float,
+ beta: float,
+ acceleration: Callable[[float], float],
+ init_state: float,
+ init_velocity: float,
+ time: float,
+) -> Iterator[float]:
+ """Iterate over data, passing it through an alpha-beta filter.
+
+ Arguments:
+ data measurement from each time interval
+ alpha correction to estimated state
+ beta correction to estimated velocity
+ acceleration function of v1 <- v0
+ init_state initial estimate of state
+ init_velocity initial estimate of velocity
+ time time unit
+ """
+ x_last = init_state
+ v_last = init_velocity
+ for index, data_point in enumerate(data):
+ #estimate given last values
+ x_est = x_last + (time * v_last)
+ v_est = acceleration(v_last)
+
+ #correct for residual
+ x_res = (data_point - x_est)
+ x_est += (alpha * x_res)
+ v_est += ( (beta * x_res) / time )
+
+ x_last = x_est
+ v_last = v_est
+
+ yield x_est
+
+
+def report_header(
+ alpha: float,
+ beta: float,
+ init_state: float,
+ init_velocity: float,
+) -> str:
+ """Draw a report header summarizing the filter.
+
+ Appears as:
+ ```
+ Alpha-beta filter
+ α=<alpha>,β=<beta>
+ Initial estimate <init_state> changing <init_velocity> per time unit
+ Actual: Est.:
+ ======== ========
+ ```
+ The estimates then should be printed alongside the raw measurements.
+ """
+ _msg = (
+ "Alpha-beta filter",
+ " α={0:.4f}, β={1:.4f}".format(alpha, beta),
+ " Initial estimate: {0:.4f} changing {1:.4f} per time unit".format(
+ init_state, init_velocity,
+ ),
+ "Actual: Est.:",
+ "======== ========",
+ )
+ return "\n".join(_msg) + "\n"
+
A filter/cli.py => filter/cli.py +198 -0
@@ 0,0 1,198 @@
+#!/usr/bin/env python3
+
+import re
+
+def main(arguments):
+ config=dict()
+ positional=[]
+ pattern=re.compile(r"(?:-(?:a|b|d|f|h|x|i|m|r|t)|--(?:alpha|beta|delta|file|help|initial|method|report|time))(?:=.*)?$")
+ consuming,needing,wanting=None,0,0
+ attached_value=None
+ while len(arguments) and arguments[0]!="--":
+ if consuming is not None:
+ if config[consuming] is None:
+ config[consuming]=arguments.pop(0)
+ else:
+ config[consuming].append(arguments.pop(0))
+ needing-=1
+ wanting-=1
+ if wanting==0:
+ consuming,needing,wanting=None,0,0
+ elif pattern.match(arguments[0]):
+ option = arguments.pop(0).lstrip('-')
+ if '=' in option:
+ option,attached_value=option.split('=',1)
+ if option=="alpha":
+ if attached_value is not None:
+ config["alpha"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["alpha"]=None
+ consuming,needing,wanting="alpha",1,1
+ elif option=="beta":
+ if attached_value is not None:
+ config["beta"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["beta"]=None
+ consuming,needing,wanting="beta",1,1
+ elif option=="delta":
+ if attached_value is not None:
+ config["delta"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["delta"]=None
+ consuming,needing,wanting="delta",1,1
+ elif option=="file":
+ if attached_value is not None:
+ config["file"]=[attached_value]
+ consuming,needing,wanting="file",0,8
+ attached_value=None
+ else:
+ config["file"]=[]
+ consuming,needing,wanting="file",0,8
+ elif option=="help":
+ if attached_value is not None:
+ message=(
+ 'unexpected value while parsing "help"'
+ ' (expected 0 values)'
+ )
+ raise ValueError(message) from None
+ config["help"]=True
+ elif option=="initial":
+ if attached_value is not None:
+ config["initial"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["initial"]=None
+ consuming,needing,wanting="initial",1,1
+ elif option=="method":
+ if attached_value is not None:
+ config["method"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["method"]=None
+ consuming,needing,wanting="method",1,1
+ elif option=="report":
+ if attached_value is not None:
+ message=(
+ 'unexpected value while parsing "report"'
+ ' (expected 0 values)'
+ )
+ raise ValueError(message) from None
+ config["report"]=True
+ elif option=="time":
+ if attached_value is not None:
+ config["time"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["time"]=None
+ consuming,needing,wanting="time",1,1
+ elif option=="a":
+ if attached_value is not None:
+ config["alpha"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["alpha"]=None
+ consuming,needing,wanting="alpha",1,1
+ elif option=="b":
+ if attached_value is not None:
+ config["beta"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["beta"]=None
+ consuming,needing,wanting="beta",1,1
+ elif option=="d":
+ if attached_value is not None:
+ config["delta"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["delta"]=None
+ consuming,needing,wanting="delta",1,1
+ elif option=="f":
+ if attached_value is not None:
+ config["file"]=[attached_value]
+ consuming,needing,wanting="file",0,8
+ attached_value=None
+ else:
+ config["file"]=[]
+ consuming,needing,wanting="file",0,8
+ elif option=="h":
+ if attached_value is not None:
+ message=(
+ 'unexpected value while parsing "help"'
+ ' (expected 0 values)'
+ )
+ raise ValueError(message) from None
+ config["help"]=True
+ elif option=="x":
+ if attached_value is not None:
+ message=(
+ 'unexpected value while parsing "help"'
+ ' (expected 0 values)'
+ )
+ raise ValueError(message) from None
+ config["help"]=True
+ elif option=="i":
+ if attached_value is not None:
+ config["initial"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["initial"]=None
+ consuming,needing,wanting="initial",1,1
+ elif option=="m":
+ if attached_value is not None:
+ config["method"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["method"]=None
+ consuming,needing,wanting="method",1,1
+ elif option=="r":
+ if attached_value is not None:
+ message=(
+ 'unexpected value while parsing "report"'
+ ' (expected 0 values)'
+ )
+ raise ValueError(message) from None
+ config["report"]=True
+ elif option=="t":
+ if attached_value is not None:
+ config["time"]=attached_value
+ attached_value=None
+ consuming,needing,wanting=None,0,0
+ else:
+ config["time"]=None
+ consuming,needing,wanting="time",1,1
+ else:
+ positional.append(arguments.pop(0))
+ if needing>0:
+ message=(
+ f'unexpected end while parsing "{consuming}"'
+ f' (expected {needing} values)'
+ )
+ raise ValueError(message) from None
+ for argument in arguments[1:]:
+ positional.append(argument)
+ return config,positional
+
+if __name__=="__main__":
+ import sys
+ cfg,pos = main(sys.argv[1:])
+ cfg = {k:v for k,v in cfg.items() if v is not None}
+ if len(cfg):
+ print("Options:")
+ for k,v in cfg.items():
+ print(f"{k:20} = {v}")
+ if len(pos):
+ print("Positional arguments:", ", ".join(pos))
A filter/cli.toml => filter/cli.toml +37 -0
@@ 0,0 1,37 @@
+[alpha]
+number = 1
+alternatives = ['a']
+
+[beta]
+number = 1
+alternatives = ['b']
+
+[delta]
+number = 1
+alternatives = ['d']
+
+[file]
+minimum = 1
+maximum = 9
+alternatives = ['f']
+
+[help]
+number = 0
+alternatives = ['h', 'x']
+
+[initial]
+number = 1
+alternatives = ['i']
+
+[method]
+number = 1
+alternatives = ['m']
+
+[report]
+number = 0
+alternatives = ['r']
+
+[time]
+number = 1
+alternatives = ['t']
+
A filter/internals.py => filter/internals.py +90 -0
@@ 0,0 1,90 @@
+#!/usr/bin/env python3
+
+import sys
+import random
+
+from typing import *
+
+def _try_get_float(
+ mapping: Dict,
+ key: str,
+ *,
+ default: Optional[float] = None,
+) -> Optional[float]:
+ if key in mapping:
+ return float(mapping[key])
+ else:
+ return default
+
+def _read_stdin() -> Iterator[float]:
+ try:
+ for line in sys.stdin.readlines():
+ line = line.strip()
+ if len(line) == 0:
+ continue
+ try:
+ yield float(line.strip())
+ except:
+ _print_invalid_data(line)
+ except KeyboardInterrupt:
+ sys.stdout.write("\n")
+
+def _read_file(filename) -> Iterator[float]:
+ try:
+ with open(filename, 'r') as f:
+ for line in f.readlines():
+ line = line.strip()
+ if len(line) == 0:
+ continue
+ try:
+ yield float(line)
+ except:
+ _print_invalid_data(line)
+ except OSError:
+ _print_invalid_file(filename)
+
+def _get_raw_data(filenames: List[str]) -> List[float]:
+ raw_data: List[float] = list()
+
+ if len(filenames) == 0:
+ raw_data.extend(list(_read_stdin()))
+ else:
+ for filename in filenames:
+ if filename == '-':
+ raw_data.extend(list(_read_stdin()))
+ else:
+ raw_data.extend(list(_read_file(filename)))
+
+ return raw_data
+
+def noise(data: List[float]) -> List[float]:
+ """
+ Introduce random noise (r in [-1,1]) to a set of data points.
+ """
+ def _noise_iter(data: List[float]) -> Iterator[float]:
+ for d in data:
+ yield d + random.uniform(-1,1)
+ return list(_noise_iter(data))
+
+def _print_help() -> None:
+ _msg = "Usage: filter -m=METHOD DATA\n"
+ sys.stdout.write(_msg)
+
+def _print_usage() -> None:
+ _msg = "Usage: filter -m=METHOD DATA\n"
+ sys.stderr.write(_msg)
+
+def _print_invalid_methodology(method: str) -> None:
+ _msg = "{0}: Invalid methodology '{1}'\n".format(sys.argv[0], method)
+ sys.stderr.write(_msg)
+
+def _print_invalid_file(filename: str) -> None:
+ _msg = "{0}: Invalid file '{1}'\n".format(sys.argv[0], filename)
+ sys.stderr.write(_msg)
+
+def _print_invalid_data(line: str) -> None:
+ _msg = "{0}: Cannot convert '{1}' into numeric value\n".format(
+ sys.argv[0], line,
+ )
+ sys.stderr.write(_msg)
+
A requirements.txt => requirements.txt +2 -0
@@ 0,0 1,2 @@
+setuptools>=47.1.1
+
A setup.py => setup.py +23 -0
@@ 0,0 1,23 @@
+#!/usr/bin/env python3
+
+from setuptools import setup
+
+long_description = None
+with open('README.md', encoding='utf-8') as f:
+ long_description = f.read()
+
+setup(
+ name="filter",
+ packages=["filter"],
+ version="1.0.0",
+ license="GPL",
+ description="Data filters",
+ long_description=long_description,
+ long_description_content_type='text/markdown',
+ author="Dominic Ricottone",
+ author_email="me@dominic-ricottone.com",
+ url="git.dominic-ricottone.com/gap",
+ entry_points={"console_scripts": ["filter = filter.__main__:main"]},
+ python_requires=">=3.6",
+)
+
D test.py => test.py +0 -52
@@ 1,52 0,0 @@
-#!/usr/bin/env python3.7
-
-# custom lib
-from mylib.env import ENV, read_stdin
-from mylib.cli import ARGS
-from mylib.fs import File, exists
-
-# typing
-from typing import *
-
-
-def read_args_or_stdin(*,
- arg_number: int = 1,
- opts: Tuple[str, ...] = ('f', 'files'),
- opt_number: int = 1 ) -> List[str]:
- """
- Input handler for well-behaved scripts.
-
- Checks for '-f' or '--file' command line options and N file names following
- them. Else check for the N first positional argument. Else assume STDIN. If
- '-' is found at any step, STDIN is used in place.
-
- Options:
- arg_number : Max num. of files from positional arguments [Default: 1]
- opts : File option flags [Default: 'f', 'file']
- opt_number : Max num. of files from file options [Default: 1]
- """
- # process cli
- fnames = ARGS.getany(opts, [], number=opt_number)
- positional = ARGS.positional()
- if not fnames and positional:
- fnames = positional[:arg_number]
-
- # build buffer
- if fnames:
- buffer = list()
- for fname in fnames:
- if fname == '-':
- buffer += read_stdin() # env.read_stdin
- elif exists(fname): # fs.exists
- buffer += File(fname).lines()
- else:
- pass # TODO: option for raising here
- else:
- buffer = read_stdin() # env.read_stdin
-
- return buffer
-
-if __name__ == '__main__':
- lines = read_args_or_stdin()
- print('\n'.join(lines))
-