Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
531 changes: 531 additions & 0 deletions docs/csv_pipeline.md

Large diffs are not rendered by default.

144 changes: 144 additions & 0 deletions examples/csv_pipeline_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
"""
Created on 2025-10-16
---------
@summary: CSV Pipeline 使用示例
---------
@author: 道长
@email: ctrlf4@yeah.net

演示如何使用 CsvPipeline 将爬虫数据保存为 CSV 文件。
"""

import feapder
from feapder.network.item import Item


# 定义数据项目
class ProductItem(Item):
"""商品数据项"""

# 指定表名,对应 CSV 文件名为 product.csv
table_name = "product"

def clean(self):
"""数据清洁方法(可选)"""
pass


class CsvPipelineSpider(feapder.AirSpider):
"""
演示使用CSV Pipeline的爬虫

注意:要启用CsvPipeline,需要在 setting.py 中配置:
ITEM_PIPELINES = [
...,
"feapder.pipelines.csv_pipeline.CsvPipeline",
]
"""

def start_requests(self):
"""生成初始请求"""
# 这里以示例数据代替真实网络请求
yield feapder.Request("https://example.com/products")

def parse(self, request, response):
"""
解析页面

在实际应用中,你会从HTML中提取数据。
这里我们生成示例数据来演示CSV存储功能。
"""
# 示例:生成10条商品数据
for i in range(10):
item = ProductItem()
item.id = i + 1
item.name = f"商品_{i + 1}"
item.price = 99.99 + i
item.category = "电子产品"
item.url = f"https://example.com/product/{i + 1}"

yield item


class CsvPipelineSpiderWithMultiTables(feapder.AirSpider):
"""
演示使用CSV Pipeline处理多表数据

CsvPipeline支持多表存储,每个表对应一个CSV文件。
"""

def start_requests(self):
"""生成初始请求"""
yield feapder.Request("https://example.com/products")
yield feapder.Request("https://example.com/users")

def parse(self, request, response):
"""解析页面,输出不同表的数据"""

if "/products" in request.url:
# 产品表数据
for i in range(5):
item = ProductItem()
item.id = i + 1
item.name = f"商品_{i + 1}"
item.price = 99.99 + i
item.category = "电子产品"
item.url = request.url

yield item

elif "/users" in request.url:
# 用户表数据
user_item = Item()
user_item.table_name = "user"

for i in range(5):
user_item.id = i + 1
user_item.username = f"user_{i + 1}"
user_item.email = f"user_{i + 1}@example.com"
user_item.created_at = "2024-10-16"

yield user_item


# 配置说明
"""
使用CSV Pipeline需要的配置步骤:

1. 在 feapder/setting.py 中启用 CsvPipeline:

ITEM_PIPELINES = [
"feapder.pipelines.mysql_pipeline.MysqlPipeline", # 保持MySQL
"feapder.pipelines.csv_pipeline.CsvPipeline", # 新增CSV
]

2. CSV文件会自动保存到 data/csv/ 目录下:
- product.csv: 商品表数据
- user.csv: 用户表数据
- 等等...

3. CSV文件会自动包含表头(首次创建时)

4. 如果爬虫中断后重新启动,CSV数据会继续追加
(支持断点续爬)

性能特点:
- 每批数据最多1000条(由 ITEM_UPLOAD_BATCH_MAX_SIZE 控制)
- 每秒最多1000条,或等待1秒触发批处理
- 使用Per-Table Lock,确保单表写入安全
- 通过 fsync 确保数据落盘,不会丢失

注意事项:
- CSV文件本身不支持真正的UPDATE操作
- 如果有重复数据,可在应用层处理或启用 ITEM_FILTER_ENABLE
- 如果需要真正的UPDATE操作,建议配合MySQL或MongoDB使用
"""


if __name__ == "__main__":
# 运行爬虫示例
CsvPipelineSpider().start()

# 或运行多表示例
# CsvPipelineSpiderWithMultiTables().start()
244 changes: 244 additions & 0 deletions feapder/pipelines/csv_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
"""
Created on 2025-10-16
---------
@summary: CSV 数据导出Pipeline
---------
@author: 道长
@email: ctrlf4@yeah.net
"""

import csv
import os
import threading
from typing import Dict, List, Tuple

from feapder.pipelines import BasePipeline
from feapder.utils.log import log


class CsvPipeline(BasePipeline):
"""
CSV 数据导出Pipeline

将爬虫数据保存为CSV文件。支持批量保存、并发写入控制、断点续爬等功能。

特点:
- 单表单锁设计,避免全局锁带来的性能问题
- 自动创建导出目录
- 支持追加模式,便于断点续爬
- 通过fsync确保数据落盘
- 表级别的字段名缓存,确保跨批字段顺序一致
"""

# 用于保护每个表的文件写入操作(Per-Table Lock)
_file_locks = {}

# 用于缓存每个表的字段名顺序(Per-Table Fieldnames Cache)
# 确保跨批次、跨线程的字段顺序一致
_table_fieldnames = {}

def __init__(self, csv_dir="data/csv"):
"""
初始化CSV Pipeline

Args:
csv_dir: CSV文件保存目录,默认为 data/csv
"""
super().__init__()
self.csv_dir = csv_dir
self._ensure_csv_dir_exists()

def _ensure_csv_dir_exists(self):
"""确保CSV保存目录存在"""
if not os.path.exists(self.csv_dir):
try:
os.makedirs(self.csv_dir, exist_ok=True)
log.info(f"创建CSV保存目录: {self.csv_dir}")
except Exception as e:
log.error(f"创建CSV目录失败: {e}")
raise

