import json
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Any, Dict, Iterator, List, Optional
import xmltodict
import yaml
from easytxt.text import replace_chars_by_keys
from pyquery import PyQuery
from easydata.data import DataBag
from easydata.parsers.base import BaseData
from easydata.parsers.data import Data
from easydata.processors.base import BaseProcessor
from easydata.queries.base import QuerySearchBase
from easydata.queries.re import ReSearch
from easydata.typing import QueryDataParser
from easydata.utils import parse
# Explicit public API of this module, consumed by ``from ... import *``
# and by documentation tooling. Order is preserved as-is.
__all__ = (
    "DataProcessor",
    "DataBaseProcessor",
    "DataToPqProcessor",
    "DataJsonToDictProcessor",
    "DataJsonFromQueryToDictProcessor",
    "DataFromIterQueryProcessor",
    "DataYamlToDictProcessor",
    "DataXmlToDictProcessor",
    "DataTextFromReProcessor",
    "DataJsonFromReToDictProcessor",
    "DataFromQueryProcessor",
    "DataVariantsProcessor",
)
class DataBaseProcessor(BaseProcessor, ABC):
    """Abstract base for processors that transform one entry of a ``DataBag``.

    Subclasses implement :meth:`process_data`. :meth:`parse` reads the value
    stored under ``source``, optionally pre-processes it, transforms it, and
    stores the result under ``new_source``. When ``new_source`` is not given,
    the result overwrites ``source`` and the untouched value is preserved
    under ``"<source>_raw"``.

    :param source: key in the ``DataBag`` to read from (default ``"main"``).
    :param new_source: key to store the transformed value under; ``None``
        means overwrite ``source`` (raw value kept in ``"<source>_raw"``).
    :param process_source_data: optional callable applied to the raw source
        value before :meth:`process_data`.
    :param debug: print the transformed data.
    :param debug_raw: print the raw source data.
    """

    # When True, process_data is expected to return an iterable and each
    # item yields its own DataBag copy.
    _multi: bool = False

    def __init__(
        self,
        source: str = "main",
        new_source: Optional[str] = None,
        process_source_data=None,
        debug: bool = False,
        debug_raw: bool = False,
    ):
        self._source = source
        self._new_source = new_source
        self._process_source_data = process_source_data
        self._debug = debug
        self._debug_raw = debug_raw

    def parse(self, data: DataBag) -> Iterator[DataBag]:
        """Yield one (or many, when ``_multi`` is set) transformed bags."""
        source_data = data[self._source]

        if self._debug_raw:
            print(source_data)

        if self._process_source_data:
            source_data = self._process_source_data(source_data)

        transformed_data = self.process_data(source_data)

        if self._debug:
            print(transformed_data)

        if self._multi:
            # Each produced item gets its own copy so results don't share
            # mutable state.
            for iter_transformed_data in transformed_data:
                data_copy = data.copy()
                yield self._transformed_data_to_data(iter_transformed_data, data_copy)
        else:
            yield self._transformed_data_to_data(transformed_data, data)

    def parse_data(self, data=None, **kwargs):
        """Build a ``DataBag`` from ``data``/``kwargs`` and parse it.

        Fix: test ``data is not None`` rather than truthiness so that
        valid-but-falsy payloads (``""``, ``{}``, ``0``) are still stored
        under ``"main"`` instead of being silently dropped.
        """
        if data is not None:
            kwargs["main"] = data

        data = DataBag(**kwargs)

        return self.parse(data)

    @abstractmethod
    def process_data(self, source_data) -> Any:
        """Transform the raw source value; implemented by subclasses."""
        pass

    def _transformed_data_to_data(self, transformed_data, data):
        # Store the transformed value; when overwriting the source slot,
        # first preserve the original under "<source>_raw".
        new_source = self._new_source

        if new_source is None:
            original_source = "{}_raw".format(self._source)
            data[original_source] = data[self._source]

            new_source = self._source

        data[new_source] = transformed_data

        return data
class DataProcessor(DataBaseProcessor):
    """Pass-through processor: stores the source value unchanged.

    Useful for copying or renaming a ``DataBag`` entry via ``new_source``.
    """

    def process_data(self, source_data) -> Any:
        # Identity transform.
        return source_data
class DataToPqProcessor(DataBaseProcessor):
    """Convert a raw HTML/XML string source into a ``PyQuery`` document."""

    def process_data(self, source_data: str) -> PyQuery:
        # Wrap the markup string into a queryable PyQuery tree.
        pq_data = PyQuery(source_data)

        return pq_data
class DataJsonToDictProcessor(DataBaseProcessor):
    """Deserialize a JSON string source into Python objects."""

    def process_data(self, source_data: str) -> dict:
        parsed_data = json.loads(source_data)

        return parsed_data
