Source code for easydata.queries.key

import json
from typing import Any, List, Optional, Tuple

import yaml

from easydata.data import DataBag
from easydata.queries.base import QuerySearch
from easydata.utils import pseudo

__all__ = (
    "KeySearch",
    "KeyStrictSearch",
    "NKeySearch",
    "NKeyStrictSearch",
)


[docs]class KeySearch(QuerySearch): def __init__( self, query: str, **kwargs, ): super().__init__(query, **kwargs) self._keys: bool = False self._values: bool = False self._json: bool = False self._yaml: bool = False self._str: bool = False self._dict_key_value: Optional[Tuple[str, str]] = None if self._query and "::" in self._query: self._initialize_custom_pseudo_keys() def parse( self, data: Any, query: Optional[str], ): if query: data = data.get(query) return self._process_data_key_values(data) def process_data( self, data: Any, source: Optional[str] = None, ) -> Any: if isinstance(data, DataBag): if isinstance(data[source], (dict, list)): return data[source] # If data is json, then it will be attempted to be converted to dict or # list. Conversion will be stored in a DataBag object under different # source name, so that we don't load json data each time. data_dict_source = "{}_dict".format(source) if hasattr(data, data_dict_source): return data[data_dict_source] try: data_dict = json.loads(data[source]) data[data_dict_source] = data_dict return data_dict except Exception: raise ValueError( "Provided data from source {} could not be converted to dict or " "list in order to use query!".format(source) ) elif isinstance(data, (dict, list)): return data return json.loads(data) def _process_data_key_values(self, data): if data: if self._dict_key_value: if not isinstance(data, list): raise TypeError( "Pseudo key dict(<key>:<value>) will make list to dict " "conversion if queried data is list type." ) dict_key, dict_value = self._dict_key_value data = {ddata[dict_key]: ddata[dict_value] for ddata in data} if self._values: data = list(data.values()) if self._keys: data = list(data.keys()) if self._json: data = json.dumps(data) if self._yaml: data = yaml.dump(data) if self._str and not isinstance(data, str): data = str(data) return data def _initialize_custom_pseudo_keys(self): self._query, pseudo_key = pseudo.get_key_from_query(self._query) pseudo_key = self._process_pseudo_key_extension(pseudo_key) self._process_pseudo_key(pseudo_key) def _process_pseudo_key_extension(self, pseudo_key: str) -> str: pseudo_key, extension = pseudo.get_extension_value(pseudo_key, ["dict"]) if not extension: return pseudo_key if extension == "json": self._json = True elif extension == "yaml": self._yaml = True elif extension == "str": self._str = True else: raise ValueError( "Pseudo key extension {} is not supported. Currently supported " "is only -json".format(extension) ) return pseudo_key def _process_pseudo_key(self, pseudo_key: str) -> None: if pseudo_key.startswith("dict"): dict_key_value_text = pseudo_key.split("(")[-1].split(")")[0] dict_key_value_split = dict_key_value_text.split(":") self._dict_key_value = dict_key_value_split[0], dict_key_value_split[1] elif pseudo_key == "values": self._values = True elif pseudo_key == "keys": self._keys = True elif pseudo_key == "json": self._json = True elif pseudo_key == "yaml": self._yaml = True elif pseudo_key == "str": self._str = True else: raise ValueError( "Pseudo key '{}' is not supported. Currently supported are: keys," "values, dict(<key>,<value>).".format(pseudo_key) )
class KeyStrictSearch(KeySearch): strict = True class NKeySearch(KeySearch): def parse( self, data: Any, query: Optional[str], ): if query: data = self._multi_query_data( data=data, queries=query.split("."), ) return self._process_data_key_values(data) def _multi_query_data(self, data: Any, queries: List[str]): query = queries.pop(0) data = data.get(query) if not queries or data is None: return data if not isinstance(data, dict): raise TypeError("key can perform nested queries only on dict objects") return self._multi_query_data(data, queries) class NKeyStrictSearch(NKeySearch): strict = True