#!/usr/bin/env python3
"""Enhanced web document annotation tool with modern UI.

The module loads a filtered sample of web documents, shows them one at a time
in a Gradio app, and records a "selected" / "discarded" decision for each in a
local JSONL file. A second tab lets the annotator browse earlier decisions.

NOTE: importing this module has side effects -- it reads the annotation file
and loads/filters the dataset (see the module-level ``store`` / ``loader``).
"""

import hashlib
import html
import json
import os
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from random import sample, shuffle
from urllib.parse import urlparse

import gradio as gr
from datasets import Dataset, load_dataset
from loguru import logger

# FDC (Free Decimal Correspondence) constants
SCIENCE_CODES = ["50", "51", "54", "57", "58", "59", "61"]
FDC_KEEP = ["61"]  # Medicine

# Documents with fewer whitespace-separated words than this are dropped.
MIN_WORDS = 100

# Document-type labels that disqualify a document outright.
EXCLUDED_LABELS = frozenset({
    "Promotional/Advertisement",
    "Machine-Generated",
    "Images/Videos/Audio",
    "Truncated",
    "Spam/Ads",
    "Product Page",
    "Content Listing",
})


def prefix(dds_code: str) -> str:
    """Extract the first two digits from a DDS code."""
    if not dds_code:
        return ""
    return dds_code[:2]


def doc_hash(url: str, text: str) -> str:
    """Return the SHA-256 hex digest identifying a document by URL and text.

    The input layout (URL immediately followed by text) must not change:
    hashes already stored in the annotation file are compared against it.
    """
    return hashlib.sha256(f"{url}{text}".encode()).hexdigest()


def _dig(mapping, *keys, default=None):
    """Walk nested dicts; a missing, ``None`` or non-dict level yields *default*."""
    node = mapping
    for key in keys:
        if not isinstance(node, dict):
            return default
        node = node.get(key)
    return default if node is None else node


def _doc_url(doc: dict) -> str:
    """Return the document URL, preferring ``metadata.url`` over top-level ``url``."""
    meta = doc.get("metadata") or {}
    return meta.get("url", doc.get("url", ""))


def filterfunc(x: dict) -> bool:
    """Decide whether a dataset row is worth showing to an annotator.

    A row is kept when it has at least ``MIN_WORDS`` words, its primary FDC
    code starts with one of ``FDC_KEEP``, and none of its primary/secondary
    document-type labels (v1 or v2) is in ``EXCLUDED_LABELS``.
    """
    if len((x.get("text") or "").split()) < MIN_WORDS:
        return False

    code = _dig(
        x, "eai_taxonomy", "free_decimal_correspondence", "primary", "code",
        default="",
    )
    if prefix(str(code)) not in FDC_KEEP:
        return False

    for version in ("document_type_v1", "document_type_v2"):
        for level in ("primary", "secondary"):
            label = _dig(x, "eai_taxonomy", version, level, "label")
            if isinstance(label, str) and label in EXCLUDED_LABELS:
                return False
    return True


