~dricottone/filters

8479aea2db66090f2b0db01a1c42593975a3ba0e — Dominic Ricottone 4 years ago d79aed1
Added rng package, convolve + kalman filters
A filter/convolve.py => filter/convolve.py +91 -0
@@ 0,0 1,91 @@
#!/usr/bin/env python3

"""filter convolve [OPTIONS] DATA
Convolution filter - Filter out noise of measurements to estimate data.

Options:
  -k KERNEL,         filter data through KERNEL, where KERNEL is one or more
    --kernel KERNEL    numeric factors [Default: 1.0]
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import List, Dict, Iterator

def cli_wrapper(
        **data: Dict,
) -> None:
    """Handler for the convolution filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _kernel = _normalize(
        data["kernel"] if data["kernel"] is not None else [1.0]
    )
    _raw = data["data_raw"]

    _filter = filter(
        _raw,
        _kernel,
    )

    if data["report"]:
        sys.stdout.write(report_header(_kernel))
        for measured, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(measured, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    kernel: List[float]
) -> Iterator[float]:
    """Iterate over data, passing it through the kernel.

    Arguments:
      data    measurements
      kernel  measurement adjustments
    """
    length = len(data)
    #NOTE: for evenly-sized kernels, extra goes to top and left
    offset = len(kernel) // 2

    for index in range(length):
        _sum = 0.0
        for kernel_index, kernel_point in enumerate(kernel, start=-offset):
            target = (index + kernel_index) % length
            _sum += data[target] * kernel_point
        yield _sum

