Compare commits

...

5 Commits

Author SHA1 Message Date
olari
dcafd88b86 :*) 2021-07-12 14:03:45 +03:00
olari
3ef24d2bfe profiler 2021-07-05 22:09:53 +03:00
olari
468701b220 task interface 2021-07-05 15:21:10 +03:00
olari
22d1150801 search funciton 2021-07-05 14:48:00 +03:00
olari
1e352c7b06 update 2021-07-05 14:20:55 +03:00
3 changed files with 238 additions and 112 deletions

View File

@@ -1,11 +1,10 @@
from copy import deepcopy
from datetime import datetime, timedelta
from functools import reduce, partial
from functools import cache, reduce, partial
from pathlib import Path
from shutil import copyfile, rmtree
from subprocess import run
from tempfile import mktemp, mkdtemp
from typing import Any, Callable, Optional, Tuple, TypeVar, Union
from zipfile import ZipFile
import json
import random
@@ -17,37 +16,36 @@ import traceback
### GLOBALS
JOURNAL_PATH = Path.home() / '.journal.json'
T = TypeVar('T')
AnyCallable = Callable[..., Any]
AnyDict = dict[str, Any]
Journal = AnyDict
### UTILS
def remove_chars(text, chars):
    """Return *text* with every character that occurs in *chars* removed.

    Uses str.translate so the filtering happens in a single C-level pass
    instead of a per-character Python loop + join.
    """
    return text.translate(str.maketrans('', '', chars))
def nth_or_default(n: int, l: list[T], default: T):
def get_words(text):
    """Lowercase *text*, strip common punctuation, and split into words."""
    cleaned = remove_chars(text, '.,-:;/')
    return cleaned.lower().split()
def nth_or_default(n, l, default):
    """Return l[n], or *default* when *n* is past the end of the list."""
    if n < len(l):
        return l[n]
    return default
def apply(f: AnyCallable, x: Any) -> Any:
def apply(f, x):
    """Invoke callable *f* on *x* and return the result (composition helper)."""
    result = f(x)
    return result
def flip(f: AnyCallable) -> AnyCallable:
def flip(f):
    """Return a two-argument wrapper that calls *f* with its arguments swapped."""
    def flipped(first, second):
        return f(second, first)
    return flipped
def compose(*fns: AnyCallable) -> Any:
def compose(*fns):
    """Left-to-right function composition: compose(f, g)(x) == g(f(x)).

    With no functions, the returned callable is the identity.
    """
    def composed(value):
        for fn in fns:
            value = fn(value)
        return value
    return composed
def wrap_text(text: str, columns: int = 80) -> str:
def wrap_text(text, columns=80):
    """Wrap *text* at *columns* characters.

    Existing whitespace, hyphenated words and over-long words are all left
    intact — only soft line breaks are inserted.
    """
    return textwrap.fill(
        text,
        columns,
        replace_whitespace=False,
        break_on_hyphens=False,
        break_long_words=False,
    )
def split_keep(delims: list[str], string: str) -> list[str]:
res: list[str] = []
buf: list[str] = []
def split_keep(delims, string):
res = []
buf = []
def flush_buf():
nonlocal res, buf
@@ -66,8 +64,8 @@ def split_keep(delims: list[str], string: str) -> list[str]:
return res
def merge_if(pred: Callable[[str, str], bool], l: list[str]) -> list[str]:
res: list[str] = []
def merge_if(pred, l):
res = []
for i, curr in enumerate(l):
prev = l[i-1] if i-1 >= 0 else None
@@ -78,10 +76,10 @@ def merge_if(pred: Callable[[str, str], bool], l: list[str]) -> list[str]:
return res
def open_editor(fpath: Path) -> None:
def open_editor(fpath):
run(['nvim', '+', str(fpath)])
def edit_text(text: str, suffix: str = '') -> str:
def edit_text(text, suffix=''):
fpath = Path(mktemp(suffix=suffix))
fpath.write_text(text)
@@ -94,56 +92,65 @@ def edit_text(text: str, suffix: str = '') -> str:
return text
def prompt(text: str) -> bool:
def prompt(text):
    """Ask a yes/no question on stdin; only an exact 'y' answer counts as yes."""
    answer = input(text + ' [y/n] ')
    return answer == 'y'
