"""Asynchronous voice-conversation demo: speech-to-text -> LLM -> text-to-speech.

The module wires a Whisper checkpoint (STT), a Qwen3 checkpoint (LLM) and the
Kokoro pipeline (TTS) behind ``AsyncAIConversation`` and exposes the whole
pipeline through a small Gradio interface.
"""
import asyncio
import logging
import os
import tempfile
import threading
import time
from pathlib import Path
from typing import Optional, Dict, Any

import torch
import librosa
import numpy as np
import soundfile as sf
from transformers import (
    AutoProcessor,
    AutoModelForSpeechSeq2Seq,
    AutoModelForCausalLM,
    AutoTokenizer,
    pipeline,
    SpeechT5Processor,
    SpeechT5ForTextToSpeech,
    SpeechT5HifiGan,
)
from datasets import load_dataset
from kokoro import KPipeline
from IPython.display import display, Audio
import gradio as gr

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Sample rate the audio is resampled to before it is handed to the STT processor.
STT_SAMPLE_RATE = 16000
# Sample rate used when writing Kokoro output to disk.
TTS_SAMPLE_RATE = 24000
# Token id that closes the model's "thinking" section (</think>) in the LLM output.
THINK_END_TOKEN_ID = 151668

# The text is assembled from adjacent literals only to keep source lines short;
# the resulting runtime string is unchanged.
system_prompt_0 = (
    "You are a highly trained U.S. Tax Assistant AI, designed to help "
    "individuals and small businesses understand, plan, and file their taxes "
    "according to federal and state tax laws. You explain complex tax concepts "
    "in simple, accurate, and actionable terms, using IRS guidelines, "
    "up-to-date tax code knowledge, and best practices for compliance and "
    "savings. You act as an explainer, educator, and assistant—not a certified "
    "tax preparer or legal advisor. \n"
    "Be short in your answer, less than 100 chars"
)


