Spaces:
Running
Running
File size: 8,902 Bytes
f5eebc5 b1f4556 ddfb4cd f5eebc5 77d5aa2 eec81f3 ddfb4cd 6b0ac35 ddfb4cd 6b0ac35 00a5456 77d5aa2 6b0ac35 ddfb4cd 6b0ac35 ddfb4cd 6b0ac35 00a5456 6b0ac35 f5eebc5 6b0ac35 ddfb4cd f0688d2 ddfb4cd 6b0ac35 ddfb4cd 6b0ac35 c7173b0 6b0ac35 f5eebc5 c7173b0 f5eebc5 6b0ac35 6374703 ddfb4cd 6b0ac35 f5eebc5 f485cdf c7173b0 77d5aa2 6b0ac35 77d5aa2 6b0ac35 c7173b0 6b0ac35 c7173b0 6b0ac35 cdab075 c7173b0 77d5aa2 6b0ac35 c7173b0 5a569a5 c7173b0 5a569a5 c7173b0 6b0ac35 77d5aa2 6b0ac35 c7173b0 f485cdf c7173b0 f485cdf c7173b0 f485cdf c7173b0 77d5aa2 c7173b0 f485cdf c7173b0 6b0ac35 c7173b0 f485cdf c7173b0 6b0ac35 c7173b0 6b0ac35 f485cdf f5eebc5 6b0ac35 a2831af |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
import gradio as gr
import requests
import json
from openfloor import (
Envelope, Conversation, Sender, DialogEvent, TextFeature,
UtteranceEvent, GetManifestsEvent, To
)
# Function to build the envelope for a manifest request using Open Floor library
def build_manifest_request_envelope(agent_url):
# Create conversation and sender
conversation = Conversation()
sender = Sender(
speakerUri="openfloor://localhost/TestClient",
serviceUrl="http://localhost"
)
# Create envelope
envelope = Envelope(conversation=conversation, sender=sender)
# Add get manifests event
manifest_event = GetManifestsEvent(
to=To(serviceUrl=agent_url, private=False)
)
envelope.events.append(manifest_event)
return envelope
# Function to build an utterance envelope using Open Floor library
def build_utterance_envelope(agent_url, message_text):
# Create conversation and sender
conversation = Conversation()
sender = Sender(
speakerUri="openfloor://localhost/TestClient",
serviceUrl="http://localhost"
)
# Create envelope
envelope = Envelope(conversation=conversation, sender=sender)
# Create dialog event with text feature
dialog_event = DialogEvent(
speakerUri="openfloor://localhost/TestClient",
features={"text": TextFeature(values=[message_text])}
)
# Create utterance event
utterance_event = UtteranceEvent(
dialogEvent=dialog_event,
to=To(serviceUrl=agent_url, private=False)
)
envelope.events.append(utterance_event)
return envelope
# Function to send the request to the agent
def send_request(agent_url, request_type, message_text="Hello, this is a test message!"):
try:
if request_type == "manifest":
envelope = build_manifest_request_envelope(agent_url)
else: # utterance
envelope = build_utterance_envelope(agent_url, message_text)
# Convert to JSON using the library's method
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2)
# Send request
headers = {'Content-Type': 'application/json'}
payload = {"openFloor": envelope_dict}
response = requests.post(agent_url, json=payload, headers=headers, timeout=10)
# Handle response
try:
response_json = response.json()
except:
response_json = {"raw_response": response.text}
# Return: envelope, status, response, status_message
status_message = f"β
Request sent successfully (HTTP {response.status_code})"
return json_payload_pretty, response.status_code, response_json, status_message
except Exception as e:
error_response = {"error": f"Error: {str(e)}"}
return "", 500, error_response, f"β Request failed: {str(e)}"
# Function to test agent availability
def test_agent_availability(agent_url):
try:
# Send a simple manifest request to test
envelope = build_manifest_request_envelope(agent_url)
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
payload = {"openFloor": envelope_dict}
headers = {'Content-Type': 'application/json'}
response = requests.post(agent_url, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
try:
response_json = response.json()
return f"β
Agent available at {agent_url}", response_json
except:
return f"β
Agent responded but with non-JSON: {response.text[:100]}...", {"raw_response": response.text}
else:
return f"β οΈ Agent responded with status {response.status_code}", {"status": response.status_code, "response": response.text}
except requests.exceptions.RequestException as e:
return f"β Agent not available: {str(e)}", {"error": str(e)}
# Enhanced test function that also shows the envelope
def test_agent_with_envelope(agent_url):
try:
# Send a simple manifest request to test
envelope = build_manifest_request_envelope(agent_url)
json_str = envelope.to_json()
envelope_dict = json.loads(json_str)
json_payload_pretty = json.dumps({"openFloor": envelope_dict}, indent=2)
payload = {"openFloor": envelope_dict}
headers = {'Content-Type': 'application/json'}
response = requests.post(agent_url, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
try:
response_json = response.json()
status_msg = f"β
Agent available at {agent_url}"
return status_msg, response_json, json_payload_pretty, response.status_code
except:
status_msg = f"β
Agent responded but with non-JSON"
return status_msg, {"raw_response": response.text}, json_payload_pretty, response.status_code
else:
status_msg = f"β οΈ Agent responded with status {response.status_code}"
return status_msg, {"status": response.status_code, "response": response.text}, json_payload_pretty, response.status_code
except requests.exceptions.RequestException as e:
status_msg = f"β Agent not available: {str(e)}"
return status_msg, {"error": str(e)}, "", 0
# Gradio UI with improved layout
with gr.Blocks(title="Open Floor Protocol Tester", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# π£οΈ Open Floor Protocol Tester
**Test [Open Floor](https://github.com/open-voice-interoperability/openfloor-docs)-compliant assistants using the [official Python library](https://github.com/open-voice-interoperability/openfloor-python)**
Send **manifest** requests to get capabilities or **utterance** events for conversations.
""")
with gr.Row():
# Left Column - Controls
with gr.Column(scale=2):
gr.Markdown("### π― Assistant Configuration")
agent_url = gr.Textbox(
value="",
label="Assistant Endpoint URL",
placeholder="https://your-assistant.com/openfloor",
info="Full URL endpoint of the Open Floor assistant"
)
gr.Markdown("### π¨ Send Request")
request_type = gr.Radio(
choices=["manifest", "utterance"],
value="manifest",
label="Request Type",
info="manifest = test availability + get capabilities, utterance = send message"
)
message_text = gr.Textbox(
value="convert 20 millimeters to gallons",
label="Message Text",
placeholder="Enter your message here...",
lines=3,
info="Text to send in utterance event"
)
send_btn = gr.Button("π Send Open Floor Request", variant="primary", size="lg")
# Right Column - Results
with gr.Column(scale=3):
gr.Markdown("### π‘ Request & Response")
# Status message for all requests
request_status = gr.Textbox(
label="Status",
interactive=False,
container=False
)
# Collapsible request details
with gr.Accordion("π€ Generated Open Floor Envelope", open=False):
envelope_output = gr.Code(
language="json",
label="Request JSON",
container=False
)
# Response section
with gr.Group():
gr.Markdown("#### π₯ Response")
with gr.Row():
status_code = gr.Number(
label="HTTP Status",
interactive=False,
container=False,
scale=1
)
response_output = gr.JSON(
label="Response Data",
container=False
)
# Event handlers
def handle_send_request(*args):
envelope, status, response, status_msg = send_request(*args)
return envelope, status, response, status_msg
send_btn.click(
fn=handle_send_request,
inputs=[agent_url, request_type, message_text],
outputs=[envelope_output, status_code, response_output, request_status]
)
if __name__ == "__main__":
demo.launch() |