from __future__ import annotations

import json
from typing import Any, AsyncIterator, Optional

from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.models.subscription import Subscription
from app.models.user import User
from app.services.llm_gateway import chat_completions, chat_completions_stream
from app.services.llm_tools import execute_tool, tool_catalog_for_prompt

settings = get_settings()


def _is_greeting(text: str) -> bool:
    """Return True if the message is a bare greeting with no actual request."""
    t = (text or "").strip().lower()
    if not t:
        return False
    # common minimal greetings
    greetings = {
        "hi", "hello", "hey", "yo", "sup", "hola", "hallo", "guten tag",
        "good morning", "good evening", "good afternoon",
    }
    if t in greetings:
        return True
    # very short greeting-like messages, e.g. "hey!" or "hi."
    if len(t) <= 6 and t.replace("!", "").replace(".", "") in greetings:
        return True
    return False


def _tier_level(tier: str) -> int:
    """Map a subscription tier name to a numeric rank (scout=1, trader=2, tycoon=3)."""
    t = (tier or "").lower()
    if t == "tycoon":
        return 3
    if t == "trader":
        return 2
    return 1


async def _get_user_tier(db: AsyncSession, user: User) -> str:
    """Look up the user's subscription tier, defaulting to "scout" when none exists."""
    from sqlalchemy import select

    # Assumes at most one subscription row per user; scalar_one_or_none raises
    # if the query returns more than one row.
    res = await db.execute(select(Subscription).where(Subscription.user_id == user.id))
    sub = res.scalar_one_or_none()
    if not sub:
        return "scout"
    return sub.tier.value


def _build_system_prompt(path: str) -> str:
    """Build the system prompt, embedding the tool catalog for the current path."""
    tools = tool_catalog_for_prompt(path)
    return (
        "You are the Pounce Hunter Companion, an expert domain trading assistant. Always respond in English.\n"
        "You help users with: domain analysis, auction hunting, portfolio management, and trading decisions.\n\n"
        "RESPONSE FORMAT (CRITICAL):\n"
        "- Write in plain text. NO markdown asterisks, NO ** or *, NO code blocks.\n"
        "- Use simple dashes (-) for bullet points.\n"
        "- Keep responses concise: 2-4 sentences intro, then bullets if needed.\n"
        "- Never show tool outputs, JSON, or internal data to the user.\n\n"
        "BEHAVIOR:\n"
        "- Be helpful, direct, and conversational like a knowledgeable colleague.\n"
        "- For domain questions: give a clear BUY / CONSIDER / SKIP recommendation with 3-5 reasons.\n"
        "- Do NOT invent user preferences, keywords, or data. Ask if unclear.\n"
        "- For greetings: respond naturally and ask how you can help.\n\n"
        "TOOL USAGE:\n"
        "- Use tools when the user asks about their data (watchlist, portfolio, listings, inbox, yield) or a specific domain.\n"
        "- To call tools, respond with ONLY: {\"tool_calls\":[{\"name\":\"...\",\"args\":{...}}]}\n"
        "- After receiving tool results, answer naturally without mentioning tools.\n\n"
        f"AVAILABLE TOOLS:\n{json.dumps(tools, ensure_ascii=False)}\n"
    )


def _try_parse_tool_calls(text: str) -> Optional[list[dict[str, Any]]]:
    """Parse the model's JSON tool-call envelope; return None if absent or malformed."""
    t = (text or "").strip()
    if not (t.startswith("{") and "tool_calls" in t):
        return None
    try:
        obj = json.loads(t)
    except Exception:
        return None
    calls = obj.get("tool_calls")
    if not isinstance(calls, list):
        return None
    out: list[dict[str, Any]] = []
    for c in calls:
        if not isinstance(c, dict):
            continue
        name = c.get("name")
        args = c.get("args") or {}
        if isinstance(name, str) and isinstance(args, dict):
            out.append({"name": name, "args": args})
    return out or None


def _truncate_json(value: Any, max_chars: int = 8000) -> str:
    """Serialize value to JSON, truncating to max_chars with a trailing ellipsis."""
    s = json.dumps(value, ensure_ascii=False)
    if len(s) <= max_chars:
        return s
    return s[: max_chars - 3] + "..."
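
# Example of the envelope _try_parse_tool_calls accepts (a sketch; the tool
# name "watchlist_summary" is illustrative, not one this module defines):
#
#   _try_parse_tool_calls('{"tool_calls":[{"name":"watchlist_summary","args":{}}]}')
#   # -> [{"name": "watchlist_summary", "args": {}}]
#
#   _try_parse_tool_calls("Plain prose answer.")
#   # -> None  (not a JSON object containing "tool_calls")
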
async def run_agent(
    db: AsyncSession,
    user: User,
    *,
    messages: list[dict[str, Any]],
    path: str,
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_steps: int = 6,
) -> list[dict[str, Any]]:
    """
    Run a small tool loop to augment the conversation with tool results.

    Returns the final message list to feed into answer generation
    (optionally streamed via stream_final_answer).
    """
    tier = await _get_user_tier(db, user)
    if _tier_level(tier) < 2:
        raise PermissionError("Chat is available on Trader and Tycoon plans. Upgrade to unlock.")

    base = [
        {"role": "system", "content": _build_system_prompt(path)},
        {"role": "system", "content": f"Context: current_terminal_path={path}; tier={tier}."},
    ]
    convo = base + (messages or [])

    # If the user just greets, answer naturally without tool-looping.
    last_user = next((m for m in reversed(messages or []) if m.get("role") == "user"), None)
    if last_user and _is_greeting(str(last_user.get("content") or "")):
        convo.append(
            {
                "role": "assistant",
                "content": (
                    "Hey! How can I help you today?\n\n"
                    "I can help with:\n"
                    "- Analyzing a specific domain\n"
                    "- Finding auction deals or drops\n"
                    "- Reviewing your portfolio or watchlist\n"
                    "- Checking your listings and leads\n\n"
                    "Just tell me what you need."
                ),
            }
        )
        return convo

    for _ in range(max_steps):
        payload = {
            "model": model or settings.llm_default_model,
            "messages": convo,
            "temperature": temperature,
            "stream": False,
        }
        res = await chat_completions(payload)
        content = (res.get("choices") or [{}])[0].get("message", {}).get("content", "") or ""
        tool_calls = _try_parse_tool_calls(content)
        if not tool_calls:
            # No tool request: treat the content as the final answer and stop.
            convo.append({"role": "assistant", "content": content})
            return convo

        # Append the tool request as an assistant message so the model can see its own plan.
        convo.append({"role": "assistant", "content": content})
        for call in tool_calls[:5]:  # cap tool calls per step
            name = call["name"]
            args = call["args"]
            result = await execute_tool(db, user, name, args, path=path)
            convo.append(
                {
                    "role": "system",
                    "content": (
                        f"TOOL_RESULT_INTERNAL name={name} json={_truncate_json(result)}. "
                        "This is internal context. Do NOT quote or display this to the user."
                    ),
                }
            )

    # Fallback: force a final answer even if the tool loop didn't converge.
    convo.append(
        {
            "role": "system",
            "content": "Now answer the user with the best possible answer using the tool results. Do NOT request tools.",
        }
    )
    return convo


async def stream_final_answer(
    convo: list[dict[str, Any]],
    *,
    model: Optional[str],
    temperature: float,
) -> AsyncIterator[bytes]:
    """Stream the final plain-text answer for a conversation prepared by run_agent."""
    payload = {
        "model": model or settings.llm_default_model,
        "messages": convo
        + [
            {
                "role": "system",
                "content": (
                    "Final step: respond to the user in plain text.\n"
                    "- NO markdown: no ** or * for bold/italic, no code blocks.\n"
                    "- Use dashes (-) for bullets.\n"
                    "- Do NOT output JSON or mention tools.\n"
                    "- Be concise and helpful."
                ),
            }
        ],
        "temperature": temperature,
        "stream": True,
    }
    async for chunk in chat_completions_stream(payload):
        yield chunk
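
# Usage sketch (illustrative, not part of this module): a caller typically runs
# the tool loop first, then streams the final answer from the augmented
# conversation. The FastAPI wiring below is an assumption; get_db and
# get_current_user are hypothetical dependencies standing in for the app's
# real ones, and the media type depends on what chat_completions_stream yields.
#
#   from fastapi import APIRouter, Depends
#   from fastapi.responses import StreamingResponse
#
#   router = APIRouter()
#
#   @router.post("/chat")
#   async def chat(
#       body: dict[str, Any],
#       db: AsyncSession = Depends(get_db),          # hypothetical dependency
#       user: User = Depends(get_current_user),      # hypothetical dependency
#   ) -> StreamingResponse:
#       convo = await run_agent(
#           db, user,
#           messages=body["messages"],
#           path=body.get("path", "/"),
#       )
#       return StreamingResponse(
#           stream_final_answer(convo, model=body.get("model"), temperature=0.7),
#           media_type="text/event-stream",
#       )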