Files
journal/parse.py
2021-06-21 12:58:34 +03:00

313 lines
8.3 KiB
Python

import enum
from pathlib import Path
from datetime import datetime
import re
import json
entry_re = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ?', re.MULTILINE)
curr_day = ''
def parse_godword(godword):
return godword.split()
def parse_habits(habits):
result = {}
for habit in habits.splitlines():
value, name = habit.split(maxsplit=1)
name = name.strip()
result[name] = value[1] == 'x'
return result
def parse_notifications(notifications):
result = []
for notification in notifications.splitlines():
parts = notification.split()
result.append({
'source': ' '.join(parts[0:2]).strip('[]'),
'message': ' '.join(parts[2:]),
})
return result
def parse_tasks(tasks):
result = {}
for task in tasks.splitlines():
value, name = task.split(maxsplit=1)
name = name.strip()
result[name] = value[1] == 'x'
return result
header_modules = {
'godword': parse_godword,
'habits': parse_habits,
'notifications': parse_notifications,
'tasks': parse_tasks,
}
def parse_header(header):
result = {}
def split_into_blocks(text):
return [b.strip() for b in re.split(r'\n{2,}', text) if b.strip() != '']
title, *modules = split_into_blocks(header)
for module in modules:
name, value = module.split('\n', maxsplit=1)
name = name.lower().removesuffix(':')
result[name] = header_modules[name](value)
return result
def parse_timestamp(timestamp):
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
def parse_post(block):
block = block.removeprefix('@post ')
try:
timestamp = int(parse_timestamp(block[:19]).timestamp())
block = block[19:]
except:
timestamp = None
content = block.strip()
result = {}
if content:
result['content'] = content
if timestamp:
result['timestamp'] = timestamp
return result
def parse_notes(block):
tag, source, title = block.splitlines()
return {'source': source, 'title': title}
def parse_diet(block):
tag, amount, food = block.split()
amount = int(amount.removesuffix('g'))
return {'amount': amount, 'food': food}
def parse_timer(block):
tag, *rest = block.split()
name = None
timestamp = None
if len(rest) > 2:
name, *rest = rest
if len(rest) > 1:
timestamp = int(parse_timestamp(' '.join(rest)).timestamp())
result = {}
if name:
result['name'] = name
if timestamp:
result['timestamp'] = timestamp
return result
def parse_exercise(block):
tag, *parts = block.split()
if parts[0] == 'walk':
kind, minutes, distance, steps = parts
return {
'kind': kind,
'minutes': int(minutes.removesuffix('min')),
'distance': float(distance.removesuffix('km')),
'steps': int(steps.removesuffix('steps')),
}
return {'kind': 'INVALID'}
def parse_notify(block):
tag, day, *rest = block.split()
return {'day': day.strip(), 'message': ' '.join(rest)}
def create_entry_module_parser(name, handler=None):
handler = handler or (lambda b: {'value': b.removeprefix(f'@{name} ')})
return lambda b: {'type': name} | handler(b)
entry_modules = {
'hide': create_entry_module_parser('hide', lambda _: {}),
'post': create_entry_module_parser('post', parse_post),
'info': create_entry_module_parser('info'),
'notes': create_entry_module_parser('notes', parse_notes),
'behavior': create_entry_module_parser('behavior'),
'diet': create_entry_module_parser('diet', parse_diet),
'task': create_entry_module_parser('task'),
'start': create_entry_module_parser('start', parse_timer),
'stop': create_entry_module_parser('stop', parse_timer),
'done': create_entry_module_parser('done', parse_timer),
'exercise': create_entry_module_parser('exercise', parse_exercise),
'notify': create_entry_module_parser('notify', parse_notify),
}
from functools import reduce, partial
def split_keep(delims, string):
res = []
buf = []
for c in string:
if c in delims:
if buf:
res.append(''.join(buf))
res.append(c)
buf = []
else:
buf.append(c)
if buf:
res.append(''.join(buf))
return res
assert split_keep(['@', '\n'], 'hello @world\n\nabout') == ['hello ', '@', 'world', '\n', '\n', 'about']
def merge_chars(chars, l):
res = []
for i in l:
if i in chars and res and all(c == i for c in res[-1]):
res[-1] += i
else:
res.append(i)
return res
assert merge_chars('\n', ['\n', '\n', 'hello', 'world', '\n', '\n']) == ['\n\n', 'hello', 'world', '\n\n']
def attach_to_next(c, l):
l = l.copy()
try:
while True:
i = l.index(c)
l[i+1] = c + l[i+1]
l.pop(i)
except:
pass
return l
assert attach_to_next('@', ['aoeu', '@', 'oeu']) == ['aoeu', '@oeu']
def attach_to_prev_if(pred, l):
res = []
for i, curr in enumerate(l):
prev = l[i-1] if i-1 >= 0 else None
if prev and pred(prev, curr):
res[-1] += curr
else:
res.append(curr)
return res
assert attach_to_prev_if(lambda p, c: p[-1] != '\n' and c[0] == '@', ['aoeu', '@oeu']) == ['aoeu@oeu']
def merge_notes_block(l):
res = []
i = 0
while i < len(l):
if l[i] == '@notes':
# notes nl source nl title
res.append('\n'.join([l[i], l[i+2], l[i+4]]))
i += 5
else:
res.append(l[i])
i += 1
return res
def merge_wrapped_lines(l):
res = []
i = 0
while i < len(l):
curr = l[i]
prev = l[i-1] if i > 0 else None
next = l[i+1] if i+1 < len(l) else None
if prev and next and curr == '\n':
len_prev = len(prev)
if i == 1:
len_prev += len('2020-02-02 02:02:02 ')
if not next[0].isspace():
next_word = next.split()[0]
if len_prev + len(next_word) >= 80:
res[-1] += ' ' + next
i += 2
continue
res.append(curr)
i += 1
return res
def apply(f, x):
return f(x)
def flip(f):
return lambda a1, a2: f(a2, a1)
def parse_entry(entry):
result = {}
def split_into_blocks(text):
r = reduce(flip(apply), [
# split the text into sections by newline and tag symbol, keeping the separators
partial(split_keep, ('\n', '@')),
# merge sequential newlines together into a single whitespace block
partial(merge_chars, '\n'),
# attach escaped tag symbols
partial(attach_to_prev_if, lambda p, c: c == '@' and p[-1] == '\\'),
# attach tag symbols
partial(attach_to_next, '@'),
# ???
partial(attach_to_prev_if, lambda p, c: p[-1] != '\n' and not (p[0] == '@' and p[-1] == ' ') and c[0] == '@'),
# yes
merge_notes_block,
# strip all non-whitespace blocks
partial(map, lambda s: s if s.isspace() else s.rstrip()), list,
# yes
merge_wrapped_lines,
# remove trailing whitespace block
lambda b: b if b and not all(c == '\n' for c in b[-1]) else b[:-1],
], text)
return r
timestamp, content = entry
result['timestamp'] = int(parse_timestamp(timestamp.strip()).timestamp())
result['blocks'] = []
for b in split_into_blocks(content):
if b.startswith('@'):
tag = b.split()[0][1:]
result['blocks'].append(entry_modules[tag](b))
else:
result['blocks'].append(b)
return result
def parse_page(text):
header, *tmp = entry_re.split(text)
entries = list(zip(tmp[::2], tmp[1::2]))
return {
'header': parse_header(header),
'entries': [parse_entry(e) for e in entries],
}
if __name__ == '__main__':
result = {}
for fpath in list(sorted((Path.home() / 'workspace' / 'journal').glob('*.md'))):
day = parse_page(fpath.read_text())
result[fpath.stem] = day
script_path = Path(__file__).parent
with open(script_path / 'journal.json', 'w') as fp:
json.dump(result, fp, indent=4, ensure_ascii=False)