# Armando Medina
# Update app.py
# a0941ae verified
import io
import re
import tempfile

import fitz  # pymupdf
import gradio as gr
import numpy as np
import pdfplumber
import pytesseract
from docx import Document
from PIL import Image
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_image_redactor import ImageRedactorEngine
# Presidio engines are constructed once at import time and reused by the
# functions below. `anonymizer` is not referenced anywhere in the visible code.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
image_redactor = ImageRedactorEngine()
# Maps each compliance regime to the entity types that are reported for it;
# findings of any other type are filtered out of the report.
# NOTE(review): "SSN" and "MEDICAL_RECORD_NUMBER" do not look like Presidio's
# built-in entity names (e.g. "US_SSN") -- confirm which recognizers emit them.
# In this file, "SSN" findings come from the regex helper find_ssns().
COMPLIANCE_ENTITIES = {
"HIPAA": ["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER", "SSN"],
"GDPR": ["PERSON", "EMAIL_ADDRESS", "LOCATION"],
"CCPA": ["PERSON", "EMAIL_ADDRESS", "IP_ADDRESS", "SSN", "CREDIT_CARD"]
}
# File extensions offered by the upload widget; extract_text() has a branch
# for each of them.
SUPPORTED_FILE_TYPES = [".pdf", ".docx", ".txt", ".png", ".jpg", ".jpeg"]
def extract_text(doc):
    """Extract plain text from an uploaded document.

    The extractor is chosen from the (case-insensitive) extension of
    ``doc.name``: pdfplumber for ``.pdf``, python-docx for ``.docx``, a plain
    read for ``.txt`` and Tesseract OCR for ``.png``/``.jpg``/``.jpeg``.

    Args:
        doc: Uploaded file object exposing the on-disk path as ``doc.name``.

    Returns:
        The extracted text, or a message starting with ``"ERROR: "`` when no
        file was supplied, the type is unsupported, no text could be
        extracted, or an extractor raised.
    """
    if not hasattr(doc, "name"):
        return "ERROR: No file uploaded."
    try:
        path = doc.name
        fname = path.lower()
        if fname.endswith(".pdf"):
            with pdfplumber.open(path) as pdf:
                # Guard against pages for which extract_text() gives nothing.
                pages = [page.extract_text() or "" for page in pdf.pages]
            text = "\n".join(pages)
        elif fname.endswith(".docx"):
            document = Document(path)
            text = "\n".join(p.text for p in document.paragraphs)
        elif fname.endswith(".txt"):
            # "utf-8-sig" drops a leading BOM; errors="replace" keeps files
            # saved in a legacy encoding scannable instead of aborting on the
            # first undecodable byte.
            with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
                text = f.read()
        elif fname.endswith((".png", ".jpg", ".jpeg")):
            # The context manager releases the underlying file handle once
            # OCR has finished.
            with Image.open(path) as img:
                text = pytesseract.image_to_string(img)
        else:
            return "ERROR: Unsupported file type."
        if not text.strip():
            return "ERROR: Document contains no extractable text."
        return text
    except Exception as e:
        # Boundary handler: extractor failures are reported to the caller as
        # an error string rather than propagated.
        return f"ERROR: {e}"
def detect_pii(text):
    """Detect PII in *text* with Presidio plus this file's regex helpers.

    Args:
        text: The document text to scan.

    Returns:
        A ``(findings, presidio_results)`` tuple. ``findings`` is a list of
        dicts with ``entity``, ``score``, ``start``, ``end`` and ``text``
        keys: the Presidio results followed by the matches from
        ``find_ssns`` and ``find_ip_addresses``. ``presidio_results`` is the
        raw analyzer output. If anything raises, ``findings`` holds a single
        ``"ERROR"`` entry and ``presidio_results`` is empty.
    """
    entities = [
        "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER",
        "SSN", "CREDIT_CARD", "LOCATION", "IP_ADDRESS",
    ]
    try:
        presidio_results = analyzer.analyze(text=text, entities=entities, language="en")
        findings = [
            {
                "entity": r.entity_type,
                "score": r.score,
                "start": r.start,
                "end": r.end,
                # Offsets are left untouched; only the displayed text is trimmed.
                "text": text[r.start:r.end].strip(),
            }
            for r in presidio_results
        ]
        findings += find_ssns(text)
        findings += find_ip_addresses(text)
        return findings, presidio_results
    except Exception as e:
        # The error entry carries the same keys as a real finding so that
        # code indexing "start"/"end"/"score" does not raise KeyError on it.
        error_finding = {
            "entity": "ERROR",
            "score": 0.0,
            "start": 0,
            "end": 0,
            "text": str(e),
        }
        return [error_finding], []
