from copy import deepcopy
|
|
from datetime import datetime, timedelta
|
|
from functools import reduce, partial
|
|
from pathlib import Path
|
|
from shutil import copyfile, rmtree
|
|
from subprocess import run
|
|
from tempfile import mktemp, mkdtemp
|
|
from typing import Any, Callable, Optional, Tuple, TypeVar, Union
|
|
from zipfile import ZipFile
|
|
import json
|
|
import random
|
|
import re
|
|
import sys
|
|
import textwrap
|
|
import traceback
|
|
|
|
### GLOBALS
|
|
|
|
JOURNAL_PATH = Path.home() / '.journal.json'
|
|
|
|
### UTILS
|
|
|
|
|
|
def nth_or_default(n, l, default):
    """Return l[n], or *default* when the index is past the end."""
    if n < len(l):
        return l[n]
    return default
|
|
|
|
def apply(f, x):
    """Function application as a value: call *f* with the single argument *x*."""
    result = f(x)
    return result
|
|
|
|
def flip(f):
    """Wrap a two-argument function so it receives its arguments swapped."""
    def flipped(a, b):
        return f(b, a)
    return flipped
|
|
|
|
def compose(*fns):
    """Left-to-right function composition: compose(f, g)(x) == g(f(x))."""
    def composed(value):
        for fn in fns:
            value = fn(value)
        return value
    return composed
|
|
|
|
def wrap_text(text, columns=80):
    """Fill *text* to *columns* wide without splitting words, hyphens, or whitespace."""
    options = dict(
        replace_whitespace=False,
        break_on_hyphens=False,
        break_long_words=False,
    )
    return textwrap.fill(text, columns, **options)
|
|
|
|
def split_keep(delims, string):
    """Split *string* on any character in *delims*, keeping each delimiter
    as its own element of the result."""
    parts = []
    pending = []

    def emit_pending():
        if pending:
            parts.append(''.join(pending))
            pending.clear()

    for ch in string:
        if ch in delims:
            emit_pending()
            parts.append(ch)
        else:
            pending.append(ch)

    emit_pending()
    return parts
|
|
|
|
def merge_if(pred, l):
    """Concatenate each element onto the previous result when pred(prev, curr)
    holds. *prev* is always the element from the input list, never the
    already-merged value."""
    merged = []
    prev = None

    for curr in l:
        if prev and pred(prev, curr):
            merged[-1] += curr
        else:
            merged.append(curr)
        prev = curr

    return merged
|
|
|
|
def open_editor(fpath):
    """Open *fpath* in nvim ('+' puts the cursor on the last line) and block
    until the editor exits."""
    run(['nvim', '+', str(fpath)])
|
|
|
|
def edit_text(text, suffix=''):
    """Round-trip *text* through the user's editor and return the edited result.

    The text is written to a temporary file (with *suffix*, so the editor can
    pick syntax highlighting), edited interactively, read back, and the
    temporary file removed.
    """
    from tempfile import NamedTemporaryFile

    # NamedTemporaryFile instead of mktemp(): mktemp is deprecated and racy —
    # another process can claim the returned name before the file is created.
    with NamedTemporaryFile('w', suffix=suffix, delete=False) as f:
        f.write(text)
        fpath = Path(f.name)

    open_editor(fpath)

    text = fpath.read_text()

    fpath.unlink()

    return text
|
|
|
|
def prompt(text):
    """Ask a yes/no question on stdin; only an exact 'y' answer counts as yes."""
    answer = input(f'{text} [y/n] ')
    return answer == 'y'
|
|
|
|
### DATE UTILS
|
|
|
|
def parse_date(date):
    """Parse a 'YYYY-MM-DD' string into a naive datetime at midnight."""
    fmt = '%Y-%m-%d'
    return datetime.strptime(date, fmt)
|
|
|
|
def format_date(date):
    """Format a datetime as a 'YYYY-MM-DD' string."""
    fmt = '%Y-%m-%d'
    return date.strftime(fmt)
|
|
|
|
def today():
    """Midnight (00:00:00.000000) of the current local day, as a naive datetime."""
    now = datetime.now()
    return datetime(now.year, now.month, now.day)
|
|
|
|
def evaluate_time_expression(expression):
    """Resolve a human time expression to a datetime.

    Supports 'today', 'yesterday', and — as a generalization — a literal
    'YYYY-MM-DD' date (so e.g. `journal open 2020-01-01` works). Returns
    None for anything unrecognized, which callers report as invalid.
    """
    if expression == 'today':
        return datetime.now()
    if expression == 'yesterday':
        return datetime.now() - timedelta(days=1)

    try:
        return datetime.strptime(expression, '%Y-%m-%d')
    except ValueError:
        return None
|
|
|
|
def get_abbr_for_weekday(date):
    """Two-letter weekday abbreviation ('mo'..'su') for a 'YYYY-MM-DD' string."""
    abbrs = ('mo', 'tu', 'we', 'th', 'fr', 'sa', 'su')
    weekday = datetime.strptime(date, '%Y-%m-%d').weekday()
    return abbrs[weekday]
