|
import gradio as gr |
|
import pdfplumber |
|
from presidio_analyzer import AnalyzerEngine |
|
from presidio_anonymizer import AnonymizerEngine |
|
from presidio_image_redactor import ImageRedactorEngine |
|
import numpy as np |
|
import re |
|
from docx import Document |
|
from PIL import Image |
|
import pytesseract |
|
import fitz |
|
import io |
|
|
|
# Presidio engines are built once at import time and reused by every call.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
image_redactor = ImageRedactorEngine()

# Entity labels treated as relevant under each compliance regime.
COMPLIANCE_ENTITIES = {
    "HIPAA": [
        "PERSON",
        "EMAIL_ADDRESS",
        "PHONE_NUMBER",
        "MEDICAL_RECORD_NUMBER",
        "SSN",
    ],
    "GDPR": [
        "PERSON",
        "EMAIL_ADDRESS",
        "LOCATION",
    ],
    "CCPA": [
        "PERSON",
        "EMAIL_ADDRESS",
        "IP_ADDRESS",
        "SSN",
        "CREDIT_CARD",
    ],
}

# File extensions offered to the upload widget.
SUPPORTED_FILE_TYPES = [
    ".pdf",
    ".docx",
    ".txt",
    ".png",
    ".jpg",
    ".jpeg",
]
|
|
|
def extract_text(doc):
    """Extract plain text from an uploaded PDF, DOCX, TXT, or image file.

    Args:
        doc: Upload object whose ``name`` attribute is the path of the file
            on disk. The extension (case-insensitive) selects the reader.

    Returns:
        The extracted text, or a message starting with ``"ERROR: "`` when no
        file was supplied, the extension is unsupported, no text could be
        extracted, or the underlying reader raised an exception.
    """
    if not hasattr(doc, "name"):
        return "ERROR: No file uploaded."
    try:
        path = doc.name
        fname = path.lower()
        if fname.endswith(".pdf"):
            with pdfplumber.open(path) as pdf:
                # extract_text() may yield None for pages without a text layer.
                pages = [page.extract_text() or "" for page in pdf.pages]
            text = "\n".join(pages)
        elif fname.endswith(".docx"):
            document = Document(path)
            text = "\n".join(p.text for p in document.paragraphs)
        elif fname.endswith(".txt"):
            with open(path, "r", encoding="utf-8") as f:
                text = f.read()
        elif fname.endswith((".png", ".jpg", ".jpeg")):
            # Use a context manager so the image's file handle is released
            # as soon as OCR finishes instead of lingering until GC.
            with Image.open(path) as img:
                text = pytesseract.image_to_string(img)
        else:
            return "ERROR: Unsupported file type."
        if not text.strip():
            return "ERROR: Document contains no extractable text."
        return text
    except Exception as e:
        # Errors are reported in-band as a string; callers test the prefix.
        return f"ERROR: {e}"
|
|
|
def detect_pii(text):
    """Detect PII in ``text`` using Presidio plus the regex helpers.

    Args:
        text: The document text to scan.

    Returns:
        A ``(findings, presidio_results)`` pair. ``findings`` is a list of
        dicts with ``entity``, ``score``, ``start``, ``end`` and ``text``
        keys: Presidio hits first, then ``find_ssns`` hits, then
        ``find_ip_addresses`` hits. ``presidio_results`` is the raw analyzer
        output. If anything raises, the pair is a single
        ``{"entity": "ERROR", "text": <message>}`` record (no offsets or
        score) and an empty list.
    """
    # NOTE(review): "SSN" and "MEDICAL_RECORD_NUMBER" do not look like stock
    # Presidio entity names - confirm matching recognizers are registered.
    requested = [
        "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "MEDICAL_RECORD_NUMBER",
        "SSN", "CREDIT_CARD", "LOCATION", "IP_ADDRESS"
    ]
    try:
        raw_results = analyzer.analyze(text=text, entities=requested, language="en")
        collected = []
        for hit in raw_results:
            collected.append({
                "entity": hit.entity_type,
                "score": hit.score,
                "start": hit.start,
                "end": hit.end,
                "text": text[hit.start:hit.end].strip(),
            })
        collected.extend(find_ssns(text))
        collected.extend(find_ip_addresses(text))
    except Exception as e:
        return [{"entity": "ERROR", "text": str(e)}], []
    return collected, raw_results
