"""Command-line journal manager.

The journal is stored as one JSON document (see JOURNAL_PATH).  For editing
it is exported to a directory of Markdown day pages plus a few plain-text
configuration files, opened in an editor, and parsed back into JSON.
"""

from copy import deepcopy
from datetime import datetime, timedelta
from functools import reduce, partial
from pathlib import Path
from shutil import copyfile, rmtree
from subprocess import run
from tempfile import mktemp, mkdtemp, mkstemp
from zipfile import ZipFile
import json
import os
import random
import re
import shlex
import sys
import textwrap
import traceback

### GLOBALS

JOURNAL_PATH = Path.home() / '.journal.json'

### UTILS

def remove_chars(text, chars):
    """Return `text` without any character contained in `chars`."""
    return ''.join([c for c in text if c not in chars])

def get_words(text):
    """Return the lower-cased, whitespace-separated words of `text` with
    the punctuation characters ``.,-:;`` removed."""
    return remove_chars(text, '.,-:;').lower().split()

def nth_or_default(n, l, default):
    """Return ``l[n]``, or `default` when `n` is past the end of `l`."""
    return l[n] if n < len(l) else default

def apply(f, x):
    """Call `f` with the single argument `x`."""
    return f(x)

def flip(f):
    """Return a two-argument function calling `f` with swapped arguments."""
    return lambda a1, a2: f(a2, a1)

def compose(*fns):
    """Return a one-argument pipeline applying `fns` left to right."""
    return partial(reduce, flip(apply), fns)

def wrap_text(text, columns=80):
    """Wrap `text` to `columns` without breaking words or hyphens and
    without replacing existing whitespace."""
    return textwrap.fill(text, columns,
                         replace_whitespace=False,
                         break_on_hyphens=False,
                         break_long_words=False)

def split_keep(delims, string):
    """Split `string` on the single characters in `delims`, keeping every
    delimiter as its own item in the result list."""
    res = []
    buf = []
    for c in string:
        if c in delims:
            if buf:
                res.append(''.join(buf))
                buf = []
            res.append(c)
        else:
            buf.append(c)
    if buf:
        res.append(''.join(buf))
    return res

def merge_if(pred, l):
    """Concatenate each item of `l` onto the previous result item whenever
    ``pred(prev, curr)`` holds.

    `prev` is the ORIGINAL predecessor ``l[i-1]`` (not the merged value), and
    a falsy predecessor never triggers a merge.
    """
    res = []
    for i, curr in enumerate(l):
        prev = l[i-1] if i-1 >= 0 else None
        if prev and pred(prev, curr):
            res[-1] += curr
        else:
            res.append(curr)
    return res

def open_editor(fpath):
    """Open `fpath` in nvim and wait for the editor to exit."""
    run(['nvim', '+', str(fpath)])

def edit_text(text, suffix=''):
    """Let the user edit `text` in the editor and return the edited text.

    The text goes through a temporary file created with ``mkstemp`` (the
    file is created atomically, unlike the deprecated ``mktemp``) which is
    removed again even when the editor step fails.
    """
    fd, name = mkstemp(suffix=suffix)
    fpath = Path(name)
    try:
        with os.fdopen(fd, 'w') as handle:
            handle.write(text)
        open_editor(fpath)
        return fpath.read_text()
    finally:
        fpath.unlink()

def prompt(text):
    """Ask a yes/no question on stdin; only the answer ``y`` counts as yes."""
    return input(text + ' [y/n] ') == 'y'

def find_entries(journal, pred):
    """Return ``(day, entry_index, timestamp)`` for every entry having at
    least one block for which ``pred(day, entry, block)`` is truthy.

    Each entry is reported once, however many of its blocks match.
    """
    matches = []
    for day in journal['days']:
        for idx, entry in enumerate(journal['days'][day]['entries']):
            if any(pred(day, entry, block) for block in entry['blocks']):
                matches.append((day, idx, entry['timestamp']))
    return matches

### DATE UTILS

def parse_date(date):
    """Parse a ``YYYY-MM-DD`` string into a datetime."""
    return datetime.strptime(date, '%Y-%m-%d')

def format_date(date):
    """Format a datetime as ``YYYY-MM-DD``."""
    return date.strftime('%Y-%m-%d')

def today():
    """Return the current local date at midnight."""
    return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

def evaluate_time_expression(expression):
    """Translate ``'today'`` / ``'yesterday'`` into a datetime.

    Returns None for any other expression.
    """
    if expression == 'today':
        return datetime.now()
    elif expression == 'yesterday':
        return datetime.now() - timedelta(days=1)
    return None