class DataYamlToDictProcessor(DataBaseProcessor):
    """Deserialize a YAML string source into Python objects.

    :param safe_load: when True (default) use ``yaml.safe_load``; when False
        use the full loader, which resolves all standard YAML tags and must
        only be used on trusted input.
    """

    def __init__(
        self,
        *args,
        safe_load: bool = True,
        **kwargs,
    ):
        self._safe_load = safe_load

        super().__init__(*args, **kwargs)

    def process_data(self, source_data: str) -> dict:
        if self._safe_load:
            return yaml.safe_load(source_data)

        # SECURITY: the non-safe path can resolve arbitrary YAML tags; only
        # use it on trusted data. PyYAML >= 6 requires an explicit Loader
        # argument -- calling yaml.load(stream) alone raises TypeError.
        return yaml.load(source_data, Loader=yaml.FullLoader)  # type: ignore
class DataXmlToDictProcessor(DataBaseProcessor):
    """Convert an XML string source into a dict via ``xmltodict.parse``.

    Most parameters are forwarded to ``xmltodict.parse`` unchanged.

    :param remove_namespaces: list of namespace URIs to strip; setting this
        forces namespace processing and maps each listed URI to ``None``.
    :param item_depth: forwarded to ``xmltodict.parse``; when ``None``, the
        value is read from config key ``ED_DATA_XML_TO_DICT_ITEM_DEPTH``.
    """

    def __init__(
        self,
        *args,
        process_namespaces: bool = False,
        namespace_separator: str = ":",
        namespaces: Optional[dict] = None,
        remove_namespaces: Optional[List[str]] = None,
        encoding: Optional[str] = None,
        item_depth: Optional[int] = None,
        strip_whitespace: bool = True,
        attr_prefix: str = "@",
        cdata_key: str = "#text",
        force_cdata: bool = False,
        cdata_separator: str = "",
        force_list: Optional[Any] = None,
        **kwargs,
    ):
        if remove_namespaces:
            # Mapping a namespace URI to None makes xmltodict drop it;
            # stripping requires namespace processing to be enabled.
            namespaces = {n: None for n in remove_namespaces}

            self._process_namespaces = True
        else:
            self._process_namespaces = process_namespaces

        self._namespace_separator = namespace_separator
        self._encoding = encoding
        self._strip_whitespace = strip_whitespace
        self._attr_prefix = attr_prefix
        self._cdata_key = cdata_key
        self._force_cdata = force_cdata
        self._cdata_separator = cdata_separator
        self._namespaces = namespaces
        self._force_list = force_list
        self.__item_depth = item_depth

        super().__init__(*args, **kwargs)

    @property
    def _item_depth(self):
        # Fix: use "is not None" so an explicit item_depth=0 is honored
        # instead of silently falling back to the configured default.
        if self.__item_depth is not None:
            return self.__item_depth

        return self.config["ED_DATA_XML_TO_DICT_ITEM_DEPTH"]

    def process_data(self, data: Any) -> Any:
        return xmltodict.parse(
            xml_input=data,
            encoding=self._encoding,
            process_namespaces=self._process_namespaces,
            namespace_separator=self._namespace_separator,
            item_depth=self._item_depth,
            strip_whitespace=self._strip_whitespace,
            attr_prefix=self._attr_prefix,
            cdata_key=self._cdata_key,
            force_cdata=self._force_cdata,
            cdata_separator=self._cdata_separator,
            namespaces=self._namespaces,
            force_list=self._force_list,
        )
class DataFromQueryProcessor(DataBaseProcessor):
    """Extract a value from the source data with a query object.

    :param query: query (or queries) passed to ``parse.query_parser``.
    """

    def __init__(
        self,
        query: QueryDataParser,
        **kwargs,
    ):
        self._query = query

        super().__init__(**kwargs)

    def process_data(self, data: Any) -> Any:
        # Delegate extraction to the shared query-parsing helper.
        return parse.query_parser(self._query, data)
class DataFromIterQueryProcessor(DataFromQueryProcessor):
    """Query-based processor that emits one ``DataBag`` per extracted item."""

    # Each yielded item becomes a separate DataBag copy in the base parse().
    _multi = True

    def process_data(self, data: Any) -> Any:
        # Generator: iterate the query result and emit items one by one.
        for extracted_item in parse.query_parser(self._query, data):
            yield extracted_item
class DataJsonFromQueryToDictProcessor(DataFromQueryProcessor):
    """Query out a JSON string and deserialize it into Python objects."""

    def process_data(self, data: Any) -> Any:
        # First extract the JSON text via the query, then decode it.
        json_text = super().process_data(data)

        return json.loads(json_text)
