Python Class Template Design Expert

Updated Dec 10, 2025

This prompt is designed for Python development scenarios. Given an object or concept specified by the user, it generates a structurally complete class template that follows Python coding conventions. The prompt uses a step-by-step design process to ensure the output template includes the necessary initializer, attribute definitions, common methods, and docstrings, while adhering to object-oriented best practices. It is suitable for Python project development, teaching demonstrations, code refactoring, and similar scenarios, helping developers quickly build high-quality class structures.
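
For instance, given an object description such as "图书 (book)", the prompt might return a skeleton along the following lines. This is only a minimal, hypothetical illustration of the output style; the two full examples that follow show the richer templates the step-by-step flow is designed to produce.

class Book:
    """A book in a library catalog.

    Attributes:
        title: Book title.
        author: Author name.
        isbn: ISBN-13 identifier.
        available: Whether the book can currently be borrowed.
    """

    def __init__(self, title: str, author: str, isbn: str, available: bool = True) -> None:
        self.title = title
        self.author = author
        self.isbn = isbn
        self.available = available

    def borrow(self) -> None:
        """Mark the book as borrowed.

        Raises:
            ValueError: If the book is already borrowed.
        """
        if not self.available:
            raise ValueError(f"'{self.title}' is already borrowed.")
        self.available = False

    def return_book(self) -> None:
        """Mark the book as returned and available again."""
        self.available = True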

import json
import logging
import os
import time
from collections import deque
from dataclasses import dataclass
from typing import (
    Any,
    Callable,
    Deque,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    Union,
)
from urllib.parse import urljoin

import requests
from requests import Response, Session
from requests.exceptions import ConnectionError, Timeout


@dataclass(frozen=True)
class UnifiedResponse:
    """Unified HTTP response model.

    Attributes:
        status_code: HTTP status code.
        url: Final URL of the response.
        method: HTTP method used for the request.
        headers: Response headers.
        data: Parsed JSON object if available; otherwise, None.
        text: Text body if available and not JSON; otherwise, None.
        content: Raw bytes content if streaming or non-text; otherwise, None.
        elapsed: Elapsed time in seconds for the request.
        request_id: Request ID extracted from headers if present.
    """

    status_code: int
    url: str
    method: str
    headers: Mapping[str, str]
    data: Optional[Any]
    text: Optional[str]
    content: Optional[bytes]
    elapsed: float
    request_id: Optional[str]


class RestClientError(Exception):
    """Base exception for REST client errors."""

    def __init__(
        self,
        message: str,
        *,
        status_code: Optional[int] = None,
        response: Optional[UnifiedResponse] = None,
    ) -> None:
        super().__init__(message)
        self.status_code = status_code
        self.response = response


class BadRequestError(RestClientError):
    """400 Bad Request."""


class UnauthorizedError(RestClientError):
    """401 Unauthorized."""


class ForbiddenError(RestClientError):
    """403 Forbidden."""


class NotFoundError(RestClientError):
    """404 Not Found."""


class ConflictError(RestClientError):
    """409 Conflict."""


class UnprocessableEntityError(RestClientError):
    """422 Unprocessable Entity."""


class TooManyRequestsError(RestClientError):
    """429 Too Many Requests."""


class ServerError(RestClientError):
    """5xx Server Error."""


class HTTPError(RestClientError):
    """Generic HTTP error."""


class RequestSendError(RestClientError):
    """Network or transport-level error when sending a request."""


class RestClient:
    """A generic REST client with retries, rate limiting, and structured logging.

    This client provides:
      - Session management with a base URL and default timeout.
      - Bearer token authentication (static token or provider callable).
      - Exponential backoff retries for network and retryable HTTP errors.
      - Optional rate limiting (sliding window).
      - JSON serialization and unified response parsing.
      - Pagination helpers.
      - Structured logging with contextual fields.
      - File upload and streaming download helpers.

    Args:
        base_url: Base URL for all requests. Paths are joined via urljoin.
        token: Static bearer token for Authorization header. Ignored if
            token_provider is provided.
        token_provider: Callable returning a bearer token string. Called per
            request to populate Authorization header.
        timeout: Default timeout in seconds for requests.
        max_retries: Maximum number of retry attempts on retryable errors.
            Total attempts will be max_retries + 1 including the initial try.
        backoff_factor: Base factor for exponential backoff (seconds).
        backoff_max: Maximum backoff sleep duration per attempt (seconds).
        retry_on_status: Iterable of HTTP status codes to retry on. Defaults to
            {429} and 5xx.
        rate_limit: Optional rate limiting tuple (max_requests, per_seconds).
            Example: (10, 1.0) allows 10 requests per 1 second.
        session: Optional pre-configured requests.Session.
        default_headers: Default headers added to every request.
        logger: Optional logger; defaults to `logging.getLogger(__name__)`.

    Attributes:
        base_url: Base URL joined with request paths.
        timeout: Default timeout in seconds.
        max_retries: Configured maximum retry attempts.
        backoff_factor: Backoff factor for exponential backoff.
        backoff_max: Cap for backoff sleep per attempt.
        retry_on_status: Set of status codes that will be retried.
        rate_limit: Configured rate limit tuple or None.
        session: Underlying requests.Session.
        logger: Logger used for structured logs.
    """

    def __init__(
        self,
        base_url: str,
        *,
        token: Optional[str] = None,
        token_provider: Optional[Callable[[], str]] = None,
        timeout: float = 30.0,
        max_retries: int = 3,
        backoff_factor: float = 0.5,
        backoff_max: float = 60.0,
        retry_on_status: Optional[Iterable[int]] = None,
        rate_limit: Optional[Tuple[int, float]] = None,
        session: Optional[Session] = None,
        default_headers: Optional[Mapping[str, str]] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        self.base_url = base_url.rstrip("/") + "/"
        self._token = token
        self._token_provider = token_provider
        self.timeout = timeout
        self.max_retries = max(0, int(max_retries))
        self.backoff_factor = max(0.0, float(backoff_factor))
        self.backoff_max = max(0.0, float(backoff_max))
        default_retry = {429}
        self.retry_on_status = set(retry_on_status or default_retry)
        self.rate_limit = rate_limit
        self._rate_window: Deque[float] = deque(maxlen=rate_limit[0]) if rate_limit else deque()
        self.session = session or requests.Session()
        self.default_headers: Dict[str, str] = dict(default_headers or {})
        self.logger = logger or logging.getLogger(__name__)

        # Default headers
        if "Accept" not in self.default_headers:
            self.default_headers["Accept"] = "application/json"
        if "User-Agent" not in self.default_headers:
            self.default_headers["User-Agent"] = "RestClient/1.0"

        # Common JSON header for write methods; GET should not force Content-Type
        # but we add per-request if json payload is provided.

    # -------------- Public API --------------

    def set_token(self, token: Optional[str]) -> None:
        """Set or update the static bearer token.

        Args:
            token: Bearer token string or None to clear.
        """
        self._token = token

    def set_token_provider(
        self, provider: Optional[Callable[[], str]]
    ) -> None:
        """Set or update the token provider callable.

        Args:
            provider: Callable returning a bearer token string, or None to clear.
        """
        self._token_provider = provider

    def request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        json_body: Optional[Any] = None,
        data: Optional[Union[Mapping[str, Any], bytes]] = None,
        files: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
        stream: bool = False,
        allow_redirects: bool = True,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """Perform an HTTP request.

        Args:
            method: HTTP method (GET, POST, PUT, DELETE, etc.).
            path: Relative or absolute URL path.
            params: Query string parameters.
            headers: Additional request headers.
            json_body: JSON-serializable payload set as request body.
            data: Raw or form data payload.
            files: Files for multipart upload (requests-compatible format).
            timeout: Per-request timeout override in seconds.
            stream: Whether to stream the response.
            allow_redirects: Whether to follow redirects.
            expected_status: Optional iterable of acceptable status codes.
                If provided, responses not in this set raise an error.

        Returns:
            UnifiedResponse: Parsed response object.

        Raises:
            RestClientError: On HTTP or transport errors.
        """
        return self._request(
            method=method,
            path=path,
            params=params,
            headers=headers,
            json_body=json_body,
            data=data,
            files=files,
            timeout=timeout,
            stream=stream,
            allow_redirects=allow_redirects,
            expected_status=set(expected_status) if expected_status else None,
        )

    def get(
        self,
        path: str,
        *,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Optional[float] = None,
        stream: bool = False,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """HTTP GET request.

        Args:
            path: URL path.
            params: Query parameters.
            headers: Additional headers.
            timeout: Timeout override in seconds.
            stream: Stream response.
            expected_status: Acceptable status codes.

        Returns:
            UnifiedResponse: Parsed response.
        """
        return self.request(
            "GET",
            path,
            params=params,
            headers=headers,
            timeout=timeout,
            stream=stream,
            expected_status=expected_status,
        )

    def post(
        self,
        path: str,
        *,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        json_body: Optional[Any] = None,
        data: Optional[Union[Mapping[str, Any], bytes]] = None,
        files: Optional[Mapping[str, Any]] = None,
        timeout: Optional[float] = None,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """HTTP POST request.

        Args:
            path: URL path.
            params: Query parameters.
            headers: Additional headers.
            json_body: JSON payload.
            data: Raw or form data payload.
            files: Files for multipart upload.
            timeout: Timeout override in seconds.
            expected_status: Acceptable status codes.

        Returns:
            UnifiedResponse: Parsed response.
        """
        return self.request(
            "POST",
            path,
            params=params,
            headers=headers,
            json_body=json_body,
            data=data,
            files=files,
            timeout=timeout,
            expected_status=expected_status,
        )

    def put(
        self,
        path: str,
        *,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        json_body: Optional[Any] = None,
        data: Optional[Union[Mapping[str, Any], bytes]] = None,
        timeout: Optional[float] = None,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """HTTP PUT request.

        Args:
            path: URL path.
            params: Query parameters.
            headers: Additional headers.
            json_body: JSON payload.
            data: Raw or form data payload.
            timeout: Timeout override in seconds.
            expected_status: Acceptable status codes.

        Returns:
            UnifiedResponse: Parsed response.
        """
        return self.request(
            "PUT",
            path,
            params=params,
            headers=headers,
            json_body=json_body,
            data=data,
            timeout=timeout,
            expected_status=expected_status,
        )

    def delete(
        self,
        path: str,
        *,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Optional[float] = None,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """HTTP DELETE request.

        Args:
            path: URL path.
            params: Query parameters.
            headers: Additional headers.
            timeout: Timeout override in seconds.
            expected_status: Acceptable status codes.

        Returns:
            UnifiedResponse: Parsed response.
        """
        return self.request(
            "DELETE",
            path,
            params=params,
            headers=headers,
            timeout=timeout,
            expected_status=expected_status,
        )

    def upload(
        self,
        path: str,
        *,
        files: Mapping[str, Any],
        data: Optional[Mapping[str, Any]] = None,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Optional[float] = None,
        expected_status: Optional[Iterable[int]] = None,
    ) -> UnifiedResponse:
        """Upload files using multipart/form-data.

        Args:
            path: URL path.
            files: Files mapping for multipart upload, compatible with requests.
            data: Optional form fields.
            params: Query parameters.
            headers: Additional headers.
            timeout: Timeout override in seconds.
            expected_status: Acceptable status codes.

        Returns:
            UnifiedResponse: Parsed response.
        """
        return self.post(
            path,
            params=params,
            headers=headers,
            data=data,
            files=files,
            timeout=timeout,
            expected_status=expected_status,
        )

    def download(
        self,
        path: str,
        *,
        dest_path: str,
        params: Optional[Mapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        timeout: Optional[float] = None,
        chunk_size: int = 8192,
        overwrite: bool = False,
        expected_status: Optional[Iterable[int]] = None,
    ) -> str:
        """Download a resource and save to a local file.

        Args:
            path: URL path.
            dest_path: Local file path to save.
            params: Query parameters.
            headers: Additional headers.
            timeout: Timeout override in seconds.
            chunk_size: Stream buffer size in bytes.
            overwrite: Whether to overwrite an existing file.
            expected_status: Acceptable status codes.

        Returns:
            str: Absolute path to the saved file.

        Raises:
            RestClientError: On HTTP or IO errors.
        """
        if os.path.exists(dest_path) and not overwrite:
            raise RestClientError(
                f"File exists and overwrite=False: {dest_path}"
            )

        # Open a single streaming request (the stream helper applies retries).
        with self._stream_response(
            path, params, headers, timeout, chunk_size
        ) as stream_resp:
            if expected_status and stream_resp.status_code not in set(expected_status):
                self._raise_http_error(
                    self._to_unified_response(stream_resp, "GET", stream=True)
                )
            try:
                with open(dest_path, "wb") as f:
                    for chunk in stream_resp.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
            except OSError as exc:
                raise RestClientError(
                    f"I/O error while saving to {dest_path}: {exc}"
                ) from exc

        return os.path.abspath(dest_path)

    def paginate(
        self,
        path: str,
        *,
        method: str = "GET",
        params: Optional[MutableMapping[str, Any]] = None,
        headers: Optional[Mapping[str, str]] = None,
        items_key: Optional[str] = None,
        page_param: str = "page",
        per_page_param: str = "per_page",
        start_page: int = 1,
        per_page: int = 100,
        next_key: Optional[str] = None,
        max_pages: Optional[int] = None,
        timeout: Optional[float] = None,
    ) -> Iterator[Union[UnifiedResponse, Any]]:
        """Paginate through resources.

        Supports two common patterns:
          1) Page-number based: increments `page_param` until empty or max_pages.
          2) Cursor/next-link based: uses `next_key` found in JSON response
             (e.g., {"next": "..."}). When provided, this takes precedence.

        Args:
            path: URL path for the initial request.
            method: HTTP method to use, typically GET.
            params: Initial query parameters (mutated during pagination).
            headers: Additional request headers.
            items_key: If provided, yields items from response.data[items_key];
                otherwise yields the UnifiedResponse for each page.
            page_param: Query string key for page number.
            per_page_param: Query string key for page size.
            start_page: Starting page number.
            per_page: Page size.
            next_key: JSON key holding the next cursor or URL; if present, the
                paginator will follow it until None/empty.
            max_pages: Maximum number of pages to fetch.
            timeout: Timeout override per page.

        Yields:
            Union[UnifiedResponse, Any]: Pages or items, depending on items_key.

        Raises:
            RestClientError: On HTTP errors.
        """
        current_params: MutableMapping[str, Any] = dict(params or {})
        if next_key:
            # Cursor/next mode
            next_value: Optional[str] = None
            page_count = 0
            while True:
                if next_value:
                    # Prefer cursor in params; if it's a full URL, replace path.
                    if next_value.startswith("http"):
                        current_path = next_value
                    else:
                        current_params[next_key] = next_value
                        current_path = path
                else:
                    current_path = path

                resp = self.request(
                    method,
                    current_path,
                    params=current_params,
                    headers=headers,
                    timeout=timeout,
                )

                if items_key and isinstance(resp.data, dict):
                    items = resp.data.get(items_key, []) or []
                    for item in items:
                        yield item
                else:
                    yield resp

                next_value = None
                if isinstance(resp.data, dict):
                    nval = resp.data.get(next_key)  # type: ignore[index]
                    if isinstance(nval, str) and nval:
                        next_value = nval

                page_count += 1
                if not next_value:
                    break
                if max_pages is not None and page_count >= max_pages:
                    break
        else:
            # Page-number mode
            page = start_page
            page_count = 0
            current_params[page_param] = page
            current_params[per_page_param] = per_page
            while True:
                resp = self.request(
                    method,
                    path,
                    params=current_params,
                    headers=headers,
                    timeout=timeout,
                )

                count = 0
                if items_key and isinstance(resp.data, dict):
                    items = resp.data.get(items_key, []) or []
                    for item in items:
                        count += 1
                        yield item
                else:
                    yield resp
                    # If no items_key, we assume presence of content implies
                    # there may be more pages; this is heuristic.
                    if isinstance(resp.data, list):
                        count = len(resp.data)
                    elif isinstance(resp.data, dict):
                        # Best-effort heuristic
                        count = len(resp.data)

                page_count += 1
                if max_pages is not None and page_count >= max_pages:
                    break
                if count == 0:
                    break
                page += 1
                current_params[page_param] = page

    def close(self) -> None:
        """Close the underlying session."""
        self.session.close()

    def __enter__(self) -> "RestClient":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Exit context manager and close session."""
        self.close()

    # -------------- Internal Helpers --------------

    def _request(
        self,
        *,
        method: str,
        path: str,
        params: Optional[Mapping[str, Any]],
        headers: Optional[Mapping[str, str]],
        json_body: Optional[Any],
        data: Optional[Union[Mapping[str, Any], bytes]],
        files: Optional[Mapping[str, Any]],
        timeout: Optional[float],
        stream: bool,
        allow_redirects: bool,
        expected_status: Optional[Iterable[int]],
    ) -> UnifiedResponse:
        url = self._build_url(path)
        req_headers = self._merge_headers(headers, json_body=json_body)
        req_timeout = timeout if timeout is not None else self.timeout
        attempts = self.max_retries + 1

        for attempt in range(1, attempts + 1):
            self._apply_rate_limit()

            start = time.perf_counter()
            error: Optional[Exception] = None

            try:
                resp = self.session.request(
                    method=method.upper(),
                    url=url,
                    params=params,
                    headers=req_headers,
                    json=json_body,
                    data=data,
                    files=files,
                    timeout=req_timeout,
                    stream=stream,
                    allow_redirects=allow_redirects,
                )
                elapsed = time.perf_counter() - start

                uresp = self._to_unified_response(resp, method, stream=stream)
                self._log_request(
                    method=method,
                    url=resp.url,
                    status_code=resp.status_code,
                    elapsed=elapsed,
                    attempt=attempt,
                    params=params,
                    stream=stream,
                )

                if expected_status:
                    if uresp.status_code in expected_status:
                        return uresp
                    if self._should_retry_status(uresp.status_code) and attempt < attempts:
                        self._sleep_backoff(attempt)
                        continue
                    self._raise_http_error(uresp)

                if 200 <= uresp.status_code < 300:
                    return uresp

                if self._should_retry_status(uresp.status_code) and attempt < attempts:
                    self._sleep_backoff(attempt)
                    continue

                self._raise_http_error(uresp)

            except (Timeout, ConnectionError) as exc:
                elapsed = time.perf_counter() - start
                error = exc
                self._log_request(
                    method=method,
                    url=url,
                    status_code=None,
                    elapsed=elapsed,
                    attempt=attempt,
                    params=params,
                    stream=stream,
                    error=repr(exc),
                )
                if attempt < attempts:
                    self._sleep_backoff(attempt)
                    continue
                raise RequestSendError(
                    f"Request failed after {attempts} attempts: {exc}"
                ) from exc
            except requests.RequestException as exc:
                elapsed = time.perf_counter() - start
                error = exc
                self._log_request(
                    method=method,
                    url=url,
                    status_code=None,
                    elapsed=elapsed,
                    attempt=attempt,
                    params=params,
                    stream=stream,
                    error=repr(exc),
                )
                raise RequestSendError(f"Request error: {exc}") from exc

        raise RequestSendError("Exhausted retries without a response")

    def _stream_response(
        self,
        path: str,
        params: Optional[Mapping[str, Any]],
        headers: Optional[Mapping[str, str]],
        timeout: Optional[float],
        chunk_size: int,
    ) -> Response:
        """Internal helper to open a streaming response."""
        url = self._build_url(path)
        req_headers = self._merge_headers(headers)
        req_timeout = timeout if timeout is not None else self.timeout

        attempts = self.max_retries + 1
        for attempt in range(1, attempts + 1):
            self._apply_rate_limit()
            start = time.perf_counter()
            try:
                resp = self.session.get(
                    url,
                    params=params,
                    headers=req_headers,
                    timeout=req_timeout,
                    stream=True,
                )
                elapsed = time.perf_counter() - start
                self._log_request(
                    method="GET",
                    url=resp.url,
                    status_code=resp.status_code,
                    elapsed=elapsed,
                    attempt=attempt,
                    params=params,
                    stream=True,
                )
                if 200 <= resp.status_code < 300:
                    return resp
                if self._should_retry_status(resp.status_code) and attempt < attempts:
                    self._sleep_backoff(attempt)
                    continue
                self._raise_http_error(
                    self._to_unified_response(resp, "GET", stream=True)
                )
            except (Timeout, ConnectionError) as exc:
                elapsed = time.perf_counter() - start
                self._log_request(
                    method="GET",
                    url=url,
                    status_code=None,
                    elapsed=elapsed,
                    attempt=attempt,
                    params=params,
                    stream=True,
                    error=repr(exc),
                )
                if attempt < attempts:
                    self._sleep_backoff(attempt)
                    continue
                raise RequestSendError(
                    f"Streaming request failed after {attempts} attempts: {exc}"
                ) from exc
        raise RequestSendError("Exhausted retries for streaming request")

    def _build_url(self, path: str) -> str:
        if path.startswith("http://") or path.startswith("https://"):
            return path
        return urljoin(self.base_url, path.lstrip("/"))

    def _merge_headers(
        self, headers: Optional[Mapping[str, str]], *, json_body: Optional[Any] = None
    ) -> Dict[str, str]:
        merged = dict(self.default_headers)
        if headers:
            merged.update(headers)

        token = self._get_bearer_token()
        if token:
            merged["Authorization"] = f"Bearer {token}"
        if json_body is not None and "Content-Type" not in merged:
            merged["Content-Type"] = "application/json"
        return merged

    def _get_bearer_token(self) -> Optional[str]:
        if self._token_provider:
            try:
                token = self._token_provider()
                if token:
                    return token
            except Exception:  # noqa: BLE001
                # Provider failures should not break the request; log and fall
                # back to the static token.
                self.logger.exception("Token provider callable raised")
        return self._token

    def _should_retry_status(self, status_code: int) -> bool:
        if status_code in self.retry_on_status:
            return True
        if 500 <= status_code < 600:
            return True
        return False

    def _sleep_backoff(self, attempt: int) -> None:
        if attempt > self.max_retries:
            return
        # Exponential backoff with clock-derived jitter (50-100% of the capped delay)
        base = self.backoff_factor * (2 ** (attempt - 1))
        sleep_for = min(self.backoff_max, base) * (0.5 + 0.5 * (time.time() % 1))
        time.sleep(max(0.0, sleep_for))

    def _apply_rate_limit(self) -> None:
        if not self.rate_limit:
            return
        max_req, per_seconds = self.rate_limit
        now = time.monotonic()

        # Purge timestamps outside the window
        while self._rate_window and (now - self._rate_window[0]) >= per_seconds:
            self._rate_window.popleft()

        if len(self._rate_window) >= max_req:
            # Sleep until the oldest timestamp exits the window
            wait_for = per_seconds - (now - self._rate_window[0])
            if wait_for > 0:
                time.sleep(wait_for)
            # After sleep, purge again
            now = time.monotonic()
            while self._rate_window and (now - self._rate_window[0]) >= per_seconds:
                self._rate_window.popleft()

        self._rate_window.append(time.monotonic())

    def _to_unified_response(
        self, resp: Response, method: str, *, stream: bool = False
    ) -> UnifiedResponse:
        request_id = (
            resp.headers.get("X-Request-Id")
            or resp.headers.get("X-Request-ID")
            or resp.headers.get("Request-Id")
        )
        data: Optional[Any] = None
        text: Optional[str] = None
        content: Optional[bytes] = None

        content_type = resp.headers.get("Content-Type", "")
        if "application/json" in content_type.lower():
            try:
                data = resp.json()
            except ValueError:
                text = resp.text
        elif stream:
            # Streamed responses may be large; leave data/text/content unset so
            # callers can consume the body themselves.
            pass
        elif resp.request and resp.request.headers.get("Accept") == "application/json":
            # The server ignored Accept and returned non-JSON; capture the text.
            text = resp.text
        else:
            # Non-JSON, non-streamed: capture raw bytes for convenience.
            content = resp.content

        return UnifiedResponse(
            status_code=resp.status_code,
            url=str(resp.url),
            method=method.upper(),
            headers=dict(resp.headers),
            data=data,
            text=text,
            content=content,
            elapsed=resp.elapsed.total_seconds() if resp.elapsed else 0.0,
            request_id=request_id,
        )

    def _raise_http_error(self, resp: UnifiedResponse) -> None:
        status = resp.status_code
        message = self._build_error_message(resp)

        if status == 400:
            raise BadRequestError(message, status_code=status, response=resp)
        if status == 401:
            raise UnauthorizedError(message, status_code=status, response=resp)
        if status == 403:
            raise ForbiddenError(message, status_code=status, response=resp)
        if status == 404:
            raise NotFoundError(message, status_code=status, response=resp)
        if status == 409:
            raise ConflictError(message, status_code=status, response=resp)
        if status == 422:
            raise UnprocessableEntityError(
                message, status_code=status, response=resp
            )
        if status == 429:
            raise TooManyRequestsError(message, status_code=status, response=resp)
        if 500 <= status < 600:
            raise ServerError(message, status_code=status, response=resp)
        raise HTTPError(message, status_code=status, response=resp)

    def _build_error_message(self, resp: UnifiedResponse) -> str:
        # Prefer server-provided message from JSON body
        if isinstance(resp.data, dict):
            msg = (
                resp.data.get("message")
                or resp.data.get("error")
                or resp.data.get("detail")
            )
            if isinstance(msg, str):
                return f"{resp.status_code} {msg}"
        # Fallback to text snippet
        if resp.text:
            snippet = resp.text.strip()
            if len(snippet) > 300:
                snippet = snippet[:300] + "..."
            return f"{resp.status_code} {snippet}"
        return f"HTTP {resp.status_code} at {resp.url}"

    def _log_request(
        self,
        *,
        method: str,
        url: str,
        status_code: Optional[int],
        elapsed: float,
        attempt: int,
        params: Optional[Mapping[str, Any]],
        stream: bool,
        error: Optional[str] = None,
    ) -> None:
        extra = {
            "event": "http_request",
            "method": method.upper(),
            "url": url,
            "status_code": status_code,
            "elapsed_ms": round(elapsed * 1000, 2),
            "attempt": attempt,
            "retry": attempt - 1,
            "stream": stream,
            "error": error,
            "params": dict(params or {}),
        }
        if error:
            self.logger.warning("HTTP request error", extra=extra)
        elif status_code and status_code >= 400:
            self.logger.error("HTTP request failed", extra=extra)
        else:
            self.logger.info("HTTP request succeeded", extra=extra)
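
A brief usage sketch for the RestClient example above, assuming the class is importable. The base URL, token, paths, and the "items" response key are hypothetical placeholders; the sketch assumes a JSON API that paginates with page/per_page query parameters.

import logging

logging.basicConfig(level=logging.INFO)

# Hypothetical API endpoint and token, used purely for illustration.
with RestClient(
    "https://api.example.com/v1",
    token="example-token",
    max_retries=2,
    rate_limit=(5, 1.0),  # at most 5 requests per second
) as client:
    # Single GET request; raises a RestClientError subclass on 4xx/5xx.
    user = client.get("users/42")
    print(user.status_code, user.data)

    # Page-number pagination: yields items from the "items" key of each page.
    for item in client.paginate("users", items_key="items", per_page=50, max_pages=3):
        print(item)

Using the client as a context manager ensures the underlying requests.Session is closed when the block exits.
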
from __future__ import annotations

import csv
import hashlib
import io
import json
import os
import random
from pathlib import Path
from typing import Any, Callable, Deque, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union
from collections import deque

try:
    import requests
except Exception:  # pragma: no cover
    requests = None  # type: ignore[assignment]

try:
    from tqdm import tqdm
except Exception:  # pragma: no cover

    def tqdm(iterable=None, **kwargs):
        return iterable if iterable is not None else []


PathLike = Union[str, Path]
Row = Dict[str, Any]
Schema = Dict[str, Union[type, str]]


class TextDatasetLoader:
    """A text dataset loader supporting local/URL streaming, splits, caching, and
    lazy loading.

    This loader can iterate over local files or URLs, supports train/val/test
    splitting via deterministic hashing, optional caching of remote resources,
    data validation against a column schema, and a preprocessing pipeline. It
    implements the iterable protocol and provides convenience methods for map,
    batch, take, and saving schema definitions.

    The loader supports these file formats:
      - .txt (one text sample per line -> {'text': str})
      - .jsonl (one JSON object per line)
      - .csv/.tsv (header row required; rows emitted as dictionaries)

    Splitting rules:
      - If files are organized by split (e.g., folder or filename containing
        'train', 'val', 'test'), a selected split will filter those sources.
      - Otherwise, on-the-fly split assignment is performed per-row using a
        deterministic hash of the raw line bytes (plus seed).

    Args:
        paths: Path(s) or URL(s) to dataset files or directories.
        split: One of {'train', 'val', 'test', None}. If None, no split filter.
        split_ratios: Ratios for (train, val, test) used when files are not
            organized by split. Must sum to 1.0.
        batch_size: Default batch size used by batch().
        shuffle: Whether to shuffle rows using a streaming shuffle buffer.
        seed: Random seed for deterministic shuffling and split assignment.
        preprocess_pipeline: A sequence of callables(row) -> row applied in
            order to each row.
        schema: Optional schema dict mapping column name to type spec (e.g.,
            str, int, float, bool) or their string names ('str', 'int', ...).
        lazy: If True, defer IO until iteration.
        cache_dir: Directory for caching remote resources (created if missing).
        cache_enabled: Whether to enable caching for URLs.
        show_progress: Whether to display a progress bar while iterating.
        shuffle_buffer_size: Buffer size for streaming shuffling.
        encoding: File encoding for local/remote text.
        delimiter: Optional delimiter override for CSV (e.g., ',' or '\\t').
            If None, inferred from file extension (.csv -> ',', .tsv -> '\\t').
        required_columns: Optional set/list of required columns to enforce.
            If provided, rows missing any will raise ValueError.

    Attributes:
        paths: Original input paths or URLs.
        split: Active split filter.
        split_ratios: Ratios for on-the-fly split.
        batch_size: Default batch size.
        shuffle: Shuffle flag.
        seed: Random seed.
        preprocess_pipeline: Preprocessing callables.
        schema: Column schema.
        lazy: Lazy loading flag.
        cache_dir: Cache directory path (if any).
        cache_enabled: Cache toggle.
        show_progress: Display progress flag.
        shuffle_buffer_size: Buffer size for shuffling.
        encoding: Text encoding.
        delimiter: CSV delimiter override.
        required_columns: Required columns for validation.
    """

    SUPPORTED_EXTS = {".txt", ".jsonl", ".csv", ".tsv"}
    SPLIT_NAMES = ("train", "val", "test")

    def __init__(
        self,
        paths: Union[PathLike, Sequence[PathLike]],
        split: Optional[str] = None,
        split_ratios: Tuple[float, float, float] = (0.8, 0.1, 0.1),
        batch_size: int = 32,
        shuffle: bool = False,
        seed: Optional[int] = None,
        preprocess_pipeline: Optional[
            Sequence[Callable[[Row], Row]]
        ] = None,
        schema: Optional[Schema] = None,
        lazy: bool = True,
        cache_dir: Optional[PathLike] = None,
        cache_enabled: bool = True,
        show_progress: bool = True,
        shuffle_buffer_size: int = 8192,
        encoding: str = "utf-8",
        delimiter: Optional[str] = None,
        required_columns: Optional[Sequence[str]] = None,
    ) -> None:
        if isinstance(paths, (str, Path)):
            self.paths: List[PathLike] = [paths]
        else:
            self.paths = list(paths)

        if split is not None and split not in self.SPLIT_NAMES:
            raise ValueError(f"split must be one of {self.SPLIT_NAMES} or None.")

        if not (isinstance(split_ratios, tuple) and len(split_ratios) == 3):
            raise ValueError("split_ratios must be a tuple of three floats.")
        if not abs(sum(split_ratios) - 1.0) < 1e-6:
            raise ValueError("split_ratios must sum to 1.0.")

        self.split = split
        self.split_ratios = split_ratios
        self.batch_size = int(batch_size)
        self.shuffle = bool(shuffle)
        self.seed = seed
        self.preprocess_pipeline = list(preprocess_pipeline or [])
        self.schema = dict(schema) if schema else None
        self.lazy = bool(lazy)
        self.cache_enabled = bool(cache_enabled)
        self.show_progress = bool(show_progress)
        self.shuffle_buffer_size = int(shuffle_buffer_size)
        self.encoding = encoding
        self.delimiter = delimiter
        self.required_columns = set(required_columns or [])

        self._rng = random.Random(seed)
        self.cache_dir = Path(cache_dir) if cache_dir else None
        if self.cache_dir:
            self.cache_dir.mkdir(parents=True, exist_ok=True)

    def __iter__(self) -> Iterator[Row]:
        """Iterate over dataset rows as dictionaries.

        Yields:
            A dictionary per row after preprocessing and validation.

        Raises:
            RuntimeError: If required dependencies for URL streaming are missing.
            ValueError: If a row fails schema or column validation.
        """
        row_iter = self._iterate_rows()
        if self.shuffle:
            row_iter = self._shuffle_stream(row_iter, self.shuffle_buffer_size)

        if self.show_progress:
            row_iter = self._with_progress(row_iter, desc="Loading")

        for row in row_iter:
            yield row

    def map(self, fn: Callable[[Row], Row]) -> "TextDatasetLoader":
        """Return a new dataset with an added map transformation.

        Args:
            fn: A function that receives a row and returns a transformed row.

        Returns:
            A new TextDatasetLoader with the function appended to the pipeline.
        """
        new_ds = self._copy()
        new_ds.preprocess_pipeline.append(fn)
        return new_ds

    def batch(
        self, batch_size: Optional[int] = None
    ) -> Iterator[List[Row]]:
        """Yield mini-batches of rows.

        Args:
            batch_size: Number of rows per batch. Defaults to self.batch_size.

        Yields:
            Lists of row dictionaries of length up to batch_size.
        """
        bsz = int(batch_size or self.batch_size)
        batch_acc: List[Row] = []
        for row in self:
            batch_acc.append(row)
            if len(batch_acc) >= bsz:
                yield batch_acc
                batch_acc = []
        if batch_acc:
            yield batch_acc

    def take(self, n: int) -> List[Row]:
        """Return the first n rows as a list.

        Args:
            n: Number of rows to take.

        Returns:
            A list of up to n row dictionaries.
        """
        out: List[Row] = []
        for i, row in enumerate(self):
            if i >= n:
                break
            out.append(row)
        return out

    def save_schema(self, path: PathLike) -> None:
        """Save the schema to a JSON file.

        If no schema was provided, a schema is inferred by peeking at the data.

        Args:
            path: Output JSON file path.

        Raises:
            RuntimeError: If schema inference fails due to empty sources.
        """
        schema = self.schema or self._infer_schema_peek()
        serializable: Dict[str, Any] = {
            "columns": [
                {"name": k, "type": self._type_to_name(v)} for k, v in schema.items()
            ]
        }
        out_path = Path(path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("w", encoding="utf-8") as f:
            json.dump(serializable, f, ensure_ascii=False, indent=2)

    # Internal helpers

    def _copy(self) -> "TextDatasetLoader":
        return TextDatasetLoader(
            paths=self.paths,
            split=self.split,
            split_ratios=self.split_ratios,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
            seed=self.seed,
            preprocess_pipeline=self.preprocess_pipeline[:],
            schema=self.schema.copy() if self.schema else None,
            lazy=self.lazy,
            cache_dir=str(self.cache_dir) if self.cache_dir else None,
            cache_enabled=self.cache_enabled,
            show_progress=self.show_progress,
            shuffle_buffer_size=self.shuffle_buffer_size,
            encoding=self.encoding,
            delimiter=self.delimiter,
            required_columns=list(self.required_columns),
        )

    def _iterate_rows(self) -> Iterator[Row]:
        sources = list(self._resolve_sources())
        for src in sources:
            for row in self._iter_rows_from_source(src):
                if not self._row_in_active_split(row):
                    continue
                row = self._apply_pipeline(row)
                self._validate_row(row)
                yield row

    def _resolve_sources(self) -> Iterator[Tuple[str, str]]:
        """Resolve and yield tuple(source_type, path_or_url).

        Returns:
            An iterator of tuples: ('local', path) or ('url', url).
        """
        for p in self.paths:
            p_str = str(p)
            if self._is_url(p_str):
                if self.cache_enabled:
                    local = self._cache_url(p_str)
                    if local is not None:
                        yield ("local", local)
                        continue
                yield ("url", p_str)
            else:
                path = Path(p_str)
                if path.is_dir():
                    for f in self._iter_dir_files(path):
                        if self._filter_by_split_heuristic(f):
                            yield ("local", str(f))
                elif path.is_file():
                    if self._filter_by_split_heuristic(path):
                        yield ("local", str(path))

    def _iter_dir_files(self, root: Path) -> Iterator[Path]:
        for f in root.rglob("*"):
            if f.is_file() and f.suffix.lower() in self.SUPPORTED_EXTS:
                yield f

    def _filter_by_split_heuristic(self, path: Path) -> bool:
        if self.split is None:
            return True
        name = path.name.lower()
        parts = [p.lower() for p in path.parts]
        if not any(s in name or s in parts for s in self.SPLIT_NAMES):
            # File is not organized by split; keep it and rely on the
            # per-row hash split instead.
            return True
        return (self.split in name) or (self.split in parts)

    def _iter_rows_from_source(self, source: Tuple[str, str]) -> Iterator[Row]:
        stype, ref = source
        suffix = self._infer_suffix(ref)

        if stype == "local":
            path = Path(ref)
            if suffix == ".txt":
                yield from self._iter_txt_local(path)
            elif suffix == ".jsonl":
                yield from self._iter_jsonl_local(path)
            elif suffix in (".csv", ".tsv"):
                yield from self._iter_csv_local(path, suffix)
            else:
                return
        elif stype == "url":
            if requests is None:
                raise RuntimeError(
                    "requests is required for URL streaming but not installed."
                )
            if suffix == ".txt":
                yield from self._iter_txt_url(ref)
            elif suffix == ".jsonl":
                yield from self._iter_jsonl_url(ref)
            elif suffix in (".csv", ".tsv"):
                yield from self._iter_csv_url(ref, suffix)
            else:
                yield from self._iter_txt_url(ref)
        else:
            return

    def _iter_txt_local(self, path: Path) -> Iterator[Row]:
        with path.open("r", encoding=self.encoding) as f:
            for raw in f:
                line = raw.rstrip("\n\r")
                if not line:
                    continue
                yield {"text": line, "__raw__": line.encode(self.encoding)}

    def _iter_jsonl_local(self, path: Path) -> Iterator[Row]:
        with path.open("r", encoding=self.encoding) as f:
            for raw in f:
                line = raw.rstrip("\n\r")
                if not line:
                    continue
                obj = json.loads(line)
                row = dict(obj)
                row["__raw__"] = line.encode(self.encoding)
                yield row

    def _iter_csv_local(self, path: Path, suffix: str) -> Iterator[Row]:
        delim = self._resolve_delimiter(suffix)
        with path.open("r", encoding=self.encoding, newline="") as f:
            reader = csv.DictReader(f, delimiter=delim)
            for row in reader:
                raw_bytes = self._row_to_bytes_csv(row, reader.fieldnames, delim)
                row = dict(row)
                row["__raw__"] = raw_bytes
                yield row

    def _iter_txt_url(self, url: str) -> Iterator[Row]:
        with requests.get(url, stream=True, timeout=30) as r:  # type: ignore[union-attr]
            r.raise_for_status()
            for line in r.iter_lines(decode_unicode=True):
                if line is None:
                    continue
                line = line.rstrip("\n\r")
                if not line:
                    continue
                yield {"text": line, "__raw__": line.encode(self.encoding)}

    def _iter_jsonl_url(self, url: str) -> Iterator[Row]:
        with requests.get(url, stream=True, timeout=30) as r:  # type: ignore[union-attr]
            r.raise_for_status()
            for line in r.iter_lines(decode_unicode=True):
                if line is None:
                    continue
                line = line.rstrip("\n\r")
                if not line:
                    continue
                obj = json.loads(line)
                row = dict(obj)
                row["__raw__"] = line.encode(self.encoding)
                yield row

    def _iter_csv_url(self, url: str, suffix: str) -> Iterator[Row]:
        delim = self._resolve_delimiter(suffix)
        with requests.get(url, stream=True, timeout=30) as r:  # type: ignore[union-attr]
            r.raise_for_status()
            # csv.DictReader accepts any iterator of text lines, so feed it the
            # decoded line stream directly instead of buffering manually.
            lines = (
                l.decode(self.encoding) if isinstance(l, bytes) else l
                for l in r.iter_lines()
                if l
            )
            reader = csv.DictReader(lines, delimiter=delim)
            for row in reader:
                raw_bytes = self._row_to_bytes_csv(row, reader.fieldnames, delim)
                row = dict(row)
                row["__raw__"] = raw_bytes
                yield row

    def _row_to_bytes_csv(
        self,
        row: Dict[str, Any],
        fieldnames: Optional[List[str]],
        delimiter: str,
    ) -> bytes:
        if not fieldnames:
            fieldnames = list(row.keys())
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames, delimiter=delimiter)
        writer.writerow(row)
        return output.getvalue().encode(self.encoding)

    def _resolve_delimiter(self, suffix: str) -> str:
        if self.delimiter:
            return self.delimiter
        if suffix == ".csv":
            return ","
        if suffix == ".tsv":
            return "\t"
        return ","

    def _apply_pipeline(self, row: Row) -> Row:
        out = row
        for fn in self.preprocess_pipeline:
            out = fn(out)
        return out

    def _validate_row(self, row: Row) -> None:
        # Required columns
        if self.required_columns:
            missing = self.required_columns.difference(row.keys())
            if missing:
                raise ValueError(f"Missing required columns: {sorted(missing)}")
        # Schema typing
        if not self.schema:
            return
        for col, typ in self.schema.items():
            if col not in row:
                raise ValueError(f"Missing column '{col}' per schema.")
            target_type = self._normalize_type(typ)
            try:
                row[col] = self._coerce_value(row[col], target_type)
            except Exception as exc:
                raise ValueError(
                    f"Column '{col}' failed type '{target_type.__name__}': {exc}"
                ) from exc

    def _row_in_active_split(self, row: Row) -> bool:
        if self.split is None:
            return True
        # If source already filtered by split name in path, accept.
        # Otherwise, use deterministic hashing of raw bytes.
        raw = row.get("__raw__")
        if not isinstance(raw, (bytes, bytearray)):
            raw = json.dumps(row, sort_keys=True).encode(self.encoding)
        v = self._hash01(raw)
        t, v_ratio, te = self.split_ratios
        bounds = {
            "train": t,
            "val": t + v_ratio,
            "test": 1.0,
        }
        if self.split == "train":
            return v < bounds["train"]
        if self.split == "val":
            return bounds["train"] <= v < bounds["val"]
        if self.split == "test":
            return bounds["val"] <= v <= bounds["test"]
        return True

    def _hash01(self, data: bytes) -> float:
        h = hashlib.sha1()
        h.update(data)
        if self.seed is not None:
            h.update(str(self.seed).encode("utf-8"))
        hv = int.from_bytes(h.digest()[:8], "big")
        return (hv & ((1 << 53) - 1)) / float(1 << 53)

    def _shuffle_stream(
        self, it: Iterable[Row], buffer_size: int
    ) -> Iterator[Row]:
        buf: Deque[Row] = deque()
        for item in it:
            buf.append(item)
            if len(buf) >= buffer_size:
                idx = self._rng.randrange(len(buf))
                buf[idx], buf[-1] = buf[-1], buf[idx]
                yield buf.pop()
        while buf:
            idx = self._rng.randrange(len(buf))
            buf[idx], buf[-1] = buf[-1], buf[idx]
            yield buf.pop()

    def _with_progress(
        self, it: Iterable[Row], desc: str = "Loading"
    ) -> Iterator[Row]:
        for row in tqdm(it, desc=desc):
            yield row

    def _infer_schema_peek(self, max_rows: int = 64) -> Schema:
        rows = []
        count = 0
        for row in self._iterate_rows_no_pipeline(max_rows=max_rows):
            rows.append(row)
            count += 1
            if count >= max_rows:
                break
        if not rows:
            raise RuntimeError("Cannot infer schema from empty dataset.")
        keys = set().union(*(r.keys() for r in rows))
        keys.discard("__raw__")
        schema: Schema = {}
        for k in sorted(keys):
            schema[k] = self._infer_type_for_column([r.get(k) for r in rows])
        return schema

    def _iterate_rows_no_pipeline(self, max_rows: int) -> Iterator[Row]:
        yielded = 0
        for src in self._resolve_sources():
            for row in self._iter_rows_from_source(src):
                if not self._row_in_active_split(row):
                    continue
                if yielded >= max_rows:
                    return
                # Skip pipeline and validation for inference
                yielded += 1
                yield row

    def _infer_type_for_column(self, values: List[Any]) -> type:
        candidates = [int, float, bool, str]
        # Try to coerce; if any fails, fallback to next broader type
        for t in candidates:
            ok = True
            for v in values:
                if v is None:
                    continue
                try:
                    self._coerce_value(v, t)
                except Exception:
                    ok = False
                    break
            if ok:
                return t
        return str

    def _coerce_value(self, value: Any, target_type: type) -> Any:
        if value is None:
            return None
        if isinstance(value, target_type):
            return value
        if target_type is bool:
            if isinstance(value, str):
                lv = value.strip().lower()
                if lv in {"true", "1", "yes", "y", "t"}:
                    return True
                if lv in {"false", "0", "no", "n", "f"}:
                    return False
                raise ValueError(f"Cannot parse bool from '{value}'.")
            return bool(value)
        if target_type is int:
            if isinstance(value, bool):
                return int(value)
            return int(value)
        if target_type is float:
            if isinstance(value, bool):
                return float(int(value))
            return float(value)
        if target_type is str:
            return str(value)
        return target_type(value)

    def _type_to_name(self, t: Union[type, str]) -> str:
        if isinstance(t, str):
            return t
        mapping = {int: "int", float: "float", bool: "bool", str: "str"}
        return mapping.get(t, getattr(t, "__name__", "unknown"))

    def _normalize_type(self, t: Union[type, str]) -> type:
        if isinstance(t, type):
            return t
        tl = t.lower()
        mapping = {"int": int, "float": float, "bool": bool, "str": str}
        if tl not in mapping:
            raise ValueError(f"Unsupported type spec '{t}'.")
        return mapping[tl]

    def _infer_suffix(self, ref: str) -> str:
        try:
            suffix = Path(ref).suffix.lower()
            if suffix in self.SUPPORTED_EXTS:
                return suffix
        except Exception:
            pass
        # Fallback: try guess from URL path
        for ext in self.SUPPORTED_EXTS:
            if ref.lower().endswith(ext):
                return ext
        return ".txt"

    def _is_url(self, ref: str) -> bool:
        return ref.startswith("http://") or ref.startswith("https://")

    def _cache_url(self, url: str) -> Optional[str]:
        if self.cache_dir is None:
            return None
        if requests is None:
            return None
        # Hash URL to stable file name; try to preserve extension if any
        ext = self._infer_suffix(url)
        digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
        local_path = self.cache_dir / f"{digest}{ext}"
        if local_path.exists():
            return str(local_path)

        if not self.lazy:
            self._download(url, local_path)
            return str(local_path)

        # Lazy mode: try to populate the cache now, but fall back to direct URL
        # streaming (return None) if the download fails.
        try:
            self._download(url, local_path)
            return str(local_path)
        except Exception:
            return None

    def _download(self, url: str, dst: Path) -> None:
        tmp = dst.with_suffix(dst.suffix + ".part")
        tmp.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, stream=True, timeout=60) as r:  # type: ignore[union-attr]
            r.raise_for_status()
            with tmp.open("wb") as f:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    if chunk:
                        f.write(chunk)
        os.replace(tmp, dst)
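
A brief usage sketch for the TextDatasetLoader example above, assuming the class is importable. The file path, column names, and schema are hypothetical; the sketch assumes a local JSONL file whose rows contain "text" and "label" fields.

# Hypothetical local JSONL file with {"text": ..., "label": ...} rows.
loader = TextDatasetLoader(
    "data/reviews.jsonl",
    split="train",
    split_ratios=(0.8, 0.1, 0.1),
    shuffle=True,
    seed=42,
    schema={"text": str, "label": int},
    required_columns=["text", "label"],
    show_progress=False,
)

# Add a preprocessing step without mutating the original loader.
lowercased = loader.map(lambda row: {**row, "text": row["text"].lower()})

# Peek at a few rows, then iterate in mini-batches.
print(lowercased.take(3))
for batch in lowercased.batch(16):
    pass  # feed `batch` (a list of dicts) to downstream training code

Because map() returns a copy, the original loader keeps its pipeline unchanged, which makes it easy to derive several preprocessed variants from one source definition.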


Problems Solved

Turns the vague idea of "I need an XX class" into a high-quality Python class template you can paste directly into a project. Aimed at engineering, data, and automation teams as well as technical instructors and students, it helps to: 1) generate well-structured, fully documented, stylistically consistent classes in minutes; 2) significantly cut boilerplate and rework, shortening review and iteration cycles; 3) unify team coding standards, lowering maintenance costs while improving readability and testability; 4) deliver immediately visible results that turn trials into purchases and unlock more scenarios and ongoing service.

Target Users

Junior Python developers

Quickly generate solid class templates from a requirement description and start filling in business logic right away; comments and type information are included automatically, reducing rework caused by poor formatting; build sound object-oriented habits through practice.

Senior backend engineers

Abstract complex modules into clear class boundaries with consistent naming and comment style; produce batches of usable entity and service class skeletons to cut repetitive work; provide a uniform structural standard for code review.

Data science and automation engineers

Refactor experiment scripts into well-structured classes that separate parameters, workflow, and results; make data processing and training tasks easier to reuse and iterate on; consolidate them into shareable internal tools.

Feature Summary

Generates a usable class skeleton from a business concept in one step, including the constructor, attributes, core methods, and comments
Automatically fills in docstrings and type annotations, so the output is readable and usable on delivery and easy to review collaboratively
Built-in step-by-step design flow that analyzes before implementing, avoiding missed attributes and business behavior details
Fully follows coding conventions and naming style, reducing rework and inconsistency for more uniform, readable, maintainable code
Tailors attributes and methods to the scenario, easily covering data modeling, service encapsulation, algorithms, scripts, and more
Works for teaching and training demos, quickly presenting object-oriented thinking and best practices while lowering the learning curve
Assists legacy-system refactoring by clarifying responsibility boundaries and dependencies and producing standard, drop-in class templates
Outputs ready-to-use code blocks without extra commentary, reducing back-and-forth communication and merge costs
Supports extensible inheritance and composition relationships, showing the class hierarchy clearly for later extension and maintenance
Includes common validation and exception-handling templates that prevent low-level mistakes and make deliverables more robust and predictable

How to Use a Purchased Prompt Template

1. Use it directly in an external chat app

Copy the prompt produced by the template into your preferred chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight usage.

2. Publish it as an API

Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, making automation and batch processing straightforward. Suited to developer integration and embedding in business systems (see the sketch below).
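
As a rough illustration only: the endpoint URL, header, and field names below are hypothetical placeholders, not the actual interface of this marketplace. Consult the API documentation you receive after publishing the template for the real request format.

import requests

# Hypothetical endpoint and credentials, for illustration only.
resp = requests.post(
    "https://api.example.com/prompt-templates/python-class-template/run",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"对象描述": "一个支持重试与限流的REST客户端"},  # the template's single adjustable parameter
    timeout=30,
)
resp.raise_for_status()
print(resp.json())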

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.

What You Get After Purchase

The complete prompt template
- 645 tokens in total
- 1 adjustable parameter
{ 对象描述 }
Usage rights to community-contributed content
- Curated, high-quality community examples to help you get up to speed with the prompt