This prompt is designed for Python development scenarios: given a user-specified object or concept, it generates a structurally complete class template that follows Python coding conventions. The prompt uses a step-by-step design process to ensure the generated template includes the necessary initializer, attribute definitions, common methods, and docstrings, while adhering to object-oriented best practices. It is suited to Python project development, teaching demonstrations, and code refactoring, helping developers quickly build high-quality class structures.
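For instance, given a concept such as "book", the prompt is expected to produce a small, fully documented class skeleton along the lines of the sketch below (the attribute and method names here are illustrative assumptions, not fixed output):

class Book:
    """Represents a book with basic bibliographic information.

    Attributes:
        title: Title of the book.
        author: Author name.
        pages: Total number of pages.
    """

    def __init__(self, title: str, author: str, pages: int = 0) -> None:
        self.title = title
        self.author = author
        self.pages = pages

    def is_long(self, threshold: int = 300) -> bool:
        """Return True if the book has more pages than the threshold."""
        return self.pages > threshold

    def __repr__(self) -> str:
        return f"Book(title={self.title!r}, author={self.author!r}, pages={self.pages})"

The two complete templates that follow, a REST API client and a text dataset loader, are larger examples written in the same style.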
import json
import logging
import os
import time
from collections import deque
from dataclasses import dataclass
from typing import (
Any,
Callable,
Deque,
Dict,
Iterable,
Iterator,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)
from urllib.parse import urljoin
import requests
from requests import Response, Session
from requests.exceptions import ConnectionError, Timeout
@dataclass(frozen=True)
class UnifiedResponse:
"""Unified HTTP response model.
Attributes:
status_code: HTTP status code.
url: Final URL of the response.
method: HTTP method used for the request.
headers: Response headers.
data: Parsed JSON object if available; otherwise, None.
text: Text body if available and not JSON; otherwise, None.
        content: Raw bytes body for non-JSON, non-streamed responses; otherwise, None.
elapsed: Elapsed time in seconds for the request.
request_id: Request ID extracted from headers if present.
"""
status_code: int
url: str
method: str
headers: Mapping[str, str]
data: Optional[Any]
text: Optional[str]
content: Optional[bytes]
elapsed: float
request_id: Optional[str]
class RestClientError(Exception):
"""Base exception for REST client errors."""
def __init__(
self,
message: str,
*,
status_code: Optional[int] = None,
response: Optional[UnifiedResponse] = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.response = response
class BadRequestError(RestClientError):
"""400 Bad Request."""
class UnauthorizedError(RestClientError):
"""401 Unauthorized."""
class ForbiddenError(RestClientError):
"""403 Forbidden."""
class NotFoundError(RestClientError):
"""404 Not Found."""
class ConflictError(RestClientError):
"""409 Conflict."""
class UnprocessableEntityError(RestClientError):
"""422 Unprocessable Entity."""
class TooManyRequestsError(RestClientError):
"""429 Too Many Requests."""
class ServerError(RestClientError):
"""5xx Server Error."""
class HTTPError(RestClientError):
"""Generic HTTP error."""
class RequestSendError(RestClientError):
"""Network or transport-level error when sending a request."""
class RestClient:
"""A generic REST client with retries, rate limiting, and structured logging.
This client provides:
- Session management with a base URL and default timeout.
- Bearer token authentication (static token or provider callable).
- Exponential backoff retries for network and retryable HTTP errors.
- Optional rate limiting (sliding window).
- JSON serialization and unified response parsing.
- Pagination helpers.
- Structured logging with contextual fields.
- File upload and streaming download helpers.
Args:
base_url: Base URL for all requests. Paths are joined via urljoin.
token: Static bearer token for Authorization header. Ignored if
token_provider is provided.
token_provider: Callable returning a bearer token string. Called per
request to populate Authorization header.
timeout: Default timeout in seconds for requests.
max_retries: Maximum number of retry attempts on retryable errors.
Total attempts will be max_retries + 1 including the initial try.
backoff_factor: Base factor for exponential backoff (seconds).
backoff_max: Maximum backoff sleep duration per attempt (seconds).
        retry_on_status: Iterable of HTTP status codes to retry on, in addition
            to 5xx responses, which are always retried. Defaults to {429}.
rate_limit: Optional rate limiting tuple (max_requests, per_seconds).
Example: (10, 1.0) allows 10 requests per 1 second.
session: Optional pre-configured requests.Session.
default_headers: Default headers added to every request.
logger: Optional logger; defaults to `logging.getLogger(__name__)`.
Attributes:
base_url: Base URL joined with request paths.
timeout: Default timeout in seconds.
max_retries: Configured maximum retry attempts.
backoff_factor: Backoff factor for exponential backoff.
backoff_max: Cap for backoff sleep per attempt.
retry_on_status: Set of status codes that will be retried.
rate_limit: Configured rate limit tuple or None.
session: Underlying requests.Session.
logger: Logger used for structured logs.
"""
def __init__(
self,
base_url: str,
*,
token: Optional[str] = None,
token_provider: Optional[Callable[[], str]] = None,
timeout: float = 30.0,
max_retries: int = 3,
backoff_factor: float = 0.5,
backoff_max: float = 60.0,
retry_on_status: Optional[Iterable[int]] = None,
rate_limit: Optional[Tuple[int, float]] = None,
session: Optional[Session] = None,
default_headers: Optional[Mapping[str, str]] = None,
logger: Optional[logging.Logger] = None,
) -> None:
self.base_url = base_url.rstrip("/") + "/"
self._token = token
self._token_provider = token_provider
self.timeout = timeout
self.max_retries = max(0, int(max_retries))
self.backoff_factor = max(0.0, float(backoff_factor))
self.backoff_max = max(0.0, float(backoff_max))
default_retry = {429}
self.retry_on_status = set(retry_on_status or default_retry)
self.rate_limit = rate_limit
self._rate_window: Deque[float] = deque(maxlen=rate_limit[0]) if rate_limit else deque()
self.session = session or requests.Session()
self.default_headers: Dict[str, str] = dict(default_headers or {})
self.logger = logger or logging.getLogger(__name__)
# Default headers
if "Accept" not in self.default_headers:
self.default_headers["Accept"] = "application/json"
if "User-Agent" not in self.default_headers:
self.default_headers["User-Agent"] = "RestClient/1.0"
# Common JSON header for write methods; GET should not force Content-Type
# but we add per-request if json payload is provided.
# -------------- Public API --------------
def set_token(self, token: Optional[str]) -> None:
"""Set or update the static bearer token.
Args:
token: Bearer token string or None to clear.
"""
self._token = token
def set_token_provider(
self, provider: Optional[Callable[[], str]]
) -> None:
"""Set or update the token provider callable.
Args:
provider: Callable returning a bearer token string, or None to clear.
"""
self._token_provider = provider
def request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
json_body: Optional[Any] = None,
data: Optional[Union[Mapping[str, Any], bytes]] = None,
files: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
stream: bool = False,
allow_redirects: bool = True,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""Perform an HTTP request.
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.).
path: Relative or absolute URL path.
params: Query string parameters.
headers: Additional request headers.
json_body: JSON-serializable payload set as request body.
data: Raw or form data payload.
files: Files for multipart upload (requests-compatible format).
timeout: Per-request timeout override in seconds.
stream: Whether to stream the response.
allow_redirects: Whether to follow redirects.
expected_status: Optional iterable of acceptable status codes.
If provided, responses not in this set raise an error.
Returns:
UnifiedResponse: Parsed response object.
Raises:
RestClientError: On HTTP or transport errors.
"""
return self._request(
method=method,
path=path,
params=params,
headers=headers,
json_body=json_body,
data=data,
files=files,
timeout=timeout,
stream=stream,
allow_redirects=allow_redirects,
expected_status=set(expected_status) if expected_status else None,
)
def get(
self,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
stream: bool = False,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""HTTP GET request.
Args:
path: URL path.
params: Query parameters.
headers: Additional headers.
timeout: Timeout override in seconds.
stream: Stream response.
expected_status: Acceptable status codes.
Returns:
UnifiedResponse: Parsed response.
"""
return self.request(
"GET",
path,
params=params,
headers=headers,
timeout=timeout,
stream=stream,
expected_status=expected_status,
)
def post(
self,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
json_body: Optional[Any] = None,
data: Optional[Union[Mapping[str, Any], bytes]] = None,
files: Optional[Mapping[str, Any]] = None,
timeout: Optional[float] = None,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""HTTP POST request.
Args:
path: URL path.
params: Query parameters.
headers: Additional headers.
json_body: JSON payload.
data: Raw or form data payload.
files: Files for multipart upload.
timeout: Timeout override in seconds.
expected_status: Acceptable status codes.
Returns:
UnifiedResponse: Parsed response.
"""
return self.request(
"POST",
path,
params=params,
headers=headers,
json_body=json_body,
data=data,
files=files,
timeout=timeout,
expected_status=expected_status,
)
def put(
self,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
json_body: Optional[Any] = None,
data: Optional[Union[Mapping[str, Any], bytes]] = None,
timeout: Optional[float] = None,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""HTTP PUT request.
Args:
path: URL path.
params: Query parameters.
headers: Additional headers.
json_body: JSON payload.
data: Raw or form data payload.
timeout: Timeout override in seconds.
expected_status: Acceptable status codes.
Returns:
UnifiedResponse: Parsed response.
"""
return self.request(
"PUT",
path,
params=params,
headers=headers,
json_body=json_body,
data=data,
timeout=timeout,
expected_status=expected_status,
)
def delete(
self,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""HTTP DELETE request.
Args:
path: URL path.
params: Query parameters.
headers: Additional headers.
timeout: Timeout override in seconds.
expected_status: Acceptable status codes.
Returns:
UnifiedResponse: Parsed response.
"""
return self.request(
"DELETE",
path,
params=params,
headers=headers,
timeout=timeout,
expected_status=expected_status,
)
def upload(
self,
path: str,
*,
files: Mapping[str, Any],
data: Optional[Mapping[str, Any]] = None,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expected_status: Optional[Iterable[int]] = None,
) -> UnifiedResponse:
"""Upload files using multipart/form-data.
Args:
path: URL path.
files: Files mapping for multipart upload, compatible with requests.
data: Optional form fields.
params: Query parameters.
headers: Additional headers.
timeout: Timeout override in seconds.
expected_status: Acceptable status codes.
Returns:
UnifiedResponse: Parsed response.
"""
return self.post(
path,
params=params,
headers=headers,
data=data,
files=files,
timeout=timeout,
expected_status=expected_status,
)
def download(
self,
path: str,
*,
dest_path: str,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
chunk_size: int = 8192,
overwrite: bool = False,
expected_status: Optional[Iterable[int]] = None,
) -> str:
"""Download a resource and save to a local file.
Args:
path: URL path.
dest_path: Local file path to save.
params: Query parameters.
headers: Additional headers.
timeout: Timeout override in seconds.
chunk_size: Stream buffer size in bytes.
overwrite: Whether to overwrite an existing file.
expected_status: Acceptable status codes.
Returns:
str: Absolute path to the saved file.
Raises:
RestClientError: On HTTP or IO errors.
"""
        if os.path.exists(dest_path) and not overwrite:
            raise RestClientError(
                f"File exists and overwrite=False: {dest_path}"
            )
        try:
            with self._stream_response(
                path, params, headers, timeout, chunk_size
            ) as stream_resp:
                if expected_status is not None and stream_resp.status_code not in set(
                    expected_status
                ):
                    self._raise_http_error(
                        self._to_unified_response(stream_resp, "GET")
                    )
                with open(dest_path, "wb") as f:
                    for chunk in stream_resp.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
        except OSError as exc:
            raise RestClientError(
                f"I/O error while saving to {dest_path}: {exc}"
            ) from exc
        return os.path.abspath(dest_path)
def paginate(
self,
path: str,
*,
method: str = "GET",
params: Optional[MutableMapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
items_key: Optional[str] = None,
page_param: str = "page",
per_page_param: str = "per_page",
start_page: int = 1,
per_page: int = 100,
next_key: Optional[str] = None,
max_pages: Optional[int] = None,
timeout: Optional[float] = None,
) -> Iterator[Union[UnifiedResponse, Any]]:
"""Paginate through resources.
Supports two common patterns:
1) Page-number based: increments `page_param` until empty or max_pages.
2) Cursor/next-link based: uses `next_key` found in JSON response
(e.g., {"next": "..."}). When provided, this takes precedence.
Args:
path: URL path for the initial request.
method: HTTP method to use, typically GET.
params: Initial query parameters (mutated during pagination).
headers: Additional request headers.
items_key: If provided, yields items from response.data[items_key];
otherwise yields the UnifiedResponse for each page.
page_param: Query string key for page number.
per_page_param: Query string key for page size.
start_page: Starting page number.
per_page: Page size.
next_key: JSON key holding the next cursor or URL; if present, the
paginator will follow it until None/empty.
max_pages: Maximum number of pages to fetch.
timeout: Timeout override per page.
Yields:
Union[UnifiedResponse, Any]: Pages or items, depending on items_key.
Raises:
RestClientError: On HTTP errors.
"""
current_params: MutableMapping[str, Any] = dict(params or {})
if next_key:
# Cursor/next mode
next_value: Optional[str] = None
page_count = 0
while True:
if next_value:
# Prefer cursor in params; if it's a full URL, replace path.
if next_value.startswith("http"):
current_path = next_value
else:
current_params[next_key] = next_value
current_path = path
else:
current_path = path
resp = self.request(
method,
current_path,
params=current_params,
headers=headers,
timeout=timeout,
)
if items_key and isinstance(resp.data, dict):
items = resp.data.get(items_key, []) or []
for item in items:
yield item
else:
yield resp
next_value = None
if isinstance(resp.data, dict):
nval = resp.data.get(next_key) # type: ignore[index]
if isinstance(nval, str) and nval:
next_value = nval
page_count += 1
if not next_value:
break
if max_pages is not None and page_count >= max_pages:
break
else:
# Page-number mode
page = start_page
page_count = 0
current_params[page_param] = page
current_params[per_page_param] = per_page
while True:
resp = self.request(
method,
path,
params=current_params,
headers=headers,
timeout=timeout,
)
count = 0
if items_key and isinstance(resp.data, dict):
items = resp.data.get(items_key, []) or []
for item in items:
count += 1
yield item
else:
yield resp
# If no items_key, we assume presence of content implies
# there may be more pages; this is heuristic.
if isinstance(resp.data, list):
count = len(resp.data)
elif isinstance(resp.data, dict):
# Best-effort heuristic
count = len(resp.data)
page_count += 1
if max_pages is not None and page_count >= max_pages:
break
if count == 0:
break
page += 1
current_params[page_param] = page
def close(self) -> None:
"""Close the underlying session."""
self.session.close()
def __enter__(self) -> "RestClient":
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc, tb) -> None:
"""Exit context manager and close session."""
self.close()
# -------------- Internal Helpers --------------
def _request(
self,
*,
method: str,
path: str,
params: Optional[Mapping[str, Any]],
headers: Optional[Mapping[str, str]],
json_body: Optional[Any],
data: Optional[Union[Mapping[str, Any], bytes]],
files: Optional[Mapping[str, Any]],
timeout: Optional[float],
stream: bool,
allow_redirects: bool,
expected_status: Optional[Iterable[int]],
) -> UnifiedResponse:
url = self._build_url(path)
req_headers = self._merge_headers(headers, json_body=json_body)
req_timeout = timeout if timeout is not None else self.timeout
attempts = self.max_retries + 1
for attempt in range(1, attempts + 1):
self._apply_rate_limit()
start = time.perf_counter()
error: Optional[Exception] = None
try:
resp = self.session.request(
method=method.upper(),
url=url,
params=params,
headers=req_headers,
json=json_body,
data=data,
files=files,
timeout=req_timeout,
stream=stream,
allow_redirects=allow_redirects,
)
elapsed = time.perf_counter() - start
                uresp = self._to_unified_response(resp, method, stream=stream)
self._log_request(
method=method,
url=resp.url,
status_code=resp.status_code,
elapsed=elapsed,
attempt=attempt,
params=params,
stream=stream,
)
                if expected_status is not None:
                    if uresp.status_code in expected_status:
                        return uresp
                    if attempt < attempts and self._should_retry_status(
                        uresp.status_code
                    ):
                        self._sleep_backoff(attempt)
                        continue
                    self._raise_http_error(uresp)
                if 200 <= uresp.status_code < 300:
                    return uresp
                if attempt < attempts and self._should_retry_status(
                    uresp.status_code
                ):
                    self._sleep_backoff(attempt)
                    continue
                self._raise_http_error(uresp)
except (Timeout, ConnectionError) as exc:
elapsed = time.perf_counter() - start
error = exc
self._log_request(
method=method,
url=url,
status_code=None,
elapsed=elapsed,
attempt=attempt,
params=params,
stream=stream,
error=repr(exc),
)
if attempt < attempts:
self._sleep_backoff(attempt)
continue
raise RequestSendError(
f"Request failed after {attempts} attempts: {exc}"
) from exc
except requests.RequestException as exc:
elapsed = time.perf_counter() - start
error = exc
self._log_request(
method=method,
url=url,
status_code=None,
elapsed=elapsed,
attempt=attempt,
params=params,
stream=stream,
error=repr(exc),
)
raise RequestSendError(f"Request error: {exc}") from exc
raise RequestSendError("Exhausted retries without a response")
def _stream_response(
self,
path: str,
params: Optional[Mapping[str, Any]],
headers: Optional[Mapping[str, str]],
timeout: Optional[float],
chunk_size: int,
) -> Response:
"""Internal helper to open a streaming response."""
url = self._build_url(path)
req_headers = self._merge_headers(headers)
req_timeout = timeout if timeout is not None else self.timeout
attempts = self.max_retries + 1
for attempt in range(1, attempts + 1):
self._apply_rate_limit()
start = time.perf_counter()
try:
resp = self.session.get(
url,
params=params,
headers=req_headers,
timeout=req_timeout,
stream=True,
)
elapsed = time.perf_counter() - start
self._log_request(
method="GET",
url=resp.url,
status_code=resp.status_code,
elapsed=elapsed,
attempt=attempt,
params=params,
stream=True,
)
                if attempt < attempts and self._should_retry_status(
                    resp.status_code
                ):
                    self._sleep_backoff(attempt)
                    continue
                if resp.status_code >= 400:
                    self._raise_http_error(
                        self._to_unified_response(resp, "GET")
                    )
                return resp
except (Timeout, ConnectionError) as exc:
elapsed = time.perf_counter() - start
self._log_request(
method="GET",
url=url,
status_code=None,
elapsed=elapsed,
attempt=attempt,
params=params,
stream=True,
error=repr(exc),
)
if attempt < attempts:
self._sleep_backoff(attempt)
continue
raise RequestSendError(
f"Streaming request failed after {attempts} attempts: {exc}"
) from exc
raise RequestSendError("Exhausted retries for streaming request")
def _build_url(self, path: str) -> str:
if path.startswith("http://") or path.startswith("https://"):
return path
return urljoin(self.base_url, path.lstrip("/"))
def _merge_headers(
self, headers: Optional[Mapping[str, str]], *, json_body: Optional[Any] = None
) -> Dict[str, str]:
merged = dict(self.default_headers)
if headers:
merged.update(headers)
token = self._get_bearer_token()
if token:
merged["Authorization"] = f"Bearer {token}"
if json_body is not None and "Content-Type" not in merged:
merged["Content-Type"] = "application/json"
return merged
def _get_bearer_token(self) -> Optional[str]:
if self._token_provider:
try:
token = self._token_provider()
if token:
return token
except Exception: # noqa: BLE001
# Provider failures should not crash silently; log and fallback
self.logger.exception("Token provider callable raised")
return self._token
def _should_retry_status(self, status_code: int) -> bool:
if status_code in self.retry_on_status:
return True
if 500 <= status_code < 600:
return True
return False
def _sleep_backoff(self, attempt: int) -> None:
if attempt > self.max_retries:
return
        # Exponential backoff with clock-derived jitter (50-100% of the capped base)
base = self.backoff_factor * (2 ** (attempt - 1))
sleep_for = min(self.backoff_max, base) * (0.5 + 0.5 * (time.time() % 1))
time.sleep(max(0.0, sleep_for))
def _apply_rate_limit(self) -> None:
if not self.rate_limit:
return
max_req, per_seconds = self.rate_limit
now = time.monotonic()
# Purge timestamps outside the window
while self._rate_window and (now - self._rate_window[0]) >= per_seconds:
self._rate_window.popleft()
if len(self._rate_window) >= max_req:
# Sleep until the oldest timestamp exits the window
wait_for = per_seconds - (now - self._rate_window[0])
if wait_for > 0:
time.sleep(wait_for)
# After sleep, purge again
now = time.monotonic()
while self._rate_window and (now - self._rate_window[0]) >= per_seconds:
self._rate_window.popleft()
self._rate_window.append(time.monotonic())
    def _to_unified_response(
        self, resp: Response, method: str, *, stream: bool = False
    ) -> UnifiedResponse:
request_id = (
resp.headers.get("X-Request-Id")
or resp.headers.get("X-Request-ID")
or resp.headers.get("Request-Id")
)
data: Optional[Any] = None
text: Optional[str] = None
content: Optional[bytes] = None
content_type = resp.headers.get("Content-Type", "")
if "application/json" in content_type.lower():
try:
data = resp.json()
except ValueError:
text = resp.text
        elif not stream:
            # Non-JSON, non-streamed responses: keep text for textual content
            # types and raw bytes otherwise.
            if content_type.lower().startswith("text/"):
                text = resp.text
            else:
                content = resp.content
        # For streamed responses, data/text/content are left as None so the
        # caller can consume the raw stream.
return UnifiedResponse(
status_code=resp.status_code,
url=str(resp.url),
method=method.upper(),
headers=dict(resp.headers),
data=data,
text=text,
content=content,
elapsed=resp.elapsed.total_seconds() if resp.elapsed else 0.0,
request_id=request_id,
)
def _raise_http_error(self, resp: UnifiedResponse) -> None:
status = resp.status_code
message = self._build_error_message(resp)
if status == 400:
raise BadRequestError(message, status_code=status, response=resp)
if status == 401:
raise UnauthorizedError(message, status_code=status, response=resp)
if status == 403:
raise ForbiddenError(message, status_code=status, response=resp)
if status == 404:
raise NotFoundError(message, status_code=status, response=resp)
if status == 409:
raise ConflictError(message, status_code=status, response=resp)
if status == 422:
raise UnprocessableEntityError(
message, status_code=status, response=resp
)
if status == 429:
raise TooManyRequestsError(message, status_code=status, response=resp)
if 500 <= status < 600:
raise ServerError(message, status_code=status, response=resp)
raise HTTPError(message, status_code=status, response=resp)
def _build_error_message(self, resp: UnifiedResponse) -> str:
# Prefer server-provided message from JSON body
if isinstance(resp.data, dict):
msg = (
resp.data.get("message")
or resp.data.get("error")
or resp.data.get("detail")
)
if isinstance(msg, str):
return f"{resp.status_code} {msg}"
# Fallback to text snippet
if resp.text:
snippet = resp.text.strip()
if len(snippet) > 300:
snippet = snippet[:300] + "..."
return f"{resp.status_code} {snippet}"
return f"HTTP {resp.status_code} at {resp.url}"
def _log_request(
self,
*,
method: str,
url: str,
status_code: Optional[int],
elapsed: float,
attempt: int,
params: Optional[Mapping[str, Any]],
stream: bool,
error: Optional[str] = None,
) -> None:
extra = {
"event": "http_request",
"method": method.upper(),
"url": url,
"status_code": status_code,
"elapsed_ms": round(elapsed * 1000, 2),
"attempt": attempt,
"retry": attempt - 1,
"stream": stream,
"error": error,
"params": dict(params or {}),
}
if error:
self.logger.warning("HTTP request error", extra=extra)
elif status_code and status_code >= 400:
self.logger.error("HTTP request failed", extra=extra)
else:
self.logger.info("HTTP request succeeded", extra=extra)
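A minimal usage sketch for the RestClient above follows; the base URL, endpoints, token, and response fields are placeholders for a hypothetical API, not a real service.

if __name__ == "__main__":
    # Illustrative usage only: the base URL, endpoints, and token below are
    # placeholders for a hypothetical API.
    logging.basicConfig(level=logging.INFO)
    with RestClient(
        "https://api.example.com/v1",
        token="YOUR_TOKEN",
        max_retries=2,
        rate_limit=(10, 1.0),
    ) as client:
        user = client.get("users/42")
        print(user.status_code, user.data)
        created = client.post(
            "users",
            json_body={"name": "Ada"},
            expected_status={201},
        )
        print(created.request_id)
        # Page-number pagination, yielding items from a hypothetical "items" key.
        for item in client.paginate("users", items_key="items", per_page=50):
            print(item)

Using the client as a context manager closes the underlying session on exit, so no explicit close() call is needed.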
from __future__ import annotations
import csv
import hashlib
import io
import json
import os
import random
from pathlib import Path
from typing import Any, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union
from collections import deque
try:
import requests
except Exception: # pragma: no cover
requests = None # type: ignore[assignment]
try:
from tqdm import tqdm
except Exception: # pragma: no cover
def tqdm(iterable=None, **kwargs):
return iterable if iterable is not None else []
PathLike = Union[str, Path]
Row = Dict[str, Any]
Schema = Dict[str, Union[type, str]]
class TextDatasetLoader:
"""A text dataset loader supporting local/URL streaming, splits, caching, and
lazy loading.
This loader can iterate over local files or URLs, supports train/val/test
splitting via deterministic hashing, optional caching of remote resources,
data validation against a column schema, and a preprocessing pipeline. It
implements the iterable protocol and provides convenience methods for map,
batch, take, and saving schema definitions.
The loader supports these file formats:
- .txt (one text sample per line -> {'text': str})
- .jsonl (one JSON object per line)
- .csv/.tsv (header row required; rows emitted as dictionaries)
Splitting rules:
- If files are organized by split (e.g., folder or filename containing
'train', 'val', 'test'), a selected split will filter those sources.
- Otherwise, on-the-fly split assignment is performed per-row using a
deterministic hash of the raw line bytes (plus seed).
Args:
paths: Path(s) or URL(s) to dataset files or directories.
split: One of {'train', 'val', 'test', None}. If None, no split filter.
split_ratios: Ratios for (train, val, test) used when files are not
organized by split. Must sum to 1.0.
batch_size: Default batch size used by batch().
shuffle: Whether to shuffle rows using a streaming shuffle buffer.
seed: Random seed for deterministic shuffling and split assignment.
preprocess_pipeline: A sequence of callables(row) -> row applied in
order to each row.
schema: Optional schema dict mapping column name to type spec (e.g.,
str, int, float, bool) or their string names ('str', 'int', ...).
lazy: If True, defer IO until iteration.
cache_dir: Directory for caching remote resources (created if missing).
cache_enabled: Whether to enable caching for URLs.
show_progress: Whether to display a progress bar while iterating.
shuffle_buffer_size: Buffer size for streaming shuffling.
encoding: File encoding for local/remote text.
delimiter: Optional delimiter override for CSV (e.g., ',' or '\\t').
If None, inferred from file extension (.csv -> ',', .tsv -> '\\t').
required_columns: Optional set/list of required columns to enforce.
If provided, rows missing any will raise ValueError.
Attributes:
paths: Original input paths or URLs.
split: Active split filter.
split_ratios: Ratios for on-the-fly split.
batch_size: Default batch size.
shuffle: Shuffle flag.
seed: Random seed.
preprocess_pipeline: Preprocessing callables.
schema: Column schema.
lazy: Lazy loading flag.
cache_dir: Cache directory path (if any).
cache_enabled: Cache toggle.
show_progress: Display progress flag.
shuffle_buffer_size: Buffer size for shuffling.
encoding: Text encoding.
delimiter: CSV delimiter override.
required_columns: Required columns for validation.
"""
SUPPORTED_EXTS = {".txt", ".jsonl", ".csv", ".tsv"}
SPLIT_NAMES = ("train", "val", "test")
def __init__(
self,
paths: Union[PathLike, Sequence[PathLike]],
split: Optional[str] = None,
split_ratios: Tuple[float, float, float] = (0.8, 0.1, 0.1),
batch_size: int = 32,
shuffle: bool = False,
seed: Optional[int] = None,
preprocess_pipeline: Optional[
Sequence[Callable[[Row], Row]]
] = None,
schema: Optional[Schema] = None,
lazy: bool = True,
cache_dir: Optional[PathLike] = None,
cache_enabled: bool = True,
show_progress: bool = True,
shuffle_buffer_size: int = 8192,
encoding: str = "utf-8",
delimiter: Optional[str] = None,
required_columns: Optional[Sequence[str]] = None,
) -> None:
if isinstance(paths, (str, Path)):
self.paths: List[PathLike] = [paths]
else:
self.paths = list(paths)
if split is not None and split not in self.SPLIT_NAMES:
raise ValueError(f"split must be one of {self.SPLIT_NAMES} or None.")
if not (isinstance(split_ratios, tuple) and len(split_ratios) == 3):
raise ValueError("split_ratios must be a tuple of three floats.")
if not abs(sum(split_ratios) - 1.0) < 1e-6:
raise ValueError("split_ratios must sum to 1.0.")
self.split = split
self.split_ratios = split_ratios
self.batch_size = int(batch_size)
self.shuffle = bool(shuffle)
self.seed = seed
self.preprocess_pipeline = list(preprocess_pipeline or [])
self.schema = dict(schema) if schema else None
self.lazy = bool(lazy)
self.cache_enabled = bool(cache_enabled)
self.show_progress = bool(show_progress)
self.shuffle_buffer_size = int(shuffle_buffer_size)
self.encoding = encoding
self.delimiter = delimiter
self.required_columns = set(required_columns or [])
self._rng = random.Random(seed)
self.cache_dir = Path(cache_dir) if cache_dir else None
if self.cache_dir:
self.cache_dir.mkdir(parents=True, exist_ok=True)
def __iter__(self) -> Iterator[Row]:
"""Iterate over dataset rows as dictionaries.
Yields:
A dictionary per row after preprocessing and validation.
Raises:
RuntimeError: If required dependencies for URL streaming are missing.
ValueError: If a row fails schema or column validation.
"""
row_iter = self._iterate_rows()
if self.shuffle:
row_iter = self._shuffle_stream(row_iter, self.shuffle_buffer_size)
if self.show_progress:
row_iter = self._with_progress(row_iter, desc="Loading")
for row in row_iter:
yield row
def map(self, fn: Callable[[Row], Row]) -> "TextDatasetLoader":
"""Return a new dataset with an added map transformation.
Args:
fn: A function that receives a row and returns a transformed row.
Returns:
A new TextDatasetLoader with the function appended to the pipeline.
"""
new_ds = self._copy()
new_ds.preprocess_pipeline.append(fn)
return new_ds
def batch(
self, batch_size: Optional[int] = None
) -> Iterator[List[Row]]:
"""Yield mini-batches of rows.
Args:
batch_size: Number of rows per batch. Defaults to self.batch_size.
Yields:
Lists of row dictionaries of length up to batch_size.
"""
bsz = int(batch_size or self.batch_size)
batch_acc: List[Row] = []
for row in self:
batch_acc.append(row)
if len(batch_acc) >= bsz:
yield batch_acc
batch_acc = []
if batch_acc:
yield batch_acc
def take(self, n: int) -> List[Row]:
"""Return the first n rows as a list.
Args:
n: Number of rows to take.
Returns:
A list of up to n row dictionaries.
"""
out: List[Row] = []
for i, row in enumerate(self):
if i >= n:
break
out.append(row)
return out
def save_schema(self, path: PathLike) -> None:
"""Save the schema to a JSON file.
If no schema was provided, a schema is inferred by peeking at the data.
Args:
path: Output JSON file path.
Raises:
RuntimeError: If schema inference fails due to empty sources.
"""
schema = self.schema or self._infer_schema_peek()
serializable: Dict[str, Any] = {
"columns": [
{"name": k, "type": self._type_to_name(v)} for k, v in schema.items()
]
}
out_path = Path(path)
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", encoding="utf-8") as f:
json.dump(serializable, f, ensure_ascii=False, indent=2)
# Internal helpers
def _copy(self) -> "TextDatasetLoader":
return TextDatasetLoader(
paths=self.paths,
split=self.split,
split_ratios=self.split_ratios,
batch_size=self.batch_size,
shuffle=self.shuffle,
seed=self.seed,
preprocess_pipeline=self.preprocess_pipeline[:],
schema=self.schema.copy() if self.schema else None,
lazy=self.lazy,
cache_dir=str(self.cache_dir) if self.cache_dir else None,
cache_enabled=self.cache_enabled,
show_progress=self.show_progress,
shuffle_buffer_size=self.shuffle_buffer_size,
encoding=self.encoding,
delimiter=self.delimiter,
required_columns=list(self.required_columns),
)
    def _iterate_rows(self) -> Iterator[Row]:
        for stype, ref in self._resolve_sources():
            # When a local source path already names the active split, skip
            # the hash-based per-row split assignment for its rows.
            p = Path(ref)
            path_names_split = (
                self.split is not None
                and stype == "local"
                and (self.split in p.name.lower()
                     or self.split in [x.lower() for x in p.parts])
            )
            for row in self._iter_rows_from_source((stype, ref)):
                if not path_names_split and not self._row_in_active_split(row):
                    continue
                row = self._apply_pipeline(row)
                self._validate_row(row)
                yield row
def _resolve_sources(self) -> Iterator[Tuple[str, str]]:
"""Resolve and yield tuple(source_type, path_or_url).
Returns:
An iterator of tuples: ('local', path) or ('url', url).
"""
for p in self.paths:
p_str = str(p)
if self._is_url(p_str):
if self.cache_enabled:
local = self._cache_url(p_str)
if local is not None:
yield ("local", local)
continue
yield ("url", p_str)
else:
path = Path(p_str)
if path.is_dir():
for f in self._iter_dir_files(path):
if self._filter_by_split_heuristic(f):
yield ("local", str(f))
elif path.is_file():
if self._filter_by_split_heuristic(path):
yield ("local", str(path))
def _iter_dir_files(self, root: Path) -> Iterator[Path]:
for f in root.rglob("*"):
if f.is_file() and f.suffix.lower() in self.SUPPORTED_EXTS:
yield f
def _filter_by_split_heuristic(self, path: Path) -> bool:
if self.split is None:
return True
name = path.name.lower()
parts = [p.lower() for p in path.parts]
return (self.split in name) or (self.split in parts)
def _iter_rows_from_source(self, source: Tuple[str, str]) -> Iterator[Row]:
stype, ref = source
suffix = self._infer_suffix(ref)
if stype == "local":
path = Path(ref)
if suffix == ".txt":
yield from self._iter_txt_local(path)
elif suffix == ".jsonl":
yield from self._iter_jsonl_local(path)
elif suffix in (".csv", ".tsv"):
yield from self._iter_csv_local(path, suffix)
else:
return
elif stype == "url":
if requests is None:
raise RuntimeError(
"requests is required for URL streaming but not installed."
)
if suffix == ".txt":
yield from self._iter_txt_url(ref)
elif suffix == ".jsonl":
yield from self._iter_jsonl_url(ref)
elif suffix in (".csv", ".tsv"):
yield from self._iter_csv_url(ref, suffix)
else:
yield from self._iter_txt_url(ref)
else:
return
def _iter_txt_local(self, path: Path) -> Iterator[Row]:
with path.open("r", encoding=self.encoding) as f:
for raw in f:
line = raw.rstrip("\n\r")
if not line:
continue
yield {"text": line, "__raw__": line.encode(self.encoding)}
def _iter_jsonl_local(self, path: Path) -> Iterator[Row]:
with path.open("r", encoding=self.encoding) as f:
for raw in f:
line = raw.rstrip("\n\r")
if not line:
continue
obj = json.loads(line)
row = dict(obj)
row["__raw__"] = line.encode(self.encoding)
yield row
def _iter_csv_local(self, path: Path, suffix: str) -> Iterator[Row]:
delim = self._resolve_delimiter(suffix)
with path.open("r", encoding=self.encoding, newline="") as f:
reader = csv.DictReader(f, delimiter=delim)
for row in reader:
raw_bytes = self._row_to_bytes_csv(row, reader.fieldnames, delim)
row = dict(row)
row["__raw__"] = raw_bytes
yield row
def _iter_txt_url(self, url: str) -> Iterator[Row]:
with requests.get(url, stream=True, timeout=30) as r: # type: ignore[union-attr]
r.raise_for_status()
for line in r.iter_lines(decode_unicode=True):
if line is None:
continue
line = line.rstrip("\n\r")
if not line:
continue
yield {"text": line, "__raw__": line.encode(self.encoding)}
def _iter_jsonl_url(self, url: str) -> Iterator[Row]:
with requests.get(url, stream=True, timeout=30) as r: # type: ignore[union-attr]
r.raise_for_status()
for line in r.iter_lines(decode_unicode=True):
if line is None:
continue
line = line.rstrip("\n\r")
if not line:
continue
obj = json.loads(line)
row = dict(obj)
row["__raw__"] = line.encode(self.encoding)
yield row
    def _iter_csv_url(self, url: str, suffix: str) -> Iterator[Row]:
        delim = self._resolve_delimiter(suffix)
        with requests.get(url, stream=True, timeout=30) as r:  # type: ignore[union-attr]
            r.raise_for_status()
            # csv accepts any iterable of strings, so feed it decoded lines
            # directly from the streaming response.
            lines = (
                l.decode(self.encoding) if isinstance(l, bytes) else l
                for l in r.iter_lines()
                if l is not None
            )
            reader = csv.DictReader(lines, delimiter=delim)
            for row in reader:
                raw_bytes = self._row_to_bytes_csv(row, reader.fieldnames, delim)
                row = dict(row)
                row["__raw__"] = raw_bytes
                yield row
def _row_to_bytes_csv(
self,
row: Dict[str, Any],
fieldnames: Optional[List[str]],
delimiter: str,
) -> bytes:
if not fieldnames:
fieldnames = list(row.keys())
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=delimiter)
        writer.writerow(row)
return output.getvalue().encode(self.encoding)
def _resolve_delimiter(self, suffix: str) -> str:
if self.delimiter:
return self.delimiter
if suffix == ".csv":
return ","
if suffix == ".tsv":
return "\t"
return ","
def _apply_pipeline(self, row: Row) -> Row:
out = row
for fn in self.preprocess_pipeline:
out = fn(out)
return out
def _validate_row(self, row: Row) -> None:
# Required columns
if self.required_columns:
missing = self.required_columns.difference(row.keys())
if missing:
raise ValueError(f"Missing required columns: {sorted(missing)}")
# Schema typing
if not self.schema:
return
for col, typ in self.schema.items():
if col not in row:
raise ValueError(f"Missing column '{col}' per schema.")
target_type = self._normalize_type(typ)
try:
row[col] = self._coerce_value(row[col], target_type)
except Exception as exc:
raise ValueError(
f"Column '{col}' failed type '{target_type.__name__}': {exc}"
) from exc
def _row_in_active_split(self, row: Row) -> bool:
if self.split is None:
return True
        # Sources whose path names the active split are handled in
        # _iterate_rows; otherwise, assign splits by hashing the raw bytes.
raw = row.get("__raw__")
if not isinstance(raw, (bytes, bytearray)):
raw = json.dumps(row, sort_keys=True).encode(self.encoding)
v = self._hash01(raw)
t, v_ratio, te = self.split_ratios
bounds = {
"train": t,
"val": t + v_ratio,
"test": 1.0,
}
if self.split == "train":
return v < bounds["train"]
if self.split == "val":
return bounds["train"] <= v < bounds["val"]
if self.split == "test":
return bounds["val"] <= v <= bounds["test"]
return True
def _hash01(self, data: bytes) -> float:
h = hashlib.sha1()
h.update(data)
if self.seed is not None:
h.update(str(self.seed).encode("utf-8"))
hv = int.from_bytes(h.digest()[:8], "big")
return (hv & ((1 << 53) - 1)) / float(1 << 53)
def _shuffle_stream(
self, it: Iterable[Row], buffer_size: int
) -> Iterator[Row]:
buf: Deque[Row] = deque()
for item in it:
buf.append(item)
if len(buf) >= buffer_size:
idx = self._rng.randrange(len(buf))
buf[idx], buf[-1] = buf[-1], buf[idx]
yield buf.pop()
while buf:
idx = self._rng.randrange(len(buf))
buf[idx], buf[-1] = buf[-1], buf[idx]
yield buf.pop()
def _with_progress(
self, it: Iterable[Row], desc: str = "Loading"
) -> Iterator[Row]:
for row in tqdm(it, desc=desc):
yield row
def _infer_schema_peek(self, max_rows: int = 64) -> Schema:
rows = []
count = 0
for row in self._iterate_rows_no_pipeline(max_rows=max_rows):
rows.append(row)
count += 1
if count >= max_rows:
break
if not rows:
raise RuntimeError("Cannot infer schema from empty dataset.")
keys = set().union(*(r.keys() for r in rows))
keys.discard("__raw__")
schema: Schema = {}
for k in sorted(keys):
schema[k] = self._infer_type_for_column([r.get(k) for r in rows])
return schema
def _iterate_rows_no_pipeline(self, max_rows: int) -> Iterator[Row]:
yielded = 0
for src in self._resolve_sources():
for row in self._iter_rows_from_source(src):
if not self._row_in_active_split(row):
continue
if yielded >= max_rows:
return
# Skip pipeline and validation for inference
yielded += 1
yield row
def _infer_type_for_column(self, values: List[Any]) -> type:
candidates = [int, float, bool, str]
# Try to coerce; if any fails, fallback to next broader type
for t in candidates:
ok = True
for v in values:
if v is None:
continue
try:
self._coerce_value(v, t)
except Exception:
ok = False
break
if ok:
return t
return str
def _coerce_value(self, value: Any, target_type: type) -> Any:
if value is None:
return None
if isinstance(value, target_type):
return value
if target_type is bool:
if isinstance(value, str):
lv = value.strip().lower()
if lv in {"true", "1", "yes", "y", "t"}:
return True
if lv in {"false", "0", "no", "n", "f"}:
return False
raise ValueError(f"Cannot parse bool from '{value}'.")
return bool(value)
if target_type is int:
if isinstance(value, bool):
return int(value)
return int(value)
if target_type is float:
if isinstance(value, bool):
return float(int(value))
return float(value)
if target_type is str:
return str(value)
return target_type(value)
def _type_to_name(self, t: Union[type, str]) -> str:
if isinstance(t, str):
return t
mapping = {int: "int", float: "float", bool: "bool", str: "str"}
return mapping.get(t, getattr(t, "__name__", "unknown"))
def _normalize_type(self, t: Union[type, str]) -> type:
if isinstance(t, type):
return t
tl = t.lower()
mapping = {"int": int, "float": float, "bool": bool, "str": str}
if tl not in mapping:
raise ValueError(f"Unsupported type spec '{t}'.")
return mapping[tl]
def _infer_suffix(self, ref: str) -> str:
try:
suffix = Path(ref).suffix.lower()
if suffix in self.SUPPORTED_EXTS:
return suffix
except Exception:
pass
# Fallback: try guess from URL path
for ext in self.SUPPORTED_EXTS:
if ref.lower().endswith(ext):
return ext
return ".txt"
def _is_url(self, ref: str) -> bool:
return ref.startswith("http://") or ref.startswith("https://")
def _cache_url(self, url: str) -> Optional[str]:
if self.cache_dir is None:
return None
if requests is None:
return None
# Hash URL to stable file name; try to preserve extension if any
ext = self._infer_suffix(url)
digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
local_path = self.cache_dir / f"{digest}{ext}"
if local_path.exists():
return str(local_path)
if not self.lazy:
self._download(url, local_path)
return str(local_path)
        # Lazy mode: still try to populate the cache now, but fall back to
        # direct URL streaming if the download fails.
try:
self._download(url, local_path)
return str(local_path)
except Exception:
return None
def _download(self, url: str, dst: Path) -> None:
tmp = dst.with_suffix(dst.suffix + ".part")
tmp.parent.mkdir(parents=True, exist_ok=True)
with requests.get(url, stream=True, timeout=60) as r: # type: ignore[union-attr]
r.raise_for_status()
with tmp.open("wb") as f:
for chunk in r.iter_content(chunk_size=1 << 20):
if chunk:
f.write(chunk)
os.replace(tmp, dst)
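A brief usage sketch for the TextDatasetLoader above; the file path, column names, and schema are assumptions about a hypothetical JSONL dataset.

if __name__ == "__main__":
    # Illustrative usage only: "data/train.jsonl" and its columns are hypothetical.
    loader = TextDatasetLoader(
        paths=["data/train.jsonl"],
        split="train",
        schema={"text": str, "label": int},
        required_columns=["text", "label"],
        shuffle=True,
        seed=42,
        show_progress=False,
    )
    # Append a lowercasing step to the preprocessing pipeline.
    lowercased = loader.map(lambda row: {**row, "text": row["text"].lower()})
    for batch in lowercased.batch(16):
        print(len(batch), batch[0]["text"][:40])
        break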
Turn a vague "I need an XX class" idea into a high-quality Python class template that can be pasted straight into a project. Aimed at engineering, data, and automation teams, as well as technical instructors and students, it helps to: 1) generate clearly structured, fully documented, consistently styled classes within minutes; 2) noticeably cut boilerplate and rework, shortening review and iteration cycles; 3) standardize team coding conventions, lowering maintenance cost while improving readability and testability; 4) deliver immediately visible results that turn trials into paid usage, unlocking more scenarios and ongoing service.
For learners: quickly generate a well-formed class template from a requirement description and start filling in business logic right away; docstrings and type information are included automatically, reducing rework caused by poor formatting; build sound object-oriented thinking through practice.
For engineering teams: abstract complex modules into clear class boundaries with consistent naming and documentation style; batch-produce ready-to-use entity and service class skeletons to cut repetitive work; give code review a consistent structural standard.
For data and ML practitioners: refactor experiment scripts into well-structured classes that isolate parameters, workflow, and results; make data processing and training jobs easier to reuse and iterate on; and consolidate them into shareable internal tools.
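As a rough illustration of that refactoring pattern (all names below are hypothetical), an experiment script might be wrapped into a class that separates configuration, the run loop, and results:

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ExperimentConfig:
    """Hypothetical experiment parameters, kept separate from the run logic."""
    learning_rate: float = 1e-3
    epochs: int = 3


class Experiment:
    """Wraps a formerly script-style workflow: config in, metrics out."""

    def __init__(self, config: ExperimentConfig) -> None:
        self.config = config
        self.history: List[Dict[str, float]] = []

    def run(self) -> Dict[str, float]:
        """Run all epochs and return the final metrics."""
        metrics: Dict[str, float] = {}
        for epoch in range(self.config.epochs):
            # Placeholder for real training/evaluation logic.
            metrics = {"epoch": float(epoch), "loss": 1.0 / (epoch + 1)}
            self.history.append(metrics)
        return metrics


if __name__ == "__main__":
    result = Experiment(ExperimentConfig(epochs=2)).run()
    print(result)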