def get_abbr_for_weekday(date):
    """Return the two-letter weekday abbreviation for a ``YYYY-MM-DD`` date."""
    return {
        0: 'mo', 1: 'tu', 2: 'we', 3: 'th',
        4: 'fr', 5: 'sa', 6: 'su',
    }[parse_date(date).weekday()]

def parse_timestamp(timestamp):
    """Parse ``YYYY-MM-DD HH:MM:SS`` (local time) into integer epoch seconds."""
    return int(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timestamp())

def format_timestamp(timestamp):
    """Format epoch seconds as local ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

### FILE PARSERS

def parse_foods_file(text):
    """Parse the foods file into ``(foods, recipes)``.

    The file has two sections separated by ``---``.  Both consist of
    paragraphs separated by blank lines whose first line is a name.  Food
    paragraphs list ``<macro> <value>`` lines; the values are scaled as
    "per 100" when used.  Recipe paragraphs list ``<ingredient> <amount>``
    lines, where an ingredient is a food or an EARLIER recipe, plus an
    optional ``TOTAL <amount>`` line; without it the total is the sum of the
    ingredient amounts.  Every recipe dict carries a ``'TOTAL'`` key.
    """
    foods_str, recipes_str = text.split('---')

    def parse_macro(macro):
        name, value = macro.split()
        return (name, float(value.removesuffix('g').removesuffix('kcal')))

    foods = {
        macros[0]: dict(parse_macro(macro) for macro in macros[1:])
        for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')]
    }

    recipes = {}

    def evaluate_ingredients(ingredients):
        result = {}
        total_weight = 0.0
        for ingredient in ingredients:
            k, v = parse_macro(ingredient)
            if k == 'TOTAL':
                result[k] = v
                continue
            total_weight += v

            food = foods.get(k)
            total = 100.0
            if not food:
                # not a plain food: must be a previously defined recipe
                food = recipes[k].copy()
                total = food['TOTAL']
                del food['TOTAL']

            for kk, vv in food.items():
                if kk not in result:
                    result[kk] = 0.0
                result[kk] += vv * (v/total)

        if 'TOTAL' not in result:
            result['TOTAL'] = total_weight
        return result

    for ingredients in [recipe.split('\n') for recipe in recipes_str.strip().split('\n\n')]:
        recipes[ingredients[0]] = evaluate_ingredients(ingredients[1:])

    return foods, recipes

def _scale_food(foods, recipes, value, name):
    """Return ``(amount, macros)`` for `value` units of `name`, or None when
    `name` is neither a recipe nor a food.

    An amount of 0 means "one whole recipe" or "100 of a food".  Recipes take
    precedence over foods; the scaled recipe dict still has its TOTAL key.
    """
    if name in recipes:
        recipe = recipes[name]
        if value == 0.0:
            value = recipe['TOTAL']
        return value, {k: v*(value/recipe['TOTAL']) for k, v in recipe.items()}
    if name in foods:
        if value == 0.0:
            value = 100
        return value, {k: v*(value/100.0) for k, v in foods[name].items()}
    return None

def evaluate_food_entry(foods, recipes, value, name):
    """Return the macros of `value` units of the food or recipe `name`.

    Raises:
        ValueError: if `name` is neither a known recipe nor a known food.
    """
    scaled = _scale_food(foods, recipes, value, name)
    if scaled is None:
        raise ValueError(f'ERROR: Invalid diet entry: {name}')
    return scaled[1]

def parse_tasks_file(text):
    """Parse ``<day>,<day>,...:<name>`` lines into ``(days, name)`` tuples.

    Blank lines are skipped and only the first ``:`` separates the days from
    the name, so task names may themselves contain colons.
    """
    result = []
    for task in text.splitlines():
        if not task.strip():
            continue
        days, name = task.split(':', maxsplit=1)
        result.append((days.split(','), name))
    return result

def get_godword(journal):
    """Return the lines of the journal's ``godword`` file."""
    return journal['files']['godword'].strip().split('\n')

def get_habits(journal):
    """Return the lines of the journal's ``habits`` file."""
    return journal['files']['habits'].strip().split('\n')

def get_tasks(journal):
    """Return the parsed ``tasks`` file of the journal."""
    return parse_tasks_file(journal['files']['tasks'])

### HACKY HACKERY

_global_do_not_use = {}

def init_hacky_hackery(journal):
    """Remember `journal` globally so get_foods_file() can reach it."""
    global _global_do_not_use
    _global_do_not_use = journal

def get_foods_file():
    """Parse the foods file of the globally remembered journal."""
    return parse_foods_file(_global_do_not_use['files']['foods'])

### HEADER MODULES

