Markata is a tool for handling directories of markdown.
Post class
Post source
class Post(frontmatter.Post):
    """A ``frontmatter.Post`` subclass that additionally declares ``html``."""

    # Annotation only: no value is assigned to ``html`` in this class body.
    html: str
set_phase function
set_phase source
def set_phase(function: Callable) -> Any:
    """Decorate a Markata method so that it records its name as the phase.

    The wrapped method's ``__name__`` is stored on ``self.phase`` and written
    to ``self.phase_file`` immediately before and immediately after the call.

    Args:
        function: the method to wrap; its first positional argument must be
            the object carrying ``phase`` and ``phase_file``.

    Returns:
        A wrapper that forwards all arguments to ``function`` and returns its
        result. ``functools.wraps`` keeps the original name and docstring on
        the wrapper.
    """
    import functools

    def _record_phase(markata: Markata) -> None:
        # Store the phase on the instance and mirror it to the phase file.
        markata.phase = function.__name__
        markata.phase_file.parent.mkdir(exist_ok=True)
        markata.phase_file.write_text(markata.phase)

    @functools.wraps(function)
    def wrapper(self: Markata, *args: Tuple, **kwargs: Dict) -> Any:
        _record_phase(self)
        result = function(self, *args, **kwargs)
        # Recorded a second time after the call; presumably because work done
        # inside ``function`` may have overwritten the phase.
        _record_phase(self)
        return result

    return wrapper