class AsyncAIConversation:
    """Speech-to-text -> LLM -> text-to-speech pipeline with an async API.

    Models are loaded by ``initialize_models``; until then the model attributes
    are ``None``. Blocking model calls are pushed to the default executor so the
    event loop stays responsive.
    """

    def __init__(self):
        self.stt_processor = None
        self.stt_model = None
        self.llm_tokenizer = None
        self.llm_model = None
        self.tts_synthesizer = None
        self.speaker_embedding = None
        # Worker threads take this lock around STT/LLM generation so that only
        # one generation runs at a time, as was the case when generation ran
        # directly on the event loop.
        self._inference_lock = threading.Lock()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")

    async def initialize_models(self):
        """Load the STT, LLM and TTS models, in that order.

        Raises:
            Exception: whatever the underlying loader raised; it is logged first.
        """
        logger.info("Initializing models...")

        await self._init_stt_model()
        await self._init_llm_model()
        await self._init_tts_model()

        logger.info("All models initialized successfully!")

    async def _init_stt_model(self):
        """Load the speech-to-text processor and model and move the model to ``self.device``."""
        logger.info("Loading STT model...")
        try:
            stt_model_id = "unsloth/whisper-small"  # alternative: unsloth/whisper-large-v3-turbo
            self.stt_processor = AutoProcessor.from_pretrained(stt_model_id)
            self.stt_model = AutoModelForSpeechSeq2Seq.from_pretrained(stt_model_id)
            self.stt_model.to(self.device)
            logger.info("STT model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading STT model: {e}")
            raise

    async def _init_llm_model(self):
        """Load the LLM tokenizer and model (dtype and device placement chosen automatically)."""
        logger.info("Loading LLM model...")
        try:
            model_name = "unsloth/Qwen3-0.6B"  # alternative: unsloth/Qwen3-0.6B-unsloth-bnb-4bit
            self.llm_tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.llm_model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype="auto",
                device_map="auto",
            )
            logger.info("LLM model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading LLM model: {e}")
            raise

    async def _init_tts_model(self):
        """Create the Kokoro text-to-speech pipeline."""
        logger.info("Loading TTS model...")
        try:
            self.tts_synthesizer = KPipeline(lang_code='a')
            logger.info("TTS model loaded successfully")
        except Exception as e:
            logger.error(f"Error loading TTS model: {e}")
            raise

    async def speech_to_text(self, audio_file_path: str) -> str:
        """Transcribe an audio file.

        Args:
            audio_file_path: Path of the audio file to transcribe.

        Returns:
            The transcription, or ``""`` when nothing was produced or an error
            occurred (the error is logged, not raised).
        """
        logger.info(f"Processing audio file: {audio_file_path}")

        def transcribe() -> str:
            # Runs in a worker thread: file decoding and generation are both blocking.
            speech_array, sampling_rate = librosa.load(audio_file_path, sr=STT_SAMPLE_RATE)
            input_features = self.stt_processor(
                speech_array,
                sampling_rate=sampling_rate,
                return_tensors="pt",
            ).input_features.to(self.device)

            # Grad mode is per-thread, so no_grad must be entered inside the worker.
            with self._inference_lock, torch.no_grad():
                predicted_ids = self.stt_model.generate(input_features)

            transcription = self.stt_processor.batch_decode(
                predicted_ids, skip_special_tokens=True
            )
            return transcription[0] if transcription else ""

        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, transcribe)
            logger.info(f"STT result: {result}")
            return result
        except Exception as e:
            logger.exception(f"Error in speech_to_text: {e}")
            return ""

    async def process_with_llm(self, text: str, system_prompt: Optional[str] = None) -> Dict[str, str]:
        """Generate an LLM reply for ``text``.

        Args:
            text: The user message.
            system_prompt: Optional system message placed before the user message;
                ignored when empty or ``None``.

        Returns:
            A dict with keys ``"thinking"`` (text up to and including the last
            end-of-thinking token, possibly empty) and ``"response"`` (the rest).
            On error a fixed apology is returned as the response.
        """
        logger.info(f"Processing text with LLM: {text[:50]}...")

        messages = [{"role": "user", "content": text}]
        if system_prompt:
            messages.insert(0, {"role": "system", "content": system_prompt})

        def generate() -> Dict[str, str]:
            # Runs in a worker thread so generation does not block the event loop.
            formatted_text = self.llm_tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
                enable_thinking=False,
            )
            model_inputs = self.llm_tokenizer(
                [formatted_text], return_tensors="pt"
            ).to(self.llm_model.device)

            # Grad mode is per-thread, so no_grad must be entered inside the worker.
            with self._inference_lock, torch.no_grad():
                generated_ids = self.llm_model.generate(
                    **model_inputs,
                    max_new_tokens=512,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.llm_tokenizer.eos_token_id,
                )

            # Keep only the newly generated tokens (drop the prompt).
            prompt_length = len(model_inputs.input_ids[0])
            output_ids = generated_ids[0][prompt_length:].tolist()

            # Split at the LAST end-of-thinking token; no token means no thinking part.
            try:
                index = len(output_ids) - output_ids[::-1].index(THINK_END_TOKEN_ID)
            except ValueError:
                index = 0

            thinking_content = self.llm_tokenizer.decode(
                output_ids[:index], skip_special_tokens=True
            ).strip("\n")
            content = self.llm_tokenizer.decode(
                output_ids[index:], skip_special_tokens=True
            ).strip("\n")
            return {"thinking": thinking_content, "response": content}

        try:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, generate)
            logger.info(f"LLM response generated: {result['response'][:50]}...")
            return result
        except Exception as e:
            logger.exception(f"Error in process_with_llm: {e}")
            return {"thinking": "", "response": "Sorry, I encountered an error processing your request."}

    async def text_to_speech(self, text: str, output_path: str = "response.wav") -> str:
        """Synthesize ``text`` and write it to ``output_path`` as a WAV file.

        All audio chunks yielded by the TTS pipeline are concatenated, so long
        replies are not cut off after the first chunk.

        Args:
            text: Text to speak.
            output_path: Destination file path.

        Returns:
            ``output_path`` on success, ``""`` on failure (the error is logged).
        """
        logger.info(f"Converting text to speech: {text[:50]}...")

        def generate_speech():
            # Runs in a worker thread; collects every chunk the pipeline yields.
            chunks = []
            for _graphemes, _phonemes, audio in self.tts_synthesizer(text, voice='af_heart'):
                if audio is None:
                    continue
                if isinstance(audio, torch.Tensor):
                    audio = audio.detach().cpu().numpy()
                chunks.append(np.asarray(audio))
            if not chunks:
                return None
            return np.concatenate(chunks)

        try:
            loop = asyncio.get_running_loop()
            audio_data = await loop.run_in_executor(None, generate_speech)

            if audio_data is None:
                raise ValueError("Failed to generate audio")

            sf.write(output_path, audio_data, samplerate=TTS_SAMPLE_RATE)
            logger.info(f"Audio saved to: {output_path}")
            return output_path
        except Exception as e:
            logger.exception(f"Error in text_to_speech: {e}")
            return ""

    async def process_conversation(
        self,
        audio_file_path: str,
        system_prompt: Optional[str] = None,
        output_path: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run the full pipeline (STT -> LLM -> TTS) for one audio file.

        Args:
            audio_file_path: Path of the input audio.
            system_prompt: Optional system prompt forwarded to the LLM.
            output_path: Where to write the spoken reply. When ``None`` a unique
                temporary ``.wav`` file is created, so conversations running
                concurrently never overwrite each other's audio.

        Returns:
            On success a dict with ``input_audio``, ``transcribed_text``,
            ``thinking``, ``response_text``, ``output_audio`` and
            ``processing_times`` (seconds for ``stt``, ``llm``, ``tts``,
            ``total``). ``output_audio`` is ``""`` if synthesis failed.
            On failure a dict with a single ``error`` key.
        """
        start_time = time.time()
        logger.info("Starting conversation processing...")

        try:
            # Step 1: Speech to Text
            stt_start = time.time()
            transcribed_text = await self.speech_to_text(audio_file_path)
            stt_time = time.time() - stt_start

            if not transcribed_text:
                return {"error": "Failed to transcribe audio"}

            # Step 2: Process with LLM
            llm_start = time.time()
            llm_result = await self.process_with_llm(transcribed_text, system_prompt)
            llm_time = time.time() - llm_start

            # Step 3: Text to Speech
            tts_start = time.time()
            owns_output = output_path is None
            if owns_output:
                handle, output_path = tempfile.mkstemp(prefix="response_", suffix=".wav")
                os.close(handle)
            audio_output_path = await self.text_to_speech(llm_result["response"], output_path)
            if owns_output and not audio_output_path:
                # Synthesis failed: do not leave an empty placeholder file behind.
                try:
                    os.remove(output_path)
                except OSError:
                    pass
            tts_time = time.time() - tts_start

            total_time = time.time() - start_time

            result = {
                "input_audio": audio_file_path,
                "transcribed_text": transcribed_text,
                "thinking": llm_result["thinking"],
                "response_text": llm_result["response"],
                "output_audio": audio_output_path,
                "processing_times": {
                    "stt": stt_time,
                    "llm": llm_time,
                    "tts": tts_time,
                    "total": total_time,
                },
            }

            logger.info(f"Conversation processed successfully in {total_time:.2f} seconds")
            return result
        except Exception as e:
            logger.exception(f"Error in process_conversation: {e}")
            return {"error": str(e)}

    async def batch_process(self, audio_files: list, system_prompt: Optional[str] = None) -> list:
        """Run ``process_conversation`` for every file concurrently.

        Args:
            audio_files: Paths of the audio files to process.
            system_prompt: Optional system prompt shared by all conversations.

        Returns:
            One entry per input file, in input order. An entry is either the
            result dict or the exception raised for that file.
        """
        logger.info(f"Processing {len(audio_files)} audio files...")

        tasks = [
            self.process_conversation(audio_file, system_prompt)
            for audio_file in audio_files
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        logger.info("Batch processing completed")
        return results


# Initialize the conversation system
ai_conversation = AsyncAIConversation()


# Usage example and demo functions
async def demo_conversation():
    """Load all models on the module-level ``ai_conversation`` instance."""
    await ai_conversation.initialize_models()


# Async handler wired to the Gradio button
async def process_audio_gradio(audio_file, system_prompt_input):
    """Gradio handler: run one conversation and map the result to the UI outputs.

    Args:
        audio_file: File path supplied by the ``gr.Audio`` input, or ``None``.
        system_prompt_input: Text of the system-prompt textbox.

    Returns:
        A 4-tuple ``(status_text, response_text, response_audio_path, processing_times)``.
        The audio path is ``None`` when no audio was produced.
    """
    if audio_file is None:
        return "Please upload an audio file.", "", "", None

    try:
        result = await ai_conversation.process_conversation(
            audio_file_path=audio_file,
            system_prompt=system_prompt_input,
        )
    except Exception as e:
        return f"An unexpected error occurred: {e}", "", "", None

    if "error" in result:
        return f"Error: {result['error']}", "", "", None

    return (
        f"Transcribed: {result['transcribed_text']}\nThinking: {result['thinking']}",
        result['response_text'],
        # An empty string means synthesis failed; the audio component expects None then.
        result['output_audio'] or None,
        result['processing_times'],
    )


# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Asynchronous AI Conversation System")
    gr.Markdown("Upload an audio file and provide a system prompt to get a response.")

    with gr.Column():
        audio_input = gr.Audio(label="Upload Audio File", type="filepath")
        process_button = gr.Button("Process Conversation")
        system_prompt_input = gr.Textbox(label="System Prompt", value=system_prompt_0)

    with gr.Column():
        status_output = gr.Textbox(label="Status/Transcription/Thinking", interactive=False)
        response_text_output = gr.Textbox(label="AI Response Text", interactive=False)
        response_audio_output = gr.Audio(label="AI Response Audio", interactive=False)
        processing_times_output = gr.JSON(label="Processing Times")

    # Link button click to the async handler
    process_button.click(
        fn=process_audio_gradio,
        inputs=[audio_input, system_prompt_input],
        outputs=[
            status_output,
            response_text_output,
            response_audio_output,
            processing_times_output,
        ],
    )


if __name__ == "__main__":
    # Load the models first; Gradio's launch() then runs its own event loop.
    asyncio.run(demo_conversation())
    demo.launch(debug=False, share=True)