def get_notifications_for_date(journal, date):
    """Collect all ``notify`` blocks in the journal whose day equals `date`."""
    notifications = []
    for day in journal['days'].values():
        for entry in day.get('entries', []):
            for block in entry['blocks']:
                if isinstance(block, str) or block['type'] != 'notify':
                    continue
                if block['day'] == date:
                    notifications.append({
                        'source': entry['timestamp'],
                        'message': block['message']
                    })
    return notifications

def get_yesterdays_sticky(journal, date):
    """Return the non-empty ``sticky`` header of the day before `date`,
    or None if there is none."""
    yesterday = format_date(parse_date(date) - timedelta(days=1))
    if day := journal['days'].get(yesterday):
        if sticky := day['header'].get('sticky'):
            return sticky
    return None

def _create_godword(journal, date):
    """Draw 20 random words (with repetition) from the godword file."""
    words = get_godword(journal)
    return [random.choice(words) for _ in range(20)]

def _parse_checklist(block):
    """Parse ``[x] name`` / ``[-] name`` lines into ``{name: checked}``."""
    return {
        name.strip(): value[1] == 'x'
        for (value, name) in [
            line.split(maxsplit=1) for line in block.splitlines()
        ]
    }

def _generate_checklist(items):
    """Render ``{name: checked}`` as ``[x] name`` / ``[-] name`` lines."""
    return '\n'.join(f'[{"x" if done else "-"}] {name}' for name, done in items.items())

# name -> (create(journal, date), parse(block_text), generate(value))
header_modules = {
    'godword': (
        _create_godword,
        lambda b: b.split(),
        # ten words per line
        lambda v: f"{' '.join(v[:10])}\n{' '.join(v[10:])}"
    ),
    'habits': (
        lambda j, d: {x: False for x in get_habits(j)},
        _parse_checklist,
        _generate_checklist
    ),
    'notifications': (
        lambda j, d: get_notifications_for_date(j, d),
        # each line looks like: [[YYYY-MM-DD HH:MM:SS]] message
        lambda b: [{
            'source': parse_timestamp(' '.join(parts[0:2]).strip('[]')),
            'message': ' '.join(parts[2:]),
        } for parts in [line.split() for line in b.splitlines()]],
        lambda v: '\n'.join(f'[[{format_timestamp(n["source"])}]] {n["message"]}' for n in v)
    ),
    'tasks': (
        # only tasks scheduled for the weekday of the page
        lambda j, d: {name: False for days, name in get_tasks(j) if get_abbr_for_weekday(d) in days},
        _parse_checklist,
        _generate_checklist
    ),
    'sticky': (
        lambda j, d: get_yesterdays_sticky(j, d),
        lambda b: b,
        lambda v: v,
    )
}

def create_header_module(name, journal, date):
    """Create the initial value of header module `name` for a new day."""
    return header_modules[name][0](journal, date)

def parse_header_module(name, block):
    """Parse the text `block` of header module `name` into its value."""
    return header_modules[name][1](block)

def generate_header_module(name, value):
    """Render the value of header module `name` back to text."""
    return header_modules[name][2](value)

### ENTRY MODULES

def parse_timer(block):
    """Parse ``[name] [YYYY-MM-DD HH:MM:SS]`` into a dict with the optional
    keys ``name`` and ``timestamp``.

    A name is only recognised when more than two words are given, and a
    timestamp only when at least two words remain.
    """
    rest = block.split()
    name = None
    timestamp = None
    if len(rest) > 2:
        name, *rest = rest
    if len(rest) > 1:
        timestamp = parse_timestamp(' '.join(rest))

    result = {}
    if name:
        result['name'] = name
    if timestamp:
        result['timestamp'] = timestamp
    return result

def generate_timer(value):
    """Render a timer dict as ``name timestamp`` (both parts optional)."""
    parts = []
    if name := value.get('name'):
        parts.append(name)
    if ts := value.get('timestamp'):
        parts.append(format_timestamp(ts))
    return ' '.join(parts)

def parse_exercise(block):
    """Parse a ``walk <N>min <D>km <S>steps`` or
    ``calisthenics <sets>x<reps> <exercise>`` block.

    Raises:
        ValueError: for any other (or an empty) exercise kind.
    """
    parts = block.split()
    kind_name = parts[0] if parts else None

    if kind_name == 'walk':
        kind, minutes, distance, steps = parts
        return {
            'kind': kind,
            'minutes': int(minutes.removesuffix('min')),
            'distance': float(distance.removesuffix('km')),
            'steps': int(steps.removesuffix('steps')),
        }
    if kind_name == 'calisthenics':
        kind, split, exercise = parts
        sets, reps = split.split('x')
        return {
            'kind': kind,
            'reps': reps,
            'sets': sets,
            'exercise': exercise,
        }
    raise ValueError(f'Invalid exercise block: {block!r}')

