From 02df9b9b5fdd5bb33ffca1561f38041945e8485e Mon Sep 17 00:00:00 2001 From: olari Date: Mon, 28 Jun 2021 14:11:58 +0300 Subject: [PATCH] static typing? --- journal.py | 374 +++++++++++++++++++++++++++-------------------------- 1 file changed, 193 insertions(+), 181 deletions(-) diff --git a/journal.py b/journal.py index fa9a6ce..7241ac7 100644 --- a/journal.py +++ b/journal.py @@ -1,10 +1,11 @@ from copy import deepcopy from datetime import datetime, timedelta -from functools import reduce, partial, cache +from functools import reduce, partial from pathlib import Path from shutil import copyfile, rmtree from subprocess import run from tempfile import mktemp, mkdtemp +from typing import Any, Callable, Optional, Tuple, TypeVar, Union from zipfile import ZipFile import json import random @@ -16,37 +17,37 @@ import traceback ### GLOBALS JOURNAL_PATH = Path.home() / '.journal.json' +T = TypeVar('T') + +AnyCallable = Callable[..., Any] +AnyDict = dict[str, Any] + +Journal = AnyDict ### UTILS -def compose(*fns): - return partial(reduce, flip(apply), fns) -def nth_or_default(n, l, default): +def nth_or_default(n: int, l: list[T], default: T): return l[n] if n < len(l) else default -def apply(f, x): +def apply(f: AnyCallable, x: Any) -> Any: return f(x) -def flip(f): +def flip(f: AnyCallable) -> AnyCallable: return lambda a1, a2: f(a2, a1) + +def compose(*fns: AnyCallable) -> Any: + return partial(reduce, flip(apply), fns) -def identity(x): - return x - -def lazy_get(obj, key, default): - result = obj.get(key) - return result if result is not None else default() - -def wrap_text(text, columns=80): +def wrap_text(text: str, columns: int = 80) -> str: return textwrap.fill(text, columns, replace_whitespace=False, break_on_hyphens=False, break_long_words=False) -def split_keep(delims, string): - res = [] - buf = [] +def split_keep(delims: list[str], string: str) -> list[str]: + res: list[str] = [] + buf: list[str] = [] def flush_buf(): nonlocal res, buf @@ -65,8 +66,8 @@ def split_keep(delims, string): return res -def merge_if(pred, l): - res = [] +def merge_if(pred: Callable[[str, str], bool], l: list[str]) -> list[str]: + res: list[str] = [] for i, curr in enumerate(l): prev = l[i-1] if i-1 >= 0 else None @@ -77,15 +78,15 @@ def merge_if(pred, l): return res -def editor(fpath): +def open_editor(fpath: Path) -> None: run(['nvim', '+', str(fpath)]) -def edit(text, suffix=''): +def edit_text(text: str, suffix: str = '') -> str: fpath = Path(mktemp(suffix=suffix)) fpath.write_text(text) - editor(fpath) + open_editor(fpath) text = fpath.read_text() @@ -93,53 +94,56 @@ def edit(text, suffix=''): return text -def prompt(text): +def prompt(text: str) -> bool: return input(text + ' [y/n] ') == 'y' ### DATE UTILS -def parse_date(date): +def parse_date(date: str) -> datetime: return datetime.strptime(date, '%Y-%m-%d') -def format_date(date): +def format_date(date: datetime) -> str: return date.strftime('%Y-%m-%d') -def parse_timestamp(timestamp): - return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S') +def today() -> datetime: + return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) -def format_timestamp(timestamp): - return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') - -def evaluate_time_expression(expression): +def evaluate_time_expression(expression: str) -> Optional[datetime]: if expression == 'today': return datetime.now() elif expression == 'yesterday': return datetime.now() - timedelta(days=1) -def get_abbr_for_weekday(date): +def get_abbr_for_weekday(date: str) -> str: return { 0: 'mo', 1: 'tu', 2: 'we', 3: 'th', 4: 'fr', 5: 'sa', 6: 'su', }[parse_date(date).weekday()] +def parse_timestamp(timestamp: str) -> int: + return int(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timestamp()) + +def format_timestamp(timestamp: int) -> str: + return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') + ### FILE PARSERS -def parse_foods_file(text): +def parse_foods_file(text: str) -> tuple[AnyDict, AnyDict]: foods_str, recipes_str = text.split('---') - def parse_macro(macro): + def parse_macro(macro: str) -> tuple[str, float]: name, value = macro.split() return (name, float(value.removesuffix('g').removesuffix('kcal'))) - foods = { + foods: AnyDict = { macros[0]: dict(parse_macro(macro) for macro in macros[1:]) for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')] } - recipes = {} + recipes: AnyDict = {} - def evaluate_ingredients(ingredients): - result = {} + def evaluate_ingredients(ingredients: list[str]) -> AnyDict: + result: dict[str, float] = {} total_weight = 0.0 for ingredient in ingredients: @@ -173,7 +177,7 @@ def parse_foods_file(text): return foods, recipes -def evaluate_food_entry(foods, recipes, value, name): +def evaluate_food_entry(foods: AnyDict, recipes: AnyDict, value: float, name: str) -> AnyDict: if name in recipes: food = recipes[name] @@ -193,8 +197,8 @@ def evaluate_food_entry(foods, recipes, value, name): return food -def parse_tasks_file(text): - result = [] +def parse_tasks_file(text: str) -> list[tuple[list[str], str]]: + result: list[tuple[list[str], str]] = [] for task in text.splitlines(): days, name = task.split(':') @@ -203,33 +207,32 @@ def parse_tasks_file(text): return result -@cache -def get_godword(journal): +def get_godword(journal: Journal) -> list[str]: return journal['files']['godword'].strip().split('\n') -@cache -def get_habits(journal): +def get_habits(journal: Journal) -> list[str]: return journal['files']['habits'].strip().split('\n') -@cache -def get_tasks(journal): +def get_tasks(journal: Journal) -> list[tuple[list[str], str]]: return parse_tasks_file(journal['files']['tasks']) ### HACKY HACKERY -_GLOBAL_DO_NOT_USE = {} +_global_do_not_use: Journal = {} -def init_hacky_hackery(journal): - global _GLOBAL_DO_NOT_USE - _GLOBAL_DO_NOT_USE = journal +def init_hacky_hackery(journal: Journal) -> None: + global _global_do_not_use + _global_do_not_use = journal def get_foods_file(): - return parse_foods_file(_GLOBAL_DO_NOT_USE['files']['foods']) + return parse_foods_file(_global_do_not_use['files']['foods']) ### HEADER MODULES -def get_notifications_for_date(journal, date): - notifications = [] +Notification = dict[str, Union[int, str]] + +def get_notifications_for_date(journal: Journal, date: str) -> list[Notification]: + notifications: list[Notification] = [] for day in journal['days'].values(): for entry in day.get('entries'): @@ -243,14 +246,20 @@ def get_notifications_for_date(journal, date): return notifications -def get_yesterdays_sticky(journal, date): +def get_yesterdays_sticky(journal: Journal, date: str) -> Optional[str]: yesterday = format_date(parse_date(date) - timedelta(days=1)) if day := journal['days'].get(yesterday): if sticky := day['header'].get('sticky'): return sticky -header_modules = { +HeaderModule = tuple[ + Callable[[Journal, str], Any], + Callable[[str], Any], + Callable[[Any], str], +] + +header_modules: dict[str, HeaderModule] = { 'godword': ( lambda j, d: [random.choice(get_godword(j)) for _ in range(20)], lambda b: b.split(), @@ -272,7 +281,7 @@ header_modules = { 'notifications': ( lambda j, d: get_notifications_for_date(j, d), lambda b: [{ - 'source': int(parse_timestamp(' '.join(parts[0:2]).strip('[]')).timestamp()), + 'source': parse_timestamp(' '.join(parts[0:2]).strip('[]')), 'message': ' '.join(parts[2:]), } for parts in [line.split() for line in b.splitlines()]], lambda v: '\n'.join(f'[[{format_timestamp(n["source"])}]] {n["message"]}' for n in v) @@ -292,23 +301,23 @@ header_modules = { 'sticky': ( lambda j, d: get_yesterdays_sticky(j, d), - identity, - identity + lambda b: b, + lambda v: v, ) } -def create_header_module(name, journal, date): +def create_header_module(name: str, journal: Journal, date: str) -> AnyDict: return header_modules[name][0](journal, date) -def parse_header_module(name, block): +def parse_header_module(name: str, block: str) -> AnyDict: return header_modules[name][1](block) -def generate_header_module(name, value): +def generate_header_module(name: str, value: AnyDict) -> str: return header_modules[name][2](value) ### ENTRY MODULES -def parse_timer(block): +def parse_timer(block: str) -> AnyDict: rest = block.split() name = None @@ -316,9 +325,10 @@ def parse_timer(block): if len(rest) > 2: name, *rest = rest if len(rest) > 1: - timestamp = int(parse_timestamp(' '.join(rest)).timestamp()) + timestamp = parse_timestamp(' '.join(rest)) + + result: AnyDict = {} - result = {} if name: result['name'] = name if timestamp: @@ -326,18 +336,18 @@ def parse_timer(block): return result -def generate_timer(block): - parts = [] +def generate_timer(value: AnyDict) -> str: + parts: list[str] = [] - if name := block.get('name'): + if name := value.get('name'): parts.append(name) - if ts := block.get('timestamp'): + if ts := value.get('timestamp'): parts.append(format_timestamp(ts)) return ' '.join(parts) -def parse_exercise(block): +def parse_exercise(block: str) -> AnyDict: parts = block.split() if parts[0] == 'walk': @@ -361,21 +371,21 @@ def parse_exercise(block): assert False -def generate_exercise(block): - if block['kind'] == 'walk': - return f'walk {block["minutes"]}min {block["distance"]}km {block["steps"]}steps' - elif block['kind'] == 'calisthenics': - return f'calisthenics {block["sets"]}x{block["reps"]} {block["exercise"]}' +def generate_exercise(value: AnyDict) -> str: + if value['kind'] == 'walk': + return f'walk {value["minutes"]}min {value["distance"]}km {value["steps"]}steps' + elif value['kind'] == 'calisthenics': + return f'calisthenics {value["sets"]}x{value["reps"]} {value["exercise"]}' assert False -DEFAULT_PARSER = lambda b: {'value': b} -DEFAULT_GENERATOR = lambda b: b['value'] +DEFAULT_PARSER: AnyCallable = lambda b: {'value': b} +DEFAULT_GENERATOR: AnyCallable = lambda b: b['value'] -entry_modules = { +entry_modules: dict[str, tuple[Callable[[str], AnyDict], Callable[[AnyDict], str]]] = { 'diet': ( lambda b: {'amount': int(b.split()[0].removesuffix('g')), 'food': b.split()[1].strip()}, - lambda b: f'{b["amount"]}g {b["food"]}'), + lambda v: f'{v["amount"]}g {v["food"]}'), 'exercise': (parse_exercise, generate_exercise), 'behavior': (DEFAULT_PARSER, DEFAULT_GENERATOR), @@ -383,15 +393,15 @@ entry_modules = { 'info': (DEFAULT_PARSER, compose(DEFAULT_GENERATOR, wrap_text)), 'post': ( - lambda b: {'timestamp': int(parse_timestamp(b.removeprefix('@post ').strip()).timestamp())}, - lambda b: format_timestamp(b["timestamp"]) + lambda b: {'timestamp': parse_timestamp(b.removeprefix('@post ').strip())}, + lambda v: format_timestamp(v["timestamp"]) ), 'notes': ( compose( lambda b: b.splitlines(), lambda s: {'source': s[0], 'title': s[1]}, ), - lambda b: f'{b["source"]}\n{b["title"]}' + lambda v: f'{v["source"]}\n{v["title"]}' ), 'task': (DEFAULT_PARSER, DEFAULT_GENERATOR), @@ -404,17 +414,17 @@ entry_modules = { lambda b: b.split(maxsplit=1), lambda s: {'day': s[0], 'message': s[1]} ), - lambda b: f'{b["day"]} {b["message"]}' + lambda v: f'{v["day"]} {v["message"]}' ), } -def parse_entry_module(block: str) -> dict: +def parse_entry_module(block: str) -> AnyDict: tag = block.split()[0].removeprefix('@') block = block.removeprefix(f'@{tag}').strip() return {'type': tag} | entry_modules[tag][0](block) -def generate_entry_module(block: dict) -> str: +def generate_entry_module(block: AnyDict) -> str: if block['type'] == 'notes': return f'@notes\n{entry_modules[block["type"]][1](block)}' @@ -422,7 +432,7 @@ def generate_entry_module(block: dict) -> str: ### READ-ONLY STATS SECTION FUNCTIONS -def generate_stats(page): +def generate_stats(page: AnyDict) -> str: if not page['entries']: return '' @@ -483,19 +493,32 @@ def generate_stats(page): ### PAGE FUNCTIONS -def create_header(journal, date): +def create_header(journal: Journal, date: str) -> AnyDict: return { module: create_header_module(module, journal, date) for module in header_modules } -def parse_header(header): - def split_into_blocks(text): +def create_entry(journal: Journal, date: str) -> AnyDict: + return { + 'timestamp': int(today().timestamp()), + 'blocks': [] + } + +def create_day(journal: Journal, date: str) -> AnyDict: + return { + 'title': date, + 'header': create_header(journal, date), + 'entries': [] + } + +def parse_header(text: str) -> AnyDict: + def split_into_blocks(text: str) -> list[str]: return [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != ''] - modules = split_into_blocks(header) + modules = split_into_blocks(text) - result = {} + result: AnyDict = {} for module in modules: name, block = module.split('\n', maxsplit=1) @@ -504,27 +527,21 @@ def parse_header(header): return result -def generate_header(header): +def generate_header(header: AnyDict) -> str: result = '' - for name, value in header.items(): - if not value: + for name, header in header.items(): + if not header: continue result += f'\n\n{name.title()}:\n' - result += generate_header_module(name, value) + result += generate_header_module(name, header) return result -def create_entry(journal, date): - return { - 'timestamp': int(datetime.now().timestamp()), - 'blocks': [] - } - -def parse_entry(entry): - def merge_notes_block(l): - res = [] +def parse_entry(timestamp: str, content: str) -> AnyDict: + def merge_notes_block(l: list[str]) -> list[str]: + res: list[str] = [] i = 0 while i < len(l): @@ -538,12 +555,12 @@ def parse_entry(entry): return res - def merge_wrapped_lines(l): + def merge_wrapped_lines(l: list[str]) -> list[str]: TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ') POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02 ') COLUMN_LIMIT = 80 - res = [] + res: list[str] = [] i = 0 while i < len(l): @@ -580,8 +597,8 @@ def parse_entry(entry): return res - def split_post_block(l): - res = [] + def split_post_block(l: list[str]) -> list[str]: + res: list[str] = [] POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02') @@ -632,16 +649,14 @@ def parse_entry(entry): lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1], ) - timestamp, content = entry - return { - 'timestamp': int(parse_timestamp(timestamp.strip()).timestamp()), + 'timestamp': parse_timestamp(timestamp.strip()), 'blocks': [parse_entry_module(b) if b.startswith('@') else b for b in split_into_blocks(content)], } -def generate_entry(entry): - def format_block(curr, prev, before_prev): - def format_text(text): +def generate_entry(entry: AnyDict) -> str: + def format_block(curr: Any, prev: Any, before_prev: Any): + def format_text(text: str) -> str: if all(c == '\n' for c in curr): return text @@ -649,7 +664,7 @@ def generate_entry(entry): DUMMY_POST = '@post 2020-02-02 02:02:02 ' is_first = not prev - is_post = before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post' + is_post: bool = (before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post') if is_first: text = DUMMY_TS + text @@ -691,14 +706,7 @@ def generate_entry(entry): return result -def create_day(journal, date): - return { - 'title': date, - 'header': create_header(journal, date), - 'entries': [] - } - -def parse_day(text): +def parse_day(text: str) -> AnyDict: # discard read-only QS section text = text[text.find('#'):] @@ -708,24 +716,25 @@ def parse_day(text): entries = list(zip(tmp[::2], tmp[1::2])) title, *header = header.split('\n', maxsplit=1) + title = title.removeprefix('# ') header = header[0] if len(header) else '' return { - 'title': title.removeprefix('# '), + 'title': title, 'header': parse_header(header), - 'entries': [parse_entry(e) for e in entries], + 'entries': [parse_entry(timestamp, content) for timestamp, content in entries], } -def generate_day(page): +def generate_day(day: AnyDict) -> str: result = '' - result += generate_stats(page) + result += generate_stats(day) - result += f'\n\n# {page["title"]}' + result += f'\n\n# {day["title"]}' - result += generate_header(page['header']) + result += generate_header(day['header']) - for entry in page['entries']: + for entry in day['entries']: result += generate_entry(entry) result += '\n' @@ -734,34 +743,38 @@ def generate_day(page): ### COMMAND UTILS -def import_journal(fpath): - result = { 'days': {}, 'files': {} } +def load_journal() -> Journal: + return json.loads(JOURNAL_PATH.read_text()) - for name in list(sorted(fpath.glob('*.md'))): - day = parse_day(name.read_text()) - result['days'][name.stem] = day +def save_journal(journal: Journal) -> None: + JOURNAL_PATH.write_text(json.dumps(journal)) - for fname in ['habits', 'godword', 'tasks', 'foods']: - result['files'][fname] = (fpath / fname).read_text() +def import_journal(fpath: Path) -> Journal: + return { + 'days': { + fname.stem: parse_day(fname.read_text()) + for fname in list(sorted(fpath.glob('*.md'))) + }, + 'files': { + fname: (fpath / fname).read_text() + for fname in ['habits', 'godword', 'tasks', 'foods'] + } + } - return result - -def export_journal(journal, fpath): +def export_journal(journal: Journal, fpath: Path) -> None: for day in journal['days'].values(): (fpath / (day['title'] + '.md')).write_text(generate_day(day)) for fname, content in journal['files'].items(): (fpath / fname).write_text(content) -def backup_journal(): +def backup_journal(journal: Journal) -> Path: print('Creating backup...') - tmpdir = mkdtemp() + tmpdir = Path(mkdtemp()) - journal = load_journal() - - export_journal(journal, Path(tmpdir)) - copyfile(str(JOURNAL_PATH), tmpdir + '/journal.json') + export_journal(journal, tmpdir) + copyfile(str(JOURNAL_PATH), str(tmpdir / 'journal.json')) files = Path(tmpdir).glob('*') @@ -779,15 +792,12 @@ def backup_journal(): if script := journal['files'].get('backup'): print('Found script, running...') run(['bash', '-c', script.replace('%ZIPFILE%', f"'{str(zipfile_path.absolute())}'")]) + + return zipfile_path - if prompt('Ran script, delete archive?'): - zipfile_path.unlink() - -def open_journal(date): - journal = load_journal() - +def open_journal(journal: Journal, date: str) -> Journal: if not journal['days'].get(date): - backup_journal() + backup_journal(journal) journal['days'][date] = create_day(journal, date) tmpdir = Path(mkdtemp()) @@ -795,47 +805,38 @@ def open_journal(date): while True: try: - editor(tmpdir / f'{date}.md') + open_editor(tmpdir / f'{date}.md') new_journal = import_journal(tmpdir) break - except Exception as e: + except Exception as _: traceback.print_exc() input('Press enter to try again...') - save_journal(new_journal) - -def load_journal(): - if JOURNAL_PATH.exists(): - return json.loads(JOURNAL_PATH.read_text()) - else: - return import_journal(Path.home() / 'workspace' / 'journal') - -def save_journal(journal): - JOURNAL_PATH.write_text(json.dumps(journal)) + return new_journal ### COMMAND HANDLERS -def handle_open(args): +def handle_open(args: list[str]) -> None: subcommand = nth_or_default(0, args, 'today') if date := evaluate_time_expression(subcommand): - open_journal(format_date(date)) + save_journal(open_journal(load_journal(), format_date(date))) else: print(f'Invalid subcommand: {subcommand}') -def handle_edit(args): +def handle_edit(args: list[str]) -> None: subcommand = nth_or_default(0, args, 'foods') journal = load_journal() if subcommand in journal['files']: - journal['files'][subcommand] = edit(journal['files'][subcommand]) + journal['files'][subcommand] = edit_text(journal['files'][subcommand]) elif prompt(f'Unknown file: {subcommand}, create new?'): - journal['files'][subcommand] = edit('') + journal['files'][subcommand] = edit_text('') save_journal(journal) -def handle_import(args): +def handle_import(args: list[str]) -> None: if len(args) < 1: print('Missing directory.') return @@ -848,7 +849,7 @@ def handle_import(args): save_journal(import_journal(path)) -def handle_export(args): +def handle_export(args: list[str]) -> None: if len(args) < 1: print('Missing directory.') return @@ -861,7 +862,7 @@ def handle_export(args): export_journal(load_journal(), path) -def handle_test(args): +def handle_test(args: list[str]) -> None: journal = load_journal() journal_orig = deepcopy(journal) @@ -878,11 +879,11 @@ def handle_test(args): else: print('Test passed!') -def handle_summary(args): - def generate_food_summary(page): +def handle_summary(args: list[str]) -> None: + def generate_food_summary(day: AnyDict) -> str: result = '' - def print(str=''): + def print(str:str=''): nonlocal result result += '\n' + str @@ -891,7 +892,7 @@ def handle_summary(args): daily_calories = 0.0 daily_protein = 0.0 - for entry in page['entries']: + for entry in day['entries']: has_printed = False entry_calories = 0.0 entry_protein = 0.0 @@ -953,23 +954,34 @@ def handle_summary(args): print(generate_food_summary(journal['days'][date])) +def handle_backup(args: list[str]) -> None: + archive_path = backup_journal(load_journal()) + if prompt('Delete backup archive?'): + archive_path.unlink() + + ### MAIN -def main(): +def main() -> None: init_hacky_hackery(load_journal()) command = nth_or_default(1, sys.argv, 'open') args = sys.argv[2:] - handler = { + def handle_invalid(args: list[str]) -> None: + print(f'Invalid command: {command}') + + command_handlers: dict[str, Callable[[list[str]], None]] = { 'open': handle_open, 'edit': handle_edit, 'import': handle_import, 'export': handle_export, 'test': handle_test, 'summary': handle_summary, - 'backup': lambda _: backup_journal(), - }.get(command, lambda _: print(f'Invalid command: {command}')) + 'backup': handle_backup, + } + + handler = command_handlers.get(command, handle_invalid) handler(args)