@staticmethod
def _get_lock(table):
"""
获取表对应的文件锁

采用Per-Table Lock设计,每个表都有独立的锁,避免锁竞争。
这样设计既能保证单表的文件写入安全,又能充分利用多表并行写入的优势。

Args:
table: 表名

Returns:
threading.Lock: 该表对应的锁对象
"""
if table not in CsvPipeline._file_locks:
CsvPipeline._file_locks[table] = threading.Lock()
return CsvPipeline._file_locks[table]

@staticmethod
def _get_and_cache_fieldnames(table, items):
"""
获取并缓存表对应的字段名顺序

第一次调用时从items[0]提取字段名并缓存,后续调用直接返回缓存的字段名。
这样设计确保:
1. 跨批次的字段顺序保持一致(解决数据列错位问题)
2. 多线程并发时字段顺序不被污染
3. 避免重复提取,性能更优

Args:
table: 表名
items: 数据列表 [{},{},...]

Returns:
list: 字段名列表
"""
# 如果该表已经缓存了字段名,直接返回缓存的
if table in CsvPipeline._table_fieldnames:
return CsvPipeline._table_fieldnames[table]

# 第一次调用,从items提取字段名并缓存
if not items:
return []

first_item = items[0]
fieldnames = list(first_item.keys()) if isinstance(first_item, dict) else []

if fieldnames:
# 缓存字段名(使用静态变量,跨实例共享)
CsvPipeline._table_fieldnames[table] = fieldnames
log.info(f"表 {table} 的字段名已缓存: {fieldnames}")

return fieldnames

def _get_csv_file_path(self, table):
"""
获取表对应的CSV文件路径

Args:
table: 表名

Returns:
str: CSV文件的完整路径
"""
return os.path.join(self.csv_dir, f"{table}.csv")


def _file_exists_and_has_content(self, csv_file):
"""
检查CSV文件是否存在且有内容

Args:
csv_file: CSV文件路径

Returns:
bool: 文件存在且有内容返回True
"""
return os.path.exists(csv_file) and os.path.getsize(csv_file) > 0

def save_items(self, table, items: List[Dict]) -> bool:
"""
保存数据到CSV文件

采用追加模式打开文件,支持断点续爬。第一次写入时会自动添加表头。
使用Per-Table Lock确保多线程写入时的数据一致性。
使用缓存的字段名确保跨批次字段顺序一致,避免数据列错位。

Args:
table: 表名(对应CSV文件名)
items: 数据列表,[{}, {}, ...]

Returns:
bool: 保存成功返回True,失败返回False
失败时ItemBuffer会自动重试(最多10次)
"""
if not items:
return True

csv_file = self._get_csv_file_path(table)

# 使用缓存机制获取字段名(关键!确保跨批字段顺序一致)
fieldnames = self._get_and_cache_fieldnames(table, items)

if not fieldnames:
log.warning(f"无法提取字段名,items: {items}")
return False

try:
# 获取表级别的锁(关键!保证文件写入安全)
lock = self._get_lock(table)
with lock:
# 检查文件是否已存在且有内容
file_exists = self._file_exists_and_has_content(csv_file)

# 以追加模式打开文件
with open(
csv_file,
"a",
encoding="utf-8",
newline=""
) as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)

# 如果文件不存在或为空,写入表头
if not file_exists:
writer.writeheader()

# 批量写入数据行
# 使用缓存的fieldnames确保列顺序一致,避免跨批数据错位
writer.writerows(items)

# 刷新缓冲区到磁盘,确保数据不丢失
f.flush()
os.fsync(f.fileno())

# 记录导出日志
log.info(
f"共导出 {len(items)} 条数据 到 {table}.csv (文件路径: {csv_file})"
)
return True

except Exception as e:
log.error(
f"CSV写入失败. table: {table}, csv_file: {csv_file}, error: {e}"
)
return False

def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
"""
更新数据

注意:CSV文件本身不支持真正的"更新"操作(需要查询后替换)。
目前的实现是直接追加写入,相当于INSERT操作。

如果需要真正的UPDATE操作,建议:
1. 定期重新生成CSV文件
2. 使用数据库(MySQL/MongoDB)来处理UPDATE
3. 或在应用层进行去重和更新

Args:
table: 表名
items: 数据列表,[{}, {}, ...]
update_keys: 更新的字段(此实现中未使用)

Returns:
bool: 操作成功返回True
"""
# 对于CSV,update操作实现为追加写入
# 若需要真正的UPDATE操作,建议在应用层处理
return self.save_items(table, items)

def close(self):
"""
关闭Pipeline,释放资源

在爬虫结束时由ItemBuffer自动调用。
"""
try:
# 清理文件锁字典(可选,用于释放内存)
# 在长期运行的场景下,可能需要定期清理
pass
except Exception as e:
log.error(f"关闭CSV Pipeline时出错: {e}")
1 change: 1 addition & 0 deletions feapder/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
ITEM_PIPELINES = [
"feapder.pipelines.mysql_pipeline.MysqlPipeline",
# "feapder.pipelines.mongo_pipeline.MongoPipeline",
# "feapder.pipelines.csv_pipeline.CsvPipeline",
# "feapder.pipelines.console_pipeline.ConsolePipeline",
]
EXPORT_DATA_MAX_FAILED_TIMES = 10 # 导出数据时最大的失败次数,包括保存和更新,超过这个次数报警
Expand Down
Loading