|
|
|
def find_ip_addresses(text):
    """Find dotted-quad IPv4 addresses in ``text``.

    Args:
        text: The text to scan.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``) in order of appearance, one per address whose four octets
        are all in the range 0-255.
    """
    pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
    findings = []
    for m in re.finditer(pattern, text):
        # The regex alone also accepts impossible octets such as 999, so
        # reject anything that cannot be a real IPv4 address.
        if any(int(octet) > 255 for octet in m.group().split(".")):
            continue
        findings.append({
            "entity": "IP_ADDRESS",
            "score": 1.0,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        })
    return findings
|
|
|
def find_ssns(text):
    """Find US Social Security numbers (``NNN-NN-NNNN``) in ``text``.

    Numbers preceded by an "SSN" / "social security number" label are
    reported with score 1.0; other standalone matches get 0.95.

    Args:
        text: The text to scan.

    Returns:
        A list of finding dicts (``entity``, ``score``, ``start``, ``end``,
        ``text``): labelled matches first, then unlabelled ones. Each span
        is reported once.
    """
    labelled_pattern = r'(?i)(ssn|social security number)[\s:]*([0-9]{3}-[0-9]{2}-[0-9]{4})'
    bare_pattern = r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b'
    findings = []
    labelled_spans = set()
    for m in re.finditer(labelled_pattern, text):
        labelled_spans.add(m.span(2))
        findings.append({
            "entity": "SSN",
            "score": 1.0,
            "start": m.start(2),
            "end": m.end(2),
            "text": m.group(2),
        })
    for m in re.finditer(bare_pattern, text):
        # A labelled number also matches the bare pattern; skip it so the
        # same span is not returned twice with a lower score.
        if m.span() in labelled_spans:
            continue
        findings.append({
            "entity": "SSN",
            "score": 0.95,
            "start": m.start(),
            "end": m.end(),
            "text": m.group(),
        })
    return findings
|
|
|
def clean_person_entities(findings):
    """Tidy PERSON findings and pass every other finding through unchanged.

    A PERSON finding's text is cut to its first two whitespace-separated
    words. If the result (case-insensitive) is "date" or "department" the
    finding is dropped. Input dicts are not mutated; shortened PERSON
    findings are copies.

    Args:
        findings: List of finding dicts with at least ``entity`` and ``text``.

    Returns:
        A new list of findings in the original order.
    """
    ignored_names = ["date", "department"]
    result = []
    for finding in findings:
        if finding["entity"] != "PERSON":
            result.append(finding)
            continue
        short_name = " ".join(finding["text"].split()[:2])
        if short_name.lower() in ignored_names:
            continue
        result.append({**finding, "text": short_name})
    return result
|
|
|
def dedupe_findings(findings):
    """Drop repeated findings, keeping the first occurrence of each.

    Two findings are duplicates when their ``entity``, ``text``, ``start``
    and ``end`` all match; ``score`` is ignored. Records that lack some of
    these keys (for example an error record carrying only ``entity`` and
    ``text``) are compared with the missing values treated as ``None``
    rather than raising ``KeyError``.

    Args:
        findings: List of finding dicts.

    Returns:
        A new list with duplicates removed, original order preserved.
    """
    seen = set()
    deduped = []
    for f in findings:
        key = (f.get("entity"), f.get("text"), f.get("start"), f.get("end"))
        if key not in seen:
            seen.add(key)
            deduped.append(f)
    return deduped
|
|
|
def risk_score(findings):
    """Sum the sensitivity weights of all findings.

    Args:
        findings: List of finding dicts with an ``entity`` key.

    Returns:
        The integer total; entity types without an explicit weight count as 1.
    """
    sensitivity = {
        "PERSON": 1,
        "EMAIL_ADDRESS": 2,
        "CREDIT_CARD": 4,
        "SSN": 5,
        "IP_ADDRESS": 2,
        "PHONE_NUMBER": 2,
        "MEDICAL_RECORD_NUMBER": 3,
    }
    total = 0
    for finding in findings:
        total += sensitivity.get(finding["entity"], 1)
    return total