# Four dot-separated groups of 1-3 digits; the octet range is checked separately.
_IPV4_CANDIDATE_RE = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')


def find_ip_addresses(text):
    """Find IPv4 addresses in *text*.

    Args:
        text: The text to scan.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``), one per dotted quad whose four octets are all in the
        range 0-255, in order of appearance.
    """
    findings = []
    for m in _IPV4_CANDIDATE_RE.finditer(text):
        # The pattern alone also accepts strings such as "999.300.1.1";
        # reject anything with an octet above 255.
        if any(int(octet) > 255 for octet in m.group().split(".")):
            continue
        findings.append({
            "entity": "IP_ADDRESS",
            "score": 1.0,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        })
    return findings
# An SSN preceded by an explicit "SSN" / "social security number" label.
_SSN_LABELLED_RE = re.compile(
    r'(?i)(ssn|social security number)[\s:]*([0-9]{3}-[0-9]{2}-[0-9]{4})'
)
# Any standalone NNN-NN-NNNN number.
_SSN_BARE_RE = re.compile(r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b')


def find_ssns(text):
    """Find US social security numbers in *text*.

    Labelled numbers ("SSN: 123-45-6789") are reported with score 1.0 and
    other NNN-NN-NNNN numbers with score 0.95. A number already reported as
    labelled is not reported a second time.

    Args:
        text: The text to scan.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``): labelled matches first, then the remaining bare matches,
        each group in order of appearance.
    """
    findings = []
    labelled_spans = set()
    for m in _SSN_LABELLED_RE.finditer(text):
        # Group 2 is the number itself; the label is not part of the finding.
        labelled_spans.add(m.span(2))
        findings.append({
            "entity": "SSN",
            "score": 1.0,
            "start": m.start(2),
            "end": m.end(2),
            "text": m.group(2),
        })
    for m in _SSN_BARE_RE.finditer(text):
        if m.span() in labelled_spans:
            # Same characters as a labelled match: keep the 1.0 finding only.
            continue
        findings.append({
            "entity": "SSN",
            "score": 0.95,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        })
    return findings
def clean_person_entities(findings):
    """Tidy up PERSON findings and pass every other finding through.

    A PERSON finding has its ``text`` cut down to its first two
    whitespace-separated words. If the shortened text (case-insensitively)
    is "date" or "department", the finding is dropped. Shortened findings
    are copies; the input dicts are not modified.

    Args:
        findings: List of finding dicts with at least ``entity`` and ``text``.

    Returns:
        A new list holding the kept findings in their original order.
    """
    ignored_names = ["date", "department"]
    result = []
    for finding in findings:
        if finding["entity"] != "PERSON":
            result.append(finding)
            continue
        words = finding["text"].split()
        short_name = " ".join(words[:2])
        if short_name.lower() in ignored_names:
            continue
        trimmed = finding.copy()
        trimmed["text"] = short_name
        result.append(trimmed)
    return result
def dedupe_findings(findings):
    """Drop repeated findings, keeping the first occurrence of each.

    Two findings are treated as the same when their ``entity``, ``text``,
    ``start`` and ``end`` values all match; ``score`` is not compared.

    Args:
        findings: List of finding dicts.

    Returns:
        A new list with duplicates removed and first-seen order preserved.
    """
    unique = {}
    for item in findings:
        signature = (item["entity"], item["text"], item["start"], item["end"])
        # setdefault leaves an existing entry alone, so the first one wins.
        unique.setdefault(signature, item)
    return list(unique.values())
def risk_score(findings):
    """Add up the sensitivity weights of all findings.

    Args:
        findings: List of finding dicts with an ``entity`` key.

    Returns:
        The integer total. Entity types without an explicit weight count as 1.
    """
    entity_weights = {
        "PERSON": 1,
        "EMAIL_ADDRESS": 2,
        "CREDIT_CARD": 4,
        "SSN": 5,
        "IP_ADDRESS": 2,
        "PHONE_NUMBER": 2,
        "MEDICAL_RECORD_NUMBER": 3,
    }
    total = 0
    for finding in findings:
        total += entity_weights.get(finding["entity"], 1)
    return total
# Remediation advice shown for each entity type.
_FIX_BY_ENTITY = {
    "PERSON": "Remove or mask full names.",
    "EMAIL_ADDRESS": "Anonymize email addresses.",
    "CREDIT_CARD": "Remove or mask credit card numbers.",
    "SSN": "Remove or mask social security numbers.",
    "PHONE_NUMBER": "Mask phone numbers.",
    "LOCATION": "Remove or generalize location data.",
    "IP_ADDRESS": "Remove or anonymize IP addresses.",
    "MEDICAL_RECORD_NUMBER": "Anonymize medical record numbers.",
}


def suggest_fixes(findings):
    """Return one remediation suggestion per entity type found.

    Args:
        findings: List of finding dicts with an ``entity`` key.

    Returns:
        A list of distinct suggestion strings, ordered by the first
        appearance of their entity type in ``findings`` so that the report
        is the same on every run. Entity types with no known suggestion are
        ignored.
    """
    fixes = []
    for f in findings:
        fix = _FIX_BY_ENTITY.get(f["entity"])
        if fix is not None and fix not in fixes:
            fixes.append(fix)
    return fixes
def summarize_narrative(findings, regime):
    """Build a Markdown bullet summary of how many of each entity were found.

    Args:
        findings: List of finding dicts with an ``entity`` key.
        regime: Name of the compliance regime, interpolated into the heading.

    Returns:
        A fixed "nothing found" sentence when ``findings`` is empty;
        otherwise a heading line, one bullet per entity type (sorted by
        entity name) with its count, and a closing instruction, joined by
        newlines.
    """
    if not findings:
        return "No sensitive or regulated information was found in this document."

    tally = {}
    for finding in findings:
        label = finding["entity"]
        tally[label] = tally.get(label, 0) + 1

    lines = [f"Under **{regime}**, the document contains:"]
    for label in sorted(tally):
        pretty = label.replace('_', ' ').title()
        lines.append(f"- **{pretty}**: {tally[label]} instance(s)")
    lines.append("These must be anonymized or removed to ensure compliance.")
    return "\n".join(lines)
def score_legend():
    """Return the Markdown legend that explains the risk score bands."""
    legend_lines = [
        "**Risk Score Legend:**",
        "- 0–3: Low risk (little or no PII detected)",
        "- 4–7: Moderate risk (some PII detected, take caution)",
        "- 8+: High risk (multiple/high-value PII found—document needs urgent attention)",
        "",
        "Score is calculated based on entity sensitivity. For example, SSN and credit cards are higher risk than names.",
    ]
    return "\n".join(legend_lines)
def redact_text(text, all_findings):
    """Replace every detected span in *text* with ``[REDACTED]``.

    Findings whose ``text`` is empty or shorter than three characters are
    ignored, as are spans that cover no characters. Spans that overlap are
    merged first, so a region flagged by several findings is replaced by a
    single marker and the surrounding text is left intact.

    Args:
        text: The original document text.
        all_findings: Finding dicts with ``text``, ``start`` and ``end``
            keys, where ``start``/``end`` are offsets into ``text``.

    Returns:
        The redacted text.
    """
    spans = []
    for f in all_findings:
        if not f["text"] or len(f["text"]) < 3:
            continue
        start, end = f["start"], f["end"]
        if start >= end:
            # Nothing to cover; replacing it would only insert a stray marker.
            continue
        spans.append((start, end))

    # Merge overlapping spans. Applying them one by one would use offsets
    # from the original text on an already-edited string and garble it.
    merged = []
    for start, end in sorted(spans):
        if merged and start < merged[-1][1]:
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))

    # Assemble the result in a single left-to-right pass.
    pieces = []
    cursor = 0
    for start, end in merged:
        pieces.append(text[cursor:start])
        pieces.append("[REDACTED]")
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces)
def save_redacted_file(redacted_text):
    """Write *redacted_text* to a new UTF-8 text file and return its path.

    Each call creates its own uniquely named file in the system temporary
    directory, so concurrent requests cannot overwrite or read each other's
    output. The file is kept after the function returns because the caller
    hands the path on for download.

    Args:
        redacted_text: The text to store.

    Returns:
        Path of the written ``.txt`` file.
    """
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        prefix="redacted_output_",
        suffix=".txt",
        delete=False,
    ) as f:
        f.write(redacted_text)
        return f.name
