static typing?
This commit is contained in:
370
journal.py
370
journal.py
@@ -1,10 +1,11 @@
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timedelta
|
||||
from functools import reduce, partial, cache
|
||||
from functools import reduce, partial
|
||||
from pathlib import Path
|
||||
from shutil import copyfile, rmtree
|
||||
from subprocess import run
|
||||
from tempfile import mktemp, mkdtemp
|
||||
from typing import Any, Callable, Optional, Tuple, TypeVar, Union
|
||||
from zipfile import ZipFile
|
||||
import json
|
||||
import random
|
||||
@@ -16,37 +17,37 @@ import traceback
|
||||
### GLOBALS
|
||||
|
||||
JOURNAL_PATH = Path.home() / '.journal.json'
|
||||
T = TypeVar('T')
|
||||
|
||||
AnyCallable = Callable[..., Any]
|
||||
AnyDict = dict[str, Any]
|
||||
|
||||
Journal = AnyDict
|
||||
|
||||
### UTILS
|
||||
|
||||
def compose(*fns):
|
||||
return partial(reduce, flip(apply), fns)
|
||||
|
||||
def nth_or_default(n, l, default):
|
||||
def nth_or_default(n: int, l: list[T], default: T):
|
||||
return l[n] if n < len(l) else default
|
||||
|
||||
def apply(f, x):
|
||||
def apply(f: AnyCallable, x: Any) -> Any:
|
||||
return f(x)
|
||||
|
||||
def flip(f):
|
||||
def flip(f: AnyCallable) -> AnyCallable:
|
||||
return lambda a1, a2: f(a2, a1)
|
||||
|
||||
def identity(x):
|
||||
return x
|
||||
def compose(*fns: AnyCallable) -> Any:
|
||||
return partial(reduce, flip(apply), fns)
|
||||
|
||||
def lazy_get(obj, key, default):
|
||||
result = obj.get(key)
|
||||
return result if result is not None else default()
|
||||
|
||||
def wrap_text(text, columns=80):
|
||||
def wrap_text(text: str, columns: int = 80) -> str:
|
||||
return textwrap.fill(text, columns,
|
||||
replace_whitespace=False,
|
||||
break_on_hyphens=False,
|
||||
break_long_words=False)
|
||||
|
||||
def split_keep(delims, string):
|
||||
res = []
|
||||
buf = []
|
||||
def split_keep(delims: list[str], string: str) -> list[str]:
|
||||
res: list[str] = []
|
||||
buf: list[str] = []
|
||||
|
||||
def flush_buf():
|
||||
nonlocal res, buf
|
||||
@@ -65,8 +66,8 @@ def split_keep(delims, string):
|
||||
|
||||
return res
|
||||
|
||||
def merge_if(pred, l):
|
||||
res = []
|
||||
def merge_if(pred: Callable[[str, str], bool], l: list[str]) -> list[str]:
|
||||
res: list[str] = []
|
||||
|
||||
for i, curr in enumerate(l):
|
||||
prev = l[i-1] if i-1 >= 0 else None
|
||||
@@ -77,15 +78,15 @@ def merge_if(pred, l):
|
||||
|
||||
return res
|
||||
|
||||
def editor(fpath):
|
||||
def open_editor(fpath: Path) -> None:
|
||||
run(['nvim', '+', str(fpath)])
|
||||
|
||||
def edit(text, suffix=''):
|
||||
def edit_text(text: str, suffix: str = '') -> str:
|
||||
fpath = Path(mktemp(suffix=suffix))
|
||||
|
||||
fpath.write_text(text)
|
||||
|
||||
editor(fpath)
|
||||
open_editor(fpath)
|
||||
|
||||
text = fpath.read_text()
|
||||
|
||||
@@ -93,53 +94,56 @@ def edit(text, suffix=''):
|
||||
|
||||
return text
|
||||
|
||||
def prompt(text):
|
||||
def prompt(text: str) -> bool:
|
||||
return input(text + ' [y/n] ') == 'y'
|
||||
|
||||
### DATE UTILS
|
||||
|
||||
def parse_date(date):
|
||||
def parse_date(date: str) -> datetime:
|
||||
return datetime.strptime(date, '%Y-%m-%d')
|
||||
|
||||
def format_date(date):
|
||||
def format_date(date: datetime) -> str:
|
||||
return date.strftime('%Y-%m-%d')
|
||||
|
||||
def parse_timestamp(timestamp):
|
||||
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
|
||||
def today() -> datetime:
|
||||
return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
|
||||
def format_timestamp(timestamp):
|
||||
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
def evaluate_time_expression(expression):
|
||||
def evaluate_time_expression(expression: str) -> Optional[datetime]:
|
||||
if expression == 'today':
|
||||
return datetime.now()
|
||||
elif expression == 'yesterday':
|
||||
return datetime.now() - timedelta(days=1)
|
||||
|
||||
def get_abbr_for_weekday(date):
|
||||
def get_abbr_for_weekday(date: str) -> str:
|
||||
return {
|
||||
0: 'mo', 1: 'tu', 2: 'we', 3: 'th',
|
||||
4: 'fr', 5: 'sa', 6: 'su',
|
||||
}[parse_date(date).weekday()]
|
||||
|
||||
def parse_timestamp(timestamp: str) -> int:
|
||||
return int(datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S').timestamp())
|
||||
|
||||
def format_timestamp(timestamp: int) -> str:
|
||||
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
### FILE PARSERS
|
||||
|
||||
def parse_foods_file(text):
|
||||
def parse_foods_file(text: str) -> tuple[AnyDict, AnyDict]:
|
||||
foods_str, recipes_str = text.split('---')
|
||||
|
||||
def parse_macro(macro):
|
||||
def parse_macro(macro: str) -> tuple[str, float]:
|
||||
name, value = macro.split()
|
||||
return (name, float(value.removesuffix('g').removesuffix('kcal')))
|
||||
|
||||
foods = {
|
||||
foods: AnyDict = {
|
||||
macros[0]: dict(parse_macro(macro) for macro in macros[1:])
|
||||
for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')]
|
||||
}
|
||||
|
||||
recipes = {}
|
||||
recipes: AnyDict = {}
|
||||
|
||||
def evaluate_ingredients(ingredients):
|
||||
result = {}
|
||||
def evaluate_ingredients(ingredients: list[str]) -> AnyDict:
|
||||
result: dict[str, float] = {}
|
||||
|
||||
total_weight = 0.0
|
||||
for ingredient in ingredients:
|
||||
@@ -173,7 +177,7 @@ def parse_foods_file(text):
|
||||
|
||||
return foods, recipes
|
||||
|
||||
def evaluate_food_entry(foods, recipes, value, name):
|
||||
def evaluate_food_entry(foods: AnyDict, recipes: AnyDict, value: float, name: str) -> AnyDict:
|
||||
if name in recipes:
|
||||
food = recipes[name]
|
||||
|
||||
@@ -193,8 +197,8 @@ def evaluate_food_entry(foods, recipes, value, name):
|
||||
|
||||
return food
|
||||
|
||||
def parse_tasks_file(text):
|
||||
result = []
|
||||
def parse_tasks_file(text: str) -> list[tuple[list[str], str]]:
|
||||
result: list[tuple[list[str], str]] = []
|
||||
|
||||
for task in text.splitlines():
|
||||
days, name = task.split(':')
|
||||
@@ -203,33 +207,32 @@ def parse_tasks_file(text):
|
||||
|
||||
return result
|
||||
|
||||
@cache
|
||||
def get_godword(journal):
|
||||
def get_godword(journal: Journal) -> list[str]:
|
||||
return journal['files']['godword'].strip().split('\n')
|
||||
|
||||
@cache
|
||||
def get_habits(journal):
|
||||
def get_habits(journal: Journal) -> list[str]:
|
||||
return journal['files']['habits'].strip().split('\n')
|
||||
|
||||
@cache
|
||||
def get_tasks(journal):
|
||||
def get_tasks(journal: Journal) -> list[tuple[list[str], str]]:
|
||||
return parse_tasks_file(journal['files']['tasks'])
|
||||
|
||||
### HACKY HACKERY
|
||||
|
||||
_GLOBAL_DO_NOT_USE = {}
|
||||
_global_do_not_use: Journal = {}
|
||||
|
||||
def init_hacky_hackery(journal):
|
||||
global _GLOBAL_DO_NOT_USE
|
||||
_GLOBAL_DO_NOT_USE = journal
|
||||
def init_hacky_hackery(journal: Journal) -> None:
|
||||
global _global_do_not_use
|
||||
_global_do_not_use = journal
|
||||
|
||||
def get_foods_file():
|
||||
return parse_foods_file(_GLOBAL_DO_NOT_USE['files']['foods'])
|
||||
return parse_foods_file(_global_do_not_use['files']['foods'])
|
||||
|
||||
### HEADER MODULES
|
||||
|
||||
def get_notifications_for_date(journal, date):
|
||||
notifications = []
|
||||
Notification = dict[str, Union[int, str]]
|
||||
|
||||
def get_notifications_for_date(journal: Journal, date: str) -> list[Notification]:
|
||||
notifications: list[Notification] = []
|
||||
|
||||
for day in journal['days'].values():
|
||||
for entry in day.get('entries'):
|
||||
@@ -243,14 +246,20 @@ def get_notifications_for_date(journal, date):
|
||||
|
||||
return notifications
|
||||
|
||||
def get_yesterdays_sticky(journal, date):
|
||||
def get_yesterdays_sticky(journal: Journal, date: str) -> Optional[str]:
|
||||
yesterday = format_date(parse_date(date) - timedelta(days=1))
|
||||
|
||||
if day := journal['days'].get(yesterday):
|
||||
if sticky := day['header'].get('sticky'):
|
||||
return sticky
|
||||
|
||||
header_modules = {
|
||||
HeaderModule = tuple[
|
||||
Callable[[Journal, str], Any],
|
||||
Callable[[str], Any],
|
||||
Callable[[Any], str],
|
||||
]
|
||||
|
||||
header_modules: dict[str, HeaderModule] = {
|
||||
'godword': (
|
||||
lambda j, d: [random.choice(get_godword(j)) for _ in range(20)],
|
||||
lambda b: b.split(),
|
||||
@@ -272,7 +281,7 @@ header_modules = {
|
||||
'notifications': (
|
||||
lambda j, d: get_notifications_for_date(j, d),
|
||||
lambda b: [{
|
||||
'source': int(parse_timestamp(' '.join(parts[0:2]).strip('[]')).timestamp()),
|
||||
'source': parse_timestamp(' '.join(parts[0:2]).strip('[]')),
|
||||
'message': ' '.join(parts[2:]),
|
||||
} for parts in [line.split() for line in b.splitlines()]],
|
||||
lambda v: '\n'.join(f'[[{format_timestamp(n["source"])}]] {n["message"]}' for n in v)
|
||||
@@ -292,23 +301,23 @@ header_modules = {
|
||||
|
||||
'sticky': (
|
||||
lambda j, d: get_yesterdays_sticky(j, d),
|
||||
identity,
|
||||
identity
|
||||
lambda b: b,
|
||||
lambda v: v,
|
||||
)
|
||||
}
|
||||
|
||||
def create_header_module(name, journal, date):
|
||||
def create_header_module(name: str, journal: Journal, date: str) -> AnyDict:
|
||||
return header_modules[name][0](journal, date)
|
||||
|
||||
def parse_header_module(name, block):
|
||||
def parse_header_module(name: str, block: str) -> AnyDict:
|
||||
return header_modules[name][1](block)
|
||||
|
||||
def generate_header_module(name, value):
|
||||
def generate_header_module(name: str, value: AnyDict) -> str:
|
||||
return header_modules[name][2](value)
|
||||
|
||||
### ENTRY MODULES
|
||||
|
||||
def parse_timer(block):
|
||||
def parse_timer(block: str) -> AnyDict:
|
||||
rest = block.split()
|
||||
|
||||
name = None
|
||||
@@ -316,9 +325,10 @@ def parse_timer(block):
|
||||
if len(rest) > 2:
|
||||
name, *rest = rest
|
||||
if len(rest) > 1:
|
||||
timestamp = int(parse_timestamp(' '.join(rest)).timestamp())
|
||||
timestamp = parse_timestamp(' '.join(rest))
|
||||
|
||||
result: AnyDict = {}
|
||||
|
||||
result = {}
|
||||
if name:
|
||||
result['name'] = name
|
||||
if timestamp:
|
||||
@@ -326,18 +336,18 @@ def parse_timer(block):
|
||||
|
||||
return result
|
||||
|
||||
def generate_timer(block):
|
||||
parts = []
|
||||
def generate_timer(value: AnyDict) -> str:
|
||||
parts: list[str] = []
|
||||
|
||||
if name := block.get('name'):
|
||||
if name := value.get('name'):
|
||||
parts.append(name)
|
||||
|
||||
if ts := block.get('timestamp'):
|
||||
if ts := value.get('timestamp'):
|
||||
parts.append(format_timestamp(ts))
|
||||
|
||||
return ' '.join(parts)
|
||||
|
||||
def parse_exercise(block):
|
||||
def parse_exercise(block: str) -> AnyDict:
|
||||
parts = block.split()
|
||||
|
||||
if parts[0] == 'walk':
|
||||
@@ -361,21 +371,21 @@ def parse_exercise(block):
|
||||
|
||||
assert False
|
||||
|
||||
def generate_exercise(block):
|
||||
if block['kind'] == 'walk':
|
||||
return f'walk {block["minutes"]}min {block["distance"]}km {block["steps"]}steps'
|
||||
elif block['kind'] == 'calisthenics':
|
||||
return f'calisthenics {block["sets"]}x{block["reps"]} {block["exercise"]}'
|
||||
def generate_exercise(value: AnyDict) -> str:
|
||||
if value['kind'] == 'walk':
|
||||
return f'walk {value["minutes"]}min {value["distance"]}km {value["steps"]}steps'
|
||||
elif value['kind'] == 'calisthenics':
|
||||
return f'calisthenics {value["sets"]}x{value["reps"]} {value["exercise"]}'
|
||||
|
||||
assert False
|
||||
|
||||
DEFAULT_PARSER = lambda b: {'value': b}
|
||||
DEFAULT_GENERATOR = lambda b: b['value']
|
||||
DEFAULT_PARSER: AnyCallable = lambda b: {'value': b}
|
||||
DEFAULT_GENERATOR: AnyCallable = lambda b: b['value']
|
||||
|
||||
entry_modules = {
|
||||
entry_modules: dict[str, tuple[Callable[[str], AnyDict], Callable[[AnyDict], str]]] = {
|
||||
'diet': (
|
||||
lambda b: {'amount': int(b.split()[0].removesuffix('g')), 'food': b.split()[1].strip()},
|
||||
lambda b: f'{b["amount"]}g {b["food"]}'),
|
||||
lambda v: f'{v["amount"]}g {v["food"]}'),
|
||||
'exercise': (parse_exercise, generate_exercise),
|
||||
'behavior': (DEFAULT_PARSER, DEFAULT_GENERATOR),
|
||||
|
||||
@@ -383,15 +393,15 @@ entry_modules = {
|
||||
'info': (DEFAULT_PARSER, compose(DEFAULT_GENERATOR, wrap_text)),
|
||||
|
||||
'post': (
|
||||
lambda b: {'timestamp': int(parse_timestamp(b.removeprefix('@post ').strip()).timestamp())},
|
||||
lambda b: format_timestamp(b["timestamp"])
|
||||
lambda b: {'timestamp': parse_timestamp(b.removeprefix('@post ').strip())},
|
||||
lambda v: format_timestamp(v["timestamp"])
|
||||
),
|
||||
'notes': (
|
||||
compose(
|
||||
lambda b: b.splitlines(),
|
||||
lambda s: {'source': s[0], 'title': s[1]},
|
||||
),
|
||||
lambda b: f'{b["source"]}\n{b["title"]}'
|
||||
lambda v: f'{v["source"]}\n{v["title"]}'
|
||||
),
|
||||
|
||||
'task': (DEFAULT_PARSER, DEFAULT_GENERATOR),
|
||||
@@ -404,17 +414,17 @@ entry_modules = {
|
||||
lambda b: b.split(maxsplit=1),
|
||||
lambda s: {'day': s[0], 'message': s[1]}
|
||||
),
|
||||
lambda b: f'{b["day"]} {b["message"]}'
|
||||
lambda v: f'{v["day"]} {v["message"]}'
|
||||
),
|
||||
}
|
||||
|
||||
def parse_entry_module(block: str) -> dict:
|
||||
def parse_entry_module(block: str) -> AnyDict:
|
||||
tag = block.split()[0].removeprefix('@')
|
||||
block = block.removeprefix(f'@{tag}').strip()
|
||||
|
||||
return {'type': tag} | entry_modules[tag][0](block)
|
||||
|
||||
def generate_entry_module(block: dict) -> str:
|
||||
def generate_entry_module(block: AnyDict) -> str:
|
||||
if block['type'] == 'notes':
|
||||
return f'@notes\n{entry_modules[block["type"]][1](block)}'
|
||||
|
||||
@@ -422,7 +432,7 @@ def generate_entry_module(block: dict) -> str:
|
||||
|
||||
### READ-ONLY STATS SECTION FUNCTIONS
|
||||
|
||||
def generate_stats(page):
|
||||
def generate_stats(page: AnyDict) -> str:
|
||||
if not page['entries']:
|
||||
return ''
|
||||
|
||||
@@ -483,19 +493,32 @@ def generate_stats(page):
|
||||
|
||||
### PAGE FUNCTIONS
|
||||
|
||||
def create_header(journal, date):
|
||||
def create_header(journal: Journal, date: str) -> AnyDict:
|
||||
return {
|
||||
module: create_header_module(module, journal, date)
|
||||
for module in header_modules
|
||||
}
|
||||
|
||||
def parse_header(header):
|
||||
def split_into_blocks(text):
|
||||
def create_entry(journal: Journal, date: str) -> AnyDict:
|
||||
return {
|
||||
'timestamp': int(today().timestamp()),
|
||||
'blocks': []
|
||||
}
|
||||
|
||||
def create_day(journal: Journal, date: str) -> AnyDict:
|
||||
return {
|
||||
'title': date,
|
||||
'header': create_header(journal, date),
|
||||
'entries': []
|
||||
}
|
||||
|
||||
def parse_header(text: str) -> AnyDict:
|
||||
def split_into_blocks(text: str) -> list[str]:
|
||||
return [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != '']
|
||||
|
||||
modules = split_into_blocks(header)
|
||||
modules = split_into_blocks(text)
|
||||
|
||||
result = {}
|
||||
result: AnyDict = {}
|
||||
|
||||
for module in modules:
|
||||
name, block = module.split('\n', maxsplit=1)
|
||||
@@ -504,27 +527,21 @@ def parse_header(header):
|
||||
|
||||
return result
|
||||
|
||||
def generate_header(header):
|
||||
def generate_header(header: AnyDict) -> str:
|
||||
result = ''
|
||||
|
||||
for name, value in header.items():
|
||||
if not value:
|
||||
for name, header in header.items():
|
||||
if not header:
|
||||
continue
|
||||
|
||||
result += f'\n\n{name.title()}:\n'
|
||||
result += generate_header_module(name, value)
|
||||
result += generate_header_module(name, header)
|
||||
|
||||
return result
|
||||
|
||||
def create_entry(journal, date):
|
||||
return {
|
||||
'timestamp': int(datetime.now().timestamp()),
|
||||
'blocks': []
|
||||
}
|
||||
|
||||
def parse_entry(entry):
|
||||
def merge_notes_block(l):
|
||||
res = []
|
||||
def parse_entry(timestamp: str, content: str) -> AnyDict:
|
||||
def merge_notes_block(l: list[str]) -> list[str]:
|
||||
res: list[str] = []
|
||||
|
||||
i = 0
|
||||
while i < len(l):
|
||||
@@ -538,12 +555,12 @@ def parse_entry(entry):
|
||||
|
||||
return res
|
||||
|
||||
def merge_wrapped_lines(l):
|
||||
def merge_wrapped_lines(l: list[str]) -> list[str]:
|
||||
TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ')
|
||||
POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02 ')
|
||||
COLUMN_LIMIT = 80
|
||||
|
||||
res = []
|
||||
res: list[str] = []
|
||||
|
||||
i = 0
|
||||
while i < len(l):
|
||||
@@ -580,8 +597,8 @@ def parse_entry(entry):
|
||||
|
||||
return res
|
||||
|
||||
def split_post_block(l):
|
||||
res = []
|
||||
def split_post_block(l: list[str]) -> list[str]:
|
||||
res: list[str] = []
|
||||
|
||||
POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02')
|
||||
|
||||
@@ -632,16 +649,14 @@ def parse_entry(entry):
|
||||
lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1],
|
||||
)
|
||||
|
||||
timestamp, content = entry
|
||||
|
||||
return {
|
||||
'timestamp': int(parse_timestamp(timestamp.strip()).timestamp()),
|
||||
'timestamp': parse_timestamp(timestamp.strip()),
|
||||
'blocks': [parse_entry_module(b) if b.startswith('@') else b for b in split_into_blocks(content)],
|
||||
}
|
||||
|
||||
def generate_entry(entry):
|
||||
def format_block(curr, prev, before_prev):
|
||||
def format_text(text):
|
||||
def generate_entry(entry: AnyDict) -> str:
|
||||
def format_block(curr: Any, prev: Any, before_prev: Any):
|
||||
def format_text(text: str) -> str:
|
||||
if all(c == '\n' for c in curr):
|
||||
return text
|
||||
|
||||
@@ -649,7 +664,7 @@ def generate_entry(entry):
|
||||
DUMMY_POST = '@post 2020-02-02 02:02:02 '
|
||||
|
||||
is_first = not prev
|
||||
is_post = before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post'
|
||||
is_post: bool = (before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post')
|
||||
|
||||
if is_first:
|
||||
text = DUMMY_TS + text
|
||||
@@ -691,14 +706,7 @@ def generate_entry(entry):
|
||||
|
||||
return result
|
||||
|
||||
def create_day(journal, date):
|
||||
return {
|
||||
'title': date,
|
||||
'header': create_header(journal, date),
|
||||
'entries': []
|
||||
}
|
||||
|
||||
def parse_day(text):
|
||||
def parse_day(text: str) -> AnyDict:
|
||||
# discard read-only QS section
|
||||
text = text[text.find('#'):]
|
||||
|
||||
@@ -708,24 +716,25 @@ def parse_day(text):
|
||||
entries = list(zip(tmp[::2], tmp[1::2]))
|
||||
|
||||
title, *header = header.split('\n', maxsplit=1)
|
||||
title = title.removeprefix('# ')
|
||||
header = header[0] if len(header) else ''
|
||||
|
||||
return {
|
||||
'title': title.removeprefix('# '),
|
||||
'title': title,
|
||||
'header': parse_header(header),
|
||||
'entries': [parse_entry(e) for e in entries],
|
||||
'entries': [parse_entry(timestamp, content) for timestamp, content in entries],
|
||||
}
|
||||
|
||||
def generate_day(page):
|
||||
def generate_day(day: AnyDict) -> str:
|
||||
result = ''
|
||||
|
||||
result += generate_stats(page)
|
||||
result += generate_stats(day)
|
||||
|
||||
result += f'\n\n# {page["title"]}'
|
||||
result += f'\n\n# {day["title"]}'
|
||||
|
||||
result += generate_header(page['header'])
|
||||
result += generate_header(day['header'])
|
||||
|
||||
for entry in page['entries']:
|
||||
for entry in day['entries']:
|
||||
result += generate_entry(entry)
|
||||
|
||||
result += '\n'
|
||||
@@ -734,34 +743,38 @@ def generate_day(page):
|
||||
|
||||
### COMMAND UTILS
|
||||
|
||||
def import_journal(fpath):
|
||||
result = { 'days': {}, 'files': {} }
|
||||
def load_journal() -> Journal:
|
||||
return json.loads(JOURNAL_PATH.read_text())
|
||||
|
||||
for name in list(sorted(fpath.glob('*.md'))):
|
||||
day = parse_day(name.read_text())
|
||||
result['days'][name.stem] = day
|
||||
def save_journal(journal: Journal) -> None:
|
||||
JOURNAL_PATH.write_text(json.dumps(journal))
|
||||
|
||||
for fname in ['habits', 'godword', 'tasks', 'foods']:
|
||||
result['files'][fname] = (fpath / fname).read_text()
|
||||
def import_journal(fpath: Path) -> Journal:
|
||||
return {
|
||||
'days': {
|
||||
fname.stem: parse_day(fname.read_text())
|
||||
for fname in list(sorted(fpath.glob('*.md')))
|
||||
},
|
||||
'files': {
|
||||
fname: (fpath / fname).read_text()
|
||||
for fname in ['habits', 'godword', 'tasks', 'foods']
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
def export_journal(journal, fpath):
|
||||
def export_journal(journal: Journal, fpath: Path) -> None:
|
||||
for day in journal['days'].values():
|
||||
(fpath / (day['title'] + '.md')).write_text(generate_day(day))
|
||||
|
||||
for fname, content in journal['files'].items():
|
||||
(fpath / fname).write_text(content)
|
||||
|
||||
def backup_journal():
|
||||
def backup_journal(journal: Journal) -> Path:
|
||||
print('Creating backup...')
|
||||
|
||||
tmpdir = mkdtemp()
|
||||
tmpdir = Path(mkdtemp())
|
||||
|
||||
journal = load_journal()
|
||||
|
||||
export_journal(journal, Path(tmpdir))
|
||||
copyfile(str(JOURNAL_PATH), tmpdir + '/journal.json')
|
||||
export_journal(journal, tmpdir)
|
||||
copyfile(str(JOURNAL_PATH), str(tmpdir / 'journal.json'))
|
||||
|
||||
files = Path(tmpdir).glob('*')
|
||||
|
||||
@@ -780,14 +793,11 @@ def backup_journal():
|
||||
print('Found script, running...')
|
||||
run(['bash', '-c', script.replace('%ZIPFILE%', f"'{str(zipfile_path.absolute())}'")])
|
||||
|
||||
if prompt('Ran script, delete archive?'):
|
||||
zipfile_path.unlink()
|
||||
|
||||
def open_journal(date):
|
||||
journal = load_journal()
|
||||
return zipfile_path
|
||||
|
||||
def open_journal(journal: Journal, date: str) -> Journal:
|
||||
if not journal['days'].get(date):
|
||||
backup_journal()
|
||||
backup_journal(journal)
|
||||
journal['days'][date] = create_day(journal, date)
|
||||
|
||||
tmpdir = Path(mkdtemp())
|
||||
@@ -795,47 +805,38 @@ def open_journal(date):
|
||||
|
||||
while True:
|
||||
try:
|
||||
editor(tmpdir / f'{date}.md')
|
||||
open_editor(tmpdir / f'{date}.md')
|
||||
new_journal = import_journal(tmpdir)
|
||||
break
|
||||
except Exception as e:
|
||||
except Exception as _:
|
||||
traceback.print_exc()
|
||||
input('Press enter to try again...')
|
||||
|
||||
save_journal(new_journal)
|
||||
|
||||
def load_journal():
|
||||
if JOURNAL_PATH.exists():
|
||||
return json.loads(JOURNAL_PATH.read_text())
|
||||
else:
|
||||
return import_journal(Path.home() / 'workspace' / 'journal')
|
||||
|
||||
def save_journal(journal):
|
||||
JOURNAL_PATH.write_text(json.dumps(journal))
|
||||
return new_journal
|
||||
|
||||
### COMMAND HANDLERS
|
||||
|
||||
def handle_open(args):
|
||||
def handle_open(args: list[str]) -> None:
|
||||
subcommand = nth_or_default(0, args, 'today')
|
||||
|
||||
if date := evaluate_time_expression(subcommand):
|
||||
open_journal(format_date(date))
|
||||
save_journal(open_journal(load_journal(), format_date(date)))
|
||||
else:
|
||||
print(f'Invalid subcommand: {subcommand}')
|
||||
|
||||
def handle_edit(args):
|
||||
def handle_edit(args: list[str]) -> None:
|
||||
subcommand = nth_or_default(0, args, 'foods')
|
||||
|
||||
journal = load_journal()
|
||||
|
||||
if subcommand in journal['files']:
|
||||
journal['files'][subcommand] = edit(journal['files'][subcommand])
|
||||
journal['files'][subcommand] = edit_text(journal['files'][subcommand])
|
||||
elif prompt(f'Unknown file: {subcommand}, create new?'):
|
||||
journal['files'][subcommand] = edit('')
|
||||
journal['files'][subcommand] = edit_text('')
|
||||
|
||||
save_journal(journal)
|
||||
|
||||
def handle_import(args):
|
||||
def handle_import(args: list[str]) -> None:
|
||||
if len(args) < 1:
|
||||
print('Missing directory.')
|
||||
return
|
||||
@@ -848,7 +849,7 @@ def handle_import(args):
|
||||
|
||||
save_journal(import_journal(path))
|
||||
|
||||
def handle_export(args):
|
||||
def handle_export(args: list[str]) -> None:
|
||||
if len(args) < 1:
|
||||
print('Missing directory.')
|
||||
return
|
||||
@@ -861,7 +862,7 @@ def handle_export(args):
|
||||
|
||||
export_journal(load_journal(), path)
|
||||
|
||||
def handle_test(args):
|
||||
def handle_test(args: list[str]) -> None:
|
||||
journal = load_journal()
|
||||
|
||||
journal_orig = deepcopy(journal)
|
||||
@@ -878,11 +879,11 @@ def handle_test(args):
|
||||
else:
|
||||
print('Test passed!')
|
||||
|
||||
def handle_summary(args):
|
||||
def generate_food_summary(page):
|
||||
def handle_summary(args: list[str]) -> None:
|
||||
def generate_food_summary(day: AnyDict) -> str:
|
||||
result = ''
|
||||
|
||||
def print(str=''):
|
||||
def print(str:str=''):
|
||||
nonlocal result
|
||||
result += '\n' + str
|
||||
|
||||
@@ -891,7 +892,7 @@ def handle_summary(args):
|
||||
daily_calories = 0.0
|
||||
daily_protein = 0.0
|
||||
|
||||
for entry in page['entries']:
|
||||
for entry in day['entries']:
|
||||
has_printed = False
|
||||
entry_calories = 0.0
|
||||
entry_protein = 0.0
|
||||
@@ -953,23 +954,34 @@ def handle_summary(args):
|
||||
|
||||
print(generate_food_summary(journal['days'][date]))
|
||||
|
||||
def handle_backup(args: list[str]) -> None:
|
||||
archive_path = backup_journal(load_journal())
|
||||
if prompt('Delete backup archive?'):
|
||||
archive_path.unlink()
|
||||
|
||||
|
||||
### MAIN
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
init_hacky_hackery(load_journal())
|
||||
|
||||
command = nth_or_default(1, sys.argv, 'open')
|
||||
args = sys.argv[2:]
|
||||
|
||||
handler = {
|
||||
def handle_invalid(args: list[str]) -> None:
|
||||
print(f'Invalid command: {command}')
|
||||
|
||||
command_handlers: dict[str, Callable[[list[str]], None]] = {
|
||||
'open': handle_open,
|
||||
'edit': handle_edit,
|
||||
'import': handle_import,
|
||||
'export': handle_export,
|
||||
'test': handle_test,
|
||||
'summary': handle_summary,
|
||||
'backup': lambda _: backup_journal(),
|
||||
}.get(command, lambda _: print(f'Invalid command: {command}'))
|
||||
'backup': handle_backup,
|
||||
}
|
||||
|
||||
handler = command_handlers.get(command, handle_invalid)
|
||||
|
||||
handler(args)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user