"""Gradio chatbot for exploring a financial-statement CSV.

The uploaded CSV must contain the columns ``ddate``, ``value``, ``tlabel``
and ``tag``.  Questions are answered, in order of preference, by:

1. a set of hand-written handlers (``handle_custom_question``),
2. a generic multi-metric chart when the question asks for a plot/graph/chart,
3. the TAPAS table-question-answering pipeline.

Questions that were answered by a handler or by TAPAS are remembered together
with a sentence embedding so that a later "similar analysis" request can replay
the stored Apple prompts against a different company's file.
"""

import os
import re
import time
from io import BytesIO

import gradio as gr
import matplotlib

# Figures are only ever rendered into in-memory PNG buffers, so force the
# non-interactive backend before pyplot is imported.
matplotlib.use("Agg")

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from sentence_transformers import SentenceTransformer
from sklearn.linear_model import LinearRegression
from transformers import pipeline

# === Vector Memory Enhancements ===
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
prompt_history = []  # Stores {"company": str, "question": str, "embedding": np.array}
last_uploaded_company = None
uploaded_filename = None

# Known CIK -> company names, keyed by the CIK without leading zeros.
_CIK_TO_COMPANY = {
    "320193": "Apple",
    "1018724": "Amazon",
    "1652044": "Google",
}

# Sentinel response returned by handle_custom_question when no handler matched.
NO_CUSTOM_HANDLER_MSG = "No custom handler available for this question."

# Pause (seconds) applied before returning an answer, as in the original flow.
RESPONSE_DELAY_SECONDS = 2


def get_company_name(df, file=None):
    """Best-effort company name for an uploaded dataset.

    Tries, in order: the first non-null value of a ``name`` column, a lookup
    of the first non-null ``cik`` value in a small built-in table (falling
    back to ``"CIK_<value>"``), and finally the uploaded file's base name.

    Args:
        df: DataFrame holding the uploaded data.
        file: Optional upload handle (object with ``.name``) or path string.

    Returns:
        The resolved name, or ``"Unknown"`` when nothing is available.
    """
    columns = getattr(df, "columns", ())

    if "name" in columns:
        possible_names = df["name"].dropna().unique()
        if len(possible_names) > 0:
            return possible_names[0]

    if "cik" in columns:
        ciks = df["cik"].dropna().unique()
        if len(ciks) > 0:
            raw = ciks[0]
            cik = str(raw)
            # Normalize "0000320193", 320193 and 320193.0 to the same key.
            try:
                key = str(int(float(raw)))
            except (TypeError, ValueError):
                key = cik.strip().lstrip("0")
            return _CIK_TO_COMPANY.get(key, f"CIK_{cik}")

    if file:
        path = getattr(file, "name", file)
        return os.path.splitext(os.path.basename(str(path)))[0]
    return "Unknown"


# Load TAPAS pipeline
table_qa = pipeline("table-question-answering", model="google/tapas-large-finetuned-wtq")
global_df = None
keywords = ["Revenue", "Sales", "NetIncome", "NetIncomeLoss", "Cash", "Assets", "Liabilities"]


def _fig_to_image(fig):
    """Render *fig* to PNG, close it, and return the result as a PIL image."""
    buf = BytesIO()
    try:
        fig.savefig(buf, format="png")
    finally:
        # Always release the figure so repeated requests do not accumulate them.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)


