from __future__ import annotations

import json
from typing import Any, AsyncIterator, Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.models.subscription import Subscription, SubscriptionTier
from app.models.user import User
from app.services.llm_gateway import chat_completions, chat_completions_stream
from app.services.llm_tools import execute_tool, tool_catalog_for_prompt

settings = get_settings()


def _is_greeting(text: str) -> bool:
    t = (text or "").strip().lower()
    if not t:
        return False
    # common minimal greetings
    greetings = {
        "hi", "hello", "hey", "yo", "sup", "hola", "hallo", "guten tag",
        "good morning", "good evening", "good afternoon",
    }
    if t in greetings:
        return True
    # very short greeting-like messages
    if len(t) <= 6 and t.replace("!", "").replace(".", "") in greetings:
        return True
    return False


def _tier_level(tier: str) -> int:
    t = (tier or "").lower()
    if t == "tycoon":
        return 3
    if t == "trader":
        return 2
    return 1


async def _get_user_tier(db: AsyncSession, user: User) -> str:
    from sqlalchemy import select

    res = await db.execute(select(Subscription).where(Subscription.user_id == user.id))
    sub = res.scalar_one_or_none()
    if not sub:
        return "scout"
    return sub.tier.value


def _build_system_prompt(path: str) -> str:
    tools = tool_catalog_for_prompt(path)
    return (
        "You are the Pounce Hunter Companion, a domain trading expert. Always respond in English.\n\n"
        "CRITICAL RULES:\n"
        "1. NEVER invent or hallucinate data. You do NOT have access to SEMrush, Estibot, GoDaddy sales, or external databases.\n"
        "2. If you don't have data, say so honestly. Only use data from tools you actually called.\n"
        "3. Keep responses SHORT: 2-3 sentences max, then bullets if needed.\n"
        "4. NO markdown: no ** or *, no code blocks, no headers with #.\n"
        "5. Use dashes (-) for bullet points.\n\n"
        "WHAT YOU CAN DO:\n"
        "- Analyze domains using the analyze_domain tool (gives Pounce Score, risk, value estimate)\n"
        "- Show user's watchlist, portfolio, listings, inbox, yield data\n"
        "- Search auctions and drops\n"
        "- Generate brandable names\n\n"
        "WHAT YOU CANNOT DO:\n"
        "- Access external sales databases or SEO tools\n"
        "- Look up real-time WHOIS or DNS (unless via tool)\n"
        "- Make up sales history or traffic stats\n\n"
        "TOOL USAGE:\n"
        "- To call a tool, respond with ONLY: {\"tool_calls\":[{\"name\":\"...\",\"args\":{...}}]}\n"
        "- After tool results, summarize briefly without mentioning tools.\n\n"
        f"TOOLS:\n{json.dumps(tools, ensure_ascii=False)}\n"
    )


def _try_parse_tool_calls(text: str) -> Optional[list[dict[str, Any]]]:
    t = (text or "").strip()
    if not (t.startswith("{") and "tool_calls" in t):
        return None
    try:
        obj = json.loads(t)
    except Exception:
        return None
    calls = obj.get("tool_calls")
    if not isinstance(calls, list):
        return None
    out: list[dict[str, Any]] = []
    for c in calls:
        if not isinstance(c, dict):
            continue
        name = c.get("name")
        args = c.get("args") or {}
        if isinstance(name, str) and isinstance(args, dict):
            out.append({"name": name, "args": args})
    return out or None


def _truncate_json(value: Any, max_chars: int = 8000) -> str:
    s = json.dumps(value, ensure_ascii=False)
    if len(s) <= max_chars:
        return s
    return s[: max_chars - 3] + "..."
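
# For reference, the tool-call request that _build_system_prompt instructs the
# model to emit, and that _try_parse_tool_calls recognises, looks like this
# (the tool name and args here are illustrative only):
#
#   {"tool_calls": [{"name": "analyze_domain", "args": {"domain": "example.com"}}]}
#
# _try_parse_tool_calls returns the validated [{"name": ..., "args": ...}] list
# for such content, and None for anything that is not a well-formed tool request.
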
async def run_agent(
    db: AsyncSession,
    user: User,
    *,
    messages: list[dict[str, Any]],
    path: str,
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_steps: int = 6,
) -> list[dict[str, Any]]:
    """
    Runs a small tool loop to augment context, returning final messages to be
    used for the final answer generation (optionally streamed).
    """
    tier = await _get_user_tier(db, user)
    if _tier_level(tier) < 2:
        raise PermissionError("Chat is available on Trader and Tycoon plans. Upgrade to unlock.")

    base = [
        {"role": "system", "content": _build_system_prompt(path)},
        {"role": "system", "content": f"Context: current_terminal_path={path}; tier={tier}."},
    ]
    convo = base + (messages or [])

    # If the user just greets, answer naturally without tool-looping.
    last_user = next((m for m in reversed(messages or []) if m.get("role") == "user"), None)
    if last_user and _is_greeting(str(last_user.get("content") or "")):
        convo.append(
            {
                "role": "assistant",
                "content": (
                    "Hey! What can I help you with?\n\n"
                    "Give me a domain to analyze, or ask about your watchlist, portfolio, or current auctions."
                ),
            }
        )
        return convo

    for _ in range(max_steps):
        payload = {
            "model": model or settings.llm_default_model,
            "messages": convo,
            "temperature": temperature,
            "stream": False,
        }
        res = await chat_completions(payload)
        content = (res.get("choices") or [{}])[0].get("message", {}).get("content", "") or ""

        tool_calls = _try_parse_tool_calls(content)
        if not tool_calls:
            # append assistant and stop
            convo.append({"role": "assistant", "content": content})
            return convo

        # append the tool request as assistant message (so model can see its own plan)
        convo.append({"role": "assistant", "content": content})

        for call in tool_calls[:5]:  # cap per step
            name = call["name"]
            args = call["args"]
            result = await execute_tool(db, user, name, args, path=path)
            convo.append(
                {
                    "role": "system",
                    "content": (
                        f"TOOL_RESULT_INTERNAL name={name} json={_truncate_json(result)}. "
                        "This is internal context. Do NOT quote or display this to the user."
                    ),
                }
            )

    # Fallback: force final answer even if tool loop didn't converge
    convo.append(
        {
            "role": "system",
            "content": "Now answer the user with the best possible answer using the tool results. Do NOT request tools.",
        }
    )
    return convo


async def stream_final_answer(
    convo: list[dict[str, Any]],
    *,
    model: Optional[str],
    temperature: float,
) -> AsyncIterator[bytes]:
    payload = {
        "model": model or settings.llm_default_model,
        "messages": convo
        + [
            {
                "role": "system",
                "content": (
                    "Respond now. Rules:\n"
                    "- NEVER invent data. Only use data from tools you called.\n"
                    "- Keep it SHORT: 2-3 sentences, then bullet points if needed.\n"
                    "- NO markdown (no ** or *), just plain text with dashes for bullets.\n"
                    "- Do NOT mention tools or JSON."
                ),
            }
        ],
        "temperature": temperature,
        "stream": True,
    }
    async for chunk in chat_completions_stream(payload):
        yield chunk
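
# Illustrative sketch only (hypothetical helper, not wired into the app): how a
# caller such as a chat route would typically combine run_agent and
# stream_final_answer, per the run_agent docstring above.
async def _example_chat_flow(
    db: AsyncSession,
    user: User,
    *,
    messages: list[dict[str, Any]],
    path: str,
    model: Optional[str] = None,
    temperature: float = 0.7,
) -> AsyncIterator[bytes]:
    # Run the tool loop first to gather internal context.
    convo = await run_agent(
        db, user, messages=messages, path=path, model=model, temperature=temperature
    )
    # A real caller might short-circuit when run_agent already appended a
    # ready-made assistant reply (e.g. the greeting shortcut) instead of
    # generating again; this sketch always streams a final answer.
    async for chunk in stream_final_answer(convo, model=model, temperature=temperature):
        yield chunk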