from functools import cached_property
from typing import Any
from typing import Dict as TypeDict
from typing import List, Optional, Union
from easytxt import sentences
from easydata.parsers.base import Base, BaseData
from easydata.parsers.data import Data
from easydata.parsers.has import Has
from easydata.parsers.price import PriceFloat, PriceText
from easydata.parsers.text import Text
from easydata.queries.base import QuerySearchBase
from easydata.utils import mix
__all__ = (
"Dict",
"TextDict",
"HasDict",
"PriceFloatDict",
"PriceTextDict",
)
[docs]class Dict(BaseData):
def __init__(
self,
query: Optional[QuerySearchBase] = None,
key_parser: Optional[Base] = None,
val_parser: Optional[Base] = None,
key_query: Optional[QuerySearchBase] = None,
val_query: Optional[QuerySearchBase] = None,
dict_val_from_key: bool = False,
ignore_non_values: bool = False,
ignore_non_keys: bool = True,
key_normalize: bool = True,
key_title: bool = False,
key_uppercase: bool = False,
key_lowercase: bool = False,
key_replace_keys: Optional[list] = None,
key_remove_keys: Optional[list] = None,
key_split_text_key: Optional[Union[str, tuple]] = None,
key_split_text_keys: Optional[List[Union[str, tuple]]] = None,
key_take: Optional[int] = None,
key_skip: Optional[int] = None,
key_fix_spaces: bool = True,
key_allow: Optional[Union[str, List[str]]] = None,
key_callow: Optional[Union[str, List[str]]] = None,
key_deny: Optional[Union[str, List[str]]] = None,
key_cdeny: Optional[Union[str, List[str]]] = None,
key_default: Optional[str] = None,
**kwargs,
):
self._key_text_parser_properties = {
"normalize": key_normalize,
"title": key_title,
"uppercase": key_uppercase,
"lowercase": key_lowercase,
"replace_keys": key_replace_keys,
"remove_keys": key_remove_keys,
"split_key": key_split_text_key,
"split_keys": key_split_text_keys,
"take": key_take,
"skip": key_skip,
"fix_spaces": key_fix_spaces,
}
self._key_query = key_query
self._value_query = val_query
self._dict_val_from_key = dict_val_from_key
self._ignore_non_values = ignore_non_values
self._ignore_non_keys = ignore_non_keys
self._key_allow = key_allow
self._key_callow = key_callow
self._key_deny = key_deny
self._key_cdeny = key_cdeny
self._key_default = key_default
self.__key_parser = key_parser
self.__key_parser_obj = None
self.__value_parser = val_parser
self.__value_parser_obj = None
super().__init__(
query=query,
**kwargs,
)
@cached_property
def _key_parser(self):
# Initialize and validate key parser
key_parser_obj = self.__key_parser or self._default_key_parser_obj
mix.validate_parser(key_parser_obj)
key_parser_obj.init_config(self.config)
return key_parser_obj
@cached_property
def _value_parser(self):
# Initialize and validate value parser
value_parser_obj = self.__value_parser or self._default_value_parser_obj
mix.validate_parser(value_parser_obj)
value_parser_obj.init_config(self.config)
return value_parser_obj
@property
def _default_key_parser_obj(self):
return Text(query=self._key_query, **self._key_text_parser_properties)
@property
def _default_value_parser_obj(self):
return Data(self._value_query)
def parse_value(
self,
value: Any,
data: Any,
) -> dict:
if not value:
return {}
values: TypeDict[Optional[str], Any] = {}
if isinstance(value, dict):
for s_key, s_value in value.items():
parsed_key = self._key_parser.parse(data, s_key, True) # type: ignore
parsed_value = self._value_parser.parse( # type: ignore
data, s_key if self._dict_val_from_key else s_value, True
)
values[parsed_key] = parsed_value
else:
for s_value in value:
parsed_key = self._key_parser.parse(data, s_value, True) # type: ignore
parsed_value = self._value_parser.parse( # type: ignore
data, s_value, True
)
values[parsed_key] = parsed_value
if self._ignore_non_values:
values = {k: v for k, v in values.items() if v is not None}
values = self._filter_dict_items(values)
return values
def _filter_dict_items(self, values: dict) -> dict:
if self._ignore_non_keys:
values = {k: v for k, v in values.items() if k is not None}
allow_keys = self._key_callow or self._key_allow
if allow_keys and values:
item_keys = sentences.allow_contains(
sentences=list(values.keys()),
keys=allow_keys,
case_sensitive=bool(self._key_callow),
)
values = {k: v for k, v in values.items() if k in item_keys}
deny_keys = self._key_cdeny or self._key_deny
if deny_keys and values:
item_keys = sentences.deny_contains(
sentences=list(values.keys()),
keys=deny_keys,
case_sensitive=bool(self._key_cdeny),
)
values = {k: v for k, v in values.items() if k in item_keys}
if self._key_default and values:
values = {k or self._key_default: v for k, v in values.items()}
return values
[docs]class TextDict(Dict):
def __init__(
self,
*args,
val_normalize: bool = True,
val_title: bool = False,
val_uppercase: bool = False,
val_lowercase: bool = False,
val_replace_keys: Optional[list] = None,
val_remove_keys: Optional[list] = None,
val_split_text_key: Optional[Union[str, tuple]] = None,
val_split_text_keys: Optional[List[Union[str, tuple]]] = None,
val_take: Optional[int] = None,
val_skip: Optional[int] = None,
val_fix_spaces: bool = True,
val_allow: Optional[Union[str, List[str]]] = None,
val_callow: Optional[Union[str, List[str]]] = None,
val_deny: Optional[Union[str, List[str]]] = None,
val_cdeny: Optional[Union[str, List[str]]] = None,
**kwargs,
):
self._val_allow = val_allow
self._val_callow = val_callow
self._val_deny = val_deny
self._val_cdeny = val_cdeny
self._value_text_parser_properties = {
"normalize": val_normalize,
"title": val_title,
"uppercase": val_uppercase,
"lowercase": val_lowercase,
"replace_keys": val_replace_keys,
"remove_keys": val_remove_keys,
"split_key": val_split_text_key,
"split_keys": val_split_text_keys,
"take": val_take,
"skip": val_skip,
"fix_spaces": val_fix_spaces,
}
super().__init__(
*args,
**kwargs,
)
@property
def _default_value_parser_obj(self):
return Text(
query=self._value_query,
**self._value_text_parser_properties,
)
def _filter_dict_items(self, values: dict) -> dict:
values = super()._filter_dict_items(values)
allow_values = self._val_callow or self._val_allow
if allow_values and values:
item_values = sentences.allow_contains(
sentences=list(values.values()),
keys=allow_values,
case_sensitive=bool(self._val_callow),
)
values = {k: v for k, v in values.items() if v in item_values}
deny_values = self._val_cdeny or self._val_deny
if deny_values and values:
item_values = sentences.deny_contains(
sentences=list(values.values()),
keys=deny_values,
case_sensitive=bool(self._val_cdeny),
)
values = {k: v for k, v in values.items() if v in item_values}
return values
[docs]class HasDict(TextDict):
def __init__(
self,
*args,
val_contains: Optional[Union[list, str]] = None,
val_ccontains: Optional[Union[list, str]] = None,
val_contains_query: Optional[QuerySearchBase] = None,
val_empty_as_false: bool = True,
**kwargs,
):
self._value_bool_parser_properties = {
"contains": val_contains,
"ccontains": val_ccontains,
"contains_query": val_contains_query,
"empty_as_false": val_empty_as_false,
}
super().__init__(
*args,
**kwargs,
)
@property
def _default_value_parser_obj(self):
return Has(
query=self._value_query,
**self._value_bool_parser_properties,
**self._value_text_parser_properties,
)
[docs]class PriceFloatDict(TextDict):
def __init__(
self,
*args,
val_decimals: Optional[int] = None,
val_min_value: Optional[Union[float, int]] = None,
val_max_value: Optional[Union[float, int]] = None,
**kwargs,
):
self._value_price_parser_properties = {
"decimals": val_decimals,
"min_value": val_min_value,
"max_value": val_max_value,
}
super().__init__(
*args,
**kwargs,
)
@property
def _default_value_parser_obj(self):
return PriceFloat(
query=self._value_query,
**self._value_price_parser_properties,
**self._value_text_parser_properties,
)
[docs]class PriceTextDict(PriceFloatDict):
@property
def _default_value_parser_obj(self):
return PriceText(
query=self._value_query,
**self._value_price_parser_properties,
**self._value_text_parser_properties,
)