from copy import deepcopy from datetime import datetime, timedelta from functools import reduce, partial from pathlib import Path from shutil import copyfile, rmtree from subprocess import run from tempfile import mktemp, mkdtemp from zipfile import ZipFile import json import random import re import sys import textwrap ### GLOBALS JOURNAL_PATH = Path.home() / '.journal.json' ### UTILS def nth_or_default(n, l, default): return l[n] if n < len(l) else default def apply(f, x): return f(x) def flip(f): return lambda a1, a2: f(a2, a1) def identity(x): return x def lazy_get(obj, key, default): result = obj.get(key) return result if result is not None else default() def wrap_text(text, columns=80): return textwrap.fill(text, columns, replace_whitespace=False, break_on_hyphens=False, break_long_words=False) def split_keep(delims, string): res = [] buf = [] def flush_buf(): nonlocal res, buf if buf: res.append(''.join(buf)) buf = [] for c in string: if c in delims: flush_buf() res.append(c) else: buf.append(c) flush_buf() return res def merge_if(pred, l): res = [] for i, curr in enumerate(l): prev = l[i-1] if i-1 >= 0 else None if prev and pred(prev, curr): res[-1] += curr else: res.append(curr) return res def editor(fpath): run(['nvim', '+', str(fpath)]) def edit(text, suffix=''): fpath = Path(mktemp(suffix=suffix)) fpath.write_text(text) editor(fpath) text = fpath.read_text() fpath.unlink() return text def prompt(text): return input(text + ' [y/n] ') == 'y' ### DATE UTILS def parse_date(date): return datetime.strptime(date, '%Y-%m-%d') def format_date(date): return date.strftime('%Y-%m-%d') def parse_timestamp(timestamp): return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S') def format_timestamp(timestamp): return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') def evaluate_time_expression(expression): if expression == 'today': return datetime.now() elif expression == 'yesterday': return datetime.now() - timedelta(days=1) ### FILE PARSERS def parse_foods_file(text): foods_str, recipes_str = text.split('---') def parse_macro(macro): name, value = macro.split() return (name, float(value.removesuffix('g').removesuffix('kcal'))) foods = { macros[0]: dict(parse_macro(macro) for macro in macros[1:]) for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')] } recipes = {} def evaluate_ingredients(ingredients): result = {} total_weight = 0.0 for ingredient in ingredients: k,v = parse_macro(ingredient) if k == 'TOTAL': result[k] = v continue else: total_weight += v food = foods.get(k) total = 100.0 if not food: food = recipes[k].copy() total = food['TOTAL'] del food['TOTAL'] for kk,vv in food.items(): if kk not in result: result[kk] = 0.0 result[kk] += vv * (v/total) if 'TOTAL' not in result: result['TOTAL'] = total_weight return result for ingredients in [recipe.split('\n') for recipe in recipes_str.strip().split('\n\n')]: recipes[ingredients[0]] = evaluate_ingredients(ingredients[1:]) return foods, recipes def evaluate_food_entry(foods, recipes, value, name): if name in recipes: food = recipes[name] if value == 0.0: value = food['TOTAL'] food = {k: v*(value/food['TOTAL']) for k,v in food.items()} elif name in foods: if value == 0.0: value = 100 food = {k: v*(value/100.0) for k,v in foods[name].items()} else: breakpoint() print(f'ERROR: Invalid diet entry: {name}') assert False return food def parse_tasks_file(text): result = [] for task in text.splitlines(): days, name = task.split(':') days = days.split(',') result.append((days, name)) return result ### HEADER MODULES def create_godword(journal, date): words = journal['files']['godword'].strip().split('\n') return [random.choice(words) for _ in range(20)] def parse_godword(block): return block.split() def generate_godword(value): return f"{' '.join(value[:10])}\n{' '.join(value[10:])}" def create_habits(journal, date): return {x: False for x in journal['files']['habits'].strip().split('\n')} def parse_habits(block): result = {} for habit in block.splitlines(): value, name = habit.split(maxsplit=1) result[name.strip()] = value[1] == 'x' return result def generate_habits(value): return '\n'.join(f'[{"x" if v else "-"}] {k}' for k,v in value.items()) def create_notifications(journal, date): notifications = [] for day in journal['days'].values(): for entry in day.get('entries'): for block in entry['blocks']: if not isinstance(block, str) and block['type'] == 'notify': if block['day'] == date: notifications.append({ 'source': entry['timestamp'], 'message': block['message'] }) return notifications def parse_notifications(notifications): result = [] for notification in notifications.splitlines(): parts = notification.split() result.append({ 'source': int(parse_timestamp(' '.join(parts[0:2]).strip('[]')).timestamp()), 'message': ' '.join(parts[2:]), }) return result def generate_notifications(value): return '\n'.join(f'[[{format_timestamp(n["source"])}]] {n["message"]}' for n in value) def create_tasks(journal, date): tasks = parse_tasks_file(journal['files']['tasks']) curr_day = { 0: 'mo', 1: 'tu', 2: 'we', 3: 'th', 4: 'fr', 5: 'sa', 6: 'su', }[parse_date(date).weekday()] return {name: False for days, name in tasks if curr_day in days} def parse_tasks(tasks): result = {} for task in tasks.splitlines(): value, name = task.split(maxsplit=1) name = name.strip() result[name] = value[1] == 'x' return result def generate_tasks(value): return '\n'.join(f'[{"x" if v else "-"}] {k}' for k,v in value.items()) def create_sticky(journal, date): yesterday = format_date(parse_date(date) - timedelta(days=1)) if day := journal['days'].get(yesterday): if sticky := day['header'].get('sticky'): return sticky ### ENTRY MODULES def parse_post(block): block = block.removeprefix('@post').strip() try: timestamp = int(parse_timestamp(block[:19]).timestamp()) block = block[19:] except: timestamp = None content = block.strip() result = {} if content: result['content'] = content if timestamp: result['timestamp'] = timestamp return result def generate_post(block): result = '@post' if ts := block.get('timestamp'): result += f' {format_timestamp(ts)}' if content := block.get('content'): result += f' {content}' return wrap_text(result) def parse_notes(block): tag, source, title = block.splitlines() return {'source': source, 'title': title} def generate_notes(block): parts = ['@notes'] if source := block.get('source'): parts.append(source) if title := block.get('title'): parts.append(title) return '\n'.join(parts) def parse_diet(block): tag, amount, food = block.split() amount = int(amount.removesuffix('g')) return {'amount': amount, 'food': food} def generate_diet(block): _, amount, food = block.values() return f'@diet {amount}g {food}' def parse_timer(block): tag, *rest = block.split() name = None timestamp = None if len(rest) > 2: name, *rest = rest if len(rest) > 1: timestamp = int(parse_timestamp(' '.join(rest)).timestamp()) result = {} if name: result['name'] = name if timestamp: result['timestamp'] = timestamp return result def generate_timer(block): parts = [f'@{block["type"]}'] if name := block.get('name'): parts.append(name) if ts := block.get('timestamp'): parts.append(format_timestamp(ts)) return ' '.join(parts) def parse_exercise(block): tag, *parts = block.split() if parts[0] == 'walk': kind, minutes, distance, steps = parts return { 'kind': kind, 'minutes': int(minutes.removesuffix('min')), 'distance': float(distance.removesuffix('km')), 'steps': int(steps.removesuffix('steps')), } elif parts[0] == 'calisthenics': kind, split, exercise = parts sets, reps = split.split('x') return { 'kind': kind, 'reps': reps, 'sets': sets, 'exercise': exercise, } assert False def generate_exercise(block): if block['kind'] == 'walk': return f'@exercise walk {block["minutes"]}min {block["distance"]}km {block["steps"]}steps' elif block['kind'] == 'calisthenics': return f'@exercise calisthenics {block["sets"]}x{block["reps"]} {block["exercise"]}' assert False def parse_notify(block): tag, day, *rest = block.split() return {'day': day.strip(), 'message': ' '.join(rest)} def generate_notify(block): return f'@notify {block["day"]} {block["message"]}' def generate_default(block): return f'@{block["type"]} {block["value"]}' def generate_info(block): return wrap_text(f'@info {block["value"]}') ### PAGE FUNCTIONS def create_header(journal, date): return { 'godword': create_godword(journal, date), 'habits': create_habits(journal, date), 'notifications': create_notifications(journal, date), 'tasks': create_tasks(journal, date), 'sticky': create_sticky(journal, date), } def parse_header(header): header_modules = { 'godword': parse_godword, 'habits': parse_habits, 'notifications': parse_notifications, 'tasks': parse_tasks, 'sticky': identity, } def split_into_blocks(text): return [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != ''] modules = split_into_blocks(header) result = {} for module in modules: name, value = module.split('\n', maxsplit=1) name = name.lower().removesuffix(':') result[name] = header_modules[name](value) return result def generate_header(header): header_modules = { 'godword': generate_godword, 'habits': generate_habits, 'notifications': generate_notifications, 'tasks': generate_tasks, 'sticky': identity, } result = '' for name, value in header.items(): if not value: continue result += f'\n\n{name.title()}:\n' result += header_modules[name](value) return result def create_entry(journal, date): return { 'timestamp': int(datetime.now().timestamp()), 'blocks': [] } def parse_entry(entry): def create_entry_module_parser(name, handler=None): handler = handler or (lambda b: {'value': b.removeprefix(f'@{name} ')}) return (name, lambda b: {'type': name} | handler(b)) entry_modules = dict([ create_entry_module_parser('hide', lambda _: {}), create_entry_module_parser('post', parse_post), create_entry_module_parser('info'), create_entry_module_parser('notes', parse_notes), create_entry_module_parser('behavior'), create_entry_module_parser('diet', parse_diet), create_entry_module_parser('task'), create_entry_module_parser('start', parse_timer), create_entry_module_parser('stop', parse_timer), create_entry_module_parser('done', parse_timer), create_entry_module_parser('exercise', parse_exercise), create_entry_module_parser('notify', parse_notify), ]) def merge_notes_block(l): res = [] i = 0 while i < len(l): if l[i] == '@notes': # notes nl source nl title res.append('\n'.join([l[i], l[i+2], l[i+4]])) i += 5 else: res.append(l[i]) i += 1 return res def merge_wrapped_lines(l): TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ') COLUMN_LIMIT = 80 res = [] i = 0 while i < len(l): curr = l[i] prev = l[i-1] if i > 0 else None next = l[i+1] if i+1 < len(l) else None # ['aoeu', '\n', 'aoeu'] if prev and curr == '\n' and next: len_prev = len(prev) # first block is preceded by timestamp if i - 1 == 0: len_prev += TIMESTAMP_LENGTH # do not wrap indented lines if not next[0].isspace(): next_word = next.split()[0] # merge only if text is actually wrapped if len_prev + len(next_word) >= COLUMN_LIMIT: res[-1] += ' ' + next i += 2 continue res.append(curr) i += 1 return res def split_into_blocks(text): return reduce(flip(apply), [ # split the text into sections by newline and tag symbol, keeping the separators partial(split_keep, ('\n', '@')), # merge sequential newlines together into a single whitespace block partial(merge_if, lambda p, c: p == c == '\n'), # attach escaped tags partial(merge_if, lambda p, c: c == '@' and p[-1] == '\\'), # attach tag partial(merge_if, lambda p, c: p == '@'), # attach tags which do not come after newline or another tag partial(merge_if, lambda p, c: c[0] == '@' and not (not p[-1] != '\n' or (p[0] == '@' and p[-1] == ' '))), # merge notes block merge_notes_block, # strip all non-whitespace blocks partial(map, lambda s: s if s.isspace() else s.rstrip()), list, # merge escaped tags with following text partial(merge_if, lambda p, c: p.endswith('\\@')), # merge wrapped lines merge_wrapped_lines, # remove trailing whitespace block lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1], ], text) def parse_module_block(block): tag = block.split()[0][1:] return entry_modules[tag](block) def parse_block(block): if block.startswith('@'): return parse_module_block(block) else: return block timestamp, content = entry return { 'timestamp': int(parse_timestamp(timestamp.strip()).timestamp()), 'blocks': [parse_block(b) for b in split_into_blocks(content)], } def generate_entry(entry): entry_modules = { 'diet': generate_diet, 'exercise': generate_exercise, 'hide': lambda _: '@hide', 'post': generate_post, 'info': generate_info, 'notes': generate_notes, 'behavior': generate_default, 'task': generate_default, 'start': generate_timer, 'stop': generate_timer, 'done': generate_timer, 'notify': generate_notify, } def format_block(block, is_first): def format_text(text): if all(c == '\n' for c in block): return text DUMMY_TS = '2020-02-02 02:02:02 ' if is_first: text = DUMMY_TS + text length = len(text) if length > 80: text = wrap_text(text) if is_first: text = text.removeprefix(DUMMY_TS) return text def format_module(module): return entry_modules[module['type']](module) formatted = format_text(block) if isinstance(block, str) else format_module(block) if result[-1] != '\n' and not all(c == '\n' for c in formatted): formatted = ' ' + formatted return formatted result = f'\n\n{format_timestamp(entry["timestamp"])}' for i, block in enumerate(entry['blocks']): result += format_block(block, i == 0) return result def create_day(journal, date): return { 'title': date, 'header': create_header(journal, date), 'entries': [] } def parse_day(text): ENTRY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE) header, *tmp = ENTRY_RE.split(text) entries = list(zip(tmp[::2], tmp[1::2])) title, *header = header.split('\n', maxsplit=1) header = header[0] if len(header) else '' return { 'title': title.removeprefix('# '), 'header': parse_header(header), 'entries': [parse_entry(e) for e in entries], } def generate_day(page): result = f'# {page["title"]}' result += generate_header(page['header']) for entry in page['entries']: result += generate_entry(entry) result += '\n' return result ### COMMAND UTILS def import_journal(fpath): result = { 'days': {}, 'files': {} } for name in list(sorted(fpath.glob('*.md'))): day = parse_day(name.read_text()) result['days'][name.stem] = day for fname in ['habits', 'godword', 'tasks', 'foods']: result['files'][fname] = (fpath / fname).read_text() return result def export_journal(journal, fpath): for day in journal['days'].values(): (fpath / (day['title'] + '.md')).write_text(generate_day(day)) for fname, content in journal['files'].items(): (fpath / fname).write_text(content) def backup_journal(): print('Creating backup...') tmpdir = mkdtemp() journal = load_journal() export_journal(journal, Path(tmpdir)) copyfile(str(JOURNAL_PATH), tmpdir + '/journal.json') files = Path(tmpdir).glob('*') current_date = datetime.now().strftime('%Y-%m-%d') zipfile_path = Path(f'{current_date}.zip') zipfile = ZipFile(zipfile_path, 'w') for file in files: zipfile.write(file, arcname=file.name) rmtree(tmpdir) if script := journal['files'].get('backup'): print('Found script, running...') run(['bash', '-c', script.replace('%ZIPFILE%', f"'{str(zipfile_path.absolute())}'")]) if prompt('Ran script, delete archive?'): zipfile_path.unlink() def open_journal(date): journal = load_journal() if not journal['days'].get(date): backup_journal() journal['days'][date] = create_day(journal, date) tmpdir = Path(mkdtemp()) export_journal(journal, tmpdir) while True: try: editor(tmpdir / f'{date}.md') new_journal = import_journal(tmpdir) break except Exception as e: print('Error:', e) input('Press enter to try again...') save_journal(new_journal) def load_journal(): if JOURNAL_PATH.exists(): return json.loads(JOURNAL_PATH.read_text()) else: return import_journal(Path.home() / 'workspace' / 'journal') def save_journal(journal): JOURNAL_PATH.write_text(json.dumps(journal)) ### COMMAND HANDLERS def handle_open(args): subcommand = nth_or_default(0, args, 'today') if date := evaluate_time_expression(subcommand): open_journal(format_date(date)) else: print(f'Invalid subcommand: {subcommand}') def handle_edit(args): subcommand = nth_or_default(0, args, 'foods') journal = load_journal() if subcommand in journal['files']: journal['files'][subcommand] = edit(journal['files'][subcommand]) elif prompt(f'Unknown file: {subcommand}, create new?'): journal['files'][subcommand] = edit('') save_journal(journal) def handle_import(args): if len(args) < 1: print('Missing directory.') return path = Path(args[0]) if not path.is_dir(): print(f'Invalid directory: {path}') return save_journal(import_journal(path)) def handle_export(args): if len(args) < 1: print('Missing directory.') return path = Path(args[0]) if not path.is_dir(): print(f'Invalid directory: {path}') return export_journal(load_journal(), path) def handle_test(args): journal = load_journal() journal_orig = deepcopy(journal) for day in journal['days']: journal['days'][day] = parse_day(generate_day(journal['days'][day])) if journal != journal_orig: print('Test failed!') print('Dumping journal.fail.json and journal.fail.orig.json...') Path('journal.fail.json').write_text(json.dumps(journal, indent=4)) Path('journal.fail.orig.json').write_text(json.dumps(journal_orig, indent=4)) else: print('Test passed!') def handle_summary(args): subcommand = nth_or_default(0, args, 'today') date = evaluate_time_expression(subcommand) if not date: print(f'Invalid time expression: {subcommand}') return date = format_date(date) journal = load_journal() foods, recipes = parse_foods_file(journal['files']['foods']) daily_grams = 0.0 daily_calories = 0.0 daily_protein = 0.0 for entry in journal['days'][date]['entries']: has_printed = False entry_calories = 0.0 entry_protein = 0.0 for diet in (b for b in entry['blocks'] if type(b) != str and b['type'] == 'diet'): if not has_printed: print(f'-- {format_timestamp(entry["timestamp"])}') has_printed = True value = diet['amount'] name = diet['food'] if name in recipes: food = recipes[name] if value == 0.0: value = food['TOTAL'] food = {k: v*(value/food['TOTAL']) for k,v in food.items()} elif name in foods: if value == 0.0: value = 100 food = {k: v*(value/100.0) for k,v in foods[name].items()} else: print(f'ERROR: Invalid diet entry: {diet}') continue protein = round(food.get('Protein', 0.0), 2) calories = round(food.get('Energy', 0.0), 2) entry_calories += calories entry_protein += protein print(f'{name:<20} {value:<6}g, {calories:<6}kcal, {protein:<6}g protein') if has_printed: entry_calories = round(entry_calories, 2) entry_protein = round(entry_protein, 2) print(f'-- TOTAL: {entry_calories}kcal, {entry_protein}g protein') print() daily_calories += entry_calories daily_protein += entry_protein print(f'-- DAILY TOTAL ({daily_calories}kcal, {daily_protein}g protein)') ### MAIN def main(): command = nth_or_default(1, sys.argv, 'open') args = sys.argv[2:] handler = { 'open': handle_open, 'edit': handle_edit, 'import': handle_import, 'export': handle_export, 'test': handle_test, 'summary': handle_summary, 'backup': lambda _: backup_journal(), }.get(command, lambda _: print(f'Invalid command: {command}')) handler(args) if __name__ == '__main__': main()