def redact_image_with_presidio(image_path):
    """Redact an image with the Presidio image redactor and save it as PNG.

    Args:
        image_path: Path of the image to redact.

    Returns:
        Path of the redacted PNG. Every call writes to its own uniquely
        named temporary file so concurrent requests do not overwrite each
        other's output.
    """
    # Reserve a unique output name; the image is written to it by path below.
    with tempfile.NamedTemporaryFile(
        prefix="redacted_image_", suffix=".png", delete=False
    ) as tmp:
        out_path = tmp.name
    # The context manager closes the source file handle once the redacted
    # copy has been written.
    with Image.open(image_path) as img:
        redacted_img = image_redactor.redact(img)
        redacted_img.save(out_path)
    return out_path
def redact_pdf_with_presidio(pdf_path):
    """Redact a PDF by rendering each page to an image and redacting that.

    Every page is rasterised, passed through the Presidio image redactor and
    inserted as a full-page image into a new PDF, so the output contains
    page images rather than the original page content.

    Args:
        pdf_path: Path of the PDF to redact.

    Returns:
        Path of the redacted PDF. Every call writes to its own uniquely
        named temporary file so concurrent requests do not overwrite each
        other's output.
    """
    doc = fitz.open(pdf_path)
    output_pdf = fitz.open()
    try:
        for page in doc:
            pix = page.get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            redacted_img = image_redactor.redact(img)
            img_byte_arr = io.BytesIO()
            redacted_img.save(img_byte_arr, format='PNG')
            # The output page is sized to the rendered pixmap and the image
            # fills it completely.
            rect = fitz.Rect(0, 0, pix.width, pix.height)
            out_page = output_pdf.new_page(width=pix.width, height=pix.height)
            out_page.insert_image(rect, stream=img_byte_arr.getvalue())
        # Reserve a unique output name, then let PyMuPDF write to it by path.
        with tempfile.NamedTemporaryFile(
            prefix="redacted_output_", suffix=".pdf", delete=False
        ) as tmp:
            out_path = tmp.name
        output_pdf.save(out_path)
    finally:
        # Close both documents even if rendering or redaction fails.
        output_pdf.close()
        doc.close()
    return out_path
