Skip to content

GeoAgent facade

geoagent.core.agent.GeoAgent

Public facade: Strands agent + GeoAgent context, tools, and safety hooks.

Source code in geoagent/core/agent.py
class GeoAgent:
    """Public facade: Strands agent + GeoAgent context, tools, and safety hooks."""

    def __init__(
        self,
        *,
        context: Optional[GeoAgentContext] = None,
        config: Optional[GeoAgentConfig] = None,
        tools: Optional[list[Any]] = None,
        registry: Optional[GeoToolRegistry] = None,
        model: Any | None = None,
        provider: str | None = None,
        model_id: str | None = None,
        fast: bool = False,
        confirm: ConfirmCallback | None = None,
        qgis_safe_mode: bool = False,
    ) -> None:
        self._context = context or GeoAgentContext()
        cfg = config or GeoAgentConfig()
        if provider is not None:
            cfg = cfg.model_copy(update={"provider": provider})
        if model_id is not None:
            cfg = cfg.model_copy(update={"model": model_id})
        if fast and cfg.max_tokens is not None and cfg.max_tokens > 2048:
            cfg = cfg.model_copy(update={"max_tokens": 2048})
        self._config = cfg
        self._fast = fast
        self._qgis_safe_mode = qgis_safe_mode
        self._registry = registry or GeoToolRegistry()
        self._tool_list = list(tools or [])
        self._cancelled: list[str] = []
        self._tool_calls: list[dict[str, Any]] = []
        self._confirm = confirm or auto_approve_safe_only
        self._model = model or resolve_model(self._config)
        self._rebuild_strands_agent()

    def _rebuild_strands_agent(self) -> None:
        """Recreate the underlying Strands agent from current settings."""
        self._cancelled = []
        prompt = FAST_SYSTEM_PROMPT if self._fast else DEFAULT_SYSTEM_PROMPT
        extra_prompt = self._context.metadata.get("system_prompt")
        if extra_prompt:
            prompt = f"{prompt}\n\n{extra_prompt}"
        hook = ConfirmationHookProvider(
            self._registry,
            self._confirm,
            self._cancelled,
            self._tool_calls,
        )

        self._strands = Agent(
            model=self._model,
            tools=self._tool_list,
            system_prompt=prompt,
            hooks=[hook],
            callback_handler=None,
            tool_executor=SequentialToolExecutor() if self._qgis_safe_mode else None,
        )

    @property
    def context(self) -> GeoAgentContext:
        """GeoAgent runtime context."""
        return self._context

    @property
    def strands_agent(self) -> Agent:
        """The underlying Strands :class:`~strands.agent.agent.Agent`."""
        return self._strands

    @property
    def tool(self) -> Any:
        """Direct Strands tool caller (``agent.tool.some_tool(...)``)."""
        return self._strands.tool

    @property
    def tool_names(self) -> list[str]:
        """Expose Strands tool names on GeoAgent for parity."""
        return list(self._strands.tool_names)

    @property
    def tool_registry(self) -> GeoToolRegistry:
        """GeoAgent metadata registry for tool inspection."""
        return self._registry

    @property
    def config(self) -> GeoAgentConfig:
        """GeoAgent model and runtime configuration."""
        return self._config

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attributes to the underlying Strands agent."""
        return getattr(self._strands, name)

    def with_map(self, m: Any) -> "GeoAgent":
        """Return a new :class:`GeoAgent` bound to a map (rebuilds tools)."""
        from geoagent.core.factory import for_leafmap

        return for_leafmap(
            m,
            config=self._config,
            model=self._model,
            fast=self._fast,
            confirm=self._confirm,
        )

    def chat(
        self,
        query: Any,
        target_map: Any = None,
    ) -> GeoAgentResponse:
        """Run a single user turn and return a :class:`GeoAgentResponse`."""
        if target_map is not None and target_map is not self._context.map_obj:
            from geoagent.core.factory import for_leafmap

            other = for_leafmap(
                target_map,
                config=self._config,
                model=self._model,
                fast=self._fast,
                confirm=self._confirm,
            )
            return other.chat(query)

        if (
            self._context.metadata.get("integration")
            in {"nasa_earthdata", "nasa_opera"}
            and self._qgis_safe_mode
            and is_qt_gui_thread()
        ):
            integration = self._context.metadata.get("integration")
            label = (
                "NASA Earthdata" if integration == "nasa_earthdata" else "NASA OPERA"
            )
            helper = (
                "the NASA Earthdata AI Assistant panel"
                if integration == "nasa_earthdata"
                else (
                    "the NASA OPERA AI Assistant panel or "
                    "geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
                    "for direct QGIS-console workflows"
                )
            )
            return GeoAgentResponse(
                success=False,
                error_message=(
                    f"{label} chat should be launched from a worker thread inside "
                    f"QGIS. Use {helper}."
                ),
                map=self._context.map_obj,
            )

        if self._qgis_safe_mode and is_qt_gui_thread():
            return self._chat_on_qgis_gui_thread(query)

        return self._chat_impl(query)

    def _chat_impl(self, query: Any) -> GeoAgentResponse:
        """Run a single user turn on the current thread."""
        self._cancelled.clear()
        self._tool_calls.clear()
        t0 = time.time()
        try:
            result = self._strands(query)
            elapsed = time.time() - t0
            exec_names = list(getattr(result.metrics, "tool_metrics", {}).keys())
            answer = _result_to_text(result)
            content_blocks = _result_content_blocks(result)
            images = _result_to_images(result) + _tool_calls_to_images(self._tool_calls)
            stop = str(getattr(result, "stop_reason", "end_turn"))
            success = stop not in ("cancelled", "guardrail_intervened")
            err = None if success else f"stop_reason={stop}"
            return GeoAgentResponse(
                answer_text=answer or None,
                success=success,
                error_message=err,
                execution_time=elapsed,
                content_blocks=content_blocks,
                images=images,
                executed_tools=exec_names,
                tool_calls=list(self._tool_calls),
                cancelled_tools=list(self._cancelled),
                map=self._context.map_obj,
                raw=result,
            )
        except Exception as exc:
            elapsed = time.time() - t0
            return GeoAgentResponse(
                success=False,
                error_message=_format_chat_exception(exc),
                execution_time=elapsed,
                tool_calls=list(self._tool_calls),
                cancelled_tools=list(self._cancelled),
                map=self._context.map_obj,
            )

    async def stream_chat(
        self,
        query: Any,
        target_map: Any = None,
    ) -> AsyncIterator[Any]:
        """Stream a single user turn as raw Strands events.

        Text deltas are emitted in events containing ``"data"``. The final
        Strands result is emitted in the event containing ``"result"``.
        """
        if target_map is not None and target_map is not self._context.map_obj:
            from geoagent.core.factory import for_leafmap

            other = for_leafmap(
                target_map,
                config=self._config,
                model=self._model,
                fast=self._fast,
                confirm=self._confirm,
            )
            async for event in other.stream_chat(query):
                yield event
            return

        if self._qgis_safe_mode and is_qt_gui_thread():
            integration = self._context.metadata.get("integration")
            if integration in {"nasa_earthdata", "nasa_opera"}:
                label = (
                    "NASA Earthdata"
                    if integration == "nasa_earthdata"
                    else "NASA OPERA"
                )
                helper = (
                    "the NASA Earthdata AI Assistant panel"
                    if integration == "nasa_earthdata"
                    else (
                        "the NASA OPERA AI Assistant panel or "
                        "geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
                        "for direct QGIS-console workflows"
                    )
                )
                raise RuntimeError(
                    f"{label} streaming chat should be launched from a worker thread "
                    f"inside QGIS. Use {helper}."
                )
            async for event in self._stream_chat_on_qgis_gui_thread(query):
                yield event
            return

        async for event in self._stream_chat_impl(query):
            yield event

    async def _stream_chat_impl(self, query: Any) -> AsyncIterator[Any]:
        """Run a streaming user turn on the current thread."""
        self._cancelled.clear()
        self._tool_calls.clear()
        try:
            async for event in self._strands.stream_async(query):
                yield event
        except Exception as exc:
            if _looks_like_json_parse_failure(exc):
                raise RuntimeError(_format_chat_exception(exc)) from exc
            raise

    async def _stream_chat_on_qgis_gui_thread(self, query: Any) -> AsyncIterator[Any]:
        """Stream QGIS chat from a worker thread while pumping Qt events."""
        events: queue.Queue[tuple[str, Any]] = queue.Queue()

        async def _run_stream() -> None:
            """Execute async streaming work off the GUI thread."""
            async for event in self._stream_chat_impl(query):
                events.put(("event", event))

        def _worker() -> None:
            """Run the async stream and forward events to the GUI thread."""
            try:
                asyncio.run(_run_stream())
            except BaseException as exc:  # pragma: no cover - defensive path
                events.put(("error", exc))
            finally:
                events.put(("done", None))

        thread = threading.Thread(
            target=_worker,
            daemon=True,
            name="GeoAgent-QGIS-stream-chat",
        )
        thread.start()

        while True:
            try:
                kind, payload = events.get_nowait()
            except queue.Empty:
                process_qt_events()
                await asyncio.sleep(0.05)
                continue

            if kind == "event":
                yield payload
            elif kind == "error":
                thread.join(timeout=0)
                raise payload
            elif kind == "done":
                thread.join(timeout=0)
                return

    def _chat_on_qgis_gui_thread(self, query: Any) -> GeoAgentResponse:
        """Run sync QGIS chat without starving the Qt event loop.

        QGIS users often call ``resp = agent.chat(...)`` from the Python
        console, which executes on the GUI thread. The model call needs to run
        away from that thread, but QGIS tools still marshal back to it via
        ``BlockingQueuedConnection``. Pumping Qt events while waiting lets those
        marshalled tool calls run and keeps the application responsive.
        """
        done = threading.Event()
        box: dict[str, Any] = {}

        def _worker() -> None:
            """Execute chat work off the GUI thread."""
            try:
                box["response"] = self._chat_impl(query)
            except BaseException as exc:  # pragma: no cover - defensive path
                box["error"] = exc
            finally:
                done.set()

        thread = threading.Thread(
            target=_worker,
            daemon=True,
            name="GeoAgent-QGIS-chat",
        )
        thread.start()

        while not done.is_set():
            process_qt_events()
            done.wait(0.05)

        thread.join(timeout=0)
        if "error" in box:
            raise box["error"]
        return box["response"]

    def chat_in_background(
        self,
        query: Any,
        *,
        target_map: Any = None,
        on_result: Any | None = None,
        on_error: Any | None = None,
    ) -> threading.Thread:
        """Run ``chat`` on a worker thread and return immediately.

        This is primarily for QGIS console usage where a synchronous ``chat()``
        call blocks the GUI event loop during network/model latency.
        """

        def _worker() -> None:
            """Execute chat work and dispatch callbacks."""
            try:
                resp = self.chat(query, target_map=target_map)
                if on_result is not None:
                    on_result(resp)
            except Exception as exc:  # pragma: no cover - defensive path
                if on_error is not None:
                    on_error(exc)

        thread = threading.Thread(target=_worker, daemon=True)
        thread.start()
        return thread

config: GeoAgentConfig property readonly

GeoAgent model and runtime configuration.

context: GeoAgentContext property readonly

GeoAgent runtime context.

strands_agent: Agent property readonly

The underlying Strands :class:~strands.agent.agent.Agent.

tool: Any property readonly

Direct Strands tool caller (agent.tool.some_tool(...)).

tool_names: list[str] property readonly

Expose Strands tool names on GeoAgent for parity.

tool_registry: GeoToolRegistry property readonly

GeoAgent metadata registry for tool inspection.

__getattr__(self, name) special

Forward unknown attributes to the underlying Strands agent.

Source code in geoagent/core/agent.py
def __getattr__(self, name: str) -> Any:
    """Forward unknown attributes to the underlying Strands agent."""
    return getattr(self._strands, name)

chat(self, query, target_map=None)

Run a single user turn and return a :class:GeoAgentResponse.

Source code in geoagent/core/agent.py
def chat(
    self,
    query: Any,
    target_map: Any = None,
) -> GeoAgentResponse:
    """Run a single user turn and return a :class:`GeoAgentResponse`."""
    if target_map is not None and target_map is not self._context.map_obj:
        from geoagent.core.factory import for_leafmap

        other = for_leafmap(
            target_map,
            config=self._config,
            model=self._model,
            fast=self._fast,
            confirm=self._confirm,
        )
        return other.chat(query)

    if (
        self._context.metadata.get("integration")
        in {"nasa_earthdata", "nasa_opera"}
        and self._qgis_safe_mode
        and is_qt_gui_thread()
    ):
        integration = self._context.metadata.get("integration")
        label = (
            "NASA Earthdata" if integration == "nasa_earthdata" else "NASA OPERA"
        )
        helper = (
            "the NASA Earthdata AI Assistant panel"
            if integration == "nasa_earthdata"
            else (
                "the NASA OPERA AI Assistant panel or "
                "geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
                "for direct QGIS-console workflows"
            )
        )
        return GeoAgentResponse(
            success=False,
            error_message=(
                f"{label} chat should be launched from a worker thread inside "
                f"QGIS. Use {helper}."
            ),
            map=self._context.map_obj,
        )

    if self._qgis_safe_mode and is_qt_gui_thread():
        return self._chat_on_qgis_gui_thread(query)

    return self._chat_impl(query)

chat_in_background(self, query, *, target_map=None, on_result=None, on_error=None)

Run chat on a worker thread and return immediately.

This is primarily for QGIS console usage where a synchronous chat() call blocks the GUI event loop during network/model latency.

Source code in geoagent/core/agent.py
def chat_in_background(
    self,
    query: Any,
    *,
    target_map: Any = None,
    on_result: Any | None = None,
    on_error: Any | None = None,
) -> threading.Thread:
    """Run ``chat`` on a worker thread and return immediately.

    This is primarily for QGIS console usage where a synchronous ``chat()``
    call blocks the GUI event loop during network/model latency.
    """

    def _worker() -> None:
        """Execute chat work and dispatch callbacks."""
        try:
            resp = self.chat(query, target_map=target_map)
            if on_result is not None:
                on_result(resp)
        except Exception as exc:  # pragma: no cover - defensive path
            if on_error is not None:
                on_error(exc)

    thread = threading.Thread(target=_worker, daemon=True)
    thread.start()
    return thread

stream_chat(self, query, target_map=None)

Stream a single user turn as raw Strands events.

Text deltas are emitted in events containing "data". The final Strands result is emitted in the event containing "result".

Source code in geoagent/core/agent.py
async def stream_chat(
    self,
    query: Any,
    target_map: Any = None,
) -> AsyncIterator[Any]:
    """Stream a single user turn as raw Strands events.

    Text deltas are emitted in events containing ``"data"``. The final
    Strands result is emitted in the event containing ``"result"``.
    """
    if target_map is not None and target_map is not self._context.map_obj:
        from geoagent.core.factory import for_leafmap

        other = for_leafmap(
            target_map,
            config=self._config,
            model=self._model,
            fast=self._fast,
            confirm=self._confirm,
        )
        async for event in other.stream_chat(query):
            yield event
        return

    if self._qgis_safe_mode and is_qt_gui_thread():
        integration = self._context.metadata.get("integration")
        if integration in {"nasa_earthdata", "nasa_opera"}:
            label = (
                "NASA Earthdata"
                if integration == "nasa_earthdata"
                else "NASA OPERA"
            )
            helper = (
                "the NASA Earthdata AI Assistant panel"
                if integration == "nasa_earthdata"
                else (
                    "the NASA OPERA AI Assistant panel or "
                    "geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
                    "for direct QGIS-console workflows"
                )
            )
            raise RuntimeError(
                f"{label} streaming chat should be launched from a worker thread "
                f"inside QGIS. Use {helper}."
            )
        async for event in self._stream_chat_on_qgis_gui_thread(query):
            yield event
        return

    async for event in self._stream_chat_impl(query):
        yield event

with_map(self, m)

Return a new :class:GeoAgent bound to a map (rebuilds tools).

Source code in geoagent/core/agent.py
def with_map(self, m: Any) -> "GeoAgent":
    """Return a new :class:`GeoAgent` bound to a map (rebuilds tools)."""
    from geoagent.core.factory import for_leafmap

    return for_leafmap(
        m,
        config=self._config,
        model=self._model,
        fast=self._fast,
        confirm=self._confirm,
    )