Markata class
Markata source
class Markata:
    """Coordinate plugin hooks over a collection of posts.

    Configuration is loaded in ``configure``, plugins are registered with a
    pluggy ``PluginManager``, and ``run`` executes lifecycle stages in order.
    Attributes that plugins register in ``registered_attrs`` are created on
    demand by ``__getattr__``.
    """

    def __init__(self, console: Console = None) -> None:
        """Create the cache directory, configure, and snapshot cache stats.

        Args:
            console: optional console to log to; when omitted the ``console``
                property creates one on first use.
        """
        self.phase = "starting"
        self.MARKATA_CACHE_DIR = Path(".") / ".markata.cache"
        self.MARKATA_CACHE_DIR.mkdir(exist_ok=True)
        self.phase_file: Path = self.MARKATA_CACHE_DIR / "phase.txt"
        self.registered_attrs = hookspec.registered_attrs
        self.configure()
        if console is not None:
            self._console = console
        # Run the teardown hook when the interpreter exits.
        atexit.register(self.teardown)
        with self.cache as cache:
            # Baseline used by ``run`` to report per-run hit rates.
            self.init_cache_stats = cache.stats()

    @property
    def cache(self) -> FanoutCache:
        """Return a ``FanoutCache`` over the cache directory with statistics on."""
        return FanoutCache(self.MARKATA_CACHE_DIR, statistics=True)

    def __getattr__(self, item: str) -> Any:
        """Resolve hooks and plugin-provided attributes on demand.

        Only called when normal attribute lookup fails.

        * A hook name returns a zero-argument callable that runs up to it.
        * A name in ``registered_attrs`` runs the lifecycle up to the latest
          stage registered for it, then returns the attribute.

        Raises:
            AttributeError: the name is unknown, or the lifecycle ran without
                creating the attribute.
        """
        # Read state through ``__dict__`` so that a missing ``_pm`` or
        # ``registered_attrs`` cannot re-enter ``__getattr__`` and recurse.
        state = self.__dict__
        plugin_manager = state.get("_pm")
        if plugin_manager is not None and item in plugin_manager.hook.__dict__:
            # item is a hook, return a callable function
            return lambda: self.run(item)
        if item in state:
            # item is an attribute, return it
            return state[item]
        registered = state.get("registered_attrs", {})
        if item in registered:
            # item is created by a plugin, run it
            stage_to_run_to = max(
                [attr["lifecycle"] for attr in registered[item]]
            ).name
            self.run(stage_to_run_to)
            try:
                # Bypass ``__getattr__`` so a plugin that never sets the
                # attribute cannot trigger an endless re-run of the lifecycle.
                return object.__getattribute__(self, item)
            except AttributeError:
                raise AttributeError(
                    f"'Markata' object has no attribute '{item}'"
                    f" (not created by running '{stage_to_run_to}')"
                ) from None
        # Markata does not know what this is, raise
        raise AttributeError(f"'Markata' object has no attribute '{item}'")

    def __rich__(self) -> Table:
        """Render ``describe()`` as a two-column grid of label/value rows."""
        grid = Table.grid()
        grid.add_column("label")
        grid.add_column("value")
        for label, value in self.describe().items():
            grid.add_row(label, value)
        return grid

    def bust_cache(self) -> Markata:
        """Clear the cache and return ``self``."""
        with self.cache as cache:
            cache.clear()
        return self

    @set_phase
    def configure(self) -> Markata:
        """Load configuration, normalise it, and register plugins.

        Returns:
            ``self``.

        Raises:
            TypeError: ``glob_patterns``, ``hooks`` or ``disabled_hooks`` is
                neither a ``str`` nor a ``list``.
        """

        def _hook_list(key: str) -> list:
            # A missing key falls back to ``[""]``; comma strings are split.
            value = self.config.get(key, [""])
            if isinstance(value, str):
                return value.split(",")
            if isinstance(value, list):
                return value
            raise TypeError(f"{key} must be list or str")

        cwd = os.getcwd()
        if cwd not in sys.path:
            # Avoid piling up duplicates when configure runs more than once.
            sys.path.append(cwd)
        self.config = {**DEFUALT_CONFIG, **standard_config.load("markata")}

        if isinstance(self.config["glob_patterns"], str):
            self.config["glob_patterns"] = self.config["glob_patterns"].split(",")
        elif isinstance(self.config["glob_patterns"], list):
            self.config["glob_patterns"] = list(self.config["glob_patterns"])
        else:
            raise TypeError("glob_patterns must be list or str")
        self.glob_patterns = self.config["glob_patterns"]

        # Previously a missing key set the default and then indexed the
        # config anyway, raising KeyError.
        self.hooks = _hook_list("hooks")
        self.disabled_hooks = _hook_list("disabled_hooks")

        # Make output_dir end with path_prefix.
        if not self.config.get("output_dir", "markout").endswith(
            self.config.get("path_prefix", "")
        ):
            self.config["output_dir"] = (
                self.config.get("output_dir", "markout")
                + "/"
                + self.config.get("path_prefix", "").rstrip("/")
            )
        # Derive path_prefix from a nested output_dir when it was not given.
        if (
            len((output_split := self.config.get("output_dir", "markout").split("/")))
            > 1
        ):
            if "path_prefix" not in self.config.keys():
                self.config["path_prefix"] = "/".join(output_split[1:]) + "/"
        if not self.config.get("path_prefix", "").endswith("/"):
            self.config["path_prefix"] = self.config.get("path_prefix", "") + "/"

        # Same "markout" default as above, so a config without output_dir
        # no longer raises KeyError here.
        self.config["output_dir"] = self.config.get("output_dir", "markout").lstrip(
            "/"
        )
        self.config["path_prefix"] = self.config["path_prefix"].lstrip("/")

        try:
            default_index = self.hooks.index("default")
            hooks = [
                *self.hooks[:default_index],
                *DEFAULT_HOOKS,
                *self.hooks[default_index + 1 :],
            ]
            self.hooks = [hook for hook in hooks if hook not in self.disabled_hooks]
        except ValueError:
            # 'default' is not in hooks , do not replace with default_hooks
            pass

        self._pm = pluggy.PluginManager("markata")
        self._pm.add_hookspecs(hookspec.MarkataSpecs)
        self._register_hooks()
        self._pm.hook.configure(markata=self)
        return self

    def get_plugin_config(self, path_or_name: str) -> Dict:
        """Return the config section named after the stem of ``path_or_name``.

        ``cache_expire`` and ``config_key`` are filled in when absent. An
        existing section is updated in place.

        Raises:
            TypeError: the section exists but is not a ``dict``.
        """
        key = Path(path_or_name).stem
        config = self.config.get(key, {})
        if not isinstance(config, dict):
            raise TypeError("must use dict")
        if "cache_expire" not in config.keys():
            config["cache_expire"] = self.config["default_cache_expire"]
        if "config_key" not in config.keys():
            config["config_key"] = key
        return config

    def get_config(
        self,
        key: str,
        default: str = "",
        warn: bool = True,
        suggested: Optional[str] = None,
    ) -> Any:
        """Return ``self.config[key]``, or ``default`` when the key is absent.

        Args:
            key: configuration key to look up.
            default: value returned when ``key`` is not configured.
            warn: log a warning when falling back to ``default``.
            suggested: snippet shown in the warning; a ``[markata]`` entry
                for ``key`` is generated when omitted.
        """
        if key in self.config:
            return self.config[key]
        if warn:
            if suggested is None:
                suggested = f"[markata]\n{key} = '{default}'"
            # The old text always blamed the sitemap and site_name, whatever
            # key was requested; report the actual key and fallback instead.
            logger.warning(
                "%s is not set in markata config, using default %r.\n"
                "To resolve this open your markata.toml and add\n\n%s\n",
                key,
                default,
                suggested,
            )
        return default

    def make_hash(self, *keys: str) -> str:
        """Return the md5 hex digest of the ``str()`` of every key, joined."""
        str_keys = [str(key) for key in keys]
        return hashlib.md5("".join(str_keys).encode("utf-8")).hexdigest()

    @property
    def phase(self) -> str:
        """Return the value stored in ``self._phase``."""
        return self._phase

    @phase.setter
    def phase(self, value: str) -> None:
        """Store ``value`` in ``self._phase``."""
        self._phase = value

    @property
    def content_dir_hash(self) -> str:
        """Hash the ``dirhash`` of each content directory except the cwd."""
        hashes = [
            dirhash(directory)
            for directory in self.content_directories
            if directory.absolute() != Path(".").absolute()
        ]
        return self.make_hash(*hashes)

    @property
    def console(self) -> Console:
        """Return ``self._console``, creating a ``Console`` on first use."""
        try:
            return self._console
        except AttributeError:
            self._console = Console()
            return self._console

    def describe(self) -> dict[str, str]:
        """Return the package version and the current phase."""
        return {"version": __version__, "phase": self.phase}

    def _to_dict(self) -> dict[str, Iterable]:
        """Return the config and every article's ``to_dict()`` output."""
        return {"config": self.config, "articles": [a.to_dict() for a in self.articles]}

    def to_dict(self) -> dict:
        """Return the result of ``_to_dict()``."""
        return self._to_dict()

    def to_json(self) -> str:
        """Serialise ``to_dict()`` as indented, key-sorted JSON.

        Values that ``json`` cannot encode are converted with ``str``.
        """
        import json

        return json.dumps(self.to_dict(), indent=4, sort_keys=True, default=str)

    def _register_hooks(self) -> None:
        """Import every name in ``self.hooks`` and register it as a plugin.

        Each name is tried as a module first. If that import fails, a dotted
        name is retried as ``module.attribute``. Blank names are skipped.

        Raises:
            ModuleNotFoundError: an undotted name cannot be imported.
        """
        for hook in self.hooks:
            # Splitting "a,b," or "a, b" leaves empty or padded names behind.
            hook = hook.strip()
            if not hook:
                continue
            try:
                # module style plugins
                plugin = importlib.import_module(hook)
            except ModuleNotFoundError:
                # class style plugins
                module_name, _, attribute = hook.rpartition(".")
                if not module_name:
                    raise
                plugin = getattr(importlib.import_module(module_name), attribute)
            self._pm.register(plugin)

    def __iter__(self, description: str = "working...") -> Iterable[frontmatter.Post]:
        """Return ``self.articles`` wrapped by ``track`` with ``description``."""
        articles: Iterable[frontmatter.Post] = track(
            self.articles, description=description, transient=True, console=self.console
        )
        return articles

    def iter_articles(self, description: str) -> Iterable[frontmatter.Post]:
        """Return ``self.articles`` wrapped by ``track`` with ``description``."""
        articles: Iterable[frontmatter.Post] = track(
            self.articles, description=description, transient=True, console=self.console
        )
        return articles

    def teardown(self) -> Markata:
        """give special access to the teardown lifecycle method"""
        self._pm.hook.teardown(markata=self)
        return self

    def run(self, lifecycle: LifeCycle = None) -> Markata:
        """Run every lifecycle stage up to and including ``lifecycle``.

        Args:
            lifecycle: a ``LifeCycle`` member or its name; ``None`` selects
                the greatest member.

        Returns:
            ``self``.
        """
        if lifecycle is None:
            # Compare members, not names: the alphabetical max of the names
            # is only the last stage by coincidence.
            lifecycle = max(LifeCycle)
        if isinstance(lifecycle, str):
            lifecycle = LifeCycle[lifecycle]

        stages_to_run = [
            name for name, stage in LifeCycle.__members__.items() if stage <= lifecycle
        ]
        self.console.log(f"running {stages_to_run}")
        for stage in stages_to_run:
            self.console.log(f"{stage} running")
            getattr(self._pm.hook, stage)(markata=self)
            self.console.log(f"{stage} complete")

        def _report(scope: str, hits: int, misses: int) -> None:
            # Emit the hit rate and raw counts for one set of statistics.
            if hits + misses > 0:
                self.console.log(
                    f"{scope} cache hit rate {round(hits/ (hits + misses)*100, 2)}%"
                )
            if misses > 0:
                self.console.log(f"{scope} cache hits/misses {hits}/{misses}")

        with self.cache as cache:
            hits, misses = cache.stats()
        _report("lifetime", hits, misses)
        # Subtract the baseline taken in ``__init__`` to get this run's stats.
        _report(
            "run",
            hits - self.init_cache_stats[0],
            misses - self.init_cache_stats[1],
        )
        return self

    def filter(self, filter: str) -> List:
        """Return the articles for which the expression ``filter`` is truthy.

        The expression sees each article's ``to_dict()`` keys plus
        ``timedelta``, ``post`` (the article) and ``m`` (this instance).
        """

        def evalr(a: Post) -> Any:
            # NOTE(review): eval executes arbitrary code; only pass trusted
            # filter expressions.
            return eval(
                filter,
                {**a.to_dict(), "timedelta": timedelta, "post": a, "m": self},
                {},
            )

        return [a for a in self.articles if evalr(a)]

    def map(
        self,
        func: str = "title",
        filter: str = "True",
        sort: str = "True",
        reverse: bool = True,
        *args: Tuple,
        **kwargs: Dict,
    ) -> List:
        """Evaluate ``func`` for every article passing ``filter``, in order.

        Args:
            func: expression evaluated per article to produce an item.
            filter: expression; articles where it is falsy are skipped.
            sort: expression, or date/datetime key name, used as sort key.
            reverse: reverse the sorted order when true.
            *args: accepted for compatibility; unused.
            **kwargs: accepted for compatibility; unused.

        Raises:
            MissingFrontMatter: an expression names a variable that some
                article does not define.
        """
        import copy

        def try_sort(a: Any) -> Any:
            # Keys mentioning date/datetime are looked up directly, with the
            # epoch as the value for articles that lack them.
            if "datetime" in sort.lower():
                return a.get(sort, datetime.datetime(1970, 1, 1))
            if "date" in sort.lower():
                return a.get(sort, datetime.date(1970, 1, 1))
            try:
                return eval(sort, a.to_dict(), {})
            except NameError:
                return -1

        def scope(a: Any) -> dict:
            # Names visible to ``func`` and ``filter`` expressions.
            return {**a.to_dict(), "timedelta": timedelta, "post": a, "m": self}

        articles = copy.copy(self.articles)
        articles.sort(key=try_sort)
        if reverse:
            articles.reverse()

        try:
            # NOTE(review): eval executes arbitrary code; only pass trusted
            # expressions.
            posts = [
                eval(func, scope(a), {})
                for a in articles
                if eval(filter, scope(a), {})
            ]
        except NameError as e:
            variable = str(e).split("'")[1]
            # Collected directly instead of through ``self.map``, which
            # recursed without end when ``path`` itself was the missing name.
            missing_in_posts = [
                post.get("path")
                for post in self.articles
                if variable not in post.keys()
            ]
            message = (
                f"variable: '{variable}' is missing in {len(missing_in_posts)} posts"
            )
            if len(missing_in_posts) > 10:
                first_ten = ",".join(str(path) for path in missing_in_posts[:10])
                message += (
                    f"\nfirst 10 paths to posts missing {variable}"
                    f"[{first_ten}...]"
                )
            else:
                message += f"\npaths to posts missing {variable} {missing_in_posts}"
            raise MissingFrontMatter(message) from e
        return posts