def generate_exercise(value):
    """Render an exercise dict back to its text form.

    Raises:
        ValueError: if ``value['kind']`` is not a known exercise kind.
    """
    if value['kind'] == 'walk':
        return f'walk {value["minutes"]}min {value["distance"]}km {value["steps"]}steps'
    if value['kind'] == 'calisthenics':
        return f'calisthenics {value["sets"]}x{value["reps"]} {value["exercise"]}'
    raise ValueError(f'Invalid exercise kind: {value["kind"]!r}')

DEFAULT_PARSER = lambda b: {'value': b}
DEFAULT_GENERATOR = lambda b: b['value']

# tag -> (parse(text_after_tag), generate(block_dict))
entry_modules = {
    'diet': (
        lambda b: {'amount': int(b.split()[0].removesuffix('g')), 'food': b.split()[1].strip()},
        lambda v: f'{v["amount"]}g {v["food"]}'),
    'exercise': (parse_exercise, generate_exercise),
    'behavior': (DEFAULT_PARSER, DEFAULT_GENERATOR),
    'hide': (lambda _: {}, lambda _: ''),
    'info': (DEFAULT_PARSER, compose(DEFAULT_GENERATOR, wrap_text)),
    'post': (
        lambda b: {'timestamp': parse_timestamp(b.removeprefix('@post ').strip())},
        lambda v: format_timestamp(v["timestamp"])
    ),
    'notes': (
        # first line is the source, second line the title
        compose(
            lambda b: b.splitlines(),
            lambda s: {'source': s[0], 'title': s[1]},
        ),
        lambda v: f'{v["source"]}\n{v["title"]}'
    ),
    'task': (DEFAULT_PARSER, DEFAULT_GENERATOR),
    'start': (parse_timer, generate_timer),
    'stop': (parse_timer, generate_timer),
    'done': (parse_timer, generate_timer),
    'notify': (
        compose(
            lambda b: b.split(maxsplit=1),
            lambda s: {'day': s[0], 'message': s[1]}
        ),
        lambda v: f'{v["day"]} {v["message"]}'
    ),
    'tag': (
        lambda b: {'value': b.split(',')},
        lambda v: ','.join(v['value'])
    )
}

def parse_entry_module(block):
    """Parse an ``@tag ...`` block into ``{'type': tag, ...module fields}``."""
    tag = block.split()[0].removeprefix('@')
    block = block.removeprefix(f'@{tag}').strip()
    return {'type': tag} | entry_modules[tag][0](block)

def generate_entry_module(block):
    """Render a parsed module block back to its ``@tag ...`` text."""
    body = entry_modules[block['type']][1](block)
    if block['type'] == 'notes':
        # notes put their content on the lines following the tag
        return f'@notes\n{body}'
    return f'@{block["type"]} {body}'

### READ-ONLY STATS SECTION FUNCTIONS

def generate_stats(page):
    """Build the read-only statistics section shown above a day page.

    Returns an empty string for a page without entries.  The foods file is
    read through get_foods_file(), so init_hacky_hackery() must have been
    called before.
    """
    if not page['entries']:
        return ''

    result = ''

    num_entries = len(page['entries'])
    num_blocks = sum(len(entry['blocks']) for entry in page['entries'])
    text_concat = ' '.join(b for e in page['entries'] for b in e['blocks'] if isinstance(b, str))
    num_words = len(text_concat.split())
    result += f'Entries: {num_entries}, Blocks: {num_blocks}, Words: {num_words}'

    last_entry = max(e['timestamp'] for e in page['entries'])
    first_entry = min(e['timestamp'] for e in page['entries'])
    entry_delta = last_entry - first_entry
    entry_hours = round(entry_delta / 60 / 60, 2)
    result += f'\nFirst: {format_timestamp(first_entry)}, Last: {format_timestamp(last_entry)}, Hours: {entry_hours}'

    calories = 0
    carbs = 0
    fat = 0
    protein = 0
    sugar = 0
    num_meals = 0
    first_meal = float('inf')
    last_meal = float('-inf')

    foods, recipes = get_foods_file()
    for entry in page['entries']:
        did_count = False
        for block in entry['blocks']:
            if isinstance(block, str) or block['type'] != 'diet':
                continue
            food = evaluate_food_entry(foods, recipes, block['amount'], block['food'])
            if not did_count:
                # an entry counts as a single meal however many diet blocks it has
                num_meals += 1
                first_meal = min(entry['timestamp'], first_meal)
                last_meal = max(entry['timestamp'], last_meal)
                did_count = True
            calories += food['Energy']
            carbs += food.get('Carbs', 0)
            fat += food.get('Fat', 0)
            protein += food.get('Protein', 0)
            sugar += food.get('Sugar', 0)

    # share of the calories per macro (4 kcal/g carbs and protein, 9 kcal/g fat)
    carbs_proportion = round(carbs * 4 / calories * 100) if carbs and calories else 0
    fat_proportion = round(fat * 9 / calories * 100) if fat and calories else 0
    protein_proportion = round(protein * 4 / calories * 100) if protein and calories else 0
    calories, carbs, fat, protein, sugar = map(partial(round, ndigits=2), [calories, carbs, fat, protein, sugar])

    # without any meal the sentinels would yield "-inf" hours
    meal_hours = round((last_meal - first_meal) / 60 / 60, 2) if num_meals else 0.0
    result += f'\nCalories: {calories} ({carbs_proportion}/{fat_proportion}/{protein_proportion}, {protein}/{sugar}), Meals: {num_meals}, Hours: {meal_hours}'

    return result

