From c09b51be40444ef59809b6148184490bd4972004 Mon Sep 17 00:00:00 2001 From: David Luevano Alvarado Date: Sat, 25 Feb 2023 21:41:47 -0600 Subject: add database tests --- src/pyssg/database.py | 21 +++-- src/pyssg/md_parser.py | 2 +- tests/conftest.py | 60 +++++++++++++ tests/io_files/md/__init__.py | 0 tests/io_files/md/a/__init__.py | 0 tests/io_files/md/a/second.md | 12 +++ tests/io_files/md/first.md | 10 +++ tests/io_files/md/new.md | 10 +++ tests/test_database.py | 188 ++++++++++++++++++++++++++++++++++++++++ 9 files changed, 291 insertions(+), 12 deletions(-) create mode 100644 tests/io_files/md/__init__.py create mode 100644 tests/io_files/md/a/__init__.py create mode 100644 tests/io_files/md/a/second.md create mode 100644 tests/io_files/md/first.md create mode 100644 tests/io_files/md/new.md create mode 100644 tests/test_database.py diff --git a/src/pyssg/database.py b/src/pyssg/database.py index ae0a8d4..38658eb 100644 --- a/src/pyssg/database.py +++ b/src/pyssg/database.py @@ -26,11 +26,11 @@ class Database: if file_name in self.e: log.debug('updating tags for entry "%s"', file_name) log.debug('entry "%s" old tags: %s', - file_name, self.e[file_name]) + file_name, self.e[file_name].tags) self.e[file_name].update_tags(new_tags) log.debug('entry "%s" new tags: %s', - file_name, self.e[file_name]) + file_name, self.e[file_name].tags) else: log.error('can\'t update tags for entry "%s",' ' as it is not present in db', file_name) @@ -62,20 +62,19 @@ class Database: self.e[f] = DatabaseEntry((f, time, 0.0, cksm, tags)) return - # old_e is old entity - old_e: DatabaseEntry = self.e[f] - log.debug('entry "%s" old content: %s', f, old_e) + # oe is old entity + oe: DatabaseEntry = self.e[f] + log.debug('entry "%s" old content: %s', f, oe) # 2) - if cksm != old_e.checksum: - log.debug('entry "%s" has been modified, updating', f) - self.e[f] = DatabaseEntry((f, old_e.ctimestamp, time, cksm, tags)) + if cksm != oe.checksum: + log.debug('entry "%s" has been modified, updating; ' + 'using old tags', f) + self.e[f] = DatabaseEntry((f, oe.ctimestamp, time, cksm, oe.tags)) log.debug('entry "%s" new content: %s', f, self.e[f]) - return # 3) else: log.debug('entry "%s" hasn\'t been modified', f) - return def write(self) -> None: log.debug('writing db') @@ -93,7 +92,7 @@ class Database: ' ignore if it\'s the first run', self.db_path) return False if not os.path.isfile(self.db_path): - log.error('"%s" is not a file"', self.db_path) + log.error('"%s" is not a file', self.db_path) sys.exit(1) return True diff --git a/src/pyssg/md_parser.py b/src/pyssg/md_parser.py index 3119db7..3ef297d 100644 --- a/src/pyssg/md_parser.py +++ b/src/pyssg/md_parser.py @@ -82,7 +82,7 @@ class MDParser: if self.dir_config['tags'] and page.tags is not None: log.debug('parsing tags for "%s"', f) - self.db.update_tags(f, list(map(itemgetter(0), page.tags))) + self.db.update_tags(f, set(map(itemgetter(0), page.tags))) log.debug('add all tags to tag list') for t in page.tags: diff --git a/tests/conftest.py b/tests/conftest.py index e2d6946..063f4a2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import os import sys import pytest +import shutil from pathlib import Path from typing import Any, Callable from pytest import MonkeyPatch @@ -11,6 +12,7 @@ from logging import Logger, getLogger, DEBUG from pyssg.arg_parser import get_parser from pyssg.custom_logger import setup_logger +from pyssg.database_entry import DatabaseEntry @pytest.fixture(scope='session') @@ -116,3 +118,61 @@ def tmp_dir_structure(tmp_path: Path) -> Path: for ext in ['txt', 'md', 'html']: (d/f'f{i}.{ext}').write_text('sample') return root + + +@pytest.fixture(scope='session') +def tmp_db_e1() -> DatabaseEntry: + return DatabaseEntry(('first.md', + 1671076311.823135, + 0.0, + '778bce781d95730cd1e872a10130e20d', + '-')) + + +@pytest.fixture(scope='session') +def tmp_db_e2() -> DatabaseEntry: + return DatabaseEntry(('a/second.md', + 1671077831.63301, + # 1671078892.892921, + 1677381461.8107588, + #'6092d6471d3a83135293e34ef6012939', + 'a61d0116844b6ebc02db62b4b1bf453d', + 'english,short,update')) + + +@pytest.fixture(scope='function') +def tmp_db(tmp_path: Path, + tmp_db_e1: DatabaseEntry, + tmp_db_e2: DatabaseEntry) -> Path: + root: Path = tmp_path/'db' + db_path: Path = tmp_path/'db/sample_db.psv' + root.mkdir() + e1: str = '|'.join(tmp_db_e1.get_raw_entry()) + e2: str = '|'.join(tmp_db_e2.get_raw_entry()) + db_path.write_text(f'{e1}\n{e2}\n') + return db_path + + +@pytest.fixture(scope='function') +def tmp_db_wrong_col_num(tmp_path: Path) -> Path: + root: Path = tmp_path/'db' + db_path: Path = tmp_path/'db/sample_db_wrong_col_num.psv' + root.mkdir() + # missing tags, could be anything though + db_path.write_text('name|0.0|0.0|cksm\n') + return db_path + + +@pytest.fixture(scope='function') +def tmp_src_dir(tmp_path: Path, + test_dir: str) -> Path: + src: Path = tmp_path/'src' + src_a: Path = src/'a' + src.mkdir() + src_a.mkdir() + src_test: str = f'{test_dir}/io_files/md' + + files: list[str] = ['first.md', 'new.md', 'a/second.md'] + for f in files: + shutil.copy2(f'{src_test}/{f}', f'{str(src)}/{f}') + return src diff --git a/tests/io_files/md/__init__.py b/tests/io_files/md/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/io_files/md/a/__init__.py b/tests/io_files/md/a/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/io_files/md/a/second.md b/tests/io_files/md/a/second.md new file mode 100644 index 0000000..cb4e333 --- /dev/null +++ b/tests/io_files/md/a/second.md @@ -0,0 +1,12 @@ +title: Second blog post for testing purposes +author: David Luévano + Someone Else +lang: en +summary: This is just a post used for testing (the second). +tags: test + english + short + update + multiple-author + +A second "blog entry" for testing purposes which uses multiple authors and is inside a subdirectory. diff --git a/tests/io_files/md/first.md b/tests/io_files/md/first.md new file mode 100644 index 0000000..567ea3e --- /dev/null +++ b/tests/io_files/md/first.md @@ -0,0 +1,10 @@ +title: First blog post for testing purposes +author: David Luévano +lang: en +summary: This is just a post used for testing. +tags: test + english + short + update + +Even though I have this "blog" subdomain and page setup, doesn't mean I'll be blogging for pyssg, this is just to serve as an example for the types of sites that pyssg can be used for. \ No newline at end of file diff --git a/tests/io_files/md/new.md b/tests/io_files/md/new.md new file mode 100644 index 0000000..ce684a7 --- /dev/null +++ b/tests/io_files/md/new.md @@ -0,0 +1,10 @@ +title: New file +author: David Luévano +lang: en +summary: New file used for testing the database. +tags: test + english + short + update + +This is a sample markdown file used for testing pyssg. \ No newline at end of file diff --git a/tests/test_database.py b/tests/test_database.py new file mode 100644 index 0000000..3dd21e0 --- /dev/null +++ b/tests/test_database.py @@ -0,0 +1,188 @@ +from pathlib import Path +import pytest +from logging import DEBUG, WARNING, ERROR, CRITICAL +from pytest import LogCaptureFixture +from pyssg.database import Database +from pyssg.database_entry import DatabaseEntry + + +def test_read_database_no_db(test_dir: str, caplog: LogCaptureFixture) -> None: + path: str = f'{test_dir}/non_existent_db.psv' + war: tuple[str, int, str] = ('pyssg.database', + WARNING, + f'"{path}" doesn\'t exist, will be created ' + 'once process finishes, ignore if it\'s the ' + 'first run') + db: Database = Database(path) + db.read() + assert caplog.record_tuples[-1] == war + + +def test_read_database_not_a_file(test_dir: str, + caplog: LogCaptureFixture) -> None: + path: str = test_dir + err: tuple[str, int, str] = ('pyssg.database', + ERROR, + f'"{path}" is not a file') + db: Database = Database(path) + with pytest.raises(SystemExit) as system_exit: + db.read() + assert system_exit.type == SystemExit + assert system_exit.value.code == 1 + assert caplog.record_tuples[-1] == err + + +def test_read_database(tmp_db: Path, + tmp_db_e1: DatabaseEntry, + tmp_db_e2: DatabaseEntry) -> None: + db: Database = Database(str(tmp_db)) + db.read() + exp_db_e: dict[str, DatabaseEntry] = {tmp_db_e1.fname: tmp_db_e1, + tmp_db_e2.fname: tmp_db_e2} + for fname in db.e.keys(): + assert str(db.e[fname]) == str(exp_db_e[fname]) + + +def test_read_database_wrong_col_num(tmp_db_wrong_col_num: Path, + caplog: LogCaptureFixture) -> None: + cri: tuple[str, int, str] = ('pyssg.database', + CRITICAL, + 'row 1 doesn\'t contain 5 columns, ' + 'contains 4 columns: ' + '"[\'name\', \'0.0\', \'0.0\', \'cksm\']"') + db: Database = Database(str(tmp_db_wrong_col_num)) + with pytest.raises(SystemExit) as system_exit: + db.read() + assert system_exit.type == SystemExit + assert system_exit.value.code == 1 + assert caplog.record_tuples[-1] == cri + + +def test_update_entry_tags(tmp_db: Path, + tmp_db_e1: DatabaseEntry, + caplog: LogCaptureFixture) -> None: + caplog.set_level(DEBUG, logger='pyssg.database') + fname: str = tmp_db_e1.fname + new_tags: set[str] = {'tag1', 'tag2', 'tag3'} + deb: tuple[str, int, str] = ('pyssg.database', + DEBUG, + f'entry "{fname}" new tags: {new_tags}') + db: Database = Database(str(tmp_db)) + db.read() + db.update_tags(fname, new_tags) + assert db.e[fname].tags == new_tags + assert caplog.record_tuples[-1] == deb + + +def test_update_entry_tags_failure(tmp_db: Path, + caplog: LogCaptureFixture) -> None: + fname: str = 'non_existent_file.md' + new_tags: set[str] = {'tag1', 'tag2', 'tag3'} + err: tuple[str, int, str] = ('pyssg.database', + ERROR, + f'can\'t update tags for entry "{fname}",' + ' as it is not present in db') + db: Database = Database(str(tmp_db)) + db.read() + with pytest.raises(SystemExit) as system_exit: + db.update_tags(fname, new_tags) + assert system_exit.type == SystemExit + assert system_exit.value.code == 1 + assert caplog.record_tuples[-1] == err + + +def test_update_entry_new_entry_full_path(tmp_db: Path, + tmp_src_dir: Path, + caplog: LogCaptureFixture) -> None: + caplog.set_level(DEBUG, logger='pyssg.database') + fname: str = f'{tmp_src_dir}/new.md' + deb: tuple[str, int, str] = ('pyssg.database', + DEBUG, + f'entry "{fname}" didn\'t exist, adding with' + ' defaults') + db: Database = Database(str(tmp_db)) + db.read() + db.update(fname) + assert caplog.record_tuples[-1] == deb + + +def test_update_entry_new_entry_fname_only(tmp_db: Path, + tmp_src_dir: Path, + caplog: LogCaptureFixture) -> None: + caplog.set_level(DEBUG, logger='pyssg.database') + fname: str = f'{tmp_src_dir}/new.md' + deb: tuple[str, int, str] = ('pyssg.database', + DEBUG, + f'entry "new.md" didn\'t exist, adding with' + ' defaults') + db: Database = Database(str(tmp_db)) + db.read() + db.update(fname, f'{tmp_src_dir}/') + assert caplog.record_tuples[-1] == deb + + +def test_update_entry_no_mod(tmp_db: Path, + tmp_src_dir: Path, + caplog: LogCaptureFixture) -> None: + caplog.set_level(DEBUG, logger='pyssg.database') + fname: str = f'{tmp_src_dir}/first.md' + deb: tuple[str, int, str] = ('pyssg.database', + DEBUG, + f'entry "first.md" hasn\'t been modified') + db: Database = Database(str(tmp_db)) + db.read() + db.update(fname, f'{tmp_src_dir}/') + assert caplog.record_tuples[-1] == deb + + +def test_update_entry_modified(tmp_db: Path, + tmp_src_dir: Path, + caplog: LogCaptureFixture) -> None: + caplog.set_level(DEBUG, logger='pyssg.database') + fname: str = f'{tmp_src_dir}/a/second.md' + with open(fname, 'a') as f: + f.write('Added modification.\n') + deb: tuple[str, int, str] = ('pyssg.database', + DEBUG, + f'entry "a/second.md" new content: ') + db: Database = Database(str(tmp_db)) + db.read() + db.update(fname, f'{tmp_src_dir}/') + # instead of checking the whole deb tuple, check that the message starts + # with the "new content", as getting the same timestamp will be difficult + assert caplog.record_tuples[-1][1] == DEBUG + assert caplog.record_tuples[-1][2].startswith(deb[2]) + + +def test_write_database_no_change(tmp_db: Path, + tmp_db_e1: DatabaseEntry, + tmp_db_e2: DatabaseEntry) -> None: + db: Database = Database(str(tmp_db)) + db.read() + db.write() + exp_db_e: dict[str, DatabaseEntry] = {tmp_db_e1.fname: tmp_db_e1, + tmp_db_e2.fname: tmp_db_e2} + db2: Database = Database(str(tmp_db)) + db2.read() + for fname in db2.e.keys(): + assert str(db2.e[fname]) == str(exp_db_e[fname]) + + +def test_write_database_new_entry(tmp_db: Path, + tmp_src_dir: Path, + tmp_db_e1: DatabaseEntry, + tmp_db_e2: DatabaseEntry) -> None: + fname: str = 'new.md' + full_path: str = f'{tmp_src_dir}/{fname}' + db: Database = Database(str(tmp_db)) + db.read() + db.update(full_path, f'{tmp_src_dir}/') + db_e2: DatabaseEntry = db.e[fname] + db.write() + exp_db_e: dict[str, DatabaseEntry] = {tmp_db_e1.fname: tmp_db_e1, + tmp_db_e2.fname: tmp_db_e2, + fname: db_e2} + db2: Database = Database(str(tmp_db)) + db2.read() + for fname in db2.e.keys(): + assert str(db2.e[fname]) == str(exp_db_e[fname]) -- cgit v1.2.3-70-g09d2