#!/usr/bin/env python3
"""Enhanced web document annotation tool with modern UI."""
import hashlib
import json
import os
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from random import sample, shuffle
import gradio as gr
from datasets import Dataset, load_dataset
from loguru import logger
# FDC (Free Decimal Correspondence) constants
SCIENCE_CODES = ["50", "51", "54", "57", "58", "59", "61"]
FDC_KEEP = ["61"] # Medicine
def prefix(dds_code: str) -> str:
"""Extract the first two digits from a DDS code."""
if not dds_code:
return ""
return dds_code[:2]
def doc_hash(url: str, text: str) -> str:
return hashlib.sha256(f"{url}{text}".encode()).hexdigest()
def filterfunc(x: dict) -> bool:
if len(x.get("text", "").split()) < 100:
return False
if x.get("eai_taxonomy", {}).get("free_decimal_correspondence", {}).get("primary", {}).get("code", "")[:2] != "61":
return False
excluded = {"Promotional/Advertisement", "Machine-Generated", "Images/Videos/Audio",
"Truncated", "Spam/Ads", "Product Page", "Content Listing"}
for version in ["document_type_v1", "document_type_v2"]:
for level in ["primary", "secondary"]:
if label := x.get("eai_taxonomy", {}).get(version, {}).get(level, {}).get("label"):
if label in excluded:
return False
return True
class DocLoader:
__slots__ = ("docs", "index", "processed", "_dataset")
def __init__(self, processed: set[str]):
self.processed = processed
self.index = 0
self.docs = []
self._dataset = {}
self._load()
def _load(self):
ds = load_dataset("sumuks/essential-web-v1.0-sample-100M", split="train")
logger.info(f"Loaded {len(ds)} documents")
ds = ds.filter(filterfunc)
logger.info(f"Filtered to {len(ds)} documents")
# Build dataset lookup and collect unprocessed docs
unprocessed = []
for idx, doc in enumerate(ds):
doc_key = doc.get("id", idx)
doc_with_key = dict(doc)
doc_with_key["_dataset_key"] = doc_key
self._dataset[doc_key] = doc_with_key
# Check if already processed
url = doc.get("metadata", {}).get("url", doc.get("url", ""))
h = doc_hash(url, doc.get("text", ""))
if h not in self.processed:
unprocessed.append(doc_with_key)
logger.info(f"Found {len(unprocessed)} unprocessed documents")
# Randomize the order for this session
shuffle(unprocessed)
self.docs = unprocessed
logger.info(f"Loaded {len(self.docs)} documents for this session")
def next(self) -> dict | None:
if self.index < len(self.docs):
doc = self.docs[self.index]
self.index += 1
return doc
return None
def get_by_id(self, doc_id: str | int) -> dict | None:
result = self._dataset.get(doc_id)
if result is None and isinstance(doc_id, str) and doc_id.isdigit():
result = self._dataset.get(int(doc_id))
elif result is None and isinstance(doc_id, int):
result = self._dataset.get(str(doc_id))
return result
@property
def remaining(self) -> int:
return max(0, len(self.docs) - self.index)
@dataclass(slots=True)
class AnnotationStore:
path: Path
session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
buffer: list[dict] = field(default_factory=list)
threshold: int = 25
processed: set[str] = field(default_factory=set)
annotations: list[dict] = field(default_factory=list)
session_stats: dict = field(default_factory=lambda: {
"total": 0,
"selected": 0,
"discarded": 0,
"start_time": datetime.now(timezone.utc),
"decisions": []
})
def __post_init__(self):
self.path.parent.mkdir(parents=True, exist_ok=True)
if self.path.exists():
for line in self.path.read_text().splitlines():
if rec := self._parse_line(line):
self.processed.add(rec["hash"])
self.annotations.append(rec)
logger.info(f"Loaded {len(self.processed)} existing annotations")
def _parse_line(self, line: str) -> dict | None:
try:
return json.loads(line)
except:
return None
def add(self, doc_hash: str, decision: str, doc_id: str | int):
if doc_hash in self.processed:
logger.warning(f"Attempted to add already processed document: {doc_hash}")
return
rec = {
"hash": doc_hash,
"decision": decision,
"session": self.session_id,
"id": doc_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self.path.open("a").write(json.dumps(rec) + "\n")
self.processed.add(doc_hash)
self.buffer.append(rec)
self.annotations.append(rec)
self.session_stats["total"] += 1
if decision == "selected":
self.session_stats["selected"] += 1
elif decision == "discarded":
self.session_stats["discarded"] += 1
self.session_stats["decisions"].append((datetime.now(timezone.utc), decision))
if len(self.buffer) >= self.threshold:
self.flush()
def flush(self):
if not self.buffer or not (token := os.getenv("HF_TOKEN")):
self.buffer.clear()
return
try:
Dataset.from_list(self.buffer).push_to_hub(
"yourbench/essential-web-annotations",
token=token
)
logger.info(f"Pushed {len(self.buffer)} annotations")
self.buffer.clear()
except Exception as e:
logger.error(f"Push failed: {e}")
def get_rate(self) -> float:
if not self.session_stats["decisions"]:
return 0.0
elapsed = (datetime.now(timezone.utc) - self.session_stats["start_time"]).total_seconds()
return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0
def get_filtered(self, decision: str | None = None) -> list[dict]:
if decision is None or decision == "all":
return self.annotations
return [a for a in self.annotations if a.get("decision") == decision]
SESSION_LIMIT = 50
store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()
# Viewer state
viewer_state = {
"annotations": [],
"index": 0,
"filter": "all"
}
def format_stats() -> str:
stats = store.session_stats
rate = store.get_rate()
return f"""
{stats['total']}
Total Annotated
{stats['selected']}
Selected
{stats['discarded']}
Discarded
{rate:.0f}/hr
Annotation Rate
{loader.remaining:,}
Remaining Docs
"""
def format_progress() -> tuple[str, float]:
session_completed = store.session_stats["total"]
session_total = SESSION_LIMIT
progress = (session_completed / session_total) if session_total > 0 else 0
percentage = progress * 100
return (
f"""
{percentage:.1f}% Complete
""",
progress
)
def format_document_info(doc: dict, annotation: dict | None = None) -> str:
if not doc:
return ""
meta = doc.get("metadata", {})
url = meta.get("url", doc.get("url", ""))
domain = url.split('/')[2] if url and '/' in url else "Unknown"
cat = doc.get("eai_taxonomy", {}).get("document_type_v2", {}).get("primary", {}).get("label", "Uncategorized")
word_count = len(doc.get("text", "").split())
annotation_info = ""
if annotation:
timestamp = datetime.fromisoformat(annotation["timestamp"].replace("Z", "+00:00"))
decision_color = "#667eea" if annotation["decision"] == "selected" else "#f5576c"
annotation_info = f"""
{"✅" if annotation["decision"] == "selected" else "❌"} {annotation["decision"].title()}
📅 {timestamp.strftime("%Y-%m-%d %H:%M:%S")}
"""
return f"""
{annotation_info}
📌 {domain}
🏷️ {cat}
📝 {word_count:,} words
{url}
"""
def choose(decision: str):
global current
if not current:
return done_state()
url = current.get("metadata", {}).get("url", current.get("url", ""))
h = doc_hash(url, current.get("text", ""))
doc_id = current.get("_dataset_key", current.get("id", ""))
store.add(h, decision, doc_id)
if store.session_stats["total"] >= SESSION_LIMIT:
return done_state()
current = loader.next()
if not current:
return done_state()
progress_html, progress_num = format_progress()
return (
format_document_info(current),
current.get("text", ""),
gr.update(interactive=True),
gr.update(interactive=True),
format_stats(),
progress_html,
progress_num
)
def done_state():
progress_html, progress_num = format_progress()
if store.session_stats["total"] >= SESSION_LIMIT:
message = "🎉 Session Complete!"
description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
else:
message = "🎉 All documents annotated!"
description = "Great job! You've completed all available documents."
return (
f"{message}
",
description,
gr.update(interactive=False),
gr.update(interactive=False),
format_stats(),
progress_html,
1.0
)
def update_viewer_filter(filter_value: str):
viewer_state["filter"] = filter_value
viewer_state["index"] = 0
viewer_state["annotations"] = store.get_filtered(filter_value)
logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
return update_viewer_display()
def navigate_viewer(direction: int):
if not viewer_state["annotations"]:
return update_viewer_display()
viewer_state["index"] = (viewer_state["index"] + direction) % len(viewer_state["annotations"])
return update_viewer_display()
def update_viewer_display():
if not viewer_state["annotations"]:
return (
"No annotations to display
",
"",
f"0 / 0",
gr.update(interactive=False),
gr.update(interactive=False)
)
idx = viewer_state["index"]
annotation = viewer_state["annotations"][idx]
doc = loader.get_by_id(annotation["id"])
if not doc:
logger.warning(f"Document not found for ID: {annotation['id']} (type: {type(annotation['id'])})")
return (
"Document not found in dataset
",
f"Annotation details: {json.dumps(annotation, indent=2)}",
f"{idx + 1} / {len(viewer_state['annotations'])}",
gr.update(interactive=idx > 0),
gr.update(interactive=idx < len(viewer_state["annotations"]) - 1)
)
return (
format_document_info(doc, annotation),
doc.get("text", ""),
f"{idx + 1} / {len(viewer_state['annotations'])}",
gr.update(interactive=idx > 0),
gr.update(interactive=idx < len(viewer_state["annotations"]) - 1)
)
def build() -> gr.Blocks:
css = """
.stats-container {
display: flex;
gap: 15px;
margin: 10px 0;
flex-wrap: nowrap;
justify-content: space-between;
}
.stat-item {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 12px;
padding: 15px;
flex: 1;
min-width: 100px;
text-align: center;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.2s;
}
.stat-item:hover {
transform: translateY(-2px);
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: white;
margin-bottom: 3px;
}
.stat-label {
font-size: 12px;
color: rgba(255, 255, 255, 0.9);
}
.progress-container {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin: 10px 0;
}
.progress-header {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
font-weight: 600;
}
.progress-bar-bg {
background: #e9ecef;
height: 20px;
border-radius: 10px;
overflow: hidden;
margin-bottom: 10px;
}
.progress-bar-fill {
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
height: 100%;
transition: width 0.3s ease;
}
.progress-percentage {
text-align: center;
color: #6c757d;
font-size: 14px;
}
.doc-info {
background: #f8f9fa;
border-radius: 12px;
padding: 15px;
margin-bottom: 10px;
}
.doc-meta {
display: flex;
gap: 20px;
margin-bottom: 10px;
flex-wrap: wrap;
}
.doc-meta span {
font-size: 14px;
color: #495057;
}
.doc-url {
font-size: 14px;
color: #667eea;
text-decoration: none;
word-break: break-all;
}
.doc-url:hover {
text-decoration: underline;
}
.done-message {
font-size: 32px;
text-align: center;
padding: 40px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 12px;
font-weight: bold;
}
.annotation-info {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
padding-left: 10px;
}
.annotation-decision {
font-weight: 600;
}
.annotation-time {
color: #6c757d;
font-size: 12px;
}
.viewer-empty, .viewer-error {
text-align: center;
padding: 40px;
color: #6c757d;
font-size: 18px;
}
.viewer-nav {
display: flex;
justify-content: center;
align-items: center;
gap: 20px;
margin: 10px 0;
}
.viewer-counter {
font-weight: 600;
color: #495057;
}
#select {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
#discard {
background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
border: none;
font-size: 18px;
padding: 12px 24px;
}
.dark .stat-item {
background: linear-gradient(135deg, #434343 0%, #000000 100%);
}
.dark .progress-container, .dark .doc-info {
background: #1a1a1a;
}
.dark .progress-bar-bg {
background: #2a2a2a;
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.05); }
100% { transform: scale(1); }
}
"""
shortcut_js = """
"""
with gr.Blocks(
title="Essential Web Annotation",
theme=gr.themes.Default(),
css=css,
head=shortcut_js
) as demo:
gr.Markdown("# 🚀 Essential Web Annotation Tool")
with gr.Tabs():
with gr.Tab("Annotate"):
gr.Markdown("""
## 📋 Document Quality Assessment
Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.
### ✅ **Select High-Quality Documents:**
Examples include:
- **Technical blogs** with detailed explanations
- **Scientific papers** and research articles
- **Information-rich discussions** with insights
- **Educational content** with actionable knowledge
- **Professional documentation** and guides
### ❌ **Discard Low-Quality Documents:**
- Content with minimal informational value
### 🎯 **Quick Assessment Tips:**
- High-quality documents are usually immediately recognizable to a human.
- Use the **Viewer** tab to browse examples of selected documents
- Trust your judgment on content value and depth
### ⌨️ **Keyboard Shortcuts:**
| Key | Action |
|-----|--------|
| **`1`** | ✅ Select document |
| **`2`** | ❌ Discard document |
""")
progress_html, progress_num = format_progress()
progress_display = gr.HTML(progress_html)
stats_display = gr.HTML(format_stats())
if current:
doc_info_html = format_document_info(current)
text_val = current.get("text", "")
else:
doc_info_html = "No documents loaded.
"
text_val = ""
doc_info = gr.HTML(doc_info_html)
with gr.Column(variant="panel"):
text_display = gr.Textbox(
text_val,
label="📄 Document Content",
lines=20,
interactive=False,
show_copy_button=True
)
with gr.Row():
btn_sel = gr.Button(
"✅ Select (1)",
elem_id="select",
variant="primary",
interactive=bool(current),
size="lg"
)
btn_dis = gr.Button(
"❌ Discard (2)",
elem_id="discard",
variant="stop",
interactive=bool(current),
size="lg"
)
progress_bar = gr.Number(value=progress_num, visible=False)
outputs = [doc_info, text_display, btn_sel, btn_dis, stats_display, progress_display, progress_bar]
btn_sel.click(lambda: choose("selected"), outputs=outputs)
btn_dis.click(lambda: choose("discarded"), outputs=outputs)
with gr.Tab("Viewer"):
gr.Markdown("### 📚 Browse Annotated Documents")
with gr.Row():
filter_dropdown = gr.Radio(
choices=["all", "selected", "discarded"],
value="all",
label="Filter",
interactive=True
)
viewer_info = gr.HTML()
with gr.Column(variant="panel"):
viewer_text = gr.Textbox(
label="📄 Document Content",
lines=20,
interactive=False,
show_copy_button=True
)
with gr.Row():
prev_btn = gr.Button("← Previous", size="lg")
viewer_counter = gr.HTML("0 / 0
")
next_btn = gr.Button("Next →", size="lg")
filter_dropdown.change(
update_viewer_filter,
inputs=[filter_dropdown],
outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
)
prev_btn.click(
lambda: navigate_viewer(-1),
outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
)
next_btn.click(
lambda: navigate_viewer(1),
outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
)
demo.load(
lambda: update_viewer_filter("all"),
outputs=[viewer_info, viewer_text, viewer_counter, prev_btn, next_btn]
)
gr.HTML("""
""")
return demo
if __name__ == "__main__":
build().launch()