import gradio as gr import requests import json from openfloor import ( Envelope, Conversation, Sender, DialogEvent, TextFeature, UtteranceEvent, GetManifestsEvent, To ) # Function to build the envelope for a manifest request using Open Floor library def build_manifest_request_envelope(agent_url): # Create conversation and sender conversation = Conversation() sender = Sender( speakerUri="openfloor://localhost/TestClient", serviceUrl="http://localhost" ) # Create envelope envelope = Envelope(conversation=conversation, sender=sender) # Add get manifests event manifest_event = GetManifestsEvent( to=To(serviceUrl=agent_url, private=False) ) envelope.events.append(manifest_event) return envelope # Function to build an utterance envelope using Open Floor library def build_utterance_envelope(agent_url, message_text): # Create conversation and sender conversation = Conversation() sender = Sender( speakerUri="openfloor://localhost/TestClient", serviceUrl="http://localhost" ) # Create envelope envelope = Envelope(conversation=conversation, sender=sender) # Create dialog event with text feature dialog_event = DialogEvent( speakerUri="openfloor://localhost/TestClient", features={"text": TextFeature(values=[message_text])} ) # Create utterance event utterance_event = UtteranceEvent( dialogEvent=dialog_event, to=To(serviceUrl=agent_url, private=False) ) envelope.events.append(utterance_event) return envelope # Function to send the request to the agent def send_request(agent_url, request_type, message_text="Hello, this is a test message!"): try: if request_type == "manifest": envelope = build_manifest_request_envelope(agent_url) else: # utterance envelope = build_utterance_envelope(agent_url, message_text) # Convert to JSON using the library's method json_str = envelope.to_json() envelope_dict = json.loads(json_str) json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2) # Send request headers = {'Content-Type': 'application/json'} payload = {"openFloor": envelope_dict} response = requests.post(agent_url, json=payload, headers=headers, timeout=10) # Handle response try: response_json = response.json() except: response_json = {"raw_response": response.text} # Return: envelope, status, response, status_message status_message = f"✅ Request sent successfully (HTTP {response.status_code})" return json_payload_pretty, response.status_code, response_json, status_message except Exception as e: error_response = {"error": f"Error: {str(e)}"} return "", 500, error_response, f"❌ Request failed: {str(e)}" # Function to test agent availability def test_agent_availability(agent_url): try: # Send a simple manifest request to test envelope = build_manifest_request_envelope(agent_url) json_str = envelope.to_json() envelope_dict = json.loads(json_str) payload = {"openFloor": envelope_dict} headers = {'Content-Type': 'application/json'} response = requests.post(agent_url, json=payload, headers=headers, timeout=5) if response.status_code == 200: try: response_json = response.json() return f"✅ Agent available at {agent_url}", response_json except: return f"✅ Agent responded but with non-JSON: {response.text[:100]}...", {"raw_response": response.text} else: return f"⚠️ Agent responded with status {response.status_code}", {"status": response.status_code, "response": response.text} except requests.exceptions.RequestException as e: return f"❌ Agent not available: {str(e)}", {"error": str(e)} # Enhanced test function that also shows the envelope def test_agent_with_envelope(agent_url): try: # Send a simple manifest request to test envelope = build_manifest_request_envelope(agent_url) json_str = envelope.to_json() envelope_dict = json.loads(json_str) json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2) payload = {"openFloor": envelope_dict} headers = {'Content-Type': 'application/json'} response = requests.post(agent_url, json=payload, headers=headers, timeout=5) if response.status_code == 200: try: response_json = response.json() status_msg = f"✅ Agent available at {agent_url}" return status_msg, response_json, json_payload_pretty, response.status_code except: status_msg = f"✅ Agent responded but with non-JSON" return status_msg, {"raw_response": response.text}, json_payload_pretty, response.status_code else: status_msg = f"⚠️ Agent responded with status {response.status_code}" return status_msg, {"status": response.status_code, "response": response.text}, json_payload_pretty, response.status_code except requests.exceptions.RequestException as e: status_msg = f"❌ Agent not available: {str(e)}" return status_msg, {"error": str(e)}, "", 0 # Gradio UI with improved layout with gr.Blocks(title="Open Floor Protocol Tester", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🗣️ Open Floor Protocol Tester **Test [Open Floor](https://github.com/open-voice-interoperability/openfloor-docs)-compliant assistants using the [official Python library](https://github.com/open-voice-interoperability/openfloor-python)** Send **manifest** requests to get capabilities or **utterance** events for conversations. """) with gr.Row(): # Left Column - Controls with gr.Column(scale=2): gr.Markdown("### 🎯 Assistant Configuration") agent_url = gr.Textbox( value="", label="Assistant Endpoint URL", placeholder="https://your-assistant.com/openfloor", info="Full URL endpoint of the Open Floor assistant" ) gr.Markdown("### 📨 Send Request") request_type = gr.Radio( choices=["manifest", "utterance"], value="manifest", label="Request Type", info="manifest = test availability + get capabilities, utterance = send message" ) message_text = gr.Textbox( value="convert 20 millimeters to gallons", label="Message Text", placeholder="Enter your message here...", lines=3, info="Text to send in utterance event" ) send_btn = gr.Button("🚀 Send Open Floor Request", variant="primary", size="lg") # Right Column - Results with gr.Column(scale=3): gr.Markdown("### 📡 Request & Response") # Status message for all requests request_status = gr.Textbox( label="Status", interactive=False, container=False ) # Collapsible request details with gr.Accordion("📤 Generated Open Floor Envelope", open=False): envelope_output = gr.Code( language="json", label="Request JSON", container=False ) # Response section with gr.Group(): gr.Markdown("#### 📥 Response") with gr.Row(): status_code = gr.Number( label="HTTP Status", interactive=False, container=False, scale=1 ) response_output = gr.JSON( label="Response Data", container=False ) # Event handlers def handle_send_request(*args): envelope, status, response, status_msg = send_request(*args) return envelope, status, response, status_msg send_btn.click( fn=handle_send_request, inputs=[agent_url, request_type, message_text], outputs=[envelope_output, status_code, response_output, request_status] ) if __name__ == "__main__": demo.launch()