petrsovadina committed on
Commit
9e03d77
1 Parent(s): 7a4486e

Update presidio_helpers.py

Files changed (1):
  1. presidio_helpers.py +195 -1
presidio_helpers.py CHANGED
@@ -1,3 +1,6 @@
+"""
+Helper methods for the Presidio Streamlit app
+"""
 from typing import List, Optional, Tuple
 import logging
 import streamlit as st
@@ -24,6 +27,7 @@ from presidio_nlp_engine_config import (
 
 logger = logging.getLogger("presidio-streamlit")
 
+
 @st.cache_resource
 def nlp_engine_and_registry(
     model_family: str,
@@ -31,6 +35,17 @@ def nlp_engine_and_registry(
     ta_key: Optional[str] = None,
     ta_endpoint: Optional[str] = None,
 ) -> Tuple[NlpEngine, RecognizerRegistry]:
+    """Create the NLP engine and recognizer registry based on the requested model.
+    :param model_family: Which model package to use for NER.
+    :param model_path: Which model to use for NER, e.g.
+        "StanfordAIMI/stanford-deidentifier-base",
+        "obi/deid_roberta_i2b2",
+        "en_core_web_sm".
+    :param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics").
+    :param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics").
+    """
+
+    # Set up the NLP engine according to the model of choice
     if "spacy" in model_family.lower():
         return create_nlp_engine_with_spacy(model_path)
     elif "transformers" in model_family.lower() or "iiiorg" in model_family.lower():
@@ -38,6 +53,7 @@ def nlp_engine_and_registry(
     else:
         raise ValueError(f"Model family {model_family} not supported")
 
+
 @st.cache_resource
 def analyzer_engine(
     model_family: str,
@@ -45,10 +61,185 @@ def analyzer_engine(
     ta_key: Optional[str] = None,
     ta_endpoint: Optional[str] = None,
 ) -> AnalyzerEngine:
+    """Create the Analyzer Engine instance based on the requested model.
+    :param model_family: Which model package to use for NER.
+    :param model_path: Which model to use for NER, e.g.
+        "StanfordAIMI/stanford-deidentifier-base",
+        "obi/deid_roberta_i2b2",
+        "en_core_web_sm".
+    :param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics").
+    :param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics").
+    """
     nlp_engine, registry = nlp_engine_and_registry(
         model_family, model_path, ta_key, ta_endpoint
     )
     analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
     return analyzer
 
-# ... (rest of the helper functions)
+
+@st.cache_resource
+def anonymizer_engine():
+    """Return the AnonymizerEngine."""
+    return AnonymizerEngine()
+
+
+@st.cache_data
+def get_supported_entities(
+    model_family: str, model_path: str, ta_key: str, ta_endpoint: str
+):
+    """Return the supported entities from the Analyzer Engine."""
+    return analyzer_engine(
+        model_family, model_path, ta_key, ta_endpoint
+    ).get_supported_entities() + ["GENERIC_PII"]
+
+
+@st.cache_data
+def analyze(
+    model_family: str, model_path: str, ta_key: str, ta_endpoint: str, **kwargs
+):
+    """Analyze the input using the Analyzer Engine and the input arguments (kwargs)."""
+    if "entities" not in kwargs or "All" in kwargs["entities"]:
+        kwargs["entities"] = None
+
+    if "deny_list" in kwargs and kwargs["deny_list"] is not None:
+        ad_hoc_recognizer = create_ad_hoc_deny_list_recognizer(kwargs["deny_list"])
+        kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else []
+        del kwargs["deny_list"]
+
+    if "regex_params" in kwargs and len(kwargs["regex_params"]) > 0:
+        ad_hoc_recognizer = create_ad_hoc_regex_recognizer(*kwargs["regex_params"])
+        kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else []
+        del kwargs["regex_params"]
+
+    return analyzer_engine(model_family, model_path, ta_key, ta_endpoint).analyze(
+        **kwargs
+    )
+
+
+def anonymize(
+    text: str,
+    operator: str,
+    analyze_results: List[RecognizerResult],
+    mask_char: Optional[str] = None,
+    number_of_chars: Optional[int] = None,
+    encrypt_key: Optional[str] = None,
+):
+    """Anonymize the identified input using the Presidio Anonymizer.
+
+    :param text: Full text
+    :param operator: Operator name
+    :param analyze_results: List of results from the Presidio Analyzer
+    :param mask_char: Mask character (for the mask operator)
+    :param number_of_chars: Number of characters to mask (for the mask operator)
+    :param encrypt_key: Encryption key (for the encrypt operator)
+    """
+    # Define the operator config
+    if operator == "mask":
+        operator_config = {
+            "type": "mask",
+            "masking_char": mask_char,
+            "chars_to_mask": number_of_chars,
+            "from_end": False,
+        }
+    elif operator == "encrypt":
+        operator_config = {"key": encrypt_key}
+    elif operator == "highlight":
+        operator_config = {"lambda": lambda x: x}
+    else:
+        operator_config = None
+
+    # Swap the operator if needed, as an intermediate step
+    if operator == "highlight":
+        operator = "custom"
+    elif operator == "synthesize":
+        operator = "replace"
+
+    res = anonymizer_engine().anonymize(
+        text,
+        analyze_results,
+        operators={"DEFAULT": OperatorConfig(operator, operator_config)},
+    )
+    return res
+
+
+def annotate(text: str, analyze_results: List[RecognizerResult]):
+    """Highlight the identified PII entities in the original text.
+
+    :param text: Full text
+    :param analyze_results: List of results from the Presidio Analyzer
+    """
+    tokens = []
+
+    # Use the anonymizer to resolve overlaps
+    results = anonymize(
+        text=text,
+        operator="highlight",
+        analyze_results=analyze_results,
+    )
+
+    # Sort by start index
+    results = sorted(results.items, key=lambda x: x.start)
+    for i, res in enumerate(results):
+        # Add the text before the first entity
+        if i == 0:
+            tokens.append(text[: res.start])
+
+        # Append the entity text and entity type
+        tokens.append((text[res.start : res.end], res.entity_type))
+
+        # If we're not at the last result, add the text up to the next entity
+        if i != len(results) - 1:
+            tokens.append(text[res.end : results[i + 1].start])
+        # Otherwise, add all the remaining text
+        else:
+            tokens.append(text[res.end :])
+    return tokens
+
+
+def create_fake_data(
+    text: str,
+    analyze_results: List[RecognizerResult],
+    openai_params: OpenAIParams,
+):
+    """Create a synthetic version of the text using the OpenAI API."""
+    if not openai_params.openai_key:
+        return "Please provide your OpenAI key"
+    results = anonymize(text=text, operator="replace", analyze_results=analyze_results)
+    prompt = create_prompt(results.text)
+    logger.debug(f"Prompt: {prompt}")
+    fake = call_completion_model(prompt=prompt, openai_params=openai_params)
+    return fake
+
+
+@st.cache_data
+def call_openai_api(
+    prompt: str, openai_model_name: str, openai_deployment_name: Optional[str] = None
+) -> str:
+    fake_data = call_completion_model(
+        prompt, model=openai_model_name, deployment_id=openai_deployment_name
+    )
+    return fake_data
+
+
+def create_ad_hoc_deny_list_recognizer(
+    deny_list: Optional[List[str]] = None,
+) -> Optional[PatternRecognizer]:
+    if not deny_list:
+        return None
+
+    deny_list_recognizer = PatternRecognizer(
+        supported_entity="GENERIC_PII", deny_list=deny_list
+    )
+    return deny_list_recognizer
+
+
+def create_ad_hoc_regex_recognizer(
+    regex: str, entity_type: str, score: float, context: Optional[List[str]] = None
+) -> Optional[PatternRecognizer]:
+    if not regex:
+        return None
+    pattern = Pattern(name="Regex pattern", regex=regex, score=score)
+    regex_recognizer = PatternRecognizer(
+        supported_entity=entity_type, patterns=[pattern], context=context
+    )
+    return regex_recognizer
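
For orientation, a minimal sketch of how the helpers added in this commit might be wired together in a Streamlit page. The model family/path values, the sample text, and the annotated_text display component (assumed here from the st-annotated-text package) are illustrative assumptions, not part of this commit:

# Illustrative sketch only: composes the cached helpers end to end.
import streamlit as st
from annotated_text import annotated_text  # assumption: st-annotated-text package

from presidio_helpers import analyze, anonymize, annotate

text = st.text_area("Input", value="John Smith's phone number is 212-555-5555")

# analyze() forwards extra kwargs (text, language, ...) to AnalyzerEngine.analyze;
# the underlying NLP engine is built once per configuration.
results = analyze(
    model_family="spaCy",          # assumption: any value containing "spacy"
    model_path="en_core_web_sm",   # assumption: a small English spaCy model
    ta_key=None,
    ta_endpoint=None,
    text=text,
    language="en",
)

# Replace each finding with its entity type, e.g. <PHONE_NUMBER>
anonymized = anonymize(text=text, operator="replace", analyze_results=results)
st.text(anonymized.text)

# Render the original text with the detected entities highlighted
annotated_text(*annotate(text=text, analyze_results=results))

Note the caching split: nlp_engine_and_registry and analyzer_engine use @st.cache_resource, so the model loads once per configuration and is reused across Streamlit reruns, while @st.cache_data on analyze and get_supported_entities memoizes the results themselves.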