"""Gradio app that scans uploaded documents for PII and produces redacted copies.

The app extracts text from PDF, DOCX, TXT and image uploads, detects PII with
Presidio plus two local regex detectors, scores the findings against a chosen
compliance regime, and writes redacted text/PDF/image outputs to temp files.
"""

import io
import re
import tempfile
from collections import Counter

import fitz  # pymupdf
import gradio as gr
import numpy as np
import pdfplumber
import pytesseract
from docx import Document
from PIL import Image
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_image_redactor import ImageRedactorEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
image_redactor = ImageRedactorEngine()

# Entity types that count as "relevant" for each supported compliance regime.
COMPLIANCE_ENTITIES = {
    "HIPAA": ["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER", "SSN"],
    "GDPR": ["PERSON", "EMAIL_ADDRESS", "LOCATION"],
    "CCPA": ["PERSON", "EMAIL_ADDRESS", "IP_ADDRESS", "SSN", "CREDIT_CARD"],
}

SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".txt", ".png", ".jpg", ".jpeg"]

_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")

# Entity types requested from the Presidio analyzer.
_PRESIDIO_ENTITIES = [
    "PERSON",
    "EMAIL_ADDRESS",
    "PHONE_NUMBER",
    "MEDICAL_RECORD_NUMBER",
    "SSN",
    "CREDIT_CARD",
    "LOCATION",
    "IP_ADDRESS",
]

# Compiled once at import time; used by the local regex detectors below.
_IP_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
_LABELED_SSN_PATTERN = re.compile(
    r'(?i)(ssn|social security number)[\s:]*([0-9]{3}-[0-9]{2}-[0-9]{4})'
)
_BARE_SSN_PATTERN = re.compile(r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b')

# Recommendation text for each entity type, used by suggest_fixes().
_FIX_MESSAGES = {
    "PERSON": "Remove or mask full names.",
    "EMAIL_ADDRESS": "Anonymize email addresses.",
    "CREDIT_CARD": "Remove or mask credit card numbers.",
    "SSN": "Remove or mask social security numbers.",
    "PHONE_NUMBER": "Mask phone numbers.",
    "LOCATION": "Remove or generalize location data.",
    "IP_ADDRESS": "Remove or anonymize IP addresses.",
    "MEDICAL_RECORD_NUMBER": "Anonymize medical record numbers.",
}


def _doc_path(doc):
    """Return the filesystem path of an uploaded document, or None.

    Accepts either an object exposing a ``name`` attribute (the shape the
    original code required) or a plain path string.
    """
    path = getattr(doc, "name", None)
    if path is None and isinstance(doc, str):
        path = doc
    return path or None


def _new_temp_path(prefix, suffix):
    """Create an empty, uniquely named temp file and return its path.

    A unique name per call keeps simultaneous requests from overwriting each
    other's output, which a fixed path would allow.
    """
    with tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, delete=False) as handle:
        return handle.name


def _read_document_text(path):
    """Extract the text of the file at ``path``.

    Raises:
        ValueError: if the extension is unsupported or no text was extracted.
        Exception: whatever the underlying reader raises (I/O, parsing, OCR).
    """
    lowered = path.lower()
    if lowered.endswith(".pdf"):
        with pdfplumber.open(path) as pdf:
            # extract_text() may return None for a page; treat that as empty.
            text = "\n".join(page.extract_text() or "" for page in pdf.pages)
    elif lowered.endswith(".docx"):
        text = "\n".join(p.text for p in Document(path).paragraphs)
    elif lowered.endswith(".txt"):
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
    elif lowered.endswith(_IMAGE_SUFFIXES):
        # Context manager closes the image file handle after OCR.
        with Image.open(path) as img:
            text = pytesseract.image_to_string(img)
    else:
        raise ValueError("Unsupported file type.")
    if not text.strip():
        raise ValueError("Document contains no extractable text.")
    return text


def extract_text(doc):
    """Return the text of an uploaded document, or an ``"ERROR: ..."`` string.

    Args:
        doc: an object with a ``name`` attribute holding the file path, or a
            plain path string.

    Returns:
        The extracted text on success; otherwise a message starting with
        ``"ERROR"`` describing what went wrong.
    """
    path = _doc_path(doc)
    if path is None:
        return "ERROR: No file uploaded."
    try:
        return _read_document_text(path)
    except Exception as e:
        return f"ERROR: {e}"


