Efficient Code Generation

Updated Nov 24, 2025

Generates high-quality sample code or complete program templates from user requirements. Supports multiple programming languages, frameworks, and libraries; offers design-pattern suggestions and extension guidance; and helps developers implement features quickly, improve code structure, and work more efficiently.

Below is a runnable, extensible Django + DRF task-management REST backend template covering the requested features, architecture, and engineering practices. It includes the project directory layout, key code files, configuration, Docker, tests, run steps, and extension guidance, and can serve as the starting point for a production project.

目录结构

  • task_manager/
    • manage.py
    • requirements.txt
    • Dockerfile
    • docker-compose.yml
    • .env.example
    • README.md
    • config/
      • __init__.py
      • asgi.py
      • wsgi.py
      • urls.py
      • settings/
        • __init__.py
        • base.py
        • dev.py
        • staging.py
        • prod.py
        • logging.py
    • apps/
      • common/
        • __init__.py
        • middleware.py
        • pagination.py
        • permissions.py
        • utils.py
        • cache.py
      • users/
        • __init__.py
        • models.py
        • serializers.py
        • repositories.py
        • services.py
        • views.py
        • permissions.py
        • urls.py
      • projects/
        • __init__.py
        • models.py
        • serializers.py
        • repositories.py
        • services.py
        • views.py
        • urls.py
      • tasks/
        • __init__.py
        • choices.py
        • models.py
        • validators.py
        • dtos.py
        • serializers.py
        • repositories.py
        • factories.py
        • strategies.py
        • observers.py
        • services.py
        • views.py
        • urls.py
        • management/
          • __init__.py
          • commands/
            • __init__.py
            • seed.py
            • remind_due.py
    • tests/
      • conftest.py
      • test_models.py
      • test_services.py
      • test_api.py

requirements.txt

Django>=4.2
djangorestframework>=3.14
djangorestframework-simplejwt>=5.3
drf-spectacular>=0.27
django-filter>=23.5
python-dotenv>=1.0
dj-database-url>=2.2
psycopg2-binary>=2.9
gunicorn>=21
uvicorn>=0.30
whitenoise>=6.7
django-cors-headers>=4.4
requests>=2.32
django-redis>=5.4
python-json-logger>=2.0
pytest>=8.2
pytest-django>=4.8
pytest-cov>=5.0
Faker>=26.0

config/settings/base.py

import os
from pathlib import Path
from datetime import timedelta
from dotenv import load_dotenv
import dj_database_url

load_dotenv()

BASE_DIR = Path(__file__).resolve().parent.parent.parent
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-key")
DEBUG = os.getenv("DEBUG", "True") == "True"
ALLOWED_HOSTS = os.getenv("ALLOWED_HOSTS", "*").split(",")

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "rest_framework",
    "drf_spectacular",
    "django_filters",
    "corsheaders",
    "apps.users",
    "apps.projects",
    "apps.tasks",
]

MIDDLEWARE = [
    "corsheaders.middleware.CorsMiddleware",
    "django.middleware.security.SecurityMiddleware",
    "whitenoise.middleware.WhiteNoiseMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.middleware.common.CommonMiddleware",
    "django.middleware.csrf.CsrfViewMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "django.contrib.messages.middleware.MessageMiddleware",
    "django.middleware.clickjacking.XFrameOptionsMiddleware",
    "apps.common.middleware.RequestIDMiddleware",
    "apps.common.middleware.StructuredExceptionMiddleware",
]

ROOT_URLCONF = "config.urls"
WSGI_APPLICATION = "config.wsgi.application"
ASGI_APPLICATION = "config.asgi.application"

# Required by django.contrib.admin and the DRF browsable API
TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [],
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.debug",
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    },
]

DATABASES = {
    "default": dj_database_url.parse(
        os.getenv("DATABASE_URL", f"sqlite:///{BASE_DIR / 'db.sqlite3'}"),
        conn_max_age=600,
    )
}

AUTH_USER_MODEL = "users.User"

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": (
        "rest_framework_simplejwt.authentication.JWTAuthentication",
    ),
    "DEFAULT_PERMISSION_CLASSES": (
        "rest_framework.permissions.IsAuthenticated",
    ),
    "DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
    "DEFAULT_PAGINATION_CLASS": "apps.common.pagination.DefaultPageNumberPagination",
    "PAGE_SIZE": int(os.getenv("PAGE_SIZE", 20)),
    "DEFAULT_FILTER_BACKENDS": [
        "django_filters.rest_framework.DjangoFilterBackend",
        "rest_framework.filters.OrderingFilter",
        "rest_framework.filters.SearchFilter",
    ],
    "DEFAULT_THROTTLE_CLASSES": [
        "rest_framework.throttling.AnonRateThrottle",
        "rest_framework.throttling.UserRateThrottle",
    ],
    "DEFAULT_THROTTLE_RATES": {
        "anon": os.getenv("THROTTLE_ANON", "100/min"),
        "user": os.getenv("THROTTLE_USER", "1000/min"),
    },
}

SIMPLE_JWT = {
    "ACCESS_TOKEN_LIFETIME": timedelta(minutes=int(os.getenv("JWT_ACCESS_MIN", "60"))),
    "REFRESH_TOKEN_LIFETIME": timedelta(days=int(os.getenv("JWT_REFRESH_DAYS", "7"))),
    "AUTH_HEADER_TYPES": ("Bearer",),
}

SPECTACULAR_SETTINGS = {
    "TITLE": "Task Manager API",
    "DESCRIPTION": "Task management backend built with Django + DRF",
    "VERSION": "1.0.0",
    "SERVE_PERMISSIONS": ["rest_framework.permissions.AllowAny"],
}

LANGUAGE_CODE = "zh-hans"
TIME_ZONE = os.getenv("TIME_ZONE", "Asia/Shanghai")
USE_I18N = True
USE_TZ = True

STATIC_URL = "static/"
STATIC_ROOT = BASE_DIR / "static"
MEDIA_URL = "/media/"
MEDIA_ROOT = BASE_DIR / "media"

CORS_ALLOW_ALL_ORIGINS = True

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache"
        if os.getenv("REDIS_URL")
        else "django.core.cache.backends.locmem.LocMemCache",
        "LOCATION": os.getenv("REDIS_URL", ""),
        "OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"} if os.getenv("REDIS_URL") else {},
    }
}

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "fmt": "%(asctime)s %(levelname)s %(name)s %(message)s %(request_id)s",
        },
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "json"},
    },
    "root": {"handlers": ["console"], "level": "INFO"},
}

FILE_UPLOAD_HANDLERS = ["django.core.files.uploadhandler.TemporaryFileUploadHandler"]

DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"

# Feature flags
FEATURE_RATE_LIMIT = os.getenv("FEATURE_RATE_LIMIT", "True") == "True"

config/settings/dev.py

from .base import *
DEBUG = True

config/settings/staging.py

from .base import *
DEBUG = False

config/settings/prod.py

from .base import *
DEBUG = False
SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
CSRF_TRUSTED_ORIGINS = os.getenv("CSRF_TRUSTED_ORIGINS", "").split(",") if os.getenv("CSRF_TRUSTED_ORIGINS") else []

config/urls.py

from django.contrib import admin
from django.urls import path, include
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/schema/", SpectacularAPIView.as_view(), name="schema"),
    path("api/docs/", SpectacularSwaggerView.as_view(url_name="schema")),
    path("auth/", include("apps.users.urls")),
    path("projects/", include("apps.projects.urls")),
    path("tasks/", include("apps.tasks.urls")),
]

apps/common/pagination.py

from rest_framework.pagination import PageNumberPagination

class DefaultPageNumberPagination(PageNumberPagination):
    page_size_query_param = "page_size"
    max_page_size = 200

apps/common/middleware.py

import uuid, logging
from django.utils.deprecation import MiddlewareMixin
from django.http import JsonResponse

logger = logging.getLogger(__name__)

class RequestIDMiddleware(MiddlewareMixin):
    def process_request(self, request):
        request.request_id = str(uuid.uuid4())

class StructuredExceptionMiddleware(MiddlewareMixin):
    def process_exception(self, request, exception):
        logger.exception("Unhandled exception", extra={"request_id": getattr(request, "request_id", None)})
        return JsonResponse(
            {"detail": "Internal Server Error", "request_id": getattr(request, "request_id", None)},
            status=500,
        )

apps/common/permissions.py

from rest_framework.permissions import BasePermission

class IsAdmin(BasePermission):
    def has_permission(self, request, view):
        return request.user.is_authenticated and request.user.role == "admin"

class IsOwnerOrAdmin(BasePermission):
    def has_object_permission(self, request, view, obj):
        if not request.user.is_authenticated:
            return False
        return request.user.role == "admin" or getattr(obj, "owner_id", None) == request.user.id

apps/users/models.py

from django.contrib.auth.models import AbstractUser
from django.db import models

class User(AbstractUser):
    ROLE_CHOICES = (("admin", "Admin"), ("member", "Member"))
    role = models.CharField(max_length=10, choices=ROLE_CHOICES, default="member")

    def __str__(self):
        return f"{self.username}({self.role})"

apps/users/serializers.py

from rest_framework import serializers
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password

User = get_user_model()

class UserDTO(serializers.Serializer):
    id = serializers.IntegerField(read_only=True)
    username = serializers.CharField()
    email = serializers.EmailField()
    role = serializers.CharField()

class RegisterSerializer(serializers.ModelSerializer):
    password = serializers.CharField(write_only=True)
    class Meta:
        model = User
        fields = ("id", "username", "email", "password", "role")
        # Prevent privilege escalation: clients must not self-assign a role
        read_only_fields = ("role",)

    def validate_password(self, value):
        validate_password(value)
        return value

    def create(self, validated_data):
        password = validated_data.pop("password")
        user = User(**validated_data)
        user.set_password(password)
        user.save()
        return user

class LoginSerializer(serializers.Serializer):
    username = serializers.CharField()
    password = serializers.CharField(write_only=True)

apps/users/repositories.py

from django.contrib.auth import get_user_model

User = get_user_model()

class UserRepository:
    @staticmethod
    def get_by_username(username: str):
        try:
            return User.objects.get(username=username)
        except User.DoesNotExist:
            return None

apps/users/services.py

from django.contrib.auth import authenticate
from rest_framework.exceptions import AuthenticationFailed

class AuthService:
    @staticmethod
    def authenticate_user(username: str, password: str):
        user = authenticate(username=username, password=password)
        if not user:
            raise AuthenticationFailed("Invalid credentials")
        return user

apps/users/views.py

from rest_framework import generics, status
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from .serializers import RegisterSerializer, LoginSerializer, UserDTO
from .services import AuthService

class RegisterView(generics.CreateAPIView):
    serializer_class = RegisterSerializer
    permission_classes = []  # AllowAny

class LoginView(generics.GenericAPIView):
    serializer_class = LoginSerializer
    permission_classes = []  # AllowAny

    def post(self, request, *args, **kwargs):
        ser = self.get_serializer(data=request.data)
        ser.is_valid(raise_exception=True)
        user = AuthService.authenticate_user(ser.validated_data["username"], ser.validated_data["password"])
        refresh = RefreshToken.for_user(user)
        return Response(
            {"access": str(refresh.access_token), "refresh": str(refresh), "user": UserDTO(user).data},
            status=status.HTTP_200_OK
        )

apps/users/urls.py

from django.urls import path
from .views import RegisterView, LoginView

urlpatterns = [
    path("register", RegisterView.as_view(), name="register"),
    path("login", LoginView.as_view(), name="login"),
]

apps/projects/models.py

from django.db import models
from django.conf import settings

class Project(models.Model):
    name = models.CharField(max_length=255)
    description = models.TextField(blank=True)
    owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="owned_projects")
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.name

apps/projects/serializers.py

from rest_framework import serializers
from .models import Project

class ProjectSerializer(serializers.ModelSerializer):
    owner_username = serializers.ReadOnlyField(source="owner.username")
    class Meta:
        model = Project
        fields = ("id", "name", "description", "owner", "owner_username", "created_at")
        read_only_fields = ("owner", "created_at")

apps/projects/repositories.py

from .models import Project

class ProjectRepository:
    @staticmethod
    def list_owned_or_admin(user, search=None):
        qs = Project.objects.all() if user.role == "admin" else Project.objects.filter(owner=user)
        if search:
            qs = qs.filter(name__icontains=search)
        return qs.order_by("-created_at")

    @staticmethod
    def get(pk: int):
        return Project.objects.get(pk=pk)

apps/projects/services.py

from .repositories import ProjectRepository

class ProjectService:
    @staticmethod
    def list_projects(user, search=None):
        return ProjectRepository.list_owned_or_admin(user, search=search)

apps/projects/views.py

from rest_framework import viewsets, permissions
from .serializers import ProjectSerializer
from .models import Project
from apps.common.permissions import IsOwnerOrAdmin

class ProjectViewSet(viewsets.ModelViewSet):
    serializer_class = ProjectSerializer
    queryset = Project.objects.all()

    def get_permissions(self):
        if self.action in ["list", "retrieve", "create"]:
            return [permissions.IsAuthenticated()]
        return [IsOwnerOrAdmin()]

    def get_queryset(self):
        search = self.request.query_params.get("search")
        from .services import ProjectService
        return ProjectService.list_projects(self.request.user, search=search)

    def perform_create(self, serializer):
        serializer.save(owner=self.request.user)

apps/projects/urls.py

from rest_framework.routers import DefaultRouter
from .views import ProjectViewSet

router = DefaultRouter()
router.register(r"", ProjectViewSet, basename="project")

urlpatterns = router.urls

apps/tasks/choices.py

PRIORITY_CHOICES = (("低", "低"), ("中", "中"), ("高", "高"))
STATUS_CHOICES = (("TODO", "TODO"), ("DOING", "DOING"), ("DONE", "DONE"))

apps/tasks/models.py

from django.db import models
from django.conf import settings
from .choices import PRIORITY_CHOICES, STATUS_CHOICES

class Label(models.Model):
    name = models.CharField(max_length=50)
    parent = models.ForeignKey("self", null=True, blank=True, on_delete=models.SET_NULL, related_name="children")
    def __str__(self): return self.name

class Task(models.Model):
    title = models.CharField(max_length=255)
    description = models.TextField(blank=True)
    priority = models.CharField(max_length=2, choices=PRIORITY_CHOICES, default="中")
    status = models.CharField(max_length=5, choices=STATUS_CHOICES, default="TODO")
    due_date = models.DateTimeField(null=True, blank=True)
    assignee = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=True, on_delete=models.SET_NULL, related_name="assigned_tasks")
    project = models.ForeignKey("projects.Project", on_delete=models.CASCADE, related_name="tasks")
    labels = models.ManyToManyField(Label, blank=True, related_name="tasks")
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

class Attachment(models.Model):
    task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="attachments")
    file = models.FileField(upload_to="attachments/")
    uploaded_at = models.DateTimeField(auto_now_add=True)

class Comment(models.Model):
    task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="comments")
    author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    content = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)

class Webhook(models.Model):
    EVENT_CHOICES = (("task.status_changed", "task.status_changed"), ("task.created", "task.created"))
    url = models.URLField()
    event = models.CharField(max_length=50, choices=EVENT_CHOICES)
    active = models.BooleanField(default=True)
    secret = models.CharField(max_length=128, blank=True, default="")
    created_at = models.DateTimeField(auto_now_add=True)
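
Because Label.parent is a self-referential FK, a label's full hierarchical path can be recovered by walking the parent chain. A minimal framework-free sketch (a dict stands in for the ORM; the helper name is an illustration, not part of the template):

```python
from typing import Dict, Optional

def label_path(name: str, parents: Dict[str, Optional[str]], sep: str = "/") -> str:
    """Walk the parent chain of a label and join the names root-first.

    `parents` maps each label name to its parent's name (None for roots),
    mirroring the Label.parent self-FK above.
    """
    chain = []
    current: Optional[str] = name
    seen = set()
    while current is not None:
        if current in seen:  # guard against accidental cycles in the data
            raise ValueError(f"cycle detected at {current!r}")
        seen.add(current)
        chain.append(current)
        current = parents.get(current)
    return sep.join(reversed(chain))
```

The same walk translates directly to the ORM by following `label.parent` instead of the dict lookup.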

apps/tasks/validators.py

from rest_framework import serializers

class CSVImportValidator(serializers.Serializer):
    file = serializers.FileField()
    project_id = serializers.IntegerField(required=False)
    default_assignee = serializers.IntegerField(required=False)

apps/tasks/dtos.py

from dataclasses import dataclass
from typing import Optional, List

@dataclass
class TaskDTO:
    title: str
    description: str
    priority: str
    status: str
    due_date: Optional[str]
    assignee_id: Optional[int]
    project_id: int
    labels: Optional[list]
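
For illustration, the mapping from CSV rows onto this DTO (the shape the bulk-import flow consumes) can be sketched framework-free. The column names and defaults here are assumptions chosen to match the import service:

```python
import csv
import io
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class TaskDTO:
    title: str
    description: str
    priority: str
    status: str
    due_date: Optional[str]
    assignee_id: Optional[int]
    project_id: int
    labels: Optional[list]

def rows_to_dtos(csv_text: str, project_id: int) -> List[TaskDTO]:
    """Parse CSV text into TaskDTOs, applying the same defaults as the import service."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        TaskDTO(
            title=row.get("title") or "Untitled",
            description=row.get("description", ""),
            priority=row.get("priority") or "中",
            status=row.get("status") or "TODO",
            due_date=row.get("due_date") or None,
            assignee_id=int(row["assignee_id"]) if row.get("assignee_id") else None,
            project_id=project_id,
            labels=None,  # label import is out of scope for this sketch
        )
        for row in reader
    ]
```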

apps/tasks/serializers.py

from rest_framework import serializers
from .models import Task, Comment, Attachment, Label
from .choices import PRIORITY_CHOICES, STATUS_CHOICES

class LabelSerializer(serializers.ModelSerializer):
    class Meta:
        model = Label
        fields = ("id", "name", "parent")

class AttachmentSerializer(serializers.ModelSerializer):
    class Meta:
        model = Attachment
        fields = ("id", "file", "uploaded_at")

class CommentSerializer(serializers.ModelSerializer):
    author_username = serializers.ReadOnlyField(source="author.username")
    class Meta:
        model = Comment
        fields = ("id", "author", "author_username", "content", "created_at")
        read_only_fields = ("author", "created_at")

class TaskSerializer(serializers.ModelSerializer):
    assignee_username = serializers.ReadOnlyField(source="assignee.username")
    labels = LabelSerializer(many=True, required=False)
    attachments = AttachmentSerializer(many=True, read_only=True)

    class Meta:
        model = Task
        fields = (
            "id", "title", "description", "priority", "status", "due_date",
            "assignee", "assignee_username", "labels", "project", "attachments",
            "created_at", "updated_at"
        )
        read_only_fields = ("created_at", "updated_at")

    def create(self, validated_data):
        labels_data = validated_data.pop("labels", [])
        task = Task.objects.create(**validated_data)
        self._upsert_labels(task, labels_data)
        return task

    def update(self, instance, validated_data):
        labels_data = validated_data.pop("labels", None)
        for k, v in validated_data.items():
            setattr(instance, k, v)
        instance.save()
        if labels_data is not None:
            self._upsert_labels(instance, labels_data, replace=True)
        return instance

    def _upsert_labels(self, task, labels_data, replace=False):
        label_objs = []
        for ld in labels_data:
            obj, _ = Label.objects.get_or_create(name=ld.get("name"), defaults={"parent": ld.get("parent")})
            label_objs.append(obj)
        if replace:
            task.labels.set(label_objs)
        else:
            task.labels.add(*label_objs)

apps/tasks/repositories.py

from .models import Task, Comment, Attachment, Webhook
from django.db.models import Q
from django.core.cache import cache

class TaskRepository:
    @staticmethod
    def list_filtered(user, params):
        qs = Task.objects.select_related("assignee", "project").prefetch_related("labels")
        # Permissions: admin sees all; member sees own assigned + own projects
        if user.role != "admin":
            qs = qs.filter(Q(assignee=user) | Q(project__owner=user))
        status_ = params.get("status")
        priority = params.get("priority")
        project = params.get("project")
        search = params.get("search")
        if status_:
            qs = qs.filter(status=status_)
        if priority:
            qs = qs.filter(priority=priority)
        if project:
            qs = qs.filter(project_id=project)
        if search:
            qs = qs.filter(Q(title__icontains=search) | Q(description__icontains=search))
        ordering = params.get("ordering") or "-created_at"
        qs = qs.order_by(ordering)
        return qs

    @staticmethod
    def get(pk: int):
        return Task.objects.get(pk=pk)

class CommentRepository:
    @staticmethod
    def create(task, author, content):
        return task.comments.create(author=author, content=content)

class AttachmentRepository:
    @staticmethod
    def add(task, file):
        return task.attachments.create(file=file)

class WebhookRepository:
    @staticmethod
    def active_for_event(event: str):
        return Webhook.objects.filter(active=True, event=event)

apps/tasks/factories.py

from .models import Task, Label

class TaskFactory:
    @staticmethod
    def create_default(project, assignee=None):
        task = Task.objects.create(
            title="新任务",
            description="默认描述",
            priority="中",
            status="TODO",
            project=project,
            assignee=assignee,
        )
        default_labels = ["待规划", "快速处理"]
        for name in default_labels:
            lbl, _ = Label.objects.get_or_create(name=name)
            task.labels.add(lbl)
        return task

apps/tasks/strategies.py

from typing import Protocol
from django.db.models import QuerySet

class SortStrategy(Protocol):
    def apply(self, qs: QuerySet) -> QuerySet: ...

class SortByDueDateAsc:
    def apply(self, qs): return qs.order_by("due_date", "id")

class SortByPriorityDesc:
    # Priority values are Chinese (高 > 中 > 低); rank them via a Case expression
    def apply(self, qs):
        from django.db.models import Case, IntegerField, Value, When
        priority_order = Case(
            When(priority="高", then=Value(3)),
            When(priority="中", then=Value(2)),
            When(priority="低", then=Value(1)),
            default=Value(0), output_field=IntegerField(),
        )
        return qs.annotate(priority_rank=priority_order).order_by("-priority_rank", "-created_at")

STRATEGIES = {
    "due_date_asc": SortByDueDateAsc(),
    "priority_desc": SortByPriorityDesc(),
}
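
The Case/When ranking above is equivalent to the following in-memory sort, which may help when reasoning about or unit-testing the strategy without a database (plain dicts stand in for Task rows):

```python
# Same ranking the ORM strategy encodes: 高 > 中 > 低, unknown values last
PRIORITY_RANK = {"高": 3, "中": 2, "低": 1}

def sort_by_priority_desc(tasks: list) -> list:
    """Mirror SortByPriorityDesc on plain data: highest priority first,
    newest created_at first within the same priority."""
    return sorted(
        tasks,
        key=lambda t: (PRIORITY_RANK.get(t["priority"], 0), t["created_at"]),
        reverse=True,
    )
```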

apps/tasks/observers.py

import hmac, hashlib, json, logging, requests
from .repositories import WebhookRepository

logger = logging.getLogger(__name__)

class Observer:
    def notify(self, event: str, payload: dict): ...

class WebhookNotifier(Observer):
    def notify(self, event: str, payload: dict):
        hooks = WebhookRepository.active_for_event(event)
        for hook in hooks:
            headers = {"Content-Type": "application/json"}
            body = json.dumps(payload).encode("utf-8")
            if hook.secret:
                sig = hmac.new(hook.secret.encode(), body, hashlib.sha256).hexdigest()
                headers["X-Signature"] = sig
            try:
                requests.post(hook.url, data=body, headers=headers, timeout=5)
            except Exception:
                logger.exception("Webhook delivery failed")

class LoggerNotifier(Observer):
    def notify(self, event: str, payload: dict):
        logger.info("Event: %s payload=%s", event, payload)
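
On the receiving end, a consumer of these webhooks would verify the X-Signature header by recomputing the same HMAC over the raw request body. A sketch of the verification side (the function name is illustrative; it assumes the consumer shares the hook's secret):

```python
import hashlib
import hmac

def verify_signature(secret: str, body: bytes, signature: str) -> bool:
    """Recompute the HMAC-SHA256 hex digest of the raw body and compare it
    to the X-Signature header using a constant-time comparison."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```

Verify against the raw bytes as received; re-serializing the JSON can reorder keys and break the digest.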

apps/tasks/services.py

from django.utils import timezone
from rest_framework.exceptions import ValidationError, PermissionDenied
from .repositories import TaskRepository, CommentRepository, AttachmentRepository
from .observers import WebhookNotifier, LoggerNotifier
from .strategies import STRATEGIES

ALLOWED_TRANSITIONS = {
    "TODO": {"DOING"},
    "DOING": {"DONE", "TODO"},
    "DONE": set(),
}

class TaskService:
    observers = [LoggerNotifier(), WebhookNotifier()]

    @staticmethod
    def list_tasks(user, params):
        qs = TaskRepository.list_filtered(user, params)
        strategy_key = params.get("sort_strategy")
        if strategy_key and strategy_key in STRATEGIES:
            qs = STRATEGIES[strategy_key].apply(qs)
        return qs

    @classmethod
    def change_status(cls, user, task, new_status: str):
        if user.role != "admin" and user != task.assignee and user != task.project.owner:
            raise PermissionDenied("No permission to change status")
        if new_status not in ALLOWED_TRANSITIONS[task.status]:
            raise ValidationError(f"Invalid status transition {task.status} -> {new_status}")
        old = task.status
        task.status = new_status
        task.save(update_fields=["status", "updated_at"])
        payload = {"task_id": task.id, "old": old, "new": new_status, "at": timezone.now().isoformat()}
        for obs in cls.observers:
            obs.notify("task.status_changed", payload)
        return task

    @staticmethod
    def add_comment(task, author, content: str):
        return CommentRepository.create(task, author, content)

    @staticmethod
    def add_attachment(task, file):
        return AttachmentRepository.add(task, file)

    @staticmethod
    def bulk_import_from_csv(file, project=None, default_assignee=None):
        import csv, io
        from django.utils.dateparse import parse_datetime
        from .models import Task
        if project is None:
            raise ValidationError("project_id is required for CSV import")
        decoded = file.read().decode("utf-8")
        reader = csv.DictReader(io.StringIO(decoded))
        created = []
        for row in reader:
            t = Task.objects.create(
                title=row.get("title") or "Untitled",
                description=row.get("description", ""),
                priority=row.get("priority") or "中",
                status=row.get("status") or "TODO",
                # Parse ISO-8601 strings; empty values become None
                due_date=parse_datetime(row["due_date"]) if row.get("due_date") else None,
                project=project,
                assignee=default_assignee,
            )
            created.append(t.id)
        return created
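
Stripped of the framework, the guard in change_status is a lookup over the transition table; the following standalone sketch shows the same state machine (ValueError stands in for DRF's ValidationError):

```python
# Same transition table the service uses: DONE is a terminal state
ALLOWED_TRANSITIONS = {
    "TODO": {"DOING"},
    "DOING": {"DONE", "TODO"},
    "DONE": set(),
}

def next_status(current: str, new: str) -> str:
    """Return the new status if the transition is legal, else raise ValueError."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Invalid status transition {current} -> {new}")
    return new
```

Keeping the table as data (rather than if/else chains) is what lets the service validate any transition in one membership check and makes new states a one-line change.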

apps/tasks/views.py

from rest_framework import viewsets, status, decorators
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, FormParser
from django.shortcuts import get_object_or_404
from apps.common.permissions import IsAdmin
from .serializers import TaskSerializer, CommentSerializer, AttachmentSerializer, LabelSerializer
from .models import Task, Label, Webhook
from .validators import CSVImportValidator
from .services import TaskService

class TemplateCRUDViewSet(viewsets.ModelViewSet):
    # Template Method: hooks for pre/post
    def perform_create(self, serializer):
        instance = serializer.save()
        self.after_create(instance)

    def perform_update(self, serializer):
        instance = serializer.save()
        self.after_update(instance)

    def perform_destroy(self, instance):
        super().perform_destroy(instance)
        self.after_destroy(instance)

    def after_create(self, instance): pass
    def after_update(self, instance): pass
    def after_destroy(self, instance): pass

class TaskViewSet(TemplateCRUDViewSet):
    serializer_class = TaskSerializer
    queryset = Task.objects.all()

    def get_queryset(self):
        return TaskService.list_tasks(self.request.user, self.request.query_params)

    @decorators.action(detail=True, methods=["post"])
    def change_status(self, request, pk=None):
        task = self.get_object()
        new_status = request.data.get("status")
        TaskService.change_status(request.user, task, new_status)
        return Response({"status": "ok"})

    @decorators.action(detail=True, methods=["post"], url_path="comments")
    def add_comment(self, request, pk=None):
        task = self.get_object()
        ser = CommentSerializer(data=request.data)
        ser.is_valid(raise_exception=True)
        c = TaskService.add_comment(task, request.user, ser.validated_data["content"])
        return Response(CommentSerializer(c).data, status=status.HTTP_201_CREATED)

    @decorators.action(detail=True, methods=["post"], url_path="attachments", parser_classes=[MultiPartParser, FormParser])
    def add_attachment(self, request, pk=None):
        task = self.get_object()
        file = request.data.get("file")
        if file is None:
            return Response({"detail": "file is required"}, status=status.HTTP_400_BAD_REQUEST)
        att = TaskService.add_attachment(task, file)
        return Response(AttachmentSerializer(att).data, status=status.HTTP_201_CREATED)

    @decorators.action(detail=False, methods=["post"], url_path="import-csv", parser_classes=[MultiPartParser, FormParser])
    def import_csv(self, request):
        val = CSVImportValidator(data=request.data)
        val.is_valid(raise_exception=True)
        from apps.projects.models import Project
        project = get_object_or_404(Project, pk=val.validated_data.get("project_id")) if val.validated_data.get("project_id") else None
        from django.contrib.auth import get_user_model
        User = get_user_model()
        assignee = get_object_or_404(User, pk=val.validated_data.get("default_assignee")) if val.validated_data.get("default_assignee") else None
        ids = TaskService.bulk_import_from_csv(val.validated_data["file"], project, assignee)
        return Response({"created_ids": ids}, status=status.HTTP_201_CREATED)

class LabelViewSet(viewsets.ModelViewSet):
    serializer_class = LabelSerializer
    queryset = Label.objects.all()
    permission_classes = [IsAdmin]

from rest_framework import serializers

class WebhookSerializer(serializers.ModelSerializer):
    class Meta:
        model = Webhook
        fields = ("id", "url", "event", "active", "secret", "created_at")
        read_only_fields = ("created_at",)

class WebhookViewSet(viewsets.ModelViewSet):
    serializer_class = WebhookSerializer
    queryset = Webhook.objects.all()
    permission_classes = [IsAdmin]

apps/tasks/urls.py

from rest_framework.routers import DefaultRouter
from .views import TaskViewSet, LabelViewSet, WebhookViewSet

router = DefaultRouter()
# Register the specific prefixes first: the task routes are mounted at r"",
# so their detail pattern would otherwise swallow "labels/" and "webhooks/" as a pk
router.register(r"labels", LabelViewSet, basename="label")
router.register(r"webhooks", WebhookViewSet, basename="webhook")
router.register(r"", TaskViewSet, basename="task")

urlpatterns = router.urls

apps/tasks/management/commands/seed.py

from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from faker import Faker
from apps.projects.models import Project
from apps.tasks.factories import TaskFactory

class Command(BaseCommand):
    help = "Seed database with sample data"

    def handle(self, *args, **options):
        fake = Faker("zh_CN")
        User = get_user_model()
        admin, created = User.objects.get_or_create(
            username="admin", defaults={"email": "admin@example.com", "role": "admin"}
        )
        if created:
            admin.set_password("Admin123!")
            admin.save()
        for i in range(3):
            u, created = User.objects.get_or_create(
                username=f"user{i}", defaults={"email": f"user{i}@ex.com", "role": "member"}
            )
            if created:
                u.set_password("User123!")
                u.save()
        for i in range(3):
            owner = User.objects.order_by("?").first()
            p = Project.objects.create(name=fake.catch_phrase(), description=fake.text(), owner=owner)
            TaskFactory.create_default(p, assignee=owner)
        self.stdout.write(self.style.SUCCESS("Seeding completed"))

apps/tasks/management/commands/remind_due.py

from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta
from apps.tasks.models import Task

class Command(BaseCommand):
    help = "Send due date reminders for tasks approaching deadline"

    def handle(self, *args, **options):
        now = timezone.now()
        soon = now + timedelta(hours=24)
        qs = Task.objects.filter(status__in=["TODO","DOING"], due_date__lte=soon, due_date__gte=now)
        for t in qs:
            self.stdout.write(f"[REMINDER] Task {t.id} '{t.title}' due by {t.due_date}, assignee={t.assignee_id}")
        self.stdout.write(self.style.SUCCESS(f"Processed {qs.count()} reminders"))

tests/conftest.py

import pytest
from django.contrib.auth import get_user_model

@pytest.fixture
def user(db):
    User = get_user_model()
    u = User.objects.create_user(username="tester", password="Test123!", role="member")
    return u

@pytest.fixture
def admin(db):
    User = get_user_model()
    a = User.objects.create_user(username="admin", password="Admin123!", role="admin")
    return a

tests/test_models.py

from apps.tasks.models import Task
from apps.projects.models import Project

def test_create_task(db, user):
    p = Project.objects.create(name="P1", description="d", owner=user)
    t = Task.objects.create(title="T1", project=p, assignee=user)
    assert t.id and t.project_id == p.id

tests/test_services.py

import pytest
from apps.tasks.services import TaskService
from apps.tasks.models import Task
from apps.projects.models import Project

def test_status_transition(db, admin, user):
    p = Project.objects.create(name="P1", description="d", owner=user)
    t = Task.objects.create(title="T", project=p, assignee=user, status="TODO")
    TaskService.change_status(admin, t, "DOING")
    assert Task.objects.get(id=t.id).status == "DOING"
    with pytest.raises(Exception):
        TaskService.change_status(admin, t, "TODO")  # assume the backward move DOING -> TODO is rejected by the transition rules

tests/test_api.py

from rest_framework.test import APIClient

def test_register_and_login(db):
    c = APIClient()
    r = c.post("/auth/register", {"username":"u1","email":"u1@e.com","password":"User123!"}, format="json")
    assert r.status_code == 201
    r2 = c.post("/auth/login", {"username":"u1","password":"User123!"}, format="json")
    assert r2.status_code == 200 and "access" in r2.json()

Dockerfile

FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app

RUN apt-get update && apt-get install -y build-essential libpq-dev && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
ENV DJANGO_SETTINGS_MODULE=config.settings.dev
RUN python manage.py collectstatic --noinput || true

EXPOSE 8000
CMD ["gunicorn", "config.wsgi:application", "-b", "0.0.0.0:8000", "--workers", "3"]

docker-compose.yml

version: "3.9"
services:
  web:
    build: .
    env_file: .env
    ports: ["8000:8000"]
    depends_on: [db, redis]
    volumes: [".:/app"]
  db:
    image: postgres:15
    environment:
      POSTGRES_USER: task
      POSTGRES_PASSWORD: task
      POSTGRES_DB: task
    ports: ["5432:5432"]
  redis:
    image: redis:7
    ports: ["6379:6379"]

.env.example

DJANGO_SETTINGS_MODULE=config.settings.dev
SECRET_KEY=change-me
DEBUG=True
ALLOWED_HOSTS=*
DATABASE_URL=postgres://task:task@db:5432/task
REDIS_URL=redis://redis:6379/1
PAGE_SIZE=20
JWT_ACCESS_MIN=60
JWT_REFRESH_DAYS=7
TIME_ZONE=Asia/Shanghai
FEATURE_RATE_LIMIT=True

Run steps

  • Clone the project and enter its directory
  • Local run (SQLite)
    • Create and fill in .env (SQLite is used by default when DATABASE_URL is unset)
    • pip install -r requirements.txt
    • python manage.py migrate
    • python manage.py seed
    • python manage.py runserver
  • Docker run (PostgreSQL/Redis)
    • cp .env.example .env and adjust as needed
    • docker-compose up --build
    • Initialize data: docker-compose exec web sh -c "python manage.py migrate && python manage.py seed"
  • Due-date reminders (can be triggered by cron)
    • python manage.py remind_due

Example API calls

  • Register
    • http POST :8000/auth/register username=alice email=alice@e.com password=Alice123!
  • Log in to obtain a JWT
    • http POST :8000/auth/login username=alice password=Alice123!
  • Create a project
    • http POST :8000/projects/ "Authorization: Bearer <token>" name=Demo description=desc
  • Create a task
    • http POST :8000/tasks/ "Authorization: Bearer <token>" title="TaskA" project=<project_id> assignee=<user_id> priority=高 status=TODO
  • Filter/sort/search tasks
    • GET /tasks/?status=TODO&priority=高&project=1&ordering=-updated_at&search=keyword&sort_strategy=priority_desc
  • Status transition
    • http POST :8000/tasks/{id}/change_status "Authorization: Bearer <token>" status=DOING
  • Task comments
    • http POST :8000/tasks/{id}/comments "Authorization: Bearer <token>" content="LGTM"
  • File upload
    • http -f POST :8000/tasks/{id}/attachments "Authorization: Bearer <token>" file@path/to/file.pdf
  • Bulk CSV import
    • http -f POST :8000/tasks/import-csv "Authorization: Bearer <token>" file@tasks.csv project_id==1 default_assignee==2
    • CSV columns: title,description,priority,status,due_date
  • Webhook management
    • http POST :8000/tasks/webhooks/ "Authorization: Bearer <admin_token>" url=https://example.com/hook event=task.status_changed active:=true secret=abc
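If deliveries are signed with the webhook's secret (an HMAC-SHA256 signature over the raw body is assumed here; the actual header name and algorithm live in observers.py), a receiver can verify them like this:

```python
# Hypothetical receiver-side verification for signed webhook payloads.
# The HMAC-SHA256 scheme and hex digest format are assumptions, not
# taken from observers.py.
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_hex: str) -> bool:
    """Recompute the HMAC of the raw request body and compare in constant time."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_hex)
```

hmac.compare_digest avoids timing side channels that a plain == comparison would leak.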

OpenAPI

  • Schema: GET /api/schema/
  • Swagger UI: GET /api/docs/
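These two endpoints correspond to the standard drf-spectacular URL wiring; something equivalent is expected to already exist in config/urls.py, shown here only for reference:

```python
# Standard drf-spectacular wiring for the schema and Swagger UI endpoints.
from django.urls import path
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView

urlpatterns = [
    path("api/schema/", SpectacularAPIView.as_view(), name="schema"),
    path("api/docs/", SpectacularSwaggerView.as_view(url_name="schema"), name="docs"),
]
```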

Permissions and roles

  • admin: full permissions, manages Webhooks/Labels; can see all projects and tasks
  • member: can only access projects they own and tasks assigned to them; restricted on other users' resources
  • Custom permissions live in apps/common/permissions.py
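The member/admin rule above reduces to a simple object-level check. A minimal sketch in plain-function form (in DRF this logic would sit inside has_object_permission of a BasePermission subclass in apps/common/permissions.py; the attribute names follow this template's models):

```python
# Sketch of the object-level rule: admins may touch anything, members only
# tasks assigned to them. `role` and `assignee_id` match this template's models.
def can_modify_task(user, task) -> bool:
    if getattr(user, "role", None) == "admin":
        return True
    return getattr(task, "assignee_id", None) == getattr(user, "id", None)
```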

Architecture notes and design-pattern mapping

  • Layered architecture
    • views/serializers act as Controller/DTO
    • services enforce business rules (status transitions, import, reminders, attachments/comments)
    • repositories handle data access (query encapsulation, caching points)
  • DTOs and validators
    • serializers act as request/response DTOs; validators.CSVImportValidator isolates CSV validation
  • Strategy pattern
    • apps/tasks/strategies.py encapsulates sorting strategies, switched via the sort_strategy parameter
  • Factory pattern
    • TaskFactory creates default tasks and initial labels
  • Observer pattern
    • the WebhookNotifier/LoggerNotifier observers listen for task status changes and trigger external callbacks
  • Template method
    • TemplateCRUDViewSet exposes after_create/after_update/after_destroy extension points
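The sort_strategy switch can be pictured as a small registry of interchangeable strategy objects. The names below are illustrative, not the exact contents of strategies.py, and a plain list stands in for a Django queryset to keep the sketch self-contained:

```python
# Strategy-pattern sketch: each strategy knows one ordering; a registry maps
# the sort_strategy query parameter to a strategy instance.
class SortStrategy:
    def apply(self, tasks):
        raise NotImplementedError


class PriorityDescStrategy(SortStrategy):
    def apply(self, tasks):
        # With a real queryset this would be queryset.order_by("-priority").
        return sorted(tasks, key=lambda t: t["priority"], reverse=True)


STRATEGIES = {"priority_desc": PriorityDescStrategy()}


def apply_sort(tasks, name):
    # Unknown names fall back to the original order instead of erroring.
    strategy = STRATEGIES.get(name)
    return strategy.apply(tasks) if strategy else tasks
```

Adding a new ordering means writing one class and registering it, with no change to the view code.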

Optional features and extension suggestions

  • Rate limiting: already configured in REST_FRAMEWORK; toggle via FEATURE_RATE_LIMIT or supply custom Throttle classes
  • Caching hot queries: add a caching layer in the repositories via cache or the cache_page decorator; for per-project listings, consider keys like tasks:list:project:{id}:{status}:{page}
  • Hierarchical labels: the parent field is already supported; tree queries can be built on top
  • Task dependencies: add ManyToManyField('self', symmetrical=False, related_name='dependents') to the Task model and validate against circular dependencies in the service layer
  • Async webhooks: integrate Celery + Redis, move webhook delivery to an async task queue with retry and backoff
  • Email/IM reminders: wire remind_due into an email service or WeCom/Slack webhooks
  • Audit log: add an AuditLog model and record key actions via signals
  • PostgreSQL features: JSONB columns for custom attributes, GIN indexes for faster search
  • Security
    • CSRF, CORS, Content Security Policy, file-extension allowlist, virus scanning (ClamAV)
    • manage SECRET_KEY, database credentials, and webhook secrets via environment variables and a KMS/secret manager
  • Quality and static checks
    • integrate flake8/black/isort/mypy
    • pre-commit hooks for automatic formatting
  • CI example (GitHub Actions)
    • install deps -> migrate -> pytest --cov -> build image -> push
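The circular-dependency check for task dependencies amounts to a reachability walk over the dependency graph. A minimal sketch, operating on an adjacency map (task id -> ids it depends on) that the service layer would build from the proposed ManyToMany relation:

```python
# Cycle check for task dependencies: adding task_id -> new_dep_id is invalid
# if task_id is already reachable from new_dep_id.
def would_create_cycle(deps, task_id, new_dep_id):
    stack, seen = [new_dep_id], set()
    while stack:
        current = stack.pop()
        if current == task_id:
            return True  # following the new edge would loop back to task_id
        if current in seen:
            continue
        seen.add(current)
        stack.extend(deps.get(current, ()))
    return False
```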

CI workflow example: .github/workflows/ci.yml (excerpt)

name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: task
          POSTGRES_PASSWORD: task
          POSTGRES_DB: task
        ports: ["5432:5432"]
        options: >-
          --health-cmd="pg_isready -U task" --health-interval=10s --health-timeout=5s --health-retries=5
    env:
      DATABASE_URL: postgres://task:task@localhost:5432/task
      DJANGO_SETTINGS_MODULE: config.settings.dev
      SECRET_KEY: test
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install -r requirements.txt
      - run: python manage.py migrate
      - run: pytest --cov=apps --maxfail=1 -q

FAQ and troubleshooting

  • Login failures: passwords must satisfy the Django password validators; the seeded passwords are Admin123!/User123!
  • 413 on file upload: raise the reverse-proxy limit (e.g. nginx client_max_body_size) or Django's DATA_UPLOAD_MAX_MEMORY_SIZE/FILE_UPLOAD_MAX_MEMORY_SIZE
  • Webhook timeouts: delivery is synchronous with a 5s timeout by default; in production push webhooks asynchronously via Celery
  • Invalid sort field: use the ordering parameter or sort_strategy; add and register new strategy classes in strategies.py
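The FEATURE_RATE_LIMIT toggle mentioned in the extension notes boils down to a fixed-window counter, which is also the idea behind DRF's SimpleRateThrottle (the real extension point for custom throttles). A minimal in-memory sketch with illustrative names:

```python
# Fixed-window rate limiter sketch: at most `limit` hits per key per window.
# An in-memory dict stands in for the shared cache a real throttle would use.
import time


class FixedWindowLimiter:
    def __init__(self, limit, window_s):
        self.limit = limit
        self.window_s = window_s
        self.hits = {}  # key -> (window_start, count)

    def allow(self, key, now=None):
        now = time.monotonic() if now is None else now
        start, count = self.hits.get(key, (now, 0))
        if now - start >= self.window_s:
            start, count = now, 0  # window expired: start a fresh one
        if count >= self.limit:
            return False
        self.hits[key] = (start, count + 1)
        return True
```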

This completes the template: registration/login, JWT, role-based permissions, project and task CRUD, file uploads, comments, status transitions, due-date reminders, pagination, filtering/sorting/search, CSV import, webhooks, structured logging and error handling, Swagger docs, environment config and Docker, tests and seed data, plus extensibility guidance. Start it with the run steps above and extend incrementally. If you would like any module fleshed out further, or a Celery/Redis async variant added, let me know.

Below is a complete, runnable program template for a concurrent log-processing pipeline CLI tool in Go, covering the directory layout, key code, configuration, the concurrency architecture, and the strategy/factory/observer/builder patterns. It includes unit-test and benchmark examples, a Makefile, a Dockerfile, a cross-platform build script, sample data, usage notes, and extension suggestions. The template emphasizes maintainability and extensibility, and is meant to be adapted to your specific needs.

Project directory structure

  • go.mod
  • go.sum
  • cmd/logpipe/main.go
  • pkg/config/config.go
  • pkg/logging/logger.go
  • pkg/observer/observer.go
  • pkg/source/source.go
  • pkg/parser/parser.go
  • pkg/aggregator/aggregator.go
  • pkg/sink/sink.go
  • pkg/pipeline/pipeline.go
  • pkg/util/util.go
  • internal/version/version.go
  • examples/data/serviceA-2025-11-20.log
  • examples/data/serviceB-2025-11-20.json
  • examples/data/serviceC-2025-11-20.csv
  • configs/default.yaml
  • Makefile
  • Dockerfile
  • scripts/build.sh
  • pkg/parser/parser_test.go
  • pkg/aggregator/aggregator_test.go
  • pkg/aggregator/aggregator_bench_test.go
  • README.md

go.mod

module github.com/example/logpipe

go 1.21

require gopkg.in/yaml.v3 v3.0.1

cmd/logpipe/main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/example/logpipe/internal/version"
	"github.com/example/logpipe/pkg/config"
	"github.com/example/logpipe/pkg/logging"
	"github.com/example/logpipe/pkg/observer"
	"github.com/example/logpipe/pkg/pipeline"
	"github.com/example/logpipe/pkg/sink"
)

func main() {
	// CLI flags
	var (
		inputPath       string
		concurrency     int
		windowSizeStr   string
		windowType      string
		filterExpr      string
		outputDir       string
		configFile      string
		enableHTTP      bool
		httpAddr        string
		includeStdin    bool
	)
	flag.StringVar(&inputPath, "input", "./examples/data", "Input path (dir or glob), e.g. ./logs/*.log")
	flag.IntVar(&concurrency, "concurrency", 4, "Parse worker pool size")
	flag.StringVar(&windowSizeStr, "window", "1m", "Window size (e.g., 30s, 1m, 5m)")
	flag.StringVar(&windowType, "window-type", "fixed", "Window type: fixed or rolling")
	flag.StringVar(&filterExpr, "filter", "", "Regex filter applied to lines (optional)")
	flag.StringVar(&outputDir, "out", "./out", "Output directory (use - for stdout)")
	flag.StringVar(&configFile, "config", "./configs/default.yaml", "YAML config file")
	flag.BoolVar(&enableHTTP, "http", false, "Enable local HTTP endpoint")
	flag.StringVar(&httpAddr, "http-addr", ":8080", "HTTP listen address")
	flag.BoolVar(&includeStdin, "stdin", false, "Read from stdin in addition to files")
	flag.Parse()

	// Environment overrides (example)
	// LOGPIPE_CONCURRENCY, LOGPIPE_WINDOW, LOGPIPE_OUTPUT, LOGPIPE_FILTER
	if env := os.Getenv("LOGPIPE_CONCURRENCY"); env != "" {
		fmt.Sscanf(env, "%d", &concurrency)
	}
	if env := os.Getenv("LOGPIPE_WINDOW"); env != "" {
		windowSizeStr = env
	}
	if env := os.Getenv("LOGPIPE_OUTPUT"); env != "" {
		outputDir = env
	}
	if env := os.Getenv("LOGPIPE_FILTER"); env != "" {
		filterExpr = env
	}

	// Config Builder
	cfgBuilder := config.NewBuilder().
		WithInputPath(inputPath).
		WithConcurrency(concurrency).
		WithWindow(windowSizeStr, windowType).
		WithFilter(filterExpr).
		WithOutputDir(outputDir).
		WithConfigFile(configFile).
		WithIncludeStdin(includeStdin).
		WithHTTP(enableHTTP, httpAddr)

	cfg, err := cfgBuilder.Build()
	if err != nil {
		fmt.Fprintf(os.Stderr, "config error: %v\n", err)
		os.Exit(2) // Exit code 2 for configuration errors
	}

	// Structured logger
	logger := logging.NewLogger(cfg.Log.Level)

	logger.Info("starting logpipe",
		"version", version.Version,
		"input", cfg.InputPath,
		"concurrency", cfg.Concurrency,
		"window", cfg.WindowSize.String(),
		"windowType", cfg.WindowType,
		"output", cfg.OutputDir,
		"http", cfg.HTTP.Enable,
	)

	// Observer bus
	bus := observer.NewEventBus()
	bus.Subscribe("progress", observer.NewLogObserver(logger))
	bus.Subscribe("alert", observer.NewLogObserver(logger))

	// Context & signals
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		s := <-sigc
		logger.Warn("received signal, shutting down", "signal", s.String())
		cancel()
	}()

	// Sink factory
	snk, err := sink.NewFactory().Create(cfg, logger)
	if err != nil {
		logger.Error("sink creation failed", "error", err)
		os.Exit(3) // Exit code 3 for sink setup errors
	}

	// Pipeline
	pl := pipeline.NewPipeline(cfg, logger, bus, snk)
	start := time.Now()
	if err := pl.Run(ctx); err != nil {
		logger.Error("pipeline run failed", "error", err)
		os.Exit(1) // Exit code 1 for runtime errors
	}

	logger.Info("completed",
		"duration", time.Since(start).String(),
	)
}

pkg/config/config.go

package config

import (
	"errors"
	"os"
	"time"

	"gopkg.in/yaml.v3"
)

type LogConfig struct {
	Level string `yaml:"level"` // debug|info|warn|error
}

type HTTPConfig struct {
	Enable bool   `yaml:"enable"`
	Addr   string `yaml:"addr"`
}

type OutputConfig struct {
	FormatJSON     bool `yaml:"format_json"`
	FormatMarkdown bool `yaml:"format_markdown"`
	FormatCSV      bool `yaml:"format_csv"`
}

type Config struct {
	InputPath    string
	IncludeStdin bool
	Concurrency  int
	FilterExpr   string
	WindowSize   time.Duration
	WindowType   string // fixed|rolling
	OutputDir    string
	HTTP         HTTPConfig
	Log          LogConfig
	Alert        struct {
		ErrorRateThreshold float64 `yaml:"error_rate_threshold"` // e.g. 0.2
	} `yaml:"alert"`
	Output OutputConfig
	ConfigFile string
}

// Builder (Builder pattern for creating config)
type Builder struct {
	cfg Config
}

func NewBuilder() *Builder {
	return &Builder{
		cfg: Config{
			Concurrency: 4,
			WindowType:  "fixed",
			OutputDir:   "./out",
			Log:         LogConfig{Level: "info"},
			Output:      OutputConfig{FormatJSON: true, FormatMarkdown: true, FormatCSV: true},
			HTTP:        HTTPConfig{Enable: false, Addr: ":8080"},
		},
	}
}

func (b *Builder) WithInputPath(p string) *Builder           { b.cfg.InputPath = p; return b }
func (b *Builder) WithIncludeStdin(v bool) *Builder          { b.cfg.IncludeStdin = v; return b }
func (b *Builder) WithConcurrency(c int) *Builder            { b.cfg.Concurrency = c; return b }
func (b *Builder) WithFilter(f string) *Builder              { b.cfg.FilterExpr = f; return b }
func (b *Builder) WithOutputDir(d string) *Builder           { b.cfg.OutputDir = d; return b }
func (b *Builder) WithConfigFile(f string) *Builder          { b.cfg.ConfigFile = f; return b }
func (b *Builder) WithHTTP(enable bool, addr string) *Builder { b.cfg.HTTP.Enable = enable; b.cfg.HTTP.Addr = addr; return b }
func (b *Builder) WithWindow(sizeStr string, typ string) *Builder {
	d, _ := time.ParseDuration(sizeStr)
	if d == 0 {
		d = time.Minute
	}
	b.cfg.WindowSize = d
	b.cfg.WindowType = typ
	return b
}
func (b *Builder) Build() (*Config, error) {
	// Load YAML file if exists
	if b.cfg.ConfigFile != "" {
		if data, err := os.ReadFile(b.cfg.ConfigFile); err == nil {
			var y struct {
				Log    LogConfig   `yaml:"log"`
				HTTP   HTTPConfig  `yaml:"http"`
				Output OutputConfig `yaml:"output"`
				Alert  struct {
					ErrorRateThreshold float64 `yaml:"error_rate_threshold"`
				} `yaml:"alert"`
			}
			if err := yaml.Unmarshal(data, &y); err == nil {
				// Merge YAML over defaults, but only for values the file actually
				// sets, so CLI flags and built-in defaults survive a sparse config.
				if y.Log.Level != "" {
					b.cfg.Log.Level = y.Log.Level
				}
				if y.HTTP.Addr != "" {
					b.cfg.HTTP = y.HTTP
				}
				if y.Output.FormatJSON || y.Output.FormatMarkdown || y.Output.FormatCSV {
					b.cfg.Output = y.Output
				}
				if y.Alert.ErrorRateThreshold > 0 {
					b.cfg.Alert.ErrorRateThreshold = y.Alert.ErrorRateThreshold
				}
			}
		}
	}
	if b.cfg.InputPath == "" && !b.cfg.IncludeStdin {
		return nil, errors.New("either input path or stdin must be provided")
	}
	if b.cfg.WindowType != "fixed" && b.cfg.WindowType != "rolling" {
		return nil, errors.New("window type must be fixed or rolling")
	}
	return &b.cfg, nil
}

pkg/logging/logger.go

package logging

import (
	"log/slog"
	"os"
	"runtime"
	"time"
)

func NewLogger(level string) *slog.Logger {
	var lv slog.Level
	switch level {
	case "debug":
		lv = slog.LevelDebug
	case "info":
		lv = slog.LevelInfo
	case "warn":
		lv = slog.LevelWarn
	case "error":
		lv = slog.LevelError
	default:
		lv = slog.LevelInfo
	}
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: lv,
		AddSource: true,
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			// Normalize the record's own timestamp to RFC3339Nano
			// (rather than re-reading the clock).
			if a.Key == slog.TimeKey && len(groups) == 0 {
				if t, ok := a.Value.Any().(time.Time); ok {
					return slog.String(slog.TimeKey, t.Format(time.RFC3339Nano))
				}
			}
			return a
		},
	})
	return slog.New(handler).With(slog.String("app", "logpipe"))
}

// Stack helper for error logging
func StackAttr() slog.Attr {
	var pcs [16]uintptr
	n := runtime.Callers(3, pcs[:])
	frames := runtime.CallersFrames(pcs[:n])
	var stack []string
	for {
		frame, more := frames.Next()
		stack = append(stack, frame.Function)
		if !more {
			break
		}
	}
	return slog.Any("stack", stack)
}

pkg/observer/observer.go

package observer

import (
	"log/slog"
	"sync"
)

type Event struct {
	Topic string
	Data  map[string]any
}

type Observer interface {
	Notify(e Event)
}

type EventBus struct {
	mu       sync.RWMutex
	subs     map[string][]Observer
}

func NewEventBus() *EventBus {
	return &EventBus{subs: make(map[string][]Observer)}
}

func (b *EventBus) Subscribe(topic string, obs Observer) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], obs)
}

func (b *EventBus) Publish(e Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, obs := range b.subs[e.Topic] {
		obs.Notify(e)
	}
}

// Simple log observer
type LogObserver struct {
	logger *slog.Logger
}

func NewLogObserver(l *slog.Logger) *LogObserver { return &LogObserver{logger: l} }
func (l *LogObserver) Notify(e Event) {
	l.logger.Info("event", "topic", e.Topic, "data", e.Data)
}

pkg/source/source.go

package source

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"time"
)

type FileMeta struct {
	Path       string
	Service    string
	Date       time.Time
	FormatHint string // json|csv|plain
}

type Line struct {
	Text      string
	Meta      FileMeta
	LineNo    int64
}

func DiscoverFiles(input string) ([]FileMeta, error) {
	// Accept dir or glob
	var files []string
	fi, err := os.Stat(input)
	if err == nil && fi.IsDir() {
		entries, err := os.ReadDir(input)
		if err != nil {
			return nil, err
		}
		for _, e := range entries {
			if e.IsDir() {
				continue
			}
			files = append(files, filepath.Join(input, e.Name()))
		}
	} else {
		// glob
		matches, err := filepath.Glob(input)
		if err != nil {
			return nil, err
		}
		files = append(files, matches...)
	}
	var metas []FileMeta
	for _, f := range files {
		metas = append(metas, inferMeta(f))
	}
	return metas, nil
}

func inferMeta(path string) FileMeta {
	base := filepath.Base(path)
	// Pattern: serviceName-YYYY-MM-DD.ext
	var svc, ext string
	var dt time.Time
	ext = strings.TrimPrefix(filepath.Ext(base), ".")
	parts := strings.Split(strings.TrimSuffix(base, "."+ext), "-")
	if len(parts) >= 4 {
		svc = strings.Join(parts[:len(parts)-3], "-")
		dateStr := strings.Join(parts[len(parts)-3:], "-")
		// Try to parse 'YYYY-MM-DD'
		if t, err := time.Parse("2006-01-02", dateStr); err == nil {
			dt = t
		}
	} else if len(parts) > 0 {
		// Stem too short to carry a date suffix; treat it all as the service name.
		svc = strings.Join(parts, "-")
	}
	if svc == "" {
		svc = "unknown"
	}
	format := "plain"
	switch strings.ToLower(ext) {
	case "json":
		format = "json"
	case "csv":
		format = "csv"
	default:
		format = "plain"
	}
	return FileMeta{Path: path, Service: svc, Date: dt, FormatHint: format}
}

func ReadLines(ctx context.Context, metas []FileMeta, includeStdin bool, filterExpr string, out chan<- Line) error {
	defer close(out)
	var re *regexp.Regexp
	var err error
	if filterExpr != "" {
		re, err = regexp.Compile(filterExpr)
		if err != nil {
			return fmt.Errorf("invalid filter regex: %w", err)
		}
	}
	// Files
	for _, m := range metas {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		if err := readOne(ctx, m, re, out); err != nil {
			return err
		}
	}
	// stdin
	if includeStdin {
		m := FileMeta{Path: "stdin", Service: "stdin", Date: time.Now(), FormatHint: "plain"}
		r := bufio.NewReader(os.Stdin)
		var lineNo int64
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			s, err := r.ReadString('\n')
			if err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				return err
			}
			lineNo++
			s = strings.TrimRight(s, "\r\n")
			if re != nil && !re.MatchString(s) {
				continue
			}
			out <- Line{Text: s, Meta: m, LineNo: lineNo}
		}
	}
	return nil
}

func readOne(ctx context.Context, m FileMeta, re *regexp.Regexp, out chan<- Line) error {
	f, err := os.Open(m.Path)
	if err != nil {
		return err
	}
	defer f.Close()
	r := bufio.NewScanner(f)
	var lineNo int64
	for r.Scan() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		lineNo++
		txt := r.Text()
		if re != nil && !re.MatchString(txt) {
			continue
		}
		out <- Line{Text: txt, Meta: m, LineNo: lineNo}
	}
	return r.Err()
}

pkg/parser/parser.go

package parser

import (
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"

	"github.com/example/logpipe/pkg/source"
)

// Strategy interface
type Parser interface {
	Parse(line source.Line) (LogRecord, error)
}

type LogRecord struct {
	Service      string
	Timestamp    time.Time
	StatusCode   int
	LatencyMs    float64
	ResponseSize int64
	Message      string
	SourceFile   string
	LineNo       int64
	ID           string // for dedup
}

// Parser factory chooses based on format hint
func ForFormat(format string) Parser {
	switch format {
	case "json":
		return &JSONParser{}
	case "csv":
		return &CSVParser{}
	default:
		return &PlainParser{}
	}
}

type JSONParser struct{}

func (p *JSONParser) Parse(line source.Line) (LogRecord, error) {
	var obj map[string]any
	if err := json.Unmarshal([]byte(line.Text), &obj); err != nil {
		return LogRecord{}, fmt.Errorf("json parse: %w", err)
	}
	ts := time.Now()
	if v, ok := obj["timestamp"].(string); ok {
		if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
			ts = t
		}
	}
	status := toInt(obj["status"])
	lat := toFloat(obj["latency_ms"])
	size := toInt64(obj["resp_bytes"])
	msg := toString(obj["message"])
	id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
	return LogRecord{
		Service:      line.Meta.Service,
		Timestamp:    ts,
		StatusCode:   status,
		LatencyMs:    lat,
		ResponseSize: size,
		Message:      msg,
		SourceFile:   line.Meta.Path,
		LineNo:       line.LineNo,
		ID:           id,
	}, nil
}

type CSVParser struct{}

func (p *CSVParser) Parse(line source.Line) (LogRecord, error) {
	r := csv.NewReader(strings.NewReader(line.Text))
	r.FieldsPerRecord = -1
	rec, err := r.Read()
	if err != nil {
		if errors.Is(err, io.EOF) {
			return LogRecord{}, io.EOF
		}
		return LogRecord{}, fmt.Errorf("csv parse: %w", err)
	}
	// Assume column order: timestamp,status,latency_ms,resp_bytes,message
	if len(rec) < 5 {
		return LogRecord{}, fmt.Errorf("csv parse: expected 5 fields, got %d", len(rec))
	}
	var ts time.Time
	if t, err := time.Parse(time.RFC3339Nano, rec[0]); err == nil {
		ts = t
	} else {
		ts = time.Now()
	}
	status, _ := strconv.Atoi(rec[1])
	lat, _ := strconv.ParseFloat(rec[2], 64)
	size, _ := strconv.ParseInt(rec[3], 10, 64)
	msg := rec[4]
	id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
	return LogRecord{
		Service:      line.Meta.Service,
		Timestamp:    ts,
		StatusCode:   status,
		LatencyMs:    lat,
		ResponseSize: size,
		Message:      msg,
		SourceFile:   line.Meta.Path,
		LineNo:       line.LineNo,
		ID:           id,
	}, nil
}

type PlainParser struct{}

func (p *PlainParser) Parse(line source.Line) (LogRecord, error) {
	// Minimal extraction using simple tokens; users can customize.
	// Example: "ts=2025-11-20T10:10:10Z status=200 latency_ms=12 size=512 message=ok"
	fields := strings.Fields(line.Text)
	var ts time.Time = time.Now()
	var status int
	var lat float64
	var size int64
	var msg string
	for _, f := range fields {
		if strings.HasPrefix(f, "ts=") {
			t, err := time.Parse(time.RFC3339Nano, strings.TrimPrefix(f, "ts="))
			if err == nil {
				ts = t
			}
		} else if strings.HasPrefix(f, "status=") {
			status, _ = strconv.Atoi(strings.TrimPrefix(f, "status="))
		} else if strings.HasPrefix(f, "latency_ms=") {
			lat, _ = strconv.ParseFloat(strings.TrimPrefix(f, "latency_ms="), 64)
		} else if strings.HasPrefix(f, "size=") {
			size, _ = strconv.ParseInt(strings.TrimPrefix(f, "size="), 10, 64)
		} else if strings.HasPrefix(f, "message=") {
			msg = strings.TrimPrefix(f, "message=")
		}
	}
	id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
	return LogRecord{
		Service:      line.Meta.Service,
		Timestamp:    ts,
		StatusCode:   status,
		LatencyMs:    lat,
		ResponseSize: size,
		Message:      msg,
		SourceFile:   line.Meta.Path,
		LineNo:       line.LineNo,
		ID:           id,
	}, nil
}

func toInt(v any) int {
	switch t := v.(type) {
	case float64:
		return int(t)
	case int:
		return t
	case string:
		i, _ := strconv.Atoi(t)
		return i
	default:
		return 0
	}
}
func toFloat(v any) float64 {
	switch t := v.(type) {
	case float64:
		return t
	case string:
		f, _ := strconv.ParseFloat(t, 64)
		return f
	default:
		return 0
	}
}
func toInt64(v any) int64 {
	switch t := v.(type) {
	case float64:
		return int64(t)
	case int64:
		return t
	case int:
		return int64(t)
	case string:
		i, _ := strconv.ParseInt(t, 10, 64)
		return i
	default:
		return 0
	}
}
func toString(v any) string {
	switch t := v.(type) {
	case string:
		return t
	default:
		return ""
	}
}

pkg/aggregator/aggregator.go

package aggregator

import (
	"sort"
	"sync"
	"time"

	"github.com/example/logpipe/pkg/parser"
)

type Snapshot struct {
	WindowStart   time.Time
	WindowEnd     time.Time
	Service       string
	Count         int64
	ErrorCount    int64
	ErrorRate     float64
	P95LatencyMs  float64
	P99LatencyMs  float64
	AvgRespBytes  float64
	LastUpdated   time.Time
}

type Aggregator interface {
	Add(rec parser.LogRecord)
	Snapshots() []Snapshot
}

// Fixed window aggregator (tumbling)
type FixedAggregator struct {
	winSize time.Duration
	mu      sync.RWMutex
	// key: service + windowStart
	buckets map[string]*bucket
}
type bucket struct {
	start   time.Time
	end     time.Time
	service string
	lat     []float64
	sizes   int64
	count   int64
	errs    int64
	seenIDs map[string]struct{}
}

func NewFixed(win time.Duration) *FixedAggregator {
	return &FixedAggregator{
		winSize: win,
		buckets: make(map[string]*bucket),
	}
}

func (a *FixedAggregator) Add(rec parser.LogRecord) {
	ws := rec.Timestamp.Truncate(a.winSize)
	key := rec.Service + "|" + ws.Format(time.RFC3339)
	a.mu.Lock()
	b, ok := a.buckets[key]
	if !ok {
		b = &bucket{
			start: ws,
			end:   ws.Add(a.winSize),
			service: rec.Service,
			seenIDs: make(map[string]struct{}),
		}
		a.buckets[key] = b
	}
	// Dedup
	if _, exists := b.seenIDs[rec.ID]; exists {
		a.mu.Unlock()
		return
	}
	b.seenIDs[rec.ID] = struct{}{}

	b.count++
	if rec.StatusCode >= 500 {
		b.errs++
	}
	if rec.LatencyMs > 0 {
		b.lat = append(b.lat, rec.LatencyMs)
	}
	b.sizes += rec.ResponseSize
	a.mu.Unlock()
}

func (a *FixedAggregator) Snapshots() []Snapshot {
	a.mu.RLock()
	defer a.mu.RUnlock()
	out := make([]Snapshot, 0, len(a.buckets))
	for _, b := range a.buckets {
		var p95, p99 float64
		if len(b.lat) > 0 {
			// Sort a copy: mutating b.lat here would race with Add,
			// since Snapshots only holds the read lock.
			lat := append([]float64(nil), b.lat...)
			sort.Float64s(lat)
			p95 = quantile(lat, 0.95)
			p99 = quantile(lat, 0.99)
		}
		avg := 0.0
		if b.count > 0 {
			avg = float64(b.sizes) / float64(b.count)
		}
		errRate := 0.0
		if b.count > 0 {
			errRate = float64(b.errs) / float64(b.count)
		}
		out = append(out, Snapshot{
			WindowStart:  b.start,
			WindowEnd:    b.end,
			Service:      b.service,
			Count:        b.count,
			ErrorCount:   b.errs,
			ErrorRate:    errRate,
			P95LatencyMs: p95,
			P99LatencyMs: p99,
			AvgRespBytes: avg,
			LastUpdated:  time.Now(),
		})
	}
	return out
}

// Rolling window aggregator (sliding last N duration)
type RollingAggregator struct {
	winSize time.Duration
	mu      sync.RWMutex
	// service -> ring of records (keep recent)
	buffers map[string]*rollbuf
}

type rollbuf struct {
	recs    []parser.LogRecord
	seen    map[string]time.Time
}

func NewRolling(win time.Duration) *RollingAggregator {
	return &RollingAggregator{
		winSize: win,
		buffers: make(map[string]*rollbuf),
	}
}

func (a *RollingAggregator) Add(rec parser.LogRecord) {
	a.mu.Lock()
	defer a.mu.Unlock()
	buf, ok := a.buffers[rec.Service]
	if !ok {
		buf = &rollbuf{seen: make(map[string]time.Time)}
		a.buffers[rec.Service] = buf
	}
	// Dedup with TTL
	if _, exists := buf.seen[rec.ID]; exists {
		return
	}
	buf.seen[rec.ID] = rec.Timestamp
	buf.recs = append(buf.recs, rec)
	// Purge old
	cut := time.Now().Add(-a.winSize)
	var j int
	for _, r := range buf.recs {
		if r.Timestamp.After(cut) {
			buf.recs[j] = r
			j++
		} else {
			delete(buf.seen, r.ID)
		}
	}
	buf.recs = buf.recs[:j]
}

func (a *RollingAggregator) Snapshots() []Snapshot {
	a.mu.RLock()
	defer a.mu.RUnlock()
	var out []Snapshot
	now := time.Now()
	for svc, buf := range a.buffers {
		var lat []float64
		var sizes int64
		var count int64
		var errs int64
		cut := now.Add(-a.winSize)
		for _, r := range buf.recs {
			if r.Timestamp.Before(cut) {
				continue
			}
			count++
			if r.StatusCode >= 500 {
				errs++
			}
			if r.LatencyMs > 0 {
				lat = append(lat, r.LatencyMs)
			}
			sizes += r.ResponseSize
		}
		var p95, p99, avg, errRate float64
		if count > 0 {
			sort.Float64s(lat)
			p95 = quantile(lat, 0.95)
			p99 = quantile(lat, 0.99)
			avg = float64(sizes) / float64(count)
			errRate = float64(errs) / float64(count)
		}
		out = append(out, Snapshot{
			WindowStart:  cut,
			WindowEnd:    now,
			Service:      svc,
			Count:        count,
			ErrorCount:   errs,
			ErrorRate:    errRate,
			P95LatencyMs: p95,
			P99LatencyMs: p99,
			AvgRespBytes: avg,
			LastUpdated:  now,
		})
	}
	return out
}

func quantile(sorted []float64, q float64) float64 {
	if len(sorted) == 0 {
		return 0
	}
	idx := int(float64(len(sorted)-1)*q + 0.5)
	if idx < 0 {
		idx = 0
	}
	if idx >= len(sorted) {
		idx = len(sorted)-1
	}
	return sorted[idx]
}

pkg/sink/sink.go

package sink

import (
	"context"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/example/logpipe/pkg/aggregator"
	"github.com/example/logpipe/pkg/config"
)

type Sink interface {
	Write(snap []aggregator.Snapshot) error
	Close() error
}

type Factory struct{}
func NewFactory() *Factory { return &Factory{} }

func (f *Factory) Create(cfg *config.Config, logger *slog.Logger) (Sink, error) {
	if cfg.HTTP.Enable {
		hs := NewHTTPSink(cfg, logger)
		return hs, nil
	}
	// default file sink
	return NewFileSink(cfg.OutputDir, cfg.Output, logger)
}

type FileSink struct {
	outDir string
	formats config.OutputConfig
	logger *slog.Logger
}

func NewFileSink(outDir string, formats config.OutputConfig, logger *slog.Logger) (*FileSink, error) {
	if outDir != "-" {
		if err := os.MkdirAll(outDir, 0o755); err != nil {
			return nil, err
		}
	}
	return &FileSink{outDir: outDir, formats: formats, logger: logger}, nil
}

func (f *FileSink) Write(snap []aggregator.Snapshot) error {
	t := time.Now().Format("20060102-150405")
	base := "report-" + t
	// JSON
	if f.formats.FormatJSON {
		data, _ := json.MarshalIndent(snap, "", "  ")
		if f.outDir == "-" {
			fmt.Println(string(data))
		} else {
			_ = os.WriteFile(filepath.Join(f.outDir, base+".json"), data, 0o644)
		}
	}
	// Markdown
	if f.formats.FormatMarkdown {
		var b strings.Builder
		b.WriteString("# LogPipe Report\n\n")
		b.WriteString("| Service | WindowStart | WindowEnd | Count | Errors | ErrorRate | P95 | P99 | AvgBytes |\n")
		b.WriteString("|---|---|---|---:|---:|---:|---:|---:|---:|\n")
		for _, s := range snap {
			fmt.Fprintf(&b, "| %s | %s | %s | %d | %d | %.3f | %.2f | %.2f | %.2f |\n",
				s.Service, s.WindowStart.Format(time.RFC3339), s.WindowEnd.Format(time.RFC3339),
				s.Count, s.ErrorCount, s.ErrorRate, s.P95LatencyMs, s.P99LatencyMs, s.AvgRespBytes)
		}
		if f.outDir == "-" {
			fmt.Println(b.String())
		} else {
			_ = os.WriteFile(filepath.Join(f.outDir, base+".md"), []byte(b.String()), 0o644)
		}
	}
	// CSV
	if f.formats.FormatCSV {
		var w *csv.Writer
		if f.outDir == "-" {
			w = csv.NewWriter(os.Stdout)
		} else {
			fpath := filepath.Join(f.outDir, base+".csv")
			file, err := os.Create(fpath)
			if err != nil {
				return err
			}
			defer file.Close()
			w = csv.NewWriter(file)
		}
		_ = w.Write([]string{"service","window_start","window_end","count","error_count","error_rate","p95_ms","p99_ms","avg_resp_bytes"})
		for _, s := range snap {
			_ = w.Write([]string{
				s.Service,
				s.WindowStart.Format(time.RFC3339),
				s.WindowEnd.Format(time.RFC3339),
				fmt.Sprintf("%d", s.Count),
				fmt.Sprintf("%d", s.ErrorCount),
				fmt.Sprintf("%.6f", s.ErrorRate),
				fmt.Sprintf("%.2f", s.P95LatencyMs),
				fmt.Sprintf("%.2f", s.P99LatencyMs),
				fmt.Sprintf("%.2f", s.AvgRespBytes),
			})
		}
		w.Flush()
		if err := w.Error(); err != nil {
			return err
		}
	}
	return nil
}

func (f *FileSink) Close() error { return nil }

// Optional HTTP sink hosting latest snapshots and health
type HTTPSink struct {
	cfg    *config.Config
	logger *slog.Logger
	mu     sync.RWMutex
	latest []aggregator.Snapshot
	server *http.Server
}

func NewHTTPSink(cfg *config.Config, logger *slog.Logger) *HTTPSink {
	hs := &HTTPSink{cfg: cfg, logger: logger}
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", hs.handleHealth)
	mux.HandleFunc("/metrics", hs.handleMetrics)
	hs.server = &http.Server{Addr: cfg.HTTP.Addr, Handler: mux}
	go func() {
		logger.Info("http sink listening", "addr", cfg.HTTP.Addr)
		if err := hs.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Error("http server error", "error", err)
		}
	}()
	return hs
}

func (h *HTTPSink) Write(snap []aggregator.Snapshot) error {
	h.mu.Lock()
	h.latest = snap
	h.mu.Unlock()
	return nil
}

func (h *HTTPSink) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return h.server.Shutdown(ctx)
}

func (h *HTTPSink) handleHealth(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte(`{"status":"ok"}`))
}

func (h *HTTPSink) handleMetrics(w http.ResponseWriter, r *http.Request) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	data, err := json.MarshalIndent(h.latest, "", "  ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(data)
}

pkg/pipeline/pipeline.go

package pipeline

import (
	"context"
	"log/slog"
	"sync"
	"time"

	"github.com/example/logpipe/pkg/aggregator"
	"github.com/example/logpipe/pkg/config"
	"github.com/example/logpipe/pkg/observer"
	"github.com/example/logpipe/pkg/parser"
	"github.com/example/logpipe/pkg/sink"
	"github.com/example/logpipe/pkg/source"
)

type Pipeline struct {
	cfg    *config.Config
	logger *slog.Logger
	bus    *observer.EventBus
	sink   sink.Sink
}

func NewPipeline(cfg *config.Config, logger *slog.Logger, bus *observer.EventBus, s sink.Sink) *Pipeline {
	return &Pipeline{cfg: cfg, logger: logger, bus: bus, sink: s}
}

// Pipeline: source -> parse -> aggregate -> sink
func (p *Pipeline) Run(ctx context.Context) error {
	metas, err := source.DiscoverFiles(p.cfg.InputPath)
	if err != nil && !p.cfg.IncludeStdin {
		return err
	}
	srcCh := make(chan source.Line, 1024)
	go func() {
		if err := source.ReadLines(ctx, metas, p.cfg.IncludeStdin, p.cfg.FilterExpr, srcCh); err != nil {
			p.logger.Error("source read error", "error", err)
		}
	}()

	// Parser workers
	type parseOut struct {
		rec parser.LogRecord
		err error
	}
	outCh := make(chan parseOut, 1024)

	var wg sync.WaitGroup
	wg.Add(p.cfg.Concurrency)
	for i := 0; i < p.cfg.Concurrency; i++ {
		go func() {
			defer wg.Done()
			for line := range srcCh {
				select {
				case <-ctx.Done():
					return
				default:
				}
				par := parser.ForFormat(line.Meta.FormatHint) // Strategy pattern
				rec, err := par.Parse(line)
				if err != nil {
					// tolerance: publish alert and continue
					p.bus.Publish(observer.Event{
						Topic: "alert",
						Data:  map[string]any{"type": "parse_error", "file": line.Meta.Path, "line": line.LineNo, "error": err.Error()},
					})
					continue
				}
				outCh <- parseOut{rec: rec}
			}
		}()
	}

	// Aggregator selection
	var agg aggregator.Aggregator
	if p.cfg.WindowType == "fixed" {
		agg = aggregator.NewFixed(p.cfg.WindowSize)
	} else {
		agg = aggregator.NewRolling(p.cfg.WindowSize)
	}

	// progress & alerts
	var processed int64
	alertThreshold := p.cfg.Alert.ErrorRateThreshold
	// aggregator consumer
	doneAgg := make(chan struct{})
	go func() {
		for po := range outCh {
			if po.err != nil {
				continue
			}
			agg.Add(po.rec)
			processed++
			// Check progress and alert thresholds every 1000 records; calling
			// agg.Snapshots() per record would rescan every window and dominate CPU.
			if processed%1000 == 0 {
				p.bus.Publish(observer.Event{
					Topic: "progress",
					Data:  map[string]any{"processed": processed},
				})
				if alertThreshold > 0 {
					for _, s := range agg.Snapshots() {
						if s.ErrorRate >= alertThreshold {
							p.bus.Publish(observer.Event{
								Topic: "alert",
								Data: map[string]any{
									"type":       "error_rate_high",
									"service":    s.Service,
									"window":     s.WindowStart.String() + "-" + s.WindowEnd.String(),
									"error_rate": s.ErrorRate,
								},
							})
						}
					}
				}
			}
		}
		close(doneAgg)
	}()

	// Close parse output when workers done
	go func() {
		wg.Wait()
		close(outCh)
	}()

	// Wait for aggregation
	<-doneAgg

	// Write sink
	snaps := agg.Snapshots()
	if err := p.sink.Write(snaps); err != nil {
		p.logger.Error("sink write error", "error", err)
		return err
	}

	// Cleanup
	if err := p.sink.Close(); err != nil {
		p.logger.Warn("sink close warn", "error", err)
	}
	return nil
}

pkg/util/util.go

package util

// Placeholder for shared utilities if needed.

internal/version/version.go

package version

const Version = "0.1.0"

examples/data/serviceA-2025-11-20.log

ts=2025-11-20T10:10:10Z status=200 latency_ms=15 size=512 message=ok
ts=2025-11-20T10:10:11Z status=500 latency_ms=50 size=128 message=err
ts=2025-11-20T10:10:12Z status=200 latency_ms=20 size=256 message=ok

examples/data/serviceB-2025-11-20.json

{"timestamp":"2025-11-20T10:10:10Z","status":200,"latency_ms":12.5,"resp_bytes":1024,"message":"ok"}
{"timestamp":"2025-11-20T10:10:12Z","status":503,"latency_ms":35.0,"resp_bytes":256,"message":"upstream"}
{"timestamp":"2025-11-20T10:10:13Z","status":200,"latency_ms":22.0,"resp_bytes":512,"message":"ok"}

examples/data/serviceC-2025-11-20.csv

2025-11-20T10:10:10Z,200,10.5,128,ok
2025-11-20T10:10:11Z,500,45.0,64,err
2025-11-20T10:10:12Z,200,20.0,256,ok

configs/default.yaml

log:
  level: info
http:
  enable: false
  addr: ":8080"
output:
  format_json: true
  format_markdown: true
  format_csv: true
alert:
  error_rate_threshold: 0.5

pkg/parser/parser_test.go

package parser

import (
	"testing"
	"time"

	"github.com/example/logpipe/pkg/source"
)

func TestJSONParser(t *testing.T) {
	line := source.Line{Text: `{"timestamp":"2025-11-20T10:10:10Z","status":200,"latency_ms":12.5,"resp_bytes":1024,"message":"ok"}`, Meta: source.FileMeta{Path: "/tmp/a.json", Service: "svc", FormatHint: "json"}}
	p := &JSONParser{}
	rec, err := p.Parse(line)
	if err != nil {
		t.Fatalf("parse error: %v", err)
	}
	if rec.StatusCode != 200 || rec.LatencyMs != 12.5 || rec.ResponseSize != 1024 {
		t.Fatalf("unexpected: %+v", rec)
	}
	if rec.Timestamp.Format(time.RFC3339) != "2025-11-20T10:10:10Z" {
		t.Fatalf("bad timestamp: %s", rec.Timestamp)
	}
}

func TestPlainParser(t *testing.T) {
	line := source.Line{Text: `ts=2025-11-20T10:10:10Z status=500 latency_ms=55 size=64 message=boom`, Meta: source.FileMeta{Path: "/tmp/a.log", Service: "svc", FormatHint: "plain"}}
	p := &PlainParser{}
	rec, err := p.Parse(line)
	if err != nil {
		t.Fatalf("parse error: %v", err)
	}
	if rec.StatusCode != 500 || rec.LatencyMs != 55 || rec.ResponseSize != 64 {
		t.Fatalf("unexpected: %+v", rec)
	}
}

pkg/aggregator/aggregator_test.go

package aggregator

import (
	"testing"
	"time"

	"github.com/example/logpipe/pkg/parser"
)

func TestFixedAggregator(t *testing.T) {
	agg := NewFixed(time.Minute)
	r := parser.LogRecord{Service: "svc", Timestamp: time.Date(2025, 11, 20, 10, 10, 0, 0, time.UTC), StatusCode: 200, LatencyMs: 10, ResponseSize: 100, ID: "a"}
	agg.Add(r)
	r2 := parser.LogRecord{Service: "svc", Timestamp: r.Timestamp, StatusCode: 500, LatencyMs: 20, ResponseSize: 200, ID: "b"}
	agg.Add(r2)
	snaps := agg.Snapshots()
	if len(snaps) != 1 {
		t.Fatalf("expected 1 snapshot, got %d", len(snaps))
	}
	s := snaps[0]
	if s.Count != 2 || s.ErrorCount != 1 {
		t.Fatalf("unexpected: %+v", s)
	}
	if s.P95LatencyMs == 0 || s.P99LatencyMs == 0 {
		t.Fatalf("quantiles missing")
	}
}

func TestRollingAggregatorDedup(t *testing.T) {
	agg := NewRolling(2 * time.Minute)
	ts := time.Now()
	r := parser.LogRecord{Service: "svc", Timestamp: ts, StatusCode: 200, LatencyMs: 10, ResponseSize: 100, ID: "dup"}
	agg.Add(r)
	agg.Add(r) // dedup
	snaps := agg.Snapshots()
	if len(snaps) != 1 {
		t.Fatalf("expected 1 snapshot, got %d", len(snaps))
	}
	if snaps[0].Count != 1 {
		t.Fatalf("dedup failed: %+v", snaps[0])
	}
}

pkg/aggregator/aggregator_bench_test.go

package aggregator

import (
	"strconv"
	"testing"
	"time"

	"github.com/example/logpipe/pkg/parser"
)

func BenchmarkFixedAggregatorAdd(b *testing.B) {
	agg := NewFixed(1 * time.Minute)
	ts := time.Now()
	for i := 0; i < b.N; i++ {
		rec := parser.LogRecord{
			Service:      "svc",
			Timestamp:    ts.Add(time.Duration(i%100) * time.Millisecond),
			StatusCode:   200,
			LatencyMs:    float64(i%50) + 1,
			ResponseSize: int64(i % 1024),
			// string(rune(i%1000)) would yield a single Unicode character, not digits.
			ID: "id-" + strconv.Itoa(i%1000),
		}
		agg.Add(rec)
	}
}

Makefile

APP=logpipe

.PHONY: all build test bench fmt vet lint run docker clean

all: build

build:
	go build -o bin/$(APP) ./cmd/logpipe

test:
	go test ./...

bench:
	go test -bench=. -benchmem ./pkg/aggregator

fmt:
	go fmt ./...

vet:
	go vet ./...

run:
	go run ./cmd/logpipe --input=./examples/data --concurrency=4 --window=1m --window-type=fixed --out=-

docker:
	docker build -t logpipe:latest .

clean:
	rm -rf bin out

Dockerfile

# Multi-stage build
FROM golang:1.21 AS builder
WORKDIR /app
COPY . .
RUN make build

FROM gcr.io/distroless/base-debian12
COPY --from=builder /app/bin/logpipe /usr/local/bin/logpipe
COPY configs /configs
ENTRYPOINT ["/usr/local/bin/logpipe"]
CMD ["--input=/data", "--concurrency=4", "--window=1m", "--window-type=fixed", "--out=-", "--config=/configs/default.yaml"]

scripts/build.sh

#!/usr/bin/env bash
set -euo pipefail

GOOS=${GOOS:-linux}
GOARCH=${GOARCH:-amd64}

mkdir -p bin
echo "Building for $GOOS/$GOARCH"
GOOS=$GOOS GOARCH=$GOARCH go build -o bin/logpipe-${GOOS}-${GOARCH} ./cmd/logpipe

README.md (brief usage notes)

LogPipe: a concurrent log-processing pipeline CLI template

Feature overview
- Input: reads multi-format logs (JSON/CSV/plain) from a directory or glob, with stdin support and regex filtering.
- Identification: infers service name and date from file names.
- Concurrency: source → parse → aggregate → sink, built on a worker pool and channels.
- Design patterns: Pipeline, Strategy (multi-format parsing), Observer (progress/alerts), Factory (sink selection), Builder (configuration).
- Aggregation: request count, error rate, P95/P99 latency, average response size; fixed and rolling windows; deduplication and tolerance for malformed lines.
- Output: JSON/Markdown/CSV to stdout or files; optional HTTP endpoint serving the latest stats and a health check.
- Configuration: CLI flags, environment variables, YAML; structured logging (level, timestamp, call stack).

Quick start
- Run locally: make run
- Specify input and filter: go run ./cmd/logpipe --input="./examples/data/*.json" --filter="status=5.."
- Start the HTTP endpoint: go run ./cmd/logpipe --http --http-addr=":8080" --out=-

Configuration
- YAML: configs/default.yaml
- Environment variable example: LOGPIPE_CONCURRENCY=8 LOGPIPE_WINDOW=2m LOGPIPE_OUTPUT=./out LOGPIPE_FILTER="latency_ms>100"

Output
- Files: written to ./out/report-*.json / .md / .csv by default
- stdout: --out=-

Exit code conventions
- 0: success
- 1: runtime error (pipeline failure, write failure)
- 2: configuration error (invalid flags or config file)
- 3: output target initialization failed (sink creation failed)

Performance and memory tips
- Set --concurrency based on file sizes and CPU cores; 2×NCPU to 4×NCPU is a good starting range.
- Sorting large latency sample sets to compute quantiles is expensive; consider approximate quantile structures (t-digest, GK) instead.
- The rolling window's dedup map can grow; it needs periodic cleanup (already evicted here by window TTL).
- For very large logs, read and process in streaming batches rather than loading everything at once.
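For reference, the sort-based quantile approach can be sketched in isolation; this is a simplified nearest-rank variant, and the template's actual helper may round differently:

```go
package main

import (
	"fmt"
	"sort"
)

// quantile returns the q-th quantile (0 < q <= 1) of samples by copying,
// sorting, and indexing; O(n log n) per call, so it is fine for moderate
// window sizes but a candidate for t-digest/GK at high volume.
func quantile(samples []float64, q float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	s := append([]float64(nil), samples...) // avoid mutating the caller's slice
	sort.Float64s(s)
	idx := int(q*float64(len(s))) - 1
	if idx < 0 {
		idx = 0
	}
	return s[idx]
}

func main() {
	lat := []float64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
	fmt.Println(quantile(lat, 0.95)) // 90 with this nearest-rank variant
}
```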

扩展建议
- Source:增加 Kafka 消费者(sarama 或 franz-go),将消息转换为 source.Line。
- Metrics:暴露 Prometheus 指标(使用 prometheus/client_golang),如处理速率、错误计数。
- Parser:增加 Protobuf/NDJSON 格式解析器;基于文件扩展或 Magic bytes 自动识别。
- Sink:增加远端HTTP/REST、S3/GCS、Kafka Producer、数据库写入。
- 观察者:增加报警推送(Email/Slack/Webhook)。
- 配置:支持热更新(SIGHUP 或文件监控)。
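As a sketch of the Sink extension point, the example below posts snapshots to a remote HTTP endpoint. The Snapshot and Sink types are pared-down local stand-ins so the sketch compiles on its own; the URL, payload shape, and field set are illustrative, not part of the template:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Minimal stand-ins for aggregator.Snapshot and the Sink interface.
type Snapshot struct {
	Service   string  `json:"service"`
	Count     int64   `json:"count"`
	ErrorRate float64 `json:"error_rate"`
}

type Sink interface {
	Write(snap []Snapshot) error
	Close() error
}

// WebhookSink POSTs snapshots as a JSON array to a remote endpoint.
type WebhookSink struct {
	url    string
	client *http.Client
}

func NewWebhookSink(url string) *WebhookSink {
	return &WebhookSink{url: url, client: &http.Client{Timeout: 5 * time.Second}}
}

func (w *WebhookSink) Write(snap []Snapshot) error {
	data, err := json.Marshal(snap)
	if err != nil {
		return err
	}
	resp, err := w.client.Post(w.url, "application/json", bytes.NewReader(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned %s", resp.Status)
	}
	return nil
}

func (w *WebhookSink) Close() error { return nil }

func main() {
	var _ Sink = NewWebhookSink("http://localhost:9000/ingest") // compile-time interface check
}
```

Registering such a sink in the template would be a one-line addition to the sink factory.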

Tests and benchmarks
- make test
- make bench

Cross-platform builds
- scripts/build.sh with GOOS/GOARCH set, for example:
  GOOS=darwin GOARCH=arm64 scripts/build.sh

How to adapt and extend for your needs

  • Adjust windows and strategy: change Config.WindowType/WindowSize, or implement a custom Aggregator for specific statistics (e.g., per-service plus per-route dimensions).
  • Add fields and metrics: extend parser.LogRecord, then update the aggregator's calculations and the sink output.
  • Integrate with other systems: implement a new Sink (via the Factory), e.g., to push Snapshots into internal monitoring or reporting systems.
  • Fault tolerance: add retries, an error queue, or dead-letter handling in the pipeline; record unparseable lines to a separate file.
  • Security and governance: add authentication and rate limiting to the HTTP endpoint; keep sensitive data out of logs.
  • Conventions and style: run go vet and golangci-lint; standardize slog field names; document functions and interfaces.
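A custom Aggregator with the Add/Snapshots shape the pipeline expects might look like the sketch below. LogRecord and Snapshot are pared-down local stand-ins (field names follow the template), and grouping is per service only, with no time window:

```go
package main

import (
	"fmt"
	"sort"
)

// Minimal stand-ins for parser.LogRecord and aggregator.Snapshot.
type LogRecord struct {
	Service    string
	StatusCode int
	LatencyMs  float64
}

type Snapshot struct {
	Service    string
	Count      int64
	ErrorCount int64
	ErrorRate  float64
}

// ServiceAggregator groups records per service, counting 5xx as errors.
type ServiceAggregator struct {
	counts map[string]*Snapshot
}

func NewServiceAggregator() *ServiceAggregator {
	return &ServiceAggregator{counts: map[string]*Snapshot{}}
}

func (a *ServiceAggregator) Add(r LogRecord) {
	s, ok := a.counts[r.Service]
	if !ok {
		s = &Snapshot{Service: r.Service}
		a.counts[r.Service] = s
	}
	s.Count++
	if r.StatusCode >= 500 {
		s.ErrorCount++
	}
	s.ErrorRate = float64(s.ErrorCount) / float64(s.Count)
}

// Snapshots returns per-service stats in stable (sorted) order.
func (a *ServiceAggregator) Snapshots() []Snapshot {
	out := make([]Snapshot, 0, len(a.counts))
	for _, s := range a.counts {
		out = append(out, *s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Service < out[j].Service })
	return out
}

func main() {
	agg := NewServiceAggregator()
	agg.Add(LogRecord{Service: "a", StatusCode: 200})
	agg.Add(LogRecord{Service: "a", StatusCode: 500})
	fmt.Println(agg.Snapshots()[0].ErrorRate) // 0.5
}
```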

Notes

  • The template uses the standard library plus gopkg.in/yaml.v3. For a richer CLI and config experience, bring in pflag/cobra/viper.
  • Quantiles are currently computed by simple sorting; for high throughput, swap in a more efficient approximate algorithm.
  • Inferring service name and date from file names is a simplified example; extract them precisely per your organization's conventions.

You can run this template as-is, or use it as a base to grow into a production-grade concurrent log-processing tool. If you'd like help wiring in a Kafka source, Prometheus metrics, or a fuller HTTP UI, describe your requirements and we can continue.

Example details

Problem it solves

Helps developers quickly generate high-quality sample code or program templates for specific needs, with accompanying technical guidance, improving development efficiency, lowering the learning curve, and accelerating project delivery.

Who it's for

Software engineers

Quickly generate high-quality code snippets and templates to cut repetitive work, speed up tasks, and focus on core features.

Technical architects

Design precise code structures and design patterns around the business logic, improving extensibility and maintainability and supporting overall architecture work.

Programming beginners

Generate custom learning examples to easily understand implementations and best practices across languages and learn faster.

Feature summary

Easily generate code snippets in multiple languages, including Python, JavaScript, Java, and C++, for varied project needs.
Automatically produce high-quality program templates from core requirements, saving development time and standardizing practice.
Recommend design patterns and libraries suited to the scenario, for more efficient and maintainable code.
Generate code that follows standard coding conventions in one step, raising team code quality and lowering maintenance cost.
Provide targeted advice for extending or adapting existing code to new requirements or system integrations.
Output well-structured sample code covering everything from logical skeleton to implementation detail.
Help resolve development bottlenecks quickly with efficient, practical reference solutions.
Support customized code generation, flexibly tuned to specific business needs.

How to use a purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (ChatGPT, Claude, etc.) and start a conversation; no extra development required. Good for quick personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API; your programs can modify the template parameters freely and call it directly, enabling automation and batch processing. Good for developer integration and embedding into business systems.

3. Configure it in an MCP client

Point your MCP client at the corresponding server address so your AI application can invoke the prompt template automatically. Good for advanced users and team collaboration, letting prompts move seamlessly between AI tools.

Prompt price
¥20.00
Try before you buy; pay only if it works for you.

What you get after purchase

The full prompt template
- 161 tokens
- 5 adjustable parameters
{ 代码需求描述 } { 编程语言 } { 代码用途 } { 输出风格 } { 框架或库 }
Usage rights to community-contributed content
- Curated community examples to help you get started quickly
Limited-time free