def generate_chart(df):
    """Plot the summed value of each core metric over time.

    Rows are kept when ``tag`` or ``tlabel`` matches any entry of ``keywords``;
    each row is then bucketed into a metric name extracted from ``tlabel``
    (or ``tag`` when the label is missing) and summed per ``ddate``.

    Args:
        df: DataFrame with ``ddate``, ``tag``, ``tlabel`` and ``value`` columns.
            The frame is not modified.

    Returns:
        A PIL image of the chart, or ``None`` when there is nothing to plot or
        plotting failed (the error is printed).
    """
    try:
        data = df.copy()  # do not mutate the caller's (global) frame
        data["ddate"] = pd.to_datetime(data["ddate"], format="%Y%m%d", errors="coerce")

        pattern = "|".join(keywords)
        mask = (
            data["tag"].astype(str).str.contains(pattern, case=False, na=False)
            | data["tlabel"].astype(str).str.contains(pattern, case=False, na=False)
        )
        core = data[mask].copy()
        core = core[core["value"].notnull()]
        core["metric"] = (
            core["tlabel"]
            .fillna(core["tag"])
            .astype(str)
            .str.extract(
                r"(Revenue|Sales|Net Income|Cash.*?|Assets|Liabilities)",
                expand=False,
                flags=re.IGNORECASE,
            )
            .str.strip()
            .fillna("Other")
        )
        summary = core.pivot_table(
            index="ddate", columns="metric", values="value", aggfunc="sum"
        ).sort_index()
        if summary.empty:
            return None

        # Pass the axes explicitly: DataFrame.plot() without ``ax`` would open
        # its own figure and ignore the size requested here.
        fig, ax = plt.subplots(figsize=(12, 6))
        summary.plot(marker="o", ax=ax)
        ax.set_title("Financial Metrics Over Time")
        ax.set_ylabel("USD ($)")
        ax.grid(True)
        fig.tight_layout()
        return _fig_to_image(fig)
    except Exception as e:
        print("Chart generation error:", e)
        return None


def plot_assets_by_quarter_last_two_years():
    """Bar-chart the summed asset values per quarter for the last two years.

    Reads ``global_df``; the two-year window ends at the latest ``ddate``
    present in the data.  Rows count as assets when ``tlabel`` contains
    "asset" (case-insensitive).

    Returns:
        A PIL image, or ``None`` when no data is loaded, no rows match, or
        plotting failed (the error is printed).
    """
    if global_df is None:
        return None
    try:
        df = global_df.copy()
        df["ddate"] = pd.to_datetime(df["ddate"], format="%Y%m%d", errors="coerce")
        df = df[df["ddate"].notnull()]
        if df.empty:
            return None

        cutoff_date = df["ddate"].max() - pd.DateOffset(years=2)
        recent_df = df[df["ddate"] >= cutoff_date]

        asset_mask = recent_df["tlabel"].astype(str).str.contains("asset", case=False, na=False)
        asset_data = recent_df[asset_mask].copy()
        asset_data = asset_data[asset_data["value"].notnull()]
        if asset_data.empty:
            return None

        asset_data["quarter"] = asset_data["ddate"].dt.to_period("Q")
        summary = asset_data.groupby("quarter")["value"].sum().sort_index()
        # "2024Q1" -> "2024 Q1" for nicer tick labels.
        summary.index = summary.index.astype(str).str.replace("Q", " Q", regex=False)

        fig, ax = plt.subplots(figsize=(10, 5))
        summary.plot(kind="bar", ax=ax)
        ax.set_title("Total Assets by Quarter (Last 2 Years)")
        ax.set_ylabel("USD ($)")
        ax.set_xlabel("Quarter")
        ax.grid(True)
        fig.tight_layout()
        return _fig_to_image(fig)
    except Exception as e:
        print("Custom assets plot error:", e)
        return None


def upload_csv(file, history):
    """Load an uploaded CSV into ``global_df`` and report the result in chat.

    The module-level state (``global_df``, ``uploaded_filename``,
    ``last_uploaded_company``) is only updated once the file has been read
    and validated, so a bad upload leaves the previous dataset in place.

    Args:
        file: Upload handle (object with ``.name``) or a path string.
        history: Chat history as a list of ``(user, bot)`` tuples.

    Returns:
        The chat history with a status message appended.
    """
    global global_df, uploaded_filename, last_uploaded_company

    if history is None:
        history = []
    if file is None:
        history.append((None, "Failed to load CSV: no file was selected."))
        return history

    try:
        path = getattr(file, "name", file)
        base_name = os.path.splitext(os.path.basename(str(path)))[0]
        df = pd.read_csv(path)

        # Validate necessary columns
        required_columns = {"ddate", "value", "tlabel", "tag"}
        missing = required_columns - set(df.columns)
        if missing:
            history.append((None, f"Missing required columns: {', '.join(sorted(missing))}"))
            return history

        raw_values = df["value"]
        if not pd.api.types.is_numeric_dtype(raw_values):
            # Strip currency symbols / thousands separators from text values.
            # Already-numeric columns are left alone so exponent notation in
            # their string form cannot be mangled by the character filter.
            raw_values = raw_values.astype(str).str.replace(r"[^\d.\-]", "", regex=True)
        # Unparseable leftovers (e.g. "" or "-") become NaN instead of raising.
        df["value"] = pd.to_numeric(raw_values, errors="coerce")
        df = df.dropna(subset=["value"]).copy()
        df["value"] = df["value"].astype(float)
        df["ddate"] = pd.to_datetime(df["ddate"], format="%Y%m%d", errors="coerce")

        # Case-insensitive match so "Apple_10K.csv" is recognized as well.
        lowered = base_name.lower()
        if "apple" in lowered:
            company = "apple"
        elif "amazon" in lowered:
            company = "amazon"
        else:
            company = base_name

        uploaded_filename = base_name
        last_uploaded_company = company
        global_df = df

        history.append(
            (None, f"File uploaded successfully for {uploaded_filename}. Ask me anything about it!")
        )
        return history
    except Exception as e:
        history.append((None, f"Failed to load CSV: {str(e)}"))
        return history


