Popular roles are not only a source of inspiration but also a productivity aid. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs, making creation easier and value more direct.
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
Write a Python script with Beautiful Soup to extract data from a specified URL.
The following script uses requests and BeautifulSoup to scrape product data from the given promotion page and performs basic preprocessing (text cleaning, price parsing, field normalization), deduplication, and structured output (CSV and JSON). It includes a robots.txt compliance check, retries with rate limiting, and multi-strategy parsing (page cards, Microdata, JSON-LD) to stay robust against unknown front-end structures. It can be run as-is and saves its output to the current directory by default.
Usage (brief): install the dependencies (pip install requests beautifulsoup4), then run python scrape_promo.py. Results are written to promo_summer_2025_products.csv and promo_summer_2025_products.json in the current directory; adjust PROMO_URL to target a different promotion page.
Script (scrape_promo.py):

import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import logging
import time
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

PROMO_URL = "https://shop.testsite.cn/promo/summer-2025"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
REQUEST_TIMEOUT = 10
RATE_LIMIT_SECONDS = 1.0  # Basic rate limit to avoid requesting too fast

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

def setup_session():
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    })
    return session

def is_allowed_by_robots(url, user_agent=USER_AGENT, session=None):
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    try:
        # Read robots.txt
        if session:
            resp = session.get(robots_url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            else:
                # robots.txt unavailable: default to allowing, but proceed with caution
                logging.warning("Could not fetch robots.txt, treating as allowed: %s", robots_url)
                return True
        else:
            rp.set_url(robots_url)
            rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        logging.warning("Error while checking robots.txt: %s", e)
        return True

def fetch_html(url, session):
    time.sleep(RATE_LIMIT_SECONDS)
    resp = session.get(url, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    # Pick a sensible encoding
    if not resp.encoding or resp.encoding.lower() == "iso-8859-1":
        resp.encoding = resp.apparent_encoding or "utf-8"
    return resp.text

def clean_text(text):
    if text is None:
        return None
    # Collapse extra whitespace and invisible characters
    return re.sub(r"\s+", " ", text).strip()
def parse_price(text):
    """
    Parse a price string and return (value: float or None, currency: str or None).
    Supported formats include: ¥1,299.00, ¥899, 1299 元, CNY 499.
    """
    if not text:
        return None, None
    t = clean_text(text)
    # Match the numeric part first (thousands separators and decimals allowed)
    m = re.search(r"(?P<value>\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)", t)
    if not m:
        return None, None
    value = float(m.group("value").replace(",", ""))
    # Then detect a currency marker, if any
    currency = None
    if re.search(r"[¥¥]|元|CNY|RMB", t, flags=re.IGNORECASE):
        currency = "CNY"
    elif re.search(r"\$|USD", t, flags=re.IGNORECASE):
        currency = "USD"
    return value, currency
def parse_discount(text):
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d{1,3})\s*%", t)
    if m:
        pct = int(m.group(1))
        if 0 < pct <= 100:
            return pct / 100.0
    return None
def parse_rating(text):
    # Supports "4.5/5", "评分 4.2", "4.0 星", etc.
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d+(?:\.\d+)?)\s*(?:/5|星|stars)", t, flags=re.IGNORECASE)
    if not m:
        m = re.search(r"评分\s*[::]?\s*(\d+(?:\.\d+)?)", t)
    return float(m.group(1)) if m else None
def extract_jsonld_products(soup, base_url):
    products = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.get_text())
        except Exception:
            continue
        items = []
        if isinstance(data, list):
            items = data
        elif isinstance(data, dict):
            # If this is an ItemList, expand itemListElement
            if data.get("@type") == "ItemList" and "itemListElement" in data:
                for el in data["itemListElement"]:
                    # List entries may be ListItem objects; take the nested item
                    obj = el.get("item") if isinstance(el, dict) else el
                    if obj:
                        items.append(obj)
            else:
                items = [data]
        for obj in items:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            if isinstance(typ, list):
                is_product = "Product" in typ
            else:
                is_product = typ == "Product"
            if not is_product:
                continue
            name = clean_text(obj.get("name"))
            url = obj.get("url")
            image = obj.get("image")
            description = clean_text(obj.get("description"))
            sku = obj.get("sku") or obj.get("mpn")
            brand = None
            if isinstance(obj.get("brand"), dict):
                brand = obj["brand"].get("name")
            offers = obj.get("offers")
            price = None
            currency = None
            original_price = None
            availability = None
            if isinstance(offers, list) and offers:
                off = offers[0]
                price, currency, availability = off.get("price"), off.get("priceCurrency"), off.get("availability")
            elif isinstance(offers, dict):
                price, currency, availability = offers.get("price"), offers.get("priceCurrency"), offers.get("availability")
            # Convert the price to a number
            price_val = float(price) if isinstance(price, (int, float, str)) and re.match(r"^\d+(\.\d+)?$", str(price)) else None
            # Normalize the URL and image
            url = urljoin(base_url, url) if url else None
            image = urljoin(base_url, image) if isinstance(image, str) else image
            products.append({
                "name": name,
                "product_url": url,
                "image_url": image,
                "sku": sku,
                "brand": brand,
                "price": price_val,
                "currency": currency,
                "original_price": original_price,
                "discount_rate": None,
                "stock_status": availability,
                "rating": None,
                "review_count": None,
                "description": description,
                "source": "jsonld"
            })
    return products
def find_product_cards(soup):
    # Try several selectors that are likely to match product cards
    selectors = [
        'div[class*="product"]', 'li[class*="product"]', 'article[class*="product"]',
        'div[class*="item"]', 'li[class*="item"]',
        'div[class*="card"]', 'li[class*="card"]'
    ]
    cards = []
    seen = set()
    for sel in selectors:
        for el in soup.select(sel):
            # Skip elements already collected via another selector
            oid = id(el)
            if oid not in seen:
                seen.add(oid)
                cards.append(el)
    return cards

def first_match_text(el, selectors_or_class_keywords):
    # Accepts either a list of CSS selectors or a list of class-name keywords
    if isinstance(selectors_or_class_keywords, (list, tuple)):
        # Try them as CSS selectors first
        for sel in selectors_or_class_keywords:
            found = el.select_one(sel)
            if found and clean_text(found.get_text()):
                return clean_text(found.get_text())
        # Then fall back to class-keyword matching
        for kw in selectors_or_class_keywords:
            for tag in el.find_all(True, class_=lambda c: (c and kw in " ".join(c)) if isinstance(c, list) else (c and kw in c)):
                txt = clean_text(tag.get_text())
                if txt:
                    return txt
    return None

def get_attr_chain(el, attrs):
    for a in attrs:
        v = el.get(a)
        if v:
            return v
    return None
def parse_product_card(card, base_url):
    # Name
    name = None
    name_selectors = ['h1', 'h2', 'h3', 'a[class*="title"]', 'a[class*="name"]', 'div[class*="title"]', 'span[class*="title"]']
    for sel in name_selectors:
        tag = card.select_one(sel)
        if tag:
            txt = clean_text(tag.get_text())
            if txt:
                name = txt
                break
    if not name:
        # Fall back to the image alt text
        img = card.find("img")
        if img:
            alt = clean_text(img.get("alt"))
            if alt:
                name = alt
    # Product link
    product_url = None
    a = card.find("a", href=True)
    if a:
        href = a.get("href")
        if href and not href.startswith("#"):
            product_url = urljoin(base_url, href)
    # Image
    image_url = None
    img = card.find("img")
    if img:
        image_url = get_attr_chain(img, ["data-src", "data-original", "src"])
        if image_url:
            image_url = urljoin(base_url, image_url)
    # SKU
    sku = get_attr_chain(card, ["data-sku", "data-id"])
    if not sku:
        m = re.search(r"(SKU|货号)\s*[::]\s*([A-Za-z0-9\-_]+)", card.get_text())
        if m:
            sku = m.group(2)
    # Price-related fields
    price = None
    currency = None
    original_price = None
    # Prefer obvious price blocks
    price_blocks = card.find_all(True, class_=lambda c: (c and (
        ("price" in " ".join(c)) or ("sale" in " ".join(c)) or ("final" in " ".join(c))
    )) if isinstance(c, list) else (c and ("price" in c or "sale" in c or "final" in c)))
    if price_blocks:
        # Current price
        for pb in price_blocks:
            val, cur = parse_price(pb.get_text())
            if val:
                price, currency = val, cur or currency
                break
    # Original price (often struck through or labelled was/list)
    orig_blocks = card.find_all(["del", "s", "strike"]) + card.find_all(True, class_=lambda c: c and ("original" in c or "was" in c or "list" in c))
    for ob in orig_blocks:
        val, cur = parse_price(ob.get_text())
        if val:
            original_price = val
            if not currency and cur:
                currency = cur
            break
    # Discount
    discount_rate = None
    disc_blocks = card.find_all(True, class_=lambda c: c and ("discount" in c or "off" in c))
    for db in disc_blocks:
        dr = parse_discount(db.get_text())
        if dr:
            discount_rate = dr
            break
    # If no explicit discount was found but both prices are known, compute it
    if not discount_rate and original_price and price and original_price > 0 and price <= original_price:
        discount_rate = 1.0 - (price / original_price)
    # Rating and review count
    rating = None
    review_count = None
    rating_blocks = card.find_all(True, class_=lambda c: c and ("rating" in c or "stars" in c))
    for rb in rating_blocks:
        r = parse_rating(rb.get_text())
        if r:
            rating = r
            break
    # Review count
    m = re.search(r"(\d+)\s*(条评|评论|reviews)", card.get_text(), flags=re.IGNORECASE)
    if m:
        review_count = int(m.group(1))
    # Stock status
    stock_status = None
    text_all = card.get_text()
    if re.search(r"(有货|现货|在库)", text_all):
        stock_status = "InStock"
    elif re.search(r"(缺货|售罄|无货|OutOfStock)", text_all, flags=re.IGNORECASE):
        stock_status = "OutOfStock"
    return {
        "name": name,
        "product_url": product_url,
        "image_url": image_url,
        "sku": sku,
        "brand": None,
        "price": price,
        "currency": currency,
        "original_price": original_price,
        "discount_rate": discount_rate,
        "stock_status": stock_status,
        "rating": rating,
        "review_count": review_count,
        "description": None,
        "source": "card"
    }
def extract_microdata_products(soup, base_url):
    products = []
    for prod in soup.find_all(attrs={"itemscope": True, "itemtype": re.compile("Product", re.IGNORECASE)}):
        def get_itemprop(prop):
            tag = prod.find(attrs={"itemprop": prop})
            return clean_text(tag.get_text()) if tag and tag.get_text() else (tag.get("content") if tag else None)
        name = get_itemprop("name")
        url = get_itemprop("url")
        image = get_itemprop("image")
        sku = get_itemprop("sku") or get_itemprop("mpn")
        price = get_itemprop("price")
        currency = get_itemprop("priceCurrency")
        # Convert the price to a number
        price_val = float(price) if price and re.match(r"^\d+(\.\d+)?$", str(price)) else None
        products.append({
            "name": name,
            "product_url": urljoin(base_url, url) if url else None,
            "image_url": urljoin(base_url, image) if image else None,
            "sku": sku,
            "brand": get_itemprop("brand"),
            "price": price_val,
            "currency": currency or "CNY",
            "original_price": None,
            "discount_rate": None,
            "stock_status": None,
            "rating": None,
            "review_count": None,
            "description": get_itemprop("description"),
            "source": "microdata"
        })
    return products

def deduplicate(products):
    seen = set()
    out = []
    for p in products:
        key = (
            (p.get("sku") or "").strip().lower(),
            (p.get("product_url") or "").strip().lower()
        )
        if key in seen:
            continue
        # Weak dedup: if both sku and url are empty, approximate with name + price
        if not key[0] and not key[1]:
            alt_key = ((p.get("name") or "").strip().lower(), str(p.get("price") or ""))
            if alt_key in seen:
                continue
            seen.add(alt_key)
        else:
            seen.add(key)
        out.append(p)
    return out

def normalize(products):
    """
    Preprocessing: clean text, standardize the currency, and drop unnamed items.
    """
    out = []
    for p in products:
        if not p.get("name"):
            continue
        p["name"] = clean_text(p["name"])
        if p.get("currency") in ["¥", "¥", "元", None]:
            p["currency"] = "CNY"
        # Make sure the price is a float or None
        if isinstance(p.get("price"), str):
            val, cur = parse_price(p["price"])
            p["price"] = val
            p["currency"] = p["currency"] or cur or "CNY"
        out.append(p)
    return out

def save_csv(products, path):
    fields = ["name", "product_url", "image_url", "sku", "brand", "price", "currency",
              "original_price", "discount_rate", "stock_status", "rating", "review_count",
              "description", "source"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for p in products:
            writer.writerow({k: p.get(k) for k in fields})

def save_json(products, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(products, f, ensure_ascii=False, indent=2)
def main():
    session = setup_session()
    if not is_allowed_by_robots(PROMO_URL, USER_AGENT, session):
        logging.error("robots.txt disallows scraping this URL: %s", PROMO_URL)
        return
    logging.info("Starting scrape: %s", PROMO_URL)
    html = fetch_html(PROMO_URL, session)
    soup = BeautifulSoup(html, "html.parser")
    # Multi-strategy extraction
    products = []
    products += extract_jsonld_products(soup, PROMO_URL)
    products += extract_microdata_products(soup, PROMO_URL)
    # Card parsing
    cards = find_product_cards(soup)
    logging.info("Candidate product cards found: %d", len(cards))
    for card in cards:
        p = parse_product_card(card, PROMO_URL)
        # Require at least a name or a link
        if p.get("name") or p.get("product_url"):
            products.append(p)
    # Preprocess and deduplicate
    products = normalize(products)
    products = deduplicate(products)
    logging.info("Valid products after deduplication: %d", len(products))
    if not products:
        logging.warning("No products extracted; the page may be rendered dynamically or the selectors may need adjusting.")
    else:
        save_csv(products, "promo_summer_2025_products.csv")
        save_json(products, "promo_summer_2025_products.json")
        logging.info("Saved: promo_summer_2025_products.csv, promo_summer_2025_products.json")

if __name__ == "__main__":
    main()
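Before pointing the script at a real promotion page, the parsing helpers can be sanity-checked with a minimal sketch like the one below; the sample strings are illustrative and follow the formats documented in parse_price:

from scrape_promo import parse_price, parse_discount, parse_rating

print(parse_price("¥1,299.00"))        # expected: (1299.0, "CNY")
print(parse_price("CNY 499"))          # expected: (499.0, "CNY")
print(parse_discount("限时 20% off"))   # expected: 0.2
print(parse_rating("4.5/5"))           # expected: 4.5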
Technical notes: the script checks robots.txt before fetching, uses a retrying session with basic rate limiting, combines JSON-LD, Microdata, and card-level parsing, and then normalizes and deduplicates the results before export.
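As a quick illustration of the JSON-LD strategy, here is a minimal sketch assuming the script above is saved as scrape_promo.py; the embedded product markup is invented for demonstration:

from bs4 import BeautifulSoup
from scrape_promo import extract_jsonld_products

html = '''
<script type="application/ld+json">
{"@context": "https://schema.org", "@type": "Product",
 "name": "Sample Tee", "url": "/p/123", "sku": "TEE-123",
 "offers": {"@type": "Offer", "price": "129.00", "priceCurrency": "CNY"}}
</script>
'''
soup = BeautifulSoup(html, "html.parser")
# Prints a single normalized product dict with source == "jsonld"
print(extract_jsonld_products(soup, "https://shop.testsite.cn/promo/summer-2025"))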
The following script uses requests and Beautiful Soup to scrape news data from a given section page. It includes basic preprocessing (text cleaning, deduplication), pagination and rate control, a robots.txt compliance check, and optional detail-page enrichment. Output is available as CSV and JSONL, which is convenient as input for downstream data mining (feature engineering, modeling).
Notes and assumptions: DEFAULT_URL is a placeholder and should be replaced with the real section page; the target is assumed to be a server-rendered HTML list page, so the generic selectors may need adjusting for a specific site; request intervals are randomized to stay polite.
Usage: install requests and beautifulsoup4, then run, for example, python scrape_marketing_insights.py --url https://news.example.net/industry/marketing-insights --max-pages 3 --detail --output-csv articles.csv --output-jsonl articles.jsonl. Without --output-csv/--output-jsonl, the first few results are printed to stdout.
Python script (save as scrape_marketing_insights.py):
import argparse
import csv
import json
import logging
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
# -----------------------
# Configuration and utility helpers
# -----------------------
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0 Safari/537.36"
DEFAULT_URL = "https://news.example.net/industry/marketing-insights"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
@dataclass
class Article:
title: str
url: str
published_at: Optional[str]
author: Optional[str]
summary: Optional[str]
source: str
def normalize_whitespace(text: Optional[str]) -> Optional[str]:
if text is None:
return None
return re.sub(r"\s+", " ", text).strip()
def build_robot_parser(start_url: str) -> RobotFileParser:
parsed = urlparse(start_url)
robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
rp = RobotFileParser()
rp.set_url(robots_url)
try:
rp.read()
logging.info(f"robots.txt 加载完成: {robots_url}")
    except Exception as e:
        logging.warning(f"Failed to load robots.txt: {robots_url} -> {e}")
        # Fall back to allowing fetches when robots.txt cannot be read;
        # otherwise RobotFileParser would report every URL as disallowed.
        rp.allow_all = True
    return rp
def can_fetch(rp: RobotFileParser, url: str, user_agent: str = USER_AGENT) -> bool:
    # If robots.txt could not be read, build_robot_parser falls back to allowing all URLs
try:
return rp.can_fetch(user_agent, url)
except Exception:
return True
def get_session() -> requests.Session:
s = requests.Session()
s.headers.update({
"User-Agent": USER_AGENT,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
})
return s
# -----------------------
# Parsing helpers (list pages and detail pages)
# -----------------------
def find_article_containers(soup: BeautifulSoup) -> List[Tag]:
    # A generic combination of selectors, to stay compatible with as many site structures as possible
containers: List[Tag] = []
containers.extend(soup.find_all("article"))
containers.extend(soup.select('div[itemtype*="NewsArticle"], div[itemtype*="Article"]'))
for cls in ["news", "article", "post", "story", "card", "entry"]:
containers.extend(soup.select(f"div[class*='{cls}'], li[class*='{cls}']"))
    # Deduplicate approximately by element name plus source position (sourceline/sourcepos)
seen_ids = set()
unique = []
for tag in containers:
key = (tag.name, getattr(tag, "sourceline", None), getattr(tag, "sourcepos", None))
if key not in seen_ids:
seen_ids.add(key)
unique.append(tag)
return unique
def extract_title_and_url(container: Tag, base_url: str) -> Tuple[Optional[str], Optional[str]]:
    # Prefer extracting from heading tags
title_tag = None
for sel in ["h1 a", "h2 a", "h3 a", "h1", "h2", "h3", "a"]:
title_tag = container.select_one(sel)
if title_tag:
break
title = None
url = None
if title_tag:
if title_tag.name == "a":
title = title_tag.get_text(strip=True) or None
href = title_tag.get("href")
if href:
url = urljoin(base_url, href)
else:
title = title_tag.get_text(strip=True) or None
a = title_tag.find("a", href=True)
if a and a.get("href"):
url = urljoin(base_url, a["href"])
    # Fallback: any clickable link
if not url:
a = container.find("a", href=True)
if a and a.get_text(strip=True):
url = urljoin(base_url, a["href"])
title = title or a.get_text(strip=True)
return normalize_whitespace(title), url
def extract_published_at(container: Tag) -> Optional[str]:
    # <time> tag
t = container.find("time")
if t:
dt = t.get("datetime") or t.get_text(strip=True)
if dt:
return normalize_whitespace(dt)
    # Structured meta tags
for sel in [
'meta[itemprop="datePublished"]',
'meta[property="article:published_time"]',
'meta[name="pubdate"]',
'span[itemprop="datePublished"]',
]:
m = container.select_one(sel)
if m:
content = m.get("content") or m.get_text(strip=True)
if content:
return normalize_whitespace(content)
return None
def extract_author(container: Tag) -> Optional[str]:
for sel in [
".author", ".byline", '[itemprop="author"]', 'a[rel="author"]', 'meta[name="author"]',
]:
el = container.select_one(sel)
if el:
content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
if content:
return normalize_whitespace(content)
return None
def extract_summary(container: Tag) -> Optional[str]:
for sel in [".summary", ".dek", ".abstract", "p"]:
el = container.select_one(sel)
if el:
text = el.get_text(" ", strip=True)
if text and len(text.split()) >= 3:
return normalize_whitespace(text)
return None
def parse_detail_page(soup: BeautifulSoup) -> dict:
    # Extract better-structured fields from the detail page (when present)
data = {"title": None, "published_at": None, "author": None, "summary": None}
    # Title
title_tag = soup.find(["h1", "title"])
if title_tag:
data["title"] = normalize_whitespace(title_tag.get_text(strip=True))
og_title = soup.select_one('meta[property="og:title"]')
if og_title and og_title.get("content"):
data["title"] = normalize_whitespace(og_title["content"]) or data["title"]
    # Summary
og_desc = soup.select_one('meta[property="og:description"], meta[name="description"]')
if og_desc and og_desc.get("content"):
data["summary"] = normalize_whitespace(og_desc["content"])
    # Published time
for sel in [
'meta[property="article:published_time"]',
'meta[itemprop="datePublished"]',
'time[datetime]',
]:
el = soup.select_one(sel)
if el:
content = el.get("content") or el.get("datetime") or el.get_text(strip=True)
if content:
data["published_at"] = normalize_whitespace(content)
break
    # Author
for sel in [
'meta[name="author"]',
'[itemprop="author"]',
'.author', '.byline', 'a[rel="author"]',
]:
el = soup.select_one(sel)
if el:
content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
if content:
data["author"] = normalize_whitespace(content)
break
    # JSON-LD (optional enrichment)
for script in soup.find_all("script", type="application/ld+json"):
try:
payload = json.loads(script.string or "")
except Exception:
continue
objs = payload if isinstance(payload, list) else [payload]
for obj in objs:
t = obj.get("@type") or obj.get("@graph", [{}])[0].get("@type")
if isinstance(t, list):
t = ",".join(t)
if t and ("NewsArticle" in t or "Article" in t):
data["title"] = normalize_whitespace(obj.get("headline")) or data["title"]
data["summary"] = normalize_whitespace(obj.get("description")) or data["summary"]
pub = obj.get("datePublished") or obj.get("dateCreated")
if pub:
data["published_at"] = normalize_whitespace(pub)
author = obj.get("author")
if isinstance(author, dict):
data["author"] = normalize_whitespace(author.get("name")) or data["author"]
elif isinstance(author, list) and author:
if isinstance(author[0], dict):
data["author"] = normalize_whitespace(author[0].get("name")) or data["author"]
elif isinstance(author[0], str):
data["author"] = normalize_whitespace(author[0]) or data["author"]
break
return data
# -----------------------
# Main scraping flow
# -----------------------
def scrape_list(start_url: str,
max_pages: int = 1,
sleep_min: float = 1.0,
sleep_max: float = 2.5,
fetch_detail: bool = False) -> List[Article]:
session = get_session()
rp = build_robot_parser(start_url)
articles: List[Article] = []
seen_urls: Set[str] = set()
def fetch(url: str) -> Optional[BeautifulSoup]:
if not can_fetch(rp, url):
logging.warning(f"robots 不允许抓取:{url}")
return None
try:
resp = session.get(url, timeout=30)
if resp.status_code != 200:
logging.warning(f"请求失败 {resp.status_code}: {url}")
return None
return BeautifulSoup(resp.text, "html.parser")
except requests.RequestException as e:
logging.error(f"请求异常: {url} -> {e}")
return None
page_url = start_url
base = f"{urlparse(start_url).scheme}://{urlparse(start_url).netloc}"
for page_idx in range(max_pages):
soup = fetch(page_url)
if soup is None:
break
containers = find_article_containers(soup)
if not containers:
logging.info("未找到文章容器,尝试使用更宽松的链接抽取")
# 宽松抽取:从主体区块抽取链接
for a in soup.select("main a[href], .content a[href], .container a[href]"):
title = normalize_whitespace(a.get_text(strip=True))
if not title or len(title) < 5:
continue
url = urljoin(base, a.get("href"))
if url in seen_urls:
continue
articles.append(Article(
title=title,
url=url,
published_at=None,
author=None,
summary=None,
source=start_url,
))
seen_urls.add(url)
else:
for c in containers:
title, url = extract_title_and_url(c, base)
if not url or not title:
continue
if url in seen_urls:
continue
published_at = extract_published_at(c)
author = extract_author(c)
summary = extract_summary(c)
                # Optional detail-page enrichment
if fetch_detail and can_fetch(rp, url):
time.sleep(random.uniform(sleep_min, sleep_max))
dsoup = fetch(url)
if dsoup:
detail = parse_detail_page(dsoup)
title = detail["title"] or title
summary = detail["summary"] or summary
published_at = detail["published_at"] or published_at
author = detail["author"] or author
articles.append(Article(
title=title,
url=url,
published_at=published_at,
author=author,
summary=summary,
source=start_url,
))
seen_urls.add(url)
        # Pagination: look for a next-page link
next_link = soup.select_one('a[rel="next"], a.next, a[aria-label*="下一页"], a[aria-label*="Next"], li.next a')
if next_link and next_link.get("href"):
next_url = urljoin(base, next_link.get("href"))
if next_url == page_url:
break
page_url = next_url
logging.info(f"跳转下一页: {page_url}")
time.sleep(random.uniform(sleep_min, sleep_max))
else:
break
return articles
# -----------------------
# Output and CLI
# -----------------------
def write_csv(path: str, items: List[Article]) -> None:
with open(path, "w", encoding="utf-8", newline="") as f:
w = csv.DictWriter(f, fieldnames=["title", "url", "published_at", "author", "summary", "source"])
w.writeheader()
for it in items:
w.writerow(asdict(it))
def write_jsonl(path: str, items: List[Article]) -> None:
with open(path, "w", encoding="utf-8") as f:
for it in items:
f.write(json.dumps(asdict(it), ensure_ascii=False) + "\n")
def main():
    parser = argparse.ArgumentParser(description="Marketing Insights scraper (BeautifulSoup)")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help="Start URL of the listing page")
    parser.add_argument("--max-pages", type=int, default=1, help="Maximum number of pages to follow")
    parser.add_argument("--sleep-min", type=float, default=1.0, help="Minimum delay between requests (seconds)")
    parser.add_argument("--sleep-max", type=float, default=2.5, help="Maximum delay between requests (seconds)")
    parser.add_argument("--detail", action="store_true", help="Also fetch detail pages to enrich fields")
    parser.add_argument("--output-csv", type=str, default=None, help="CSV output path")
    parser.add_argument("--output-jsonl", type=str, default=None, help="JSONL output path")
args = parser.parse_args()
items = scrape_list(
start_url=args.url,
max_pages=args.max_pages,
sleep_min=args.sleep_min,
sleep_max=args.sleep_max,
fetch_detail=args.detail,
)
logging.info(f"抓取完成,共 {len(items)} 条")
if args.output_csv:
write_csv(args.output_csv, items)
logging.info(f"CSV 已写入: {args.output_csv}")
if args.output_jsonl:
write_jsonl(args.output_jsonl, items)
logging.info(f"JSONL 已写入: {args.output_jsonl}")
# 如果未指定输出文件,打印示例前几条
if not args.output_csv and not args.output_jsonl:
for it in items[:5]:
print(json.dumps(asdict(it), ensure_ascii=False))
if __name__ == "__main__":
main()
Data mining suggestions (processing points once the data has been collected): for example, deduplicate by URL, normalize publish dates, clean titles and summaries, and derive simple features before modeling.
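A minimal post-processing sketch along those lines, assuming the JSONL output above was saved as articles.jsonl and that pandas is available (the specific steps are illustrative, not prescriptive):

import pandas as pd

# Load the JSONL produced by scrape_marketing_insights.py
df = pd.read_json("articles.jsonl", lines=True)

# Deduplicate by URL and normalize publish dates
df = df.drop_duplicates(subset=["url"])
df["published_at"] = pd.to_datetime(df["published_at"], errors="coerce", utc=True)

# Simple cleaning and features for downstream modeling
df["summary"] = df["summary"].fillna("").str.strip()
df["title_length"] = df["title"].str.len()
df["has_author"] = df["author"].notna()

print(df[["title", "published_at", "title_length", "has_author"]].head())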
Turns "a given URL plus the desired output language" into a ready-to-run web scraping script with clear instructions, aimed at high-frequency scenarios in growth, operations, product, competitive research, and data analysis. It helps users complete web data collection, basic cleaning, and structuring within minutes, significantly lowering the technical barrier, improving delivery certainty, and providing reliable data input for subsequent analysis, visualization, and reporting.
Quickly capture competitors' campaigns, prices, and copy, and turn them into readable data to support campaign strategy and ad optimization.
Collect source data in batches and, combined with the cleaning and preprocessing suggestions, shorten the data preparation cycle and speed up insight delivery.
Aggregate industry news and asset links into a normalized output, improving topic-selection efficiency and the pace of content iteration.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters at will and call it directly through the interface, making automation and batch processing straightforward (see the sketch after these options). Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so that your AI application can call the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.
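For the API option above, a minimal sketch of a batch call might look like the following; the endpoint, token, response field, and parameter names are hypothetical placeholders and should be replaced with the actual values provided for your template:

import requests

API_URL = "https://api.example.com/v1/prompt-templates/run"  # hypothetical endpoint
API_TOKEN = "YOUR_API_TOKEN"                                 # hypothetical credential

def run_template(target_url: str, output_language: str) -> str:
    # Hypothetical request body: the real parameter names depend on the template API
    payload = {"template": "web-scraper", "params": {"url": target_url, "language": output_language}}
    resp = requests.post(API_URL, json=payload,
                         headers={"Authorization": f"Bearer {API_TOKEN}"}, timeout=30)
    resp.raise_for_status()
    return resp.json().get("output", "")

for url in ["https://shop.testsite.cn/promo/summer-2025",
            "https://news.example.net/industry/marketing-insights"]:
    print(run_template(url, "en")[:200])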