|
|
|
|
def parse_timestamp(timestamp):
    """Parse 'YYYY-MM-DD HH:MM:SS' (local time) into an integer Unix timestamp."""
    parsed = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return int(parsed.timestamp())
|
|
|
|
def format_timestamp(timestamp):
    """Format an integer Unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
    moment = datetime.fromtimestamp(timestamp)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
|
### FILE PARSERS
|
|
|
|
def parse_foods_file(text):
    """Parse the 'foods' journal file into (foods, recipes).

    The file has two '---'-separated sections. Each food is a blank-line
    separated paragraph: a name line followed by 'Macro <value>[g|kcal]'
    lines. Recipes list '<ingredient> <grams>' lines and are resolved into
    absolute macro totals plus a 'TOTAL' weight in grams. Food macro values
    are treated as per-100g by the scaling below.
    """
    foods_str, recipes_str = text.split('---')

    def parse_macro(macro):
        # 'Energy 123kcal' / 'Protein 10g' -> ('Energy', 123.0)
        name, value = macro.split()
        return (name, float(value.removesuffix('g').removesuffix('kcal')))

    # {food_name: {macro_name: value}}
    foods = {
        macros[0]: dict(parse_macro(macro) for macro in macros[1:])
        for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')]
    }

    recipes = {}

    def evaluate_ingredients(ingredients):
        # Sum each ingredient's macros, scaled by its weight.
        result = {}

        total_weight = 0.0
        for ingredient in ingredients:
            k,v = parse_macro(ingredient)
            if k == 'TOTAL':
                # An explicit TOTAL line overrides the summed ingredient
                # weight (presumably for weight lost during cooking — confirm).
                result[k] = v
                continue
            else:
                total_weight += v

            # Ingredients may reference plain foods (scaled per 100g) or
            # previously defined recipes (scaled per their TOTAL weight).
            food = foods.get(k)
            total = 100.0
            if not food:
                food = recipes[k].copy()
                total = food['TOTAL']
                del food['TOTAL']

            for kk,vv in food.items():
                if kk not in result:
                    result[kk] = 0.0

                result[kk] += vv * (v/total)

        if 'TOTAL' not in result:
            result['TOTAL'] = total_weight

        return result

    # NOTE: a recipe can only reference recipes defined earlier in the file.
    for ingredients in [recipe.split('\n') for recipe in recipes_str.strip().split('\n\n')]:
        recipes[ingredients[0]] = evaluate_ingredients(ingredients[1:])

    return foods, recipes
|
|
|
|
def evaluate_food_entry(foods, recipes, value, name):
    """Resolve a '@diet' entry into absolute macro values.

    *value* is the consumed amount in grams; 0.0 means "the whole thing"
    (the recipe's TOTAL weight, or 100g for a plain food). Recipes take
    precedence over foods when both define *name*.

    Raises ValueError for an unknown food/recipe name.
    """
    if name in recipes:
        food = recipes[name]

        if value == 0.0:
            value = food['TOTAL']

        # recipe macros are absolute for TOTAL grams; scale to the eaten amount
        food = {k: v * (value / food['TOTAL']) for k, v in food.items()}
    elif name in foods:
        if value == 0.0:
            value = 100

        # food macros are per 100g
        food = {k: v * (value / 100.0) for k, v in foods[name].items()}
    else:
        # Was a leftover `breakpoint()` plus `assert False` (stripped under
        # `python -O`); raise a real, descriptive exception instead.
        raise ValueError(f'ERROR: Invalid diet entry: {name}')

    return food
|
|
|
|
def parse_tasks_file(text):
    """Parse the tasks file into a list of (weekday_abbrs, name) tuples.

    Each line has the form 'mo,we,fr:task name'. Blank lines are skipped,
    and only the first ':' separates the day list from the name, so task
    names may themselves contain colons.
    """
    result = []

    for task in text.splitlines():
        if not task.strip():
            continue

        days, name = task.split(':', maxsplit=1)
        result.append((days.split(','), name))

    return result
|
|
|
|
def get_godword(journal):
    """The godword word list: one word per line of the 'godword' journal file."""
    content = journal['files']['godword']
    return content.strip().split('\n')
|
|
|
|
def get_habits(journal):
    """The habit list: one habit name per line of the 'habits' journal file."""
    content = journal['files']['habits']
    return content.strip().split('\n')
|
|
|
|
def get_tasks(journal):
    """Parsed contents of the 'tasks' journal file (see parse_tasks_file)."""
    tasks_text = journal['files']['tasks']
    return parse_tasks_file(tasks_text)
|
|
|
|
### HACKY HACKERY
|
|
|
|
# Module-level stash for the loaded journal; set once by init_hacky_hackery()
# so deeply nested code (see get_foods_file) can reach journal files without
# threading the journal through every call.
_global_do_not_use = {}
|
|
|
|
def init_hacky_hackery(journal):
    """Stash *journal* in the module-level global used by get_foods_file()."""
    global _global_do_not_use
    _global_do_not_use = journal
|
|
|
|
def get_foods_file():
    """Parse and return (foods, recipes) from the stashed journal's 'foods' file."""
    return parse_foods_file(_global_do_not_use['files']['foods'])
|
|
|
|
### HEADER MODULES
|
|
|
|
def get_notifications_for_date(journal, date):
    """Collect notifications scheduled for *date* from every day in the journal.

    Scans all '@notify' blocks and returns a list of
    {'source': <origin entry timestamp>, 'message': <text>} dicts whose
    'day' field equals *date*.
    """
    notifications = []

    for day in journal['days'].values():
        # .get(..., []): a day without an 'entries' key is simply skipped
        # (the original .get('entries') crashed iterating over None).
        for entry in day.get('entries', []):
            for block in entry['blocks']:
                if isinstance(block, str) or block['type'] != 'notify':
                    continue
                if block['day'] == date:
                    notifications.append({
                        'source': entry['timestamp'],
                        'message': block['message']
                    })

    return notifications
|
|
|
|
def get_yesterdays_sticky(journal, date):
    """Return the previous day's (non-empty) sticky note, or None."""
    previous = datetime.strptime(date, '%Y-%m-%d') - timedelta(days=1)
    yesterday = previous.strftime('%Y-%m-%d')

    day = journal['days'].get(yesterday)
    if not day:
        return None

    sticky = day['header'].get('sticky')
    if sticky:
        return sticky
    return None
|
|
|
|
# Each header module is a (create, parse, generate) triple:
#   create(journal, date) -> fresh value for a new day's header
#   parse(block_text)     -> value parsed back out of the rendered header
#   generate(value)       -> text for that header section
header_modules = {
    # 20 random godwords, rendered as two lines of 10.
    'godword': (
        lambda j, d: [random.choice(get_godword(j)) for _ in range(20)],
        lambda b: b.split(),
        lambda v: f"{' '.join(v[:10])}\n{' '.join(v[10:])}"
    ),

    # Habit checklist: '[x] name' / '[-] name' lines <-> {name: bool}.
    'habits': (
        lambda j, d: {x: False for x in get_habits(j)},
        lambda b: {
            name.strip(): value[1] == 'x'
            for (value, name) in [
                line.split(maxsplit=1)
                for line in b.splitlines()
            ]
        },
        lambda v: '\n'.join(f'[{"x" if v else "-"}] {k}' for k,v in v.items())
    ),

    # Notifications addressed to this date: '[[timestamp]] message' lines.
    'notifications': (
        lambda j, d: get_notifications_for_date(j, d),
        lambda b: [{
            'source': parse_timestamp(' '.join(parts[0:2]).strip('[]')),
            'message': ' '.join(parts[2:]),
        } for parts in [line.split() for line in b.splitlines()]],
        lambda v: '\n'.join(f'[[{format_timestamp(n["source"])}]] {n["message"]}' for n in v)
    ),

    # Tasks scheduled for this weekday; same checklist format as habits.
    'tasks': (
        lambda j, d: {name: False for days, name in get_tasks(j) if get_abbr_for_weekday(d) in days},
        lambda b: {
            name.strip(): value[1] == 'x'
            for (value, name) in [
                line.split(maxsplit=1)
                for line in b.splitlines()
            ]
        },
        lambda v: '\n'.join(f'[{"x" if v else "-"}] {k}' for k,v in v.items())
    ),

    # Free-form sticky note carried over from the previous day, verbatim.
    'sticky': (
        lambda j, d: get_yesterdays_sticky(j, d),
        lambda b: b,
        lambda v: v,
    )
}
|
|
|
|
def create_header_module(name, journal, date):
    """Build the initial value for header module *name* on *date*."""
    create_fn = header_modules[name][0]
    return create_fn(journal, date)
|
|
|
|
def parse_header_module(name, block):
    """Parse a rendered header section back into module *name*'s value."""
    parse_fn = header_modules[name][1]
    return parse_fn(block)
|
|
|
|
def generate_header_module(name, value):
    """Render module *name*'s value to its header-section text."""
    generate_fn = header_modules[name][2]
    return generate_fn(value)
|
|
|
|
### ENTRY MODULES
|
|
|
|
def parse_timer(block):
    """Parse a timer block ('@start'/'@stop'/'@done') body.

    The body is '[name] [YYYY-MM-DD HH:MM:SS]' with either part optional;
    returns a dict containing only the keys that were present
    ('name' and/or 'timestamp').
    """
    rest = block.split()

    name = None
    timestamp = None
    if len(rest) > 2:
        name, *rest = rest
    if len(rest) > 1:
        timestamp = parse_timestamp(' '.join(rest))
    elif len(rest) == 1:
        # A single leftover token is a bare name (a timestamp is always two
        # tokens). The original dropped it entirely, which broke the
        # generate/parse round-trip for '@start name' blocks.
        name = rest[0]

    result = {}

    if name:
        result['name'] = name
    if timestamp:
        result['timestamp'] = timestamp

    return result
|
|
|
|
def generate_timer(value):
    """Render a timer dict back to its '[name] [timestamp]' block text."""
    parts = []

    name = value.get('name')
    if name:
        parts.append(name)

    ts = value.get('timestamp')
    if ts:
        parts.append(format_timestamp(ts))

    return ' '.join(parts)
|
|
|
|
def parse_exercise(block):
    """Parse an '@exercise' block body into a dict keyed by 'kind'.

    Formats:
      'walk <M>min <D>km <S>steps'
      'calisthenics <sets>x<reps> <exercise>'

    Raises ValueError for an unknown exercise kind.
    """
    parts = block.split()

    if parts[0] == 'walk':
        kind, minutes, distance, steps = parts
        return {
            'kind': kind,
            'minutes': int(minutes.removesuffix('min')),
            'distance': float(distance.removesuffix('km')),
            'steps': int(steps.removesuffix('steps')),
        }

    elif parts[0] == 'calisthenics':
        kind, split, exercise = parts
        sets, reps = split.split('x')
        # NOTE: sets/reps stay strings (unlike walk's ints) so existing
        # stored journals round-trip unchanged through generate_exercise.
        return {
            'kind': kind,
            'reps': reps,
            'sets': sets,
            'exercise': exercise,
        }

    # Was a bare `assert False`, which is silently stripped under `python -O`.
    raise ValueError(f'Unknown exercise kind: {parts[0]}')
|
|
|
|
def generate_exercise(value):
    """Render an exercise dict (see parse_exercise) back to its block text.

    Raises ValueError for an unknown exercise kind.
    """
    if value['kind'] == 'walk':
        return f'walk {value["minutes"]}min {value["distance"]}km {value["steps"]}steps'
    elif value['kind'] == 'calisthenics':
        return f'calisthenics {value["sets"]}x{value["reps"]} {value["exercise"]}'

    # Was a bare `assert False`, which is silently stripped under `python -O`.
    raise ValueError(f'Unknown exercise kind: {value["kind"]}')
|
|
|
|
# Plain defs instead of lambdas bound to names (PEP 8 E731): same interface,
# clearer tracebacks. These are the fallback (parse, generate) pair for entry
# modules whose payload is just the raw block text.
def DEFAULT_PARSER(b):
    """Wrap a raw block string as the generic {'value': ...} payload."""
    return {'value': b}

def DEFAULT_GENERATOR(b):
    """Inverse of DEFAULT_PARSER: unwrap the raw block string."""
    return b['value']
|
|
|
|
# Each entry module maps an '@tag' to a (parse, generate) pair:
#   parse(body_text)  -> dict payload (merged with {'type': tag})
#   generate(payload) -> body text (without the leading '@tag ')
entry_modules = {
    # '<amount>g <food>'
    'diet': (
        lambda b: {'amount': int(b.split()[0].removesuffix('g')), 'food': b.split()[1].strip()},
        lambda v: f'{v["amount"]}g {v["food"]}'),
    'exercise': (parse_exercise, generate_exercise),
    'behavior': (DEFAULT_PARSER, DEFAULT_GENERATOR),

    # hidden blocks carry no payload and render to nothing
    'hide': (lambda _: {}, lambda _: ''),
    # info text is re-wrapped to 80 columns on output
    'info': (DEFAULT_PARSER, compose(DEFAULT_GENERATOR, wrap_text)),

    'post': (
        lambda b: {'timestamp': parse_timestamp(b.removeprefix('@post ').strip())},
        lambda v: format_timestamp(v["timestamp"])
    ),
    # two-line payload: source on the first line, title on the second
    'notes': (
        compose(
            lambda b: b.splitlines(),
            lambda s: {'source': s[0], 'title': s[1]},
        ),
        lambda v: f'{v["source"]}\n{v["title"]}'
    ),

    'task': (DEFAULT_PARSER, DEFAULT_GENERATOR),
    # the three timer tags share one format: optional name + optional timestamp
    'start': (parse_timer, generate_timer),
    'stop': (parse_timer, generate_timer),
    'done': (parse_timer, generate_timer),

    # '<YYYY-MM-DD> <message>' — surfaced by the notifications header module
    'notify': (
        compose(
            lambda b: b.split(maxsplit=1),
            lambda s: {'day': s[0], 'message': s[1]}
        ),
        lambda v: f'{v["day"]} {v["message"]}'
    ),

    # comma-separated list of tags
    'tag': (
        lambda b: {'value': b.split(',')},
        lambda v: ','.join(v['value'])
    )
}
|
|
|
|
def parse_entry_module(block):
    """Parse an '@tag ...' block into {'type': tag, **module_payload}."""
    tag = block.split()[0].removeprefix('@')
    body = block.removeprefix(f'@{tag}').strip()

    parser = entry_modules[tag][0]
    return {'type': tag} | parser(body)
|
|
|
|
def generate_entry_module(block):
    """Render a parsed block dict back to its '@tag ...' text form."""
    generator = entry_modules[block['type']][1]

    # notes blocks put their payload on the following lines,
    # not after the tag on the same line
    if block['type'] == 'notes':
        return f'@notes\n{generator(block)}'

    return f'@{block["type"]} {generator(block)}'
|
|
|
|
### READ-ONLY STATS SECTION FUNCTIONS
|
|
|
|
def generate_stats(page):
    """Render the read-only stats section shown above a day's title.

    Covers entry/block/word counts, first/last entry times, and — from the
    '@diet' blocks — calorie/macro totals and meal timing. Returns '' for a
    page with no entries.
    """
    if not page['entries']:
        return ''

    result = ''

    num_entries = len(page['entries'])
    num_blocks = sum(len(entry['blocks']) for entry in page['entries'])
    # only plain-text blocks count towards the word count
    text_concat = ' '.join(b for e in page['entries'] for b in e['blocks'] if isinstance(b, str))
    num_words = len(text_concat.split())
    result += f'Entries: {num_entries}, Blocks: {num_blocks}, Words: {num_words}'

    last_entry = max(e['timestamp'] for e in page['entries'])
    first_entry = min(e['timestamp'] for e in page['entries'])
    entry_delta = last_entry - first_entry
    entry_hours = round(entry_delta / 60 / 60, 2)
    result += f'\nFirst: {format_timestamp(first_entry)}, Last: {format_timestamp(last_entry)}, Hours: {entry_hours}'

    calories = 0
    carbs = 0
    fat = 0
    protein = 0
    sugar = 0
    num_meals = 0
    first_meal = float('inf')
    last_meal = float('-inf')

    foods, recipes = get_foods_file()
    for entry in page['entries']:
        # an entry with any number of diet blocks counts as a single meal
        did_count = False
        for block in entry['blocks']:
            if not isinstance(block, str) and block['type'] == 'diet':
                food = evaluate_food_entry(foods, recipes, block['amount'], block['food'])

                if not did_count:
                    num_meals += 1
                    first_meal = min(entry['timestamp'], first_meal)
                    last_meal = max(entry['timestamp'], last_meal)
                    did_count = True

                calories += food['Energy']
                carbs += food.get('Carbs', 0)
                fat += food.get('Fat', 0)
                protein += food.get('Protein', 0)
                sugar += food.get('Sugar', 0)

    # energy share of each macro: carbs/protein 4 kcal/g, fat 9 kcal/g
    carbs_proportion = round(carbs * 4 / calories * 100) if carbs and calories else 0
    fat_proportion = round(fat * 9 / calories * 100) if fat and calories else 0
    protein_proportion = round(protein * 4 / calories * 100) if protein and calories else 0

    calories, carbs, fat, protein, sugar = map(partial(round, ndigits=2), [calories, carbs, fat, protein, sugar])

    # NOTE(review): with zero meals this is (-inf) - (inf) and the line below
    # renders 'Hours: -inf' — presumably acceptable; confirm.
    meal_delta = last_meal - first_meal
    meal_hours = round(meal_delta / 60 / 60, 2)

    result += f'\nCalories: {calories} ({carbs_proportion}/{fat_proportion}/{protein_proportion}, {protein}/{sugar}), Meals: {num_meals}, Hours: {meal_hours}'

    return result
|
|
|
|
### PAGE FUNCTIONS
|
|
|
|
def create_header(journal, date):
    """Build a fresh header for *date* by instantiating every header module."""
    header = {}
    for module in header_modules:
        header[module] = create_header_module(module, journal, date)
    return header
|
|
|
|
def create_entry(journal, date):
    """A new, empty entry stamped with today's midnight timestamp.

    *journal* and *date* are unused but kept for signature symmetry with the
    other create_* constructors.
    """
    timestamp = int(today().timestamp())
    return {'timestamp': timestamp, 'blocks': []}
|
|
|
|
def create_day(journal, date):
    """A brand-new day page: title, freshly generated header, no entries."""
    day = {}
    day['title'] = date
    day['header'] = create_header(journal, date)
    day['entries'] = []
    return day
|
|
|
|
def parse_header(text):
    """Parse the header section (between title and first entry) into
    {module_name: parsed_value}."""
    # header sections are separated by one or more blank lines
    blocks = [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != '']

    header = {}

    for block in blocks:
        # first line is the section label, e.g. 'Habits:'
        name, body = block.split('\n', maxsplit=1)
        module = name.lower().removesuffix(':')
        header[module] = parse_header_module(module, body)

    return header
|
|
|
|
def generate_header(header):
    """Render a header dict back to its text sections, skipping empty modules."""
    sections = []

    for name, value in header.items():
        if not value:
            continue

        sections.append(f'\n\n{name.title()}:\n')
        sections.append(generate_header_module(name, value))

    return ''.join(sections)
|
|
|
|
def parse_entry(timestamp, content):
    """Parse one entry ('HH:MM:SS'-stamped section of a day page) into
    {'timestamp': int, 'blocks': [...]}.

    Blocks are either plain strings (text or runs of newlines) or dicts
    produced by parse_entry_module for '@tag' blocks. The heavy lifting is
    the split_into_blocks pipeline below — the steps are order-dependent.
    """
    def merge_notes_block(l):
        # '@notes' payload spans 3 lines, i.e. 5 raw blocks (tag, nl, source,
        # nl, title); glue them back into a single block.
        res = []

        i = 0
        while i < len(l):
            if l[i] == '@notes':
                # notes nl source nl title
                res.append('\n'.join([l[i], l[i+2], l[i+4]]))
                i += 5
            else:
                res.append(l[i])
                i += 1

        return res

    def merge_wrapped_lines(l):
        # Undo the 80-column wrapping applied by generate_entry: a text block
        # followed by a single newline and more text is re-joined when the
        # lengths show it was wrapped (not an intentional line break).
        TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ')
        POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02 ')
        COLUMN_LIMIT = 80

        res = []

        i = 0
        while i < len(l):
            curr = l[i]
            prev = l[i-1] if i > 0 else None
            next = l[i+1] if i+1 < len(l) else None

            before_prev = l[i-2] if i-2 >= 0 else None
            before_before_prev = l[i-3] if i-3 >= 0 else None

            # ['aoeu', '\n', 'aoeu']
            if prev and curr == '\n' and next:
                len_prev = len(prev)

                # first block is preceded by timestamp
                if i - 1 == 0:
                    len_prev += TIMESTAMP_LENGTH

                if before_prev and before_before_prev and before_prev.startswith('@post') and all(c == '\n' for c in before_before_prev):
                    len_prev += POST_BLOCK_LENGTH

                # do not wrap indented lines
                if not next[0].isspace():
                    next_word = next.split()[0]

                    # merge only if text is actually wrapped
                    if len_prev + len(next_word) >= COLUMN_LIMIT:
                        res[-1] += ' ' + next
                        i += 2
                        continue

            res.append(curr)
            i += 1

        return res

    def split_post_block(l):
        # '@post <timestamp>' has fixed length; anything after it on the same
        # block is a separate (attached) text block.
        res = []

        POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02')

        i = 0
        while i < len(l):
            curr = l[i]

            if curr.startswith('@post'):
                res.append(curr[:POST_BLOCK_LENGTH])
                res.append(curr[POST_BLOCK_LENGTH+1:])
            else:
                res.append(curr)

            i += 1

        return res

    split_into_blocks = compose(
        # split the text into sections by newline and tag symbol, keeping the separators
        partial(split_keep, ('\n', '@')),

        # merge sequential newlines together into a single whitespace block
        partial(merge_if, lambda p, c: p == c == '\n'),

        ## TAG PARSING
        # attach escaped tags
        partial(merge_if, lambda p, c: c == '@' and p[-1] == '\\'),
        # attach tag
        partial(merge_if, lambda p, c: p == '@'),
        # attach tags which do not come after newline or another tag
        partial(merge_if, lambda p, c: c[0] == '@' and not (not p[-1] != '\n' or (p[0] == '@' and p[-1] == ' '))),

        ## SPECIAL BLOCK PARSING
        # merge notes block (because it spans 3 lines or 5 blocks)
        merge_notes_block,
        # split post block (because next block could be attached to it)
        split_post_block,

        # strip all non-whitespace blocks
        partial(map, lambda s: s if s.isspace() else s.rstrip()), list,

        # merge escaped tags with following text
        partial(merge_if, lambda p, c: p.endswith('\\@')),

        # merge wrapped lines
        merge_wrapped_lines,

        # remove trailing whitespace block
        lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1],
    )

    return {
        'timestamp': parse_timestamp(timestamp.strip()),
        'blocks': [(parse_entry_module(b) if b.startswith('@') else b) for b in split_into_blocks(content)],
    }