def handle_custom_question(question):
    """Answer *question* with one of the hand-written analyses, if any applies.

    Handlers are selected by substring checks on the lower-cased question and
    operate on a copy of ``global_df`` with added ``quarter`` and ``year``
    columns; rows whose ``ddate`` cannot be parsed are dropped.

    Args:
        question: The user's question text.

    Returns:
        A ``(response, image)`` tuple.  ``image`` is a PIL image or ``None``.
        When no handler matches, ``response`` is ``NO_CUSTOM_HANDLER_MSG``.
    """
    if global_df is None:
        return "No data loaded yet.", None

    df = global_df.copy()
    df["ddate"] = pd.to_datetime(df["ddate"], errors="coerce")
    df = df.dropna(subset=["ddate"])
    df["quarter"] = df["ddate"].dt.to_period("Q")
    df["year"] = df["ddate"].dt.year
    q = question.lower()

    labels = df["tlabel"].astype(str)
    tags = df["tag"].astype(str)

    def label_has(pattern):
        """Boolean mask of rows whose tlabel matches *pattern* (case-insensitive)."""
        return labels.str.contains(pattern, case=False, na=False)

    def tag_has(pattern):
        """Boolean mask of rows whose tag matches *pattern* (case-insensitive)."""
        return tags.str.contains(pattern, case=False, na=False)

    if "average revenue" in q and "2025" in q:
        avg = df[(df["year"] == 2025) & label_has("revenue")]["value"].mean()
        if pd.isna(avg):
            return "No revenue data found for 2025 in the dataset.", None
        return f"The average revenue for 2025 so far is ${avg:,.2f}.", None

    if "net income change" in q and "2020" in q and "2025" in q:
        net_income_by_year = df[label_has("net income")].groupby("year")["value"].sum()
        change = net_income_by_year.get(2025, 0) - net_income_by_year.get(2020, 0)
        return f"Net income changed by ${change:,.2f} from 2020 to 2025.", None

    if "least amount of liabilities" in q:
        liabilities_by_year = df[label_has("liabilit")].groupby("year")["value"].sum()
        if liabilities_by_year.empty:
            return "No liability data found in the dataset.", None
        lowest_year = liabilities_by_year.idxmin()
        lowest_value = liabilities_by_year.min()
        return (
            f"The year with the least amount of liabilities was {lowest_year} "
            f"with ${lowest_value:,.2f}.",
            None,
        )

    if "asset" in q and ("graph" in q or "plot" in q or "quarter" in q):
        image = plot_assets_by_quarter_last_two_years()
        if image is None:
            return "No asset data available to plot for the last 2 years.", None
        return "Here is the total assets by quarter for the last 2 years.", image

    # === Free Cash Flow trend (last 8 quarters) ===
    if "free cash flow" in q or "fcf" in q:
        reported = df[label_has("free cash flow")]
        if reported.empty:
            # No reported FCF line: derive it as operating cash flow minus capex.
            op_group = df[label_has("net cash.*operating")].groupby("quarter")["value"].sum()
            capex_group = (
                df[label_has("acquire.*property|capital expenditure")]
                .groupby("quarter")["value"]
                .sum()
            )
            fcf = (op_group - capex_group).dropna()
        else:
            fcf = reported.groupby("quarter")["value"].sum()
        fcf = fcf.sort_index().iloc[-8:]  # last 8 quarters
        if fcf.empty:
            return "No free cash flow data found in the dataset.", None
        fig, ax = plt.subplots()
        fcf.plot(kind="line", marker="o", ax=ax)
        ax.set_title("Free Cash Flow Trend (Last 8 Quarters)")
        ax.set_ylabel("USD ($)")
        ax.grid(True)
        return "Free cash flow trend over the past 8 quarters", _fig_to_image(fig)

    if "r&d" in q or "research and development" in q:
        # Look for both label AND tag variants (Amazon often uses Technology and Content)
        rnd_mask = label_has(
            "research and development|r&d|technology and content"
        ) | tag_has("Research.*Development|TechnologyAndContent")
        r_and_d = df[rnd_mask]
        if r_and_d.empty:
            return (
                "No R&D-like lines found in this dataset (looked for Research and "
                "Development / R&D / Technology and Content).",
                None,
            )
        summary = r_and_d.groupby("quarter")["value"].sum().sort_index()
        if summary.empty or summary.dropna().empty:
            return "R&D series is empty after grouping—nothing to plot for this file.", None
        fig, ax = plt.subplots()
        summary.plot(kind="bar", ax=ax)
        ax.set_title("R&D (or Technology & Content) Spending per Quarter")
        ax.set_ylabel("USD ($)")
        return "R&D (or Technology & Content) spending trend", _fig_to_image(fig)

    if "share buyback" in q or "repurchase" in q:
        summary = df[label_has("repurchase of common stock")].groupby("year")["value"].sum()
        if summary.empty:
            return "No share repurchase data found in the dataset.", None
        response = "\n".join(f"{yr}: ${val:,.2f}" for yr, val in summary.items())
        # Name the company that is actually loaded rather than always "Apple".
        company_label = (last_uploaded_company or "This company").title()
        return f"{company_label}'s share buyback per year:\n{response}", None

    if "gross margin" in q:
        rev_group = df[label_has("revenue")].groupby("quarter")["value"].sum()
        cogs_group = df[label_has("cost of goods")].groupby("quarter")["value"].sum()
        gross_margin = (rev_group - cogs_group) / rev_group * 100
        # Quarters with zero revenue divide to +/-inf; treat them as missing.
        gross_margin = gross_margin.replace([np.inf, -np.inf], np.nan).dropna()
        if gross_margin.empty:
            return "Not enough revenue / cost of goods data to compute gross margin.", None
        fig, ax = plt.subplots()
        gross_margin.plot(kind="bar", ax=ax)
        ax.set_title("Gross Margin % by Quarter")
        ax.set_ylabel("Percentage (%)")
        return "Gross margin percentage by quarter", _fig_to_image(fig)

    if "forecast" in q and "revenue" in q:
        rev_group = df[label_has("revenue")].groupby("quarter")["value"].sum().sort_index()
        if rev_group.shape[0] < 4:
            return "Not enough data to forecast revenue.", None
        # Fit a straight line through quarter position vs. summed revenue and
        # extend it two quarters past the last observed one.
        X = np.arange(len(rev_group)).reshape(-1, 1)
        y = rev_group.values
        model = LinearRegression().fit(X, y)
        X_future = np.arange(len(rev_group), len(rev_group) + 2).reshape(-1, 1)
        y_future = model.predict(X_future)
        forecast_index = pd.period_range(start=rev_group.index[-1] + 1, periods=2, freq="Q")
        forecast_series = pd.Series(y_future, index=forecast_index)
        full_series = pd.concat([rev_group, forecast_series])
        fig, ax = plt.subplots()
        full_series.plot(marker="o", ax=ax)
        # Dashed line marks the last observed quarter.
        ax.axvline(x=rev_group.index[-1], color="gray", linestyle="--")
        ax.set_title("Actual & Forecasted Revenue (Next 2 Quarters)")
        ax.set_ylabel("USD ($)")
        return "Here is the forecasted revenue for the next 2 quarters.", _fig_to_image(fig)

    if "financial health" in q:
        if df.empty:
            return "No dated rows available to assess financial health.", None
        latest_year = df["year"].max()

        # Filter relevant rows
        revenue = df[label_has("revenue")]
        net_income = df[tags.isin(["NetIncomeLoss", "NetIncome"])]
        assets = df[tag_has("Assets")]
        equity = df[tag_has("Equity") & ~tag_has("Liabilities")]

        # Get latest values
        rev_latest = revenue[revenue["year"] == latest_year]["value"].sum()
        net_income_latest = net_income[net_income["year"] == latest_year]["value"].sum()
        assets_latest = assets[assets["year"] == latest_year]["value"].sum()
        equity_latest = equity[equity["year"] == latest_year]["value"].sum()

        # Calculate metrics (0 when the denominator is not positive)
        profit_margin = (net_income_latest / rev_latest * 100) if rev_latest > 0 else 0
        debt_to_equity = (
            ((assets_latest - equity_latest) / equity_latest) if equity_latest > 0 else 0
        )
        roa = (net_income_latest / assets_latest * 100) if assets_latest > 0 else 0

        # Classification
        if profit_margin > 15 and debt_to_equity < 1 and roa > 5:
            health = "Healthy"
        elif profit_margin > 5 and debt_to_equity < 2:
            health = "Watchlist"
        else:
            health = "Risky"

        company_label = (last_uploaded_company or "This company").title()
        return (
            f"For {latest_year}, {company_label} is classified as **{health}** based on:\n"
            f"- Profit Margin: {profit_margin:.2f}%\n"
            f"- Debt-to-Equity: {debt_to_equity:.2f}\n"
            f"- Return on Assets (ROA): {roa:.2f}%",
            None,
        )

    return NO_CUSTOM_HANDLER_MSG, None