|
|
|
def suggest_fixes(findings):
    """Return one remediation hint per entity type present in ``findings``.

    Args:
        findings: List of finding dicts with an ``entity`` key.

    Returns:
        A list of unique recommendation strings, ordered by the first
        appearance of each entity type. Entity types without a known
        recommendation contribute nothing.
    """
    advice = {
        "PERSON": "Remove or mask full names.",
        "EMAIL_ADDRESS": "Anonymize email addresses.",
        "CREDIT_CARD": "Remove or mask credit card numbers.",
        "SSN": "Remove or mask social security numbers.",
        "PHONE_NUMBER": "Mask phone numbers.",
        "LOCATION": "Remove or generalize location data.",
        "IP_ADDRESS": "Remove or anonymize IP addresses.",
        "MEDICAL_RECORD_NUMBER": "Anonymize medical record numbers.",
    }
    fixes = [advice[f["entity"]] for f in findings if f["entity"] in advice]
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # report is stable between runs (a set would shuffle the strings).
    return list(dict.fromkeys(fixes))
|
|
|
def summarize_narrative(findings, regime):
    """Build a Markdown summary of how many findings exist per entity type.

    Args:
        findings: List of finding dicts with an ``entity`` key.
        regime: Name of the compliance regime, shown in the heading line.

    Returns:
        A fixed "nothing found" sentence when ``findings`` is empty;
        otherwise a heading line, one bullet per entity type (sorted by
        label) and a closing sentence, joined with newlines.
    """
    if not findings:
        return "No sensitive or regulated information was found in this document."

    tally = {}
    for finding in findings:
        label = finding["entity"]
        tally[label] = tally.get(label, 0) + 1

    lines = [f"Under **{regime}**, the document contains:"]
    for label in sorted(tally):
        pretty = label.replace('_', ' ').title()
        lines.append(f"- **{pretty}**: {tally[label]} instance(s)")
    lines.append("These must be anonymized or removed to ensure compliance.")
    return "\n".join(lines)
|
|
|
def score_legend():
    """Return the Markdown legend describing the three risk-score bands."""
    legend_lines = [
        "**Risk Score Legend:**",
        "- 0–3: Low risk (little or no PII detected)",
        "- 4–7: Moderate risk (some PII detected, take caution)",
        "- 8+: High risk (multiple/high-value PII found—document needs urgent attention)",
        "",
        "Score is calculated based on entity sensitivity. For example, SSN and credit cards are higher risk than names.",
    ]
    return "\n".join(legend_lines)
|
|
|
def redact_text(text, all_findings):
    """Replace every detected span in ``text`` with ``[REDACTED]``.

    Findings whose ``text`` is empty or shorter than three characters are
    ignored, as are records without ``start``/``end`` offsets. Overlapping
    spans are merged before replacement so that each region is replaced
    exactly once.

    Args:
        text: The original document text.
        all_findings: List of finding dicts with ``text``, ``start`` and
            ``end`` keys (offsets into ``text``).

    Returns:
        The redacted text.
    """
    spans = []
    for f in all_findings:
        snippet = f.get("text")
        if not snippet or len(snippet) < 3:
            continue
        start, end = f.get("start"), f.get("end")
        if start is None or end is None:
            continue
        spans.append((start, end))
    spans.sort()

    # Merge overlapping spans. Replacing them one by one would apply the
    # later replacement to offsets that the earlier one already shifted,
    # garbling the marker or deleting unrelated text.
    merged = []
    for start, end in spans:
        if merged and start < merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])

    # Work right-to-left so earlier offsets stay valid.
    redacted_text = text
    for start, end in reversed(merged):
        redacted_text = redacted_text[:start] + "[REDACTED]" + redacted_text[end:]
    return redacted_text
|
|
|
def save_redacted_file(redacted_text):
    """Write the redacted text to a new UTF-8 temporary ``.txt`` file.

    A unique file is created in the system temp directory for every call,
    so concurrent requests cannot overwrite (or be served) each other's
    output. The file is not deleted automatically.

    Args:
        redacted_text: The text to write.

    Returns:
        The path of the written file.
    """
    import tempfile

    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        prefix="redacted_output_",
        suffix=".txt",
        delete=False,
    ) as f:
        f.write(redacted_text)
        path = f.name
    return path