load_ipython_extension function
load_ipython_extension source
def load_ipython_extension(ipython):
    """Expose one new ``Markata`` instance in the IPython user namespace.

    The same instance is bound to both ``m`` and ``markata``.
    """
    instance = Markata()
    for alias in ("m", "markata"):
        ipython.user_ns[alias] = instance
wrapper function
wrapper source
def wrapper(self: Markata, *args: Tuple, **kwargs: Dict) -> Any:
    """Call ``function`` and record its name as the phase before and after."""

    def _record_phase() -> None:
        # Store the phase on the instance and mirror it to the phase file.
        self.phase = function.__name__
        self.phase_file.parent.mkdir(exist_ok=True)
        self.phase_file.write_text(self.phase)

    _record_phase()
    outcome = function(self, *args, **kwargs)
    _record_phase()
    return outcome
init method
init source
def __init__(self, console: Console = None) -> None:
    """Create the cache directory, configure, and snapshot cache stats.

    Args:
        console: optional console to log to; it is stored on ``_console``
            only when one is given.
    """
    self.phase = "starting"

    cache_dir = Path(".") / ".markata.cache"
    self.MARKATA_CACHE_DIR = cache_dir
    cache_dir.mkdir(exist_ok=True)
    self.phase_file: Path = cache_dir / "phase.txt"

    self.registered_attrs = hookspec.registered_attrs
    self.configure()

    if console is not None:
        self._console = console

    # Run the teardown hook when the interpreter exits.
    atexit.register(self.teardown)

    with self.cache as opened:
        # Baseline cache statistics taken at construction time.
        self.init_cache_stats = opened.stats()