def detect_pii(text):
    """Detect PII in ``text`` with Presidio plus the local regex detectors.

    Returns:
        A ``(findings, presidio_results)`` tuple. ``findings`` is a list of
        dicts with ``entity``, ``score``, ``start``, ``end`` and ``text`` keys.
        If detection fails, ``findings`` holds a single ``"ERROR"`` entry whose
        ``text`` is the exception message, and ``presidio_results`` is empty.
    """
    try:
        presidio_results = analyzer.analyze(
            text=text, entities=_PRESIDIO_ENTITIES, language="en"
        )
        findings = [
            {
                "entity": r.entity_type,
                "score": r.score,
                "start": r.start,
                "end": r.end,
                "text": text[r.start:r.end].strip(),
            }
            for r in presidio_results
        ]
        findings += find_ssns(text)
        findings += find_ip_addresses(text)
        return findings, presidio_results
    except Exception as e:
        # Carry the same keys as a real finding (with an empty span) so that
        # downstream code indexing "start"/"end" does not raise KeyError.
        error_finding = {
            "entity": "ERROR",
            "score": 0.0,
            "start": 0,
            "end": 0,
            "text": str(e),
        }
        return [error_finding], []


def find_ip_addresses(text):
    """Return an ``IP_ADDRESS`` finding for every dotted-quad match in ``text``."""
    return [
        {
            "entity": "IP_ADDRESS",
            "score": 1.0,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        }
        for m in _IP_PATTERN.finditer(text)
    ]


def find_ssns(text):
    """Return ``SSN`` findings for ``NNN-NN-NNNN`` patterns in ``text``.

    Numbers preceded by an "SSN" / "social security number" label are reported
    with score 1.0; every bare match is reported with score 0.95. A labeled
    number is therefore reported twice with the same span.
    """
    findings = [
        {
            "entity": "SSN",
            "score": 1.0,
            "start": m.start(2),
            "end": m.end(2),
            "text": m.group(2),
        }
        for m in _LABELED_SSN_PATTERN.finditer(text)
    ]
    findings.extend(
        {
            "entity": "SSN",
            "score": 0.95,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        }
        for m in _BARE_SSN_PATTERN.finditer(text)
    )
    return findings


def clean_person_entities(findings):
    """Trim PERSON findings to two words and drop known false positives.

    A PERSON finding whose trimmed text is "date" or "department"
    (case-insensitive) is discarded. Other entity types pass through
    unchanged. The ``start``/``end`` offsets are left as they were.
    """
    cleaned = []
    for f in findings:
        if f["entity"] != "PERSON":
            cleaned.append(f)
            continue
        name = " ".join(f["text"].split()[:2])
        if name.lower() not in ("date", "department"):
            trimmed = f.copy()  # do not mutate the caller's dict
            trimmed["text"] = name
            cleaned.append(trimmed)
    return cleaned


def dedupe_findings(findings):
    """Drop findings that repeat an earlier (entity, text, start, end) tuple.

    The first occurrence is kept and input order is preserved.
    """
    seen = set()
    deduped = []
    for f in findings:
        key = (f["entity"], f["text"], f.get("start"), f.get("end"))
        if key not in seen:
            seen.add(key)
            deduped.append(f)
    return deduped


def risk_score(findings):
    """Return the sum of per-entity weights; unlisted entity types weigh 1."""
    weights = {
        "PERSON": 1,
        "EMAIL_ADDRESS": 2,
        "CREDIT_CARD": 4,
        "SSN": 5,
        "IP_ADDRESS": 2,
        "PHONE_NUMBER": 2,
        "MEDICAL_RECORD_NUMBER": 3,
    }
    return sum(weights.get(f["entity"], 1) for f in findings)


def suggest_fixes(findings):
    """Return one recommendation per entity type present in ``findings``.

    Recommendations are unique and listed in the order their entity type first
    appears, so the output is the same on every run.
    """
    fixes = (
        _FIX_MESSAGES[f["entity"]] for f in findings if f["entity"] in _FIX_MESSAGES
    )
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(fixes))