def executive_summary_template(findings, score, regime):
    """Compose the one-paragraph executive summary for the report.

    Args:
        findings: List of finding dicts with an ``entity`` key.
        score: Numeric risk score; 8 or more is "High Risk", 4 or more is
            "Moderate Risk", anything lower is "Low Risk".
        regime: Name of the compliance regime, interpolated into the text.

    Returns:
        A single "nothing detected" sentence when ``findings`` is empty;
        otherwise three sentences (risk statement, per-entity counts in
        first-seen order, recommendation) joined by single spaces.
    """
    if not findings:
        return f"No sensitive information detected under {regime}. Document is considered low risk."

    if score >= 8:
        level = "High Risk"
    elif score >= 4:
        level = "Moderate Risk"
    else:
        level = "Low Risk"

    counts = {}
    for item in findings:
        kind = item["entity"]
        counts[kind] = counts.get(kind, 0) + 1
    breakdown = ", ".join(f"{kind} ({n})" for kind, n in counts.items())

    sentences = [
        f"This document falls under {regime} with a risk score of {score} ({level}).",
        "Sensitive information detected: " + breakdown + ".",
        "Recommendation: Anonymize or redact all sensitive entities to ensure compliance.",
    ]
    return " ".join(sentences)
def agentic_compliance(doc, regime):
    """Run the full compliance check on an uploaded document.

    Extracts the text, detects PII, keeps the findings relevant to *regime*
    for the report, and produces redacted copies of the document.

    Args:
        doc: Uploaded file object exposing the on-disk path as ``doc.name``.
        regime: A key of ``COMPLIANCE_ENTITIES`` (e.g. ``"GDPR"``).

    Returns:
        A 4-tuple ``(report_markdown, redacted_text_path,
        redacted_file_path, redacted_image_path)``. ``redacted_file_path``
        is set for PDF and image uploads and ``redacted_image_path`` for
        image uploads only; otherwise they are ``None``. When extraction
        fails or no valid regime is given, the first element is an
        ``"ERROR..."`` message and the other three are ``None``.
    """
    text = extract_text(doc)
    if text.startswith("ERROR"):
        return text, None, None, None
    if regime not in COMPLIANCE_ENTITIES:
        # Without a regime nothing would count as relevant and the report
        # would claim that no PII was found, which reads as a clean result.
        return "ERROR: Please select a compliance regime.", None, None, None

    findings, presidio_results = detect_pii(text)
    findings = clean_person_entities(findings)
    findings = dedupe_findings(findings)

    # Only the entity types listed for the regime appear in the report.
    entities_needed = COMPLIANCE_ENTITIES.get(regime, [])
    relevant = [f for f in findings if f["entity"] in entities_needed]
    score = risk_score(relevant)
    fixes = suggest_fixes(relevant)
    summary = summarize_narrative(relevant, regime)
    exec_summary = executive_summary_template(relevant, score, regime)
    findings_md = "\n".join([
        f"- **{f['entity']}** (`{f['text']}`), score: {f.get('score', 0):.2f}"
        for f in relevant
    ]) if relevant else "No relevant PII found for this regime."
    fixes_md = "\n".join([f"- {fix}" for fix in fixes]) if fixes else "No action needed."
    legend_md = score_legend()

    # The text redaction covers every finding, not only the regime-relevant ones.
    redacted = redact_text(text, findings)
    redacted_path = save_redacted_file(redacted)

    redacted_file_path = None
    redacted_image = None
    if hasattr(doc, "name"):
        fname = doc.name.lower()
        if fname.endswith((".png", ".jpg", ".jpeg")):
            redacted_file_path = redact_image_with_presidio(doc.name)
            # Images are offered both as a download and as a preview.
            redacted_image = redacted_file_path
        elif fname.endswith(".pdf"):
            redacted_file_path = redact_pdf_with_presidio(doc.name)

    md = f"""### Compliance Regime: **{regime}**
**Executive Summary:**
{exec_summary}
**Findings:**
{findings_md}
**Risk Score:** {score}
**Actionable Recommendations:**
{fixes_md}
**Summary:**
{summary}
---
{legend_md}
"""
    return md.strip(), redacted_path, redacted_file_path, redacted_image
# ---- Gradio App UI ----
# One tab: an upload widget and a regime selector as inputs; a Markdown
# report, two download slots and an image preview as outputs. The button
# wires them to agentic_compliance().
with gr.Blocks(title="Agentic Compliance MCP Server") as demo:
    gr.Markdown("# Agentic Compliance MCP\nUpload a document to check it for PII then select a compliance regime.")
    with gr.Tab("Compliance Agent"):
        # Inputs
        doc = gr.File(label="Upload Document", file_types=SUPPORTED_FILE_TYPES)
        regime = gr.Dropdown(choices=list(COMPLIANCE_ENTITIES), label="Compliance Regime")
        # Outputs, in the order agentic_compliance() returns them
        out = gr.Markdown(label="Compliance Output")
        redacted_out = gr.File(label="Download Redacted Text")
        file_redacted_out = gr.File(label="Download Redacted PDF/Image")
        redacted_img = gr.Image(label="Redacted Image Preview")
        run_button = gr.Button("Run Compliance Agent")
        run_button.click(
            agentic_compliance,
            inputs=[doc, regime],
            outputs=[out, redacted_out, file_redacted_out, redacted_img],
        )

demo.launch(mcp_server=True)