~dricottone/filters

ref: f62c909a49e4a579a0b74b9b6632c25e127b3668 filters/filter/convolve.py -rw-r--r-- 2.3 KiB
f62c909aDominic Ricottone Version 1.0.1; Cleaned up documentation 4 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3

"""filter convolve [OPTIONS] DATA
Convolution filter - Filter out noise of measurements to estimate data.

Options:
  -k KERNEL,         filter data through KERNEL, where KERNEL is one or more
    --kernel KERNEL    numeric factors [Default: 1.0]
"""

__all__ = ['cli_wrapper', 'filter', 'report']

import sys
from typing import List, Dict, Iterator

def cli_wrapper(
        **data: Dict,
) -> None:
    """Handler for the convolution filter. Checks and cleans given options,
    and performs optional reporting.
    """
    _kernel = _normalize(
        data["kernel"] if data["kernel"] is not None else [1.0]
    )
    _raw = data["data_raw"]

    _filter = filter(
        _raw,
        _kernel,
    )

    if data["report"]:
        sys.stdout.write(report_header(_kernel))
        for measured, estimated in zip(_raw, _filter):
            sys.stdout.write("{0:8.4f}  {1:8.4f}\n".format(measured, estimated))
    else:
        for estimated in _filter:
            sys.stdout.write("{0:.4f}\n".format(estimated))

def filter(
    data: List[float],
    kernel: List[float]
) -> Iterator[float]:
    """Iterate over data, passing it through the kernel.

    Arguments:
      data    measurements
      kernel  measurement adjustments
    """
    length = len(data)
    #NOTE: for evenly-sized kernels, extra goes to top and left
    offset = len(kernel) // 2

    for index in range(length):
        _sum = 0.0
        for kernel_index, kernel_point in enumerate(kernel, start=-offset):
            target = (index + kernel_index) % length
            _sum += data[target] * kernel_point
        yield _sum

def report_header(
    kernel: List[float],
) -> str:
    """Draw a report header summarizing the filter.

    Appears as:
    ```
    Convolution filter
      kernel=[<value1> ... <valueK>]
    Raw:      Est.:
    ========  ========
    ```

    The estimates then should be printed alongside the raw measurements.
    """
    _msg = (
        "Convolution filter",
        "  kernel={0}".format(kernel),
        "Raw:      Est.:",
        "========  ========",
    )
    return "\n".join(_msg) + "\n"

def _normalize(kernel: List[float]) -> List[float]:
    _sum = sum(kernel)
    if _sum == 1:
        return kernel
    else:
        weight = 1.0/_sum
        return [factor * weight for factor in kernel]