def summarize_narrative(findings, regime):
    """Return a markdown summary counting findings per entity type."""
    if not findings:
        return "No sensitive or regulated information was found in this document."
    counts = Counter(f["entity"] for f in findings)
    summary_lines = [f"Under **{regime}**, the document contains:"]
    for entity in sorted(counts):
        summary_lines.append(
            f"- **{entity.replace('_', ' ').title()}**: {counts[entity]} instance(s)"
        )
    summary_lines.append("These must be anonymized or removed to ensure compliance.")
    return "\n".join(summary_lines)


def score_legend():
    """Return the markdown legend explaining the risk-score bands."""
    return (
        "**Risk Score Legend:**\n"
        "- 0–3: Low risk (little or no PII detected)\n"
        "- 4–7: Moderate risk (some PII detected, take caution)\n"
        "- 8+: High risk (multiple/high-value PII found—document needs urgent attention)\n"
        "\n"
        "Score is calculated based on entity sensitivity. For example, SSN and credit cards are higher risk than names."
    )


def redact_text(text, all_findings):
    """Return ``text`` with every finding's span replaced by ``[REDACTED]``.

    Findings whose ``text`` is empty or shorter than three characters are
    skipped, as are findings with missing or empty spans. Overlapping spans
    are merged first: replacing them one at a time would apply offsets that an
    earlier replacement has already shifted, which corrupts the output.
    Adjacent (touching) spans are kept separate.
    """
    spans = []
    for f in all_findings:
        value = f.get("text")
        if not value or len(value) < 3:
            continue
        start, end = f.get("start"), f.get("end")
        if start is None or end is None:
            continue
        start = max(0, start)
        end = min(len(text), end)
        if end > start:
            spans.append((start, end))

    merged = []
    for start, end in sorted(spans):
        if merged and start < merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])

    pieces = []
    cursor = 0
    for start, end in merged:
        pieces.append(text[cursor:start])
        pieces.append("[REDACTED]")
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces)


def save_redacted_file(redacted_text):
    """Write ``redacted_text`` to a new UTF-8 temp ``.txt`` file; return its path."""
    with tempfile.NamedTemporaryFile(
        "w",
        encoding="utf-8",
        prefix="redacted_output_",
        suffix=".txt",
        delete=False,
    ) as f:
        f.write(redacted_text)
    return f.name


def redact_image_with_presidio(image_path):
    """Redact the image at ``image_path``; return the path of a new temp PNG."""
    out_path = _new_temp_path("redacted_image_", ".png")
    with Image.open(image_path) as img:
        redacted_img = image_redactor.redact(img)
        redacted_img.save(out_path)
    return out_path


def redact_pdf_with_presidio(pdf_path):
    """Rasterize each PDF page, redact it as an image, and rebuild a PDF.

    Returns:
        Path of a new temp PDF whose pages are the redacted page images.
    """
    source_pdf = fitz.open(pdf_path)
    output_pdf = fitz.open()
    try:
        for page in source_pdf:
            pix = page.get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            redacted_img = image_redactor.redact(img)
            buffer = io.BytesIO()
            redacted_img.save(buffer, format="PNG")
            rect = fitz.Rect(0, 0, pix.width, pix.height)
            out_page = output_pdf.new_page(width=pix.width, height=pix.height)
            out_page.insert_image(rect, stream=buffer.getvalue())
        out_path = _new_temp_path("redacted_output_", ".pdf")
        output_pdf.save(out_path)
    finally:
        # Close both documents even if rendering or redaction fails.
        output_pdf.close()
        source_pdf.close()
    return out_path


def executive_summary_template(findings, score, regime):
    """Return a one-paragraph summary of risk level and entity counts."""
    if not findings:
        return (
            f"No sensitive information detected under {regime}. Document is considered low risk."
        )
    if score >= 8:
        risk_level = "High Risk"
    elif score >= 4:
        risk_level = "Moderate Risk"
    else:
        risk_level = "Low Risk"
    # Counter keeps first-seen order, so entities are listed as encountered.
    entity_counts = Counter(f["entity"] for f in findings)
    summary_lines = [
        f"This document falls under {regime} with a risk score of {score} ({risk_level}).",
        "Sensitive information detected: "
        + ", ".join(f"{k} ({v})" for k, v in entity_counts.items())
        + ".",
        "Recommendation: Anonymize or redact all sensitive entities to ensure compliance.",
    ]
    return " ".join(summary_lines)


