from pathlib import Path
from typing import List, Optional, Union


def ensure_has_columns(df: "pandas.DataFrame", columns: List[str]):
    """Insert every column of ``columns`` that ``df`` lacks at the front of ``df``.

    ``df`` is modified in place and also returned. ``columns`` is walked in
    reverse and each missing label is inserted at position 0, so the newly
    added columns end up in the same relative order as in ``columns``.
    Columns that already exist are left where they are.

    Args:
        df: Data frame to extend.
        columns: Column labels that must be present afterwards.

    Returns:
        The same ``df`` object that was passed in.
    """
    # We don't want to import pandas too soon
    import pandas as pd

    for c in reversed(columns):
        # The labels are re-read on every pass because ``df`` grows while we
        # iterate; a label repeated in ``columns`` is therefore inserted once.
        if c not in df.columns.tolist():
            # NOTE(review): the empty Series is created without a dtype, so
            # the dtype of the new column follows the installed pandas
            # version's default for empty Series - confirm this is intended.
            df.insert(0, column=c, value=pd.Series())
    return df


def run_filters(filters, obj):
    """Return True when every callable in ``filters`` accepts ``obj``.

    Evaluation stops at the first filter that returns a falsy value. An empty
    ``filters`` yields True.
    """
    # Deliberately an explicit loop: this module defines its own ``all``
    # further down, which replaces the builtin for code in this file.
    for accepts in filters:
        if not accepts(obj):
            return False
    return True


def any(filters):
    """Combine ``filters`` into one predicate that is true if any of them is.

    NOTE: this intentionally keeps the name ``any`` and therefore shadows the
    builtin inside this module (and for ``from ... import *`` users).

    Args:
        filters: Iterable of one-argument callables; it is iterated again on
            every call of the returned predicate.

    Returns:
        A function ``f(obj)`` returning True as soon as one filter returns a
        truthy value for ``obj``, and False otherwise (including when
        ``filters`` is empty).
    """

    def f(obj):
        for accepts in filters:
            if accepts(obj):
                return True
        return False

    return f


def all(filters):
    """Combine ``filters`` into one predicate that is true if all of them are.

    NOTE: this intentionally keeps the name ``all`` and therefore shadows the
    builtin inside this module (and for ``from ... import *`` users).

    Args:
        filters: Iterable of one-argument callables; it is iterated again on
            every call of the returned predicate.

    Returns:
        A function ``f(obj)`` returning False as soon as one filter returns a
        falsy value for ``obj``, and True otherwise (including when
        ``filters`` is empty).
    """

    def f(obj):
        for accepts in filters:
            if not accepts(obj):
                return False
        return True

    return f


class HttpCache(object):
    """Text cache that stores one file per key below a directory.

    Args:
        path: Directory that holds the cache files.
        ext: File extension (without the dot) appended to every key.
        encoding: Text encoding used for reading and writing. ``None`` (the
            default) lets Python pick the platform default encoding.
    """

    def __init__(self, path: Path, ext="html", encoding: Optional[str] = None):
        self.path = path
        self.ext = ext
        self.encoding = encoding

    def lookup(self, key):
        """Return the cached text for ``key``, or None when nothing is cached."""
        # Read first and handle "missing" afterwards, so that a file removed
        # between an existence check and the read cannot raise here.
        try:
            return self._make_path(key).read_text(encoding=self.encoding)
        except (FileNotFoundError, NotADirectoryError):
            return None

    def save(self, key, value):
        """Store the string ``value`` under ``key``, creating directories as needed."""
        cache_path = self._make_path(key)
        # The key is joined onto ``self.path`` and may add sub-directories,
        # so the parent of the target file is created, not just ``self.path``.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        cache_path.write_text(value, encoding=self.encoding)

    def _make_path(self, key) -> Path:
        """Return the file path used for ``key``: ``<path>/<key>.<ext>``."""
        return self.path / "{}.{}".format(key, self.ext)


class EmptyHttpCache(object):
    """Cache stand-in that never holds anything: lookups miss, saves are dropped."""

    @staticmethod
    def lookup(key):
        """Always return None."""
        return None

    def save(self, key, value):
        """Discard ``value``."""
        pass


def maybe_cache(path: Optional[Path], **kwargs) -> Union[HttpCache, EmptyHttpCache]:
    """Return an ``HttpCache`` rooted at ``path``, or a no-op cache without a path.

    Args:
        path: Cache directory, or None to disable caching.
        **kwargs: Forwarded to ``HttpCache``; ignored when ``path`` is None.

    Returns:
        ``HttpCache(path, **kwargs)`` if ``path`` is not None, otherwise an
        ``EmptyHttpCache``.
    """
    if path is None:
        return EmptyHttpCache()
    return HttpCache(path, **kwargs)


def get_web_driver():
    """Create and return a selenium web driver.

    Each candidate constructor is tried in order and the first one that
    starts without a ``WebDriverException`` is returned.

    Raises:
        selenium.common.exceptions.WebDriverException: If no candidate could
            be started (raised by the final, unguarded ``Chrome()`` call).
    """
    # selenium.webdriver.remote.webdriver.WebDriver
    import selenium.common.exceptions
    from selenium import webdriver

    drivers = [
        ("chrome", webdriver.Chrome),
        # ("firefox", webdriver.Firefox),
        # ("webkit", webdriver.WebKitGTK),
    ]

    for _name, constructor in drivers:
        try:
            return constructor()
        except selenium.common.exceptions.WebDriverException:
            # Try the next candidate.
            pass

    # Every candidate failed: construct Chrome once more without a guard so
    # that a failure propagates to the caller.
    return webdriver.Chrome()


def gen_rst_table(header: List[str], data: List[List[str]]):
    """Render ``header`` and ``data`` as a reStructuredText grid table.

    Each column is as wide as its longest cell (header included) and cells
    are left-aligned. Cells of a row beyond ``len(header)`` are ignored.

    Args:
        header: Column titles; their count defines the number of columns.
        data: Table rows; every row needs at least ``len(header)`` cells.

    Returns:
        The table as a string in which every line, including the last one,
        ends with a newline.

    Raises:
        IndexError: If a row has fewer cells than ``header``.
    """
    # Rows are indexed rather than zipped so that a short row raises instead
    # of being silently truncated.
    column_widths = [
        max([len(title)] + [len(row[i]) for row in data])
        for i, title in enumerate(header)
    ]

    def format_row(cells):
        padded = [cells[i].ljust(w) for i, w in enumerate(column_widths)]
        return "| " + " | ".join(padded) + " |"

    sep = "+-" + "-+-".join("-" * w for w in column_widths) + "-+"

    # The header is closed by a "=" rule, every data row by a "-" rule.
    lines = [sep, format_row(header), sep.replace("-", "=")]
    for row in data:
        lines.append(format_row(row))
        lines.append(sep)
    return "\n".join(lines) + "\n"