author    David Luevano Alvarado <david@luevano.xyz>  2023-08-28 17:22:22 -0600
committer David Luevano Alvarado <david@luevano.xyz>  2023-08-28 17:22:22 -0600
commit    acebd03111a44617a32878d7cb9cdc0eafb0ad41 (patch)
tree      fcb99b407a276a71bb39036230fa7e24d34ca609
parent    e1333971db2f2028a9c4c68e12e78a4af4fd2635 (diff)
feat: finalize sqlite integration
-rw-r--r--  src/pyssg/builder.py         |  51
-rw-r--r--  src/pyssg/database.py        | 132
-rw-r--r--  src/pyssg/database_entry.py  |  54
-rw-r--r--  src/pyssg/db/database.py     |  21
-rw-r--r--  src/pyssg/md_parser.py       |  58
-rw-r--r--  src/pyssg/page.py            |   2
-rw-r--r--  src/pyssg/pyssg.py           |  24
-rw-r--r--  src/pyssg/utils.py           |  11
8 files changed, 82 insertions(+), 271 deletions(-)
diff --git a/src/pyssg/builder.py b/src/pyssg/builder.py
index eda8096..dc7bdda 100644
--- a/src/pyssg/builder.py
+++ b/src/pyssg/builder.py
@@ -6,10 +6,10 @@ from logging import Logger, getLogger
from jinja2 import Environment, Template, FileSystemLoader as FSLoader
-from .utils import get_file_list, get_dir_structure, create_dir, copy_file
-from .database import Database
-from .md_parser import MDParser
-from .page import Page
+from pyssg.utils import get_file_list, get_dir_structure, create_dir, copy_file
+from pyssg.db.database import Database
+from pyssg.md_parser import MDParser
+from pyssg.page import Page
log: Logger = getLogger(__name__)
@@ -18,32 +18,27 @@ log: Logger = getLogger(__name__)
class Builder:
def __init__(self, config: dict,
db: Database,
- dir_path: str) -> None:
+ dir_cfg: dict) -> None:
log.debug('initializing site builder')
self.config: dict = config
self.db: Database = db
- self.dir_path: str = dir_path
+ self.dir_cfg: dict = deepcopy(dir_cfg)
- if self.dir_path not in self.config['dirs']:
- log.error('couldn\'t find "dirs.%s" attribute in config file', self.dir_path)
- sys.exit(1)
- if os.path.isabs(self.dir_path) and self.dir_path.strip() != '/':
- log.error('dir path "%s" cannot be absolute, except for the special case "/"', self.dir_path)
+ if os.path.isabs(self.dir_cfg['dir']) and self.dir_cfg['dir'].strip() != '/':
+ log.error('dir path "%s" cannot be absolute', self.dir_cfg['dir'])
sys.exit(1)
- log.debug('building dir_cfg for "%s" dir_path', self.dir_path)
- self.dir_cfg: dict = deepcopy(self.config['dirs'][self.dir_path]['cfg'])
- if self.dir_path.strip() == '/':
- log.debug('dir_path is "/", copying src/dst directly')
+ if self.dir_cfg['dir'].strip() == '/':
+ log.debug('dir path is "/", copying src/dst directly')
self.dir_cfg['src'] = self.config['path']['src']
self.dir_cfg['dst'] = self.config['path']['dst']
- self.dir_cfg['url'] = self.config['url']['main']
+ self.dir_cfg['url'] = self.config['url']['base']
else:
- log.debug('dir_path is "%s", generating', self.dir_path)
- self.dir_cfg['src'] = os.path.join(self.config['path']['src'], self.dir_path)
- self.dir_cfg['dst'] = os.path.join(self.config['path']['dst'], self.dir_path)
- self.dir_cfg['url'] = f'{self.config["url"]["main"]}/{self.dir_path}'
+ log.debug('dir_path is "%s", generating', self.dir_cfg['dir'])
+ self.dir_cfg['src'] = os.path.join(self.config['path']['src'], self.dir_cfg['dir'])
+ self.dir_cfg['dst'] = os.path.join(self.config['path']['dst'], self.dir_cfg['dir'])
+ self.dir_cfg['url'] = f'{self.config["url"]["base"]}/{self.dir_cfg["dir"]}'
# the autoescape option could be a security risk if used in a dynamic
# website, as far as i can tell
@@ -63,12 +58,12 @@ class Builder:
self.common_vars: dict
def build(self) -> None:
- log.debug('building site for dir path "%s"', self.dir_path)
+ log.debug('building site for dir path "%s"', self.dir_cfg['dir'])
if 'exclude_dirs' not in self.dir_cfg:
- log.debug('"exclude_dirs" field not found in "dirs.%s.cfg"', self.dir_path)
+ log.debug('"exclude_dirs" field for dir "%s" not found', self.dir_cfg['dir'])
self.dir_cfg['exclude_dirs'] = []
if not isinstance(self.dir_cfg['exclude_dirs'], list):
- log.error('"exclude_dirs" field in "dirs.%s.cfg" isn\'t of type "list"', self.dir_path)
+ log.error('"exclude_dirs" field for dir "%s" isn\'t of type "list"', self.dir_cfg['dir'])
sys.exit(1)
self.dirs = get_dir_structure(self.dir_cfg['src'],
@@ -105,8 +100,8 @@ class Builder:
self.__render_pages(self.dir_cfg['plt'])
if self.dir_cfg['tags']:
- log.debug('rendering tags for dir_path "%s"', self.dir_path)
- create_dir(os.path.join(self.dir_cfg['dst'], 'tag'), True, True)
+ log.debug('rendering tags for dir "%s"', self.dir_cfg['dir'])
+ create_dir(os.path.join(self.dir_cfg['dst'], 'tag'), True)
if isinstance(self.dir_cfg['tags'], str):
self.__render_tags(self.dir_cfg['tags'])
else:
@@ -127,11 +122,11 @@ class Builder:
**self.common_vars)
def __create_dir_structure(self) -> None:
- log.debug('creating dir structure for dir_path "%s"', self.dir_path)
- create_dir(self.dir_cfg['dst'], True, True)
+ log.debug('creating dir structure for dir "%s"', self.dir_cfg['dir'])
+ create_dir(self.dir_cfg['dst'], True)
for d in self.dirs:
path: str = os.path.join(self.dir_cfg['dst'], d)
- create_dir(path, True, True)
+ create_dir(path, True)
def __copy_html_files(self) -> None:
if not len(self.html_files) > 0:
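For reference, a minimal standalone sketch of the src/dst/url resolution that Builder.__init__ now performs on dir_cfg — the config and dir_cfg key names are taken from the hunks above, but the helper function itself is hypothetical and only illustrates the branching:

import os

def resolve_dir_cfg(config: dict, dir_cfg: dict) -> dict:
    # mirrors the branching in Builder.__init__ above; illustration only
    out = dict(dir_cfg)
    if os.path.isabs(out['dir']) and out['dir'].strip() != '/':
        raise ValueError(f'dir path "{out["dir"]}" cannot be absolute')
    if out['dir'].strip() == '/':
        # the special "/" dir maps straight to the site root
        out['src'] = config['path']['src']
        out['dst'] = config['path']['dst']
        out['url'] = config['url']['base']
    else:
        out['src'] = os.path.join(config['path']['src'], out['dir'])
        out['dst'] = os.path.join(config['path']['dst'], out['dir'])
        out['url'] = f'{config["url"]["base"]}/{out["dir"]}'
    return out

# resolve_dir_cfg({'path': {'src': 'src', 'dst': 'live'},
#                  'url': {'base': 'https://example.org'}},
#                 {'dir': 'blog'})
# -> {..., 'src': 'src/blog', 'dst': 'live/blog', 'url': 'https://example.org/blog'}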
diff --git a/src/pyssg/database.py b/src/pyssg/database.py
deleted file mode 100644
index 38658eb..0000000
--- a/src/pyssg/database.py
+++ /dev/null
@@ -1,132 +0,0 @@
-import os
-import sys
-import csv
-from logging import Logger, getLogger
-
-from .utils import get_checksum
-from .database_entry import DatabaseEntry
-
-log: Logger = getLogger(__name__)
-
-
-# db class that works for both html and md files
-class Database:
- __COLUMN_NUM: int = 5
- __COLUMN_DELIMITER: str = '|'
-
- def __init__(self, db_path: str) -> None:
- log.debug('initializing the page db on path "%s"', db_path)
- self.db_path: str = db_path
- self.e: dict[str, DatabaseEntry] = dict()
-
- def update_tags(self, file_name: str,
- new_tags: set[str]) -> None:
- # technically, I should ensure this function can only run
- # if self.e is populated
- if file_name in self.e:
- log.debug('updating tags for entry "%s"', file_name)
- log.debug('entry "%s" old tags: %s',
- file_name, self.e[file_name].tags)
-
- self.e[file_name].update_tags(new_tags)
- log.debug('entry "%s" new tags: %s',
- file_name, self.e[file_name].tags)
- else:
- log.error('can\'t update tags for entry "%s",'
- ' as it is not present in db', file_name)
- sys.exit(1)
-
- def update(self, file_name: str,
- remove: str = '') -> None:
- log.debug('updating entry for file "%s"', file_name)
- f: str = file_name
- tags: set[str] = set()
- if remove != '':
- f = file_name.replace(remove, '')
- log.debug('removed "%s" from "%s": "%s"', remove, file_name, f)
-
- # get current time, needs actual file name
- time: float = os.stat(file_name).st_mtime
- log.debug('time for "%s": %s', file_name, time)
-
- # calculate current checksum, also needs actual file name
- cksm: str = get_checksum(file_name)
- log.debug('checksum for "%s": "%s"', file_name, cksm)
-
- # three cases, 1) entry didn't exist,
- # 2) entry has been mod and,
- # 3) entry hasn't been mod
- # 1)
- if f not in self.e:
- log.debug('entry "%s" didn\'t exist, adding with defaults', f)
- self.e[f] = DatabaseEntry((f, time, 0.0, cksm, tags))
- return
-
- # oe is old entity
- oe: DatabaseEntry = self.e[f]
- log.debug('entry "%s" old content: %s', f, oe)
-
- # 2)
- if cksm != oe.checksum:
- log.debug('entry "%s" has been modified, updating; '
- 'using old tags', f)
- self.e[f] = DatabaseEntry((f, oe.ctimestamp, time, cksm, oe.tags))
- log.debug('entry "%s" new content: %s', f, self.e[f])
- # 3)
- else:
- log.debug('entry "%s" hasn\'t been modified', f)
-
- def write(self) -> None:
- log.debug('writing db')
- with open(self.db_path, 'w') as file:
- csv_writer = csv.writer(file, delimiter=self.__COLUMN_DELIMITER)
- for _, v in self.e.items():
- log.debug('writing row: %s', v)
- csv_writer.writerow(v.get_raw_entry())
-
- def _db_path_exists(self) -> bool:
- log.debug('checking that "%s" exists or is a file', self.db_path)
- if not os.path.exists(self.db_path):
- log.warning('"%s" doesn\'t exist, will be'
- ' created once process finishes,'
- ' ignore if it\'s the first run', self.db_path)
- return False
- if not os.path.isfile(self.db_path):
- log.error('"%s" is not a file', self.db_path)
- sys.exit(1)
- return True
-
- def _get_raw_csv_rows(self) -> list[list[str]]:
- rows: list[list[str]]
- with open(self.db_path, 'r') as f:
- csv_reader = csv.reader(f, delimiter=self.__COLUMN_DELIMITER)
- rows = list(csv_reader)
- log.debug('db contains %d rows', len(rows))
- return rows
-
- # TODO: don't include files that are not in the db anymore
- def read(self) -> None:
- log.debug('reading db')
- if not self._db_path_exists():
- return
-
- rows: list[list[str]] = self._get_raw_csv_rows()
- # l=list of values in entry
- log.debug('parsing rows from db')
- for it, row in enumerate(rows):
- i: int = it + 1
- col_num: int = len(row)
- log.debug('row %d content: "%s"', i, row)
- if col_num != self.__COLUMN_NUM:
- log.critical('row %d doesn\'t contain %s columns, contains %d'
- ' columns: "%s"',
- i, self.__COLUMN_NUM, col_num, row)
- sys.exit(1)
- # actual value types
- r: tuple[str, float, float, str, str] = (str(row[0]),
- float(row[1]),
- float(row[2]),
- str(row[3]),
- str(row[4]))
- entry: DatabaseEntry = DatabaseEntry(r)
- self.e[entry.fname] = entry
diff --git a/src/pyssg/database_entry.py b/src/pyssg/database_entry.py
deleted file mode 100644
index 58e9884..0000000
--- a/src/pyssg/database_entry.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import sys
-from logging import Logger, getLogger
-
-log: Logger = getLogger(__name__)
-
-
-class DatabaseEntry:
- # ignoring return type as it makes the line too long, unnecessary, too
- def __init__(self, entry: tuple[str, float, float, str, str | set[str]]):
- self.fname: str = str(entry[0])
- self.ctimestamp: float = float(entry[1])
- self.mtimestamp: float = float(entry[2])
- self.checksum: str = str(entry[3])
- self.tags: set[str] = set()
-
- if isinstance(entry[4], set):
- self.tags = entry[4]
- self.__remove_invalid()
- elif isinstance(entry[4], str):
- if entry[4] != '-':
- self.tags = set(e.strip() for e in str(entry[4]).split(','))
- self.__remove_invalid()
- # this should be unreachable as the type has to be str or set[str],
- # but I have just in case to evade bugs
- else:
- log.error('tags has to be either a set or string (comma separated)')
- sys.exit(1)
-
- log.debug('"%s" tags: %s', self.fname, self.tags)
-
- def __str__(self) -> str:
- _return_str: str = "['{}', {}, {}, '{}', {}]"\
- .format(self.fname,
- self.ctimestamp,
- self.mtimestamp,
- self.checksum,
- sorted(self.tags))
- return _return_str
-
- def __remove_invalid(self) -> None:
- if '-' in self.tags:
- self.tags.remove('-')
-
- # used for csv writing
- def get_raw_entry(self) -> list[str]:
- return [self.fname,
- str(self.ctimestamp),
- str(self.mtimestamp),
- self.checksum,
- ','.join(sorted(self.tags)) if self.tags else '-']
-
- def update_tags(self, new_tags: set[str]) -> None:
- self.tags = new_tags
- self.__remove_invalid()
diff --git a/src/pyssg/db/database.py b/src/pyssg/db/database.py
index 24b7e8b..7e53205 100644
--- a/src/pyssg/db/database.py
+++ b/src/pyssg/db/database.py
@@ -1,4 +1,3 @@
-import json
import sqlite3
from logging import Logger, getLogger
from sqlite3 import PARSE_DECLTYPES, Connection, Cursor
@@ -19,19 +18,16 @@ class Database:
# create statements are always commited
self.query(CREATE_FILES_TABLE)
-
# commits the transactions, closes connection and cursor
def write(self) -> None:
self.con.commit()
self.cur.close()
self.con.close()
-
def query(self, sql: str,
params: dict | Sequence = ()) -> list[Any]:
return self.cur.execute(sql, params).fetchall()
-
# commit query, doesn't wait until calling con.commit()
def cquery(self, sql: str,
params: dict | Sequence = ()) -> list[Any]:
@@ -40,39 +36,40 @@ class Database:
out = self.query(sql, params)
return out
-
def select(self, fname: str) -> tuple | None:
out: list[Any]
out = self.query(SELECT_FILE, (fname,))
+ log.debug("select %s", out)
return out[0] if out else None
-
def select_all(self) -> list[Any] | None:
out: list[Any] = self.query(SELECT_FILE_ALL)
+ log.debug("select_all %s", out)
return out if out else None
-
def insert(self, fname: str,
ctime: float,
checksum: str,
- tags: tuple | None = None) -> None:
+ tags: tuple | None = None) -> tuple:
params: tuple = (fname, ctime, checksum, tags)
out: tuple = self.query(INSERT_FILE, params)[0]
log.debug("insert %s", out)
-
+ return out
def update(self, fname: str,
mtime: float,
checksum: str,
- tags: tuple | None = None) -> None:
+ tags: tuple | None = None) -> tuple:
params: tuple = (mtime, checksum, tags, fname)
out: tuple = self.query(UPDATE_FILE, params)[0]
log.debug("update %s", out)
+ return out
def update_tags(self, fname: str,
- tags: tuple | None = None) -> None:
+ tags: tuple | None = None) -> tuple:
params: tuple = (tags, fname)
out: tuple = self.query(UPDATE_FILE_TAGS, params)[0]
- log.debug("update %s", out)
+ log.debug("update_tags %s", out)
+ return out
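A rough usage sketch of the sqlite-backed Database after this change. The (fname, ctime, mtime, checksum, tags) row order is inferred from how md_parser.py indexes the returned tuples below, the db path and values are made up, and how the tags tuple is adapted for sqlite is handled by code outside this diff:

from pyssg.db.database import Database

db = Database('site.db')                        # hypothetical db path
row = db.select('blog/post.md')                 # None if the file isn't tracked yet
if row is None:
    row = db.insert('blog/post.md', 1693260142.0, 'md5-checksum-here')
else:
    row = db.update('blog/post.md', 1693260999.0, 'new-md5-checksum')
db.update_tags('blog/post.md', ('python', 'ssg'))
db.write()                                      # commit, then close cursor/connection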
diff --git a/src/pyssg/md_parser.py b/src/pyssg/md_parser.py
index 97443f0..7056a5b 100644
--- a/src/pyssg/md_parser.py
+++ b/src/pyssg/md_parser.py
@@ -1,6 +1,7 @@
import os
from operator import itemgetter
from logging import Logger, getLogger
+import sys
from typing import Any
from markdown import Markdown
@@ -9,8 +10,9 @@ from pymdvar import VariableExtension
from markdown_checklist.extension import ChecklistExtension
from markdown.extensions.toc import TocExtension
-from .database import Database
-from .page import Page
+from pyssg.db.database import Database
+from pyssg.page import Page
+from pyssg.utils import get_file_stats
log: Logger = getLogger(__name__)
@@ -30,11 +32,11 @@ def get_md_obj(variables: dict[str, str],
# stripTitle generates an error when True,
# if there is no title attr
YafgExtension(stripTitle=False,
- figureClass="",
- figcaptionClass="",
+ figureClass='',
+ figcaptionClass='',
figureNumbering=False,
- figureNumberClass="number",
- figureNumberText="Figure"),
+ figureNumberClass='number',
+ figureNumberText='Figure'),
ChecklistExtension(),
'pymdownx.mark',
'pymdownx.caret',
@@ -76,20 +78,32 @@ class MDParser:
self.all_tags: list[tuple[str, str]] = []
def parse_files(self) -> None:
- log.debug('parsing all files')
for i, f in enumerate(self.files):
log.debug('parsing file "%s"', f)
- src_file: str = os.path.join(self.dir_config['src'], f)
- log.debug('path "%s"', src_file)
- self.db.update(src_file, remove=f'{self.dir_config["src"]}/')
-
+ path: str = os.path.join(self.dir_config['src'], f)
+ content: str = self.md.reset().convert(open(path).read())
+ fstats = get_file_stats(path)
+ chksm: str = fstats[0]
+ time: float = fstats[1]
+
+ entry: tuple
+ # old entry
+ oentry: tuple | None = self.db.select(f)
+ if not oentry:
+ entry = self.db.insert(f, time, chksm)
+ else:
+ oe_chksm: str = oentry[3]
+ if chksm != oe_chksm:
+ entry = self.db.update(f, time, chksm)
+ else:
+ entry = oentry
+
log.debug('parsing md into html')
- content: str = self.md.reset().convert(open(src_file).read())
# ignoring md.Meta type as it is not yet defined
# (because it is from an extension)
page: Page = Page(f,
- self.db.e[f].ctimestamp,
- self.db.e[f].mtimestamp,
+ entry[1],
+ entry[2],
content,
self.md.toc, # type: ignore
self.md.toc_tokens, # type: ignore
@@ -101,19 +115,19 @@ class MDParser:
log.debug('adding to file list')
self.all_files.append(page)
- if self.dir_config['tags'] and page.tags is not None:
- log.debug('parsing tags for "%s"', f)
- self.db.update_tags(f, set(map(itemgetter(0), page.tags)))
+ if self.dir_config['tags']:
+ if page.tags is None:
+ self.db.update_tags(f)
+ else:
+ tags: tuple = tuple(set(map(itemgetter(0), page.tags)))
+ if tags != entry[4]:
+ self.db.update_tags(f, tags)
log.debug('add all tags to tag list')
for t in page.tags:
if t[0] not in list(map(itemgetter(0), self.all_tags)):
- log.debug('adding tag "%s"', t[0])
self.all_tags.append(t)
- else:
- log.debug('ignoring tag "%s"; already present', t[0])
- else:
- log.debug('no tags to parse')
+ log.debug('added tag "%s"', t[0])
log.debug('sorting all lists for consistency')
self.all_files.sort(reverse=True)
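Condensed, the per-file bookkeeping in parse_files() is an insert-or-update-by-checksum pattern; the sketch below restates it as a free function (the wrapper name is hypothetical, the calls and tuple index come from the hunk above):

from pyssg.db.database import Database
from pyssg.utils import get_file_stats

def sync_file_entry(db: Database, fname: str, path: str) -> tuple:
    chksm, time = get_file_stats(path)          # (md5 checksum, st_mtime)
    oentry = db.select(fname)
    if oentry is None:
        return db.insert(fname, time, chksm)    # first time this file is seen
    if chksm != oentry[3]:                      # checksum column changed on disk
        return db.update(fname, time, chksm)
    return oentry                               # unchanged; keep the stored row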
diff --git a/src/pyssg/page.py b/src/pyssg/page.py
index 93eedee..26d2655 100644
--- a/src/pyssg/page.py
+++ b/src/pyssg/page.py
@@ -120,5 +120,5 @@ class Page:
# no need to specify dir_config['url'] as self.name already
# contains the relative url
name_html: str = self.name.replace(".md", ".html")
- self.url = f'{self.config["url"]["main"]}/{name_html}'
+ self.url = f'{self.config["url"]["base"]}/{name_html}'
log.debug('final url "%s"', self.url)
diff --git a/src/pyssg/pyssg.py b/src/pyssg/pyssg.py
index 96b68ff..fd136d9 100644
--- a/src/pyssg/pyssg.py
+++ b/src/pyssg/pyssg.py
@@ -84,31 +84,15 @@ def main() -> None:
log.debug('reading config file')
config: list[dict] = get_parsed_config(config_path)
- print(json.dumps(config, sort_keys=True, indent=2))
+ # print(json.dumps(config, sort_keys=True, indent=2))
if args['build']:
log.info('building the html files')
db: Database = Database(config[0]['path']['db'])
- print(db.select_all())
-
- fname: str = "t2"
- ctime: float = 1.0
- mtime: float = 2.0
- chksm: str = "xxx"
- tags: tuple | None = ("t1", "t2", "t3")
- # tags = None
-
- db.insert(fname, ctime, chksm, tags)
- # db.update(fname, mtime, chksm, tags)
- print(db.select_all())
-
- # TODO: change logic from "dir_paths" to single config
- # log.debug('building all dir_paths found in conf')
- # for dir_path in config[0]['dirs'].keys():
- # log.debug('building for "%s"', dir_path)
- # builder: Builder = Builder(config[0], db, dir_path)
- # builder.build()
+ log.debug('building all dir_paths found in conf')
+ builder: Builder = Builder(config[0], db, config[1])
+ builder.build()
db.write()
log.info('finished building the html files')
diff --git a/src/pyssg/utils.py b/src/pyssg/utils.py
index 216f535..487125f 100644
--- a/src/pyssg/utils.py
+++ b/src/pyssg/utils.py
@@ -81,12 +81,13 @@ def copy_file(src: str, dst: str) -> None:
# as seen in SO: https://stackoverflow.com/a/1131238
def get_checksum(path: str) -> str:
- log.debug('calculating md5 checksum for "%s"', path)
file_hash = md5()
with open(path, "rb") as f:
while chunk := f.read(4096):
file_hash.update(chunk)
- return file_hash.hexdigest()
+ out: str = file_hash.hexdigest()
+ log.debug('md5 checksum of "%s": %s', path, out)
+ return out
def get_expanded_path(path: str) -> str:
@@ -99,6 +100,12 @@ def get_expanded_path(path: str) -> str:
return epath
+def get_file_stats(path: str) -> tuple[str, float]:
+ time: float = os.stat(path).st_mtime
+ chksm: str = get_checksum(path)
+ return (chksm, time)
+
+
def get_time_now(fmt: str, tz: timezone=timezone.utc) -> str:
return datetime.now(tz=tz).strftime(fmt)
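Note that get_file_stats returns (checksum, mtime) in that order and callers unpack it positionally; a tiny usage sketch (the path is hypothetical):

from pyssg.utils import get_file_stats

chksm, mtime = get_file_stats('src/blog/post.md')  # md5 hex digest first, st_mtime second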