|
|
|
|
def generate_entry(entry):
    """Render a parsed entry back to its page text.

    Plain text blocks are re-wrapped to 80 columns; temporary dummy prefixes
    stand in for the timestamp / '@post' text that will precede the block on
    the final line, so the wrap width comes out right after they are removed.
    """
    def format_block(curr: Any, prev: Any, before_prev: Any):
        def format_text(text):
            # whitespace-only blocks pass through untouched
            if all(c == '\n' for c in curr):
                return text

            DUMMY_TS = '2020-02-02 02:02:02 '
            DUMMY_POST = '@post 2020-02-02 02:02:02 '

            # the first block ends up on the same line as the entry timestamp
            is_first = not prev
            # text directly after a '@post' block (itself after a blank run)
            # shares the post block's line
            is_post = (before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post')

            if is_first:
                text = DUMMY_TS + text

            if is_post:
                text = DUMMY_POST + text

            length = len(text)

            if length > 80:
                text = wrap_text(text)

            if is_post:
                text = text.removeprefix(DUMMY_POST)

            if is_first:
                text = text.removeprefix(DUMMY_TS)

            return text

        formatted = format_text(curr) if isinstance(curr, str) else generate_entry_module(curr)

        # separate adjacent non-whitespace blocks with a single space
        if result[-1] != '\n' and not all(c == '\n' for c in formatted):
            formatted = ' ' + formatted

        return formatted

    result = f'\n\n{format_timestamp(entry["timestamp"])}'

    i = 0
    while i < len(entry['blocks']):
        curr = entry['blocks'][i]
        prev = entry['blocks'][i-1] if i-1 >= 0 else None
        before_prev = entry['blocks'][i-2] if i-2 >= 0 else None

        result += format_block(curr, prev, before_prev)

        i += 1

    return result
|
|
|
|
def parse_day(text):
    """Parse a day page's text into {'title', 'header', 'entries'}.

    Everything before the '# <date>' title (the read-only stats section) is
    discarded; entries are split on 'YYYY-MM-DD HH:MM:SS' timestamp lines.

    Raises ValueError when the page has no '#' title marker (the original
    `text.find('#')` returned -1 and silently truncated the page to its
    last character).
    """
    # discard read-only QS section
    title_start = text.find('#')
    if title_start == -1:
        raise ValueError('day page has no "# <title>" line')
    text = text[title_start:]

    ENTRY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE)

    # re.split with a capture group alternates timestamps and entry bodies
    header, *tmp = ENTRY_RE.split(text)
    entries = list(zip(tmp[::2], tmp[1::2]))

    title, *header = header.split('\n', maxsplit=1)
    title = title.removeprefix('# ')
    header = header[0] if len(header) else ''

    return {
        'title': title,
        'header': parse_header(header),
        'entries': [parse_entry(timestamp, content) for timestamp, content in entries],
    }