### PAGE FUNCTIONS

def create_header(journal, date):
    """Create the initial header (every header module) for a new day."""
    return {
        module: create_header_module(module, journal, date)
        for module in header_modules
    }

def create_entry(journal, date):
    """Create an empty entry stamped with today's midnight."""
    return {
        'timestamp': int(today().timestamp()),
        'blocks': []
    }

def create_day(journal, date):
    """Create a new, entry-less day page titled `date`."""
    return {
        'title': date,
        'header': create_header(journal, date),
        'entries': []
    }

def parse_header(text):
    """Parse the header section of a day page.

    The header consists of paragraphs separated by blank lines; the first
    line of each paragraph is ``Name:`` and the rest is handed to the header
    module of that (lower-cased) name.
    """
    paragraphs = [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != '']

    result = {}
    for module in paragraphs:
        name, block = module.split('\n', maxsplit=1)
        name = name.lower().removesuffix(':')
        result[name] = parse_header_module(name, block)
    return result

def generate_header(header):
    """Render all header modules with a truthy value as ``Name:`` paragraphs."""
    result = ''
    for name, value in header.items():
        if not value:
            continue
        result += f'\n\n{name.title()}:\n'
        result += generate_header_module(name, value)
    return result

def parse_entry(timestamp, content):
    """Parse one entry (the text following its timestamp) into
    ``{'timestamp': int, 'blocks': [...]}``.

    Blocks are plain strings (text or runs of newlines) or dicts produced by
    parse_entry_module() for ``@tag`` blocks.
    """

    def merge_notes_block(l):
        res = []
        i = 0
        while i < len(l):
            if l[i] == '@notes':
                # notes nl source nl title
                res.append('\n'.join([l[i], l[i+2], l[i+4]]))
                i += 5
            else:
                res.append(l[i])
                i += 1
        return res

    def merge_wrapped_lines(l):
        TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ')
        POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02 ')
        COLUMN_LIMIT = 80

        res = []
        i = 0
        while i < len(l):
            curr = l[i]
            prev = l[i-1] if i > 0 else None
            following = l[i+1] if i+1 < len(l) else None
            before_prev = l[i-2] if i-2 >= 0 else None
            before_before_prev = l[i-3] if i-3 >= 0 else None

            # ['aoeu', '\n', 'aoeu']
            if prev and curr == '\n' and following:
                len_prev = len(prev)

                # first block is preceded by timestamp
                if i - 1 == 0:
                    len_prev += TIMESTAMP_LENGTH
                # text following a @post block shares its line with the tag
                if (before_prev and before_before_prev
                        and before_prev.startswith('@post')
                        and all(c == '\n' for c in before_before_prev)):
                    len_prev += POST_BLOCK_LENGTH

                # do not wrap indented lines
                if not following[0].isspace():
                    next_word = following.split()[0]
                    # merge only if text is actually wrapped
                    if len_prev + len(next_word) >= COLUMN_LIMIT:
                        res[-1] += ' ' + following
                        i += 2
                        continue

            res.append(curr)
            i += 1
        return res

    def split_post_block(l):
        POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02')
        res = []
        for curr in l:
            if curr.startswith('@post'):
                res.append(curr[:POST_BLOCK_LENGTH])
                res.append(curr[POST_BLOCK_LENGTH+1:])
            else:
                res.append(curr)
        return res

    split_into_blocks = compose(
        # split the text into sections by newline and tag symbol, keeping the separators
        partial(split_keep, ('\n', '@')),
        # merge sequential newlines together into a single whitespace block
        partial(merge_if, lambda p, c: p == c == '\n'),
        ## TAG PARSING
        # attach escaped tags
        partial(merge_if, lambda p, c: c == '@' and p[-1] == '\\'),
        # attach tag
        partial(merge_if, lambda p, c: p == '@'),
        # attach tags which do not come after newline or another tag
        partial(merge_if, lambda p, c: c[0] == '@' and not (p[-1] == '\n' or (p[0] == '@' and p[-1] == ' '))),
        ## SPECIAL BLOCK PARSING
        # merge notes block (because it spans 3 lines or 5 blocks)
        merge_notes_block,
        # split post block (because next block could be attached to it)
        split_post_block,
        # strip all non-whitespace blocks
        partial(map, lambda s: s if s.isspace() else s.rstrip()),
        list,
        # merge escaped tags with following text
        partial(merge_if, lambda p, c: p.endswith('\\@')),
        # merge wrapped lines
        merge_wrapped_lines,
        # remove trailing whitespace block
        lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1],
    )

    return {
        'timestamp': parse_timestamp(timestamp.strip()),
        'blocks': [(parse_entry_module(b) if b.startswith('@') else b) for b in split_into_blocks(content)],
    }

