API Reference
DBConnect
DBConnect is responsible for managing the engine and the session_maker. You need to define two factories. Optionally, you can specify a host to connect to. You can also specify a handler that runs before a session is created - this handler can be used to connect to the host for the first time or to reconnect to a new one.
init
def __init__(
self: DBConnect,
engine_creator: EngineCreatorFunc,
session_maker_creator: SessionMakerCreatorFunc,
host: str | None = None,
before_create_session_handler: AsyncFunc | None = None,
) -> None:
engine_creator is a factory function for creating engines.
It’s an asynchronous callable that takes a host as input and returns
an async engine.
session_maker_creator is a factory function for creating an asynchronous
session_maker.
It’s an asynchronous callable that takes an async engine as input and returns
an async session_maker.
host is an optional parameter.
You can specify only this parameter to make your connection always work with
a single host, without dynamic switching.
However, you can still change the host in the handler if needed - it won’t
cause any issues.
before_create_session_handler is a handler that allows you to execute
custom logic before creating a session.
For example, you can switch the host to another one - this is useful for
dynamically determining the master if the previous master fails and a
replica takes its place.
The handler is an asynchronous callable that takes a DBConnect instance as input and returns nothing.
Example:
async def renew_master_connect(connect: DBConnect) -> None:
"""Updates the host if the master has changed"""
master_host = await get_master()
if master_host != connect.host:
await connect.change_host(master_host)
connect
async def connect(self: DBConnect, host: str) -> None:
Establishes a connection to the specified host. This method doesn’t need to be called explicitly. If it isn’t called, the first session request will automatically establish the connection.
change_host
async def change_host(self: DBConnect, host: str) -> None:
Establishes a connection to the specified host, but first checks under a lock that the currently connected host is different from the target host.
create_session
async def create_session(self: DBConnect) -> AsyncSession:
Creates a new session. Used internally by the library - you’ll probably never need to call it directly, but it’s good to know it exists.
session_maker
async def session_maker(self: DBConnect) -> async_sessionmaker[AsyncSession]:
Provides access to the session_maker currently used to create sessions.
close
async def close(self: DBConnect) -> None:
Completely closes and cleans up all resources, freeing the connection pool. This should be called at the end of your application’s lifecycle.
Middlewares
Most of the work - and the “magic” - happens inside the middleware.
You can check out how it works and implement your own.
Fastapi
from context_async_sqlalchemy.fastapi_utils import (
fastapi_http_db_session_middleware,
add_fastapi_http_db_session_middleware,
)
app = FastAPI(...)
add_fastapi_http_db_session_middleware(app)
# OR
app.add_middleware(
BaseHTTPMiddleware, dispatch=fastapi_http_db_session_middleware
)
Starlette
from context_async_sqlalchemy.starlette_utils import (
add_starlette_http_db_session_middleware,
starlette_http_db_session_middleware,
StarletteHTTPDBSessionMiddleware,
)
app = Starlette(...)
add_starlette_http_db_session_middleware(app)
# OR
app.add_middleware(
BaseHTTPMiddleware, dispatch=starlette_http_db_session_middleware
)
# OR
app.add_middleware(StarletteHTTPDBSessionMiddleware)
Pure ASGI
from context_async_sqlalchemy import (
ASGIHTTPDBSessionMiddleware,
)
app = SomeASGIApp(...)
app.add_middleware(ASGIHTTPDBSessionMiddleware)
Sessions
Here are the functions you’ll use most often from the library. They allow you to work with sessions directly from your asynchronous code.
db_session
async def db_session(connect: DBConnect) -> AsyncSession:
The most important function for obtaining a session in your code. When called for the first time, it returns a new session; subsequent calls return the same session.
atomic_db_session
@asynccontextmanager
async def atomic_db_session(
connect: DBConnect,
current_transaction: Literal["commit", "rollback", "append", "raise"] = "commit",
) -> AsyncGenerator[AsyncSession, None]:
A context manager that can be used to wrap another function which uses a context session, making that call isolated within its own transaction.
There are several options that define how the function will handle an already open transaction.
current_transaction:
commit- commits the open transaction and starts a new onerollback- rolls back the open transaction and starts a new oneappend- continues using the current transaction and commits itraise- raises an InvalidRequestError
commit_db_session
async def commit_db_session(connect: DBConnect) -> None:
Commits the active session, if there is one.
rollback_db_session
async def rollback_db_session(connect: DBConnect) -> None:
Rollbacks the active session, if there is one.
close_db_session
async def close_db_session(connect: DBConnect) -> None:
Closes the current context session. The connection is returned to the pool. If you close an uncommitted transaction, the connection will be rolled back.
This is useful if, for example, at the beginning of the handle a database query is needed, and then there is some other long-term work and you don't want to keep the connection opened.
new_non_ctx_session
@asynccontextmanager
async def new_non_ctx_session(
connect: DBConnect,
) -> AsyncGenerator[AsyncSession, None]:
A context manager that allows you to create a new session without placing it in a context. It's used for manual session management when you don't want to use a context.
new_non_ctx_atomic_session
@asynccontextmanager
async def new_non_ctx_atomic_session(
connect: DBConnect,
) -> AsyncGenerator[AsyncSession, None]:
A context manager that allows you to create a new session with a new transaction that isn't placed in a context. It's used for manual session management when you don't want to use a context.
Context
run_in_new_ctx
async def run_in_new_ctx(
callable_func: AsyncCallable[AsyncCallableResult],
*args: Any,
**kwargs: Any,
) -> AsyncCallableResult:
Runs a function in a new context with new sessions that have their own connection.
It will commit the transaction automatically if callable_func does not raise exceptions. Otherwise, the transaction will be rolled back.
The intended use is to run multiple database queries concurrently.
example of use:
await asyncio.gather(
your_function_with_db_session(...),
run_in_new_ctx(
your_function_with_db_session, some_arg, some_kwarg=123,
),
run_in_new_ctx(your_function_with_db_session, ...),
)
Testing
You can read more about testing here: Testing
rollback_session
@asynccontextmanager
async def rollback_session(
connection: DBConnect,
) -> AsyncGenerator[AsyncSession, None]:
A context manager that creates a session which is automatically rolled back at the end. It’s intended for use in fixtures to execute SQL queries during tests.
set_test_context
@asynccontextmanager
async def set_test_context(auto_close: bool = False) -> AsyncGenerator[None, None]:
A context manager that creates a new context in which you can place a dedicated test session. It’s intended for use in tests where the test and the application share a single transaction.
Use auto_close=False if you’re using a test session and transaction
that you close elsewhere in your code.
Use auto_close=True if you want to call a function
in a test that uses a context while the middleware is not
active, and you want all sessions to be closed automatically.
put_savepoint_session_in_ctx
async def put_savepoint_session_in_ctx(
connection: DBConnect,
session: AsyncSession,
) -> AsyncGenerator[None, None]:
Sets the context to a session that uses a save point instead of creating a transaction. You need to pass the session you're using inside your tests to attach a new session to the same connection.
It is important to use this function inside set_test_context.