cache method
cache source
def cache(self) -> FanoutCache:
    """Return a ``FanoutCache`` over the cache directory with statistics on."""
    cache_dir = self.MARKATA_CACHE_DIR
    return FanoutCache(cache_dir, statistics=True)
getattr method
getattr source
def __getattr__(self, item: str) -> Any: if item in self._pm.hook.__dict__.keys(): # item is a hook, return a callable function return lambda: self.run(item) if item in self.__dict__.keys(): # item is an attribute, return it return self.__getitem__(item) elif item in self.registered_attrs.keys(): # item is created by a plugin, run it stage_to_run_to = max( [attr["lifecycle"] for attr in self.registered_attrs[item]] ).name self.run(stage_to_run_to) return getattr(self, item) else: # Markata does not know what this is, raise raise AttributeError(f"'Markata' object has no attribute '{item}'")
rich method
rich source
def __rich__(self) -> Table:
    """Render ``describe()`` as a two-column grid of label/value rows."""
    table = Table.grid()
    for column in ("label", "value"):
        table.add_column(column)
    for row in self.describe().items():
        table.add_row(*row)
    return table
bust_cache method
bust_cache source
def bust_cache(self) -> Markata:
    """Clear every entry from ``self.cache`` and return ``self``."""
    with self.cache as opened:
        opened.clear()
    return self