|
|
|
def redact_image_with_presidio(image_path):
    """Redact an image with the Presidio image redactor and save it as PNG.

    The result is written to a unique temporary ``.png`` file so that
    concurrent requests do not overwrite each other's output. The file is
    not deleted automatically.

    Args:
        image_path: Path of the image to redact.

    Returns:
        The path of the redacted PNG.
    """
    import tempfile

    # Reserve a unique output path; the handle is closed immediately so the
    # image library can write to it by name.
    with tempfile.NamedTemporaryFile(
        prefix="redacted_image_", suffix=".png", delete=False
    ) as tmp:
        out_path = tmp.name

    # Context manager releases the source image's file handle when done.
    with Image.open(image_path) as img:
        redacted_img = image_redactor.redact(img)
        redacted_img.save(out_path)
    return out_path
|
|
|
def redact_pdf_with_presidio(pdf_path):
    """Produce a redacted, image-only copy of a PDF.

    Each page is rendered to a pixmap, passed through the Presidio image
    redactor, and inserted as a PNG into a page of the same pixel size in a
    new PDF. The output goes to a unique temporary ``.pdf`` file so that
    concurrent requests do not overwrite each other; the file is not
    deleted automatically.

    Args:
        pdf_path: Path of the PDF to redact.

    Returns:
        The path of the redacted PDF.
    """
    import tempfile

    doc = fitz.open(pdf_path)
    output_pdf = fitz.open()
    try:
        for page in doc:
            pix = page.get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            redacted_img = image_redactor.redact(img)

            png_buffer = io.BytesIO()
            redacted_img.save(png_buffer, format='PNG')

            rect = fitz.Rect(0, 0, pix.width, pix.height)
            out_page = output_pdf.new_page(width=pix.width, height=pix.height)
            out_page.insert_image(rect, stream=png_buffer.getvalue())

        # Reserve a unique output path only once all pages are processed.
        with tempfile.NamedTemporaryFile(
            prefix="redacted_output_", suffix=".pdf", delete=False
        ) as tmp:
            out_path = tmp.name
        output_pdf.save(out_path)
    finally:
        # Close both documents even if rendering or redaction fails.
        output_pdf.close()
        doc.close()
    return out_path
|
|
|
def executive_summary_template(findings, score, regime):
    """Compose a short plain-text executive summary.

    Args:
        findings: List of finding dicts with an ``entity`` key.
        score: Numeric risk score; 8 or more is "High Risk", 4 or more is
            "Moderate Risk", anything lower is "Low Risk".
        regime: Name of the compliance regime mentioned in the text.

    Returns:
        A single "nothing detected" sentence when ``findings`` is empty;
        otherwise sentences stating the score and risk level, the count per
        entity type (in first-seen order) and a recommendation, joined by
        single spaces.
    """
    if not findings:
        return f"No sensitive information detected under {regime}. Document is considered low risk."

    if score >= 8:
        risk_level = "High Risk"
    elif score >= 4:
        risk_level = "Moderate Risk"
    else:
        risk_level = "Low Risk"

    tally = {}
    for item in findings:
        label = item["entity"]
        tally[label] = tally.get(label, 0) + 1

    sentences = [
        f"This document falls under {regime} with a risk score of {score} ({risk_level})."
    ]
    if tally:
        breakdown = ", ".join(f"{label} ({count})" for label, count in tally.items())
        sentences.append("Sensitive information detected: " + breakdown + ".")
    sentences.append(
        "Recommendation: Anonymize or redact all sensitive entities to ensure compliance."
    )
    return " ".join(sentences)