class DocLoader:
    """Loads the filtered dataset and hands out not-yet-annotated documents.

    Attributes:
        docs: shuffled list of documents still to annotate this session.
        index: position of the next document to hand out from ``docs``.
        processed: hashes of documents that already have an annotation.
        _dataset: every filtered document keyed by its dataset key, used by
            the viewer to look documents up again.
    """

    __slots__ = ("docs", "index", "processed", "_dataset")

    def __init__(self, processed: set[str]):
        self.processed = processed
        self.index = 0
        self.docs = []
        self._dataset = {}
        self._load()

    def _load(self):
        """Load, filter and index the dataset; queue the unprocessed documents."""
        ds = load_dataset("sumuks/essential-web-v1.0-sample-100M", split="train")
        logger.info(f"Loaded {len(ds)} documents")
        ds = ds.filter(filterfunc)
        logger.info(f"Filtered to {len(ds)} documents")

        # Build dataset lookup and collect unprocessed docs
        unprocessed = []
        for idx, doc in enumerate(ds):
            # Fall back to the row position when the row carries no "id".
            doc_key = doc.get("id", idx)
            doc_with_key = dict(doc)
            doc_with_key["_dataset_key"] = doc_key
            self._dataset[doc_key] = doc_with_key

            # Check if already processed
            h = doc_hash(_doc_url(doc), doc.get("text", ""))
            if h not in self.processed:
                unprocessed.append(doc_with_key)

        logger.info(f"Found {len(unprocessed)} unprocessed documents")

        # Randomize the order for this session
        shuffle(unprocessed)
        self.docs = unprocessed
        logger.info(f"Loaded {len(self.docs)} documents for this session")

    def next(self) -> dict | None:
        """Return the next queued document, or ``None`` when the queue is empty."""
        if self.index < len(self.docs):
            doc = self.docs[self.index]
            self.index += 1
            return doc
        return None

    def get_by_id(self, doc_id: str | int) -> dict | None:
        """Look a document up by dataset key.

        Keys round-trip through JSON, so an int key may come back as a digit
        string (or the reverse); both spellings are tried.
        """
        result = self._dataset.get(doc_id)
        if result is None and isinstance(doc_id, str) and doc_id.isdigit():
            result = self._dataset.get(int(doc_id))
        elif result is None and isinstance(doc_id, int):
            result = self._dataset.get(str(doc_id))
        return result

    @property
    def remaining(self) -> int:
        """Number of queued documents not yet handed out."""
        return max(0, len(self.docs) - self.index)


@dataclass(slots=True)
class AnnotationStore:
    """Append-only JSONL store of annotation decisions plus session statistics.

    Existing records are read from ``path`` on construction. Each new decision
    is appended to the file immediately and also buffered for an optional
    upload to the Hugging Face Hub once ``threshold`` records accumulate.
    """

    path: Path
    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    buffer: list[dict] = field(default_factory=list)
    threshold: int = 25
    processed: set[str] = field(default_factory=set)
    annotations: list[dict] = field(default_factory=list)
    session_stats: dict = field(default_factory=lambda: {
        "total": 0,
        "selected": 0,
        "discarded": 0,
        "start_time": datetime.now(timezone.utc),
        "decisions": [],
    })

    def __post_init__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if self.path.exists():
            for line in self.path.read_text(encoding="utf-8").splitlines():
                if rec := self._parse_line(line):
                    self.processed.add(rec["hash"])
                    self.annotations.append(rec)
        logger.info(f"Loaded {len(self.processed)} existing annotations")

    def _parse_line(self, line: str) -> dict | None:
        """Parse one JSONL line into a record.

        Returns ``None`` for blank lines, malformed JSON, and JSON values that
        are not an object carrying a ``"hash"`` key, so the caller can index
        ``rec["hash"]`` safely.
        """
        if not line.strip():
            return None
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            logger.warning("Skipping malformed annotation line")
            return None
        if not isinstance(rec, dict) or "hash" not in rec:
            return None
        return rec

    def add(self, doc_hash: str, decision: str, doc_id: str | int):
        """Record a decision for a document, unless it was already recorded.

        The record is appended to the JSONL file first, then mirrored into the
        in-memory state and session counters. Triggers ``flush()`` when the
        upload buffer reaches ``threshold``.
        """
        if doc_hash in self.processed:
            logger.warning(f"Attempted to add already processed document: {doc_hash}")
            return

        rec = {
            "hash": doc_hash,
            "decision": decision,
            "session": self.session_id,
            "id": doc_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        # Context manager so the handle is closed (and data flushed) right away.
        with self.path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(rec) + "\n")

        self.processed.add(doc_hash)
        self.buffer.append(rec)
        self.annotations.append(rec)

        self.session_stats["total"] += 1
        if decision == "selected":
            self.session_stats["selected"] += 1
        elif decision == "discarded":
            self.session_stats["discarded"] += 1
        self.session_stats["decisions"].append((datetime.now(timezone.utc), decision))

        if len(self.buffer) >= self.threshold:
            self.flush()

    def flush(self):
        """Upload buffered records to the Hub if ``HF_TOKEN`` is set.

        Without a token the buffer is simply dropped (the records are already
        in the local file). On a failed push the buffer is kept so the next
        flush retries it.
        """
        if not self.buffer or not (token := os.getenv("HF_TOKEN")):
            self.buffer.clear()
            return
        try:
            # NOTE(review): push_to_hub presumably replaces the split already on
            # the Hub, so each push may overwrite the previous batch rather than
            # append to it -- confirm against the datasets documentation.
            Dataset.from_list(self.buffer).push_to_hub(
                "yourbench/essential-web-annotations", token=token
            )
            logger.info(f"Pushed {len(self.buffer)} annotations")
            self.buffer.clear()
        except Exception as e:
            # Best-effort upload: a Hub failure must not interrupt annotating.
            logger.error(f"Push failed: {e}")

    def get_rate(self) -> float:
        """Return this session's annotations per hour (0.0 before the first one)."""
        if not self.session_stats["decisions"]:
            return 0.0
        elapsed = (
            datetime.now(timezone.utc) - self.session_stats["start_time"]
        ).total_seconds()
        return (self.session_stats["total"] / elapsed * 3600) if elapsed > 0 else 0.0

    def get_filtered(self, decision: str | None = None) -> list[dict]:
        """Return annotations matching *decision*.

        ``None`` or ``"all"`` returns the live internal list (not a copy).
        """
        if decision is None or decision == "all":
            return self.annotations
        return [a for a in self.annotations if a.get("decision") == decision]