configure method
configure source
def configure(self) -> Markata:
    """Load configuration, normalise it, and register plugins.

    The loaded config is layered over ``DEFUALT_CONFIG``. The method then
    normalises ``glob_patterns``, ``hooks`` and ``disabled_hooks`` to lists
    and reconciles ``output_dir`` with ``path_prefix``. A ``"default"`` entry
    in hooks is expanded into ``DEFAULT_HOOKS``, after which the plugin
    manager is built.

    Returns:
        ``self``.

    Raises:
        TypeError: ``glob_patterns``, ``hooks`` or ``disabled_hooks`` is
            neither a ``str`` nor a ``list``.
    """

    def _hook_list(key: str) -> list:
        # A missing key falls back to ``[""]``; comma strings are split.
        value = self.config.get(key, [""])
        if isinstance(value, str):
            return value.split(",")
        if isinstance(value, list):
            return value
        raise TypeError(f"{key} must be list or str")

    cwd = os.getcwd()
    if cwd not in sys.path:
        # Avoid piling up duplicates when configure runs more than once.
        sys.path.append(cwd)
    self.config = {**DEFUALT_CONFIG, **standard_config.load("markata")}

    if isinstance(self.config["glob_patterns"], str):
        self.config["glob_patterns"] = self.config["glob_patterns"].split(",")
    elif isinstance(self.config["glob_patterns"], list):
        self.config["glob_patterns"] = list(self.config["glob_patterns"])
    else:
        raise TypeError("glob_patterns must be list or str")
    self.glob_patterns = self.config["glob_patterns"]

    # Previously a missing key set the default and then indexed the config
    # anyway, raising KeyError.
    self.hooks = _hook_list("hooks")
    self.disabled_hooks = _hook_list("disabled_hooks")

    # Make output_dir end with path_prefix.
    if not self.config.get("output_dir", "markout").endswith(
        self.config.get("path_prefix", "")
    ):
        self.config["output_dir"] = (
            self.config.get("output_dir", "markout")
            + "/"
            + self.config.get("path_prefix", "").rstrip("/")
        )
    # Derive path_prefix from a nested output_dir when it was not given.
    if (
        len((output_split := self.config.get("output_dir", "markout").split("/")))
        > 1
    ):
        if "path_prefix" not in self.config.keys():
            self.config["path_prefix"] = "/".join(output_split[1:]) + "/"
    if not self.config.get("path_prefix", "").endswith("/"):
        self.config["path_prefix"] = self.config.get("path_prefix", "") + "/"

    # Same "markout" default as above, so a config without output_dir no
    # longer raises KeyError here.
    self.config["output_dir"] = self.config.get("output_dir", "markout").lstrip("/")
    self.config["path_prefix"] = self.config["path_prefix"].lstrip("/")

    try:
        default_index = self.hooks.index("default")
        hooks = [
            *self.hooks[:default_index],
            *DEFAULT_HOOKS,
            *self.hooks[default_index + 1 :],
        ]
        self.hooks = [hook for hook in hooks if hook not in self.disabled_hooks]
    except ValueError:
        # 'default' is not in hooks , do not replace with default_hooks
        pass

    self._pm = pluggy.PluginManager("markata")
    self._pm.add_hookspecs(hookspec.MarkataSpecs)
    self._register_hooks()
    self._pm.hook.configure(markata=self)
    return self
