From 40d23848d816816b3db3e7296e8a28f964b53786 Mon Sep 17 00:00:00 2001 From: David Luevano Alvarado Date: Wed, 7 Dec 2022 03:31:51 -0600 Subject: refactor code, fix config checker and new files for pyssg.xyz --- src/pyssg/builder.py | 164 ++++++++++++++++++------------------ src/pyssg/configuration.py | 63 +++++++++----- src/pyssg/database.py | 6 -- src/pyssg/plt/default.yaml | 13 +-- src/pyssg/plt/mandatory_config.yaml | 9 +- src/pyssg/utils.py | 63 +++++++------- src/pyssg/yaml_parser.py | 31 +++---- 7 files changed, 179 insertions(+), 170 deletions(-) (limited to 'src') diff --git a/src/pyssg/builder.py b/src/pyssg/builder.py index eec0125..65c5837 100644 --- a/src/pyssg/builder.py +++ b/src/pyssg/builder.py @@ -27,29 +27,28 @@ class Builder: if self.dir_path not in self.config['dirs']: log.error('couldn\'t find "dirs.%s" attribute in config file', self.dir_path) sys.exit(1) - if os.path.isabs(self.dir_path) and self.dir_path.strip() != '/': log.error('dir path "%s" cannot be absolute, except for the special case "/"', self.dir_path) sys.exit(1) - log.debug('building dir_config and src/dst paths for "%s" dir path', self.dir_path) - self.dir_config: dict = deepcopy(self.config['dirs'][self.dir_path]) + log.debug('building dir_cfg for "%s" dir_path', self.dir_path) + self.dir_cfg: dict = deepcopy(self.config['dirs'][self.dir_path]['cfg']) if self.dir_path.strip() == '/': - log.debug('dir path is "/", copying src/dst directly') - self.dir_config['src'] = self.config['path']['src'] - self.dir_config['dst'] = self.config['path']['dst'] - self.dir_config['url'] = self.config['url']['main'] + log.debug('dir_path is "/", copying src/dst directly') + self.dir_cfg['src'] = self.config['path']['src'] + self.dir_cfg['dst'] = self.config['path']['dst'] + self.dir_cfg['url'] = self.config['url']['main'] else: - self.dir_config['src'] = os.path.join(self.config['path']['src'], self.dir_path) - self.dir_config['dst'] = os.path.join(self.config['path']['dst'], self.dir_path) - self.dir_config['url'] = f"{self.config['url']['main']}/{self.dir_path}" + log.debug('dir_path is "%s", generating', self.dir_path) + self.dir_cfg['src'] = os.path.join(self.config['path']['src'], self.dir_path) + self.dir_cfg['dst'] = os.path.join(self.config['path']['dst'], self.dir_path) + self.dir_cfg['url'] = f'{self.config["url"]["main"]}/{self.dir_path}' # the autoescape option could be a security risk if used in a dynamic # website, as far as i can tell log.debug('initializing the jinja environment') - self.__loader: FSLoader = FSLoader(self.config['path']['plt']) - self.env: Environment = Environment(loader=self.__loader, + self.env: Environment = Environment(loader=FSLoader(self.config['path']['plt']), autoescape=False, trim_blocks=True, lstrip_blocks=True) @@ -64,31 +63,31 @@ class Builder: self.all_tags: list[tuple[str, str]] self.common_vars: dict - def build(self) -> None: log.debug('building site for dir path "%s"', self.dir_path) - if 'exclude_dirs' not in self.dir_config: - log.debug('"exclude_dirs" attribute not found in "dirs.%s" in config file', self.dir_path) - self.dir_config['exclude_dirs'] = [] - if not isinstance(self.dir_config['exclude_dirs'], list): - log.error('"exclude_dirs" attribute is not of type "list"') + if 'exclude_dirs' not in self.dir_cfg: + log.debug('"exclude_dirs" field not found in "dirs.%s.cfg"', self.dir_path) + self.dir_cfg['exclude_dirs'] = [] + if not isinstance(self.dir_cfg['exclude_dirs'], list): + log.error('"exclude_dirs" field in "dirs.%s.cfg" isn\'t of type "list"', 
self.dir_path)
             sys.exit(1)
 
-        self.dirs = get_dir_structure(self.dir_config['src'],
-                                      self.dir_config['exclude_dirs'])
-        self.md_files = get_file_list(self.dir_config['src'],
-                                      ['.md'],
-                                      self.dir_config['exclude_dirs'])
-        self.html_files = get_file_list(self.dir_config['src'],
-                                        ['.html'],
-                                        self.dir_config['exclude_dirs'])
+        self.dirs = get_dir_structure(self.dir_cfg['src'],
+                                      self.dir_cfg['exclude_dirs'])
+        self.md_files = get_file_list(self.dir_cfg['src'],
+                                      ('.md',),
+                                      self.dir_cfg['exclude_dirs'])
+        self.html_files = get_file_list(self.dir_cfg['src'],
+                                        ('.html',),
+                                        self.dir_cfg['exclude_dirs'])
 
         self.__create_dir_structure()
         self.__copy_html_files()
 
+        # TODO: check if need to pass dirs.dir_path.files
         parser: MDParser = MDParser(self.md_files,
                                     self.config,
-                                    self.dir_config,
+                                    self.dir_cfg,
                                     self.db)
         parser.parse_files()
 
@@ -97,88 +96,89 @@ class Builder:
         self.updated_files = parser.updated_files
         self.all_tags = parser.all_tags
 
+        # TODO: check if need to pass dirs.dir_path.files
         # dict for the keyword args to pass to the template renderer
-        log.debug('adding config, all_pages and all_tags to exposed vars for jinja')
+        log.debug('adding exposed vars for jinja')
         self.common_vars = dict(config=self.config,
-                                dir_config=self.dir_config,
+                                dir_config=self.dir_cfg,
                                 all_pages=self.all_files,
                                 all_tags=self.all_tags)
-        self.__render_pages(self.dir_config['plt'])
+        self.__render_pages(self.dir_cfg['plt'])
 
-        if 'tags' in self.dir_config and self.dir_config['tags']:
-            log.debug('rendering tags for dir "%s"', self.dir_path)
-            create_dir(os.path.join(self.dir_config['dst'], 'tag'), True, True)
-            self.__render_tags(self.dir_config['tags'])
+        if self.dir_cfg['tags']:
+            log.debug('rendering tags for dir_path "%s"', self.dir_path)
+            create_dir(os.path.join(self.dir_cfg['dst'], 'tag'), True, True)
+            if isinstance(self.dir_cfg['tags'], str):
+                self.__render_tags(self.dir_cfg['tags'])
+            else:
+                self.__render_tags('tag.html')
 
-        opt_renders: dict[str, str] = {'index': 'index.html',
+        default_plts: dict[str, str] = {'index': 'index.html',
                                        'rss': 'rss.xml',
                                        'sitemap': 'sitemap.xml'}
-        for opt in opt_renders.keys():
-            if opt in self.dir_config and self.dir_config[opt]:
-                self.__render_template(self.dir_config[opt],
-                                       opt_renders[opt],
-                                       **self.common_vars)
-
+        for opt in default_plts.keys():
+            if opt in self.dir_cfg:
+                if isinstance(self.dir_cfg[opt], str):
+                    self.__render_template(self.dir_cfg[opt],
+                                           default_plts[opt],
+                                           **self.common_vars)
+                else:
+                    self.__render_template(default_plts[opt],
+                                           default_plts[opt],
+                                           **self.common_vars)
 
     def __create_dir_structure(self) -> None:
-        log.debug('creating dir structure')
-        create_dir(self.dir_config['dst'], True, True)
-        _dir_path: str
+        log.debug('creating dir structure for dir_path "%s"', self.dir_path)
+        create_dir(self.dir_cfg['dst'], True, True)
         for d in self.dirs:
-            _dir_path = os.path.join(self.dir_config['dst'], d)
-            # using silent=True to not print the info create dir msgs for this
-            create_dir(_dir_path, True, True)
-
+            path: str = os.path.join(self.dir_cfg['dst'], d)
+            create_dir(path, True, True)
 
     def __copy_html_files(self) -> None:
-        if len(self.html_files) > 0:
-            log.debug('copying all html files')
-        else:
+        if not len(self.html_files) > 0:
             log.debug('no html files to copy')
+            return
+
+        log.debug('copying all html files')
         src_file: str
         dst_file: str
-
-        for f in self.html_files:
-            src_file = os.path.join(self.dir_config['src'], f)
-            dst_file = os.path.join(self.dir_config['dst'], f)
-
+        for file in self.html_files:
+            src_file = 
os.path.join(self.dir_cfg['src'], file) + dst_file = os.path.join(self.dir_cfg['dst'], file) + # always copy on force + if self.config['info']['force']: + log.debug('copying "%s"; forced', file) + copy_file(src_file, dst_file) + continue # only copy files if they have been modified (or are new) - if self.db.update(src_file, remove=f'{self.dir_config["src"]}/'): - log.debug('file "%s" has been modified or is new, copying', f) + if self.db.update(src_file, remove=f'{self.dir_cfg["src"]}/'): + log.debug('copying "%s"; has been modified or is new', file) copy_file(src_file, dst_file) - else: - if self.config['info']['force']: - log.debug('file "%s" hasn\'t been modified, but option force is set to true, copying anyways', f) - copy_file(src_file, dst_file) - else: - log.debug('file "%s" hasn\'t been modified, ignoring', f) - + continue + log.debug('ignoring "%s"; hasn\'t been modified, not forced', file) def __render_pages(self, template_name: str) -> None: - log.debug('rendering html') + log.debug('rendering pages with template "%s"', template_name) page_vars: dict = deepcopy(self.common_vars) - temp_files: list[Page] - + temp_pages: list[Page] # check if only updated should be created if self.config['info']['force']: log.debug('all html will be rendered, force is set to true') - temp_files = self.all_files + temp_pages = self.all_files else: log.debug('only updated or new html will be rendered') - temp_files = self.updated_files + temp_pages = self.updated_files - for p in temp_files: - log.debug('adding page to exposed vars for jinja') + for p in temp_pages: + p_fname: str = p.name.replace('.md', '.html') + log.debug('adding page "%s" to exposed vars for jinja', p_fname) page_vars['page'] = p # actually render article - self.__render_template(template_name, - p.name.replace('.md','.html'), - **page_vars) - + self.__render_template(template_name, p_fname, **page_vars) def __render_tags(self, template_name: str) -> None: - log.debug('rendering tags') + log.debug('rendering tags with template "%s"', template_name) tag_vars: dict = deepcopy(self.common_vars) tag_pages: list[Page] for t in self.all_tags: @@ -192,16 +192,12 @@ class Builder: log.debug('adding page "%s" as it contains tag "%s"', p.name, t[0]) tag_pages.append(p) - log.debug('adding tag and tag_pages to exposed vars for jinja') tag_vars['tag'] = t tag_vars['tag_pages'] = tag_pages - + t_fname: str = f'tag/@{t[0]}.html' # actually render tag page - self.__render_template(template_name, - f'tag/@{t[0]}.html', - **tag_vars) - + self.__render_template(template_name, t_fname, **tag_vars) def __render_template(self, template_name: str, file_name: str, @@ -210,7 +206,7 @@ class Builder: file_name, template_name) template: Template = self.env.get_template(template_name) content: str = template.render(**template_vars) - dst_path: str = os.path.join(self.dir_config['dst'], file_name) + dst_path: str = os.path.join(self.dir_cfg['dst'], file_name) log.debug('writing html file to path "%s"', dst_path) with open(dst_path, 'w') as f: diff --git a/src/pyssg/configuration.py b/src/pyssg/configuration.py index 33a82cd..a2b48b6 100644 --- a/src/pyssg/configuration.py +++ b/src/pyssg/configuration.py @@ -12,24 +12,37 @@ DEFAULT_CONFIG_PATH: str = '$XDG_CONFIG_HOME/pyssg/config.yaml' VERSION: str = version('pyssg') -def __check_well_formed_config(config: dict) -> None: - log.debug('checking that config file is well formed (at least contains mandatory fields') - mandatory_config: dict = get_parsed_yaml('mandatory_config.yaml', 'pyssg.plt')[0] - - for 
section in mandatory_config.keys(): - log.debug('checking section "%s"', section) - if not config[section]: - log.error('config does not have section "%s"', section) +def __check_well_formed_config(config: dict, + config_base: list[dict], + prefix_key: str = '') -> None: + for key in config_base[0].keys(): + current_key: str = f'{prefix_key}.{key}' if prefix_key != '' else key + log.debug('checking "%s"', current_key) + if key not in config: + log.error('config doesn\'t have "%s"', current_key) + log.debug('key: %s; config.keys: %s', key, config.keys()) sys.exit(1) + + # checks for dir_paths + if key == 'dirs': + if '/' not in config[key]: + log.error('config doesn\'t have "%s./"', current_key) + log.debug('key: %s; config.keys: %s', key, config[key].keys()) + sys.exit(1) + + log.debug('checking "%s" fields for (%s) dir_paths', key, ', '.join(config[key].keys())) + for dkey in config[key].keys(): + new_current_key: str = f'{current_key}.{dkey}' + new_config_base: list[dict] = [config_base[1], config_base[1]] + __check_well_formed_config(config[key][dkey], new_config_base, new_current_key) + continue + # the case for elements that don't have nested elements - if not mandatory_config[section]: - log.debug('section "%s" doesn\'t need nested elements', section) + if not config_base[0][key]: + log.debug('"%s" doesn\'t need nested elements', current_key) continue - for option in mandatory_config[section].keys(): - log.debug('checking option "%s"', option) - if option not in config[section] or not config[section][option]: - log.error('config does not have option "%s" in section "%s"', option, section) - sys.exit(1) + new_config_base: list[dict] = [config_base[0][key], config_base[1]] + __check_well_formed_config(config[key], new_config_base, current_key) def __expand_all_paths(config: dict) -> None: @@ -41,11 +54,15 @@ def __expand_all_paths(config: dict) -> None: # not necessary to type deeper than the first dict def get_parsed_config(path: str) -> list[dict]: log.debug('reading config file "%s"', path) - config: list[dict] = get_parsed_yaml(path) # type: ignore + config: list[dict] = get_parsed_yaml(path) + mandatory_config: list[dict] = get_parsed_yaml('mandatory_config.yaml', 'pyssg.plt') log.info('found %s document(s) for configuration "%s"', len(config), path) - - __check_well_formed_config(config[0]) + log.debug('checking that config file is well formed (at least contains mandatory fields') + # TODO: make it work with n yaml docs + __check_well_formed_config(config[0], mandatory_config) + log.error('testing') + sys.exit(1) __expand_all_paths(config[0]) return config @@ -55,12 +72,12 @@ def get_parsed_config(path: str) -> list[dict]: # static config means config that shouldn't be changed by the user def get_static_config() -> dict[str, dict]: log.debug('reading and setting static config') - config: dict = get_parsed_yaml('static_config.yaml', 'pyssg.plt')[0] # type: ignore + config: dict = get_parsed_yaml('static_config.yaml', 'pyssg.plt')[0] + # do I really need a lambda function... 
+ current_time = lambda x : datetime.now(tz=timezone.utc).strftime(x) config['info']['version'] = VERSION - config['info']['rss_run_date'] = datetime.now(tz=timezone.utc)\ - .strftime(config['fmt']['rss_date']) - config['info']['sitemap_run_date'] = datetime.now(tz=timezone.utc)\ - .strftime(config['fmt']['sitemap_date']) + config['info']['rss_run_date'] = current_time(config['fmt']['rss_date']) + config['info']['sitemap_run_date'] = current_time(config['fmt']['sitemap_date']) return config diff --git a/src/pyssg/database.py b/src/pyssg/database.py index 34bf534..d4b6a86 100644 --- a/src/pyssg/database.py +++ b/src/pyssg/database.py @@ -19,7 +19,6 @@ class Database: self.db_path: str = db_path self.e: dict[str, DatabaseEntry] = dict() - # updates the tags for a specific entry (file) # file_name only contains the entry name (not an absolute path) def update_tags(self, file_name: str, @@ -37,7 +36,6 @@ class Database: ' as it is not present in db', file_name) sys.exit(1) - # returns a bool that indicates if the entry # was (includes new entries) or wasn't updated def update(self, file_name: str, @@ -86,7 +84,6 @@ class Database: log.debug('entry "%s" hasn\'t been modified', f) return False - def write(self) -> None: log.debug('writing db') with open(self.db_path, 'w') as file: @@ -95,7 +92,6 @@ class Database: csv_writer = csv.writer(file, delimiter=self.__COLUMN_DELIMITER) csv_writer.writerow(v.get_raw_entry()) - def _db_path_exists(self) -> bool: log.debug('checking that "%s" exists or is a file', self.db_path) if not os.path.exists(self.db_path): @@ -110,7 +106,6 @@ class Database: return True - def _get_csv_rows(self) -> list[list[str]]: rows: list[list[str]] with open(self.db_path, 'r') as f: @@ -120,7 +115,6 @@ class Database: return rows - def read(self) -> None: log.debug('reading db') if not self._db_path_exists(): diff --git a/src/pyssg/plt/default.yaml b/src/pyssg/plt/default.yaml index 74ef0ee..0b722a6 100644 --- a/src/pyssg/plt/default.yaml +++ b/src/pyssg/plt/default.yaml @@ -18,10 +18,11 @@ fmt: list_sep_date: "%B %Y" dirs: /: - plt: "page.html" - tags: False - index: False - rss: False - sitemap: False - exclude_dirs: [] + cfg: + plt: "page.html" + tags: False + index: False + rss: False + sitemap: False + exclude_dirs: [] ... \ No newline at end of file diff --git a/src/pyssg/plt/mandatory_config.yaml b/src/pyssg/plt/mandatory_config.yaml index 3f12966..c1ce9f2 100644 --- a/src/pyssg/plt/mandatory_config.yaml +++ b/src/pyssg/plt/mandatory_config.yaml @@ -14,5 +14,12 @@ fmt: list_sep_date: dirs: /: - plt: +... +--- +cfg: + plt: + tags: + index: + rss: + sitemap: ... 
\ No newline at end of file diff --git a/src/pyssg/utils.py b/src/pyssg/utils.py index 3e05d0a..e63ee08 100644 --- a/src/pyssg/utils.py +++ b/src/pyssg/utils.py @@ -7,63 +7,65 @@ from logging import Logger, getLogger log: Logger = getLogger(__name__) +# TODO: add file exclusion option def get_file_list(path: str, - exts: list[str], - exclude: list[str]=[]) -> list[str]: + exts: tuple[str], + exclude_dirs: list[str] = []) -> list[str]: log.debug('retrieving file list in path "%s" that contain file' - ' extensions (%s) except (%s)', + ' extensions (%s) except directories (%s)', path, ', '.join(exts), - ', '.join(exclude)) - out: list[str] = [] + ', '.join(exclude_dirs)) + file_list: list[str] = [] for root, dirs, files in os.walk(path): - if exclude != []: + if exclude_dirs != []: log.debug('removing excludes from list') - dirs[:] = [d for d in dirs if d not in exclude] - - for f in files: - if f.endswith(tuple(exts)): - stripped_f: str = os.path.join(root, f).replace(path, '')[1:] - out.append(stripped_f) + dirs[:] = [d for d in dirs if d not in exclude_dirs] + for file in files: + if file.endswith(exts): + # [1:] is required to remove the '/' at the beginning after replacing + file_name: str = os.path.join(root, file).replace(path, '')[1:] + file_list.append(file_name) log.debug('added file "%s" without "%s" part: "%s"', - f, path, stripped_f) + file, path, file_name) else: log.debug('ignoring file "%s" as it doesn\'t contain' - ' any of the extensions (%s)', f, ', '.join(exts)) - - return out + ' any of the extensions (%s)', file, ', '.join(exts)) + return file_list def get_dir_structure(path: str, - exclude: list[str]=[]) -> list[str]: - log.debug('retrieving dir structure in path "%s" except (%s)', + exclude: list[str] = []) -> list[str]: + log.debug('retrieving dir structure in path "%s" except directories (%s)', path, ', '.join(exclude)) - out: list[str] = [] + dir_list: list[str] = [] for root, dirs, files in os.walk(path): if exclude != []: log.debug('removing excludes from list') dirs[:] = [d for d in dirs if d not in exclude] - for d in dirs: - if root in out: - out.remove(root) + if root in dir_list: + dir_list.remove(root) log.debug('removed dir "%s" as it already is in the list', root) + # not removing the 'path' part here, as comparisons with 'root' would fail joined_dir: str = os.path.join(root, d) - out.append(joined_dir) + dir_list.append(joined_dir) log.debug('added dir "%s" to the list', joined_dir) - log.debug('removing "%s" from all dirs in list', path) - return [o.replace(path, '')[1:] for o in out] + # [1:] is required to remove the '/' at the beginning after replacing + return [d.replace(path, '')[1:] for d in dir_list] -def create_dir(path: str, p: bool=False, silent=False) -> None: +def create_dir(path: str, p: bool = False, silent=False) -> None: try: if p: os.makedirs(path) else: os.mkdir(path) - if not silent: log.info('created directory "%s"', path) + if not silent: + log.info('created directory "%s"', path) except FileExistsError: - if not silent: log.info('directory "%s" already exists, ignoring', path) + if not silent: + log.info('directory "%s" already exists, ignoring', path) def copy_file(src: str, dst: str) -> None: @@ -74,6 +76,7 @@ def copy_file(src: str, dst: str) -> None: log.info('file "%s" already exists, ignoring', dst) +# only used for database, but keeping it here as it is an independent function # as seen in SO: https://stackoverflow.com/a/1131238 def get_checksum(path: str) -> str: log.debug('calculating md5 checksum for "%s"', path) @@ -81,7 
+84,6 @@ def get_checksum(path: str) -> str: with open(path, "rb") as f: while chunk := f.read(4096): file_hash.update(chunk) - return file_hash.hexdigest() @@ -90,8 +92,7 @@ def get_expanded_path(path: str) -> str: expanded_path: str = os.path.normpath(os.path.expandvars(path)) if '$' in expanded_path: log.error('"$" character found in expanded path "%s";' - ' could be due to non-existant env var.', expanded_path) + ' could be due to non-existant env var', expanded_path) sys.exit(1) log.debug('expanded path "%s" to "%s"', path, expanded_path) - return expanded_path diff --git a/src/pyssg/yaml_parser.py b/src/pyssg/yaml_parser.py index f9303d6..3109cbc 100644 --- a/src/pyssg/yaml_parser.py +++ b/src/pyssg/yaml_parser.py @@ -12,30 +12,23 @@ log: Logger = getLogger(__name__) def __join_constructor(loader: SafeLoader, node: SequenceNode) -> str: seq = loader.construct_sequence(node) return ''.join([str(i) for i in seq]) -SafeLoader.add_constructor('!join', __join_constructor) -# "file" is either a path or the yaml content itself -def __read_raw_yaml(file: TextIOWrapper) -> list[dict]: +def __read_raw_yaml(path: str) -> list[dict]: all_docs: list[dict] = [] - all_docs_gen = yaml.safe_load_all(file) - for doc in all_docs_gen: - all_docs.append(doc) - + with open(path, 'r') as f: + for doc in yaml.safe_load_all(f): + all_docs.append(doc) return all_docs def get_parsed_yaml(resource: str, package: str='') -> list[dict]: - all_yaml_docs: list[dict] = [] if package == '': - log.debug('no package specified, reading file "%s"', resource) - with open(resource, 'r') as f: - all_yaml_docs = __read_raw_yaml(f) - else: - log.debug('package "%s" specified, reading resource "%s"', - package, resource) - with rpath(package, resource) as p: - with open(p, 'r') as f: - all_yaml_docs = __read_raw_yaml(f) - - return all_yaml_docs + log.debug('parsing yaml; reading "%s"', resource) + return __read_raw_yaml(resource) + log.debug('parsing yaml; reading "%s.%s"', package, resource) + with rpath(package, resource) as p: + return __read_raw_yaml(str(p)) + + +SafeLoader.add_constructor('!join', __join_constructor) -- cgit v1.2.3-70-g09d2
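For context, after this refactor pyssg expects per-directory options in the user configuration to be nested under a cfg key, one entry per dir_path with "/" mandatory, and each entry is validated against the second YAML document of plt/mandatory_config.yaml shown above. A rough sketch of a conforming dirs section follows; the blog entry and all values are hypothetical, only the "/" entry and the cfg keys plt, tags, index, rss and sitemap are required by the checker, and exclude_dirs stays optional (the builder defaults it to []):

dirs:
  /:
    cfg:
      plt: "page.html"
      tags: False
      index: True
      rss: True
      sitemap: True
  blog:
    cfg:
      plt: "blog.html"
      tags: True
      index: True
      rss: True
      sitemap: True
      exclude_dirs: ["drafts"]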