GeoAgent facade¶
geoagent.core.agent.GeoAgent
¶
Public facade: Strands agent + GeoAgent context, tools, and safety hooks.
Source code in geoagent/core/agent.py
class GeoAgent:
"""Public facade: Strands agent + GeoAgent context, tools, and safety hooks."""
def __init__(
self,
*,
context: Optional[GeoAgentContext] = None,
config: Optional[GeoAgentConfig] = None,
tools: Optional[list[Any]] = None,
registry: Optional[GeoToolRegistry] = None,
model: Any | None = None,
provider: str | None = None,
model_id: str | None = None,
fast: bool = False,
confirm: ConfirmCallback | None = None,
qgis_safe_mode: bool = False,
) -> None:
self._context = context or GeoAgentContext()
cfg = config or GeoAgentConfig()
if provider is not None:
cfg = cfg.model_copy(update={"provider": provider})
if model_id is not None:
cfg = cfg.model_copy(update={"model": model_id})
if fast and cfg.max_tokens is not None and cfg.max_tokens > 2048:
cfg = cfg.model_copy(update={"max_tokens": 2048})
self._config = cfg
self._fast = fast
self._qgis_safe_mode = qgis_safe_mode
self._registry = registry or GeoToolRegistry()
self._tool_list = list(tools or [])
self._cancelled: list[str] = []
self._tool_calls: list[dict[str, Any]] = []
self._confirm = confirm or auto_approve_safe_only
self._model = model or resolve_model(self._config)
self._rebuild_strands_agent()
def _rebuild_strands_agent(self) -> None:
"""Recreate the underlying Strands agent from current settings."""
self._cancelled = []
prompt = FAST_SYSTEM_PROMPT if self._fast else DEFAULT_SYSTEM_PROMPT
extra_prompt = self._context.metadata.get("system_prompt")
if extra_prompt:
prompt = f"{prompt}\n\n{extra_prompt}"
hook = ConfirmationHookProvider(
self._registry,
self._confirm,
self._cancelled,
self._tool_calls,
)
self._strands = Agent(
model=self._model,
tools=self._tool_list,
system_prompt=prompt,
hooks=[hook],
callback_handler=None,
tool_executor=SequentialToolExecutor() if self._qgis_safe_mode else None,
)
@property
def context(self) -> GeoAgentContext:
"""GeoAgent runtime context."""
return self._context
@property
def strands_agent(self) -> Agent:
"""The underlying Strands :class:`~strands.agent.agent.Agent`."""
return self._strands
@property
def tool(self) -> Any:
"""Direct Strands tool caller (``agent.tool.some_tool(...)``)."""
return self._strands.tool
@property
def tool_names(self) -> list[str]:
"""Expose Strands tool names on GeoAgent for parity."""
return list(self._strands.tool_names)
@property
def tool_registry(self) -> GeoToolRegistry:
"""GeoAgent metadata registry for tool inspection."""
return self._registry
@property
def config(self) -> GeoAgentConfig:
"""GeoAgent model and runtime configuration."""
return self._config
def __getattr__(self, name: str) -> Any:
"""Forward unknown attributes to the underlying Strands agent."""
return getattr(self._strands, name)
def with_map(self, m: Any) -> "GeoAgent":
"""Return a new :class:`GeoAgent` bound to a map (rebuilds tools)."""
from geoagent.core.factory import for_leafmap
return for_leafmap(
m,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)
def chat(
self,
query: Any,
target_map: Any = None,
) -> GeoAgentResponse:
"""Run a single user turn and return a :class:`GeoAgentResponse`."""
if target_map is not None and target_map is not self._context.map_obj:
from geoagent.core.factory import for_leafmap
other = for_leafmap(
target_map,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)
return other.chat(query)
if (
self._context.metadata.get("integration")
in {"nasa_earthdata", "nasa_opera"}
and self._qgis_safe_mode
and is_qt_gui_thread()
):
integration = self._context.metadata.get("integration")
label = (
"NASA Earthdata" if integration == "nasa_earthdata" else "NASA OPERA"
)
helper = (
"the NASA Earthdata AI Assistant panel"
if integration == "nasa_earthdata"
else (
"the NASA OPERA AI Assistant panel or "
"geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
"for direct QGIS-console workflows"
)
)
return GeoAgentResponse(
success=False,
error_message=(
f"{label} chat should be launched from a worker thread inside "
f"QGIS. Use {helper}."
),
map=self._context.map_obj,
)
if self._qgis_safe_mode and is_qt_gui_thread():
return self._chat_on_qgis_gui_thread(query)
return self._chat_impl(query)
def _chat_impl(self, query: Any) -> GeoAgentResponse:
"""Run a single user turn on the current thread."""
self._cancelled.clear()
self._tool_calls.clear()
t0 = time.time()
try:
result = self._strands(query)
elapsed = time.time() - t0
exec_names = list(getattr(result.metrics, "tool_metrics", {}).keys())
answer = _result_to_text(result)
content_blocks = _result_content_blocks(result)
images = _result_to_images(result) + _tool_calls_to_images(self._tool_calls)
stop = str(getattr(result, "stop_reason", "end_turn"))
success = stop not in ("cancelled", "guardrail_intervened")
err = None if success else f"stop_reason={stop}"
return GeoAgentResponse(
answer_text=answer or None,
success=success,
error_message=err,
execution_time=elapsed,
content_blocks=content_blocks,
images=images,
executed_tools=exec_names,
tool_calls=list(self._tool_calls),
cancelled_tools=list(self._cancelled),
map=self._context.map_obj,
raw=result,
)
except Exception as exc:
elapsed = time.time() - t0
return GeoAgentResponse(
success=False,
error_message=_format_chat_exception(exc),
execution_time=elapsed,
tool_calls=list(self._tool_calls),
cancelled_tools=list(self._cancelled),
map=self._context.map_obj,
)
async def stream_chat(
self,
query: Any,
target_map: Any = None,
) -> AsyncIterator[Any]:
"""Stream a single user turn as raw Strands events.
Text deltas are emitted in events containing ``"data"``. The final
Strands result is emitted in the event containing ``"result"``.
"""
if target_map is not None and target_map is not self._context.map_obj:
from geoagent.core.factory import for_leafmap
other = for_leafmap(
target_map,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)
async for event in other.stream_chat(query):
yield event
return
if self._qgis_safe_mode and is_qt_gui_thread():
integration = self._context.metadata.get("integration")
if integration in {"nasa_earthdata", "nasa_opera"}:
label = (
"NASA Earthdata"
if integration == "nasa_earthdata"
else "NASA OPERA"
)
helper = (
"the NASA Earthdata AI Assistant panel"
if integration == "nasa_earthdata"
else (
"the NASA OPERA AI Assistant panel or "
"geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
"for direct QGIS-console workflows"
)
)
raise RuntimeError(
f"{label} streaming chat should be launched from a worker thread "
f"inside QGIS. Use {helper}."
)
async for event in self._stream_chat_on_qgis_gui_thread(query):
yield event
return
async for event in self._stream_chat_impl(query):
yield event
async def _stream_chat_impl(self, query: Any) -> AsyncIterator[Any]:
"""Run a streaming user turn on the current thread."""
self._cancelled.clear()
self._tool_calls.clear()
try:
async for event in self._strands.stream_async(query):
yield event
except Exception as exc:
if _looks_like_json_parse_failure(exc):
raise RuntimeError(_format_chat_exception(exc)) from exc
raise
async def _stream_chat_on_qgis_gui_thread(self, query: Any) -> AsyncIterator[Any]:
"""Stream QGIS chat from a worker thread while pumping Qt events."""
events: queue.Queue[tuple[str, Any]] = queue.Queue()
async def _run_stream() -> None:
"""Execute async streaming work off the GUI thread."""
async for event in self._stream_chat_impl(query):
events.put(("event", event))
def _worker() -> None:
"""Run the async stream and forward events to the GUI thread."""
try:
asyncio.run(_run_stream())
except BaseException as exc: # pragma: no cover - defensive path
events.put(("error", exc))
finally:
events.put(("done", None))
thread = threading.Thread(
target=_worker,
daemon=True,
name="GeoAgent-QGIS-stream-chat",
)
thread.start()
while True:
try:
kind, payload = events.get_nowait()
except queue.Empty:
process_qt_events()
await asyncio.sleep(0.05)
continue
if kind == "event":
yield payload
elif kind == "error":
thread.join(timeout=0)
raise payload
elif kind == "done":
thread.join(timeout=0)
return
def _chat_on_qgis_gui_thread(self, query: Any) -> GeoAgentResponse:
"""Run sync QGIS chat without starving the Qt event loop.
QGIS users often call ``resp = agent.chat(...)`` from the Python
console, which executes on the GUI thread. The model call needs to run
away from that thread, but QGIS tools still marshal back to it via
``BlockingQueuedConnection``. Pumping Qt events while waiting lets those
marshalled tool calls run and keeps the application responsive.
"""
done = threading.Event()
box: dict[str, Any] = {}
def _worker() -> None:
"""Execute chat work off the GUI thread."""
try:
box["response"] = self._chat_impl(query)
except BaseException as exc: # pragma: no cover - defensive path
box["error"] = exc
finally:
done.set()
thread = threading.Thread(
target=_worker,
daemon=True,
name="GeoAgent-QGIS-chat",
)
thread.start()
while not done.is_set():
process_qt_events()
done.wait(0.05)
thread.join(timeout=0)
if "error" in box:
raise box["error"]
return box["response"]
def chat_in_background(
self,
query: Any,
*,
target_map: Any = None,
on_result: Any | None = None,
on_error: Any | None = None,
) -> threading.Thread:
"""Run ``chat`` on a worker thread and return immediately.
This is primarily for QGIS console usage where a synchronous ``chat()``
call blocks the GUI event loop during network/model latency.
"""
def _worker() -> None:
"""Execute chat work and dispatch callbacks."""
try:
resp = self.chat(query, target_map=target_map)
if on_result is not None:
on_result(resp)
except Exception as exc: # pragma: no cover - defensive path
if on_error is not None:
on_error(exc)
thread = threading.Thread(target=_worker, daemon=True)
thread.start()
return thread
config: GeoAgentConfig
property
readonly
¶
GeoAgent model and runtime configuration.
context: GeoAgentContext
property
readonly
¶
GeoAgent runtime context.
strands_agent: Agent
property
readonly
¶
The underlying Strands :class:~strands.agent.agent.Agent.
tool: Any
property
readonly
¶
Direct Strands tool caller (agent.tool.some_tool(...)).
tool_names: list[str]
property
readonly
¶
Expose Strands tool names on GeoAgent for parity.
tool_registry: GeoToolRegistry
property
readonly
¶
GeoAgent metadata registry for tool inspection.
__getattr__(self, name)
special
¶
Forward unknown attributes to the underlying Strands agent.
Source code in geoagent/core/agent.py
def __getattr__(self, name: str) -> Any:
"""Forward unknown attributes to the underlying Strands agent."""
return getattr(self._strands, name)
chat(self, query, target_map=None)
¶
Run a single user turn and return a :class:GeoAgentResponse.
Source code in geoagent/core/agent.py
def chat(
self,
query: Any,
target_map: Any = None,
) -> GeoAgentResponse:
"""Run a single user turn and return a :class:`GeoAgentResponse`."""
if target_map is not None and target_map is not self._context.map_obj:
from geoagent.core.factory import for_leafmap
other = for_leafmap(
target_map,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)
return other.chat(query)
if (
self._context.metadata.get("integration")
in {"nasa_earthdata", "nasa_opera"}
and self._qgis_safe_mode
and is_qt_gui_thread()
):
integration = self._context.metadata.get("integration")
label = (
"NASA Earthdata" if integration == "nasa_earthdata" else "NASA OPERA"
)
helper = (
"the NASA Earthdata AI Assistant panel"
if integration == "nasa_earthdata"
else (
"the NASA OPERA AI Assistant panel or "
"geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
"for direct QGIS-console workflows"
)
)
return GeoAgentResponse(
success=False,
error_message=(
f"{label} chat should be launched from a worker thread inside "
f"QGIS. Use {helper}."
),
map=self._context.map_obj,
)
if self._qgis_safe_mode and is_qt_gui_thread():
return self._chat_on_qgis_gui_thread(query)
return self._chat_impl(query)
chat_in_background(self, query, *, target_map=None, on_result=None, on_error=None)
¶
Run chat on a worker thread and return immediately.
This is primarily for QGIS console usage where a synchronous chat()
call blocks the GUI event loop during network/model latency.
Source code in geoagent/core/agent.py
def chat_in_background(
self,
query: Any,
*,
target_map: Any = None,
on_result: Any | None = None,
on_error: Any | None = None,
) -> threading.Thread:
"""Run ``chat`` on a worker thread and return immediately.
This is primarily for QGIS console usage where a synchronous ``chat()``
call blocks the GUI event loop during network/model latency.
"""
def _worker() -> None:
"""Execute chat work and dispatch callbacks."""
try:
resp = self.chat(query, target_map=target_map)
if on_result is not None:
on_result(resp)
except Exception as exc: # pragma: no cover - defensive path
if on_error is not None:
on_error(exc)
thread = threading.Thread(target=_worker, daemon=True)
thread.start()
return thread
stream_chat(self, query, target_map=None)
¶
Stream a single user turn as raw Strands events.
Text deltas are emitted in events containing "data". The final
Strands result is emitted in the event containing "result".
Source code in geoagent/core/agent.py
async def stream_chat(
self,
query: Any,
target_map: Any = None,
) -> AsyncIterator[Any]:
"""Stream a single user turn as raw Strands events.
Text deltas are emitted in events containing ``"data"``. The final
Strands result is emitted in the event containing ``"result"``.
"""
if target_map is not None and target_map is not self._context.map_obj:
from geoagent.core.factory import for_leafmap
other = for_leafmap(
target_map,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)
async for event in other.stream_chat(query):
yield event
return
if self._qgis_safe_mode and is_qt_gui_thread():
integration = self._context.metadata.get("integration")
if integration in {"nasa_earthdata", "nasa_opera"}:
label = (
"NASA Earthdata"
if integration == "nasa_earthdata"
else "NASA OPERA"
)
helper = (
"the NASA Earthdata AI Assistant panel"
if integration == "nasa_earthdata"
else (
"the NASA OPERA AI Assistant panel or "
"geoagent.tools.nasa_opera.submit_nasa_opera_search_task(...) "
"for direct QGIS-console workflows"
)
)
raise RuntimeError(
f"{label} streaming chat should be launched from a worker thread "
f"inside QGIS. Use {helper}."
)
async for event in self._stream_chat_on_qgis_gui_thread(query):
yield event
return
async for event in self._stream_chat_impl(query):
yield event
with_map(self, m)
¶
Return a new :class:GeoAgent bound to a map (rebuilds tools).
Source code in geoagent/core/agent.py
def with_map(self, m: Any) -> "GeoAgent":
"""Return a new :class:`GeoAgent` bound to a map (rebuilds tools)."""
from geoagent.core.factory import for_leafmap
return for_leafmap(
m,
config=self._config,
model=self._model,
fast=self._fast,
confirm=self._confirm,
)