SESSION_LIMIT = 50

store = AnnotationStore(Path("data/annotations.jsonl"))
loader = DocLoader(store.processed)
current = loader.next()

# Viewer state
viewer_state = {
    "annotations": [],
    "index": 0,
    "filter": "all",
}

APP_CSS = """
.stats-container { display: flex; gap: 15px; margin: 10px 0; flex-wrap: nowrap; justify-content: space-between; }
.stat-item { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 12px; padding: 15px; flex: 1; min-width: 100px; text-align: center; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); transition: transform 0.2s; }
.stat-item:hover { transform: translateY(-2px); }
.stat-value { font-size: 24px; font-weight: bold; color: white; margin-bottom: 3px; }
.stat-label { font-size: 12px; color: rgba(255, 255, 255, 0.9); }
.progress-container { background: #f8f9fa; border-radius: 12px; padding: 15px; margin: 10px 0; }
.progress-header { display: flex; justify-content: space-between; margin-bottom: 10px; font-weight: 600; }
.progress-bar-bg { background: #e9ecef; height: 20px; border-radius: 10px; overflow: hidden; margin-bottom: 10px; }
.progress-bar-fill { background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); height: 100%; transition: width 0.3s ease; }
.progress-percentage { text-align: center; color: #6c757d; font-size: 14px; }
.doc-info { background: #f8f9fa; border-radius: 12px; padding: 15px; margin-bottom: 10px; }
.doc-meta { display: flex; gap: 20px; margin-bottom: 10px; flex-wrap: wrap; }
.doc-meta span { font-size: 14px; color: #495057; }
.doc-url { font-size: 14px; color: #667eea; text-decoration: none; word-break: break-all; }
.doc-url:hover { text-decoration: underline; }
.done-message { font-size: 32px; text-align: center; padding: 40px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; border-radius: 12px; font-weight: bold; }
.annotation-info { display: flex; justify-content: space-between; margin-bottom: 10px; padding-left: 10px; }
.annotation-decision { font-weight: 600; }
.annotation-time { color: #6c757d; font-size: 12px; }
.viewer-empty, .viewer-error { text-align: center; padding: 40px; color: #6c757d; font-size: 18px; }
.viewer-nav { display: flex; justify-content: center; align-items: center; gap: 20px; margin: 10px 0; }
.viewer-counter { font-weight: 600; color: #495057; }
#select { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border: none; font-size: 18px; padding: 12px 24px; }
#discard { background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%); border: none; font-size: 18px; padding: 12px 24px; }
.dark .stat-item { background: linear-gradient(135deg, #434343 0%, #000000 100%); }
.dark .progress-container, .dark .doc-info { background: #1a1a1a; }
.dark .progress-bar-bg { background: #2a2a2a; }
@keyframes pulse {
    0% { transform: scale(1); }
    50% { transform: scale(1.05); }
    100% { transform: scale(1); }
}
"""