get_plugin_config method
get_plugin_config source
def get_plugin_config(self, path_or_name: str) -> Dict:
    """Return the config section named after the stem of ``path_or_name``.

    ``cache_expire`` (taken from ``default_cache_expire``) and ``config_key``
    are added when absent. An existing section is updated in place.

    Raises:
        TypeError: the section exists but is not a ``dict``.
    """
    section_name = Path(path_or_name).stem
    section = self.config.get(section_name, {})
    if not isinstance(section, dict):
        raise TypeError("must use dict")

    if "cache_expire" not in section:
        section["cache_expire"] = self.config["default_cache_expire"]
    if "config_key" not in section:
        section["config_key"] = section_name
    return section
get_config method
get_config source
def get_config(
    self,
    key: str,
    default: str = "",
    warn: bool = True,
    suggested: Optional[str] = None,
) -> Any:
    """Return ``self.config[key]``, or ``default`` when the key is absent.

    Args:
        key: configuration key to look up.
        default: value returned when ``key`` is not configured.
        warn: log a warning when falling back to ``default``.
        suggested: snippet shown in the warning; a ``[markata]`` entry for
            ``key`` is generated when omitted.

    Returns:
        The configured value, or ``default``.
    """
    if key in self.config:
        return self.config[key]
    if warn:
        if suggested is None:
            suggested = f"[markata]\n{key} = '{default}'"
        # The old text always blamed the sitemap and site_name, whatever key
        # was requested. It also ran ``textwrap.dedent`` over text that had
        # multi-line content interpolated into it, which defeats the dedent.
        logger.warning(
            "%s is not set in markata config, using default %r.\n"
            "To resolve this open your markata.toml and add\n\n%s\n",
            key,
            default,
            suggested,
        )
    return default
make_hash method
make_hash source
def make_hash(self, *keys: str) -> str:
    """Return the md5 hex digest of the ``str()`` of every key, joined."""
    joined = "".join(map(str, keys))
    digest = hashlib.md5(joined.encode("utf-8"))
    return digest.hexdigest()
phase method
phase source
def phase(self) -> str:
    """Return the value stored in ``self._phase``."""
    return self._phase
phase method
phase source
def phase(self, value: str) -> None:
    """Store ``value`` in ``self._phase``."""
    self._phase = value
content_dir_hash method
content_dir_hash source
def content_dir_hash(self) -> str:
    """Hash the ``dirhash`` of each content directory except the cwd."""
    digests = []
    for directory in self.content_directories:
        # Directories that resolve to the current directory are left out.
        if directory.absolute() != Path(".").absolute():
            digests.append(dirhash(directory))
    return self.make_hash(*digests)
console method
console source
def console(self) -> Console:
    """Return ``self._console``, creating a ``Console`` when none is set."""
    try:
        existing = self._console
    except AttributeError:
        # First access without a console: create one and keep it.
        self._console = Console()
        existing = self._console
    return existing
describe method
describe source
def describe(self) -> dict[str, str]:
    """Return the package version and the current phase."""
    summary = {"version": __version__}
    summary["phase"] = self.phase
    return summary
