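# Agentic Compliance MCP Server
#
# Gradio app (also exposed as an MCP server) that extracts text from uploaded
# documents, detects PII with Microsoft Presidio plus a few regex fallbacks,
# scores the findings against a selected compliance regime (HIPAA, GDPR, CCPA),
# and produces redacted text, PDF, and image outputs.
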
import gradio as gr
import pdfplumber
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_image_redactor import ImageRedactorEngine
import re
from docx import Document
from PIL import Image
import pytesseract
import fitz  # pymupdf
import io

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
image_redactor = ImageRedactorEngine()

COMPLIANCE_ENTITIES = {
    "HIPAA": ["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER", "SSN"],
    "GDPR": ["PERSON", "EMAIL_ADDRESS", "LOCATION"],
    "CCPA": ["PERSON", "EMAIL_ADDRESS", "IP_ADDRESS", "SSN", "CREDIT_CARD"]
}
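# Note: "SSN" and "MEDICAL_RECORD_NUMBER" are not built-in Presidio entity types
# (Presidio's built-in SSN entity is "US_SSN"); SSNs are picked up by the regex
# helpers below. To have Presidio itself flag medical record numbers, one option
# is to register a custom PatternRecognizer (sketch only; the regex below is a
# placeholder assumption, not a real MRN format):
#   from presidio_analyzer import Pattern, PatternRecognizer
#   mrn = PatternRecognizer(supported_entity="MEDICAL_RECORD_NUMBER",
#                           patterns=[Pattern("mrn", r"\bMRN[-:\s]*\d{6,10}\b", 0.5)])
#   analyzer.registry.add_recognizer(mrn)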

SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".txt", ".png", ".jpg", ".jpeg"]

def extract_text(doc):
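    """Extract plain text from a PDF, DOCX, TXT, or image upload (via OCR); returns an 'ERROR: ...' string on failure."""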
    if not hasattr(doc, "name"):
        return "ERROR: No file uploaded."
    try:
        fname = doc.name.lower()
        if fname.endswith(".pdf"):
            with pdfplumber.open(doc.name) as pdf:
                pages = [page.extract_text() or "" for page in pdf.pages]
            text = "\n".join(pages)
        elif fname.endswith(".docx"):
            document = Document(doc.name)
            text = "\n".join([p.text for p in document.paragraphs])
        elif fname.endswith(".txt"):
            with open(doc.name, "r", encoding="utf-8") as f:
                text = f.read()
        elif fname.endswith((".png", ".jpg", ".jpeg")):
            img = Image.open(doc.name)
            text = pytesseract.image_to_string(img)
        else:
            return "ERROR: Unsupported file type."
        if not text.strip():
            return "ERROR: Document contains no extractable text."
        return text
    except Exception as e:
        return f"ERROR: {e}"

def detect_pii(text):
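    """Run Presidio over the text and supplement it with regex-based SSN and IP detection.

    Returns (findings, presidio_results); each finding is a dict with entity,
    score, start, end, and the matched text.
    """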
    try:
        entities = [
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER",
            "SSN", "CREDIT_CARD", "LOCATION", "IP_ADDRESS"
        ]
        presidio_results = analyzer.analyze(text=text, entities=entities, language="en")
        findings = [
            {
                "entity": r.entity_type,
                "score": r.score,
                "start": r.start,
                "end": r.end,
                "text": text[r.start:r.end].strip()
            }
            for r in presidio_results
        ]
        findings += find_ssns(text)
        findings += find_ip_addresses(text)
        return findings, presidio_results
    except Exception as e:
        return [{"entity": "ERROR", "text": str(e)}], []

def find_ip_addresses(text):
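    """Regex fallback for IPv4 addresses (does not validate that octets are in the 0-255 range)."""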
    pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
    return [
        {
            "entity": "IP_ADDRESS",
            "score": 1.0,
            "start": m.start(),
            "end": m.end(),
            "text": m.group()
        }
        for m in re.finditer(pattern, text)
    ]

def find_ssns(text):
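    """Regex fallback for SSNs: labelled values ("SSN: ..." / "Social Security Number: ...") and bare NNN-NN-NNNN patterns."""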
    pattern = r'(?i)(ssn|social security number)[\s:]*([0-9]{3}-[0-9]{2}-[0-9]{4})'
    findings = []
    for m in re.finditer(pattern, text):
        findings.append({
            "entity": "SSN",
            "score": 1.0,
            "start": m.start(2),
            "end": m.end(2),
            "text": m.group(2)
        })
    for m in re.finditer(r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b', text):
        findings.append({
            "entity": "SSN",
            "score": 0.95,
            "start": m.start(),
            "end": m.end(),
            "text": m.group()
        })
    return findings

def clean_person_entities(findings):
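    """Trim PERSON matches to their first two words and drop obvious false positives ('date', 'department')."""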
    cleaned = []
    for f in findings:
        if f["entity"] == "PERSON":
            name = " ".join(f["text"].split()[:2])
            if name.lower() not in ["date", "department"]:
                f = f.copy()
                f["text"] = name
                cleaned.append(f)
        else:
            cleaned.append(f)
    return cleaned

def dedupe_findings(findings):
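    """Drop duplicate findings that share the same entity type, text, and character span."""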
    seen = set()
    deduped = []
    for f in findings:
        key = (f["entity"], f["text"], f["start"], f["end"])
        if key not in seen:
            seen.add(key)
            deduped.append(f)
    return deduped

def risk_score(findings):
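    """Weighted count of findings; higher-sensitivity entities (SSN, credit card) contribute more to the score."""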
    weights = {
        "PERSON": 1, "EMAIL_ADDRESS": 2, "CREDIT_CARD": 4, "SSN": 5,
        "IP_ADDRESS": 2, "PHONE_NUMBER": 2, "MEDICAL_RECORD_NUMBER": 3
    }
    return sum(weights.get(f["entity"], 1) for f in findings)

def suggest_fixes(findings):
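    """Map each detected entity type to a remediation suggestion (deduplicated across findings)."""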
    fixes = []
    for f in findings:
        ent = f["entity"]
        if ent == "PERSON":
            fixes.append("Remove or mask full names.")
        if ent == "EMAIL_ADDRESS":
            fixes.append("Anonymize email addresses.")
        if ent == "CREDIT_CARD":
            fixes.append("Remove or mask credit card numbers.")
        if ent == "SSN":
            fixes.append("Remove or mask social security numbers.")
        if ent == "PHONE_NUMBER":
            fixes.append("Mask phone numbers.")
        if ent == "LOCATION":
            fixes.append("Remove or generalize location data.")
        if ent == "IP_ADDRESS":
            fixes.append("Remove or anonymize IP addresses.")
        if ent == "MEDICAL_RECORD_NUMBER":
            fixes.append("Anonymize medical record numbers.")
    # Sort so the recommendation order is stable across runs.
    return sorted(set(fixes))

def summarize_narrative(findings, regime):
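    """Markdown summary of how many instances of each entity type were found under the given regime."""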
    if not findings:
        return "No sensitive or regulated information was found in this document."
    entity_types = [f["entity"] for f in findings]
    summary_lines = [f"Under **{regime}**, the document contains:"]
    for entity in sorted(set(entity_types)):
        count = entity_types.count(entity)
        summary_lines.append(f"- **{entity.replace('_', ' ').title()}**: {count} instance(s)")
    summary_lines.append("These must be anonymized or removed to ensure compliance.")
    return "\n".join(summary_lines)

def score_legend():
    return (
        "**Risk Score Legend:**\n"
        "- 0–3: Low risk (little or no PII detected)\n"
        "- 4–7: Moderate risk (some PII detected, take caution)\n"
        "- 8+: High risk (multiple/high-value PII found—document needs urgent attention)\n"
        "\n"
        "Score is calculated based on entity sensitivity. For example, SSN and credit cards are higher risk than names."
    )

def redact_text(text, all_findings):
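    """Replace every detected PII span (of 3+ characters) in the text with [REDACTED]."""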
    spans = sorted((f["start"], f["end"]) for f in all_findings
                   if f.get("text") and len(f["text"]) >= 3)
    # Merge overlapping spans so one replacement cannot corrupt another's offsets.
    merged = []
    for start, end in spans:
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    redacted_text = text
    # Replace from the end of the text so earlier offsets stay valid.
    for start, end in reversed(merged):
        redacted_text = redacted_text[:start] + "[REDACTED]" + redacted_text[end:]
    return redacted_text

def save_redacted_file(redacted_text):
    path = "/tmp/redacted_output.txt"
    with open(path, "w", encoding="utf-8") as f:
        f.write(redacted_text)
    return path

def redact_image_with_presidio(image_path):
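    """Mask PII regions in the image with Presidio's ImageRedactorEngine and return the redacted PNG path."""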
    img = Image.open(image_path)
    redacted_img = image_redactor.redact(img)
    out_path = "/tmp/redacted_image.png"
    redacted_img.save(out_path)
    return out_path

def redact_pdf_with_presidio(pdf_path):
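    """Rasterize each PDF page, redact it as an image, and rebuild an image-only PDF from the redacted pages."""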
    doc = fitz.open(pdf_path)
    output_pdf = fitz.open()
    for page in doc:
        pix = page.get_pixmap()
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        redacted_img = image_redactor.redact(img)
        img_byte_arr = io.BytesIO()
        redacted_img.save(img_byte_arr, format='PNG')
        img_byte_arr.seek(0)
        rect = fitz.Rect(0, 0, pix.width, pix.height)
        out_page = output_pdf.new_page(width=pix.width, height=pix.height)
        out_page.insert_image(rect, stream=img_byte_arr.getvalue())
    out_path = "/tmp/redacted_output.pdf"
    output_pdf.save(out_path)
    output_pdf.close()
    doc.close()
    return out_path

def executive_summary_template(findings, score, regime):
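    """One-paragraph executive summary: regime, risk score and level, entity counts, and a recommendation."""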
    if not findings:
        return (
            f"No sensitive information detected under {regime}. Document is considered low risk."
        )
    risk_level = (
        "High Risk" if score >= 8 else
        "Moderate Risk" if score >= 4 else "Low Risk"
    )
    entity_counts = {}
    for f in findings:
        entity_counts[f["entity"]] = entity_counts.get(f["entity"], 0) + 1

    summary_lines = [
        f"This document falls under {regime} with a risk score of {score} ({risk_level})."
    ]
    if entity_counts:
        summary_lines.append(
            "Sensitive information detected: " +
            ", ".join([f"{k} ({v})" for k, v in entity_counts.items()]) + "."
        )
    summary_lines.append(
        "Recommendation: Anonymize or redact all sensitive entities to ensure compliance."
    )
    return " ".join(summary_lines)

def agentic_compliance(doc, regime):
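    """End-to-end pipeline: extract text, detect PII, filter findings by regime,
    score the risk, build the markdown report, and produce redacted outputs.

    Returns (report_markdown, redacted_text_path, redacted_pdf_or_image_path, image_preview).
    Note that text redaction covers all detected PII, not only the regime-relevant entities.
    """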
    text = extract_text(doc)
    if text.startswith("ERROR"):
        return text, None, None, None
    findings, presidio_results = detect_pii(text)
    # Surface analyzer failures instead of letting them crash the downstream steps.
    if findings and findings[0]["entity"] == "ERROR":
        return f"ERROR during PII analysis: {findings[0]['text']}", None, None, None
    findings = clean_person_entities(findings)
    findings = dedupe_findings(findings)

    entities_needed = COMPLIANCE_ENTITIES.get(regime, [])
    relevant = [f for f in findings if f["entity"] in entities_needed]
    score = risk_score(relevant)
    fixes = suggest_fixes(relevant)
    summary = summarize_narrative(relevant, regime)
    exec_summary = executive_summary_template(relevant, score, regime)

    findings_md = "\n".join([
        f"- **{f['entity']}** (`{f['text']}`), score: {f.get('score', 0):.2f}"
        for f in relevant
    ]) if relevant else "No relevant PII found for this regime."

    fixes_md = "\n".join([f"- {fix}" for fix in fixes]) if fixes else "No action needed."
    legend_md = score_legend()

    redacted = redact_text(text, findings)
    redacted_path = save_redacted_file(redacted)

    redacted_file_path = None
    redacted_image = None
    if hasattr(doc, "name"):
        fname = doc.name.lower()
        if fname.endswith((".png", ".jpg", ".jpeg")):
            redacted_file_path = redact_image_with_presidio(doc.name)
            redacted_image = redacted_file_path
        elif fname.endswith(".pdf"):
            redacted_file_path = redact_pdf_with_presidio(doc.name)
            redacted_image = None

    md = f"""### Compliance Regime: **{regime}**

**Executive Summary:**  
{exec_summary}

**Findings:**  
{findings_md}

**Risk Score:** {score}

**Actionable Recommendations:**  
{fixes_md}

**Summary:**  
{summary}

---
{legend_md}
"""
    return md.strip(), redacted_path, redacted_file_path, redacted_image

# ---- Gradio App UI ----

with gr.Blocks(title="Agentic Compliance MCP Server") as demo:
    gr.Markdown("# Agentic Compliance MCP\nUpload a document to check it for PII, then select a compliance regime.")
    with gr.Tab("Compliance Agent"):
        doc = gr.File(label="Upload Document", file_types=SUPPORTED_FILE_TYPES)
        regime = gr.Dropdown(choices=list(COMPLIANCE_ENTITIES.keys()), label="Compliance Regime")
        out = gr.Markdown(label="Compliance Output")
        redacted_out = gr.File(label="Download Redacted Text")
        file_redacted_out = gr.File(label="Download Redacted PDF/Image")
        redacted_img = gr.Image(label="Redacted Image Preview")

        gr.Button("Run Compliance Agent").click(
            agentic_compliance,
            inputs=[doc, regime],
            outputs=[out, redacted_out, file_redacted_out, redacted_img]
        )

demo.launch(mcp_server=True)  # exposes the app's functions as MCP tools (requires the gradio[mcp] extra)