# Injected into <head>: pressing 1 / 2 clicks the Select / Discard buttons
# (matched by their elem_id) unless the user is typing in a form field.
SHORTCUT_JS = """
<script>
function annotationShortcuts(e) {
    var tag = (e.target && e.target.tagName || "").toLowerCase();
    if (tag === "input" || tag === "textarea") {
        return;
    }
    var targetId = null;
    if (e.key === "1") {
        targetId = "select";
    } else if (e.key === "2") {
        targetId = "discard";
    }
    if (targetId === null) {
        return;
    }
    var btn = document.getElementById(targetId);
    if (btn) {
        btn.click();
    }
}
document.addEventListener("keypress", annotationShortcuts, false);
</script>
"""

INSTRUCTIONS_MD = """
## 📋 Document Quality Assessment

Your task is to evaluate documents for **high-quality, valuable content** that provides generalizable information.

### ✅ **Select High-Quality Documents:**
Examples include:
- **Technical blogs** with detailed explanations
- **Scientific papers** and research articles
- **Information-rich discussions** with insights
- **Educational content** with actionable knowledge
- **Professional documentation** and guides

### ❌ **Discard Low-Quality Documents:**
- Content with minimal informational value

### 🎯 **Quick Assessment Tips:**
- High-quality documents are usually immediately recognizable to a human.
- Use the **Viewer** tab to browse examples of selected documents
- Trust your judgment on content value and depth

### ⌨️ **Keyboard Shortcuts:**
| Key | Action |
|-----|--------|
| **`1`** | ✅ Select document |
| **`2`** | ❌ Discard document |
"""


def _counter_html(text: str) -> str:
    """Wrap the viewer position text in its styled container."""
    return f'<div class="viewer-counter">{text}</div>'


def format_stats() -> str:
    """Render the session statistics row as HTML."""
    stats = store.session_stats
    rate = store.get_rate()
    cells = [
        (f"{stats['total']}", "Total Annotated"),
        (f"{stats['selected']}", "Selected"),
        (f"{stats['discarded']}", "Discarded"),
        (f"{rate:.0f}/hr", "Annotation Rate"),
        (f"{loader.remaining:,}", "Remaining Docs"),
    ]
    items = "\n".join(
        f'<div class="stat-item">'
        f'<div class="stat-value">{value}</div>'
        f'<div class="stat-label">{label}</div>'
        f'</div>'
        for value, label in cells
    )
    return f'\n<div class="stats-container">\n{items}\n</div>\n'


def format_progress() -> tuple[str, float]:
    """Render session progress.

    Returns:
        ``(html, fraction)`` where *fraction* is completed / ``SESSION_LIMIT``.
    """
    session_completed = store.session_stats["total"]
    session_total = SESSION_LIMIT
    progress = (session_completed / session_total) if session_total > 0 else 0
    percentage = progress * 100
    # The bar itself is clamped so it can never overflow its track.
    bar_width = min(max(percentage, 0.0), 100.0)
    return (
        f"""
<div class="progress-container">
    <div class="progress-header">
        <span>Session Progress</span>
        <span>{session_completed:,} / {session_total:,}</span>
    </div>
    <div class="progress-bar-bg">
        <div class="progress-bar-fill" style="width: {bar_width:.1f}%"></div>
    </div>
    <div class="progress-percentage">{percentage:.1f}% Complete</div>
</div>
""",
        progress,
    )


def _format_annotation_info(annotation: dict) -> str:
    """Render the decision badge and timestamp for an existing annotation."""
    decision = str(annotation.get("decision", ""))
    selected = decision == "selected"
    decision_color = "#667eea" if selected else "#f5576c"
    try:
        timestamp = datetime.fromisoformat(
            str(annotation.get("timestamp", "")).replace("Z", "+00:00")
        )
        when = timestamp.strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        # Older or hand-edited records may lack a parseable timestamp.
        when = "Unknown time"
    icon = "✅" if selected else "❌"
    return f"""
<div class="annotation-info" style="border-left: 4px solid {decision_color};">
    <span class="annotation-decision" style="color: {decision_color};">{icon} {html.escape(decision.title())}</span>
    <span class="annotation-time">📅 {when}</span>
</div>
"""