def _remember_prompt(question):
    """Store *question* with its embedding under the currently loaded company."""
    if embedding_model is None:
        return
    prompt_history.append({
        "company": last_uploaded_company or "Unknown",
        "question": question,
        "embedding": embedding_model.encode(question),
    })
    # Log only the text; printing the embedding vectors floods the console.
    print(
        "Current prompt history:",
        [(item["company"], item["question"]) for item in prompt_history],
    )


def _dataset_supports(df, prompt):
    """Return False when *prompt* needs R&D rows that *df* does not contain."""
    p = prompt.lower()
    if "research and development" in p or "r&d" in p:
        in_labels = df["tlabel"].astype(str).str.contains(
            "research and development|r&d|technology and content", case=False, na=False
        ).any()
        in_tags = df["tag"].astype(str).str.contains(
            "Research.*Development|TechnologyAndContent", case=False, na=False
        ).any()
        return bool(in_labels or in_tags)
    return True


def ask_question(history, question):
    """Answer a chat question and optionally produce a chart.

    Order of evaluation:

    1. "similar analysis" (when the loaded company is not Apple): replay up to
       three stored Apple prompts, most similar first, against the current file.
    2. Custom handlers from ``handle_custom_question``.
    3. Generic chart when the question mentions plot/graph/chart.
    4. TAPAS table QA over at most 50 rows.

    Args:
        history: Chat history as a list of ``(user, bot)`` tuples.
        question: The user's question text.

    Returns:
        ``(history, image_update)`` where ``image_update`` is a PIL image, or a
        ``gr.update()`` that leaves or clears the chart pane.
    """
    history = list(history or [])
    question = (question or "").strip()
    if not question:
        return history, gr.update()

    if global_df is None:
        return history + [(question, "Please upload a CSV file first.")], gr.update()

    lower_q = question.lower()
    company_title = (last_uploaded_company or "this company").title()

    # === Check for reuse pattern (this must come BEFORE appending current question to history) ===
    if "similar analysis" in lower_q and (last_uploaded_company or "").lower() != "apple":
        similar_prompts = [item for item in prompt_history if item["company"].lower() == "apple"]
        if not similar_prompts:
            return (
                history + [(question, "No Apple prompts stored to reference.")],
                gr.update(value=None),
            )

        query_embedding = embedding_model.encode(question)
        scores = [np.dot(query_embedding, item["embedding"]) for item in similar_prompts]
        top_idxs = np.argsort(scores)[::-1][:3]

        def rewrite_company(text: str) -> str:
            return text.replace("Apple's", f"{company_title}'s").replace("Apple", company_title)

        image_to_show = None
        for i in top_idxs:
            past_q = similar_prompts[i]["question"]
            if not _dataset_supports(global_df, past_q):
                history.append((
                    f"Skipped: {past_q}",
                    f"This file doesn’t have the needed rows to run that analysis on {company_title}.",
                ))
                continue
            try:
                rewritten = rewrite_company(past_q)
                resp, img = handle_custom_question(rewritten)
                history.append((f"Reused: {rewritten}", resp))
                if img is not None:
                    image_to_show = img  # keep the latest non-None image
            except Exception as e:
                history.append((
                    f"Skipped (error): {past_q}",
                    f"Couldn’t run this on {company_title}: {e}",
                ))

        # Return the captured image if any; otherwise clear the image pane
        if image_to_show is not None:
            return history, image_to_show
        return history, gr.update(value=None)

    # === Handle current question ===
    try:
        custom_response, custom_image = handle_custom_question(question)
    except Exception as e:
        time.sleep(RESPONSE_DELAY_SECONDS)
        return history + [(question, f"Error answering: {str(e)}")], gr.update()

    # The handler signals "nothing matched" with a sentinel message rather than
    # None; without this check the fallbacks below could never run.
    if custom_response is not None and custom_response != NO_CUSTOM_HANDLER_MSG:
        _remember_prompt(question)
        time.sleep(RESPONSE_DELAY_SECONDS)
        image_update = custom_image if custom_image is not None else gr.update()
        return history + [(question, custom_response)], image_update

    # === Fallback to generic plot if question mentions plot/graph/chart ===
    if "plot" in lower_q or "graph" in lower_q or "chart" in lower_q:
        image = generate_chart(global_df)
        time.sleep(RESPONSE_DELAY_SECONDS)
        if image is None:
            return (
                history + [(question, "Sorry, I couldn't generate a chart from this data.")],
                gr.update(),
            )
        return history + [(question, "Here is the generated chart based on your data.")], image

    # === Try table-qa as final fallback ===
    try:
        matched_keywords = [k for k in keywords if k.lower() in lower_q]
        df_filtered = global_df.head(0)
        if matched_keywords:
            # Narrow the table to the metrics the question actually mentions.
            pattern = "|".join(re.escape(k) for k in matched_keywords)
            mask = (
                global_df["tag"].astype(str).str.contains(pattern, case=False, na=False)
                | global_df["tlabel"].astype(str).str.contains(pattern, case=False, na=False)
            )
            df_filtered = global_df[mask].head(50)
        if df_filtered.empty:
            df_filtered = global_df.head(50)
        if df_filtered.empty:
            time.sleep(RESPONSE_DELAY_SECONDS)
            return history + [(question, "The uploaded file has no rows to query.")], gr.update()

        # TAPAS expects every cell to be a string, including the numeric
        # ``value`` and datetime ``ddate`` columns.
        string_table = df_filtered.astype(str).reset_index(drop=True)
        result = table_qa(table=string_table, query=question)
        answer = result["answer"]

        _remember_prompt(question)
        time.sleep(RESPONSE_DELAY_SECONDS)
        return history + [(question, answer)], gr.update()
    except Exception as e:
        time.sleep(RESPONSE_DELAY_SECONDS)
        return history + [(question, f"Error answering: {str(e)}")], gr.update()


with gr.Blocks() as demo:
    gr.Markdown("Financial Document Chatbot with TAPAS + Plotting")
    with gr.Row():
        file_input = gr.File(label="Upload CSV")
        upload_btn = gr.Button("Upload")
    chatbot = gr.Chatbot(
        label="Chatbot Conversation",
        value=[(None, "Hi! How can I help you today?")],
    )
    question_input = gr.Textbox(label="Ask a question")
    ask_btn = gr.Button("Ask")
    chart_output = gr.Image(label="Generated Chart", type="pil")

    upload_btn.click(upload_csv, inputs=[file_input, chatbot], outputs=chatbot)
    ask_btn.click(
        ask_question,
        inputs=[chatbot, question_input],
        outputs=[chatbot, chart_output],
    )

demo.launch(server_name="0.0.0.0", server_port=7860)