""" Hunter Companion Agent - Simplified and Controlled """ from __future__ import annotations import json import re from typing import Any, AsyncIterator, Optional from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.models.subscription import Subscription from app.models.user import User from app.services.llm_gateway import chat_completions, chat_completions_stream from app.services.llm_tools import execute_tool, tool_catalog_for_prompt settings = get_settings() def _tier_level(tier: str) -> int: t = (tier or "").lower() if t == "tycoon": return 3 if t == "trader": return 2 return 1 async def _get_user_tier(db: AsyncSession, user: User) -> str: from sqlalchemy import select res = await db.execute(select(Subscription).where(Subscription.user_id == user.id)) sub = res.scalar_one_or_none() if not sub: return "scout" return sub.tier.value # Pre-defined responses for common queries (bypass LLM) CANNED_RESPONSES = { "greeting": ( "Hey! I'm your Hunter Companion.\n\n" "I can help you with:\n" "- Analyzing domains (just tell me the domain)\n" "- Showing auctions ending soon\n" "- Checking your watchlist or portfolio\n" "- Finding dropped domains\n\n" "What would you like to do?" ), "capabilities": ( "Here's what I can do:\n\n" "- Analyze any domain (Pounce Score, risk, value)\n" "- Show current auctions and deals\n" "- List recently dropped domains\n" "- Check your watchlist status\n" "- Review your portfolio performance\n" "- Show your listings and leads\n\n" "Just ask! For domain analysis, simply type the domain name." ), } def _is_greeting(text: str) -> bool: t = (text or "").strip().lower() greetings = {"hi", "hello", "hey", "yo", "sup", "hola", "hallo", "guten tag", "moin"} clean = t.replace("!", "").replace(".", "").replace("?", "") return clean in greetings or len(clean) <= 3 and clean in greetings def _is_capabilities_question(text: str) -> bool: t = (text or "").strip().lower() patterns = [ "what can you do", "what do you do", "help", "how can you help", "what are you", "who are you", "capabilities", "features", "was kannst du", "was machst du", "hilfe", ] return any(p in t for p in patterns) def _extract_domain(text: str) -> Optional[str]: """Extract a domain name from user text.""" t = (text or "").strip().lower() # Pattern: word.tld or word.word.tld pattern = r'\b([a-z0-9][-a-z0-9]*\.)+[a-z]{2,}\b' match = re.search(pattern, t) if match: domain = match.group(0) # Filter out common non-domains if domain not in {"example.com", "test.com", "domain.com"}: return domain return None def _build_system_prompt(path: str) -> str: tools = tool_catalog_for_prompt(path) return ( "You are a domain trading assistant. Be brief and helpful.\n\n" "RULES:\n" "- Give SHORT answers (2-3 sentences max)\n" "- Do NOT make up data. Only state facts from tool results.\n" "- Do NOT format with markdown (no ** or *)\n" "- If unsure, ask the user to clarify\n\n" "TOOLS (call with JSON):\n" f"{json.dumps([{'name': t['name'], 'description': t['description']} for t in tools], indent=2)}\n\n" "To call a tool: {\"tool_calls\":[{\"name\":\"...\",\"args\":{}}]}" ) def _try_parse_tool_calls(text: str) -> Optional[list[dict[str, Any]]]: t = (text or "").strip() if not (t.startswith("{") and "tool_calls" in t): return None try: obj = json.loads(t) except Exception: return None calls = obj.get("tool_calls") if not isinstance(calls, list): return None out = [] for c in calls: if isinstance(c, dict) and isinstance(c.get("name"), str): out.append({"name": c["name"], "args": c.get("args") or {}}) return out or None def _truncate_json(value: Any, max_chars: int = 6000) -> str: s = json.dumps(value, ensure_ascii=False) return s[:max_chars] if len(s) > max_chars else s def _format_analysis_result(data: dict) -> str: """Format domain analysis result into readable text.""" if "error" in data: return f"Could not analyze: {data['error']}" domain = data.get("domain", "unknown") score = data.get("pounce_score", 0) # Determine recommendation if score >= 70: rec = "BUY" rec_reason = "Strong domain with good fundamentals" elif score >= 50: rec = "CONSIDER" rec_reason = "Decent potential, evaluate based on your needs" else: rec = "SKIP" rec_reason = "Limited potential or high risk" lines = [ f"Domain: {domain}", f"Pounce Score: {score}/100", f"Recommendation: {rec}", "", ] # Add key metrics if data.get("availability"): avail = data["availability"] status = "Available" if avail.get("is_available") else "Taken" lines.append(f"Status: {status}") if data.get("value"): val = data["value"] if val.get("estimated_value"): lines.append(f"Est. Value: ${val['estimated_value']:,.0f}") if data.get("risk"): risk = data["risk"] risk_level = risk.get("risk_level", "unknown") lines.append(f"Risk: {risk_level}") lines.append("") lines.append(rec_reason) return "\n".join(lines) async def run_agent( db: AsyncSession, user: User, *, messages: list[dict[str, Any]], path: str, model: Optional[str] = None, temperature: float = 0.3, max_steps: int = 4, ) -> list[dict[str, Any]]: """Run the agent with simplified logic.""" tier = await _get_user_tier(db, user) if _tier_level(tier) < 2: raise PermissionError("Hunter Companion requires Trader or Tycoon plan.") # Get last user message last_user = next((m for m in reversed(messages or []) if m.get("role") == "user"), None) user_text = str(last_user.get("content", "")) if last_user else "" base = [ {"role": "system", "content": _build_system_prompt(path)}, ] # Handle canned responses (bypass LLM entirely) if _is_greeting(user_text): return base + messages + [{"role": "assistant", "content": CANNED_RESPONSES["greeting"]}] if _is_capabilities_question(user_text): return base + messages + [{"role": "assistant", "content": CANNED_RESPONSES["capabilities"]}] # Auto-detect domain and analyze domain = _extract_domain(user_text) if domain: # Directly call analyze_domain tool result = await execute_tool(db, user, "analyze_domain", {"domain": domain}, path=path) formatted = _format_analysis_result(result) return base + messages + [{"role": "assistant", "content": formatted}] # For other queries, use LLM with tool loop convo = base + (messages or []) for _ in range(max_steps): payload = { "model": model or settings.llm_default_model, "messages": convo, "temperature": temperature, "stream": False, } res = await chat_completions(payload) content = (res.get("choices") or [{}])[0].get("message", {}).get("content", "") or "" tool_calls = _try_parse_tool_calls(content) if not tool_calls: convo.append({"role": "assistant", "content": content}) return convo convo.append({"role": "assistant", "content": content}) for call in tool_calls[:3]: name = call["name"] args = call["args"] result = await execute_tool(db, user, name, args, path=path) # Format specific tool results if name == "analyze_domain": formatted = _format_analysis_result(result) convo.append({"role": "system", "content": f"Tool result:\n{formatted}"}) else: convo.append({"role": "system", "content": f"Tool {name} result: {_truncate_json(result)}"}) return convo async def stream_final_answer( convo: list[dict[str, Any]], *, model: Optional[str], temperature: float ) -> AsyncIterator[bytes]: """Stream the final answer.""" # Check if last message is already a complete assistant response if convo and convo[-1].get("role") == "assistant": content = convo[-1].get("content", "") if content and not content.strip().startswith("{"): # Already have a good response, stream it directly chunk = { "choices": [{"delta": {"content": content}}] } yield f"data: {json.dumps(chunk)}\n\n".encode() yield b"data: [DONE]\n\n" return # Otherwise, ask LLM to summarize payload = { "model": model or settings.llm_default_model, "messages": convo + [ { "role": "system", "content": "Give a brief, helpful response based on the data. No markdown formatting. Keep it short.", } ], "temperature": temperature, "stream": True, } async for chunk in chat_completions_stream(payload): yield chunk