def generate_entry(entry):
    """Render an entry as text: two newlines, the timestamp, then all blocks.

    Text blocks longer than 80 columns are wrapped; the wrapping accounts
    for the timestamp (first block) or ``@post`` tag sharing the first line.
    """

    def format_block(curr, prev, before_prev):
        def format_text(text):
            if all(c == '\n' for c in curr):
                return text

            DUMMY_TS = '2020-02-02 02:02:02 '
            DUMMY_POST = '@post 2020-02-02 02:02:02 '

            is_first = not prev
            is_post = (before_prev and all(c == '\n' for c in before_prev)
                       and isinstance(prev, dict) and prev['type'] == 'post')

            # prepend placeholders so the first line is wrapped as if it
            # started after the timestamp / @post tag, then drop them again
            if is_first:
                text = DUMMY_TS + text
            if is_post:
                text = DUMMY_POST + text

            if len(text) > 80:
                text = wrap_text(text)

            if is_post:
                text = text.removeprefix(DUMMY_POST)
            if is_first:
                text = text.removeprefix(DUMMY_TS)

            return text

        formatted = format_text(curr) if isinstance(curr, str) else generate_entry_module(curr)

        # separate from whatever precedes on the same line
        if result[-1] != '\n' and not all(c == '\n' for c in formatted):
            formatted = ' ' + formatted

        return formatted

    blocks = entry['blocks']
    result = f'\n\n{format_timestamp(entry["timestamp"])}'

    for i, curr in enumerate(blocks):
        prev = blocks[i-1] if i-1 >= 0 else None
        before_prev = blocks[i-2] if i-2 >= 0 else None
        result += format_block(curr, prev, before_prev)

    return result

ENTRY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE)

def parse_day(text):
    """Parse a Markdown day page into ``{'title', 'header', 'entries'}``."""
    # discard read-only QS section
    text = text[text.find('#'):]

    header, *tmp = ENTRY_RE.split(text)
    # ENTRY_RE has one group, so tmp alternates timestamp, content, ...
    entries = list(zip(tmp[::2], tmp[1::2]))

    title, *header = header.split('\n', maxsplit=1)
    title = title.removeprefix('# ')
    header = header[0] if len(header) else ''

    return {
        'title': title,
        'header': parse_header(header),
        'entries': [parse_entry(timestamp, content) for timestamp, content in entries],
    }

def generate_day(day):
    """Render a day as Markdown: stats, title, header, then all entries."""
    result = ''
    result += generate_stats(day)
    result += f'\n\n# {day["title"]}'
    result += generate_header(day['header'])
    for entry in day['entries']:
        result += generate_entry(entry)
    result += '\n'
    return result

### COMMAND UTILS

def load_journal():
    """Load the journal from JOURNAL_PATH."""
    return json.loads(JOURNAL_PATH.read_text())

def save_journal(journal):
    """Write the journal to JOURNAL_PATH as JSON."""
    JOURNAL_PATH.write_text(json.dumps(journal))

def import_journal(fpath):
    """Build a journal from directory `fpath`: every ``*.md`` file becomes a
    day (keyed by file stem) and the files ``habits``, ``godword``, ``tasks``
    and ``foods`` are read verbatim."""
    return {
        'days': {
            fname.stem: parse_day(fname.read_text())
            for fname in sorted(fpath.glob('*.md'))
        },
        'files': {
            fname: (fpath / fname).read_text()
            for fname in ['habits', 'godword', 'tasks', 'foods']
        }
    }

def export_journal(journal, fpath):
    """Write every day as ``<title>.md`` and every journal file into `fpath`."""
    for day in journal['days'].values():
        (fpath / (day['title'] + '.md')).write_text(generate_day(day))
    for fname, content in journal['files'].items():
        (fpath / fname).write_text(content)

