Fix non-Docket background tasks for MCP servers #101
Changes from all commits
Dockerfile

```diff
@@ -112,13 +112,14 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
 # You may override with DISABLE_AUTH=true in development.
 ENV DISABLE_AUTH=false

-# Default to development mode (no separate worker needed).
-# For production, override the command to remove --no-worker and run a separate task-worker container.
+# Default to development mode using the API's default backend (Docket). For
+# single-process development without a worker, add `--task-backend=asyncio` to
+# the api command.
 # Examples:
-#   Development: docker run -p 8000:8000 redislabs/agent-memory-server
+#   Development: docker run -p 8000:8000 redislabs/agent-memory-server agent-memory api --host 0.0.0.0 --port 8000 --task-backend=asyncio
 #   Production API: docker run -p 8000:8000 redislabs/agent-memory-server agent-memory api --host 0.0.0.0 --port 8000
 #   Production Worker: docker run redislabs/agent-memory-server agent-memory task-worker --concurrency 10
-CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000", "--no-worker"]
+CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000"]

 # ============================================
 # AWS VARIANT - Includes AWS Bedrock support

@@ -142,10 +143,11 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
 # You may override with DISABLE_AUTH=true in development.
 ENV DISABLE_AUTH=false

-# Default to development mode (no separate worker needed).
-# For production, override the command to remove --no-worker and run a separate task-worker container.
+# Default to development mode using the API's default backend (Docket). For
+# single-process development without a worker, add `--task-backend=asyncio` to
+# the api command.
 # Examples:
-#   Development: docker run -p 8000:8000 redislabs/agent-memory-server:aws
+#   Development: docker run -p 8000:8000 redislabs/agent-memory-server:aws agent-memory api --host 0.0.0.0 --port 8000 --task-backend=asyncio
 #   Production API: docker run -p 8000:8000 redislabs/agent-memory-server:aws agent-memory api --host 0.0.0.0 --port 8000
 #   Production Worker: docker run redislabs/agent-memory-server:aws agent-memory task-worker --concurrency 10
-CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000", "--no-worker"]
+CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000"]
```
| CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000"] | |
| CMD ["agent-memory", "api", "--host", "0.0.0.0", "--port", "8000", "--task-backend=asyncio"] |
README.md

```diff
@@ -31,10 +31,14 @@ docker-compose up
 docker run -p 8000:8000 \
   -e REDIS_URL=redis://your-redis:6379 \
   -e OPENAI_API_KEY=your-key \
-  redislabs/agent-memory-server:latest
+  redislabs/agent-memory-server:latest \
+  agent-memory api --host 0.0.0.0 --port 8000 --task-backend=asyncio
```
Suggested change:

```diff
-  agent-memory api --host 0.0.0.0 --port 8000 --task-backend=asyncio
+  agent-memory api --host 0.0.0.0 --port 8000 --task-backend asyncio
```
Copilot AI commented on Dec 11, 2025:
The comment says "Start the server (development mode, default asyncio backend)" but the command uv run agent-memory api does not include --task-backend=asyncio. Since the CLI default for the api command is docket (see cli.py line 281), this will use the Docket backend by default, which requires running a separate worker. This contradicts the expectation set by the comment.
Suggested change:

```diff
-uv run agent-memory api
+uv run agent-memory api --task-backend=asyncio
```
Copilot AI commented on Dec 11, 2025:
The README states "Start the server (development mode, default asyncio backend)" but the actual default for the API command is 'docket' (as shown in cli.py line 281). When running uv run agent-memory api without any flags, it will use the Docket backend and require a separate worker, not the asyncio backend. The comment should either be corrected to say "Start the server (development mode)" or the command should be changed to explicitly use --task-backend asyncio.
Suggested change:

```diff
-# Start the server (development mode, default asyncio backend)
-uv run agent-memory api
+# Start the server (development mode, asyncio backend)
+uv run agent-memory api --task-backend=asyncio
```
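Both review comments point at the same mismatch: with this PR's defaults, `uv run agent-memory api` uses the Docket backend and therefore needs a separate worker. As a rough sketch (commands assembled from the CLI help and Dockerfile examples in this PR, not an authoritative quickstart), the two working local setups look like this:

```bash
# Option A: single-process development, asyncio backend, no worker needed
uv run agent-memory api --task-backend=asyncio

# Option B: keep the default Docket backend and run a worker alongside the API
uv run agent-memory api
uv run agent-memory task-worker --concurrency 10
```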
Version bump

```diff
@@ -1,3 +1,3 @@
 """Redis Agent Memory Server - A memory system for conversational AI."""

-__version__ = "0.12.5"
+__version__ = "0.12.6"
```
cli.py

```diff
@@ -257,7 +257,7 @@ async def run_migration():
             )
         else:
             click.echo(
-                "\nMigration completed with errors. " "Run again to retry failed keys."
+                "\nMigration completed with errors. Run again to retry failed keys."
             )

     asyncio.run(run_migration())

@@ -268,16 +268,41 @@
 @click.option("--host", default="0.0.0.0", help="Host to run the server on")
 @click.option("--reload", is_flag=True, help="Enable auto-reload")
 @click.option(
-    "--no-worker", is_flag=True, help="Use FastAPI background tasks instead of Docket"
+    "--no-worker",
+    is_flag=True,
+    help=(
+        "(DEPRECATED) Use --task-backend=asyncio instead. "
+        "If present, force FastAPI/asyncio background tasks instead of Docket."
+    ),
+    deprecated=True,
+)
+@click.option(
+    "--task-backend",
+    default="docket",
+    type=click.Choice(["asyncio", "docket"]),
+    help=(
+        "Background task backend (asyncio, docket). "
+        "Default is 'docket' to preserve existing behavior using Docket-based "
+        "workers (requires a running `agent-memory task-worker` for "
+        "non-blocking background tasks). Use 'asyncio' (or deprecated "
+        "--no-worker) for single-process development without a worker."
+    ),
 )
-def api(port: int, host: str, reload: bool, no_worker: bool):
+def api(port: int, host: str, reload: bool, no_worker: bool, task_backend: str):
     """Run the REST API server."""
     from agent_memory_server.main import on_start_logger

     configure_logging()

-    # Set use_docket based on the --no-worker flag
-    if no_worker:
+    # Determine effective backend.
+    # - Default is 'docket' to preserve prior behavior (Docket workers).
+    # - --task-backend=asyncio opts into single-process asyncio background tasks.
+    # - Deprecated --no-worker flag forces asyncio for backward compatibility.
+    effective_backend = "asyncio" if no_worker else task_backend
+
+    if effective_backend == "docket":
+        settings.use_docket = True
+    else:  # "asyncio"
         settings.use_docket = False

     on_start_logger(port)

@@ -298,9 +323,17 @@ def api(port: int, host: str, reload: bool, no_worker: bool):
     type=click.Choice(["stdio", "sse"]),
 )
 @click.option(
-    "--no-worker", is_flag=True, help="Use FastAPI background tasks instead of Docket"
+    "--task-backend",
+    default="asyncio",
+    type=click.Choice(["asyncio", "docket"]),
+    help=(
+        "Background task backend (asyncio, docket). "
+        "Default is 'asyncio' (no separate worker needed). "
+        "Use 'docket' for production setups with a running task worker "
+        "(see `agent-memory task-worker`)."
```
| "(see `agent-memory task-worker`)." | |
| " (see `agent-memory task-worker`)." |
Background task utilities (HybridBackgroundTasks)

```diff
@@ -1,8 +1,10 @@
+import asyncio
 import concurrent.futures
 from collections.abc import Callable
 from typing import Any

 from fastapi import BackgroundTasks
+from starlette.concurrency import run_in_threadpool

 from agent_memory_server.config import settings
 from agent_memory_server.logging import get_logger

@@ -12,11 +14,26 @@


 class HybridBackgroundTasks(BackgroundTasks):
-    """A BackgroundTasks implementation that can use either Docket or FastAPI background tasks."""
+    """A BackgroundTasks implementation that can use either Docket or asyncio tasks.
+
+    When use_docket=True, tasks are scheduled through Docket's Redis-based queue
+    for processing by a separate worker process.
+
+    When use_docket=False, tasks are scheduled using asyncio.create_task() to run
+    in the current event loop. This works in both FastAPI and MCP contexts, unlike
+    the parent class's approach which relies on Starlette's response lifecycle
+    (which doesn't exist in MCP's stdio/SSE modes).
+    """

     def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
-        """Run tasks either directly, through Docket, or through FastAPI background tasks"""
-        logger.info("Adding task to background tasks...")
+        """Schedule a background task for execution.
+
+        Args:
+            func: The function to run (can be sync or async)
+            *args: Positional arguments to pass to the function
+            **kwargs: Keyword arguments to pass to the function
+        """
+        logger.info(f"Adding background task: {func.__name__}")

         if settings.use_docket:
             logger.info("Scheduling task through Docket")

@@ -28,7 +45,6 @@ def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
             # This runs in a thread to avoid event loop conflicts
             def run_in_thread():
                 """Run the async Docket operations in a separate thread"""
-                import asyncio

                 async def schedule_task():
                     async with Docket(

@@ -48,9 +64,34 @@ async def schedule_task():

             # When using Docket, we don't add anything to FastAPI background tasks
         else:
-            logger.info("Using FastAPI background tasks")
-            # Use FastAPI's background tasks directly
-            super().add_task(func, *args, **kwargs)
+            logger.info("Scheduling task with asyncio.create_task")
+            # Use asyncio.create_task to schedule the task in the event loop.
+            # This works universally in both FastAPI and MCP contexts.
+            #
+            # Note: We don't use super().add_task() because Starlette's BackgroundTasks
+            # relies on being attached to a response object and run after the response
+            # is sent. In MCP mode (stdio/SSE), there's no Starlette response lifecycle,
+            # so tasks added via super().add_task() would never execute.
+            asyncio.create_task(self._run_task(func, *args, **kwargs))
```
Suggested change:

```diff
-            asyncio.create_task(self._run_task(func, *args, **kwargs))
+            try:
+                asyncio.create_task(self._run_task(func, *args, **kwargs))
+            except RuntimeError:
+                # No running event loop; fallback to ensure_future (may create one)
+                asyncio.ensure_future(self._run_task(func, *args, **kwargs))
```
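`self._run_task(...)` is referenced here, but its body falls outside the hunks shown in this view. Judging by the imports the PR adds (`concurrent.futures`, `run_in_threadpool`), a helper of roughly this shape would fit; this is a hypothetical sketch written as a free function for brevity, not the PR's actual implementation:

```python
import asyncio
import logging
from collections.abc import Callable
from typing import Any

from starlette.concurrency import run_in_threadpool

logger = logging.getLogger(__name__)


async def _run_task(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Hypothetical helper: await coroutines directly, push sync callables to a threadpool."""
    try:
        if asyncio.iscoroutinefunction(func):
            await func(*args, **kwargs)
        else:
            # Keep blocking work off the event loop.
            await run_in_threadpool(func, *args, **kwargs)
    except Exception:
        # A background task should never take down the event loop; log and move on.
        logger.exception("Background task %s failed", getattr(func, "__name__", repr(func)))
```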
Review comment on the Dockerfile CMD default (its suggested change appears earlier on this page):
The comment states "Default to development mode using the API's default backend (Docket)", but this is misleading: the CMD actually runs with the Docket backend (which requires a separate worker), not asyncio. This contradicts README.md line 37, which claims "The default image runs in development mode using the asyncio task backend (no separate worker required)". Either the CMD should be updated to include `--task-backend=asyncio` to match the README's claim, or the comment and README should be updated to clarify that users need to run a separate worker when using the default Docker image.
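To make the "separate worker" option concrete, here is a hedged sketch of running the default (Docket-backed) image alongside the worker it needs; the image name, flags, and environment variables are lifted from the Dockerfile and README examples above, and `redis://your-redis:6379` is a placeholder:

```bash
# API container (default CMD, Docket backend)
docker run -p 8000:8000 \
  -e REDIS_URL=redis://your-redis:6379 \
  -e OPENAI_API_KEY=your-key \
  redislabs/agent-memory-server:latest

# Worker container draining the Docket queue from the same Redis
docker run \
  -e REDIS_URL=redis://your-redis:6379 \
  -e OPENAI_API_KEY=your-key \
  redislabs/agent-memory-server:latest \
  agent-memory task-worker --concurrency 10
```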