_to_dict method
_to_dict source
def _to_dict(self) -> dict[str, Iterable]: return {"config": self.config, "articles": [a.to_dict() for a in self.articles]}
to_dict method
to_dict source
def to_dict(self) -> dict:
    """Return the result of ``self._to_dict()``."""
    return self._to_dict()
to_json method
to_json source
def to_json(self) -> str:
    """Serialise ``to_dict()`` as JSON with 4-space indent and sorted keys.

    Values that ``json`` cannot encode are converted with ``str``.
    """
    import json

    payload = self.to_dict()
    return json.dumps(payload, default=str, sort_keys=True, indent=4)
_register_hooks method
_register_hooks source
def _register_hooks(self) -> None: for hook in self.hooks: try: # module style plugins plugin = importlib.import_module(hook) except ModuleNotFoundError as e: # class style plugins if "." in hook: mod = importlib.import_module(".".join(hook.split(".")[:-1])) plugin = getattr(mod, hook.split(".")[-1]) else: raise e self._pm.register(plugin)
iter method
iter source
def __iter__(self, description: str = "working...") -> Iterable[frontmatter.Post]:
    """Return ``self.articles`` wrapped by ``track`` with ``description``."""
    return track(
        self.articles,
        description=description,
        transient=True,
        console=self.console,
    )
iter_articles method
iter_articles source
def iter_articles(self, description: str) -> Iterable[frontmatter.Post]:
    """Return ``self.articles`` wrapped by ``track`` with ``description``."""
    return track(
        self.articles,
        description=description,
        transient=True,
        console=self.console,
    )
teardown method
Give special access to the teardown lifecycle method.
teardown source
def teardown(self) -> Markata: """give special access to the teardown lifecycle method""" self._pm.hook.teardown(markata=self) return self
run method
run source
def run(self, lifecycle: LifeCycle = None) -> Markata:
    """Run every lifecycle stage up to and including ``lifecycle``.

    After the stages run, lifetime cache statistics are logged, followed by
    the statistics for this run (lifetime minus the baseline stored in
    ``init_cache_stats``).

    Args:
        lifecycle: a ``LifeCycle`` member or its name; ``None`` selects the
            greatest member.

    Returns:
        ``self``.

    Raises:
        KeyError: ``lifecycle`` is a string that names no ``LifeCycle`` member.
    """
    if lifecycle is None:
        # Compare members, not names: the alphabetical max of the member
        # names is only the last stage by coincidence.
        lifecycle = max(LifeCycle)
    if isinstance(lifecycle, str):
        lifecycle = LifeCycle[lifecycle]

    stages_to_run = [
        name for name, stage in LifeCycle.__members__.items() if stage <= lifecycle
    ]
    self.console.log(f"running {stages_to_run}")
    for stage in stages_to_run:
        self.console.log(f"{stage} running")
        getattr(self._pm.hook, stage)(markata=self)
        self.console.log(f"{stage} complete")

    def _report(scope: str, hits: int, misses: int) -> None:
        # Emit the hit rate and raw counts for one set of statistics.
        if hits + misses > 0:
            self.console.log(
                f"{scope} cache hit rate {round(hits/ (hits + misses)*100, 2)}%"
            )
        if misses > 0:
            self.console.log(f"{scope} cache hits/misses {hits}/{misses}")

    with self.cache as cache:
        hits, misses = cache.stats()
    _report("lifetime", hits, misses)
    _report(
        "run",
        hits - self.init_cache_stats[0],
        misses - self.init_cache_stats[1],
    )
    return self
filter method
filter source
def filter(self, filter: str) -> List:
    """Return the articles for which the expression ``filter`` is truthy.

    The expression sees each article's ``to_dict()`` keys as names, plus
    ``timedelta``, ``post`` (the article) and ``m`` (this instance).

    Args:
        filter: a Python expression evaluated once per article.

    Returns:
        The matching articles, in ``self.articles`` order.
    """

    def evalr(a: Post) -> Any:
        # NOTE(review): eval executes arbitrary code; only pass trusted
        # filter expressions. The former ``except AttributeError`` branch
        # re-ran this identical eval, so it could only repeat side effects
        # and fail the same way; errors now propagate from the first attempt.
        return eval(
            filter,
            {**a.to_dict(), "timedelta": timedelta, "post": a, "m": self},
            {},
        )

    return [a for a in self.articles if evalr(a)]
