From 8479aea2db66090f2b0db01a1c42593975a3ba0e Mon Sep 17 00:00:00 2001 From: Dominic Ricottone Date: Sat, 20 Jun 2020 21:02:52 -0400 Subject: [PATCH] Added rng package, convolve + kalman filters --- filter/convolve.py | 91 +++++++++++++++++++ filter/kalman.py | 186 +++++++++++++++++++++++++++++++++++++ plot.sh | 15 +++ rng/__init__.py | 0 rng/__main__.py | 59 ++++++++++++ rng/cli.py | 222 +++++++++++++++++++++++++++++++++++++++++++++ rng/cli.toml | 42 +++++++++ rng/internals.py | 55 +++++++++++ rng/normal.py | 73 +++++++++++++++ rng/notrandom.py | 66 ++++++++++++++ rng/uniform.py | 75 +++++++++++++++ 11 files changed, 884 insertions(+) create mode 100644 filter/convolve.py create mode 100644 filter/kalman.py create mode 100755 plot.sh create mode 100644 rng/__init__.py create mode 100644 rng/__main__.py create mode 100644 rng/cli.py create mode 100644 rng/cli.toml create mode 100644 rng/internals.py create mode 100644 rng/normal.py create mode 100644 rng/notrandom.py create mode 100644 rng/uniform.py diff --git a/filter/convolve.py b/filter/convolve.py new file mode 100644 index 0000000..64094b3 --- /dev/null +++ b/filter/convolve.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +"""filter convolve [OPTIONS] DATA +Convolution filter - Filter out noise of measurements to estimate data. + +Options: + -k KERNEL, filter data through KERNEL, where KERNEL is one or more + --kernel KERNEL numeric factors [Default: 1.0] +""" + +__all__ = ['cli_wrapper', 'filter', 'report'] + +import sys +from typing import List, Dict, Iterator + +def cli_wrapper( + **data: Dict, +) -> None: + """Handler for the convolution filter. Checks and cleans given options, + and performs optional reporting. + """ + _kernel = _normalize( + data["kernel"] if data["kernel"] is not None else [1.0] + ) + _raw = data["data_raw"] + + _filter = filter( + _raw, + _kernel, + ) + + if data["report"]: + sys.stdout.write(report_header(_kernel)) + for measured, estimated in zip(_raw, _filter): + sys.stdout.write("{0:8.4f} {1:8.4f}\n".format(measured, estimated)) + else: + for estimated in _filter: + sys.stdout.write("{0:.4f}\n".format(estimated)) + +def filter( + data: List[float], + kernel: List[float] +) -> Iterator[float]: + """Iterate over data, passing it through the kernel. + + Arguments: + data measurements + kernel measurement adjustments + """ + length = len(data) + #NOTE: for evenly-sized kernels, extra goes to top and left + offset = len(kernel) // 2 + + for index in range(length): + _sum = 0.0 + for kernel_index, kernel_point in enumerate(kernel, start=-offset): + target = (index + kernel_index) % length + _sum += data[target] * kernel_point + yield _sum + +def report_header( + kernel: List[float], +) -> str: + """Draw a report header summarizing the filter. + + Appears as: + ``` + Convolution filter + kernel=[ ... ] + Raw: Est.: + ======== ======== + ``` + + The estimates then should be printed alongside the raw measurements. + """ + _msg = ( + "Convolution filter", + " kernel={0}".format(kernel), + "Raw: Est.:", + "======== ========", + ) + return "\n".join(_msg) + "\n" + +def _normalize(kernel: List[float]) -> List[float]: + _sum = sum(kernel) + if _sum == 1: + return kernel + else: + weight = 1.0/_sum + return [factor * weight for factor in kernel] + diff --git a/filter/kalman.py b/filter/kalman.py new file mode 100644 index 0000000..71bbbba --- /dev/null +++ b/filter/kalman.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 + +"""filter kalman [OPTIONS] DATA +Kalman filter - Filter out normally-distributed noise of measurements to +estimate data. + +Options: + -d, --delta initial velocity of state per time unit [Default: 0] + -i, --inital initial estimate of state [Default: 0] + -s, --sigma initial std. deviation of state distribution [Default: 1] + -v, --variance variance of data measurements [Default: 1] + +Currently assumed that acceleration is constant and that velocity is non- +variate. +""" + +__all__ = ['cli_wrapper', 'filter', 'report'] + +import sys +from typing import Callable, List, Dict, Iterator, Tuple + +def cli_wrapper(**data: Dict): + """Handler for the Kalman filter. Checks and cleans given options, + and performs optional reporting. + """ + _raw = data["data_raw"] + _variance = data["variance"] if data["variance"] is not None else 1 + _init_state_mu = data["initial_estimate"] + _init_state_sigma = data["initial_std_deviation"] + _init_velocity_mu = data["delta"] if data["delta"] is not None else 0 + + _init_velocity_sigma = 0 #non-variate velocity + _time = 1.0 #constant time unit + _acceleration = lambda x: x #constant acceleration + + + _filter = filter( + _raw, + _variance, + _init_state_mu, + _init_state_sigma, + _init_velocity_mu, + _init_velocity_sigma, + _acceleration, + _time, + ) + + if data["report"]: + sys.stdout.write( + report_header( + _variance, + _init_state_mu, + _init_state_sigma, + _init_velocity_mu, + _init_velocity_sigma, + _acceleration, + _time, + ), + ) + for measured, filtered in zip(_raw, _filter): + estimated, variance = filtered + sys.stdout.write( + "{0:8.4f} {1:8.4f} {2:8.4f}\n".format( + measured, estimated, variance, + ), + ) + else: + for estimated, _ in _filter: + sys.stdout.write("{0:.4f}\n".format(estimated)) + +def filter( + data: List[float], + variance: float, + init_state_mu: float, + init_state_sigma: float, + init_velocity_mu: float, + init_velocity_sigma: float, + acceleration: Callable[[Tuple[float, float]], Tuple[float,float]], + time: float, +) -> Iterator[Tuple[float,float]]: + """Iterate over data, passing it through an alpha-beta filter. + + Arguments: + data measurement from each time interval + variance variance of measurements + init_state_mu initial estimate of state + init_state_sigma std. deviation of state distribution + init_velocity_mu initial estimate of velocity + init_velocity_sigma std. deviation of velocity distribution + acceleration function of v1 <- v0 + time time unit + """ + # normal distributions as tuples: (E(x), Var(x), ) + estimated = (init_state_mu, init_state_sigma**2, ) + velocity = (init_velocity_mu, init_velocity_sigma**2, ) + + for measurement in data: + estimated = _add(estimated, _multiply(velocity, (time,0,))) + velocity = acceleration(velocity) + + estimated = _multiply(estimated, (measurement, variance, )) + + yield estimated + +def report_header( + variance: float, + init_state_mu: float, + init_state_sigma: float, + init_velocity_mu: float, + init_velocity_sigma: float, + acceleration: Callable[[float], float], + time: float, +) -> str: + """Draw a report header summarizing the filter. + + Appears as: + + ``` + ``` + + The estimates and variances then should be printed alongside the raw + measurements. + """ + _msg = ( + "Kalman filter", + "Raw: Est.: Var.:", + "======== ======== ========", + ) + return "\n".join(_msg) + "\n" + +def _add( + x: Tuple[float, float], + y: Tuple[float, float], +) -> Tuple[float, float]: + """Add a normal distribution to another normal distribution or a constant. + + In the case of two normal distributions (X and Y), the resultant values + are: + E(X + Y) = E(X) + E(Y) + Var(X + Y) = Var(X) + Var(Y) + + In the case of a normal distribution (X) and a constant (C), the resultant + values are: + E(X + C) = E(X) + C + Var(X + C) = Var(X) + """ + if y[1] == 0: + z = (x[0]+y[0], x[1], ) + else: + z = (x[0]+y[0], x[1]+y[1], ) + return z + +def _multiply( + x: Tuple[float, float], + y: Tuple[float, float], +) -> Tuple[float, float]: + """Multiply a normal distribution by another normal distribution or by a + constant. + + Given two normal distributions (X and Y), the resultant values are: + E(X * Y) = (Var(Y)E(X) + Var(X)E(Y)) / (Var(X) + Var(Y)) + Var(X * Y) = (Var(X)Var(Y)) / (Var(X) + Var(Y)) + + Given a normal distribution (X) and a constant (C), the resultant values + are: + E(X * C) = E(X) * C + Var(X * C) = Var(X) * (C^2) + """ + #_print_normal_distribution(x, "X") + #_print_normal_distribution(y, "Y") + if y[1] == 0: + z = ((x[0] * y[0]), (x[1] * y[0]**2), ) + else: + _denom = x[1] + y[1] + _mean = ((x[0] * y[1]) + (y[0] * x[1])) / _denom + _var = (x[1] * y[1]) / _denom + z = (_mean, _var, ) + #_print_normal_distribution(z, "Z") + return z + +def _print_normal_distribution( + x: Tuple[float, float], + name: str, +) -> None: + sys.stdout.write("{0} = N({1},{2})\n".format(name, x[0], x[1])) + diff --git a/plot.sh b/plot.sh new file mode 100755 index 0000000..0e645b0 --- /dev/null +++ b/plot.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +tmpd="$(mktemp -d)" +trap 'rm -rf "${tmpd}"' EXIT + +mkfifo "${tmpd}/f1" +rng uniform -m 5 -d 2 -n 100 \ + | tee "${tmpd}/f1" \ + | filter ab -i 100 -d 2 -a 0.2 -b 0.02 \ + | paste "${tmpd}/f1" - \ + | awk 'BEGIN{OFS="\t"; print "measurements", "estimates"} {print $1, $2}' \ + | gnuplot -p -e "set terminal dumb; set autoscale; plot '-' using 1:2 with lines notitle" + +rm -rf "${tmpd}" + diff --git a/rng/__init__.py b/rng/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rng/__main__.py b/rng/__main__.py new file mode 100644 index 0000000..dccc0ce --- /dev/null +++ b/rng/__main__.py @@ -0,0 +1,59 @@ +#!/usr/main/env python3 + +import sys + +from . import cli +from . import internals + +def main(): + _config, _positionals = cli.main(sys.argv[1:]) + + if "version" in _config.keys(): + internals._print_version() + sys.exit(0) + elif "list-distributions" in _config.keys(): + internals._print_distributions("normal", "uniform", "notrandom") + sys.exit(0) + elif "distribution" in _config.keys(): + _dist = config.get("distribution", "") + elif len(_positionals) > 0: + _dist = _positionals.pop(0) + elif "help" in _config.keys(): + internals._print_help() + sys.exit(0) + else: + internals._print_usage() + sys.exit(1) + + _data = { + "delta": internals._try_get_float(_config, "delta"), + "distribution": _dist, + "initial": internals._try_get_float(_config, "initial"), + "mu": internals._try_get_float(_config, "mu"), + "number": internals._try_get_int(_config, "number"), + "offset": internals._try_get_float(_config, "offset"), + "report": "report" in _config.keys(), + "sigma": internals._try_get_float(_config, "sigma"), + } + + if _dist == "uniform": + from . import uniform as implementation + elif _dist == "normal": + from . import normal as implementation + elif _dist == "notrandom": + from . import notrandom as implementation + elif len(_dist) > 0: + internals._print_invalid_distribution(_dist) + internals._print_usage() + sys.exit(1) + if "help" in _config.keys(): + sys.stdout.write(implementation.__doc__) + sys.exit(0) + + implementation.cli_wrapper(**_data) + + sys.exit(0) + +if __name__ == "__main__": + main() + diff --git a/rng/cli.py b/rng/cli.py new file mode 100644 index 0000000..b173f7a --- /dev/null +++ b/rng/cli.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 + +import re + +def main(arguments): + config=dict() + positional=[] + pattern=re.compile(r"(?:-(?:d|h|x|i|m|n|o|r|s|v|V)|--(?:delta|distribution|help|initial|list-distributions|mu|number|offset|report|sigma|version))(?:=.*)?$") + consuming,needing,wanting=None,0,0 + attached_value=None + while len(arguments) and arguments[0]!="--": + if consuming is not None: + if config[consuming] is None: + config[consuming]=arguments.pop(0) + else: + config[consuming].append(arguments.pop(0)) + needing-=1 + wanting-=1 + if wanting==0: + consuming,needing,wanting=None,0,0 + elif pattern.match(arguments[0]): + option = arguments.pop(0).lstrip('-') + if '=' in option: + option,attached_value=option.split('=',1) + if option=="delta": + if attached_value is not None: + config["delta"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["delta"]=None + consuming,needing,wanting="delta",1,1 + elif option=="distribution": + if attached_value is not None: + config["distribution"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["distribution"]=None + consuming,needing,wanting="distribution",1,1 + elif option=="help": + if attached_value is not None: + message=( + 'unexpected value while parsing "help"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["help"]=True + elif option=="initial": + if attached_value is not None: + config["initial"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["initial"]=None + consuming,needing,wanting="initial",1,1 + elif option=="list-distributions": + if attached_value is not None: + message=( + 'unexpected value while parsing "list-distributions"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["list-distributions"]=True + elif option=="mu": + if attached_value is not None: + config["mu"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["mu"]=None + consuming,needing,wanting="mu",1,1 + elif option=="number": + if attached_value is not None: + config["number"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["number"]=None + consuming,needing,wanting="number",1,1 + elif option=="offset": + if attached_value is not None: + config["offset"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["offset"]=None + consuming,needing,wanting="offset",1,1 + elif option=="report": + if attached_value is not None: + message=( + 'unexpected value while parsing "report"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["report"]=True + elif option=="sigma": + if attached_value is not None: + config["sigma"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["sigma"]=None + consuming,needing,wanting="sigma",1,1 + elif option=="version": + if attached_value is not None: + message=( + 'unexpected value while parsing "version"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["version"]=True + elif option=="d": + if attached_value is not None: + config["delta"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["delta"]=None + consuming,needing,wanting="delta",1,1 + elif option=="h": + if attached_value is not None: + message=( + 'unexpected value while parsing "help"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["help"]=True + elif option=="x": + if attached_value is not None: + message=( + 'unexpected value while parsing "help"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["help"]=True + elif option=="i": + if attached_value is not None: + config["initial"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["initial"]=None + consuming,needing,wanting="initial",1,1 + elif option=="m": + if attached_value is not None: + config["mu"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["mu"]=None + consuming,needing,wanting="mu",1,1 + elif option=="n": + if attached_value is not None: + config["number"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["number"]=None + consuming,needing,wanting="number",1,1 + elif option=="o": + if attached_value is not None: + config["offset"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["offset"]=None + consuming,needing,wanting="offset",1,1 + elif option=="r": + if attached_value is not None: + message=( + 'unexpected value while parsing "report"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["report"]=True + elif option=="s": + if attached_value is not None: + config["sigma"]=attached_value + attached_value=None + consuming,needing,wanting=None,0,0 + else: + config["sigma"]=None + consuming,needing,wanting="sigma",1,1 + elif option=="v": + if attached_value is not None: + message=( + 'unexpected value while parsing "version"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["version"]=True + elif option=="V": + if attached_value is not None: + message=( + 'unexpected value while parsing "version"' + ' (expected 0 values)' + ) + raise ValueError(message) from None + config["version"]=True + else: + positional.append(arguments.pop(0)) + if needing>0: + message=( + f'unexpected end while parsing "{consuming}"' + f' (expected {needing} values)' + ) + raise ValueError(message) from None + for argument in arguments[1:]: + positional.append(argument) + return config,positional + +if __name__=="__main__": + import sys + cfg,pos = main(sys.argv[1:]) + cfg = {k:v for k,v in cfg.items() if v is not None} + if len(cfg): + print("Options:") + for k,v in cfg.items(): + print(f"{k:20} = {v}") + if len(pos): + print("Positional arguments:", ", ".join(pos)) diff --git a/rng/cli.toml b/rng/cli.toml new file mode 100644 index 0000000..d9efce2 --- /dev/null +++ b/rng/cli.toml @@ -0,0 +1,42 @@ +[delta] +number = 1 +alternatives = ['d'] + +[distribution] +number = 1 + +[help] +number = 0 +alternatives = ['h', 'x'] + +[initial] +number = 1 +alternatives = ['i'] + +[list-distributions] +number = 0 + +[mu] +number = 1 +alternatives = ['m'] + +[number] +number = 1 +alternatives = ['n'] + +[offset] +number = 1 +alternatives = ['o'] + +[report] +number = 0 +alternatives = ['r'] + +[sigma] +number = 1 +alternatives = ['s'] + +[version] +number = 0 +alternatives = ['v', 'V'] + diff --git a/rng/internals.py b/rng/internals.py new file mode 100644 index 0000000..74950da --- /dev/null +++ b/rng/internals.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 + +import sys +from typing import * + +VERSION = (1,0,1,) + +def _try_get_float( + mapping: Dict, + key: str, + *, + default: Optional[float] = None, +) -> Optional[float]: + if key in mapping: + return float(mapping[key]) + else: + return default + +def _try_get_int( + mapping: Dict, + key: str, + *, + default: Optional[float] = None, +) -> Optional[float]: + if key in mapping: + return int(mapping[key]) + else: + return default + +def _print_help() -> None: + _msg = "Usage: rng DISTRIBUTION [OPTIONS]\n" + sys.stdout.write(_msg) + +def _print_version() -> None: + _msg = "rng {0}\n".format(".".join(str(v) for v in VERSION)) + sys.stdout.write(_msg) + +def _print_distributions(*dist: str) -> None: + _msg = "Valid distributions: {0}\n".format(", ".join(dist)) + sys.stdout.write(_msg) + +def _print_usage() -> None: + _msg = ( + "Usage: rng DISTRIBUTION [OPTIONS]", + "Try `rng --list-distributions` and `rng DISTRIBUTION --help`", + ) + sys.stderr.write("\n".join(_msg) + "\n") + +def _print_invalid_distribution(dist: str) -> None: + _msg = ( + "{0}: Invalid distribution '{1}'".format(sys.argv[0], dist), + "Try `rng --list-distributions`", + ) + sys.stderr.write("\n".join(_msg) + "\n") + diff --git a/rng/normal.py b/rng/normal.py new file mode 100644 index 0000000..e6b1c3e --- /dev/null +++ b/rng/normal.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +"""rng normal [OPTIONS] +Uniform distribution - Generate random data from a normal distribution. + +Options: + -d, --delta velocity of average per time unit [Default: 0] + -m, --mu average of distribution [Default: 0] + -n, --number number of random data points to generate [Default: 10] + -s, --sigma standard deviation of distribution [Default: 1] + +Currently assumed that sigma is constant over time. +""" + +import sys +import random +import itertools +from typing import Callable, List, Dict, Iterator + +def cli_wrapper(**data: Dict): + """Handler for the uniform distribution. Checks and cleans given options, + and performs optional reporting. + """ + _number = data["number"] if data["number"] is not None else 10 + _init_mu = data["mu"] if data["mu"] is not None else 0.0 + _sigma = data["sigma"] if data["sigma"] is not None else 1.0 + _init_velocity = data["delta"] if data["delta"] is not None else 0.0 + _acceleration = lambda x: x #constant acceleration + + _distribution = distribution( + _init_mu, + _sigma, + _init_velocity, + _acceleration, + ) + + if data["report"]: + sys.stdout.write( + report_header(_init_mu, _sigma, _init_velocity, _acceleration) + ) + if _number > 0: + for number in itertools.islice(_distribution, _number): + sys.stdout.write("{0:.4f}\n".format(number)) + +def distribution( + init_mu: float, + sigma: float, + init_velocity: float, + acceleration: Callable[[float], float], +) -> Iterator[float]: + mu = init_mu + velocity = init_velocity + while True: + yield random.normalvariate(mu, sigma) + mu += velocity + velocity = acceleration(velocity) + +# use this in a report function later: μ± +def report_header( + init_mu: float, + sigma: float, + init_velocity: float, + acceleration: Callable[[float], float], +) -> str: + _msg = ( + "Normal distribution", + " μ={0:.4f}, σ={1:.4f}".format(init_mu, sigma), + " dμ/dt={0:.4f}".format(init_velocity), + "Obs.:", + "========", + ) + return "\n".join(_msg) + "\n" + diff --git a/rng/notrandom.py b/rng/notrandom.py new file mode 100644 index 0000000..2c684d5 --- /dev/null +++ b/rng/notrandom.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 + +"""rng notrandom [OPTIONS] +Generate non-random data. + +Options: + -d, --delta velocity of state per time unit [Default: 0] + -i, --initial initial state [Default: 0] + -n, --number number of data points to generate [Default: 10] +""" + +import sys +import itertools +from typing import Callable, List, Dict, Iterator + +def cli_wrapper(**data: Dict): + """Handler for the uniform distribution. Checks and cleans given options, + and performs optional reporting. + """ + _number = data["number"] if data["number"] is not None else 10 + _init_state = data["initial"] if data["initial"] is not None else 0.0 + _init_velocity = data["delta"] if data["delta"] is not None else 0.0 + _acceleration = lambda x: x #constant acceleration + + _distribution = distribution( + _init_state, + _init_velocity, + _acceleration, + ) + + if data["report"]: + sys.stdout.write( + report_header(_init_state, _init_velocity, _acceleration) + ) + if _number > 0: + for number in itertools.islice(_distribution, _number): + sys.stdout.write("{0:.4f}\n".format(number)) + +def distribution( + init_state: float, + init_velocity: float, + acceleration: Callable[[float], float], +) -> Iterator[float]: + state = init_state + velocity = init_velocity + + while True: + yield state + state += velocity + velocity = acceleration(velocity) + +def report_header( + init_state: float, + init_velocity: float, + acceleration: Callable[[float], float], +): + _msg = ( + "Not random data", + " Initial value of {0:.4f}, changing {1:.4f} per time unit".format( + init_state, init_velocity, + ), + "Data:", + "========", + ) + return "\n".join(_msg) + "\n" + diff --git a/rng/uniform.py b/rng/uniform.py new file mode 100644 index 0000000..786296a --- /dev/null +++ b/rng/uniform.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 + +"""rng uniform [OPTIONS] +Uniform distribution - Generate random data from a uniform distribution. + +Options: + -d, --delta velocity of average per time unit [Default: 0] + -m, --mu average of distribution [Default: 0] + -n, --number number of random data points to generate [Default: 10] + -o, --offset distance from average to bounds of distribution [Default: 1] + +Currently assumed that sigma is constant over time. +""" + +import sys +import random +import itertools +from typing import Callable, List, Dict, Iterator + +def cli_wrapper(**data: Dict): + """Handler for the uniform distribution. Checks and cleans given options, + and performs optional reporting. + """ + _number = data["number"] if data["number"] is not None else 10 + _init_mu = data["mu"] if data["mu"] is not None else 0.0 + _offset = data["offset"] if data["offset"] is not None else 1.0 + _init_velocity = data["delta"] if data["delta"] is not None else 0.0 + _acceleration = lambda x: x #constant acceleration + + _distribution = distribution( + _init_mu, + _offset, + _init_velocity, + _acceleration, + ) + + if data["report"]: + sys.stdout.write( + report_header(_init_mu, _offset, _init_velocity, _acceleration) + ) + if _number > 0: + for number in itertools.islice(_distribution, _number): + sys.stdout.write("{0:.4f}\n".format(number)) + +def distribution( + init_mu: float, + offset: float, + init_velocity: float, + acceleration: Callable[[float], float], +) -> Iterator[float]: + mu = init_mu + velocity = init_velocity + while True: + yield mu + random.uniform(-offset,offset) + mu += velocity + velocity = acceleration(velocity) + +# use this in a report function later: μ±σ +def report_header( + init_mu: float, + offset: float, + init_velocity: float, + acceleration: Callable[[float], float], +): + _msg = ( + "Uniform distribution", + " μ={0:.4f}, [{1:.4f},{2:.4f}]".format( + init_mu, init_mu-offset, init_mu+offset, + ), + " dμ/dt={0:.4f}".format(init_velocity), + "Obs.:", + "========", + ) + return "\n".join(_msg) + "\n" + -- 2.45.2