class DataTextFromReProcessor(DataBaseProcessor):
    """Extract text from the source with a regex query (``ReSearch``).

    :param query: regex pattern (or patterns) passed to ``ReSearch``.
    :param dotall: enable DOTALL-style matching in ``ReSearch``.
    :param ignore_case: enable case-insensitive matching.
    :param bytes_to_string_decode: encoding used by ``ReSearch`` when the
        source is bytes.
    :param replace_keys: replacement rules applied to the result via
        ``replace_chars_by_keys``.
    :param none_if_empty: return ``None`` instead of raising when nothing
        matched.
    :param process_value: callable applied to each matched value.
    :raises ValueError: when nothing matched and ``none_if_empty`` is False.
    """

    def __init__(
        self,
        *args,
        query,
        dotall=True,
        ignore_case=False,
        bytes_to_string_decode: str = "utf-8",
        replace_keys: Optional[list] = None,
        none_if_empty=False,
        process_value=None,
        **kwargs,
    ):
        self._query = query
        self._dotall = dotall
        self._ignore_case = ignore_case
        self._bytes_to_string_decode = bytes_to_string_decode
        self._replace_keys = replace_keys
        self._none_if_empty = none_if_empty
        self._process_value = process_value

        super().__init__(*args, **kwargs)

    def process_data(self, source_data: str) -> Any:
        value = ReSearch(
            query=self._query,
            dotall=self._dotall,
            ignore_case=self._ignore_case,
            bytes_to_string_decode=self._bytes_to_string_decode,
        ).get(source_data)

        # NOTE: falsy matches (e.g. "" or "0") are treated the same as no
        # match; preserved for backward compatibility.
        if not value:
            if self._none_if_empty:
                return None

            # Fix: corrected ungrammatical error message ("a re queries").
            error_msg = 'No matches were found for the re query "{}"'
            raise ValueError(error_msg.format(self._query))

        if self._process_value:
            if isinstance(value, list):
                value = [self._process_value(v) for v in value]
            else:
                value = self._process_value(value)

        if value and self._replace_keys:
            value = replace_chars_by_keys(value, self._replace_keys)

        return value
class DataJsonFromReToDictProcessor(DataTextFromReProcessor):
    """Regex-extract JSON text from the source and deserialize it."""

    def process_data(self, source_data: str) -> Any:
        json_value = super().process_data(source_data)

        # Base class may return None when none_if_empty is set.
        if not json_value:
            return None

        if isinstance(json_value, list):
            return [json.loads(item) for item in json_value]

        return json.loads(json_value)
class DataVariantsProcessor(DataBaseProcessor):
    """Group source data into variants keyed by a parser or query result.

    :param query: optional query; when set it is wrapped into a ``Data``
        parser and takes precedence over ``parser``.
    :param parser: optional parser applied to the source data first.
    :param key_parser: parser producing the group key for each item.
    :param key_query: query producing the group key for each item.
    :raises AttributeError: when both ``key_parser`` and ``key_query`` are set.
    """

    def __init__(
        self,
        query: Optional[QuerySearchBase] = None,
        source: str = "main",
        parser: Optional[BaseData] = None,
        key_parser: Optional[BaseData] = None,
        key_query: Optional[QuerySearchBase] = None,
        **kwargs,
    ):
        if key_parser and key_query:
            error_msg = (
                "key_parser or key_query attributes cannot " "be set at the same time."
            )
            raise AttributeError(error_msg)

        self._query = query
        self._key_parser = key_parser
        self._key_query = key_query
        self.__parser = parser

        super().__init__(
            source=source,
            **kwargs,
        )

    def parse(self, data: DataBag) -> Iterator[DataBag]:
        # Expand the grouped variants produced by process_data into
        # individual DataBag results.
        variants_source = self._new_source or self._source

        for iter_data in super().parse(data):
            yield from parse.variants_data(iter_data, variants_source)

    @cached_property
    def _parser(self) -> Optional[BaseData]:
        # An explicit query takes precedence over a supplied parser.
        if self._query:
            return Data(self._query)

        return self.__parser

    def process_data(self, data: Any) -> Dict[Optional[str], Any]:
        variants_data: Dict[Optional[str], Any] = {}

        parser = self._parser

        if parser:
            data = parser.init_config(self.config).parse(data)  # type: ignore

        for data_index, data_info in enumerate(data):
            # Skip empty/falsy items entirely.
            if not data_info:
                continue

            if self._key_parser:
                variant_group_key = self._key_parser.parse(data_info)
            elif self._key_query:
                variant_group_key = self._key_query.get(data_info)
            else:
                # Without a key parser/query every item is its own group.
                variant_group_key = str(data_index)

            variants_data.setdefault(variant_group_key, []).append(data_info)

        return variants_data