|
|
|
|
def generate_day(day):
    """Render a day dict to its full page text: stats, title, header, entries."""
    parts = [
        generate_stats(day),
        f'\n\n# {day["title"]}',
        generate_header(day['header']),
    ]

    parts.extend(generate_entry(entry) for entry in day['entries'])
    parts.append('\n')

    return ''.join(parts)
|
|
|
|
### COMMAND UTILS
|
|
|
|
def load_journal():
    """Load and deserialize the journal from JOURNAL_PATH."""
    raw = JOURNAL_PATH.read_text()
    return json.loads(raw)
|
|
|
|
def save_journal(journal):
    """Serialize *journal* and write it to JOURNAL_PATH."""
    serialized = json.dumps(journal)
    JOURNAL_PATH.write_text(serialized)
|
|
|
|
def import_journal(fpath):
    """Rebuild a journal dict from a directory of day pages and support files."""
    days = {}
    for day_file in sorted(fpath.glob('*.md')):
        days[day_file.stem] = parse_day(day_file.read_text())

    files = {
        fname: (fpath / fname).read_text()
        for fname in ['habits', 'godword', 'tasks', 'foods']
    }

    return {'days': days, 'files': files}
|
|
|
|
def export_journal(journal, fpath):
    """Write every day page (as '<title>.md') and every support file into *fpath*."""
    for day in journal['days'].values():
        target = fpath / (day['title'] + '.md')
        target.write_text(generate_day(day))

    for fname, content in journal['files'].items():
        target = fpath / fname
        target.write_text(content)