def format_document_info(doc: dict, annotation: dict | None = None) -> str:
    """Render a document's metadata header (domain, category, size, URL) as HTML.

    Args:
        doc: dataset row; an empty/falsy value yields an empty string.
        annotation: optional stored record; when given, its decision and
            timestamp are shown above the metadata.

    All dataset-derived values are HTML-escaped because they originate from
    arbitrary web pages.
    """
    if not doc:
        return ""

    url = str(_doc_url(doc) or "")
    try:
        parsed = urlparse(url)
        domain = parsed.netloc or "Unknown"
        scheme = parsed.scheme.lower()
    except ValueError:
        # urlparse rejects some malformed hosts (e.g. broken IPv6 literals).
        domain, scheme = "Unknown", ""

    cat = _dig(
        doc, "eai_taxonomy", "document_type_v2", "primary", "label",
        default="Uncategorized",
    )
    word_count = len((doc.get("text") or "").split())

    annotation_info = _format_annotation_info(annotation) if annotation else ""

    safe_url = html.escape(url, quote=True)
    if scheme in ("http", "https"):
        url_markup = (
            f'<a class="doc-url" href="{safe_url}" target="_blank" '
            f'rel="noopener noreferrer">{safe_url}</a>'
        )
    else:
        # Never emit a clickable link for other schemes (e.g. javascript:).
        url_markup = f'<span class="doc-url">{safe_url}</span>'

    return f"""
<div class="doc-info">
{annotation_info}
    <div class="doc-meta">
        <span>📌 {html.escape(domain)}</span>
        <span>🏷️ {html.escape(str(cat))}</span>
        <span>📝 {word_count:,} words</span>
    </div>
    {url_markup}
</div>
"""


def choose(decision: str):
    """Record *decision* for the current document and advance to the next one.

    Returns the 7-tuple of values for the annotate tab's outputs; switches to
    the "done" state when the session limit is hit or no documents remain.
    """
    global current
    if not current:
        return done_state()

    h = doc_hash(_doc_url(current), current.get("text", ""))
    doc_id = current.get("_dataset_key", current.get("id", ""))
    store.add(h, decision, doc_id)

    if store.session_stats["total"] >= SESSION_LIMIT:
        return done_state()

    current = loader.next()
    if not current:
        return done_state()

    progress_html, progress_num = format_progress()
    return (
        format_document_info(current),
        current.get("text", ""),
        gr.update(interactive=True),
        gr.update(interactive=True),
        format_stats(),
        progress_html,
        progress_num,
    )


def done_state():
    """Return the annotate-tab outputs for a finished session (buttons disabled)."""
    progress_html, _ = format_progress()

    if store.session_stats["total"] >= SESSION_LIMIT:
        message = "🎉 Session Complete!"
        description = f"Great job! You've completed your session of {SESSION_LIMIT} documents."
    else:
        message = "🎉 All documents annotated!"
        description = "Great job! You've completed all available documents."

    return (
        f'\n<div class="done-message">{message}</div>\n',
        description,
        gr.update(interactive=False),
        gr.update(interactive=False),
        format_stats(),
        progress_html,
        1.0,
    )


def update_viewer_filter(filter_value: str):
    """Apply a decision filter in the viewer and jump back to the first match."""
    viewer_state["filter"] = filter_value
    viewer_state["index"] = 0
    viewer_state["annotations"] = store.get_filtered(filter_value)
    logger.info(f"Filter: {filter_value}, Found {len(viewer_state['annotations'])} annotations")
    return update_viewer_display()


def navigate_viewer(direction: int):
    """Move the viewer by *direction* positions, wrapping around at either end."""
    if not viewer_state["annotations"]:
        return update_viewer_display()
    viewer_state["index"] = (
        viewer_state["index"] + direction
    ) % len(viewer_state["annotations"])
    return update_viewer_display()