map method
map source
def map(
    self,
    func: str = "title",
    filter: str = "True",
    sort: str = "True",
    reverse: bool = True,
    *args: Tuple,
    **kwargs: Dict,
) -> List:
    """Evaluate ``func`` for every article passing ``filter``, in sort order.

    Expressions see each article's ``to_dict()`` keys as names, plus
    ``timedelta``, ``post`` (the article) and ``m`` (this instance).

    Args:
        func: expression evaluated per article to produce a result item.
        filter: expression; articles where it is falsy are skipped.
        sort: expression used as sort key. A value containing "date" or
            "datetime" is instead looked up with ``article.get``, using the
            epoch for articles that lack it.
        reverse: reverse the sorted order when true.
        *args: accepted for compatibility; unused.
        **kwargs: accepted for compatibility; unused.

    Returns:
        One evaluated ``func`` result per selected article.

    Raises:
        MissingFrontMatter: an expression names a variable that is not
            defined for some article.
    """
    import copy

    def try_sort(a: Any) -> Any:
        if "datetime" in sort.lower():
            return a.get(sort, datetime.datetime(1970, 1, 1))
        if "date" in sort.lower():
            return a.get(sort, datetime.date(1970, 1, 1))
        try:
            # The integer-coercion code that used to follow was unreachable
            # behind an unconditional ``return value``; it has been removed.
            return eval(sort, a.to_dict(), {})
        except NameError:
            return -1

    def scope(a: Any) -> dict:
        # Names visible to ``func`` and ``filter`` expressions.
        return {**a.to_dict(), "timedelta": timedelta, "post": a, "m": self}

    articles = copy.copy(self.articles)
    articles.sort(key=try_sort)
    if reverse:
        articles.reverse()

    try:
        # NOTE(review): eval executes arbitrary code; only pass trusted
        # expressions.
        posts = [
            eval(func, scope(a), {}) for a in articles if eval(filter, scope(a), {})
        ]
    except NameError as e:
        variable = str(e).split("'")[1]
        # Collected directly instead of through ``self.map``, which recursed
        # without end when ``path`` itself was the missing name.
        missing_in_posts = [
            post.get("path") for post in self.articles if variable not in post.keys()
        ]
        message = f"variable: '{variable}' is missing in {len(missing_in_posts)} posts"
        if len(missing_in_posts) > 10:
            # Only the first ten are listed, as the message says; ``str``
            # lets non-string paths be joined.
            first_ten = ",".join(str(path) for path in missing_in_posts[:10])
            message += (
                f"\nfirst 10 paths to posts missing {variable}" f"[{first_ten}...]"
            )
        else:
            message += f"\npaths to posts missing {variable} {missing_in_posts}"
        raise MissingFrontMatter(message) from e
    return posts
evalr function
evalr source
def evalr(a: Post) -> Any:
    """Evaluate the enclosing ``filter`` expression against one article.

    The expression sees the article's ``to_dict()`` keys as names, plus
    ``timedelta``, ``post`` (the article) and ``m`` (the enclosing ``self``).
    """
    # NOTE(review): eval executes arbitrary code; only pass trusted filter
    # expressions. The former ``except AttributeError`` branch re-ran this
    # identical eval, so it could only repeat side effects and fail the same
    # way; errors now propagate from the first attempt.
    namespace = {**a.to_dict(), "timedelta": timedelta, "post": a, "m": self}
    return eval(filter, namespace, {})
try_sort function
try_sort source
def try_sort(a: Any) -> Any:
    """Return the sort key for article ``a`` from the enclosing ``sort``.

    * If ``sort`` contains "datetime" or "date", the key is
      ``a.get(sort, <epoch>)`` with a matching epoch default.
    * Otherwise ``sort`` is evaluated as an expression over ``a.to_dict()``,
      and ``-1`` is returned when it names an undefined variable.

    The value is returned as-is, so the return annotation is ``Any`` rather
    than ``int``.
    """
    if "datetime" in sort.lower():
        return a.get(sort, datetime.datetime(1970, 1, 1))
    if "date" in sort.lower():
        return a.get(sort, datetime.date(1970, 1, 1))
    try:
        # The integer-coercion code that used to follow was unreachable
        # behind an unconditional ``return value``; it has been removed.
        # NOTE(review): eval executes arbitrary code; only pass trusted
        # sort expressions.
        return eval(sort, a.to_dict(), {})
    except NameError:
        return -1