|
|
|
|
def backup_journal(journal):
    """Export the journal plus journal.json into a date-named zip archive.

    If the journal carries a 'backup' script file, run it with %ZIPFILE%
    replaced by the archive's (quoted) absolute path. Returns the archive
    path.
    """
    print('Creating backup...')

    tmpdir = Path(mkdtemp())

    export_journal(journal, tmpdir)
    copyfile(str(JOURNAL_PATH), str(tmpdir / 'journal.json'))

    current_date = datetime.now().strftime('%Y-%m-%d')

    zipfile_path = Path(f'{current_date}.zip')

    # context manager: the original never closed the ZipFile, leaking the
    # handle and relying on interpreter shutdown to flush the archive
    with ZipFile(zipfile_path, 'w') as zipfile:
        for file in tmpdir.glob('*'):
            zipfile.write(file, arcname=file.name)

    rmtree(tmpdir)

    if script := journal['files'].get('backup'):
        print('Found script, running...')
        run(['bash', '-c', script.replace('%ZIPFILE%', f"'{str(zipfile_path.absolute())}'")])

    return zipfile_path
|
|
|
|
def open_journal(journal, date):
    """Open *date*'s page in the editor and return the re-imported journal.

    If the day doesn't exist yet, a backup is taken and the day is created.
    The whole journal is exported to a temp directory, the page is edited,
    and everything is re-parsed; on a parse error the traceback is shown and
    the user can fix the page and retry.
    """
    if not journal['days'].get(date):
        backup_journal(journal)
        journal['days'][date] = create_day(journal, date)

    # NOTE(review): tmpdir is never removed here — presumably intentional
    # (it doubles as a scratch copy); confirm.
    tmpdir = Path(mkdtemp())
    export_journal(journal, tmpdir)

    while True:
        try:
            open_editor(tmpdir / f'{date}.md')
            new_journal = import_journal(tmpdir)
            break
        except Exception as _:
            traceback.print_exc()
            input('Press enter to try again...')

    return new_journal
