batch_analyzer_engine.py
import logging
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union

from presidio_analyzer import AnalyzerEngine, DictAnalyzerResult, RecognizerResult
from presidio_analyzer.nlp_engine import NlpArtifacts

logger = logging.getLogger("presidio-analyzer")


class BatchAnalyzerEngine:
    """
    Batch analysis of documents (tables, lists, dicts).

    Wrapper class to run the Presidio Analyzer engine on multiple values,
    either lists/iterators of strings, or dictionaries.

    :param analyzer_engine: AnalyzerEngine instance to use
        for handling the values in those collections.
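
    Example (illustrative; assumes an English NLP model is available for the
    default `AnalyzerEngine`):

    >>> from presidio_analyzer import BatchAnalyzerEngine
    >>> batch_analyzer = BatchAnalyzerEngine()
    >>> results = batch_analyzer.analyze_iterator(
    ...     ["My name is John Smith"], language="en"
    ... )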
"""

    def __init__(self, analyzer_engine: Optional[AnalyzerEngine] = None):
        # Fall back to a default AnalyzerEngine when none is provided
        self.analyzer_engine = analyzer_engine or AnalyzerEngine()

    def analyze_iterator(
        self,
        texts: Iterable[Union[str, bool, float, int]],
        language: str,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs,
    ) -> List[List[RecognizerResult]]:
"""
Analyze an iterable of strings.
:param texts: An list containing strings to be analyzed.
:param language: Input language
:param batch_size: Batch size to process in a single iteration
:param n_process: Number of processors to use. Defaults to `1`
:param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method.
(default value depends on the nlp engine implementation)
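
        Example (illustrative; assumes the default `AnalyzerEngine` with an
        English NLP model installed):

        >>> batch_analyzer = BatchAnalyzerEngine()
        >>> texts = ["My name is John Smith", "no pii here"]
        >>> per_text_results = batch_analyzer.analyze_iterator(texts, language="en")
        >>> len(per_text_results) == len(texts)
        True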
"""
        # Validate the input types before running the NLP pipeline
        texts = self._validate_types(texts)

        # Process the texts as a batch for improved performance
        nlp_artifacts_batch: Iterator[Tuple[str, NlpArtifacts]] = (
            self.analyzer_engine.nlp_engine.process_batch(
                texts=texts,
                language=language,
                batch_size=batch_size,
                n_process=n_process,
            )
        )

        list_results = []
        for text, nlp_artifacts in nlp_artifacts_batch:
            results = self.analyzer_engine.analyze(
                text=str(text), nlp_artifacts=nlp_artifacts, language=language, **kwargs
            )
            list_results.append(results)

        return list_results

    def analyze_dict(
        self,
        input_dict: Dict[str, Union[Any, Iterable[Any]]],
        language: str,
        keys_to_skip: Optional[List[str]] = None,
        batch_size: int = 1,
        n_process: int = 1,
        **kwargs,
    ) -> Iterator[DictAnalyzerResult]:
"""
Analyze a dictionary of keys (strings) and values/iterable of values.
Non-string values are returned as is.
:param input_dict: The input dictionary for analysis
:param language: Input language
:param keys_to_skip: Keys to ignore during analysis
:param batch_size: Batch size to process in a single iteration
:param n_process: Number of processors to use. Defaults to `1`
:param kwargs: Additional keyword arguments
for the `AnalyzerEngine.analyze` method.
Use this to pass arguments to the analyze method,
such as `ad_hoc_recognizers`, `context`, `return_decision_process`.
See `AnalyzerEngine.analyze` for the full list.
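
        Example (illustrative; the keys and values are arbitrary):

        >>> batch_analyzer = BatchAnalyzerEngine()
        >>> input_dict = {
        ...     "name": "John Smith",
        ...     "phones": ["212-555-5555", "917-555-5555"],
        ...     "id": 42,
        ... }
        >>> results = list(batch_analyzer.analyze_dict(input_dict, language="en"))
        >>> [r.key for r in results]
        ['name', 'phones', 'id']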
"""
        context = kwargs.pop("context", [])

        if not keys_to_skip:
            keys_to_skip = []
        for key, value in input_dict.items():
            if not value or key in keys_to_skip:
                # Skip this key as requested, or because the value is empty
                yield DictAnalyzerResult(key=key, value=value, recognizer_results=[])
                continue

            # Add the key as additional context for the recognizers
            specific_context = context[:]
            specific_context.append(key)

            if type(value) in (str, int, bool, float):
                results: List[RecognizerResult] = self.analyzer_engine.analyze(
                    text=str(value),
                    language=language,
                    context=specific_context,
                    **kwargs,
                )
            elif isinstance(value, dict):
                # Recurse into nested dicts
                new_keys_to_skip = self._get_nested_keys_to_skip(key, keys_to_skip)
                results = self.analyze_dict(
                    input_dict=value,
                    language=language,
                    context=specific_context,
                    keys_to_skip=new_keys_to_skip,
                    batch_size=batch_size,
                    n_process=n_process,
                    **kwargs,
                )
            elif isinstance(value, Iterable):
                # Analyze each element of the iterable as its own text
                results: List[List[RecognizerResult]] = self.analyze_iterator(
                    texts=value,
                    language=language,
                    context=specific_context,
                    n_process=n_process,
                    batch_size=batch_size,
                    **kwargs,
                )
            else:
                raise ValueError(f"type {type(value)} is unsupported.")

            yield DictAnalyzerResult(key=key, value=value, recognizer_results=results)

    @staticmethod
    def _validate_types(value_iterator: Iterable[Any]) -> Iterator[Any]:
        """Lazily yield values, validating that each is a supported primitive."""
        for val in value_iterator:
            if val and type(val) not in (int, float, bool, str):
                err_msg = (
                    "BatchAnalyzerEngine.analyze_iterator only works "
                    "on primitive types (int, float, bool, str). "
                    "Lists of objects are not yet supported."
                )
                logger.error(err_msg)
                raise ValueError(err_msg)
            yield val

    @staticmethod
    def _get_nested_keys_to_skip(key: str, keys_to_skip: List[str]) -> List[str]:
        """Strip the `key.` prefix from dotted keys nested under `key`."""
        prefix = f"{key}."
        new_keys_to_skip = [
            k[len(prefix):] for k in keys_to_skip if k.startswith(prefix)
        ]
        return new_keys_to_skip
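

# A minimal end-to-end sketch (not part of the original module; assumes the
# default AnalyzerEngine and an English NLP model are installed):
if __name__ == "__main__":
    batch_analyzer = BatchAnalyzerEngine()

    # Analyze a flat dictionary; each key is added as context for its value
    sample = {"name": "John Smith", "phones": ["212-555-5555"], "id": 42}
    for dict_result in batch_analyzer.analyze_dict(sample, language="en"):
        print(dict_result.key, dict_result.recognizer_results)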