def update_viewer_display():
    """Return the viewer tab outputs for the current viewer position.

    The 5-tuple is (info HTML, document text, counter HTML, previous-button
    update, next-button update).
    """
    annotations = viewer_state["annotations"]
    if not annotations:
        return (
            '\n<div class="viewer-empty">No annotations to display</div>\n',
            "",
            _counter_html("0 / 0"),
            gr.update(interactive=False),
            gr.update(interactive=False),
        )

    idx = viewer_state["index"]
    annotation = annotations[idx]
    # Records written by older versions may not carry an "id".
    ann_id = annotation.get("id")
    doc = loader.get_by_id(ann_id)

    counter = _counter_html(f"{idx + 1} / {len(annotations)}")
    prev_update = gr.update(interactive=idx > 0)
    next_update = gr.update(interactive=idx < len(annotations) - 1)

    if not doc:
        logger.warning(f"Document not found for ID: {ann_id} (type: {type(ann_id)})")
        return (
            '\n<div class="viewer-error">Document not found in dataset</div>\n',
            f"Annotation details: {json.dumps(annotation, indent=2)}",
            counter,
            prev_update,
            next_update,
        )

    return (
        format_document_info(doc, annotation),
        doc.get("text", ""),
        counter,
        prev_update,
        next_update,
    )


def build() -> gr.Blocks:
    """Assemble the Gradio app: an "Annotate" tab and a "Viewer" tab."""
    css = APP_CSS
    shortcut_js = SHORTCUT_JS

    with gr.Blocks(
        title="Essential Web Annotation",
        theme=gr.themes.Default(),
        css=css,
        head=shortcut_js,
    ) as demo:
        gr.Markdown("# 🚀 Essential Web Annotation Tool")

        with gr.Tabs():
            with gr.Tab("Annotate"):
                gr.Markdown(INSTRUCTIONS_MD)

                progress_html, progress_num = format_progress()
                progress_display = gr.HTML(progress_html)
                stats_display = gr.HTML(format_stats())

                if current:
                    doc_info_html = format_document_info(current)
                    text_val = current.get("text", "")
                else:
                    doc_info_html = '\n<div class="doc-info">No documents loaded.</div>\n'
                    text_val = ""

                doc_info = gr.HTML(doc_info_html)

                with gr.Column(variant="panel"):
                    text_display = gr.Textbox(
                        text_val,
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    btn_sel = gr.Button(
                        "✅ Select (1)",
                        elem_id="select",
                        variant="primary",
                        interactive=bool(current),
                        size="lg",
                    )
                    btn_dis = gr.Button(
                        "❌ Discard (2)",
                        elem_id="discard",
                        variant="stop",
                        interactive=bool(current),
                        size="lg",
                    )

                # Hidden numeric mirror of the progress fraction.
                progress_bar = gr.Number(value=progress_num, visible=False)

                outputs = [
                    doc_info,
                    text_display,
                    btn_sel,
                    btn_dis,
                    stats_display,
                    progress_display,
                    progress_bar,
                ]
                btn_sel.click(lambda: choose("selected"), outputs=outputs)
                btn_dis.click(lambda: choose("discarded"), outputs=outputs)

            with gr.Tab("Viewer"):
                gr.Markdown("### 📚 Browse Annotated Documents")

                with gr.Row():
                    filter_dropdown = gr.Radio(
                        choices=["all", "selected", "discarded"],
                        value="all",
                        label="Filter",
                        interactive=True,
                    )

                viewer_info = gr.HTML()

                with gr.Column(variant="panel"):
                    viewer_text = gr.Textbox(
                        label="📄 Document Content",
                        lines=20,
                        interactive=False,
                        show_copy_button=True,
                    )

                with gr.Row():
                    prev_btn = gr.Button("← Previous", size="lg")
                    viewer_counter = gr.HTML(_counter_html("0 / 0"))
                    next_btn = gr.Button("Next →", size="lg")

                viewer_outputs = [
                    viewer_info,
                    viewer_text,
                    viewer_counter,
                    prev_btn,
                    next_btn,
                ]
                filter_dropdown.change(
                    update_viewer_filter,
                    inputs=[filter_dropdown],
                    outputs=viewer_outputs,
                )
                prev_btn.click(lambda: navigate_viewer(-1), outputs=viewer_outputs)
                next_btn.click(lambda: navigate_viewer(1), outputs=viewer_outputs)

                # Populate the viewer as soon as the page loads.
                demo.load(
                    lambda: update_viewer_filter("all"),
                    outputs=viewer_outputs,
                )

        # Trailing HTML slot kept from the original layout (currently blank).
        gr.HTML(""" """)

    return demo


if __name__ == "__main__":
    build().launch()