# Source code for easydata.queries.pq

from typing import Any, Iterable, Optional

from easytxt.text import normalize
from pyquery import PyQuery

from easydata.data import DataBag
from easydata.queries.base import QuerySearch
from easydata.utils import pseudo

# Names exported by ``from easydata.queries.pq import *``.
__all__ = (
    "PyQuerySearch",
    "PyQueryStrictSearch",
)

# Pseudo keys that carry a value in parentheses, i.e. ``attr(<value>)`` and
# ``has_class(<value>)``.
# NOTE(review): this list is not referenced anywhere in the visible code;
# PyQuerySearch passes the same two names inline to
# ``pseudo.get_extension_value`` -- confirm whether this constant is still used.
_pseudo_with_values = [
    "attr",
    "has_class",
]

# Shortcut pseudo keys mapped to the element attribute they read.  Every entry
# maps to itself except ``val``, which is shorthand for the ``value`` attribute.
_attr_shortcut_mappings = {
    "val": "value",
    "src": "src",
    "href": "href",
    "name": "name",
    "class": "class",
    "content": "content",
}


class PyQuerySearch(QuerySearch):
    """Query that selects nodes from a ``PyQuery`` document and extracts data.

    The query string may end in a pseudo key introduced by ``::``.  The pseudo
    key decides what is returned for the selection: text, normalized text,
    inner/outer HTML, an attribute value, a class-membership boolean, or (when
    no extraction is configured) the ``PyQuery`` selection itself.  A pseudo
    key may additionally carry one of the extensions ``items``, ``iter`` or
    ``all``, which control whether only the first match or every match is
    used.  The splitting of the raw query into selector, key and extension is
    delegated to ``easydata.utils.pseudo``.
    """

    def __init__(
        self,
        query: str,
        remove_query: Optional[str] = None,
        **kwargs,
    ):
        """Store the query and decode its pseudo key, if it has one.

        Args:
            query: Selector, optionally followed by ``::<pseudo key>``.
            remove_query: Selector whose matches are removed from a clone of
                the selection before any data is extracted.
            **kwargs: Passed through unchanged to ``QuerySearch.__init__``.

        Raises:
            ValueError: If the pseudo key or its extension is not supported.
        """
        super().__init__(query, **kwargs)

        self._remove_query = remove_query

        # Cardinality of the result: first match only (default), a list of
        # every match (``_items``) or a lazy generator over them (``_iter``).
        self._first: bool = True
        self._items: bool = False
        self._iter: bool = False

        # What to pull out of each selection; at most one is normally set.
        self._text: bool = False
        self._ntext: bool = False
        self._html: bool = False
        self._outer_html: bool = False
        self._attr: Optional[str] = None
        self._has_class: Optional[str] = None

        if self._query and "::" in self._query:
            self._initialize_custom_pseudo_keys()

    def parse(
        self,
        pq: PyQuery,
        query: Optional[str],
    ) -> Any:
        """Run ``query`` against ``pq`` and return the extracted data.

        Returns a generator when the ``iter`` mode is active, a list when the
        ``items`` mode is active, and otherwise a single extracted value.
        """
        if self._iter:
            return self._iter_parse(pq, query=query)

        if self._items:
            return list(self._iter_parse(pq, query=query))

        selection = self._parse_pq(pq, query=query, first=self._first)
        return self._extract_data_from_pq(selection)

    def _iter_parse(
        self,
        pq: PyQuery,
        query: Optional[str],
    ) -> Iterable[Any]:
        """Lazily yield the extracted data of every node matched by ``query``."""
        matches = self._parse_pq(pq, query=query, first=False)
        yield from map(self._extract_data_from_pq, matches.items())

    def process_data(
        self,
        data: Any,
        source: Optional[str] = None,
    ) -> Optional[PyQuery]:
        """Coerce ``data`` into a ``PyQuery`` object.

        For a ``DataBag`` the value stored under ``source`` is used.  If that
        value is not already a ``PyQuery``, the converted object is stored back
        on the bag under ``"<source>_pq"`` and returned from there.
        """
        if not isinstance(data, DataBag):
            return data if isinstance(data, PyQuery) else PyQuery(data)

        if isinstance(data[source], PyQuery):
            return data[source]

        cache_key = "{}_pq".format(source)

        # NOTE(review): presence is tested with hasattr() while the value is
        # written and read via item access -- this presumably relies on
        # DataBag exposing its items as attributes; confirm against DataBag.
        if not hasattr(data, cache_key):
            data[cache_key] = PyQuery(data[source])

        return data[cache_key]

    def _extract_data_from_pq(self, pq: PyQuery) -> Any:
        """Return the configured piece of data from ``pq``.

        Falsy text/HTML/attribute results are turned into ``None``.  When no
        extraction mode is set, ``pq`` itself is returned.
        """
        if self._text:
            return pq.text() or None

        if self._ntext:
            if not pq:
                return None
            return normalize(pq.text()) or None

        if self._html:
            return pq.html() or None

        if self._outer_html:
            return pq.outer_html() or None

        if self._has_class:
            # Returns bool
            return pq.has_class(self._has_class)

        if self._attr:
            return pq.attr(self._attr) or None

        return pq

    def _parse_pq(
        self,
        pq: PyQuery,
        query: Optional[str],
        first: bool = True,
    ) -> PyQuery:
        """Apply ``query`` to ``pq`` and return the resulting selection.

        Args:
            pq: Document or selection to search in.
            query: Selector to apply; when falsy ``pq`` is used as is.
            first: Keep only the first match of a non-empty selection.
        """
        selection = pq(query) if query else pq

        if selection and first:
            selection = selection.eq(0)

        if self._remove_query and selection:
            # Work on a clone so the removal does not touch the source nodes.
            selection = selection.clone().remove(self._remove_query)

        return selection

    def _initialize_custom_pseudo_keys(self):
        """Split the pseudo key off ``self._query`` and apply its settings."""
        selector, raw_key = pseudo.get_key_from_query(self._query)
        self._query = selector

        self._process_pseudo_key(self._process_pseudo_key_extension(raw_key))

    def _process_pseudo_key_extension(self, pseudo_key: str) -> str:
        """Apply the extension carried by ``pseudo_key`` and return the bare key.

        Raises:
            ValueError: If the extension is not ``iter``, ``items`` or ``all``.
        """
        bare_key, extension = pseudo.get_extension_value(
            pseudo_key=pseudo_key,
            pseudo_keys_with_separator_value=["attr", "has_class"],
        )

        if not extension:
            return bare_key

        # extension -> (instance attribute, value assigned to it)
        setting = {
            "iter": ("_iter", True),
            "items": ("_items", True),
            "all": ("_first", False),
        }.get(extension)

        if setting is None:
            raise ValueError(
                "Pseudo key extension {} is not supported. Currently supported "
                "are -items, -iter and -all".format(extension)
            )

        attribute, value = setting
        setattr(self, attribute, value)

        return bare_key

    def _process_pseudo_key(self, pseudo_key: str) -> None:
        """Translate ``pseudo_key`` into the matching extraction setting.

        Raises:
            ValueError: If ``pseudo_key`` is not a supported pseudo key.
        """
        # Keys carrying a value are checked first, as prefix matches.
        if pseudo_key.startswith("attr"):
            self._attr = pseudo.get_value(pseudo_key)
            return

        if pseudo_key.startswith("has_class"):
            self._has_class = pseudo.get_value(pseudo_key)
            return

        if pseudo_key in _attr_shortcut_mappings:
            self._attr = _attr_shortcut_mappings[pseudo_key]
            return

        # Remaining keys simply switch one boolean flag on.
        flag_attribute = {
            "text": "_text",
            "ntext": "_ntext",
            "html": "_html",
            "ohtml": "_outer_html",
            "items": "_items",
            "iter": "_iter",
        }.get(pseudo_key)

        if flag_attribute is None:
            raise ValueError(
                "Pseudo key '{}' is not supported. Currently supported are: text,"
                "ntext,html,ohtml,items,iter,has_class(<value>),attr(<value>),"
                "{}".format(pseudo_key, ",".join(_attr_shortcut_mappings))
            )

        setattr(self, flag_attribute, True)
class PyQueryStrictSearch(PyQuerySearch):
    """``PyQuerySearch`` variant with the class-level ``strict`` flag enabled.

    NOTE(review): ``strict`` is not read anywhere in this module; it is
    presumably consumed by the ``QuerySearch`` base class -- confirm there
    what strict mode changes.
    """

    strict = True