updown / app.py
zainulabedin949's picture
Update app.py
91e7f66 verified
import threading
import time
import gradio as gr
from datetime import datetime
import pytz
import requests
from groq import Groq
from quotexpy import Quotex
from quotexpy.utils.candles_period import CandlesPeriod
from quotexpy.utils.account_type import AccountType
# Initialize
state = []
trade_recommendations = []
client = Quotex(email="your_email@example.com", password="your_password", headless=True)
groq_client = Groq(api_key="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")
def get_ai_recommendation(market_data):
"""Get AI-powered recommendation using Groq API"""
try:
prompt = f"""
Analyze the latest candlestick patterns and market data:
{market_data}
Provide:
1. Trading Signal (BUY/SELL/HOLD)
2. Confidence (High/Medium/Low)
3. RSI Level
4. Key Price Levels
"""
chat_completion = groq_client.chat.completions.create(
messages=[{"role": "user", "content": prompt}],
model="mixtral-8x7b-32768",
temperature=0.3,
max_tokens=150
)
return chat_completion.choices[0].message.content
except Exception as e:
return f"AI analysis failed: {e}"
def analyze_candles(candles):
"""AI + Technical Analysis"""
if not candles or len(candles) < 3:
return "Waiting for more data..."
# Technical analysis
last_close = candles[max(candles.keys())]["close"]
prev_close = candles[sorted(candles.keys())[-2]]["close"]
if last_close > prev_close * 1.005: # Strong bullish
tech_signal = "πŸ”₯ STRONG UPTREND - BUY"
elif last_close < prev_close * 0.995: # Strong bearish
tech_signal = "πŸ“‰ STRONG DOWNTREND - SELL"
else:
tech_signal = "🟑 WEAK TREND - HOLD"
# AI Input
market_data = f"""
Latest Candle:
- Open: {last_candle["open"]}
- Close: {last_candle["close"]}
- High: {last_candle["high"]}
- Low: {last_candle["low"]}
- Volume: {last_candle["vol"]}
"""
ai_signal = get_ai_recommendation(market_data)
return f"""\n
πŸ“Š **TECHNICAL ANALYSIS:**
{tech_signal}
πŸ€– **AI RECOMMENDATION:**
{ai_signal}"""
def background_worker(pair: str, timeframe: str):
try:
client.start_candles_stream(pair, 10, period_map[timeframe])
while True:
candles = client.get_realtime_candles(pair)
if candles:
analysis = analyze_candles(candles)
trade_recommendations.append(analysis)
time.sleep(5) # Refresh every 5 sec
except Exception as e:
print(f"❌ Worker error: {e}")
def start(pair, timeframe):
state.clear()
trade_recommendations.clear()
threading.Thread(
target=background_worker,
args=(pair, timeframe),
daemon=True
).start()
return "πŸ”„ Analyzing market..."
with gr.Blocks() as demo:
gr.Markdown("## πŸ€– AI Trading Assistant")
gr.Markdown("### Real-time Candle Analysis")
with gr.Row():
pair = gr.Dropdown(choices=["EURUSD", "USDJPY", "BTCUSD"], label="Pair")
timeframe = gr.Dropdown(choices=["5s", "15s", "1m", "5m"], label="Timeframe")
start_btn = gr.Button("Start AI Analysis")
gr.Markdown("---")
with gr.Column():
gr.Markdown("### πŸ“œ Last 10 Candles")
candle_output = gr.Textbox()
gr.Markdown("### πŸ—ƒοΈ Trading Signals")
ai_output = gr.Textbox(label="AI + Technical Signals")
start_btn.click(
start,
inputs=[pair, timeframe],
outputs=[candle_output, ai_output]
)
demo.load(
get_latest_recommendations,
outputs=ai_output,
every=5 # Refresh signals every 5 seconds
)
if __name__ == "__main__":
demo.launch()