petrsovadina committed on
Commit
10ab3e3
1 Parent(s): 8acb3f5

Update presidio_helpers.py

Browse files
Files changed (1) hide show
  1. presidio_helpers.py +3 -197
presidio_helpers.py CHANGED
@@ -1,6 +1,3 @@
1
- """
2
- Helper methods for the Presidio Streamlit app
3
- """
4
  from typing import List, Optional, Tuple
5
  import logging
6
  import streamlit as st
@@ -22,12 +19,11 @@ from openai_fake_data_generator import (
22
  )
23
  from presidio_nlp_engine_config import (
24
  create_nlp_engine_with_spacy,
25
- create_nlp_engine_with_transformers, # Opraveno, odstraněna přebytečná čárka
26
  )
27
 
28
  logger = logging.getLogger("presidio-streamlit")
29
 
30
-
31
  @st.cache_resource
32
  def nlp_engine_and_registry(
33
  model_family: str,
@@ -35,25 +31,13 @@ def nlp_engine_and_registry(
35
  ta_key: Optional[str] = None,
36
  ta_endpoint: Optional[str] = None,
37
  ) -> Tuple[NlpEngine, RecognizerRegistry]:
38
- """Create the NLP Engine instance based on the requested model.
39
- :param model_family: Which model package to use for NER.
40
- :param model_path: Which model to use for NER. E.g.,
41
- "StanfordAIMI/stanford-deidentifier-base",
42
- "obi/deid_roberta_i2b2",
43
- "en_core_web_lg"
44
- :param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics")
45
- :param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics")
46
- """
47
-
48
- # Set up NLP Engine according to the model of choice
49
  if "spacy" in model_family.lower():
50
  return create_nlp_engine_with_spacy(model_path)
51
- elif "transformers" in model_family.lower():
52
  return create_nlp_engine_with_transformers(model_path)
53
  else:
54
  raise ValueError(f"Model family {model_family} not supported")
55
 
56
-
57
  @st.cache_resource
58
  def analyzer_engine(
59
  model_family: str,
@@ -61,188 +45,10 @@ def analyzer_engine(
61
  ta_key: Optional[str] = None,
62
  ta_endpoint: Optional[str] = None,
63
  ) -> AnalyzerEngine:
64
- """Create the NLP Engine instance based on the requested model.
65
- :param model_family: Which model package to use for NER.
66
- :param model_path: Which model to use for NER:
67
- "StanfordAIMI/stanford-deidentifier-base",
68
- "obi/deid_roberta_i2b2",
69
- "en_core_web_lg"
70
- :param ta_key: Key to the Text Analytics endpoint (only if model_path = "Azure Text Analytics")
71
- :param ta_endpoint: Endpoint of the Text Analytics instance (only if model_path = "Azure Text Analytics")
72
- """
73
  nlp_engine, registry = nlp_engine_and_registry(
74
  model_family, model_path, ta_key, ta_endpoint
75
  )
76
  analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
77
  return analyzer
78
 
79
-
80
- @st.cache_resource
81
- def anonymizer_engine():
82
- """Return AnonymizerEngine."""
83
- return AnonymizerEngine()
84
-
85
-
86
- @st.cache_data
87
- def get_supported_entities(
88
- model_family: str, model_path: str, ta_key: str, ta_endpoint: str
89
- ):
90
- """Return supported entities from the Analyzer Engine."""
91
- return analyzer_engine(
92
- model_family, model_path, ta_key, ta_endpoint
93
- ).get_supported_entities() + ["GENERIC_PII"]
94
-
95
-
96
- @st.cache_data
97
- def analyze(
98
- model_family: str, model_path: str, ta_key: str, ta_endpoint: str, **kwargs
99
- ):
100
- """Analyze input using Analyzer engine and input arguments (kwargs)."""
101
- if "entities" not in kwargs or "All" in kwargs["entities"]:
102
- kwargs["entities"] = None
103
-
104
- if "deny_list" in kwargs and kwargs["deny_list"] is not None:
105
- ad_hoc_recognizer = create_ad_hoc_deny_list_recognizer(kwargs["deny_list"])
106
- kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else []
107
- del kwargs["deny_list"]
108
-
109
- if "regex_params" in kwargs and len(kwargs["regex_params"]) > 0:
110
- ad_hoc_recognizer = create_ad_hoc_regex_recognizer(*kwargs["regex_params"])
111
- kwargs["ad_hoc_recognizers"] = [ad_hoc_recognizer] if ad_hoc_recognizer else []
112
- del kwargs["regex_params"]
113
-
114
- return analyzer_engine(model_family, model_path, ta_key, ta_endpoint).analyze(
115
- **kwargs
116
- )
117
-
118
-
119
- def anonymize(
120
- text: str,
121
- operator: str,
122
- analyze_results: List[RecognizerResult],
123
- mask_char: Optional[str] = None,
124
- number_of_chars: Optional[str] = None,
125
- encrypt_key: Optional[str] = None,
126
- ):
127
- """Anonymize identified input using Presidio Anonymizer.
128
-
129
- :param text: Full text
130
- :param operator: Operator name
131
- :param mask_char: Mask char (for mask operator)
132
- :param number_of_chars: Number of characters to mask (for mask operator)
133
- :param encrypt_key: Encryption key (for encrypt operator)
134
- :param analyze_results: list of results from presidio analyzer engine
135
- """
136
-
137
- if operator == "mask":
138
- operator_config = {
139
- "type": "mask",
140
- "masking_char": mask_char,
141
- "chars_to_mask": number_of_chars,
142
- "from_end": False,
143
- }
144
-
145
- # Define operator config
146
- elif operator == "encrypt":
147
- operator_config = {"key": encrypt_key}
148
- elif operator == "highlight":
149
- operator_config = {"lambda": lambda x: x}
150
- else:
151
- operator_config = None
152
-
153
- # Change operator if needed as intermediate step
154
- if operator == "highlight":
155
- operator = "custom"
156
- elif operator == "synthesize":
157
- operator = "replace"
158
- else:
159
- operator = operator
160
-
161
- res = anonymizer_engine().anonymize(
162
- text,
163
- analyze_results,
164
- operators={"DEFAULT": OperatorConfig(operator, operator_config)},
165
- )
166
- return res
167
-
168
-
169
- def annotate(text: str, analyze_results: List[RecognizerResult]):
170
- """Highlight the identified PII entities on the original text
171
-
172
- :param text: Full text
173
- :param analyze_results: list of results from presidio analyzer engine
174
- """
175
- tokens = []
176
-
177
- # Use the anonymizer to resolve overlaps
178
- results = anonymize(
179
- text=text,
180
- operator="highlight",
181
- analyze_results=analyze_results,
182
- )
183
-
184
- # sort by start index
185
- results = sorted(results.items, key=lambda x: x.start)
186
- for i, res in enumerate(results):
187
- if i == 0:
188
- tokens.append(text[: res.start])
189
-
190
- # append entity text and entity type
191
- tokens.append((text[res.start : res.end], res.entity_type))
192
-
193
- # if another entity coming i.e. we're not at the last results element, add text up to next entity
194
- if i != len(results) - 1:
195
- tokens.append(text[res.end : results[i + 1].start])
196
- # if no more entities coming, add all remaining text
197
- else:
198
- tokens.append(text[res.end :])
199
- return tokens
200
-
201
-
202
- def create_fake_data(
203
- text: str,
204
- analyze_results: List[RecognizerResult],
205
- openai_params: OpenAIParams,
206
- ):
207
- """Creates a synthetic version of the text using OpenAI APIs"""
208
- if not openai_params.openai_key:
209
- return "Please provide your OpenAI key"
210
- results = anonymize(text=text, operator="replace", analyze_results=analyze_results)
211
- prompt = create_prompt(results.text)
212
- print(f"Prompt: {prompt}")
213
- fake = call_completion_model(prompt=prompt, openai_params=openai_params)
214
- return fake
215
-
216
-
217
- @st.cache_data
218
- def call_openai_api(
219
- prompt: str, openai_model_name: str, openai_deployment_name: Optional[str] = None
220
- ) -> str:
221
- fake_data = call_completion_model(
222
- prompt, model=openai_model_name, deployment_id=openai_deployment_name
223
- )
224
- return fake_data
225
-
226
-
227
- def create_ad_hoc_deny_list_recognizer(
228
- deny_list=Optional[List[str]],
229
- ) -> Optional[PatternRecognizer]:
230
- if not deny_list:
231
- return None
232
-
233
- deny_list_recognizer = PatternRecognizer(
234
- supported_entity="GENERIC_PII", deny_list=deny_list
235
- )
236
- return deny_list_recognizer
237
-
238
-
239
- def create_ad_hoc_regex_recognizer(
240
- regex: str, entity_type: str, score: float, context: Optional[List[str]] = None
241
- ) -> Optional[PatternRecognizer]:
242
- if not regex:
243
- return None
244
- pattern = Pattern(name="Regex pattern", regex=regex, score=score)
245
- regex_recognizer = PatternRecognizer(
246
- supported_entity=entity_type, patterns=[pattern], context=context
247
- )
248
- return regex_recognizer
 
 
 
 
1
  from typing import List, Optional, Tuple
2
  import logging
3
  import streamlit as st
 
19
  )
20
  from presidio_nlp_engine_config import (
21
  create_nlp_engine_with_spacy,
22
+ create_nlp_engine_with_transformers,
23
  )
24
 
25
  logger = logging.getLogger("presidio-streamlit")
26
 
 
27
  @st.cache_resource
28
  def nlp_engine_and_registry(
29
  model_family: str,
 
31
  ta_key: Optional[str] = None,
32
  ta_endpoint: Optional[str] = None,
33
  ) -> Tuple[NlpEngine, RecognizerRegistry]:
 
 
 
 
 
 
 
 
 
 
 
34
  if "spacy" in model_family.lower():
35
  return create_nlp_engine_with_spacy(model_path)
36
+ elif "transformers" in model_family.lower() or "iiiorg" in model_family.lower():
37
  return create_nlp_engine_with_transformers(model_path)
38
  else:
39
  raise ValueError(f"Model family {model_family} not supported")
40
 
 
41
  @st.cache_resource
42
  def analyzer_engine(
43
  model_family: str,
 
45
  ta_key: Optional[str] = None,
46
  ta_endpoint: Optional[str] = None,
47
  ) -> AnalyzerEngine:
 
 
 
 
 
 
 
 
 
48
  nlp_engine, registry = nlp_engine_and_registry(
49
  model_family, model_path, ta_key, ta_endpoint
50
  )
51
  analyzer = AnalyzerEngine(nlp_engine=nlp_engine, registry=registry)
52
  return analyzer
53
 
54
+ # ... (rest of the helper functions)