def report_header(
    kernel: List[float],
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:
    ```
    Convolution filter
      kernel=[<value1> ... <valueK>]
    Raw:      Est.:
    ========  ========
    ```

    The estimates then should be printed alongside the raw measurements.
    """
    _msg = (
        "Convolution filter",
        "  kernel={0}".format(kernel),
        "Raw:      Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"

def _normalize(kernel: List[float]) -> List[float]:
    _sum = sum(kernel)
    if _sum == 1:
        return kernel
    else:
        weight = 1.0/_sum
        return [factor * weight for factor in kernel]


A filter/kalman.py => filter/kalman.py +186 -0
@@ 0,0 1,186 @@
#!/usr/bin/env python3

"""filter kalman [OPTIONS] DATA
Kalman filter - Filter out normally-distributed noise of measurements to
estimate data.

Options:
  -d, --delta     initial velocity of state per time unit [Default: 0]
  -i, --inital    initial estimate of state [Default: 0]
  -s, --sigma     initial std. deviation of state distribution [Default: 1]
  -v, --variance  variance of data measurements [Default: 1]

Currently assumed that acceleration is constant and that velocity is non-
variate.
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import Callable, List, Dict, Iterator, Tuple

def cli_wrapper(**data: Dict):
    """Handler for the Kalman filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _raw = data["data_raw"]
    _variance = data["variance"] if data["variance"] is not None else 1
    _init_state_mu = data["initial_estimate"]
    _init_state_sigma = data["initial_std_deviation"]
    _init_velocity_mu = data["delta"] if data["delta"] is not None else 0

    _init_velocity_sigma = 0 #non-variate velocity
    _time = 1.0 #constant time unit
    _acceleration = lambda x: x #constant acceleration


    _filter = filter(
        _raw,
        _variance,
        _init_state_mu,
        _init_state_sigma,
        _init_velocity_mu,
        _init_velocity_sigma,
        _acceleration,
        _time,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(
                _variance,
                _init_state_mu,
                _init_state_sigma,
                _init_velocity_mu,
                _init_velocity_sigma,
                _acceleration,
                _time,
            ),
        )
        for measured, filtered in zip(_raw, _filter):
            estimated, variance = filtered
            sys.stdout.write(
                "{0:8.4f}  {1:8.4f}  {2:8.4f}\n".format(
                    measured, estimated, variance,
                ),
            )
    else:
        for estimated, _ in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[Tuple[float, float]], Tuple[float,float]],
    time: float,
) -> Iterator[Tuple[float,float]]:
    """Iterate over data, passing it through an alpha-beta filter.

    Arguments:
      data                 measurement from each time interval
      variance             variance of measurements
      init_state_mu        initial estimate of state
      init_state_sigma     std. deviation of state distribution
      init_velocity_mu     initial estimate of velocity
      init_velocity_sigma  std. deviation of velocity distribution
      acceleration         function of v1 <- v0
      time                 time unit
    """
    # normal distributions as tuples: (E(x), Var(x), )
    estimated = (init_state_mu, init_state_sigma**2, )
    velocity = (init_velocity_mu, init_velocity_sigma**2, )

    for measurement in data:
        estimated = _add(estimated, _multiply(velocity, (time,0,)))
        velocity = acceleration(velocity)

        estimated = _multiply(estimated, (measurement, variance, ))

        yield estimated

def report_header(
    variance: float,
    init_state_mu: float,
    init_state_sigma: float,
    init_velocity_mu: float,
    init_velocity_sigma: float,
    acceleration: Callable[[float], float],
    time: float,
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:

    ```
    ```

    The estimates and variances then should be printed alongside the raw
    measurements.
    """
    _msg = (
        "Kalman filter",
        "Raw:      Est.:     Var.:",
        "========  ========  ========",
    )
    return "\n".join(_msg) + "\n"

def _add(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Add a normal distribution to another normal distribution or a constant.

    In the case of two normal distributions (X and Y), the resultant values
    are:
      E(X + Y) = E(X) + E(Y)
      Var(X + Y) = Var(X) + Var(Y)

    In the case of a normal distribution (X) and a constant (C), the resultant
    values are:
      E(X + C) = E(X) + C
      Var(X + C) = Var(X)
    """
    if y[1] == 0:
        z = (x[0]+y[0], x[1], )
    else:
        z = (x[0]+y[0], x[1]+y[1], )
    return z

def _multiply(
    x: Tuple[float, float],
    y: Tuple[float, float],
) -> Tuple[float, float]:
    """Multiply a normal distribution by another normal distribution or by a
    constant.

    Given two normal distributions (X and Y), the resultant values are:
      E(X * Y) = (Var(Y)E(X) + Var(X)E(Y)) / (Var(X) + Var(Y))
      Var(X * Y) = (Var(X)Var(Y)) / (Var(X) + Var(Y))

    Given a normal distribution (X) and a constant (C), the resultant values
    are:
      E(X * C) = E(X) * C
      Var(X * C) = Var(X) * (C^2)
    """
    #_print_normal_distribution(x, "X")
    #_print_normal_distribution(y, "Y")
    if y[1] == 0:
        z = ((x[0] * y[0]), (x[1] * y[0]**2), )
    else:
        _denom = x[1] + y[1]
        _mean = ((x[0] * y[1]) + (y[0] * x[1])) / _denom
        _var = (x[1] * y[1]) / _denom
        z = (_mean, _var, )
    #_print_normal_distribution(z, "Z")
    return z

def _print_normal_distribution(
    x: Tuple[float, float],
    name: str,
) -> None:
    sys.stdout.write("{0} = N({1},{2})\n".format(name, x[0], x[1]))


A plot.sh => plot.sh +15 -0
@@ 0,0 1,15 @@
#!/bin/sh

tmpd="$(mktemp -d)"
trap 'rm -rf "${tmpd}"' EXIT

mkfifo "${tmpd}/f1"
rng uniform -m 5 -d 2 -n 100 \
	| tee "${tmpd}/f1" \
	| filter ab -i 100 -d 2 -a 0.2 -b 0.02 \
	| paste "${tmpd}/f1" - \
	| awk 'BEGIN{OFS="\t"; print "measurements", "estimates"} {print $1, $2}' \
	| gnuplot -p -e "set terminal dumb; set autoscale; plot '-' using 1:2 with lines notitle"

rm -rf "${tmpd}"


A rng/__init__.py => rng/__init__.py +0 -0
A rng/__main__.py => rng/__main__.py +59 -0
@@ 0,0 1,59 @@
#!/usr/main/env python3

import sys

from . import cli
from . import internals

def main():
    _config, _positionals = cli.main(sys.argv[1:])

    if "version" in _config.keys():
        internals._print_version()
        sys.exit(0)
    elif "list-distributions" in _config.keys():
        internals._print_distributions("normal", "uniform", "notrandom")
        sys.exit(0)
    elif "distribution" in _config.keys():
        _dist = config.get("distribution", "")
    elif len(_positionals) > 0:
        _dist = _positionals.pop(0)
    elif "help" in _config.keys():
        internals._print_help()
        sys.exit(0)
    else:
        internals._print_usage()
        sys.exit(1)

    _data = {
        "delta": internals._try_get_float(_config, "delta"),
        "distribution": _dist,
        "initial": internals._try_get_float(_config, "initial"),
        "mu": internals._try_get_float(_config, "mu"),
        "number": internals._try_get_int(_config, "number"),
        "offset": internals._try_get_float(_config, "offset"),
        "report": "report" in _config.keys(),
        "sigma": internals._try_get_float(_config, "sigma"),
    }

    if _dist == "uniform":
        from . import uniform as implementation
    elif _dist == "normal":
        from . import normal as implementation
    elif _dist == "notrandom":
        from . import notrandom as implementation
    elif len(_dist) > 0:
        internals._print_invalid_distribution(_dist)
        internals._print_usage()
        sys.exit(1)
    if "help" in _config.keys():
        sys.stdout.write(implementation.__doc__)
        sys.exit(0)

    implementation.cli_wrapper(**_data)

    sys.exit(0)

if __name__ == "__main__":
    main()


A rng/cli.py => rng/cli.py +222 -0
@@ 0,0 1,222 @@
#!/usr/bin/env python3

import re

def main(arguments):
	config=dict()
	positional=[]
	pattern=re.compile(r"(?:-(?:d|h|x|i|m|n|o|r|s|v|V)|--(?:delta|distribution|help|initial|list-distributions|mu|number|offset|report|sigma|version))(?:=.*)?$")
	consuming,needing,wanting=None,0,0
	attached_value=None
	while len(arguments) and arguments[0]!="--":
		if consuming is not None:
			if config[consuming] is None:
				config[consuming]=arguments.pop(0)
			else:
				config[consuming].append(arguments.pop(0))
			needing-=1
			wanting-=1
			if wanting==0:
				consuming,needing,wanting=None,0,0
		elif pattern.match(arguments[0]):
			option = arguments.pop(0).lstrip('-')
			if '=' in option:
				option,attached_value=option.split('=',1)
			if option=="delta":
				if attached_value is not None:
					config["delta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["delta"]=None
					consuming,needing,wanting="delta",1,1
			elif option=="distribution":
				if attached_value is not None:
					config["distribution"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["distribution"]=None
					consuming,needing,wanting="distribution",1,1
			elif option=="help":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="initial":
				if attached_value is not None:
					config["initial"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="list-distributions":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "list-distributions"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["list-distributions"]=True
			elif option=="mu":
				if attached_value is not None:
					config["mu"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["mu"]=None
					consuming,needing,wanting="mu",1,1
			elif option=="number":
				if attached_value is not None:
					config["number"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["number"]=None
					consuming,needing,wanting="number",1,1
			elif option=="offset":
				if attached_value is not None:
					config["offset"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["offset"]=None
					consuming,needing,wanting="offset",1,1
			elif option=="report":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "report"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="sigma":
				if attached_value is not None:
					config["sigma"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["sigma"]=None
					consuming,needing,wanting="sigma",1,1
			elif option=="version":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "version"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["version"]=True
			elif option=="d":
				if attached_value is not None:
					config["delta"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["delta"]=None
					consuming,needing,wanting="delta",1,1
			elif option=="h":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="x":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "help"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["help"]=True
			elif option=="i":
				if attached_value is not None:
					config["initial"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["initial"]=None
					consuming,needing,wanting="initial",1,1
			elif option=="m":
				if attached_value is not None:
					config["mu"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["mu"]=None
					consuming,needing,wanting="mu",1,1
			elif option=="n":
				if attached_value is not None:
					config["number"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["number"]=None
					consuming,needing,wanting="number",1,1
			elif option=="o":
				if attached_value is not None:
					config["offset"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["offset"]=None
					consuming,needing,wanting="offset",1,1
			elif option=="r":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "report"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["report"]=True
			elif option=="s":
				if attached_value is not None:
					config["sigma"]=attached_value
					attached_value=None
					consuming,needing,wanting=None,0,0
				else:
					config["sigma"]=None
					consuming,needing,wanting="sigma",1,1
			elif option=="v":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "version"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["version"]=True
			elif option=="V":
				if attached_value is not None:
					message=(
						'unexpected value while parsing "version"'
						' (expected 0 values)'
					)
					raise ValueError(message) from None
				config["version"]=True
		else:
			positional.append(arguments.pop(0))
	if needing>0:
		message=(
			f'unexpected end while parsing "{consuming}"'
			f' (expected {needing} values)'
		)
		raise ValueError(message) from None
	for argument in arguments[1:]:
		positional.append(argument)
	return config,positional

if __name__=="__main__":
	import sys
	cfg,pos = main(sys.argv[1:])
	cfg = {k:v for k,v in cfg.items() if v is not None}
	if len(cfg):
		print("Options:")
		for k,v in cfg.items():
			print(f"{k:20} = {v}")
	if len(pos):
		print("Positional arguments:", ", ".join(pos))

A rng/cli.toml => rng/cli.toml +42 -0
@@ 0,0 1,42 @@
[delta]
number = 1
alternatives = ['d']

[distribution]
number = 1

[help]
number = 0
alternatives = ['h', 'x']

[initial]
number = 1
alternatives = ['i']

[list-distributions]
number = 0

[mu]
number = 1
alternatives = ['m']

[number]
number = 1
alternatives = ['n']

[offset]
number = 1
alternatives = ['o']

[report]
number = 0
alternatives = ['r']

[sigma]
number = 1
alternatives = ['s']

[version]
number = 0
alternatives = ['v', 'V']


A rng/internals.py => rng/internals.py +55 -0
@@ 0,0 1,55 @@
#!/usr/bin/env python3

import sys
from typing import *

VERSION = (1,0,1,)

def _try_get_float(
    mapping: Dict,
    key: str,
    *,
    default: Optional[float] = None,
) -> Optional[float]:
    if key in mapping:
        return float(mapping[key])
    else:
        return default

def _try_get_int(
    mapping: Dict,
    key: str,
    *,
    default: Optional[float] = None,
) -> Optional[float]:
    if key in mapping:
        return int(mapping[key])
    else:
        return default

def _print_help() -> None:
    _msg = "Usage: rng DISTRIBUTION [OPTIONS]\n"
    sys.stdout.write(_msg)

def _print_version() -> None:
    _msg = "rng {0}\n".format(".".join(str(v) for v in VERSION))
    sys.stdout.write(_msg)

def _print_distributions(*dist: str) -> None:
    _msg = "Valid distributions: {0}\n".format(", ".join(dist))
    sys.stdout.write(_msg)

def _print_usage() -> None:
    _msg = (
        "Usage: rng DISTRIBUTION [OPTIONS]",
        "Try `rng --list-distributions` and `rng DISTRIBUTION --help`",
    )
    sys.stderr.write("\n".join(_msg) + "\n")

def _print_invalid_distribution(dist: str) -> None:
    _msg = (
        "{0}: Invalid distribution '{1}'".format(sys.argv[0], dist),
        "Try `rng --list-distributions`",
    )
    sys.stderr.write("\n".join(_msg) + "\n")


A rng/normal.py => rng/normal.py +73 -0
@@ 0,0 1,73 @@
#!/usr/bin/env python3

"""rng normal [OPTIONS]
Uniform distribution - Generate random data from a normal distribution.

Options:
  -d, --delta   velocity of average per time unit [Default: 0]
  -m, --mu      average of distribution [Default: 0]
  -n, --number  number of random data points to generate [Default: 10]
  -s, --sigma   standard deviation of distribution [Default: 1]

Currently assumed that sigma is constant over time.
"""

import sys
import random
import itertools
from typing import Callable, List, Dict, Iterator

def cli_wrapper(**data: Dict):
    """Handler for the uniform distribution. Checks and cleans given options,
    and performs optional reporting.
    """
    _number = data["number"] if data["number"] is not None else 10
    _init_mu = data["mu"] if data["mu"] is not None else 0.0
    _sigma = data["sigma"] if data["sigma"] is not None else 1.0
    _init_velocity = data["delta"] if data["delta"] is not None else 0.0
    _acceleration = lambda x: x #constant acceleration

    _distribution = distribution(
        _init_mu,
        _sigma,
        _init_velocity,
        _acceleration,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_init_mu, _sigma, _init_velocity, _acceleration)
        )
    if _number > 0:
        for number in itertools.islice(_distribution, _number):
            sys.stdout.write("{0:.4f}\n".format(number))

def distribution(
    init_mu: float,
    sigma: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
) -> Iterator[float]:
    mu = init_mu
    velocity = init_velocity
    while True:
        yield random.normalvariate(mu, sigma)
        mu += velocity
        velocity = acceleration(velocity)

# use this in a report function later: μ±
def report_header(
    init_mu: float,
    sigma: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
) -> str:
    _msg = (
        "Normal distribution",
        "  μ={0:.4f}, σ={1:.4f}".format(init_mu, sigma),
        "  dμ/dt={0:.4f}".format(init_velocity),
        "Obs.:",
        "========",
    )
    return "\n".join(_msg) + "\n"


A rng/notrandom.py => rng/notrandom.py +66 -0
@@ 0,0 1,66 @@
#!/usr/bin/env python3

"""rng notrandom [OPTIONS]
Generate non-random data.

Options:
  -d, --delta    velocity of state per time unit [Default: 0]
  -i, --initial  initial state [Default: 0]
  -n, --number   number of data points to generate [Default: 10]
"""

import sys
import itertools
from typing import Callable, List, Dict, Iterator

def cli_wrapper(**data: Dict):
    """Handler for the uniform distribution. Checks and cleans given options,
    and performs optional reporting.
    """
    _number = data["number"] if data["number"] is not None else 10
    _init_state = data["initial"] if data["initial"] is not None else 0.0
    _init_velocity = data["delta"] if data["delta"] is not None else 0.0
    _acceleration = lambda x: x #constant acceleration

    _distribution = distribution(
        _init_state,
        _init_velocity,
        _acceleration,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_init_state, _init_velocity, _acceleration)
        )
    if _number > 0:
        for number in itertools.islice(_distribution, _number):
            sys.stdout.write("{0:.4f}\n".format(number))

def distribution(
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
) -> Iterator[float]:
    state = init_state
    velocity = init_velocity

    while True:
        yield state
        state += velocity
        velocity = acceleration(velocity)

def report_header(
    init_state: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
):
    _msg = (
        "Not random data",
        "  Initial value of {0:.4f}, changing {1:.4f} per time unit".format(
            init_state, init_velocity,
        ),
        "Data:",
        "========",
    )
    return "\n".join(_msg) + "\n"


A rng/uniform.py => rng/uniform.py +75 -0
@@ 0,0 1,75 @@
#!/usr/bin/env python3

"""rng uniform [OPTIONS]
Uniform distribution - Generate random data from a uniform distribution.

Options:
  -d, --delta   velocity of average per time unit [Default: 0]
  -m, --mu      average of distribution [Default: 0]
  -n, --number  number of random data points to generate [Default: 10]
  -o, --offset  distance from average to bounds of distribution [Default: 1]

Currently assumed that sigma is constant over time.
"""

import sys
import random
import itertools
from typing import Callable, List, Dict, Iterator

def cli_wrapper(**data: Dict):
    """Handler for the uniform distribution. Checks and cleans given options,
    and performs optional reporting.
    """
    _number = data["number"] if data["number"] is not None else 10
    _init_mu = data["mu"] if data["mu"] is not None else 0.0
    _offset = data["offset"] if data["offset"] is not None else 1.0
    _init_velocity = data["delta"] if data["delta"] is not None else 0.0
    _acceleration = lambda x: x #constant acceleration

    _distribution = distribution(
        _init_mu,
        _offset,
        _init_velocity,
        _acceleration,
    )

    if data["report"]:
        sys.stdout.write(
            report_header(_init_mu, _offset, _init_velocity, _acceleration)
        )
    if _number > 0:
        for number in itertools.islice(_distribution, _number):
            sys.stdout.write("{0:.4f}\n".format(number))

def distribution(
    init_mu: float,
    offset: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
) -> Iterator[float]:
    mu = init_mu
    velocity = init_velocity
    while True:
        yield mu + random.uniform(-offset,offset)
        mu += velocity
        velocity = acceleration(velocity)

# use this in a report function later: μ±σ
def report_header(
    init_mu: float,
    offset: float,
    init_velocity: float,
    acceleration: Callable[[float], float],
):
    _msg = (
        "Uniform distribution",
        "  μ={0:.4f}, [{1:.4f},{2:.4f}]".format(
            init_mu, init_mu-offset, init_mu+offset,
        ),
        "  dμ/dt={0:.4f}".format(init_velocity),
        "Obs.:",
        "========",
    )
    return "\n".join(_msg) + "\n"