|
|
|
def agentic_compliance(doc, regime):
    """Run the full PII compliance check on an uploaded document.

    Extracts the text, detects and de-duplicates PII, scores the findings
    relevant to ``regime``, builds a Markdown report, and writes redacted
    copies of the document.

    Args:
        doc: Uploaded file object exposing its path as ``doc.name``.
        regime: A key of ``COMPLIANCE_ENTITIES`` selecting which entity
            types count as relevant.

    Returns:
        A 4-tuple ``(report_markdown, redacted_text_path,
        redacted_file_path, redacted_image_path)``. ``redacted_file_path``
        is set for image and PDF uploads, ``redacted_image_path`` only for
        images. On extraction failure or an unknown regime the tuple is
        ``(error_message, None, None, None)``.
    """
    text = extract_text(doc)
    # extract_text reports failures in-band as a string starting with
    # "ERROR". NOTE(review): a document whose own text starts with "ERROR"
    # is indistinguishable from a failure here.
    if text.startswith("ERROR"):
        return text, None, None, None

    # Without this guard an unset or unknown regime maps to an empty entity
    # list and the document is silently reported as containing no PII.
    if regime not in COMPLIANCE_ENTITIES:
        return "ERROR: Please select a supported compliance regime.", None, None, None

    findings, presidio_results = detect_pii(text)
    findings = clean_person_entities(findings)
    findings = dedupe_findings(findings)

    entities_needed = COMPLIANCE_ENTITIES[regime]
    relevant = [f for f in findings if f["entity"] in entities_needed]
    score = risk_score(relevant)
    fixes = suggest_fixes(relevant)
    summary = summarize_narrative(relevant, regime)
    exec_summary = executive_summary_template(relevant, score, regime)

    if relevant:
        findings_md = "\n".join(
            f"- **{f['entity']}** (`{f['text']}`), score: {f.get('score', 0):.2f}"
            for f in relevant
        )
    else:
        findings_md = "No relevant PII found for this regime."

    fixes_md = "\n".join(f"- {fix}" for fix in fixes) if fixes else "No action needed."
    legend_md = score_legend()

    # The text redaction covers every detected entity, not only the ones
    # relevant to the selected regime.
    redacted = redact_text(text, findings)
    redacted_path = save_redacted_file(redacted)

    redacted_file_path = None
    redacted_image = None
    if hasattr(doc, "name"):
        fname = doc.name.lower()
        if fname.endswith((".png", ".jpg", ".jpeg")):
            redacted_file_path = redact_image_with_presidio(doc.name)
            redacted_image = redacted_file_path
        elif fname.endswith(".pdf"):
            redacted_file_path = redact_pdf_with_presidio(doc.name)

    report_lines = [
        f"### Compliance Regime: **{regime}**",
        "**Executive Summary:**",
        exec_summary,
        "**Findings:**",
        findings_md,
        f"**Risk Score:** {score}",
        "**Actionable Recommendations:**",
        fixes_md,
        "**Summary:**",
        summary,
        # Blank line so the "---" below renders as a horizontal rule; placed
        # directly under a paragraph it would turn that paragraph into a
        # setext heading.
        "",
        "---",
        legend_md,
    ]
    md = "\n".join(report_lines)
    return md.strip(), redacted_path, redacted_file_path, redacted_image
|
|
|
|
|
|
|
# Gradio UI: one tab with an upload widget, a regime selector, and four
# outputs fed by agentic_compliance.
# NOTE(review): nesting of the button inside the tab is inferred; confirm
# against the intended layout.
with gr.Blocks(title="Agentic Compliance MCP Server") as demo:
    gr.Markdown(
        "# Agentic Compliance MCP\nUpload a document to check it for PII then select a compliance regime."
    )
    with gr.Tab("Compliance Agent"):
        doc = gr.File(label="Upload Document", file_types=SUPPORTED_FILE_TYPES)
        regime = gr.Dropdown(
            choices=list(COMPLIANCE_ENTITIES),
            label="Compliance Regime",
        )
        out = gr.Markdown(label="Compliance Output")
        redacted_out = gr.File(label="Download Redacted Text")
        file_redacted_out = gr.File(label="Download Redacted PDF/Image")
        redacted_img = gr.Image(label="Redacted Image Preview")

        run_button = gr.Button("Run Compliance Agent")
        run_button.click(
            fn=agentic_compliance,
            inputs=[doc, regime],
            outputs=[out, redacted_out, file_redacted_out, redacted_img],
        )

# Start the app with the MCP server enabled.
demo.launch(mcp_server=True)
|
|