ofp_checker / app.py
azettl's picture
Update app.py
77d5aa2 verified
import gradio as gr
import requests
import json
from openfloor import (
Envelope, Conversation, Sender, DialogEvent, TextFeature,
UtteranceEvent, GetManifestsEvent, To
)
# Function to build the envelope for a manifest request using Open Floor library
def build_manifest_request_envelope(agent_url):
# Create conversation and sender
conversation = Conversation()
sender = Sender(
speakerUri="openfloor://localhost/TestClient",
serviceUrl="http://localhost"
)
# Create envelope
envelope = Envelope(conversation=conversation, sender=sender)
# Add get manifests event
manifest_event = GetManifestsEvent(
to=To(serviceUrl=agent_url, private=False)
)
envelope.events.append(manifest_event)
return envelope
# Function to build an utterance envelope using Open Floor library
def build_utterance_envelope(agent_url, message_text):
# Create conversation and sender
conversation = Conversation()
sender = Sender(
speakerUri="openfloor://localhost/TestClient",
serviceUrl="http://localhost"
)
# Create envelope
envelope = Envelope(conversation=conversation, sender=sender)
# Create dialog event with text feature
dialog_event = DialogEvent(
speakerUri="openfloor://localhost/TestClient",
features={"text": TextFeature(values=[message_text])}
)
# Create utterance event
utterance_event = UtteranceEvent(
dialogEvent=dialog_event,
to=To(serviceUrl=agent_url, private=False)
)
envelope.events.append(utterance_event)
return envelope
# Function to send the request to the agent
def send_request(agent_url, request_type, message_text="Hello, this is a test message!"):
try:
if request_type == "manifest":
envelope = build_manifest_request_envelope(agent_url)
else: # utterance
envelope = build_utterance_envelope(agent_url, message_text)
# Convert to JSON using the library's method
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2)
# Send request
headers = {'Content-Type': 'application/json'}
payload = {"openFloor": envelope_dict}
response = requests.post(agent_url, json=payload, headers=headers, timeout=10)
# Handle response
try:
response_json = response.json()
except:
response_json = {"raw_response": response.text}
# Return: envelope, status, response, status_message
status_message = f"βœ… Request sent successfully (HTTP {response.status_code})"
return json_payload_pretty, response.status_code, response_json, status_message
except Exception as e:
error_response = {"error": f"Error: {str(e)}"}
return "", 500, error_response, f"❌ Request failed: {str(e)}"
# Function to test agent availability
def test_agent_availability(agent_url):
try:
# Send a simple manifest request to test
envelope = build_manifest_request_envelope(agent_url)
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
payload = {"openFloor": envelope_dict}
headers = {'Content-Type': 'application/json'}
response = requests.post(agent_url, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
try:
response_json = response.json()
return f"βœ… Agent available at {agent_url}", response_json
except:
return f"βœ… Agent responded but with non-JSON: {response.text[:100]}...", {"raw_response": response.text}
else:
return f"⚠️ Agent responded with status {response.status_code}", {"status": response.status_code, "response": response.text}
except requests.exceptions.RequestException as e:
return f"❌ Agent not available: {str(e)}", {"error": str(e)}
# Enhanced test function that also shows the envelope
def test_agent_with_envelope(agent_url):
try:
# Send a simple manifest request to test
envelope = build_manifest_request_envelope(agent_url)
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2)
payload = {"openFloor": envelope_dict}
headers = {'Content-Type': 'application/json'}
response = requests.post(agent_url, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
try:
response_json = response.json()
status_msg = f"βœ… Agent available at {agent_url}"
return status_msg, response_json, json_payload_pretty, response.status_code
except:
status_msg = f"βœ… Agent responded but with non-JSON"
return status_msg, {"raw_response": response.text}, json_payload_pretty, response.status_code
else:
status_msg = f"⚠️ Agent responded with status {response.status_code}"
return status_msg, {"status": response.status_code, "response": response.text}, json_payload_pretty, response.status_code
except requests.exceptions.RequestException as e:
status_msg = f"❌ Agent not available: {str(e)}"
return status_msg, {"error": str(e)}, "", 0
# Gradio UI with improved layout
with gr.Blocks(title="Open Floor Protocol Tester", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ—£οΈ Open Floor Protocol Tester
**Test [Open Floor](https://github.com/open-voice-interoperability/openfloor-docs)-compliant assistants using the [official Python library](https://github.com/open-voice-interoperability/openfloor-python)**
Send **manifest** requests to get capabilities or **utterance** events for conversations.
""")
with gr.Row():
# Left Column - Controls
with gr.Column(scale=2):
gr.Markdown("### 🎯 Assistant Configuration")
agent_url = gr.Textbox(
value="",
label="Assistant Endpoint URL",
placeholder="https://your-assistant.com/openfloor",
info="Full URL endpoint of the Open Floor assistant"
)
gr.Markdown("### πŸ“¨ Send Request")
request_type = gr.Radio(
choices=["manifest", "utterance"],
value="manifest",
label="Request Type",
info="manifest = test availability + get capabilities, utterance = send message"
)
message_text = gr.Textbox(
value="convert 20 millimeters to gallons",
label="Message Text",
placeholder="Enter your message here...",
lines=3,
info="Text to send in utterance event"
)
send_btn = gr.Button("πŸš€ Send Open Floor Request", variant="primary", size="lg")
# Right Column - Results
with gr.Column(scale=3):
gr.Markdown("### πŸ“‘ Request & Response")
# Status message for all requests
request_status = gr.Textbox(
label="Status",
interactive=False,
container=False
)
# Collapsible request details
with gr.Accordion("πŸ“€ Generated Open Floor Envelope", open=False):
envelope_output = gr.Code(
language="json",
label="Request JSON",
container=False
)
# Response section
with gr.Group():
gr.Markdown("#### πŸ“₯ Response")
with gr.Row():
status_code = gr.Number(
label="HTTP Status",
interactive=False,
container=False,
scale=1
)
response_output = gr.JSON(
label="Response Data",
container=False
)
# Event handlers
def handle_send_request(*args):
envelope, status, response, status_msg = send_request(*args)
return envelope, status, response, status_msg
send_btn.click(
fn=handle_send_request,
inputs=[agent_url, request_type, message_text],
outputs=[envelope_output, status_code, response_output, request_status]
)
if __name__ == "__main__":
demo.launch()