diff --git a/api/core/mcp/session/base_session.py b/api/core/mcp/session/base_session.py index 4b02ae9eda..7734b8fdd9 100644 --- a/api/core/mcp/session/base_session.py +++ b/api/core/mcp/session/base_session.py @@ -1,7 +1,7 @@ import logging import queue from collections.abc import Callable -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError from contextlib import ExitStack from datetime import timedelta from types import TracebackType @@ -176,11 +176,15 @@ class BaseSession( self._receiver_future: Future | None = None def __enter__(self) -> Self: - self._executor = ThreadPoolExecutor() + # The thread pool is dedicated to running `_receive_loop`. Setting `max_workers` to 1 + # ensures no unnecessary threads are created. + self._executor = ThreadPoolExecutor(max_workers=1) self._receiver_future = self._executor.submit(self._receive_loop) return self def check_receiver_status(self) -> None: + """`check_receiver_status` ensures that any exceptions raised during the + execution of `_receive_loop` are retrieved and propagated.""" if self._receiver_future and self._receiver_future.done(): self._receiver_future.result() @@ -194,7 +198,7 @@ class BaseSession( if self._receiver_future: try: self._receiver_future.result(timeout=5.0) # Wait up to 5 seconds - except concurrent.futures.TimeoutError: + except TimeoutError: # If the receiver loop is still running after timeout, we'll force shutdown pass