|
|
|
|
### COMMAND HANDLERS
|
|
|
|
def handle_open(args):
    """'open' command: edit the page for the given time expression (default 'today')."""
    subcommand = nth_or_default(0, args, 'today')

    date = evaluate_time_expression(subcommand)
    if not date:
        print(f'Invalid subcommand: {subcommand}')
        return

    save_journal(open_journal(load_journal(), format_date(date)))
|
|
|
|
def handle_edit(args):
    """'edit' command: edit one of the journal's support files (default 'foods')."""
    target = nth_or_default(0, args, 'foods')

    journal = load_journal()

    if target in journal['files']:
        journal['files'][target] = edit_text(journal['files'][target])
    elif prompt(f'Unknown file: {target}, create new?'):
        journal['files'][target] = edit_text('')

    save_journal(journal)
|
|
|
|
def handle_import(args):
    """'import' command: rebuild the journal from a directory of exported files."""
    if not args:
        print('Missing directory.')
        return

    path = Path(args[0])
    if not path.is_dir():
        print(f'Invalid directory: {path}')
        return

    save_journal(import_journal(path))
|
|
|
|
def handle_export(args):
    """'export' command: write all journal pages and files into a directory."""
    if not args:
        print('Missing directory.')
        return

    path = Path(args[0])
    if not path.is_dir():
        print(f'Invalid directory: {path}')
        return

    export_journal(load_journal(), path)
