|
|
|
@ -1,7 +1,7 @@
|
|
|
|
import logging
|
|
|
|
import logging
|
|
|
|
import queue
|
|
|
|
import queue
|
|
|
|
from collections.abc import Callable
|
|
|
|
from collections.abc import Callable
|
|
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
|
|
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
|
|
|
|
from contextlib import ExitStack
|
|
|
|
from contextlib import ExitStack
|
|
|
|
from datetime import timedelta
|
|
|
|
from datetime import timedelta
|
|
|
|
from types import TracebackType
|
|
|
|
from types import TracebackType
|
|
|
|
@ -176,11 +176,15 @@ class BaseSession(
|
|
|
|
self._receiver_future: Future | None = None
|
|
|
|
self._receiver_future: Future | None = None
|
|
|
|
|
|
|
|
|
|
|
|
def __enter__(self) -> Self:
|
|
|
|
def __enter__(self) -> Self:
|
|
|
|
self._executor = ThreadPoolExecutor()
|
|
|
|
# The thread pool is dedicated to running `_receive_loop`. Setting `max_workers` to 1
|
|
|
|
|
|
|
|
# ensures no unnecessary threads are created.
|
|
|
|
|
|
|
|
self._executor = ThreadPoolExecutor(max_workers=1)
|
|
|
|
self._receiver_future = self._executor.submit(self._receive_loop)
|
|
|
|
self._receiver_future = self._executor.submit(self._receive_loop)
|
|
|
|
return self
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
|
|
def check_receiver_status(self) -> None:
|
|
|
|
def check_receiver_status(self) -> None:
|
|
|
|
|
|
|
|
"""`check_receiver_status` ensures that any exceptions raised during the
|
|
|
|
|
|
|
|
execution of `_receive_loop` are retrieved and propagated."""
|
|
|
|
if self._receiver_future and self._receiver_future.done():
|
|
|
|
if self._receiver_future and self._receiver_future.done():
|
|
|
|
self._receiver_future.result()
|
|
|
|
self._receiver_future.result()
|
|
|
|
|
|
|
|
|
|
|
|
@ -194,7 +198,7 @@ class BaseSession(
|
|
|
|
if self._receiver_future:
|
|
|
|
if self._receiver_future:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
self._receiver_future.result(timeout=5.0) # Wait up to 5 seconds
|
|
|
|
self._receiver_future.result(timeout=5.0) # Wait up to 5 seconds
|
|
|
|
except concurrent.futures.TimeoutError:
|
|
|
|
except TimeoutError:
|
|
|
|
# If the receiver loop is still running after timeout, we'll force shutdown
|
|
|
|
# If the receiver loop is still running after timeout, we'll force shutdown
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|