#!/usr/bin/env python3
from re import compile as re_compile
from .timeentry import TimeEntry, ENTRY_PATTERNS
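# Lines that belong to per-page headers and footers rather than to time
# entries: a "Mmm N, YYYY HH:MM" timestamp, a "(GMT-0H:00) ..." timezone line,
# "Page N" / "of N" counters, the "Timesheet" title, the report date range,
# and the ID / Time Code / Project / TimeType column headings.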
PAGE_PATTERNS = (
re_compile("[ADFJMNOS][aceopu][bcglnprtvy] [1-9][0-9]?, 20[12][89012] [012][0-9]:[0-5][0-9]"),
    re_compile(r"\(GMT-0[456]:00\) .*"),
re_compile("Page [1-9][0-9]?"),
re_compile("of [1-9][0-9]?"),
re_compile("Timesheet"),
re_compile("[ADFJMNOS][aceopu][bcglnprtvy] [1-9][0-9]?, 20[12][89012] - [ADFJMNOS][aceopu][bcglnprtvy] [1-9][0-9]?, 20[12][89012]"),
re_compile("ID"),
re_compile("Time Code"),
re_compile("Project"),
re_compile("TimeType"),
)
def move_item(data, index, destination):
"""Re-order a list, moving an item from `index` to `destination`."""
if index < destination:
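        # e.g. move_item(["a", "b", "c", "d"], 0, 2) -> ["b", "c", "a", "d"]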
return data[:index] + data[index+1:destination+1] + data[index:index+1] + data[destination+1:]
elif index > destination:
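        # e.g. move_item(["a", "b", "c", "d"], 3, 1) -> ["a", "d", "b", "c"]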
return data[:destination] + data[index:index+1] + data[destination:index] + data[index+1:]
else:
return data
def remove_item(data, index):
    """Remove the item at `index`, mutating `data` in place and returning it."""
    del data[index]
    return data
def is_multiple_8(string):
    """Return True if `string` parses as a number whose integer part is a
    multiple of 8 (used by assert_header to sanity-check item 27)."""
    try:
        return int(float(string)) % 8 == 0
    except (TypeError, ValueError):
        return False
class TimeSheet(object):
def __init__(self, semistructured_data):
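        # `semistructured_data` appears to be a list of (text, attribute)
        # pairs from the upstream extraction: item [0] is the extracted text
        # and item [1] is the layout attribute that parse_entries() keys on.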
self._warnings = []
self._issues = []
self._data = self.beat_data_with_a_stick(semistructured_data)
self.assert_header()
self.parse_header()
self.parse_footer()
self.parse_pages()
self.parse_entries()
def __str__(self):
return self.header.get("dates", "TimeSheet(...)")
def __len__(self):
return len(self._data)
def report_issues(self):
if self._warnings:
print(f"There are some warnings in the timesheet for {self}...")
for warning in self._warnings:
print(warning)
if self._issues:
print(f"There are some issues in the timesheet for {self}...")
for issue in self._issues:
print(issue)
for index, line in enumerate(self._data):
print(index, line)
for entry in self._entries:
entry.report_issues()
def log_issue(self, *parts):
self._issues.append(''.join(parts))
def log_warning(self, *parts):
self._warnings.append(''.join(parts))
    def beat_data_with_a_stick(self, data):
        """A fragile fix for the misordered and stray fields that result from
        the PDF-to-HTML-to-long-data conversion.
        """
# a small number of timesheets have a 'Doc.No.' field
if data[14][0] == "Doc.No." and data[17][0] == "1":
self.log_warning("beat_data_with_a_stick: killing 'Doc.No.' fields")
#Note: subtract 1 due to the other operation realigning indices
data = remove_item(data, 14)
data = remove_item(data, 17-1)
elif data[16][0] == "Doc.No." and data[19][0] == "1":
self.log_warning(
"beat_data_with_a_stick: ",
"killing 'Doc.No.' label and field",
)
data = remove_item(data, 16)
data = remove_item(data, 19-1)
# 'Post Status:' can float up if the status is 'Not posted'
if data[6][0] == "Post Status:" and data[7][0] == "Not posted":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting post status label and field",
)
#Note: add/subtract 1 due to the other operation realigning indices
data = move_item(data, 6, 17+1)
data = move_item(data, 7-1, 20)
# 'Function:' and '[1] Full Time' can float up together
if data[4][0] == "Function:" and data[5][0] == "[1] Full Time":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting function label and field",
)
#Note: add/subtract 1 due to the other operation realigning indices
data = move_item(data, 4, 10+1)
data = move_item(data, 5-1, 13)
elif data[8][0] == "Function:" and data[11][0] == "[1] Full Time":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting function label and field",
)
data = move_item(data, 8, 10)
data = move_item(data, 11, 13)
# 'Percent Billability:' can float down
if data[28][0] == "Percent Billability:":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting percent billability label",
)
data = move_item(data, 28, 25)
# 'Validation:' and 'Passed' can float up
if data[16][0] == "Validation:" and data[19][0] == "Passed":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting validation label and field",
)
data = move_item(data, 16, 17)
data = move_item(data, 19, 20)
# 'Posted'/'Not posted' can float up
if data[17][0] == "Posted" or data[17][0] == "Not posted":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting post status field",
)
data = move_item(data, 17, 19)
        # 'ID', 'Time Code', 'Project', and 'TimeType' can float down
if data[41][0] == "ID" and data[42][0] == "Time Code" and data[43][0] == "Project" and data[44][0] == "TimeType":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting ID, Time Code, Project, and TimeType header",
)
data = move_item(data, 41, 30)
data = move_item(data, 42, 31)
data = move_item(data, 43, 32)
data = move_item(data, 44, 33)
elif data[43][0] == "ID" and data[44][0] == "Time Code" and data[45][0] == "Project" and data[46][0] == "TimeType":
self.log_warning(
"beat_data_with_a_stick: ",
"re-sorting ID, Time Code, Project, and TimeType header",
)
data = move_item(data, 43, 30)
data = move_item(data, 44, 31)
data = move_item(data, 45, 32)
data = move_item(data, 46, 33)
return data
def assert_header_item(self, index, should_be):
if self._data[index][0] != should_be:
self.log_issue(
"assert_header: ",
f"item {index} is not {should_be} ",
f"(is {self._data[index][0]})",
)
def assert_header_item_any(self, index, should_be_any):
if self._data[index][0] not in should_be_any:
self.log_issue(
"assert_header: ",
f"item {index} is not one of {', '.join(should_be_any)} ",
f"(is {self._data[index][0]})",
)
def assert_header(self):
"""Validate a document based on the header lines."""
self.assert_header_item(0, "Timesheet")
self.assert_header_item(2, "Location:")
self.assert_header_item(3, "[E01] Fors Marsh Group")
self.assert_header_item(4, "Department:")
self.assert_header_item_any(5, ("[3200] Advanced Analytics", "[3230] Data Management", ))
self.assert_header_item(6, "Employee Type:")
self.assert_header_item(7, "[1] Annual Salary")
self.assert_header_item(8, "Location (Default")
self.assert_header_item(9, "[LOCAL] Location")
self.assert_header_item(10, "Function:")
self.assert_header_item(11, "Exempt:")
self.assert_header_item(12, "Status:")
self.assert_header_item(13, "[1] Full Time")
self.assert_header_item(14, "Yes")
self.assert_header_item_any(15, ("Approved", "Closed", "On Hold [Draft]", ))
self.assert_header_item(16, "Post Status:")
self.assert_header_item(17, "Validation:")
self.assert_header_item(18, "Date/Time:")
self.assert_header_item_any(19, ("Posted", "Not posted", ))
self.assert_header_item_any(20, ("Passed", "Warnings", ))
        # item 21 should look like "Mmm N, YYYY HH:MM"
self.assert_header_item(22, "Total Timesheet:")
self.assert_header_item(23, "Standard Hours:")
self.assert_header_item(24, "Total Billable:")
self.assert_header_item(25, "Percent Billability:")
if not is_multiple_8(self._data[27][0]):
self.log_issue(
"assert_header: ",
"item 27 is not a multiple of 8 ",
f"(is {self._data[27]})",
)
self.assert_header_item(30, "ID")
self.assert_header_item(31, "Time Code")
self.assert_header_item(32, "Project")
self.assert_header_item(33, "TimeType")
def parse_header(self):
"""Read data from the document header and clear those lines."""
self.header = {
"dates": self._data[1][0],
"employ_location": self._data[3][0],
"employ_location_default": self._data[9][0],
"employ_dept": self._data[5][0],
"employ_type": self._data[7][0],
"employ_exempt": self._data[14][0],
"status": self._data[15][0],
"status_posting": self._data[19][0],
"status_validation": self._data[20][0],
"status_timestamp": self._data[21][0],
"hours": self._data[26][0],
"hours_minimum": self._data[27][0],
"hours_billable": self._data[28][0],
"percent_billable": self._data[29][0],
}
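        # The 34-line header block validated by assert_header() has now been
        # read; drop it so only page and entry lines remain.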
del self._data[:34]
def parse_footer(self):
"""Loop though lines to identify the document footer and clear it."""
target = None
for n in range(len(self)-1, 0, -1):
if self._data[n][0] == "Hours Distribution by Time Code":
target = n
break
if target is None:
self.log_issue(
"parse_footer: ",
"could not locate document footer",
)
else:
del self._data[target:]
def parse_pages(self):
"""Loop through lines to identify page headers and footers, and clear
those lines.
"""
# Pages begin with "Timesheet"
page_breaks = []
for index, line in enumerate(self._data):
if line[0] == "Timesheet":
page_breaks.append(index)
if not page_breaks:
self.log_issue(
"parse_pages: ",
"could not locate any page breaks",
)
# At each page break, there is a page footer sequence before:
# + "Mmm N, YYYY HH:MM"
# + "(GMT-0H:00) TZNAME"
# + "Page N"
# + "of N"
# ...and a page header sequence after:
# + "Mmm N, YYYY - Mmm N, YYYY"
# + "ID"
# + "Time Code"
# + "Project"
# + "TimeType"
# None of this is useful data. Immediately delete it.
for page_break in reversed(page_breaks):
i = page_break - 7
# don't question why this is necessary
if str(self)=="May 16, 2019 - May 31, 2019" and page_break == 275:
i -= 30
while i <= page_break:
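                # Try every pattern against the line at `i`; when one matches,
                # delete that line (the next line slides into position `i`)
                # and restart from the first pattern.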
j = 0
while j < len(PAGE_PATTERNS):
if PAGE_PATTERNS[j].match(self._data[i][0]):
del self._data[i]
j = 0
else:
j += 1
i += 1
def parse_entries(self):
"""Loop through lines to identify time entries."""
i = 0
entries = []
while i < len(self):
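            # A second field of 20 appears to mark the first line of a new
            # time entry; everything else is either a day-of-week/Total label
            # or a continuation line of the current entry.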
if self._data[i][1] == 20:
if ENTRY_PATTERNS[0].match(self._data[i][0]):
entries.append([self._data[i]])
else:
self.log_issue(
"parse_entries: ",
"something unexpected in the entry start position ",
f"({self._data[i][0]})",
)
else:
                if self._data[i][0] in ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun", "Total", ):
                    pass
                elif len(entries) > 0:
entries[-1].append(self._data[i])
else:
self.log_issue(
"parse_entries: ",
"something unexpected before any entries started ",
f"({self._data[i][0]})",
)
i += 1
self._entries = [TimeEntry(e, str(self), e[0][0]) for e in entries]
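# Usage sketch: `rows` stands for the list of (text, attribute) pairs produced
# by the upstream PDF/HTML extraction step; the name is illustrative only.
#
#     sheet = TimeSheet(rows)
#     sheet.report_issues()
#     print(len(sheet), "data lines in", str(sheet))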