|
|
|
|
def handle_test(args):
    """'test' command: verify every day survives a generate/parse round-trip."""
    journal = load_journal()

    journal_orig = deepcopy(journal)

    for day, page in journal['days'].items():
        journal['days'][day] = parse_day(generate_day(page))

    if journal == journal_orig:
        print('Test passed!')
        return

    print('Test failed!')

    print('Dumping journal.fail.json and journal.fail.orig.json...')
    Path('journal.fail.json').write_text(json.dumps(journal, indent=4))
    Path('journal.fail.orig.json').write_text(json.dumps(journal_orig, indent=4))
|
|
|
|
def handle_summary(args):
    """'summary' command: print a per-meal food/calorie breakdown for a date
    (default 'today')."""

    def generate_food_summary(day):
        result = ''

        # deliberately shadows the builtin: 'printed' lines are collected
        # into *result* and returned instead of going to stdout
        def print(str=''):
            nonlocal result
            result += '\n' + str

        foods, recipes = get_foods_file()

        daily_calories = 0.0
        daily_protein = 0.0

        for entry in day['entries']:
            has_printed = False
            entry_calories = 0.0
            entry_protein = 0.0
            for diet in (b for b in entry['blocks'] if type(b) != str and b['type'] == 'diet'):
                # header line once per entry that contains diet blocks
                if not has_printed:
                    print(f'-- {format_timestamp(entry["timestamp"])}')
                    has_printed = True

                value = diet['amount']
                name = diet['food']

                # NOTE(review): this duplicates evaluate_food_entry(), except
                # an unknown food is reported and skipped here instead of
                # aborting — keep in mind if consolidating.
                if name in recipes:
                    food = recipes[name]

                    if value == 0.0:
                        value = food['TOTAL']

                    food = {k: v*(value/food['TOTAL']) for k,v in food.items()}
                elif name in foods:
                    if value == 0.0:
                        value = 100

                    food = {k: v*(value/100.0) for k,v in foods[name].items()}
                else:
                    print(f'ERROR: Invalid diet entry: {diet}')
                    continue

                protein = round(food.get('Protein', 0.0), 2)
                calories = round(food.get('Energy', 0.0), 2)

                entry_calories += calories
                entry_protein += protein

                print(f'{name:<20} {value:<6}g, {calories:<6}kcal, {protein:<6}g protein')

            if has_printed:
                entry_calories = round(entry_calories, 2)
                entry_protein = round(entry_protein, 2)
                print(f'-- TOTAL: {entry_calories}kcal, {entry_protein}g protein')
                print()

            daily_calories += entry_calories
            daily_protein += entry_protein

        print(f'-- DAILY TOTAL ({daily_calories}kcal, {daily_protein}g protein)')

        return result

    subcommand = nth_or_default(0, args, 'today')

    date = evaluate_time_expression(subcommand)
    if not date:
        print(f'Invalid time expression: {subcommand}')
        return

    date = format_date(date)

    journal = load_journal()

    print(generate_food_summary(journal['days'][date]))
|
|
|
|
def handle_backup(args):
    """'backup' command: create a backup archive, optionally deleting it after."""
    archive = backup_journal(load_journal())

    if prompt('Delete backup archive?'):
        archive.unlink()
|
|
|
|
### MAIN
|
|
|
|
def main():
    """CLI entry point: dispatch `journal <command> [args...]` (default 'open')."""
    init_hacky_hackery(load_journal())

    command = nth_or_default(1, sys.argv, 'open')
    args = sys.argv[2:]

    handlers = {
        'open': handle_open,
        'edit': handle_edit,
        'import': handle_import,
        'export': handle_export,
        'test': handle_test,
        'summary': handle_summary,
        'backup': handle_backup,
    }

    def handle_invalid(_args):
        print(f'Invalid command: {command}')

    handler = handlers.get(command, handle_invalid)
    handler(args)
|
|
|
|
# Standard entry guard: run the CLI only when executed directly, not on import.
if __name__ == '__main__':
    main()
|