def backup_journal(journal):
    """Zip an export of the journal plus the raw JSON into ``<today>.zip`` in
    the current directory and return the archive path.

    If the journal has a ``backup`` file it is run as a bash script with
    ``%ZIPFILE%`` replaced by the shell-quoted absolute archive path.
    """
    print('Creating backup...')

    current_date = datetime.now().strftime('%Y-%m-%d')
    zipfile_path = Path(f'{current_date}.zip')

    tmpdir = Path(mkdtemp())
    try:
        export_journal(journal, tmpdir)
        copyfile(str(JOURNAL_PATH), str(tmpdir / 'journal.json'))

        # the archive must be closed (central directory written) before the
        # backup script gets to see it
        with ZipFile(zipfile_path, 'w') as archive:
            for file in tmpdir.glob('*'):
                archive.write(file, arcname=file.name)
    finally:
        rmtree(tmpdir, ignore_errors=True)

    if script := journal['files'].get('backup'):
        print('Found script, running...')
        run(['bash', '-c', script.replace('%ZIPFILE%', shlex.quote(str(zipfile_path.absolute())))])

    return zipfile_path

def open_journal(journal, date):
    """Open the page for `date` in the editor and return the re-imported
    journal.

    A missing day is created first (after taking a backup).  When the edited
    pages fail to parse, the traceback is shown and the editor is reopened.
    Journal files which import_journal() does not read back (e.g. ``backup``)
    are carried over from `journal` so they are not lost.
    """
    if not journal['days'].get(date):
        backup_journal(journal)
        journal['days'][date] = create_day(journal, date)

    tmpdir = Path(mkdtemp())
    export_journal(journal, tmpdir)

    while True:
        try:
            open_editor(tmpdir / f'{date}.md')
            new_journal = import_journal(tmpdir)
            break
        except Exception:
            traceback.print_exc()
            input('Press enter to try again...')

    # only cleaned up on success: after an interrupt the edits stay on disk
    rmtree(tmpdir, ignore_errors=True)

    new_journal['files'] = {**journal['files'], **new_journal['files']}
    return new_journal

### COMMAND HANDLERS

def handle_open(args):
    """``open [today|yesterday]``: edit a day page and save the journal."""
    subcommand = nth_or_default(0, args, 'today')

    if date := evaluate_time_expression(subcommand):
        save_journal(open_journal(load_journal(), format_date(date)))
    else:
        print(f'Invalid subcommand: {subcommand}')

def handle_edit(args):
    """``edit [file]``: edit one of the journal files (default ``foods``),
    offering to create it when it does not exist yet."""
    subcommand = nth_or_default(0, args, 'foods')

    journal = load_journal()

    if subcommand in journal['files']:
        journal['files'][subcommand] = edit_text(journal['files'][subcommand])
    elif prompt(f'Unknown file: {subcommand}, create new?'):
        journal['files'][subcommand] = edit_text('')

    save_journal(journal)

def handle_import(args):
    """``import <dir>``: replace the journal with the contents of a directory."""
    if len(args) < 1:
        print('Missing directory.')
        return

    path = Path(args[0])

    if not path.is_dir():
        print(f'Invalid directory: {path}')
        return

    save_journal(import_journal(path))

def handle_export(args):
    """``export <dir>``: write the journal into an existing directory."""
    if len(args) < 1:
        print('Missing directory.')
        return

    path = Path(args[0])

    if not path.is_dir():
        print(f'Invalid directory: {path}')
        return

    export_journal(load_journal(), path)

def handle_test(args):
    """``test``: check that generating and re-parsing every day reproduces
    the stored journal; dump both versions as JSON on failure."""
    journal = load_journal()

    journal_orig = deepcopy(journal)

    for day in journal['days']:
        journal['days'][day] = parse_day(generate_day(journal['days'][day]))

    if journal != journal_orig:
        print('Test failed!')

        print('Dumping journal.fail.json and journal.fail.orig.json...')
        Path('journal.fail.json').write_text(json.dumps(journal, indent=4))
        Path('journal.fail.orig.json').write_text(json.dumps(journal_orig, indent=4))
    else:
        print('Test passed!')