def find_entries(journal, pred):
    """Scan every block of every entry and collect (day, entry_index, timestamp).

    One tuple is emitted per MATCHING BLOCK, so an entry with several
    matching blocks appears several times in the result.
    """
    matches = []
    for day, page in journal['days'].items():
        for idx, entry in enumerate(page['entries']):
            stamp = entry['timestamp']
            matches.extend(
                (day, idx, stamp)
                for block in entry['blocks']
                if pred(day, entry, block)
            )
    return matches
### DATE UTILS
def parse_date(date: str) -> datetime:
def parse_date(date):
    """Parse a 'YYYY-MM-DD' string into a naive datetime at midnight."""
    fmt = '%Y-%m-%d'
    return datetime.strptime(date, fmt)
def format_date(date: datetime) -> str:
def format_date(date):
    """Format a datetime as 'YYYY-MM-DD' (time-of-day is dropped)."""
    fmt = '%Y-%m-%d'
    return date.strftime(fmt)
def today() -> datetime:
def today():
    """Return the current local date as a datetime at exactly midnight."""
    now = datetime.now()
    return datetime(now.year, now.month, now.day)
def evaluate_time_expression(expression: str) -> Optional[datetime]:
def evaluate_time_expression(expression):
    """Resolve a named time expression ('today'/'yesterday') to a datetime.

    Returns None for anything unrecognized.
    """
    offsets = {'today': 0, 'yesterday': 1}
    if expression in offsets:
        return datetime.now() - timedelta(days=offsets[expression])
    return None
def get_abbr_for_weekday(date: str) -> str:
def get_abbr_for_weekday(date):
    """Two-letter weekday abbreviation for a 'YYYY-MM-DD' date string."""
    abbrs = ('mo', 'tu', 'we', 'th', 'fr', 'sa', 'su')
    return abbrs[datetime.strptime(date, '%Y-%m-%d').weekday()]
def parse_timestamp(timestamp: str) -> int:
def parse_timestamp(timestamp):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string (local time) to a unix int timestamp."""
    parsed = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    return int(parsed.timestamp())
def format_timestamp(timestamp: int) -> str:
def format_timestamp(timestamp):
    """Render a unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
    moment = datetime.fromtimestamp(timestamp)
    return moment.strftime('%Y-%m-%d %H:%M:%S')
