#!/usr/bin/env python3
"""
🐝 SWARM Worker-Bee node — runs on YOUR machine.

What it does, and ONLY this:
  • reads the rooms YOU pick
  • sends SWARM nothing but the calls it sees: contract address + who called + when
That's it. It NEVER sends your messages, your DMs, or your account. It NEVER posts or
acts as you. It's read-only. Your Telegram session is saved locally (worker-bee.session)
and NEVER leaves this machine. This file is open source — read it top to bottom.

MODULES (new): the node can run extra OPT-IN modules you switch on in the bot or on the
website (e.g. the experimental mempool sensor that powers the live Swarm Radar). A module
is fetched + started ONLY when YOU enable it, and only ever does what its disclosure says.
No module touches your keys or your funds. Turn one off and it stops.

Setup:
  pip install telethon httpx
  python worker-bee-node.py --key wb_xxxxxxxx        # your key from @jointheswarmbot
  # the mempool sensor module also needs:  pip install websockets

Stop any time with Ctrl-C. Revoke a room by re-running and un-ticking it.
"""
import argparse
import asyncio
import os
import re
import sys
import time

try:
    from telethon import TelegramClient, events
    import httpx
except ImportError:
    print("Install deps first:\n  pip install telethon httpx")
    sys.exit(1)

# App creds + endpoints. By default the node fetches the Telegram APP id/hash from
# SWARM (authenticated by YOUR key) so this file ships credential-free. Advanced users
# can pin their own via SWARM_API_ID / SWARM_API_HASH env.
API_ID = int(os.environ.get("SWARM_API_ID", "0") or "0")
API_HASH = os.environ.get("SWARM_API_HASH", "")
CONFIG_URL = os.environ.get("SWARM_CONFIG_URL", "https://jointheswarm.co.uk/api/node/config")
INGEST_URL = os.environ.get("SWARM_INGEST_URL", "https://jointheswarm.co.uk/api/node/ingest")
MEMPOOL_SRC_URL = os.environ.get("SWARM_MEMPOOL_URL", "https://jointheswarm.co.uk/worker-bee-mempool.py")
MEMPOOL_FILE = "worker-bee-mempool.py"


async def resolve_config(key: str):
    """(api_id, api_hash, ingest_url, modules). env overrides creds if set; the module
    list always comes from SWARM so the toggles you flip in the bot/website take effect."""
    modules = []
    if API_ID and API_HASH:
        try:  # still ask SWARM which modules are on (best-effort), keep pinned creds
            async with httpx.AsyncClient(timeout=20) as h:
                j = (await h.get(CONFIG_URL, params={"key": key})).json()
            if j.get("ok"):
                modules = j.get("modules", [])
        except Exception:
            pass
        return API_ID, API_HASH, INGEST_URL, modules
    async with httpx.AsyncClient(timeout=20) as h:
        r = await h.get(CONFIG_URL, params={"key": key})
        j = r.json()
    if not j.get("ok"):
        raise SystemExit(f"node config error: {j.get('error')}. Check your key in @jointheswarmbot.")
    return int(j["apiId"]), j["apiHash"], j.get("ingestUrl", INGEST_URL), j.get("modules", [])


async def ensure_module_file(filename: str, url: str) -> bool:
    """Fetch a module's code from SWARM if it isn't already next to this file. This is how
    new opt-in modules reach already-installed nodes without a re-install."""
    if os.path.exists(filename):
        return True
    try:
        async with httpx.AsyncClient(timeout=20) as h:
            r = await h.get(url)
            r.raise_for_status()
        with open(filename, "w", encoding="utf-8") as f:
            f.write(r.text)
        return True
    except Exception as e:
        print(f"  (couldn't fetch module {filename}: {e})")
        return False


# Modules already started, so the poll loop only launches newly-switched-on ones.
_running_modules: set = set()


async def fetch_modules(key: str) -> list:
    """Re-fetch just the module list from SWARM, so a toggle in the bot/website is seen live."""
    try:
        async with httpx.AsyncClient(timeout=15) as h:
            j = (await h.get(CONFIG_URL, params={"key": key})).json()
        return j.get("modules", []) if j.get("ok") else []
    except Exception:
        return []


async def launch_modules(key: str, modules: list):
    """Start each ENABLED opt-in module that isn't already running, as a concurrent task
    alongside the core scrape. Idempotent — safe to call repeatedly from the poll loop.
    A module that errors (e.g. a missing dep) is skipped with a friendly note — it can
    never take the node down."""
    for m in modules or []:
        mid = m.get("id")
        if not m.get("enabled") or mid == "telegram_calls" or mid in _running_modules:
            continue
        if mid == "mempool_sensor":
            if not await ensure_module_file(MEMPOOL_FILE, MEMPOOL_SRC_URL):
                continue
            try:
                import importlib.util
                spec = importlib.util.spec_from_file_location("wb_mempool", MEMPOOL_FILE)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                wss = os.environ.get("ETH_WSS", "wss://ethereum-rpc.publicnode.com")
                asyncio.create_task(mod.run(key, wss))
                _running_modules.add(mid)
                print("  🛰️ mempool sensor ON — lighting up the Swarm Radar.")
            except ModuleNotFoundError:
                print("  🛰️ mempool sensor needs one dep: pip install websockets  (then re-run)")
            except Exception as e:
                print(f"  (mempool module failed to start: {e})")