def handle_summary(args):
    """``summary [today|yesterday]``: print a per-entry food summary."""

    def generate_food_summary(day):
        lines = []

        foods, recipes = get_foods_file()

        daily_calories = 0.0
        daily_protein = 0.0

        for entry in day['entries']:
            has_printed = False
            entry_calories = 0.0
            entry_protein = 0.0
            for diet in (b for b in entry['blocks'] if not isinstance(b, str) and b['type'] == 'diet'):
                if not has_printed:
                    lines.append(f'-- {format_timestamp(entry["timestamp"])}')
                    has_printed = True

                name = diet['food']
                scaled = _scale_food(foods, recipes, diet['amount'], name)
                if scaled is None:
                    lines.append(f'ERROR: Invalid diet entry: {diet}')
                    continue
                value, food = scaled

                protein = round(food.get('Protein', 0.0), 2)
                calories = round(food.get('Energy', 0.0), 2)

                entry_calories += calories
                entry_protein += protein

                lines.append(f'{name:<20} {value:<6}g, {calories:<6}kcal, {protein:<6}g protein')

            if has_printed:
                entry_calories = round(entry_calories, 2)
                entry_protein = round(entry_protein, 2)
                lines.append(f'-- TOTAL: {entry_calories}kcal, {entry_protein}g protein')
                lines.append('')

            daily_calories += entry_calories
            daily_protein += entry_protein

        # round away float noise accumulated while summing
        daily_calories = round(daily_calories, 2)
        daily_protein = round(daily_protein, 2)
        lines.append(f'-- DAILY TOTAL ({daily_calories}kcal, {daily_protein}g protein)')

        # every line is preceded by a newline
        return ''.join('\n' + line for line in lines)

    subcommand = nth_or_default(0, args, 'today')

    date = evaluate_time_expression(subcommand)
    if not date:
        print(f'Invalid time expression: {subcommand}')
        return

    date = format_date(date)
    journal = load_journal()

    day = journal['days'].get(date)
    if day is None:
        print(f'No journal page for {date}')
        return

    print(generate_food_summary(day))

def handle_backup(args):
    """``backup``: create a backup archive and offer to delete it again."""
    archive_path = backup_journal(load_journal())
    if prompt('Delete backup archive?'):
        archive_path.unlink()

def edit_entries(entries):
    """Placeholder; does nothing."""
    pass

def parse_search_query(query):
    """Split a comma-separated query into ``(strings, tags)``; parts
    starting with ``#`` are tags (returned without the ``#``)."""
    parts = query.split(',')

    strings = []
    tags = []

    for part in parts:
        if part.startswith('#'):
            tags.append(part.removeprefix('#'))
        else:
            strings.append(part)

    return strings, tags

def edit_entries_by_predicate(journal, predicate, reversed=False):
    """Let the user edit all entries matching `predicate` in one buffer and
    write the edited entries back into `journal`, which is returned.

    Edited entries are mapped back to their origin by timestamp.  The
    `reversed` parameter is currently unused.
    """
    matches = find_entries(journal, predicate)

    text = f'Number of matches: {len(matches)}'

    for day, idx, ts in matches:
        text += generate_entry(journal['days'][day]['entries'][idx])

    text = edit_text(text)

    _, *tmp = ENTRY_RE.split(text)
    entries = [parse_entry(ts, c) for ts, c in zip(tmp[::2], tmp[1::2])]

    matches_map = {ts: (day, idx) for day, idx, ts in matches}

    for entry in entries:
        day, idx = matches_map[entry['timestamp']]
        journal['days'][day]['entries'][idx] = entry

    return journal

def handle_search(args):
    """``search <query>``: edit all entries containing one of the words or
    carrying one of the ``#tags`` of the comma-separated query."""
    if len(args) < 1:
        print('Missing search query.')
        return

    strings, tags = parse_search_query(args[0])
    # get_words() lower-cases the text, so the search words must be too
    strings = [s.lower() for s in strings]

    def predicate(day, entry, block):
        if isinstance(block, str):
            words = get_words(block)
            return any(s in words for s in strings)
        if block['type'] == 'tag':
            return any(t in block['value'] for t in tags)
        return False

    save_journal(edit_entries_by_predicate(load_journal(), predicate))

def handle_tasks(args):
    """``tasks``: edit all entries with a ``@task`` block but no ``@done``."""
    def predicate(day, entry, block):
        if isinstance(block, str) or block['type'] != 'task':
            return False
        is_done = any(b['type'] == 'done' for b in entry['blocks'] if not isinstance(b, str))
        return not is_done

    save_journal(edit_entries_by_predicate(load_journal(), predicate))

### MAIN

def main():
    """Dispatch ``sys.argv[1]`` (default ``open``) to its command handler."""
    # the very first `import` has to work before any journal exists
    if JOURNAL_PATH.exists():
        init_hacky_hackery(load_journal())

    command = nth_or_default(1, sys.argv, 'open')
    args = sys.argv[2:]

    def handle_invalid(args):
        print(f'Invalid command: {command}')

    command_handlers = {
        'open': handle_open,
        'edit': handle_edit,
        'import': handle_import,
        'export': handle_export,
        'test': handle_test,
        'summary': handle_summary,
        'backup': handle_backup,
        'search': handle_search,
        'tasks': handle_tasks
    }

    handler = command_handlers.get(command, handle_invalid)
    handler(args)

if __name__ == '__main__':
    main()