### FILE PARSERS
def parse_foods_file(text: str) -> tuple[AnyDict, AnyDict]:
def parse_foods_file(text):
foods_str, recipes_str = text.split('---')
def parse_macro(macro: str) -> tuple[str, float]:
def parse_macro(macro):
name, value = macro.split()
return (name, float(value.removesuffix('g').removesuffix('kcal')))
foods: AnyDict = {
foods = {
macros[0]: dict(parse_macro(macro) for macro in macros[1:])
for macros in [food.split('\n') for food in foods_str.strip().split('\n\n')]
}
recipes: AnyDict = {}
recipes = {}
def evaluate_ingredients(ingredients: list[str]) -> AnyDict:
result: dict[str, float] = {}
def evaluate_ingredients(ingredients):
result = {}
total_weight = 0.0
for ingredient in ingredients:
@@ -177,7 +184,7 @@ def parse_foods_file(text: str) -> tuple[AnyDict, AnyDict]:
return foods, recipes
def evaluate_food_entry(foods: AnyDict, recipes: AnyDict, value: float, name: str) -> AnyDict:
def evaluate_food_entry(foods, recipes, value, name):
if name in recipes:
food = recipes[name]
@@ -197,8 +204,8 @@ def evaluate_food_entry(foods: AnyDict, recipes: AnyDict, value: float, name: st
return food
def parse_tasks_file(text: str) -> list[tuple[list[str], str]]:
result: list[tuple[list[str], str]] = []
def parse_tasks_file(text):
result = []
for task in text.splitlines():
days, name = task.split(':')
@@ -207,32 +214,31 @@ def parse_tasks_file(text: str) -> list[tuple[list[str], str]]:
return result
def get_godword(journal: Journal) -> list[str]:
def get_godword(journal):
    """Return the stored 'godword' file as a list of its non-empty lines."""
    content = journal['files']['godword']
    return content.strip().split('\n')
def get_habits(journal: Journal) -> list[str]:
def get_habits(journal):
    """Return the stored 'habits' file as a list of its non-empty lines."""
    content = journal['files']['habits']
    return content.strip().split('\n')
def get_tasks(journal: Journal) -> list[tuple[list[str], str]]:
def get_tasks(journal):
    # Parse the raw 'tasks' file stored in the journal into
    # (days, name) tuples via parse_tasks_file.
    return parse_tasks_file(journal['files']['tasks'])
### HACKY HACKERY
_global_do_not_use: Journal = {}
_global_do_not_use = {}
def init_hacky_hackery(journal: Journal) -> None:
def init_hacky_hackery(journal):
    # Stash the loaded journal in a module-level global so cached helpers
    # (e.g. get_foods_file) can reach it without taking it as a parameter.
    global _global_do_not_use
    _global_do_not_use = journal
@cache
def get_foods_file():
    # Parsed at most once per process thanks to @cache; relies on
    # init_hacky_hackery() having stored the journal first.
    return parse_foods_file(_global_do_not_use['files']['foods'])
### HEADER MODULES
Notification = dict[str, Union[int, str]]
def get_notifications_for_date(journal: Journal, date: str) -> list[Notification]:
notifications: list[Notification] = []
def get_notifications_for_date(journal, date):
notifications = []
for day in journal['days'].values():
for entry in day.get('entries'):
@@ -246,20 +252,14 @@ def get_notifications_for_date(journal: Journal, date: str) -> list[Notification
return notifications
def get_yesterdays_sticky(journal: Journal, date: str) -> Optional[str]:
def get_yesterdays_sticky(journal, date):
    """Return the sticky note from the day before *date*, or None.

    None is returned when the previous day is missing or its sticky is
    absent/empty.
    """
    previous = datetime.strptime(date, '%Y-%m-%d') - timedelta(days=1)
    day = journal['days'].get(previous.strftime('%Y-%m-%d'))
    if not day:
        return None
    return day['header'].get('sticky') or None
HeaderModule = tuple[
Callable[[Journal, str], Any],
Callable[[str], Any],
Callable[[Any], str],
]
header_modules: dict[str, HeaderModule] = {
header_modules = {
'godword': (
lambda j, d: [random.choice(get_godword(j)) for _ in range(20)],
lambda b: b.split(),
@@ -306,18 +306,18 @@ header_modules: dict[str, HeaderModule] = {
)
}
def create_header_module(name: str, journal: Journal, date: str) -> AnyDict:
def create_header_module(name, journal, date):
    # Dispatch to the named module's "create" function (slot 0 of its tuple).
    return header_modules[name][0](journal, date)
def parse_header_module(name: str, block: str) -> AnyDict:
def parse_header_module(name, block):
    # Dispatch to the named module's "parse" function (slot 1 of its tuple).
    return header_modules[name][1](block)
def generate_header_module(name: str, value: AnyDict) -> str:
def generate_header_module(name, value):
    # Dispatch to the named module's "generate" function (slot 2 of its tuple).
    return header_modules[name][2](value)
### ENTRY MODULES
def parse_timer(block: str) -> AnyDict:
def parse_timer(block):
rest = block.split()
name = None
@@ -327,7 +327,7 @@ def parse_timer(block: str) -> AnyDict:
if len(rest) > 1:
timestamp = parse_timestamp(' '.join(rest))
result: AnyDict = {}
result = {}
if name:
result['name'] = name
@@ -336,8 +336,8 @@ def parse_timer(block: str) -> AnyDict:
return result
def generate_timer(value: AnyDict) -> str:
parts: list[str] = []
def generate_timer(value):
parts = []
if name := value.get('name'):
parts.append(name)
@@ -347,7 +347,7 @@ def generate_timer(value: AnyDict) -> str:
return ' '.join(parts)
def parse_exercise(block: str) -> AnyDict:
def parse_exercise(block):
parts = block.split()
if parts[0] == 'walk':
@@ -371,7 +371,7 @@ def parse_exercise(block: str) -> AnyDict:
assert False
def generate_exercise(value: AnyDict) -> str:
def generate_exercise(value):
if value['kind'] == 'walk':
return f'walk {value["minutes"]}min {value["distance"]}km {value["steps"]}steps'
elif value['kind'] == 'calisthenics':
@@ -379,10 +379,10 @@ def generate_exercise(value: AnyDict) -> str:
assert False
DEFAULT_PARSER: AnyCallable = lambda b: {'value': b}
DEFAULT_GENERATOR: AnyCallable = lambda b: b['value']
DEFAULT_PARSER = lambda b: {'value': b}
DEFAULT_GENERATOR = lambda b: b['value']
entry_modules: dict[str, tuple[Callable[[str], AnyDict], Callable[[AnyDict], str]]] = {
entry_modules = {
'diet': (
lambda b: {'amount': int(b.split()[0].removesuffix('g')), 'food': b.split()[1].strip()},
lambda v: f'{v["amount"]}g {v["food"]}'),
@@ -416,15 +416,20 @@ entry_modules: dict[str, tuple[Callable[[str], AnyDict], Callable[[AnyDict], str
),
lambda v: f'{v["day"]} {v["message"]}'
),
'tag': (
lambda b: {'value': b.split(',')},
lambda v: ','.join(v['value'])
)
}
def parse_entry_module(block: str) -> AnyDict:
def parse_entry_module(block):
    # Block text looks like '@tag rest...'; the tag selects the parser.
    tag = block.split()[0].removeprefix('@')
    block = block.removeprefix(f'@{tag}').strip()
    # Merge the tag into the parsed payload so generation can round-trip it.
    return {'type': tag} | entry_modules[tag][0](block)
def generate_entry_module(block: AnyDict) -> str:
def generate_entry_module(block):
if block['type'] == 'notes':
return f'@notes\n{entry_modules[block["type"]][1](block)}'
@@ -432,7 +437,7 @@ def generate_entry_module(block: AnyDict) -> str:
### READ-ONLY STATS SECTION FUNCTIONS
def generate_stats(page: AnyDict) -> str:
def generate_stats(page):
if not page['entries']:
return ''
@@ -493,32 +498,32 @@ def generate_stats(page: AnyDict) -> str:
### PAGE FUNCTIONS
def create_header(journal: Journal, date: str) -> AnyDict:
def create_header(journal, date):
    # Build a fresh day header by running every registered header module.
    return {
        module: create_header_module(module, journal, date)
        for module in header_modules
    }
def create_entry(journal: Journal, date: str) -> AnyDict:
def create_entry(journal, date):
    # New empty entry stamped with today's midnight timestamp.
    # NOTE(review): journal/date are unused here — presumably kept so all
    # create_* factories share a signature; confirm before removing.
    return {
        'timestamp': int(today().timestamp()),
        'blocks': []
    }
def create_day(journal: Journal, date: str) -> AnyDict:
def create_day(journal, date):
    # New day page: the date doubles as the title, header is freshly generated.
    return {
        'title': date,
        'header': create_header(journal, date),
        'entries': []
    }
def parse_header(text: str) -> AnyDict:
def split_into_blocks(text: str) -> list[str]:
def parse_header(text):
def split_into_blocks(text):
return [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != '']
modules = split_into_blocks(text)
result: AnyDict = {}
result = {}
for module in modules:
name, block = module.split('\n', maxsplit=1)
@@ -527,7 +532,7 @@ def parse_header(text: str) -> AnyDict:
return result
def generate_header(header: AnyDict) -> str:
def generate_header(header):
result = ''
for name, header in header.items():
@@ -539,9 +544,9 @@ def generate_header(header: AnyDict) -> str:
return result
def parse_entry(timestamp: str, content: str) -> AnyDict:
def merge_notes_block(l: list[str]) -> list[str]:
res: list[str] = []
def parse_entry(timestamp, content):
def merge_notes_block(l):
res = []
i = 0
while i < len(l):
@@ -555,12 +560,12 @@ def parse_entry(timestamp: str, content: str) -> AnyDict:
return res
def merge_wrapped_lines(l: list[str]) -> list[str]:
def merge_wrapped_lines(l):
TIMESTAMP_LENGTH = len('2020-02-02 02:02:02 ')
POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02 ')
COLUMN_LIMIT = 80
res: list[str] = []
res = []
i = 0
while i < len(l):
@@ -597,8 +602,8 @@ def parse_entry(timestamp: str, content: str) -> AnyDict:
return res
def split_post_block(l: list[str]) -> list[str]:
res: list[str] = []
def split_post_block(l):
res = []
POST_BLOCK_LENGTH = len('@post 2020-02-02 02:02:02')
@@ -651,12 +656,12 @@ def parse_entry(timestamp: str, content: str) -> AnyDict:
return {
'timestamp': parse_timestamp(timestamp.strip()),
'blocks': [parse_entry_module(b) if b.startswith('@') else b for b in split_into_blocks(content)],
'blocks': [(parse_entry_module(b) if b.startswith('@') else b) for b in split_into_blocks(content)],
}
def generate_entry(entry: AnyDict) -> str:
def format_block(curr: Any, prev: Any, before_prev: Any):
def format_text(text: str) -> str:
def generate_entry(entry):
def format_block(curr, prev, before_prev):
def format_text(text):
if all(c == '\n' for c in curr):
return text
@@ -664,7 +669,7 @@ def generate_entry(entry: AnyDict) -> str:
DUMMY_POST = '@post 2020-02-02 02:02:02 '
is_first = not prev
is_post: bool = (before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post')
is_post = (before_prev and all(c == '\n' for c in before_prev) and isinstance(prev, dict) and prev['type'] == 'post')
if is_first:
text = DUMMY_TS + text
@@ -706,12 +711,12 @@ def generate_entry(entry: AnyDict) -> str:
return result
def parse_day(text: str) -> AnyDict:
ENTRY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE)
def parse_day(text):
# discard read-only QS section
text = text[text.find('#'):]
ENTRY_RE = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE)
header, *tmp = ENTRY_RE.split(text)
entries = list(zip(tmp[::2], tmp[1::2]))
@@ -722,10 +727,10 @@ def parse_day(text: str) -> AnyDict:
return {
'title': title,
'header': parse_header(header),
'entries': [parse_entry(timestamp, content) for timestamp, content in entries],
'entries': [parse_entry(timestamp, content) for timestamp, content in entries]
}
def generate_day(day: AnyDict) -> str:
def generate_day(day):
result = ''
result += generate_stats(day)
@@ -743,13 +748,13 @@ def generate_day(day: AnyDict) -> str:
### COMMAND UTILS
def load_journal() -> Journal:
def load_journal():
    # The whole journal lives as a single JSON document in the home directory.
    return json.loads(JOURNAL_PATH.read_text())
def save_journal(journal: Journal) -> None:
def save_journal(journal):
    # Serialize compactly (no indent); the file is not meant for hand editing.
    JOURNAL_PATH.write_text(json.dumps(journal))
def import_journal(fpath: Path) -> Journal:
def import_journal(fpath):
return {
'days': {
fname.stem: parse_day(fname.read_text())
@@ -757,18 +762,18 @@ def import_journal(fpath: Path) -> Journal:
},
'files': {
fname: (fpath / fname).read_text()
for fname in ['habits', 'godword', 'tasks', 'foods']
for fname in ['habits', 'godword', 'tasks', 'foods', 'backup']
}
}
def export_journal(journal: Journal, fpath: Path) -> None:
def export_journal(journal, fpath):
    # Render each day to '<date>.md' under fpath, then dump the auxiliary
    # files (habits, godword, tasks, foods, ...) verbatim.
    for day in journal['days'].values():
        (fpath / (day['title'] + '.md')).write_text(generate_day(day))
    for fname, content in journal['files'].items():
        (fpath / fname).write_text(content)
def backup_journal(journal: Journal) -> Path:
def backup_journal(journal):
print('Creating backup...')
tmpdir = Path(mkdtemp())
@@ -795,7 +800,7 @@ def backup_journal(journal: Journal) -> Path:
return zipfile_path
def open_journal(journal: Journal, date: str) -> Journal:
def open_journal(journal, date):
if not journal['days'].get(date):
backup_journal(journal)
journal['days'][date] = create_day(journal, date)
@@ -816,7 +821,7 @@ def open_journal(journal: Journal, date: str) -> Journal:
### COMMAND HANDLERS
def handle_open(args: list[str]) -> None:
def handle_open(args):
subcommand = nth_or_default(0, args, 'today')
if date := evaluate_time_expression(subcommand):
@@ -824,7 +829,7 @@ def handle_open(args: list[str]) -> None:
else:
print(f'Invalid subcommand: {subcommand}')
def handle_edit(args: list[str]) -> None:
def handle_edit(args):
subcommand = nth_or_default(0, args, 'foods')
journal = load_journal()
@@ -836,7 +841,7 @@ def handle_edit(args: list[str]) -> None:
save_journal(journal)
def handle_import(args: list[str]) -> None:
def handle_import(args):
if len(args) < 1:
print('Missing directory.')
return
@@ -849,7 +854,7 @@ def handle_import(args: list[str]) -> None:
save_journal(import_journal(path))
def handle_export(args: list[str]) -> None:
def handle_export(args):
if len(args) < 1:
print('Missing directory.')
return
@@ -862,7 +867,7 @@ def handle_export(args: list[str]) -> None:
export_journal(load_journal(), path)
def handle_test(args: list[str]) -> None:
def handle_test(args):
journal = load_journal()
journal_orig = deepcopy(journal)
@@ -879,11 +884,11 @@ def handle_test(args: list[str]) -> None:
else:
print('Test passed!')
def handle_summary(args: list[str]) -> None:
def generate_food_summary(day: AnyDict) -> str:
def handle_summary(args):
def generate_food_summary(day):
result = ''
def print(str:str=''):
def print(str=''):
nonlocal result
result += '\n' + str
@@ -954,24 +959,92 @@ def handle_summary(args: list[str]) -> None:
print(generate_food_summary(journal['days'][date]))
def handle_backup(args: list[str]) -> None:
def handle_backup(args):
    # Create a zip backup of the journal; optionally delete it right after
    # (useful for just exercising the backup path).
    archive_path = backup_journal(load_journal())
    if prompt('Delete backup archive?'):
        archive_path.unlink()
def edit_entries(entries):
    # TODO: unimplemented stub — editing currently goes through
    # edit_entries_by_predicate instead.
    pass
def parse_search_query(query):
    """Split a comma-separated query into (plain strings, '#'-prefixed tags)."""
    strings, tags = [], []
    for part in query.split(','):
        target = tags if part.startswith('#') else strings
        target.append(part.removeprefix('#'))
    return strings, tags
def edit_entries_by_predicate(journal, predicate, reversed=False):
    # Open every matching entry in one editor buffer, then write the edited
    # entries back into the journal at their original positions.
    # NOTE(review): `reversed` is never used (and shadows the builtin) —
    # confirm whether ordering support was meant to be wired up.
    matches = find_entries(journal, predicate)
    header = f'Number of matches: {len(matches)}'
    text = header
    for day, idx, ts in matches:
        entry = journal['days'][day]['entries'][idx]
        text += generate_entry(entry)
    text = edit_text(text)
    # NOTE(review): ENTRY_RE appears to be defined inside parse_day, not at
    # module scope, in this revision — verify the name resolves here.
    _, *tmp = ENTRY_RE.split(text)
    # The regex split yields alternating (timestamp, content) pieces.
    entries = [parse_entry(ts, c) for ts, c in list(zip(tmp[::2], tmp[1::2]))]
    # Write-back is keyed by timestamp — assumes timestamps are unique
    # across matches; TODO confirm.
    matches_map = {ts: (day, idx) for day, idx, ts in matches}
    for entry in entries:
        day, idx = matches_map[entry['timestamp']]
        journal['days'][day]['entries'][idx] = entry
    return journal
def handle_search(args):
    # args[0] is a comma-separated query: plain words match text blocks,
    # '#tag' items match 'tag' entry modules. Matching entries are opened
    # for editing and the edits are saved back.
    strings, tags = parse_search_query(args[0])
    def predicate(day, entry, block):
        if isinstance(block, str):
            words = get_words(block)
            if any(s in words for s in strings):
                return True
        elif block['type'] == 'tag':
            if any(t in block['value'] for t in tags):
                return True
    save_journal(edit_entries_by_predicate(load_journal(), predicate))
def handle_tasks(args):
    # Open every 'task' entry that has no accompanying 'done' block;
    # the implicit None return for non-task blocks counts as "no match".
    def predicate(day, entry, block):
        if not isinstance(block, str) and block['type'] == 'task':
            is_done = any(b['type'] == 'done' for b in entry['blocks'] if not isinstance(b, str))
            return not is_done
    save_journal(edit_entries_by_predicate(load_journal(), predicate))
def handle_profile(args):
    # Profile a full journal export into a temp dir, sorted by cumulative time.
    # Import is local so profiling support costs nothing on normal runs.
    import cProfile
    cProfile.run("export_journal(load_journal(), Path(mkdtemp()))", sort='cumtime')
### MAIN
def main() -> None:
def main():
init_hacky_hackery(load_journal())
command = nth_or_default(1, sys.argv, 'open')
args = sys.argv[2:]
def handle_invalid(args: list[str]) -> None:
def handle_invalid(args):
print(f'Invalid command: {command}')
command_handlers: dict[str, Callable[[list[str]], None]] = {
command_handlers = {
'open': handle_open,
'edit': handle_edit,
'import': handle_import,
@@ -979,6 +1052,9 @@ def main() -> None:
'test': handle_test,
'summary': handle_summary,
'backup': handle_backup,
'search': handle_search,
'tasks': handle_tasks,
'profile': handle_profile,
}
handler = command_handlers.get(command, handle_invalid)

View File

@@ -0,0 +1,34 @@
from copy import deepcopy
from pathlib import Path
from shutil import copy
import json

# One-off migration: convert legacy 'hide' and 'info' blocks into 'tag'
# blocks inside every journal entry. A .bkp copy of the journal is written
# first so the migration can be rolled back.
# (Indentation below was reconstructed — the original paste had lost it.)
journal_path = Path.home() / '.journal.json'
copy(str(journal_path), str(journal_path.with_suffix('.bkp')))

journal = json.loads(journal_path.read_text())
new_journal = deepcopy(journal)

for day in journal['days']:
    new_entries = []
    for entry in journal['days'][day]['entries']:
        new_blocks = []
        for block in entry['blocks']:
            if not isinstance(block, str) and block['type'] == 'hide':
                # Drop the 'hide' block itself and tag the entry instead.
                # NOTE(review): the `!= 'tag'` guard adds the hide tag only
                # when the previous block is a dict that is NOT already a
                # tag — confirm this is the intended condition.
                if len(new_blocks) and not isinstance(new_blocks[-1], str) and \
                        new_blocks[-1]['type'] != 'tag':
                    new_blocks.append({'type': 'tag', 'value': ['hide']})
            elif not isinstance(block, str) and block['type'] == 'info':
                # Replace 'info' blocks with an info tag followed by the text.
                new_blocks.append({'type': 'tag', 'value': ['info']})
                new_blocks.append('\n')
                new_blocks.append(block['value'])
            else:
                new_blocks.append(block)
        entry['blocks'] = new_blocks
        new_entries.append(entry)
    new_journal['days'][day]['entries'] = new_entries

journal_path.write_text(json.dumps(new_journal))

View File

@@ -0,0 +1,16 @@
from copy import deepcopy
from pathlib import Path
from shutil import copy
import json

# One-off migration: trim every day to just its first entry, writing a
# .bkp copy of the journal before modifying it.
# (Indentation below was reconstructed — the original paste had lost it.)
journal_path = Path.home() / '.journal.json'
copy(str(journal_path), str(journal_path.with_suffix('.bkp')))

journal = json.loads(journal_path.read_text())
new_journal = deepcopy(journal)

for day in journal['days']:
    # Use a one-element slice, not [0]: every consumer iterates 'entries'
    # as a list, so storing a bare entry dict would corrupt the journal,
    # and [0] would also raise IndexError on a day with no entries.
    new_journal['days'][day]['entries'] = journal['days'][day]['entries'][:1]

journal_path.write_text(json.dumps(new_journal))