Popular roles are not just a source of inspiration; they are productivity tools. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs, making creation easier and its value more immediate.
We keep the role library updated for different user needs, so you can always find the right entry point for inspiration.
Generates high-quality sample code and complete program templates from user requirements, supporting multiple programming languages, frameworks, and libraries; provides design-pattern advice and extension guidance to help developers implement features quickly, improve code structure, and raise development efficiency.
Below is a directly runnable, extensible Django + DRF REST backend template for task management, covering the requested features, architecture, and engineering practices. It includes the project directory layout, key code files, configuration, Docker, tests, run steps, and extension guidance, and can serve as the starting point for a production system.
Directory structure
requirements.txt
Django>=4.2
djangorestframework>=3.14
djangorestframework-simplejwt>=5.3
drf-spectacular>=0.27
django-filter>=23.5
python-dotenv>=1.0
dj-database-url>=2.2
psycopg2-binary>=2.9
gunicorn>=21
uvicorn>=0.30
whitenoise>=6.7
django-cors-headers>=4.4
requests>=2.32
django-redis>=5.4
python-json-logger>=2.0
pytest>=8.2
pytest-django>=4.8
pytest-cov>=5.0
Faker>=26.0
config/settings/base.py
import os
from pathlib import Path
from datetime import timedelta
from dotenv import load_dotenv
import dj_database_url
load_dotenv()
BASE_DIR = Path(__file__).resolve().parent.parent.parent
SECRET_KEY = os.getenv("SECRET_KEY", "dev-insecure-key")
DEBUG = os.getenv("DEBUG", "True") == "True"
ALLOWED_HOSTS = os.getenv("ALLOWED_HOSTS", "*").split(",")
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"rest_framework",
"rest_framework.authtoken",
"drf_spectacular",
"django_filters",
"corsheaders",
"apps.users",
"apps.projects",
"apps.tasks",
]
MIDDLEWARE = [
"corsheaders.middleware.CorsMiddleware",
"django.middleware.security.SecurityMiddleware",
"whitenoise.middleware.WhiteNoiseMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
"apps.common.middleware.RequestIDMiddleware",
"apps.common.middleware.StructuredExceptionMiddleware",
]
ROOT_URLCONF = "config.urls"
WSGI_APPLICATION = "config.wsgi.application"
ASGI_APPLICATION = "config.asgi.application"
DATABASES = {
"default": dj_database_url.parse(
os.getenv("DATABASE_URL", f"sqlite:///{BASE_DIR / 'db.sqlite3'}"),
conn_max_age=600,
)
}
AUTH_USER_MODEL = "users.User"
REST_FRAMEWORK = {
"DEFAULT_AUTHENTICATION_CLASSES": (
"rest_framework_simplejwt.authentication.JWTAuthentication",
),
"DEFAULT_PERMISSION_CLASSES": (
"rest_framework.permissions.IsAuthenticated",
),
"DEFAULT_SCHEMA_CLASS": "drf_spectacular.openapi.AutoSchema",
"DEFAULT_PAGINATION_CLASS": "apps.common.pagination.DefaultPageNumberPagination",
"PAGE_SIZE": int(os.getenv("PAGE_SIZE", 20)),
"DEFAULT_FILTER_BACKENDS": [
"django_filters.rest_framework.DjangoFilterBackend",
"rest_framework.filters.OrderingFilter",
"rest_framework.filters.SearchFilter",
],
"DEFAULT_THROTTLE_RATES": {
"anon": os.getenv("THROTTLE_ANON", "100/min"),
"user": os.getenv("THROTTLE_USER", "1000/min"),
},
}
SIMPLE_JWT = {
"ACCESS_TOKEN_LIFETIME": timedelta(minutes=int(os.getenv("JWT_ACCESS_MIN", "60"))),
"REFRESH_TOKEN_LIFETIME": timedelta(days=int(os.getenv("JWT_REFRESH_DAYS", "7"))),
"AUTH_HEADER_TYPES": ("Bearer",),
}
SPECTACULAR_SETTINGS = {
"TITLE": "Task Manager API",
"DESCRIPTION": "Django + DRF 任务管理后端",
"VERSION": "1.0.0",
"SERVE_PERMISSIONS": ["rest_framework.permissions.AllowAny"],
}
LANGUAGE_CODE = "zh-hans"
TIME_ZONE = os.getenv("TIME_ZONE", "Asia/Shanghai")
USE_I18N = True
USE_TZ = True
STATIC_URL = "static/"
STATIC_ROOT = BASE_DIR / "static"
MEDIA_URL = "/media/"
MEDIA_ROOT = BASE_DIR / "media"
CORS_ALLOW_ALL_ORIGINS = True  # dev convenience; restrict origins in production
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache"
if os.getenv("REDIS_URL")
else "django.core.cache.backends.locmem.LocMemCache",
"LOCATION": os.getenv("REDIS_URL", ""),
"OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"} if os.getenv("REDIS_URL") else {},
}
}
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json": {
"()": "python_json_logger.jsonlogger.JsonFormatter",
"fmt": "%(asctime)s %(levelname)s %(name)s %(message)s %(request_id)s",
},
},
"handlers": {
"console": {"class": "logging.StreamHandler", "formatter": "json"},
},
"root": {"handlers": ["console"], "level": "INFO"},
}
FILE_UPLOAD_HANDLERS = ["django.core.files.uploadhandler.TemporaryFileUploadHandler"]
# Feature flags
FEATURE_RATE_LIMIT = os.getenv("FEATURE_RATE_LIMIT", "True") == "True"
config/settings/dev.py
from .base import *
DEBUG = True
config/settings/staging.py
from .base import *
DEBUG = False
config/settings/prod.py
from .base import *
DEBUG = False
SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
CSRF_TRUSTED_ORIGINS = os.getenv("CSRF_TRUSTED_ORIGINS", "").split(",") if os.getenv("CSRF_TRUSTED_ORIGINS") else []
config/urls.py
from django.contrib import admin
from django.urls import path, include
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView
urlpatterns = [
path("admin/", admin.site.urls),
path("api/schema/", SpectacularAPIView.as_view(), name="schema"),
path("api/docs/", SpectacularSwaggerView.as_view(url_name="schema")),
path("auth/", include("apps.users.urls")),
path("projects/", include("apps.projects.urls")),
path("tasks/", include("apps.tasks.urls")),
]
apps/common/pagination.py
from rest_framework.pagination import PageNumberPagination
class DefaultPageNumberPagination(PageNumberPagination):
page_size_query_param = "page_size"
max_page_size = 200
apps/common/middleware.py
import uuid, logging, json
from django.utils.deprecation import MiddlewareMixin
from django.http import JsonResponse
logger = logging.getLogger(__name__)
class RequestIDMiddleware(MiddlewareMixin):
def process_request(self, request):
request.request_id = str(uuid.uuid4())
class StructuredExceptionMiddleware(MiddlewareMixin):
def process_exception(self, request, exception):
logger.exception("Unhandled exception", extra={"request_id": getattr(request, "request_id", None)})
return JsonResponse(
{"detail": "Internal Server Error", "request_id": getattr(request, "request_id", None)},
status=500,
)
apps/common/permissions.py
from rest_framework.permissions import BasePermission, SAFE_METHODS
class IsAdmin(BasePermission):
def has_permission(self, request, view):
return request.user.is_authenticated and request.user.role == "admin"
class IsOwnerOrAdmin(BasePermission):
def has_object_permission(self, request, view, obj):
if not request.user.is_authenticated:
return False
return request.user.role == "admin" or getattr(obj, "owner_id", None) == request.user.id
apps/users/models.py
from django.contrib.auth.models import AbstractUser
from django.db import models
class User(AbstractUser):
ROLE_CHOICES = (("admin", "Admin"), ("member", "Member"))
role = models.CharField(max_length=10, choices=ROLE_CHOICES, default="member")
def __str__(self):
return f"{self.username}({self.role})"
apps/users/serializers.py
from rest_framework import serializers
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password
User = get_user_model()
class UserDTO(serializers.Serializer):
id = serializers.IntegerField(read_only=True)
username = serializers.CharField()
email = serializers.EmailField()
role = serializers.CharField()
class RegisterSerializer(serializers.ModelSerializer):
password = serializers.CharField(write_only=True)
class Meta:
model = User
fields = ("id", "username", "email", "password", "role")
def validate_password(self, value):
validate_password(value)
return value
def create(self, validated_data):
password = validated_data.pop("password")
user = User(**validated_data)
user.set_password(password)
user.save()
return user
class LoginSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField(write_only=True)
apps/users/repositories.py
from django.contrib.auth import get_user_model
User = get_user_model()
class UserRepository:
@staticmethod
def get_by_username(username: str):
try:
return User.objects.get(username=username)
except User.DoesNotExist:
return None
apps/users/services.py
from django.contrib.auth import authenticate
from rest_framework.exceptions import AuthenticationFailed
class AuthService:
@staticmethod
def authenticate_user(username: str, password: str):
user = authenticate(username=username, password=password)
if not user:
raise AuthenticationFailed("Invalid credentials")
return user
apps/users/views.py
from rest_framework import generics, status
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import RefreshToken
from .serializers import RegisterSerializer, LoginSerializer, UserDTO
from .services import AuthService
class RegisterView(generics.CreateAPIView):
serializer_class = RegisterSerializer
permission_classes = [] # AllowAny
class LoginView(generics.GenericAPIView):
serializer_class = LoginSerializer
permission_classes = [] # AllowAny
def post(self, request, *args, **kwargs):
ser = self.get_serializer(data=request.data)
ser.is_valid(raise_exception=True)
user = AuthService.authenticate_user(ser.validated_data["username"], ser.validated_data["password"])
refresh = RefreshToken.for_user(user)
return Response(
{"access": str(refresh.access_token), "refresh": str(refresh), "user": UserDTO(user).data},
status=status.HTTP_200_OK
)
apps/users/urls.py
from django.urls import path
from .views import RegisterView, LoginView
urlpatterns = [
path("register", RegisterView.as_view(), name="register"),
path("login", LoginView.as_view(), name="login"),
]
apps/projects/models.py
from django.db import models
from django.conf import settings
class Project(models.Model):
name = models.CharField(max_length=255)
description = models.TextField(blank=True)
owner = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="owned_projects")
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return self.name
apps/projects/serializers.py
from rest_framework import serializers
from .models import Project
class ProjectSerializer(serializers.ModelSerializer):
owner_username = serializers.ReadOnlyField(source="owner.username")
class Meta:
model = Project
fields = ("id", "name", "description", "owner", "owner_username", "created_at")
read_only_fields = ("owner", "created_at")
apps/projects/repositories.py
from .models import Project
from django.core.cache import cache
class ProjectRepository:
@staticmethod
def list_owned_or_admin(user, search=None):
qs = Project.objects.all() if user.role == "admin" else Project.objects.filter(owner=user)
if search:
qs = qs.filter(name__icontains=search)
return qs.order_by("-created_at")
@staticmethod
def get(pk: int):
return Project.objects.get(pk=pk)
apps/projects/services.py
from .repositories import ProjectRepository
class ProjectService:
@staticmethod
def list_projects(user, search=None):
return ProjectRepository.list_owned_or_admin(user, search=search)
apps/projects/views.py
from rest_framework import viewsets, permissions
from .serializers import ProjectSerializer
from .models import Project
from apps.common.permissions import IsOwnerOrAdmin
class ProjectViewSet(viewsets.ModelViewSet):
serializer_class = ProjectSerializer
queryset = Project.objects.all()
def get_permissions(self):
if self.action in ["list", "retrieve", "create"]:
return [permissions.IsAuthenticated()]
return [IsOwnerOrAdmin()]
def get_queryset(self):
search = self.request.query_params.get("search")
from .services import ProjectService
return ProjectService.list_projects(self.request.user, search=search)
def perform_create(self, serializer):
serializer.save(owner=self.request.user)
apps/projects/urls.py
from rest_framework.routers import DefaultRouter
from .views import ProjectViewSet
router = DefaultRouter()
router.register(r"", ProjectViewSet, basename="project")
urlpatterns = router.urls
apps/tasks/choices.py
PRIORITY_CHOICES = (("低", "低"), ("中", "中"), ("高", "高"))  # values kept in Chinese; strategies.py matches on these exact strings
STATUS_CHOICES = (("TODO", "TODO"), ("DOING", "DOING"), ("DONE", "DONE"))
apps/tasks/models.py
from django.db import models
from django.conf import settings
from .choices import PRIORITY_CHOICES, STATUS_CHOICES
class Label(models.Model):
name = models.CharField(max_length=50)
parent = models.ForeignKey("self", null=True, blank=True, on_delete=models.SET_NULL, related_name="children")
def __str__(self): return self.name
class Task(models.Model):
title = models.CharField(max_length=255)
description = models.TextField(blank=True)
priority = models.CharField(max_length=2, choices=PRIORITY_CHOICES, default="中")
status = models.CharField(max_length=5, choices=STATUS_CHOICES, default="TODO")
due_date = models.DateTimeField(null=True, blank=True)
assignee = models.ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=True, on_delete=models.SET_NULL, related_name="assigned_tasks")
project = models.ForeignKey("projects.Project", on_delete=models.CASCADE, related_name="tasks")
labels = models.ManyToManyField(Label, blank=True, related_name="tasks")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Attachment(models.Model):
task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="attachments")
file = models.FileField(upload_to="attachments/")
uploaded_at = models.DateTimeField(auto_now_add=True)
class Comment(models.Model):
task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="comments")
author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
class Webhook(models.Model):
EVENT_CHOICES = (("task.status_changed", "task.status_changed"), ("task.created", "task.created"))
url = models.URLField()
event = models.CharField(max_length=50, choices=EVENT_CHOICES)
active = models.BooleanField(default=True)
secret = models.CharField(max_length=128, blank=True, default="")
created_at = models.DateTimeField(auto_now_add=True)
apps/tasks/validators.py
from rest_framework import serializers
class CSVImportValidator(serializers.Serializer):
file = serializers.FileField()
project_id = serializers.IntegerField(required=False)
default_assignee = serializers.IntegerField(required=False)
apps/tasks/dtos.py
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class TaskDTO:
title: str
description: str
priority: str
status: str
due_date: Optional[str]
assignee_id: Optional[int]
project_id: int
labels: Optional[list]
apps/tasks/serializers.py
from rest_framework import serializers
from .models import Task, Comment, Attachment, Label
from .choices import PRIORITY_CHOICES, STATUS_CHOICES
class LabelSerializer(serializers.ModelSerializer):
class Meta:
model = Label
fields = ("id", "name", "parent")
class AttachmentSerializer(serializers.ModelSerializer):
class Meta:
model = Attachment
fields = ("id", "file", "uploaded_at")
class CommentSerializer(serializers.ModelSerializer):
author_username = serializers.ReadOnlyField(source="author.username")
class Meta:
model = Comment
fields = ("id", "author", "author_username", "content", "created_at")
read_only_fields = ("author", "created_at")
class TaskSerializer(serializers.ModelSerializer):
assignee_username = serializers.ReadOnlyField(source="assignee.username")
labels = LabelSerializer(many=True, required=False)
attachments = AttachmentSerializer(many=True, read_only=True)
class Meta:
model = Task
fields = (
"id", "title", "description", "priority", "status", "due_date",
"assignee", "assignee_username", "labels", "project", "attachments",
"created_at", "updated_at"
)
read_only_fields = ("created_at", "updated_at")
def create(self, validated_data):
labels_data = validated_data.pop("labels", [])
task = Task.objects.create(**validated_data)
self._upsert_labels(task, labels_data)
return task
def update(self, instance, validated_data):
labels_data = validated_data.pop("labels", None)
for k, v in validated_data.items():
setattr(instance, k, v)
instance.save()
if labels_data is not None:
self._upsert_labels(instance, labels_data, replace=True)
return instance
def _upsert_labels(self, task, labels_data, replace=False):
label_objs = []
for ld in labels_data:
obj, _ = Label.objects.get_or_create(name=ld.get("name"), defaults={"parent": ld.get("parent")})
label_objs.append(obj)
if replace:
task.labels.set(label_objs)
else:
task.labels.add(*label_objs)
apps/tasks/repositories.py
from .models import Task, Comment, Attachment, Webhook
from django.db.models import Q
from django.core.cache import cache
class TaskRepository:
@staticmethod
def list_filtered(user, params):
qs = Task.objects.select_related("assignee", "project").prefetch_related("labels")
# Permissions: admin sees all; member sees own assigned + own projects
if user.role != "admin":
qs = qs.filter(Q(assignee=user) | Q(project__owner=user))
status_ = params.get("status")
priority = params.get("priority")
project = params.get("project")
search = params.get("search")
if status_:
qs = qs.filter(status=status_)
if priority:
qs = qs.filter(priority=priority)
if project:
qs = qs.filter(project_id=project)
if search:
qs = qs.filter(Q(title__icontains=search) | Q(description__icontains=search))
ordering = params.get("ordering") or "-created_at"
qs = qs.order_by(ordering)
return qs
@staticmethod
def get(pk: int):
return Task.objects.get(pk=pk)
class CommentRepository:
@staticmethod
def create(task, author, content):
return task.comments.create(author=author, content=content)
class AttachmentRepository:
@staticmethod
def add(task, file):
return task.attachments.create(file=file)
class WebhookRepository:
@staticmethod
def active_for_event(event: str):
return Webhook.objects.filter(active=True, event=event)
apps/tasks/factories.py
from .models import Task, Label
class TaskFactory:
@staticmethod
def create_default(project, assignee=None):
task = Task.objects.create(
title="新任务",
description="默认描述",
priority="中",
status="TODO",
project=project,
assignee=assignee,
)
default_labels = ["待规划", "快速处理"]
for name in default_labels:
lbl, _ = Label.objects.get_or_create(name=name)
task.labels.add(lbl)
return task
apps/tasks/strategies.py
from typing import Protocol
from django.db.models import QuerySet
class SortStrategy(Protocol):
def apply(self, qs: QuerySet) -> QuerySet: ...
class SortByDueDateAsc:
def apply(self, qs): return qs.order_by("due_date", "id")
class SortByPriorityDesc:
# Priority is stored as Chinese labels; rank 高 > 中 > 低 via a Case expression below.
def apply(self, qs):
from django.db.models import IntegerField, Case, When, Value
priority_order = Case(
When(priority="高", then=Value(3)),
When(priority="中", then=Value(2)),
When(priority="低", then=Value(1)),
default=Value(0), output_field=IntegerField()
)
return qs.order_by(-priority_order, "-created_at")
STRATEGIES = {
"due_date_asc": SortByDueDateAsc(),
"priority_desc": SortByPriorityDesc(),
}
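The sort strategy is chosen per request via the sort_strategy query parameter (read in TaskService.list_tasks below); for example:
GET /tasks/?sort_strategy=priority_desc
GET /tasks/?sort_strategy=due_date_asc&status=TODO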
apps/tasks/observers.py
import hmac, hashlib, json, logging, requests
from .repositories import WebhookRepository
logger = logging.getLogger(__name__)
class Observer:
def notify(self, event: str, payload: dict): ...
class WebhookNotifier(Observer):
def notify(self, event: str, payload: dict):
hooks = WebhookRepository.active_for_event(event)
for hook in hooks:
headers = {"Content-Type": "application/json"}
body = json.dumps(payload).encode("utf-8")
if hook.secret:
sig = hmac.new(hook.secret.encode(), body, hashlib.sha256).hexdigest()
headers["X-Signature"] = sig
try:
requests.post(hook.url, data=body, headers=headers, timeout=5)
except Exception:
logger.exception("Webhook delivery failed")
class LoggerNotifier(Observer):
def notify(self, event: str, payload: dict):
logger.info("Event: %s payload=%s", event, payload)
apps/tasks/services.py
from typing import List
from django.utils import timezone
from rest_framework.exceptions import ValidationError, PermissionDenied
from .repositories import TaskRepository, CommentRepository, AttachmentRepository
from .observers import WebhookNotifier, LoggerNotifier
from .strategies import STRATEGIES
ALLOWED_TRANSITIONS = {
"TODO": {"DOING"},
"DOING": {"DONE", "TODO"},
"DONE": set(),
}
class TaskService:
observers = [LoggerNotifier(), WebhookNotifier()]
@staticmethod
def list_tasks(user, params):
qs = TaskRepository.list_filtered(user, params)
strategy_key = params.get("sort_strategy")
if strategy_key and strategy_key in STRATEGIES:
qs = STRATEGIES[strategy_key].apply(qs)
return qs
@classmethod
def change_status(cls, user, task, new_status: str):
if user.role != "admin" and user != task.assignee and user != task.project.owner:
raise PermissionDenied("No permission to change status")
if new_status not in ALLOWED_TRANSITIONS[task.status]:
raise ValidationError(f"Invalid status transition {task.status} -> {new_status}")
old = task.status
task.status = new_status
task.save(update_fields=["status", "updated_at"])
payload = {"task_id": task.id, "old": old, "new": new_status, "at": timezone.now().isoformat()}
for obs in cls.observers:
obs.notify("task.status_changed", payload)
return task
@staticmethod
def add_comment(task, author, content: str):
return CommentRepository.create(task, author, content)
@staticmethod
def add_attachment(task, file):
return AttachmentRepository.add(task, file)
@staticmethod
def bulk_import_from_csv(file, project=None, default_assignee=None):
import csv, io
decoded = file.read().decode("utf-8")
f = io.StringIO(decoded)
reader = csv.DictReader(f)
from .models import Task
created = []
for row in reader:
t = Task.objects.create(
title=row.get("title", "Untitled"),
description=row.get("description", ""),
priority=row.get("priority", "中"),
status=row.get("status", "TODO"),
due_date=row.get("due_date") or None,
project=project,
assignee=default_assignee,
)
created.append(t.id)
return created
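bulk_import_from_csv expects a UTF-8 CSV whose header matches the fields read above; a minimal illustrative file (due_date is passed through unparsed here, so consider validating it with django.utils.dateparse in production):
title,description,priority,status,due_date
Fix login bug,Handle expired sessions,高,TODO,2025-12-01T10:00:00Z
Write API docs,,中,DOING,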
apps/tasks/views.py
from rest_framework import viewsets, status, decorators, serializers
from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser, FormParser
from django.shortcuts import get_object_or_404
from apps.common.permissions import IsAdmin
from .serializers import TaskSerializer, CommentSerializer, AttachmentSerializer, LabelSerializer
from .models import Task, Comment, Attachment, Label, Webhook
from .validators import CSVImportValidator
from .services import TaskService
class TemplateCRUDViewSet(viewsets.ModelViewSet):
# Template Method: hooks for pre/post
def perform_create(self, serializer):
instance = serializer.save()
self.after_create(instance)
def perform_update(self, serializer):
instance = serializer.save()
self.after_update(instance)
def perform_destroy(self, instance):
super().perform_destroy(instance)
self.after_destroy(instance)
def after_create(self, instance): pass
def after_update(self, instance): pass
def after_destroy(self, instance): pass
class TaskViewSet(TemplateCRUDViewSet):
serializer_class = TaskSerializer
queryset = Task.objects.all()
def get_queryset(self):
return TaskService.list_tasks(self.request.user, self.request.query_params)
@decorators.action(detail=True, methods=["post"])
def change_status(self, request, pk=None):
task = self.get_object()
new_status = request.data.get("status")
TaskService.change_status(request.user, task, new_status)
return Response({"status": "ok"})
@decorators.action(detail=True, methods=["post"], url_path="comments")
def add_comment(self, request, pk=None):
task = self.get_object()
ser = CommentSerializer(data=request.data)
ser.is_valid(raise_exception=True)
c = TaskService.add_comment(task, request.user, ser.validated_data["content"])
return Response(CommentSerializer(c).data, status=status.HTTP_201_CREATED)
@decorators.action(detail=True, methods=["post"], url_path="attachments", parser_classes=[MultiPartParser, FormParser])
def add_attachment(self, request, pk=None):
task = self.get_object()
file = request.data.get("file")
att = TaskService.add_attachment(task, file)
return Response(AttachmentSerializer(att).data, status=status.HTTP_201_CREATED)
@decorators.action(detail=False, methods=["post"], url_path="import-csv", parser_classes=[MultiPartParser, FormParser])
def import_csv(self, request):
val = CSVImportValidator(data=request.data)
val.is_valid(raise_exception=True)
from apps.projects.models import Project
project = get_object_or_404(Project, pk=val.validated_data.get("project_id")) if val.validated_data.get("project_id") else None
from django.contrib.auth import get_user_model
User = get_user_model()
assignee = get_object_or_404(User, pk=val.validated_data.get("default_assignee")) if val.validated_data.get("default_assignee") else None
ids = TaskService.bulk_import_from_csv(val.validated_data["file"], project, assignee)
return Response({"created_ids": ids}, status=status.HTTP_201_CREATED)
class LabelViewSet(viewsets.ModelViewSet):
serializer_class = LabelSerializer
queryset = Label.objects.all()
permission_classes = [IsAdmin]
class WebhookViewSet(viewsets.ModelViewSet):
class Serializer(serializers.ModelSerializer):
class Meta:
model = Webhook
fields = ("id", "url", "event", "active", "secret", "created_at")
read_only_fields = ("created_at",)
serializer_class = Serializer
queryset = Webhook.objects.all()
permission_classes = [IsAdmin]
apps/tasks/urls.py
from rest_framework.routers import DefaultRouter
from .views import TaskViewSet, LabelViewSet, WebhookViewSet
router = DefaultRouter()
# Register the static prefixes first so the catch-all task detail route
# (which matches any pk segment) does not shadow /labels/ and /webhooks/.
router.register(r"labels", LabelViewSet, basename="label")
router.register(r"webhooks", WebhookViewSet, basename="webhook")
router.register(r"", TaskViewSet, basename="task")
urlpatterns = router.urls
apps/tasks/management/commands/seed.py
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from faker import Faker
from apps.projects.models import Project
from apps.tasks.factories import TaskFactory
class Command(BaseCommand):
help = "Seed database with sample data"
def handle(self, *args, **options):
fake = Faker("zh_CN")
User = get_user_model()
admin, created = User.objects.get_or_create(username="admin", defaults={"email": "admin@example.com", "role": "admin"})
if created:
admin.set_password("Admin123!")
admin.save()
for i in range(3):
u, created = User.objects.get_or_create(username=f"user{i}", defaults={"email": f"user{i}@ex.com", "role": "member"})
if created:
u.set_password("User123!")
u.save()
for i in range(3):
owner = User.objects.order_by("?").first()
p = Project.objects.create(name=fake.catch_phrase(), description=fake.text(), owner=owner)
TaskFactory.create_default(p, assignee=owner)
self.stdout.write(self.style.SUCCESS("Seeding completed"))
apps/tasks/management/commands/remind_due.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta
from apps.tasks.models import Task
class Command(BaseCommand):
help = "Send due date reminders for tasks approaching deadline"
def handle(self, *args, **options):
now = timezone.now()
soon = now + timedelta(hours=24)
qs = Task.objects.filter(status__in=["TODO","DOING"], due_date__lte=soon, due_date__gte=now)
for t in qs:
self.stdout.write(f"[REMINDER] Task {t.id} '{t.title}' due by {t.due_date}, assignee={t.assignee_id}")
self.stdout.write(self.style.SUCCESS(f"Processed {qs.count()} reminders"))
tests/conftest.py
import pytest
from django.contrib.auth import get_user_model
@pytest.fixture
def user(db):
User = get_user_model()
u = User.objects.create_user(username="tester", password="Test123!", role="member")
return u
@pytest.fixture
def admin(db):
User = get_user_model()
a = User.objects.create_user(username="admin", password="Admin123!", role="admin")
return a
tests/test_models.py
from apps.tasks.models import Task
from apps.projects.models import Project
def test_create_task(db, user):
p = Project.objects.create(name="P1", description="d", owner=user)
t = Task.objects.create(title="T1", project=p, assignee=user)
assert t.id and t.project_id == p.id
tests/test_services.py
import pytest
from apps.tasks.services import TaskService
from apps.tasks.models import Task
from apps.projects.models import Project
def test_status_transition(db, admin, user):
p = Project.objects.create(name="P1", description="d", owner=user)
t = Task.objects.create(title="T", project=p, assignee=user, status="TODO")
TaskService.change_status(admin, t, "DOING")
assert Task.objects.get(id=t.id).status == "DOING"
TaskService.change_status(admin, t, "DONE")
with pytest.raises(Exception):
TaskService.change_status(admin, t, "TODO")  # DONE -> TODO is not an allowed transition
tests/test_api.py
from rest_framework.test import APIClient
def test_register_and_login(db):
c = APIClient()
r = c.post("/auth/register", {"username":"u1","email":"u1@e.com","password":"User123!"}, format="json")
assert r.status_code == 201
r2 = c.post("/auth/login", {"username":"u1","password":"User123!"}, format="json")
assert r2.status_code == 200 and "access" in r2.json()
Dockerfile
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
RUN apt-get update && apt-get install -y build-essential libpq-dev && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV DJANGO_SETTINGS_MODULE=config.settings.dev
RUN python manage.py collectstatic --noinput || true
EXPOSE 8000
CMD ["gunicorn", "config.wsgi:application", "-b", "0.0.0.0:8000", "--workers", "3"]
docker-compose.yml
version: "3.9"
services:
web:
build: .
env_file: .env
ports: ["8000:8000"]
depends_on: [db, redis]
volumes: [".:/app"]
db:
image: postgres:15
environment:
POSTGRES_USER: task
POSTGRES_PASSWORD: task
POSTGRES_DB: task
ports: ["5432:5432"]
redis:
image: redis:7
ports: ["6379:6379"]
.env.example
DJANGO_SETTINGS_MODULE=config.settings.dev
SECRET_KEY=change-me
DEBUG=True
ALLOWED_HOSTS=*
DATABASE_URL=postgres://task:task@db:5432/task
REDIS_URL=redis://redis:6379/1
PAGE_SIZE=20
JWT_ACCESS_MIN=60
JWT_REFRESH_DAYS=7
TIME_ZONE=Asia/Shanghai
FEATURE_RATE_LIMIT=True
Run steps
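A minimal local-run sketch, assuming the layout above and a manage.py at the project root (adjust to your environment):
cp .env.example .env
docker compose up -d db redis
pip install -r requirements.txt
python manage.py migrate
python manage.py seed
python manage.py runserver 0.0.0.0:8000
Alternatively, run everything in containers with docker compose up --build and execute the migrate/seed commands inside the web container.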
Sample API calls
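Illustrative requests against a local server (paths follow config/urls.py; <access> stands for the JWT returned by login):
curl -X POST http://localhost:8000/auth/register -H "Content-Type: application/json" -d '{"username":"u1","email":"u1@e.com","password":"User123!"}'
curl -X POST http://localhost:8000/auth/login -H "Content-Type: application/json" -d '{"username":"u1","password":"User123!"}'
curl "http://localhost:8000/tasks/?status=TODO&sort_strategy=priority_desc" -H "Authorization: Bearer <access>"
curl -X POST http://localhost:8000/tasks/1/change_status/ -H "Authorization: Bearer <access>" -H "Content-Type: application/json" -d '{"status":"DOING"}'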
OpenAPI
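drf-spectacular serves the machine-readable schema at /api/schema/ and Swagger UI at /api/docs/ (wired in config/urls.py); SERVE_PERMISSIONS makes both publicly readable.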
Permissions and roles
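User.role is admin or member. IsAuthenticated is the global default; IsAdmin guards label and webhook management, and IsOwnerOrAdmin limits project mutation to the owner or an admin. Members see only tasks assigned to them or belonging to their own projects (TaskRepository.list_filtered), and status changes additionally require being the assignee, the project owner, or an admin (TaskService.change_status).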
Architecture notes and design-pattern mapping
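- Repository: apps/*/repositories.py isolates ORM access.
- Service: apps/*/services.py holds business rules (status transitions, CSV import, listing).
- DTO: UserDTO and TaskDTO decouple transport shapes from models.
- Factory: TaskFactory builds tasks with sensible defaults and labels.
- Strategy: apps/tasks/strategies.py plugs in interchangeable sort orders.
- Observer: apps/tasks/observers.py fans events out to logging and webhooks.
- Template Method: TemplateCRUDViewSet exposes after_create/after_update/after_destroy hooks.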
Optional features and extension suggestions
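Suggested directions: move webhook delivery and due-date reminders onto Celery with Redis as the broker, use django-redis for caching hot queries, wire FEATURE_RATE_LIMIT into per-view throttles, and add soft-delete/audit fields through an abstract base model.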
Sample CI workflow .github/workflows/ci.yml (excerpt)
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_USER: task
POSTGRES_PASSWORD: task
POSTGRES_DB: task
ports: ["5432:5432"]
options: >-
--health-cmd="pg_isready -U task" --health-interval=10s --health-timeout=5s --health-retries=5
env:
DATABASE_URL: postgres://task:task@localhost:5432/task
DJANGO_SETTINGS_MODULE: config.settings.dev
SECRET_KEY: test
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with: { python-version: "3.11" }
- run: pip install -r requirements.txt
- run: python manage.py migrate
- run: pytest --cov=apps --maxfail=1 -q
FAQ and troubleshooting
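A few issues commonly hit with this stack:
- psycopg2-binary build failures: install build-essential and libpq-dev (already handled in the Dockerfile).
- 401 Unauthorized: check the Authorization: Bearer <access> header and the JWT_ACCESS_MIN lifetime.
- DisallowedHost: add your host to ALLOWED_HOSTS in .env.
- Throttling appears inactive: DEFAULT_THROTTLE_RATES only takes effect when throttle classes are installed (see the REST_FRAMEWORK settings and FEATURE_RATE_LIMIT).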
The template now covers: registration/login, JWT, role-based permissions, project and task CRUD, file uploads, comments, status transitions, due-date reminders, pagination, filtering/sorting/search, CSV import, webhooks, structured logging and error handling, Swagger docs, environment and Docker setup, tests and seed data, plus extensibility guidance. You can start it by following the run steps and extend it incrementally. If you want more detail on any module, or an async Celery/Redis variant, let me know.
Below is a complete program template for a runnable concurrent log-processing pipeline CLI in Go. It covers the directory layout, key code, configuration, the concurrency architecture, and the Strategy/Factory/Observer/Builder patterns, with unit-test and benchmark examples, a Makefile, Dockerfile, cross-platform build scripts, sample data, usage notes, and extension advice. The template emphasizes maintainability and extensibility, and is meant to be adapted to concrete requirements.
Project directory structure
go.mod
module github.com/example/logpipe
go 1.21
require gopkg.in/yaml.v3 v3.0.1
cmd/logpipe/main.go
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/example/logpipe/internal/version"
"github.com/example/logpipe/pkg/config"
"github.com/example/logpipe/pkg/logging"
"github.com/example/logpipe/pkg/observer"
"github.com/example/logpipe/pkg/pipeline"
"github.com/example/logpipe/pkg/sink"
)
func main() {
// CLI flags
var (
inputPath string
concurrency int
windowSizeStr string
windowType string
filterExpr string
outputDir string
configFile string
enableHTTP bool
httpAddr string
includeStdin bool
)
flag.StringVar(&inputPath, "input", "./examples/data", "Input path (dir or glob), e.g. ./logs/*.log")
flag.IntVar(&concurrency, "concurrency", 4, "Parse worker pool size")
flag.StringVar(&windowSizeStr, "window", "1m", "Window size (e.g., 30s, 1m, 5m)")
flag.StringVar(&windowType, "window-type", "fixed", "Window type: fixed or rolling")
flag.StringVar(&filterExpr, "filter", "", "Regex filter applied to lines (optional)")
flag.StringVar(&outputDir, "out", "./out", "Output directory (use - for stdout)")
flag.StringVar(&configFile, "config", "./configs/default.yaml", "YAML config file")
flag.BoolVar(&enableHTTP, "http", false, "Enable local HTTP endpoint")
flag.StringVar(&httpAddr, "http-addr", ":8080", "HTTP listen address")
flag.BoolVar(&includeStdin, "stdin", false, "Read from stdin in addition to files")
flag.Parse()
// Environment overrides (example)
// LOGPIPE_CONCURRENCY, LOGPIPE_WINDOW, LOGPIPE_OUTPUT, LOGPIPE_FILTER
if env := os.Getenv("LOGPIPE_CONCURRENCY"); env != "" {
fmt.Sscanf(env, "%d", &concurrency)
}
if env := os.Getenv("LOGPIPE_WINDOW"); env != "" {
windowSizeStr = env
}
if env := os.Getenv("LOGPIPE_OUTPUT"); env != "" {
outputDir = env
}
if env := os.Getenv("LOGPIPE_FILTER"); env != "" {
filterExpr = env
}
// Config Builder
cfgBuilder := config.NewBuilder().
WithInputPath(inputPath).
WithConcurrency(concurrency).
WithWindow(windowSizeStr, windowType).
WithFilter(filterExpr).
WithOutputDir(outputDir).
WithConfigFile(configFile).
WithIncludeStdin(includeStdin).
WithHTTP(enableHTTP, httpAddr)
cfg, err := cfgBuilder.Build()
if err != nil {
fmt.Fprintf(os.Stderr, "config error: %v\n", err)
os.Exit(2) // Exit code 2 for configuration errors
}
// Structured logger
logger := logging.NewLogger(cfg.Log.Level)
logger.Info("starting logpipe",
"version", version.Version,
"input", cfg.InputPath,
"concurrency", cfg.Concurrency,
"window", cfg.WindowSize.String(),
"windowType", cfg.WindowType,
"output", cfg.OutputDir,
"http", cfg.HTTP.Enable,
)
// Observer bus
bus := observer.NewEventBus()
bus.Subscribe("progress", observer.NewLogObserver(logger))
bus.Subscribe("alert", observer.NewLogObserver(logger))
// Context & signals
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
s := <-sigc
logger.Warn("received signal, shutting down", "signal", s.String())
cancel()
}()
// Sink factory
snk, err := sink.NewFactory().Create(cfg, logger)
if err != nil {
logger.Error("sink creation failed", "error", err)
os.Exit(3) // Exit code 3 for sink setup errors
}
// Pipeline
pl := pipeline.NewPipeline(cfg, logger, bus, snk)
start := time.Now()
if err := pl.Run(ctx); err != nil {
logger.Error("pipeline run failed", "error", err)
os.Exit(1) // Exit code 1 for runtime errors
}
logger.Info("completed",
"duration", time.Since(start).String(),
)
}
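Illustrative invocations, assuming the binary was built with go build -o logpipe ./cmd/logpipe (all flags as defined above):
./logpipe -input ./examples/data -concurrency 8 -window 1m -window-type fixed -out ./out
cat app.log | ./logpipe -stdin -filter "status=5" -out -
LOGPIPE_WINDOW=5m ./logpipe -input "./logs/*.log" -http -http-addr :8080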
pkg/config/config.go
package config
import (
"errors"
"os"
"time"
"gopkg.in/yaml.v3"
)
type LogConfig struct {
Level string `yaml:"level"` // debug|info|warn|error
}
type HTTPConfig struct {
Enable bool `yaml:"enable"`
Addr string `yaml:"addr"`
}
type OutputConfig struct {
FormatJSON bool `yaml:"format_json"`
FormatMarkdown bool `yaml:"format_markdown"`
FormatCSV bool `yaml:"format_csv"`
}
type Config struct {
InputPath string
IncludeStdin bool
Concurrency int
FilterExpr string
WindowSize time.Duration
WindowType string // fixed|rolling
OutputDir string
HTTP HTTPConfig
Log LogConfig
Alert struct {
ErrorRateThreshold float64 `yaml:"error_rate_threshold"` // e.g. 0.2
} `yaml:"alert"`
Output OutputConfig
ConfigFile string
}
// Builder (Builder pattern for creating config)
type Builder struct {
cfg Config
}
func NewBuilder() *Builder {
return &Builder{
cfg: Config{
Concurrency: 4,
WindowType: "fixed",
OutputDir: "./out",
Log: LogConfig{Level: "info"},
Output: OutputConfig{FormatJSON: true, FormatMarkdown: true, FormatCSV: true},
HTTP: HTTPConfig{Enable: false, Addr: ":8080"},
},
}
}
func (b *Builder) WithInputPath(p string) *Builder { b.cfg.InputPath = p; return b }
func (b *Builder) WithIncludeStdin(v bool) *Builder { b.cfg.IncludeStdin = v; return b }
func (b *Builder) WithConcurrency(c int) *Builder { b.cfg.Concurrency = c; return b }
func (b *Builder) WithFilter(f string) *Builder { b.cfg.FilterExpr = f; return b }
func (b *Builder) WithOutputDir(d string) *Builder { b.cfg.OutputDir = d; return b }
func (b *Builder) WithConfigFile(f string) *Builder { b.cfg.ConfigFile = f; return b }
func (b *Builder) WithHTTP(enable bool, addr string) *Builder { b.cfg.HTTP.Enable = enable; b.cfg.HTTP.Addr = addr; return b }
func (b *Builder) WithWindow(sizeStr string, typ string) *Builder {
d, _ := time.ParseDuration(sizeStr)
if d == 0 {
d = time.Minute
}
b.cfg.WindowSize = d
b.cfg.WindowType = typ
return b
}
func (b *Builder) Build() (*Config, error) {
// Load YAML file if exists
if b.cfg.ConfigFile != "" {
if data, err := os.ReadFile(b.cfg.ConfigFile); err == nil {
var y struct {
Log LogConfig `yaml:"log"`
HTTP HTTPConfig `yaml:"http"`
Output OutputConfig `yaml:"output"`
Alert struct {
ErrorRateThreshold float64 `yaml:"error_rate_threshold"`
} `yaml:"alert"`
}
if err := yaml.Unmarshal(data, &y); err == nil {
// Merge selectively so YAML zero values do not clobber CLI flags and builder defaults.
if y.Log.Level != "" {
b.cfg.Log.Level = y.Log.Level
}
if y.HTTP.Addr != "" {
b.cfg.HTTP.Addr = y.HTTP.Addr
}
b.cfg.HTTP.Enable = b.cfg.HTTP.Enable || y.HTTP.Enable
if y.Output.FormatJSON || y.Output.FormatMarkdown || y.Output.FormatCSV {
b.cfg.Output = y.Output
}
if y.Alert.ErrorRateThreshold > 0 {
b.cfg.Alert.ErrorRateThreshold = y.Alert.ErrorRateThreshold
}
}
}
}
if b.cfg.InputPath == "" && !b.cfg.IncludeStdin {
return nil, errors.New("either input path or stdin must be provided")
}
if b.cfg.WindowType != "fixed" && b.cfg.WindowType != "rolling" {
return nil, errors.New("window type must be fixed or rolling")
}
return &b.cfg, nil
}
pkg/logging/logger.go
package logging
import (
"log/slog"
"os"
"runtime"
"time"
)
func NewLogger(level string) *slog.Logger {
var lv slog.Level
switch level {
case "debug":
lv = slog.LevelDebug
case "info":
lv = slog.LevelInfo
case "warn":
lv = slog.LevelWarn
case "error":
lv = slog.LevelError
default:
lv = slog.LevelInfo
}
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: lv,
AddSource: true,
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
// Ensure consistent timestamp format, include stack for errors
if a.Key == slog.TimeKey {
return slog.Attr{Key: slog.TimeKey, Value: slog.StringValue(time.Now().Format(time.RFC3339Nano))}
}
return a
},
})
return slog.New(handler).With(slog.String("app", "logpipe"))
}
// Stack helper for error logging
func StackAttr() slog.Attr {
var pcs [16]uintptr
n := runtime.Callers(3, pcs[:])
frames := runtime.CallersFrames(pcs[:n])
var stack []string
for {
frame, more := frames.Next()
stack = append(stack, frame.Function)
if !more {
break
}
}
return slog.Any("stack", stack)
}
pkg/observer/observer.go
package observer
import (
"log/slog"
"sync"
)
type Event struct {
Topic string
Data map[string]any
}
type Observer interface {
Notify(e Event)
}
type EventBus struct {
mu sync.RWMutex
subs map[string][]Observer
}
func NewEventBus() *EventBus {
return &EventBus{subs: make(map[string][]Observer)}
}
func (b *EventBus) Subscribe(topic string, obs Observer) {
b.mu.Lock()
defer b.mu.Unlock()
b.subs[topic] = append(b.subs[topic], obs)
}
func (b *EventBus) Publish(e Event) {
b.mu.RLock()
defer b.mu.RUnlock()
for _, obs := range b.subs[e.Topic] {
obs.Notify(e)
}
}
// Simple log observer
type LogObserver struct {
logger *slog.Logger
}
func NewLogObserver(l *slog.Logger) *LogObserver { return &LogObserver{logger: l} }
func (l *LogObserver) Notify(e Event) {
l.logger.Info("event", "topic", e.Topic, "data", e.Data)
}
pkg/source/source.go
package source
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"time"
)
type FileMeta struct {
Path string
Service string
Date time.Time
FormatHint string // json|csv|plain
}
type Line struct {
Text string
Meta FileMeta
LineNo int64
}
func DiscoverFiles(input string) ([]FileMeta, error) {
// Accept dir or glob
var files []string
fi, err := os.Stat(input)
if err == nil && fi.IsDir() {
entries, err := os.ReadDir(input)
if err != nil {
return nil, err
}
for _, e := range entries {
if e.IsDir() {
continue
}
files = append(files, filepath.Join(input, e.Name()))
}
} else {
// glob
matches, err := filepath.Glob(input)
if err != nil {
return nil, err
}
files = append(files, matches...)
}
var metas []FileMeta
for _, f := range files {
metas = append(metas, inferMeta(f))
}
return metas, nil
}
func inferMeta(path string) FileMeta {
base := filepath.Base(path)
// Pattern: serviceName-YYYY-MM-DD.ext
var svc, ext string
var dt time.Time
ext = strings.TrimPrefix(filepath.Ext(base), ".")
parts := strings.Split(strings.TrimSuffix(base, "."+ext), "-")
if len(parts) >= 4 {
// Expect serviceName-YYYY-MM-DD; anything shorter is treated as a plain name
// (the old slice arithmetic panicked on two-part names like "a-b.log").
svc = strings.Join(parts[:len(parts)-3], "-")
dateStr := strings.Join(parts[len(parts)-3:], "-")
if t, err := time.Parse("2006-01-02", dateStr); err == nil {
dt = t
}
} else if len(parts) > 0 {
svc = parts[0]
}
if svc == "" {
svc = "unknown"
}
format := "plain"
switch strings.ToLower(ext) {
case "json":
format = "json"
case "csv":
format = "csv"
default:
format = "plain"
}
return FileMeta{Path: path, Service: svc, Date: dt, FormatHint: format}
}
func ReadLines(ctx context.Context, metas []FileMeta, includeStdin bool, filterExpr string, out chan<- Line) error {
defer close(out)
var re *regexp.Regexp
var err error
if filterExpr != "" {
re, err = regexp.Compile(filterExpr)
if err != nil {
return fmt.Errorf("invalid filter regex: %w", err)
}
}
// Files
for _, m := range metas {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := readOne(ctx, m, re, out); err != nil {
return err
}
}
// stdin
if includeStdin {
m := FileMeta{Path: "stdin", Service: "stdin", Date: time.Now(), FormatHint: "plain"}
r := bufio.NewReader(os.Stdin)
var lineNo int64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
s, err := r.ReadString('\n')
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return err
}
lineNo++
s = strings.TrimRight(s, "\r\n")
if re != nil && !re.MatchString(s) {
continue
}
out <- Line{Text: s, Meta: m, LineNo: lineNo}
}
}
return nil
}
func readOne(ctx context.Context, m FileMeta, re *regexp.Regexp, out chan<- Line) error {
f, err := os.Open(m.Path)
if err != nil {
return err
}
defer f.Close()
r := bufio.NewScanner(f)
// Raise the default 64 KiB token limit so long log lines are not reported as errors.
r.Buffer(make([]byte, 0, 64*1024), 1024*1024)
var lineNo int64
for r.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
lineNo++
txt := r.Text()
if re != nil && !re.MatchString(txt) {
continue
}
out <- Line{Text: txt, Meta: m, LineNo: lineNo}
}
return r.Err()
}
pkg/parser/parser.go
package parser
import (
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/example/logpipe/pkg/source"
)
// Strategy interface
type Parser interface {
Parse(line source.Line) (LogRecord, error)
}
type LogRecord struct {
Service string
Timestamp time.Time
StatusCode int
LatencyMs float64
ResponseSize int64
Message string
SourceFile string
LineNo int64
ID string // for dedup
}
// Parser factory chooses based on format hint
func ForFormat(format string) Parser {
switch format {
case "json":
return &JSONParser{}
case "csv":
return &CSVParser{}
default:
return &PlainParser{}
}
}
type JSONParser struct{}
func (p *JSONParser) Parse(line source.Line) (LogRecord, error) {
var obj map[string]any
if err := json.Unmarshal([]byte(line.Text), &obj); err != nil {
return LogRecord{}, fmt.Errorf("json parse: %w", err)
}
ts := time.Now()
if v, ok := obj["timestamp"].(string); ok {
if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
ts = t
}
}
status := toInt(obj["status"])
lat := toFloat(obj["latency_ms"])
size := toInt64(obj["resp_bytes"])
msg := toString(obj["message"])
id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
return LogRecord{
Service: line.Meta.Service,
Timestamp: ts,
StatusCode: status,
LatencyMs: lat,
ResponseSize: size,
Message: msg,
SourceFile: line.Meta.Path,
LineNo: line.LineNo,
ID: id,
}, nil
}
type CSVParser struct{}
func (p *CSVParser) Parse(line source.Line) (LogRecord, error) {
r := csv.NewReader(strings.NewReader(line.Text))
r.FieldsPerRecord = -1
rec, err := r.Read()
if err != nil {
if errors.Is(err, io.EOF) {
return LogRecord{}, io.EOF
}
return LogRecord{}, fmt.Errorf("csv parse: %w", err)
}
// Assumed column order: timestamp,status,latency_ms,resp_bytes,message
if len(rec) < 5 {
return LogRecord{}, fmt.Errorf("csv parse: expected 5 fields, got %d", len(rec))
}
var ts time.Time
if t, err := time.Parse(time.RFC3339Nano, rec[0]); err == nil {
ts = t
} else {
ts = time.Now()
}
status, _ := strconv.Atoi(rec[1])
lat, _ := strconv.ParseFloat(rec[2], 64)
size, _ := strconv.ParseInt(rec[3], 10, 64)
msg := rec[4]
id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
return LogRecord{
Service: line.Meta.Service,
Timestamp: ts,
StatusCode: status,
LatencyMs: lat,
ResponseSize: size,
Message: msg,
SourceFile: line.Meta.Path,
LineNo: line.LineNo,
ID: id,
}, nil
}
type PlainParser struct{}
func (p *PlainParser) Parse(line source.Line) (LogRecord, error) {
// Minimal extraction using simple tokens; users can customize.
// Example: "ts=2025-11-20T10:10:10Z status=200 latency_ms=12 size=512 message=ok"
fields := strings.Fields(line.Text)
var ts time.Time = time.Now()
var status int
var lat float64
var size int64
var msg string
for _, f := range fields {
if strings.HasPrefix(f, "ts=") {
t, err := time.Parse(time.RFC3339Nano, strings.TrimPrefix(f, "ts="))
if err == nil {
ts = t
}
} else if strings.HasPrefix(f, "status=") {
status, _ = strconv.Atoi(strings.TrimPrefix(f, "status="))
} else if strings.HasPrefix(f, "latency_ms=") {
lat, _ = strconv.ParseFloat(strings.TrimPrefix(f, "latency_ms="), 64)
} else if strings.HasPrefix(f, "size=") {
size, _ = strconv.ParseInt(strings.TrimPrefix(f, "size="), 10, 64)
} else if strings.HasPrefix(f, "message=") {
msg = strings.TrimPrefix(f, "message=")
}
}
id := fmt.Sprintf("%s-%d-%d-%d", line.Meta.Path, line.LineNo, status, int64(ts.UnixNano()))
return LogRecord{
Service: line.Meta.Service,
Timestamp: ts,
StatusCode: status,
LatencyMs: lat,
ResponseSize: size,
Message: msg,
SourceFile: line.Meta.Path,
LineNo: line.LineNo,
ID: id,
}, nil
}
func toInt(v any) int {
switch t := v.(type) {
case float64:
return int(t)
case int:
return t
case string:
i, _ := strconv.Atoi(t)
return i
default:
return 0
}
}
func toFloat(v any) float64 {
switch t := v.(type) {
case float64:
return t
case string:
f, _ := strconv.ParseFloat(t, 64)
return f
default:
return 0
}
}
func toInt64(v any) int64 {
switch t := v.(type) {
case float64:
return int64(t)
case int64:
return t
case int:
return int64(t)
case string:
i, _ := strconv.ParseInt(t, 10, 64)
return i
default:
return 0
}
}
func toString(v any) string {
switch t := v.(type) {
case string:
return t
default:
return ""
}
}
pkg/aggregator/aggregator.go
package aggregator
import (
"sort"
"sync"
"time"
"github.com/example/logpipe/pkg/parser"
)
type Snapshot struct {
WindowStart time.Time
WindowEnd time.Time
Service string
Count int64
ErrorCount int64
ErrorRate float64
P95LatencyMs float64
P99LatencyMs float64
AvgRespBytes float64
LastUpdated time.Time
}
type Aggregator interface {
Add(rec parser.LogRecord)
Snapshots() []Snapshot
}
// Fixed window aggregator (tumbling)
type FixedAggregator struct {
winSize time.Duration
mu sync.RWMutex
// key: service + windowStart
buckets map[string]*bucket
}
type bucket struct {
start time.Time
end time.Time
service string
lat []float64
sizes int64
count int64
errs int64
seenIDs map[string]struct{}
}
func NewFixed(win time.Duration) *FixedAggregator {
return &FixedAggregator{
winSize: win,
buckets: make(map[string]*bucket),
}
}
func (a *FixedAggregator) Add(rec parser.LogRecord) {
ws := rec.Timestamp.Truncate(a.winSize)
key := rec.Service + "|" + ws.Format(time.RFC3339)
a.mu.Lock()
b, ok := a.buckets[key]
if !ok {
b = &bucket{
start: ws,
end: ws.Add(a.winSize),
service: rec.Service,
seenIDs: make(map[string]struct{}),
}
a.buckets[key] = b
}
// Dedup
if _, exists := b.seenIDs[rec.ID]; exists {
a.mu.Unlock()
return
}
b.seenIDs[rec.ID] = struct{}{}
b.count++
if rec.StatusCode >= 500 {
b.errs++
}
if rec.LatencyMs > 0 {
b.lat = append(b.lat, rec.LatencyMs)
}
b.sizes += rec.ResponseSize
a.mu.Unlock()
}
func (a *FixedAggregator) Snapshots() []Snapshot {
a.mu.RLock()
defer a.mu.RUnlock()
out := make([]Snapshot, 0, len(a.buckets))
for _, b := range a.buckets {
var p95, p99 float64
if len(b.lat) > 0 {
// Sort a copy: mutating the shared slice under RLock would race with Add.
lat := append([]float64(nil), b.lat...)
sort.Float64s(lat)
p95 = quantile(lat, 0.95)
p99 = quantile(lat, 0.99)
}
avg := 0.0
if b.count > 0 {
avg = float64(b.sizes) / float64(b.count)
}
errRate := 0.0
if b.count > 0 {
errRate = float64(b.errs) / float64(b.count)
}
out = append(out, Snapshot{
WindowStart: b.start,
WindowEnd: b.end,
Service: b.service,
Count: b.count,
ErrorCount: b.errs,
ErrorRate: errRate,
P95LatencyMs: p95,
P99LatencyMs: p99,
AvgRespBytes: avg,
LastUpdated: time.Now(),
})
}
return out
}
// Rolling window aggregator (sliding last N duration)
type RollingAggregator struct {
winSize time.Duration
mu sync.RWMutex
// service -> ring of records (keep recent)
buffers map[string]*rollbuf
}
type rollbuf struct {
recs []parser.LogRecord
seen map[string]time.Time
}
func NewRolling(win time.Duration) *RollingAggregator {
return &RollingAggregator{
winSize: win,
buffers: make(map[string]*rollbuf),
}
}
func (a *RollingAggregator) Add(rec parser.LogRecord) {
a.mu.Lock()
defer a.mu.Unlock()
buf, ok := a.buffers[rec.Service]
if !ok {
buf = &rollbuf{seen: make(map[string]time.Time)}
a.buffers[rec.Service] = buf
}
// Dedup with TTL
if _, exists := buf.seen[rec.ID]; exists {
return
}
buf.seen[rec.ID] = rec.Timestamp
buf.recs = append(buf.recs, rec)
// Purge old
cut := time.Now().Add(-a.winSize)
var j int
for _, r := range buf.recs {
if r.Timestamp.After(cut) {
buf.recs[j] = r
j++
} else {
delete(buf.seen, r.ID)
}
}
buf.recs = buf.recs[:j]
}
func (a *RollingAggregator) Snapshots() []Snapshot {
a.mu.RLock()
defer a.mu.RUnlock()
var out []Snapshot
now := time.Now()
for svc, buf := range a.buffers {
var lat []float64
var sizes int64
var count int64
var errs int64
cut := now.Add(-a.winSize)
for _, r := range buf.recs {
if r.Timestamp.Before(cut) {
continue
}
count++
if r.StatusCode >= 500 {
errs++
}
if r.LatencyMs > 0 {
lat = append(lat, r.LatencyMs)
}
sizes += r.ResponseSize
}
var p95, p99, avg, errRate float64
if count > 0 {
sort.Float64s(lat)
p95 = quantile(lat, 0.95)
p99 = quantile(lat, 0.99)
avg = float64(sizes) / float64(count)
errRate = float64(errs) / float64(count)
}
out = append(out, Snapshot{
WindowStart: cut,
WindowEnd: now,
Service: svc,
Count: count,
ErrorCount: errs,
ErrorRate: errRate,
P95LatencyMs: p95,
P99LatencyMs: p99,
AvgRespBytes: avg,
LastUpdated: now,
})
}
return out
}
func quantile(sorted []float64, q float64) float64 {
if len(sorted) == 0 {
return 0
}
idx := int(float64(len(sorted)-1)*q + 0.5)
if idx < 0 {
idx = 0
}
if idx >= len(sorted) {
idx = len(sorted)-1
}
return sorted[idx]
}
pkg/sink/sink.go
package sink
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/example/logpipe/pkg/aggregator"
"github.com/example/logpipe/pkg/config"
)
type Sink interface {
Write(snap []aggregator.Snapshot) error
Close() error
}
type Factory struct{}
func NewFactory() *Factory { return &Factory{} }
func (f *Factory) Create(cfg *config.Config, logger *slog.Logger) (Sink, error) {
if cfg.HTTP.Enable {
hs := NewHTTPSink(cfg, logger)
return hs, nil
}
// default file sink
return NewFileSink(cfg.OutputDir, cfg.Output, logger)
}
type FileSink struct {
outDir string
formats config.OutputConfig
logger *slog.Logger
}
func NewFileSink(outDir string, formats config.OutputConfig, logger *slog.Logger) (*FileSink, error) {
if outDir != "-" {
if err := os.MkdirAll(outDir, 0o755); err != nil {
return nil, err
}
}
return &FileSink{outDir: outDir, formats: formats, logger: logger}, nil
}
func (f *FileSink) Write(snap []aggregator.Snapshot) error {
t := time.Now().Format("20060102-150405")
base := "report-" + t
// JSON
if f.formats.FormatJSON {
data, _ := json.MarshalIndent(snap, "", " ")
if f.outDir == "-" {
fmt.Println(string(data))
} else {
_ = os.WriteFile(filepath.Join(f.outDir, base+".json"), data, 0o644)
}
}
// Markdown
if f.formats.FormatMarkdown {
var b strings.Builder
b.WriteString("# LogPipe Report\n\n")
b.WriteString("| Service | WindowStart | WindowEnd | Count | Errors | ErrorRate | P95 | P99 | AvgBytes |\n")
b.WriteString("|---|---|---|---:|---:|---:|---:|---:|---:|\n")
for _, s := range snap {
fmt.Fprintf(&b, "| %s | %s | %s | %d | %d | %.3f | %.2f | %.2f | %.2f |\n",
s.Service, s.WindowStart.Format(time.RFC3339), s.WindowEnd.Format(time.RFC3339),
s.Count, s.ErrorCount, s.ErrorRate, s.P95LatencyMs, s.P99LatencyMs, s.AvgRespBytes)
}
if f.outDir == "-" {
fmt.Println(b.String())
} else {
_ = os.WriteFile(filepath.Join(f.outDir, base+".md"), []byte(b.String()), 0o644)
}
}
// CSV
if f.formats.FormatCSV {
var w *csv.Writer
if f.outDir == "-" {
w = csv.NewWriter(os.Stdout)
} else {
fpath := filepath.Join(f.outDir, base+".csv")
file, err := os.Create(fpath)
if err != nil {
return err
}
defer file.Close()
w = csv.NewWriter(file)
}
_ = w.Write([]string{"service","window_start","window_end","count","error_count","error_rate","p95_ms","p99_ms","avg_resp_bytes"})
for _, s := range snap {
_ = w.Write([]string{
s.Service,
s.WindowStart.Format(time.RFC3339),
s.WindowEnd.Format(time.RFC3339),
fmt.Sprintf("%d", s.Count),
fmt.Sprintf("%d", s.ErrorCount),
fmt.Sprintf("%.6f", s.ErrorRate),
fmt.Sprintf("%.2f", s.P95LatencyMs),
fmt.Sprintf("%.2f", s.P99LatencyMs),
fmt.Sprintf("%.2f", s.AvgRespBytes),
})
}
w.Flush()
if err := w.Error(); err != nil {
return err
}
}
return nil
}
func (f *FileSink) Close() error { return nil }
// Optional HTTP sink hosting latest snapshots and health
type HTTPSink struct {
cfg *config.Config
logger *slog.Logger
mu sync.RWMutex
latest []aggregator.Snapshot
server *http.Server
}
func NewHTTPSink(cfg *config.Config, logger *slog.Logger) *HTTPSink {
hs := &HTTPSink{cfg: cfg, logger: logger}
mux := http.NewServeMux()
mux.HandleFunc("/healthz", hs.handleHealth)
mux.HandleFunc("/metrics", hs.handleMetrics)
hs.server = &http.Server{Addr: cfg.HTTP.Addr, Handler: mux}
go func() {
logger.Info("http sink listening", "addr", cfg.HTTP.Addr)
if err := hs.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("http server error", "error", err)
}
}()
return hs
}
func (h *HTTPSink) Write(snap []aggregator.Snapshot) error {
h.mu.Lock()
h.latest = snap
h.mu.Unlock()
return nil
}
func (h *HTTPSink) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return h.server.Shutdown(ctx)
}
func (h *HTTPSink) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok"}`))
}
func (h *HTTPSink) handleMetrics(w http.ResponseWriter, r *http.Request) {
h.mu.RLock()
defer h.mu.RUnlock()
data, _ := json.MarshalIndent(h.latest, "", " ")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
}
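With -http enabled the factory returns this HTTP sink instead of the file sink, so the latest snapshots can be inspected while the pipeline runs:
curl http://localhost:8080/healthz
curl http://localhost:8080/metrics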
pkg/pipeline/pipeline.go
package pipeline
import (
"context"
"log/slog"
"sync"
"time"
"github.com/example/logpipe/pkg/aggregator"
"github.com/example/logpipe/pkg/config"
"github.com/example/logpipe/pkg/observer"
"github.com/example/logpipe/pkg/parser"
"github.com/example/logpipe/pkg/sink"
"github.com/example/logpipe/pkg/source"
)
type Pipeline struct {
cfg *config.Config
logger *slog.Logger
bus *observer.EventBus
sink sink.Sink
}
func NewPipeline(cfg *config.Config, logger *slog.Logger, bus *observer.EventBus, s sink.Sink) *Pipeline {
return &Pipeline{cfg: cfg, logger: logger, bus: bus, sink: s}
}
// Pipeline: source -> parse -> aggregate -> sink
func (p *Pipeline) Run(ctx context.Context) error {
metas, err := source.DiscoverFiles(p.cfg.InputPath)
if err != nil && !p.cfg.IncludeStdin {
return err
}
srcCh := make(chan source.Line, 1024)
go func() {
// Surface source errors (e.g. an invalid filter regex) instead of dropping them.
if err := source.ReadLines(ctx, metas, p.cfg.IncludeStdin, p.cfg.FilterExpr, srcCh); err != nil {
p.logger.Error("source read error", "error", err)
}
}()
// Parser workers
type parseOut struct {
rec parser.LogRecord
err error
}
outCh := make(chan parseOut, 1024)
var wg sync.WaitGroup
wg.Add(p.cfg.Concurrency)
for i := 0; i < p.cfg.Concurrency; i++ {
go func() {
defer wg.Done()
for line := range srcCh {
select {
case <-ctx.Done():
return
default:
}
par := parser.ForFormat(line.Meta.FormatHint) // Strategy pattern
rec, err := par.Parse(line)
if err != nil {
// tolerance: publish alert and continue
p.bus.Publish(observer.Event{
Topic: "alert",
Data: map[string]any{"type":"parse_error","file":line.Meta.Path,"line":line.LineNo,"error":err.Error()},
})
continue
}
outCh <- parseOut{rec: rec}
}
}()
}
// Aggregator selection
var agg aggregator.Aggregator
if p.cfg.WindowType == "fixed" {
agg = aggregator.NewFixed(p.cfg.WindowSize)
} else {
agg = aggregator.NewRolling(p.cfg.WindowSize)
}
// progress & alerts
var processed int64
alertThreshold := p.cfg.Alert.ErrorRateThreshold
// aggregator consumer
doneAgg := make(chan struct{})
go func() {
for po := range outCh {
if po.err != nil {
continue
}
agg.Add(po.rec)
processed++
if processed%1000 == 0 {
p.bus.Publish(observer.Event{
Topic: "progress",
Data: map[string]any{"processed": processed},
})
}
// Optional alert: check snapshots periodically rather than per record,
// since Snapshots() walks every bucket and would be quadratic otherwise.
if alertThreshold > 0 && processed%1000 == 0 {
snaps := agg.Snapshots()
for _, s := range snaps {
if s.ErrorRate >= alertThreshold {
p.bus.Publish(observer.Event{
Topic: "alert",
Data: map[string]any{
"type":"error_rate_high",
"service": s.Service,
"window": s.WindowStart.String()+"-"+s.WindowEnd.String(),
"error_rate": s.ErrorRate,
},
})
}
}
}
}
close(doneAgg)
}()
// Close parse output when workers done
go func() {
wg.Wait()
close(outCh)
}()
// Wait for aggregation
<-doneAgg
// Write sink
snaps := agg.Snapshots()
if err := p.sink.Write(snaps); err != nil {
p.logger.Error("sink write error", "error", err)
return err
}
// Cleanup
if err := p.sink.Close(); err != nil {
p.logger.Warn("sink close warn", "error", err)
}
return nil
}
pkg/util/util.go
package util
// Placeholder for shared utilities if needed.
internal/version/version.go
package version
const Version = "0.1.0"
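If the version should be stamped at build time instead, note that the linker's -X flag only sets package-level string variables, not constants; an illustrative variant:
package version

// Version can be overridden at link time, e.g.:
//   go build -ldflags "-X github.com/example/logpipe/internal/version.Version=1.2.3" ./cmd/logpipe
var Version = "0.1.0"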
examples/data/serviceA-2025-11-20.log
ts=2025-11-20T10:10:10Z status=200 latency_ms=15 size=512 message=ok
ts=2025-11-20T10:10:11Z status=500 latency_ms=50 size=128 message=err
ts=2025-11-20T10:10:12Z status=200 latency_ms=20 size=256 message=ok
examples/data/serviceB-2025-11-20.json
{"timestamp":"2025-11-20T10:10:10Z","status":200,"latency_ms":12.5,"resp_bytes":1024,"message":"ok"}
{"timestamp":"2025-11-20T10:10:12Z","status":503,"latency_ms":35.0,"resp_bytes":256,"message":"upstream"}
{"timestamp":"2025-11-20T10:10:13Z","status":200,"latency_ms":22.0,"resp_bytes":512,"message":"ok"}
examples/data/serviceC-2025-11-20.csv
2025-11-20T10:10:10Z,200,10.5,128,ok
2025-11-20T10:10:11Z,500,45.0,64,err
2025-11-20T10:10:12Z,200,20.0,256,ok
configs/default.yaml
log:
  level: info
http:
  enable: false
  addr: ":8080"
output:
  format_json: true
  format_markdown: true
  format_csv: true
alert:
  error_rate_threshold: 0.5
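The file is picked up via the --config flag (as in the Dockerfile CMD below); CLI flags typically layer on top of it:
go run ./cmd/logpipe --config=./configs/default.yaml --input=./examples/data --out=-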
pkg/parser/parser_test.go
package parser

import (
	"testing"
	"time"

	"github.com/example/logpipe/pkg/source"
)

func TestJSONParser(t *testing.T) {
	line := source.Line{
		Text: `{"timestamp":"2025-11-20T10:10:10Z","status":200,"latency_ms":12.5,"resp_bytes":1024,"message":"ok"}`,
		Meta: source.FileMeta{Path: "/tmp/a.json", Service: "svc", FormatHint: "json"},
	}
	p := &JSONParser{}
	rec, err := p.Parse(line)
	if err != nil {
		t.Fatalf("parse error: %v", err)
	}
	if rec.StatusCode != 200 || rec.LatencyMs != 12.5 || rec.ResponseSize != 1024 {
		t.Fatalf("unexpected: %+v", rec)
	}
	if rec.Timestamp.Format(time.RFC3339) != "2025-11-20T10:10:10Z" {
		t.Fatalf("bad timestamp: %s", rec.Timestamp)
	}
}

func TestPlainParser(t *testing.T) {
	line := source.Line{
		Text: `ts=2025-11-20T10:10:10Z status=500 latency_ms=55 size=64 message=boom`,
		Meta: source.FileMeta{Path: "/tmp/a.log", Service: "svc", FormatHint: "plain"},
	}
	p := &PlainParser{}
	rec, err := p.Parse(line)
	if err != nil {
		t.Fatalf("parse error: %v", err)
	}
	if rec.StatusCode != 500 || rec.LatencyMs != 55 || rec.ResponseSize != 64 {
		t.Fatalf("unexpected: %+v", rec)
	}
}
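// A matching sketch for the CSV strategy could be appended here; the
// CSVParser type name is an assumption (adjust to the actual strategy),
// using the column order from examples/data: ts,status,latency_ms,size,message.
func TestCSVParser(t *testing.T) {
	line := source.Line{
		Text: `2025-11-20T10:10:11Z,500,45.0,64,err`,
		Meta: source.FileMeta{Path: "/tmp/a.csv", Service: "svc", FormatHint: "csv"},
	}
	p := &CSVParser{}
	rec, err := p.Parse(line)
	if err != nil {
		t.Fatalf("parse error: %v", err)
	}
	if rec.StatusCode != 500 || rec.LatencyMs != 45.0 || rec.ResponseSize != 64 {
		t.Fatalf("unexpected: %+v", rec)
	}
}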
pkg/aggregator/aggregator_test.go
package aggregator

import (
	"testing"
	"time"

	"github.com/example/logpipe/pkg/parser"
)

func TestFixedAggregator(t *testing.T) {
	agg := NewFixed(time.Minute)
	r := parser.LogRecord{Service: "svc", Timestamp: time.Date(2025, 11, 20, 10, 10, 0, 0, time.UTC), StatusCode: 200, LatencyMs: 10, ResponseSize: 100, ID: "a"}
	agg.Add(r)
	r2 := parser.LogRecord{Service: "svc", Timestamp: r.Timestamp, StatusCode: 500, LatencyMs: 20, ResponseSize: 200, ID: "b"}
	agg.Add(r2)
	snaps := agg.Snapshots()
	if len(snaps) != 1 {
		t.Fatalf("expected 1 snapshot, got %d", len(snaps))
	}
	s := snaps[0]
	if s.Count != 2 || s.ErrorCount != 1 {
		t.Fatalf("unexpected: %+v", s)
	}
	if s.P95LatencyMs == 0 || s.P99LatencyMs == 0 {
		t.Fatalf("quantiles missing")
	}
}

func TestRollingAggregatorDedup(t *testing.T) {
	agg := NewRolling(2 * time.Minute)
	ts := time.Now()
	r := parser.LogRecord{Service: "svc", Timestamp: ts, StatusCode: 200, LatencyMs: 10, ResponseSize: 100, ID: "dup"}
	agg.Add(r)
	agg.Add(r) // identical ID must be deduplicated
	snaps := agg.Snapshots()
	if len(snaps) != 1 {
		t.Fatalf("expected 1 snapshot, got %d", len(snaps))
	}
	if snaps[0].Count != 1 {
		t.Fatalf("dedup failed: %+v", snaps[0])
	}
}
pkg/aggregator/aggregator_bench_test.go
package aggregator

import (
	"strconv"
	"testing"
	"time"

	"github.com/example/logpipe/pkg/parser"
)

func BenchmarkFixedAggregatorAdd(b *testing.B) {
	agg := NewFixed(1 * time.Minute)
	ts := time.Now()
	for i := 0; i < b.N; i++ {
		rec := parser.LogRecord{
			Service:      "svc",
			Timestamp:    ts.Add(time.Duration(i%100) * time.Millisecond),
			StatusCode:   200,
			LatencyMs:    float64(i%50) + 1,
			ResponseSize: int64(i % 1024),
			// strconv.Itoa yields distinct printable IDs; string(rune(n))
			// would produce a single code point, not the digits of n.
			ID: "id-" + strconv.Itoa(i%1000),
		}
		agg.Add(rec)
	}
}
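To dig into allocations or CPU during this benchmark, the standard test flags apply:
go test -bench=BenchmarkFixedAggregatorAdd -benchmem -cpuprofile=cpu.out ./pkg/aggregator
go tool pprof cpu.out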
Makefile
APP=logpipe
.PHONY: all build test bench fmt vet run docker clean
all: build
build:
	go build -o bin/$(APP) ./cmd/logpipe
test:
	go test ./...
bench:
	go test -bench=. -benchmem ./pkg/aggregator
fmt:
	go fmt ./...
vet:
	go vet ./...
run:
	go run ./cmd/logpipe --input=./examples/data --concurrency=4 --window=1m --window-type=fixed --out=-
docker:
	docker build -t logpipe:latest .
clean:
	rm -rf bin out
Dockerfile
# Multi-stage build
FROM golang:1.21 AS builder
WORKDIR /app
COPY . .
RUN make build
FROM gcr.io/distroless/base-debian12
COPY --from=builder /app/bin/logpipe /usr/local/bin/logpipe
COPY configs /configs
ENTRYPOINT ["/usr/local/bin/logpipe"]
CMD ["--input=/data", "--concurrency=4", "--window=1m", "--window-type=fixed", "--out=-", "--config=/configs/default.yaml"]
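A quick smoke test of the image, mounting the bundled examples at /data to match the CMD defaults above:
docker build -t logpipe:latest .
docker run --rm -v "$(pwd)/examples/data:/data" logpipe:latest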
scripts/build.sh
#!/usr/bin/env bash
set -euo pipefail
GOOS=${GOOS:-linux}
GOARCH=${GOARCH:-amd64}
mkdir -p bin
echo "Building for $GOOS/$GOARCH"
GOOS=$GOOS GOARCH=$GOARCH go build -o bin/logpipe-${GOOS}-${GOARCH} ./cmd/logpipe
README.md (brief usage guide)
LogPipe: a concurrent log-processing pipeline CLI template
Feature overview
- Input: reads multi-format logs (JSON/CSV/plain) from a directory or glob; supports stdin and regex filtering.
- Identification: infers the service name and date from file names.
- Concurrency: source → parse → aggregate → sink, built on a worker pool and channels.
- Design patterns: Pipeline, Strategy (multi-format parsing), Observer (progress/alerts), Factory (sink selection), Builder (configuration).
- Aggregation: request count, error rate, P95/P99 latency, average response size; fixed and rolling windows; deduplication and tolerance for malformed lines.
- Output: JSON/Markdown/CSV to stdout or files; optional HTTP endpoint serving the latest stats and a health check.
- Configuration: CLI flags, environment variables, YAML; structured logging (level, timestamps, call stack).
Quick start
- Run locally: make run
- Specify input and a filter: go run ./cmd/logpipe --input="./examples/data/*.json" --filter="status=5.."
- Start the HTTP endpoint: go run ./cmd/logpipe --http --http-addr=":8080" --out=-
Configuration
- YAML: configs/default.yaml
- Environment variable example: LOGPIPE_CONCURRENCY=8 LOGPIPE_WINDOW=2m LOGPIPE_OUTPUT=./out LOGPIPE_FILTER="latency_ms>100"
Output
- Files: written to ./out/report-*.json / .md / .csv by default
- stdout: pass --out=-
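The JSON report is a list of per-service window snapshots. A representative shape, where field names follow the aggregator.Snapshot fields used in the tests (exact casing and the average-size field name may differ in the actual serializer):
[
  {
    "Service": "serviceA",
    "WindowStart": "2025-11-20T10:10:00Z",
    "WindowEnd": "2025-11-20T10:11:00Z",
    "Count": 3,
    "ErrorCount": 1,
    "ErrorRate": 0.33,
    "P95LatencyMs": 50,
    "P99LatencyMs": 50,
    "AvgResponseSize": 298.67
  }
]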
Exit code conventions
- 0: success
- 1: runtime error (pipeline or write failure)
- 2: configuration error (invalid flags or config file)
- 3: output target initialization failed (sink construction failed)
Performance and memory tips
- Set --concurrency from file sizes and CPU count; 2×NCPU to 4×NCPU is a reasonable starting range.
- Sorting large latency sample sets for quantiles is expensive; approximate quantile structures (t-digest, GK) scale better — see the sketch after this list.
- The rolling-window dedup map can grow unbounded; it needs periodic cleanup (here entries are already evicted by window TTL).
- For very large logs, read in batches and process as a stream rather than loading everything at once.
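As a stopgap before adopting a t-digest library, a reservoir sample (Algorithm R) bounds memory while keeping quantiles approximately right. A minimal sketch; the package and type names are illustrative, not part of the template:
package quantile

import (
	"math/rand"
	"sort"
)

// Reservoir keeps a uniform random sample of at most cap values.
type Reservoir struct {
	cap     int
	seen    int
	samples []float64
}

func NewReservoir(capacity int) *Reservoir {
	return &Reservoir{cap: capacity, samples: make([]float64, 0, capacity)}
}

// Add observes one value, retaining it with probability cap/seen.
func (r *Reservoir) Add(v float64) {
	r.seen++
	if len(r.samples) < r.cap {
		r.samples = append(r.samples, v)
		return
	}
	if j := rand.Intn(r.seen); j < r.cap {
		r.samples[j] = v
	}
}

// Quantile returns the q-th quantile (0..1) of the retained sample.
func (r *Reservoir) Quantile(q float64) float64 {
	if len(r.samples) == 0 {
		return 0
	}
	s := append([]float64(nil), r.samples...)
	sort.Float64s(s)
	return s[int(q*float64(len(s)-1))]
}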
Extension ideas
- Source: add a Kafka consumer (sarama or franz-go) that converts messages into source.Line values.
- Metrics: expose Prometheus metrics via prometheus/client_golang, e.g. processing rate and error counters.
- Parser: add Protobuf/NDJSON strategies; auto-detect the format from the file extension or magic bytes.
- Sink: add remote HTTP/REST, S3/GCS, Kafka producer, or database sinks — a webhook sketch follows this list.
- Observer: add alert delivery (Email/Slack/Webhook).
- Config: support hot reload (SIGHUP or file watching).
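For the remote HTTP/REST idea, a minimal sketch against the template's sink interface (Write/Close, as implemented by HTTPSink above); the endpoint, timeout, and error policy here are assumptions:
package sink

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/example/logpipe/pkg/aggregator"
)

// WebhookSink POSTs the final snapshot set to a remote endpoint.
type WebhookSink struct {
	url    string
	client *http.Client
}

func NewWebhookSink(url string) *WebhookSink {
	return &WebhookSink{url: url, client: &http.Client{Timeout: 5 * time.Second}}
}

func (w *WebhookSink) Write(snaps []aggregator.Snapshot) error {
	body, err := json.Marshal(snaps)
	if err != nil {
		return err
	}
	resp, err := w.client.Post(w.url, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("webhook sink: unexpected status %d", resp.StatusCode)
	}
	return nil
}

func (w *WebhookSink) Close() error { return nil }
A new case in the sink factory would then select it, e.g. from an --out=webhook://… style target.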
Tests and benchmarks
- make test
- make bench
Cross-platform builds
- scripts/build.sh; set GOOS/GOARCH, for example:
GOOS=darwin GOARCH=arm64 scripts/build.sh
Adapting and extending
You can run this template as-is, or use it as the base for a production-grade concurrent log-processing tool. If you want help wiring in a Kafka source, Prometheus metrics, or a fuller HTTP UI, describe the specific requirements and continue from there.