summaryrefslogtreecommitdiff
path: root/src/pyssg/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pyssg/database.py')
-rw-r--r--src/pyssg/database.py132
1 files changed, 0 insertions, 132 deletions
diff --git a/src/pyssg/database.py b/src/pyssg/database.py
deleted file mode 100644
index 38658eb..0000000
--- a/src/pyssg/database.py
+++ /dev/null
@@ -1,132 +0,0 @@
-import os
-import sys
-import csv
-from logging import Logger, getLogger
-
-from .utils import get_checksum
-from .database_entry import DatabaseEntry
-
-log: Logger = getLogger(__name__)
-
-
-# db class that works for both html and md files
-class Database:
- __COLUMN_NUM: int = 5
- __COLUMN_DELIMITER: str = '|'
-
- def __init__(self, db_path: str) -> None:
- log.debug('initializing the page db on path "%s"', db_path)
- self.db_path: str = db_path
- self.e: dict[str, DatabaseEntry] = dict()
-
- def update_tags(self, file_name: str,
- new_tags: set[str]) -> None:
- # technically, I should ensure this function can only run
- # if self.e is populated
- if file_name in self.e:
- log.debug('updating tags for entry "%s"', file_name)
- log.debug('entry "%s" old tags: %s',
- file_name, self.e[file_name].tags)
-
- self.e[file_name].update_tags(new_tags)
- log.debug('entry "%s" new tags: %s',
- file_name, self.e[file_name].tags)
- else:
- log.error('can\'t update tags for entry "%s",'
- ' as it is not present in db', file_name)
- sys.exit(1)
-
- def update(self, file_name: str,
- remove: str = '') -> None:
- log.debug('updating entry for file "%s"', file_name)
- f: str = file_name
- tags: set[str] = set()
- if remove != '':
- f = file_name.replace(remove, '')
- log.debug('removed "%s" from "%s": "%s"', remove, file_name, f)
-
- # get current time, needs actual file name
- time: float = os.stat(file_name).st_mtime
- log.debug('time for "%s": %s', file_name, time)
-
- # calculate current checksum, also needs actual file name
- cksm: str = get_checksum(file_name)
- log.debug('checksum for "%s": "%s"', file_name, cksm)
-
- # three cases, 1) entry didn't exist,
- # 2) entry has been mod and,
- # 3) entry hasn't been mod
- # 1)
- if f not in self.e:
- log.debug('entry "%s" didn\'t exist, adding with defaults', f)
- self.e[f] = DatabaseEntry((f, time, 0.0, cksm, tags))
- return
-
- # oe is old entity
- oe: DatabaseEntry = self.e[f]
- log.debug('entry "%s" old content: %s', f, oe)
-
- # 2)
- if cksm != oe.checksum:
- log.debug('entry "%s" has been modified, updating; '
- 'using old tags', f)
- self.e[f] = DatabaseEntry((f, oe.ctimestamp, time, cksm, oe.tags))
- log.debug('entry "%s" new content: %s', f, self.e[f])
- # 3)
- else:
- log.debug('entry "%s" hasn\'t been modified', f)
-
- def write(self) -> None:
- log.debug('writing db')
- with open(self.db_path, 'w') as file:
- csv_writer = csv.writer(file, delimiter=self.__COLUMN_DELIMITER)
- for _, v in self.e.items():
- log.debug('writing row: %s', v)
- csv_writer.writerow(v.get_raw_entry())
-
- def _db_path_exists(self) -> bool:
- log.debug('checking that "%s" exists or is a file', self.db_path)
- if not os.path.exists(self.db_path):
- log.warning('"%s" doesn\'t exist, will be'
- ' created once process finishes,'
- ' ignore if it\'s the first run', self.db_path)
- return False
- if not os.path.isfile(self.db_path):
- log.error('"%s" is not a file', self.db_path)
- sys.exit(1)
- return True
-
- def _get_raw_csv_rows(self) -> list[list[str]]:
- rows: list[list[str]]
- with open(self.db_path, 'r') as f:
- csv_reader = csv.reader(f, delimiter=self.__COLUMN_DELIMITER)
- rows = list(csv_reader)
- log.debug('db contains %d rows', len(rows))
- return rows
-
- # TODO: don't include files that are not in the db anymore
- def read(self) -> None:
- log.debug('reading db')
- if not self._db_path_exists():
- return
-
- rows: list[list[str]] = self._get_raw_csv_rows()
- # l=list of values in entry
- log.debug('parsing rows from db')
- for it, row in enumerate(rows):
- i: int = it + 1
- col_num: int = len(row)
- log.debug('row %d content: "%s"', i, row)
- if col_num != self.__COLUMN_NUM:
- log.critical('row %d doesn\'t contain %s columns, contains %d'
- ' columns: "%s"',
- i, self.__COLUMN_NUM, col_num, row)
- sys.exit(1)
- # actual value types
- r: tuple[str, float, float, str, str] = (str(row[0]),
- float(row[1]),
- float(row[2]),
- str(row[3]),
- str(row[4]))
- entry: DatabaseEntry = DatabaseEntry(r)
- self.e[entry.fname] = entry