Python实大模型文章的RSS订阅采集器
在这篇教程中,我们将详细讲解如何使用Python构建一个可靠的RSS订阅采集器。这个工具可以帮助你自动收集和过滤来自知乎、博客等支持RSS的平台的文章。
实际上也是我们技术博客站点的源码
以下是细节拆解,完整的源码见:
https://github.com/llama-factory/llamafactory/blob/main/Python实现大模型文章的RSS订阅采集器.md
目录
项目概述
我们的RSS订阅采集器主要完成以下任务:
- 从YAML配置文件读取配置信息
- 从多个源获取RSS订阅
- 解析订阅并提取相关信息
- 基于相关性过滤内容
- 将收集到的数据存储到MySQL数据库中
环境准备
你需要安装以下Python包:
pip install requests PyYAML pymysql schedule tqdm
项目结构
项目主要包含两个文件:
collect.py
- 主Python脚本config.yaml
- 配置文件
配置文件示例
sources:
知乎:
- https://www.zhihu.com/people/username/posts
科学空间:
- https://spaces.ac.cn/feed
博客:
- https://example.com/feed.xml
rss_templates:
zhihu: "https://rsshub.example.com/zhihu/posts/people/{uid}"
database:
host: "localhost"
user: "username"
password: "password"
database: "dbname"
table: "rss_feed_data"
workflow:
base_url: "https://api.example.com/v1/workflows/run"
api_key: "your-api-key"
详细实现
1. 创建基础类结构
class RSSFeedCollector:
def __init__(self):
self.logger = self._setup_logger()
def _setup_logger(self):
logger = logging.getLogger("RSSFeedCollector")
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# 使用 RotatingFileHandler 进行日志轮转
file_handler = RotatingFileHandler(
"rss_collector.log", maxBytes=5 * 1024 * 1024, backupCount=3
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 同时输出到控制台
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
2. 加载配置文件
def load_config(self):
with open("config.yaml", "r") as file:
config = yaml.safe_load(file)
self.SOURCE = {
url: source_type
for source_type, urls in config["sources"].items()
for url in urls
}
self.db_config = config["database"]
self.table = self.db_config.pop("table")
self.workflow_config = config["workflow"]
3. 获取和解析RSS数据
def fetch_rss_data(self, url):
try:
response = requests.get(url)
response.raise_for_status()
return ET.fromstring(response.text)
except requests.RequestException as e:
self.logger.error(f"获取RSS数据失败: {url}, 错误: {str(e)}")
raise RSSFetchError(f"无法获取RSS数据: {url}") from e
except ET.ParseError as e:
self.logger.error(f"解析RSS XML失败: {url}, 错误: {str(e)}")
raise RSSParseError(f"无法解析RSS XML: {url}") from e
def parse_channel_info(self, channel):
return {
"channel_title": channel.findtext("title"),
"channel_link": channel.findtext("link"),
"channel_description": channel.findtext("description"),
"language": channel.findtext("language"),
}
def parse_rss_item(self, item):
pubDate = item.findtext("pubDate")
if pubDate:
try:
dt = parsedate_to_datetime(pubDate)
pubDate = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
self.logger.warning(f"无法解析日期: {pubDate}")
pubDate = None
content = item.findtext("description") or ""
description = self.smart_truncate(
self.extract_text(content), length=200, max_length=250
)
return {
"title": item.findtext("title"),
"description": description,
"link": item.findtext("link"),
"pubDate": pubDate,
"content": content,
}
4. 内容过滤
def filter_relevant_blogs(self, data_list):
results = []
for blog in tqdm(data_list, desc="过滤文章"):
is_relevant = self.check_blog_relevance(blog)
if is_relevant:
results.append(blog)
filtered_count = len(data_list) - len(results)
self.logger.info(f"过滤了 {filtered_count} 篇不相关的文章")
return results
5. 数据库操作
def insert_into_database(self, connection, cursor, data_list):
if not data_list:
self.logger.info("没有相关文章需要插入数据库")
return
insert_query = f"""
INSERT INTO {self.table}
(title, description, link, pubDate, content)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
description = VALUES(description),
content = VALUES(content)
"""
batch_size = 100
success_count = 0
error_count = 0
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i + batch_size]
try:
cursor.executemany(insert_query, [
(item["title"], item["description"], item["link"],
item["pubDate"], item["content"])
for item in batch
])
connection.commit()
success_count += len(batch)
except Error as e:
connection.rollback()
error_count += len(batch)
self.logger.error(f"批量插入错误: {str(e)}")
self.logger.info(f"插入完成。成功: {success_count}, 失败: {error_count}")
运行说明
你可以通过以下两种方式运行采集器:
- 立即执行一次采集:
python collect.py --now
- 启动定时任务(每3天执行一次):
python collect.py
最佳实践和错误处理
-
使用日志轮转:通过
RotatingFileHandler
确保日志文件不会无限增长。 -
批量数据库操作:使用批量插入提高效率,同时在出错时进行单条重试。
-
优雅的错误处理:使用自定义异常类和try/except块处理各种可能的错误:
class RSSFetchError(Exception):
"""获取RSS数据时发生的错误"""
pass
class RSSParseError(Exception):
"""解析RSS数据时发生的错误"""
pass
- 进度显示:使用tqdm库显示处理进度,提供更好的用户体验。
进阶优化建议
- 添加代理支持:在
fetch_rss_data
中添加代理支持,避免IP被封禁:
def fetch_rss_data(self, url, proxies=None):
try:
response = requests.get(url, proxies=proxies)
response.raise_for_status()
return ET.fromstring(response.text)
except requests.RequestException as e:
self.logger.error(f"获取RSS数据失败: {url}, 错误: {str(e)}")
raise RSSFetchError(f"无法获取RSS数据: {url}") from e
- 添加重试机制:对于不稳定的网络环境,添加重试机制:
from retrying import retry
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def fetch_rss_data(self, url):
# ... 原有的实现 ...
- 异步支持:使用
aiohttp
和asyncio
实现异步爬取,提高效率:
async def fetch_rss_data_async(self, url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
text = await response.text()
return ET.fromstring(text)
总结
通过这个项目,我们实现了一个功能完整的RSS订阅采集器。它不仅可以自动收集多个来源的RSS内容,还能进行内容过滤和存储。通过使用日志记录、错误处理和批量操作等最佳实践,我们确保了程序的可靠性和效率。
你可以基于这个基础实现,根据自己的需求进行定制和扩展,比如添加更多的数据源、实现更复杂的过滤逻辑,或者集成到其他系统中。
记住,在处理网络请求时要注意添加适当的延迟,遵守网站的robots.txt规则,做一个有道德的爬虫程序。同时,定期备份你的数据库,以防意外情况发生。
原文地址:https://blog.csdn.net/budahui/article/details/142798973
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!