This prompt is designed for Python developers and provides professional guidance on building context managers. Through a systematic analysis process, it generates structurally complete context-manager code that follows Python best practices, tailored to the specific resource type and application scenario. The prompt uses a step-by-step design method covering requirements analysis, exception handling, resource management, and performance optimization, ensuring the generated code is production-ready. It also supplies detailed explanations of the underlying principles plus usage examples, helping developers understand how context managers work and how they are implemented.
The solution below addresses the resource-management needs of a "database connection pool + transaction context" scenario. It emphasizes reliable connection acquisition and release, automatic commit/rollback, statement-level timeouts, configurable isolation levels, and observability. The code follows Python best practices, includes thorough exception handling, and ships with a retry-capable executor (retrying a transaction body requires a functional wrapper around it; the with statement alone cannot re-run its own block).
Resource type and key challenges
Thread safety and performance
Note: the examples default to PostgreSQL (psycopg2/psycopg), but the interface layer is kept as close to generic DB-API 2.0 as practical; for other databases, swap in a different retriable-error predicate and setup statements.
from __future__ import annotations
import logging
import time
import random
from dataclasses import dataclass
from enum import Enum
from typing import (
    Any,
    Callable,
    ContextManager,
    Optional,
    Protocol,
    Sequence,
    Set,
    TypeVar,
    Union,
    runtime_checkable,
)
logger = logging.getLogger(__name__)
T = TypeVar("T")
# ---------- DB-API minimal protocols ----------
@runtime_checkable
class DBCursor(Protocol):
def execute(self, operation: str, params: Optional[Union[Sequence[Any], dict[str, Any]]] = None) -> Any: ...
def close(self) -> None: ...
@property
def rowcount(self) -> int: ...
def fetchone(self) -> Any: ...
def fetchall(self) -> list[Any]: ...
@runtime_checkable
class DBConnection(Protocol):
def cursor(self) -> DBCursor: ...
def commit(self) -> None: ...
def rollback(self) -> None: ...
    # autocommit is an optional attribute; some drivers (e.g. psycopg2/3) support it
autocommit: Any # type: ignore[attr-defined]
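# Because the protocols are @runtime_checkable, a driver object can be
# sanity-checked against them at runtime (isinstance verifies only that the
# named attributes exist, not their signatures). The stand-in classes below
# are purely illustrative, not a real driver:
class _FakeCursor:
    rowcount = -1
    def execute(self, operation, params=None): pass
    def close(self): pass
    def fetchone(self): return None
    def fetchall(self): return []

class _FakeConnection:
    autocommit = True
    def cursor(self): return _FakeCursor()
    def commit(self): pass
    def rollback(self): pass

assert isinstance(_FakeConnection(), DBConnection)
assert isinstance(_FakeCursor(), DBCursor)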
# ---------- Pool adapter protocol & implementations ----------
class PoolError(Exception):
"""Raised when the pool cannot provide a connection within the configured constraints."""
@runtime_checkable
class PoolAdapter(Protocol):
"""
Adapter over a connection pool, providing acquire/release with optional timeout.
Implementations must not leak connections: after acquire success, release(conn) must be callable.
"""
def acquire(self, timeout: Optional[float] = None) -> DBConnection: ...
def release(self, conn: DBConnection) -> None: ...
class QueuePoolAdapter(PoolAdapter):
"""
Wraps a queue.Queue-like pool (get(timeout=...), put(conn)) into PoolAdapter.
"""
def __init__(self, queue_obj: Any):
self._q = queue_obj
def acquire(self, timeout: Optional[float] = None) -> DBConnection:
import queue
try:
return self._q.get(timeout=timeout) # type: ignore[no-any-return]
except queue.Empty as e:
raise PoolError("Acquire connection timeout from queue pool") from e
def release(self, conn: DBConnection) -> None:
self._q.put(conn)
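# A minimal wiring sketch for QueuePoolAdapter: pre-fill a thread-safe queue
# with ready connections. `make_conn` stands in for whatever driver-specific
# connect call you use (an assumption, not part of the adapter contract).
def build_queue_pool(make_conn: Callable[[], DBConnection], size: int) -> QueuePoolAdapter:
    import queue
    q: "queue.Queue[DBConnection]" = queue.Queue(maxsize=size)
    for _ in range(size):
        q.put(make_conn())  # connections are created eagerly, then recycled via release()
    return QueuePoolAdapter(q)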
class Psycopg2PoolAdapter(PoolAdapter):
"""
Wraps psycopg2.pool.ThreadedConnectionPool-like object.
Behavior:
- psycopg2 pool doesn't offer native timeout; this adapter polls until deadline.
- When pool is exhausted, getconn() may raise, we retry until timeout.
"""
def __init__(self, psycopg2_pool: Any, poll_interval: float = 0.05):
self._pool = psycopg2_pool
self._poll_interval = poll_interval
def acquire(self, timeout: Optional[float] = None) -> DBConnection:
deadline = time.monotonic() + timeout if timeout and timeout > 0 else None
last_exc: Optional[BaseException] = None
while True:
try:
conn = self._pool.getconn()
if conn is None:
raise PoolError("psycopg2 pool returned None")
return conn
except Exception as e: # Pool exhausted or other issues
last_exc = e
if deadline is not None and time.monotonic() >= deadline:
raise PoolError("Acquire connection timeout from psycopg2 pool") from last_exc
time.sleep(self._poll_interval)
def release(self, conn: DBConnection) -> None:
self._pool.putconn(conn)
# ---------- Isolation levels ----------
class IsolationLevel(str, Enum):
READ_COMMITTED = "READ COMMITTED"
REPEATABLE_READ = "REPEATABLE READ"
SERIALIZABLE = "SERIALIZABLE"
    # On PostgreSQL, READ UNCOMMITTED behaves like READ COMMITTED; map it here if you need it
# READ_UNCOMMITTED = "READ UNCOMMITTED"
# ---------- Retry policy ----------
@dataclass(frozen=True)
class RetryPolicy:
max_attempts: int = 3
initial_backoff: float = 0.05 # seconds
max_backoff: float = 0.8 # seconds
jitter: float = 0.1 # +/- randomization
deadline: Optional[float] = None # absolute seconds since epoch or None
retriable_predicate: Callable[[BaseException], bool] = lambda e: False
def compute_backoff(self, attempt: int) -> float:
# Exponential backoff with jitter
base = min(self.max_backoff, self.initial_backoff * (2 ** (attempt - 1)))
if self.jitter > 0:
delta = random.uniform(-self.jitter, self.jitter) * base
return max(0.0, base + delta)
return base
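# A quick, deterministic sanity check of the schedule (jitter disabled):
# the delay roughly doubles per attempt until it hits max_backoff.
_demo_policy = RetryPolicy(max_attempts=6, jitter=0.0)
assert [round(_demo_policy.compute_backoff(a), 2) for a in range(1, 7)] == \
    [0.05, 0.1, 0.2, 0.4, 0.8, 0.8]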
# ---------- Session wrapper to track cursors ----------
class TransactionSession:
"""
A helper that wraps a DB-API connection within a transaction.
Tracks cursors created via this object and closes them on context exit.
"""
def __init__(self, conn: DBConnection, name: Optional[str] = None):
self._conn = conn
self._cursors: Set[DBCursor] = set()
self.name = name
@property
def connection(self) -> DBConnection:
return self._conn
def cursor(self) -> DBCursor:
cur = self._conn.cursor()
self._cursors.add(cur)
return cur
def execute(self, sql: str, params: Optional[Union[Sequence[Any], dict[str, Any]]] = None) -> DBCursor:
cur = self.cursor()
cur.execute(sql, params)
return cur
def _close_all_cursors(self) -> None:
for cur in list(self._cursors):
try:
cur.close()
except Exception: # log but don't raise
logger.warning("Failed to close cursor for session %s", self.name, exc_info=True)
finally:
self._cursors.discard(cur)
# ---------- Transaction context manager ----------
class TransactionContext(ContextManager[TransactionSession]):
"""
Context manager for a single database transaction from a pool.
Features:
- Acquire connection with optional timeout and release it safely.
- BEGIN transaction with configurable isolation level and read_only flag.
- SET LOCAL statement_timeout / lock_timeout if provided.
- Commit on success; rollback on failure, without masking original exception.
- Tracks cursors created via session and closes them at exit.
- Restores original autocommit if supported by the driver.
Note: This is a single-attempt transaction manager.
For retries, use run_in_transaction() which re-invokes the body per attempt.
"""
def __init__(
self,
pool: PoolAdapter,
*,
acquire_timeout: Optional[float] = None,
isolation_level: IsolationLevel = IsolationLevel.REPEATABLE_READ,
read_only: bool = False,
statement_timeout_ms: Optional[int] = None,
lock_timeout_ms: Optional[int] = None,
idle_in_txn_session_timeout_ms: Optional[int] = None,
name: Optional[str] = None,
on_event: Optional[Callable[[str, dict[str, Any]], None]] = None,
) -> None:
self._pool = pool
self._acquire_timeout = acquire_timeout
self._isolation_level = isolation_level
self._read_only = read_only
self._statement_timeout_ms = statement_timeout_ms
self._lock_timeout_ms = lock_timeout_ms
self._idle_in_txn_session_timeout_ms = idle_in_txn_session_timeout_ms
self._name = name or "txn"
self._on_event = on_event
self._conn: Optional[DBConnection] = None
self._sess: Optional[TransactionSession] = None
self._old_autocommit: Optional[bool] = None
self._start_ts: Optional[float] = None
def __enter__(self) -> TransactionSession:
self._start_ts = time.monotonic()
self._emit("acquire.start", {"name": self._name})
conn = self._pool.acquire(timeout=self._acquire_timeout)
self._conn = conn
self._emit("acquire.ok", {"name": self._name})
# autocommit handling (best-effort; may not exist in some drivers)
self._old_autocommit = getattr(conn, "autocommit", None)
try:
if self._old_autocommit is not None:
try:
# Ensure explicit transaction boundaries
conn.autocommit = False # type: ignore[assignment]
except Exception:
# If driver doesn't support, ignore but log
logger.debug("Driver does not support autocommit property change.", exc_info=True)
# Begin transaction explicitly for clarity
cur = conn.cursor()
try:
# Set transaction characteristics
# Isolation level
cur.execute(f"BEGIN")
cur.execute(f"SET TRANSACTION ISOLATION LEVEL {self._isolation_level.value}")
# Read-only / read-write
cur.execute("SET TRANSACTION READ ONLY" if self._read_only else "SET TRANSACTION READ WRITE")
# Transaction-local timeouts (PostgreSQL)
if self._statement_timeout_ms is not None:
cur.execute("SET LOCAL statement_timeout = %s", (self._statement_timeout_ms,))
if self._lock_timeout_ms is not None:
cur.execute("SET LOCAL lock_timeout = %s", (self._lock_timeout_ms,))
if self._idle_in_txn_session_timeout_ms is not None:
cur.execute("SET LOCAL idle_in_transaction_session_timeout = %s",
(self._idle_in_txn_session_timeout_ms,))
finally:
try:
cur.close()
except Exception:
logger.warning("Failed to close internal cursor when starting transaction.", exc_info=True)
self._sess = TransactionSession(conn, name=self._name)
self._emit("begin.ok", {"name": self._name, "isolation": self._isolation_level.value,
"read_only": self._read_only})
return self._sess
except Exception:
# If any failure before returning session, ensure connection is returned
self._safe_release()
self._emit("begin.error", {"name": self._name}, level="error")
raise
def __exit__(self, exc_type, exc, tb) -> bool:
assert self._conn is not None, "Invariant: conn must exist on __exit__"
# Always close user cursors created via session
if self._sess is not None:
self._sess._close_all_cursors()
commit_ok = False
try:
if exc_type is None:
# commit path
self._emit("commit.start", {"name": self._name})
try:
self._conn.commit()
commit_ok = True
self._emit("commit.ok", {"name": self._name})
except Exception as commit_exc:
# Try rollback to clean connection state, but do not mask commit exception
self._emit("commit.error", {"name": self._name, "error": repr(commit_exc)}, level="error")
try:
self._conn.rollback()
except Exception:
logger.error("Rollback after commit failure also failed.", exc_info=True)
raise
else:
# rollback path
self._emit("rollback.start", {"name": self._name, "exc": repr(exc)})
try:
self._conn.rollback()
self._emit("rollback.ok", {"name": self._name})
except Exception:
# Must not mask the original exception from the with-body
self._emit("rollback.error", {"name": self._name}, level="error")
logger.error("Rollback failed; original exception will be re-raised.", exc_info=True)
# return False -> re-raise original exception
return False
finally:
# Restore autocommit if supported
try:
if self._old_autocommit is not None:
self._conn.autocommit = self._old_autocommit # type: ignore[assignment]
except Exception:
logger.debug("Failed to restore autocommit.", exc_info=True)
# Release connection back to pool in any case
self._safe_release()
duration = (time.monotonic() - self._start_ts) if self._start_ts else None
self._emit("finish", {"name": self._name, "duration_s": duration, "commit_ok": commit_ok})
        # Reached only on the successful-commit path: the exception path returned
        # False above, and a commit failure was already re-raised, so this return
        # value never suppresses an exception.
        return True
def _safe_release(self) -> None:
if self._conn is not None:
try:
self._pool.release(self._conn)
except Exception:
# Pool release should never raise to callers; just log.
logger.error("Failed to release connection to pool.", exc_info=True)
finally:
self._conn = None
def _emit(self, event: str, payload: dict[str, Any], level: str = "debug") -> None:
payload = dict(payload)
payload["event"] = event
if self._on_event:
try:
self._on_event(event, payload)
except Exception:
logger.debug("on_event hook failed.", exc_info=True)
# Fallback logging
if level == "error":
logger.error("%s | %s", event, payload)
elif level == "info":
logger.info("%s | %s", event, payload)
else:
logger.debug("%s | %s", event, payload)
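# The on_event hook is the natural integration point for metrics and tracing.
# A minimal sketch that accumulates per-transaction durations in memory; it
# keys off the "finish" event emitted above, which carries "duration_s".
from collections import defaultdict

_txn_durations: dict[str, list[float]] = defaultdict(list)

def record_txn_metrics(event: str, payload: dict[str, Any]) -> None:
    # Hooks run synchronously inside the context manager: keep them fast,
    # and never raise (failures are swallowed and logged by _emit anyway).
    if event == "finish" and payload.get("duration_s") is not None:
        _txn_durations[payload["name"]].append(payload["duration_s"])

# Usage: TransactionContext(pool, on_event=record_txn_metrics, ...)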
# ---------- Retriable predicate helpers (PostgreSQL) ----------
def is_postgres_retriable(exc: BaseException) -> bool:
"""
Try to detect serialization/deadlock errors in a driver-agnostic way for PostgreSQL.
- psycopg2/3 exceptions may expose pgcode / sqlstate or specific classes.
- We inspect common attributes and class names as a last resort.
"""
# Check SQLSTATE if available
sqlstate = getattr(exc, "pgcode", None) or getattr(exc, "sqlstate", None)
if isinstance(sqlstate, str) and sqlstate in ("40001", "40P01"): # serialization_failure, deadlock_detected
return True
# psycopg3 exceptions may live in psycopg.errors with specific types
cls_name = exc.__class__.__name__
if cls_name in ("SerializationFailure", "DeadlockDetected"):
return True
# Fall back: heuristic on message (avoid relying on message ideally)
msg = str(exc).lower()
if "deadlock detected" in msg or "could not serialize access" in msg:
return True
return False
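# An equivalent predicate for MySQL keys off driver error codes rather than
# SQLSTATE. Sketch only: it assumes the driver (e.g. PyMySQL/mysqlclient)
# exposes the MySQL errno as exc.args[0] (1213 = deadlock detected,
# 1205 = lock wait timeout).
def is_mysql_retriable(exc: BaseException) -> bool:
    args = getattr(exc, "args", ())
    if args and isinstance(args[0], int) and args[0] in (1213, 1205):
        return True
    msg = str(exc).lower()
    return "deadlock found" in msg or "lock wait timeout" in msg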
# ---------- Transaction executor with retry ----------
def run_in_transaction(
pool: PoolAdapter,
fn: Callable[[TransactionSession], T],
*,
isolation_level: IsolationLevel = IsolationLevel.REPEATABLE_READ,
read_only: bool = False,
statement_timeout_ms: Optional[int] = None,
lock_timeout_ms: Optional[int] = None,
idle_in_txn_session_timeout_ms: Optional[int] = None,
acquire_timeout: Optional[float] = None,
retry: RetryPolicy = RetryPolicy(max_attempts=3, retriable_predicate=is_postgres_retriable),
name: Optional[str] = None,
on_event: Optional[Callable[[str, dict[str, Any]], None]] = None,
) -> T:
"""
Execute fn(session) within a transaction, retrying on retriable errors as per policy.
Returns:
The return value of fn from the successful attempt.
Raises:
The last exception if all retries fail or non-retriable error occurs.
"""
attempt = 0
last_exc: Optional[BaseException] = None
start_wall = time.time()
while True:
attempt += 1
if retry.deadline is not None and time.time() > retry.deadline:
# Deadline exceeded before starting attempt
if last_exc:
raise last_exc
raise TimeoutError("Retry deadline exceeded before starting transaction attempt")
attempt_name = f"{name or 'txn'}#attempt{attempt}"
try:
with TransactionContext(
pool,
acquire_timeout=acquire_timeout,
isolation_level=isolation_level,
read_only=read_only,
statement_timeout_ms=statement_timeout_ms,
lock_timeout_ms=lock_timeout_ms,
idle_in_txn_session_timeout_ms=idle_in_txn_session_timeout_ms,
name=attempt_name,
on_event=on_event,
) as sess:
return fn(sess)
except BaseException as e:
last_exc = e
# Non-retriable or attempts exhausted?
should_retry = retry.retriable_predicate(e)
if not should_retry or attempt >= retry.max_attempts:
# Include timing info in logs
elapsed = time.time() - start_wall
logger.error("Transaction failed after %d attempt(s), elapsed=%.3fs", attempt, elapsed, exc_info=True)
raise
# Backoff and continue
backoff = retry.compute_backoff(attempt)
if retry.deadline is not None and time.time() + backoff > retry.deadline:
logger.warning("Backoff would exceed retry deadline; aborting retries.", exc_info=True)
raise
logger.info("Retriable transaction error on attempt %d; backing off %.3fs: %r", attempt, backoff, e)
time.sleep(backoff)
continue
Example 1: basic usage with a psycopg2 connection pool
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
pool = ThreadedConnectionPool(minconn=5, maxconn=20, dsn="postgresql://user:pwd@host:5432/db")
adapter = Psycopg2PoolAdapter(pool)
def create_user(sess: TransactionSession, uid: int, name: str) -> None:
sess.execute("INSERT INTO users(id, name) VALUES (%s, %s)", (uid, name))
with TransactionContext(
adapter,
isolation_level=IsolationLevel.REPEATABLE_READ,
statement_timeout_ms=1500,
acquire_timeout=2.0,
name="create_user_txn",
) as session:
create_user(session, 1001, "Alice")
Example 2: order creation with a retry policy (multi-table writes + outbox, ensuring consistency)
import json
from typing import Any
def create_order_txn(sess: TransactionSession, payload: dict[str, Any]) -> int:
    # Insert the order header row
cur = sess.execute(
"INSERT INTO orders(user_id, total_amount, status) VALUES (%s, %s, %s) RETURNING id",
(payload["user_id"], payload["total_amount"], "CREATED")
)
order_id = cur.fetchone()[0]
    # Insert the order line items
for item in payload["items"]:
sess.execute(
"INSERT INTO order_items(order_id, sku, qty, price) VALUES (%s, %s, %s, %s)",
(order_id, item["sku"], item["qty"], item["price"])
)
    # Write the outbox message inside the same transaction.
    # psycopg2 cannot adapt a plain dict directly, so serialize the payload
    # to JSON here (or use psycopg2.extras.Json).
    sess.execute(
        "INSERT INTO outbox(aggregate, aggregate_id, event_type, payload) VALUES (%s, %s, %s, %s)",
        ("order", order_id, "OrderCreated", json.dumps(payload))
    )
return order_id
retry_policy = RetryPolicy(
max_attempts=5,
initial_backoff=0.05,
max_backoff=0.5,
jitter=0.2,
retriable_predicate=is_postgres_retriable,
)
order_id = run_in_transaction(
adapter,
lambda s: create_order_txn(s, {
"user_id": 42,
"total_amount": 399.0,
"items": [{"sku": "A100", "qty": 1, "price": 199.0}, {"sku": "B200", "qty": 2, "price": 100.0}],
}),
isolation_level=IsolationLevel.REPEATABLE_READ,
statement_timeout_ms=2000,
acquire_timeout=2.0,
retry=retry_policy,
name="create_order",
)
print("order_id=", order_id)
# After a successful commit, an asynchronous process (or background job) scans the outbox table and publishes to the message queue, providing eventual consistency and observability.
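A minimal sketch of such a relay pass built on run_in_transaction. The outbox schema here (a nullable published_at column) and the publish callable are assumptions for illustration; the downstream consumer must be idempotent, since a crash between publish and the UPDATE can re-send a message:

def relay_outbox_once(adapter: PoolAdapter, publish: Callable[[str, Any], None], batch: int = 100) -> int:
    def body(sess: TransactionSession) -> int:
        # Lock a batch of unpublished rows so concurrent relays don't double-send.
        cur = sess.execute(
            "SELECT id, event_type, payload FROM outbox"
            " WHERE published_at IS NULL ORDER BY id"
            " FOR UPDATE SKIP LOCKED LIMIT %s",
            (batch,),
        )
        rows = cur.fetchall()
        for row_id, event_type, payload in rows:
            publish(event_type, payload)
            sess.execute("UPDATE outbox SET published_at = now() WHERE id = %s", (row_id,))
        return len(rows)
    return run_in_transaction(adapter, body, name="outbox_relay",
                              retry=RetryPolicy(max_attempts=3, retriable_predicate=is_postgres_retriable))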
Example 3: read-only query with a per-statement timeout
def read_report(sess: TransactionSession) -> list[tuple]:
cur = sess.execute("SELECT id, total_amount FROM orders WHERE status = %s", ("CREATED",))
return cur.fetchall()
result = run_in_transaction(
adapter,
read_report,
isolation_level=IsolationLevel.REPEATABLE_READ,
read_only=True,
    statement_timeout_ms=500,  # per-statement timeout
acquire_timeout=1.0,
retry=RetryPolicy(max_attempts=2, retriable_predicate=is_postgres_retriable),
name="report_ro",
)
To adapt this to a specific driver (psycopg3, mysqlclient, aiomysql/asyncpg, etc.) or to provide an asynchronous version (async with), I can supply the corresponding implementation and best practices on top of the design above.
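As a taste of the async variant, here is a minimal sketch built on contextlib.asynccontextmanager over an assumed async pool (acquire()/release() coroutines) and a connection exposing async commit()/rollback(); it mirrors the commit-on-success / rollback-on-error shape above and omits the isolation-level and timeout plumbing:

import contextlib
import logging

@contextlib.asynccontextmanager
async def async_transaction(pool):  # `pool` is an assumed async adapter, not defined above
    conn = await pool.acquire()
    try:
        yield conn
        await conn.commit()  # commit on success
    except BaseException:
        try:
            await conn.rollback()  # never mask the original exception
        except Exception:
            logging.getLogger(__name__).error("Async rollback failed.", exc_info=True)
        raise
    finally:
        await pool.release(conn)

# Usage: async with async_transaction(pool) as conn: ...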
Have the AI act as a "Python context-manager design expert" covering high-frequency scenarios such as file I/O, database connections, network requests, transaction control, and concurrency locks, delivering a closed loop from requirements analysis → solution design → code implementation → testing points → optimization advice. One clear input yields context-manager code and examples ready for production, helping individuals and teams standardize resource management, reduce hidden defects and production incidents, shorten review cycles, and build stable, maintainable, extensible Python engineering practice faster.
Quickly generate reusable contexts for database connections, file I/O, and cache operations, unifying exception handling and cleanup policies, reducing leaks and timeouts, and shortening delivery cycles.
Wrap data import/export, temporary files, and external APIs in safe contexts with automatic cleanup on failure, keeping training and ETL pipelines stable and reproducible.
Wrap temporary privileges, locks, and rollback into ops scripts and release workflows; generate templates with one click and roll them out across the team to significantly reduce human error.