async def module_poll_loop(key: str):
    """Every 60s, re-check which modules SWARM has switched on and start any newly-enabled
    one — so flipping a module ON in the bot/website takes effect WITHOUT a node restart.
    (Switching a module OFF is handled live by the module's own gate check.)"""
    while True:
        await asyncio.sleep(60)
        await launch_modules(key, await fetch_modules(key))


# Call extraction — EVM (0x + 40 hex) and Solana/base58 mints (32-44 chars).
EVM_RE = re.compile(r"0x[a-fA-F0-9]{40}\b")
SOL_RE = re.compile(r"\b[1-9A-HJ-NP-Za-km-z]{32,44}\b")


def extract_cas(text: str):
    out = []
    seen = set()
    for m in EVM_RE.findall(text or ""):
        k = m.lower()
        if k not in seen:
            seen.add(k); out.append((k, "ethereum"))
    for m in SOL_RE.findall(text or ""):
        if not m.startswith("0x") and m not in seen:
            seen.add(m); out.append((m, "solana"))
    return out


async def main():
    ap = argparse.ArgumentParser(description="SWARM Worker-Bee node")
    ap.add_argument("--key", required=True, help="your node key from @jointheswarmbot")
    args = ap.parse_args()

    api_id, api_hash, ingest_url, modules = await resolve_config(args.key)

    client = TelegramClient("worker-bee", api_id, api_hash)
    await client.start()  # prompts phone + login code LOCALLY; session saved to ./worker-bee.session
    me = await client.get_me()
    handle = ("@" + me.username) if getattr(me, "username", None) else (me.first_name or "you")
    print(f"\n🐝 Logged in as {handle}. Your session stays on THIS machine — it never leaves.\n")

    # List your groups/channels and let you whitelist what to contribute.
    dialogs = []
    async for d in client.iter_dialogs():
        if d.is_group or d.is_channel:
            dialogs.append(d)
    if not dialogs:
        print("No groups/channels found on this account."); return
    print("Your rooms:")
    for i, d in enumerate(dialogs):
        print(f"  [{i:>3}] {d.name}")
    sel = input("\nWhich rooms to contribute? (comma-separated numbers, or 'all'): ").strip()
    if sel.lower() == "all":
        chosen_ids = {d.id for d in dialogs}
    else:
        idx = {int(x) for x in re.findall(r"\d+", sel)}
        chosen_ids = {dialogs[i].id for i in idx if 0 <= i < len(dialogs)}
    names = {d.id: d.name for d in dialogs if d.id in chosen_ids}
    if not chosen_ids:
        print("Nothing selected — exiting. Re-run to pick rooms."); return
    print(f"\n✅ Contributing {len(chosen_ids)} room(s), read-only, calls only. Ctrl-C to stop.\n")

    buf: list[dict] = []

    async def flush():
        if not buf:
            return
        batch, buf[:] = buf[:], []
        try:
            async with httpx.AsyncClient(timeout=15) as h:
                await h.post(ingest_url, json={"token": args.key, "calls": batch})
        except Exception as e:
            print("  (send failed, will retry):", e)
            buf.extend(batch)

    @client.on(events.NewMessage(chats=list(chosen_ids)))
    async def on_msg(ev):
        cas = extract_cas(ev.raw_text or "")
        if not cas:
            return
        sender = await ev.get_sender()
        caller = (getattr(sender, "username", None) or getattr(sender, "first_name", "") or "")
        gname = names.get(ev.chat_id, "")
        now_ms = int(time.time() * 1000)
        for ca, chain in cas:
            buf.append({"ca": ca, "chain": chain, "caller": caller, "group": gname, "ts": now_ms})
            print(f"  📞 {gname}: {caller} → {ca[:10]}… [{chain}]")
        if len(buf) >= 10:
            await flush()

    async def ticker():
        while True:
            await asyncio.sleep(20)
            await flush()

    asyncio.create_task(ticker())
    # Start any opt-in modules you've switched on, then keep watching for newly-enabled
    # ones so a toggle in the bot/website takes effect without a node restart.
    await launch_modules(args.key, modules)
    asyncio.create_task(module_poll_loop(args.key))
    await client.run_until_disconnected()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n🐝 stopped. Your session stays local. Re-run any time.")