def agentic_compliance(doc, regime):
    """Run the compliance pipeline on an uploaded document.

    Args:
        doc: the uploaded file (object with ``name``, or a path string).
        regime: a key of ``COMPLIANCE_ENTITIES``.

    Returns:
        A 4-tuple ``(markdown_report, redacted_text_path, redacted_file_path,
        redacted_image_path)``. On failure the first element is an
        ``"ERROR: ..."`` message and the remaining three are ``None``.
        ``redacted_file_path`` is set only for PDF and image uploads, and
        ``redacted_image_path`` only for image uploads.
    """
    path = _doc_path(doc)
    if path is None:
        return "ERROR: No file uploaded.", None, None, None
    if regime not in COMPLIANCE_ENTITIES:
        # Without a valid regime nothing counts as relevant, and the report
        # would wrongly describe the document as low risk.
        return "ERROR: Please select a supported compliance regime.", None, None, None

    # Call the raising helper directly: checking for an "ERROR" prefix on the
    # returned text would misfire on documents that start with that word.
    try:
        text = _read_document_text(path)
    except Exception as e:
        return f"ERROR: {e}", None, None, None

    findings, _presidio_results = detect_pii(text)
    failure = next((f for f in findings if f["entity"] == "ERROR"), None)
    if failure is not None:
        return f"ERROR: {failure['text']}", None, None, None

    findings = clean_person_entities(findings)
    findings = dedupe_findings(findings)

    entities_needed = COMPLIANCE_ENTITIES[regime]
    relevant = [f for f in findings if f["entity"] in entities_needed]

    score = risk_score(relevant)
    fixes = suggest_fixes(relevant)
    summary = summarize_narrative(relevant, regime)
    exec_summary = executive_summary_template(relevant, score, regime)

    if relevant:
        findings_md = "\n".join(
            f"- **{f['entity']}** (`{f['text']}`), score: {f.get('score', 0):.2f}"
            for f in relevant
        )
    else:
        findings_md = "No relevant PII found for this regime."
    fixes_md = "\n".join(f"- {fix}" for fix in fixes) if fixes else "No action needed."
    legend_md = score_legend()

    # Redaction uses every finding, not only those relevant to the regime.
    redacted = redact_text(text, findings)
    redacted_path = save_redacted_file(redacted)

    redacted_file_path = None
    redacted_image = None
    lowered = path.lower()
    if lowered.endswith(_IMAGE_SUFFIXES):
        redacted_file_path = redact_image_with_presidio(path)
        redacted_image = redacted_file_path
    elif lowered.endswith(".pdf"):
        redacted_file_path = redact_pdf_with_presidio(path)

    md = f"""### Compliance Regime: **{regime}**

**Executive Summary:**
{exec_summary}

**Findings:**
{findings_md}

**Risk Score:** {score}

**Actionable Recommendations:**
{fixes_md}

**Summary:**
{summary}

---
{legend_md}
"""
    return md.strip(), redacted_path, redacted_file_path, redacted_image


# ---- Gradio App UI ----
with gr.Blocks(title="Agentic Compliance MCP Server") as demo:
    gr.Markdown("# Agentic Compliance MCP\nUpload a document to check it for PII then select a compliance regime.")
    with gr.Tab("Compliance Agent"):
        doc = gr.File(label="Upload Document", file_types=SUPPORTED_FILE_TYPES)
        regime = gr.Dropdown(choices=list(COMPLIANCE_ENTITIES.keys()), label="Compliance Regime")
        out = gr.Markdown(label="Compliance Output")
        redacted_out = gr.File(label="Download Redacted Text")
        file_redacted_out = gr.File(label="Download Redacted PDF/Image")
        redacted_img = gr.Image(label="Redacted Image Preview")
        gr.Button("Run Compliance Agent").click(
            agentic_compliance,
            inputs=[doc, regime],
            outputs=[out, redacted_out, file_redacted_out, redacted_img],
        )

demo.launch(mcp_server=True)