news_pipeline_mvp.py代码:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os
default_args = {"owner":"airflow","retries":1,"retry_delay":timedelta(minutes=5)}
with DAG(
dag_id="news_pipeline_mvp",
default_args=default_args,
start_date=datetime(2025,8,24),
schedule_interval="0 * * * *", # 每小时
catchup=False,
max_active_runs=1,
tags=["news-matrix"]
) as dag:
# 初始化 Mongo 索引(幂等)
mongo_indexes = BashOperator(
task_id="mongo_indexes",
bash_command="mongo --host news_mongo --eval 'load(\"/opt/project/schemas/mongo_indexes.js\")' || true"
)
# 初始化 ES 索引(幂等,pipelines 也会做一次兜底)
es_index = BashOperator(
task_id="es_index",
bash_command=(
"curl -s -o /dev/null -w '%{http_code}' http://news_es:9200/${ES_INDEX:-news} | grep -q 200 "
"|| curl -X PUT http://news_es:9200/${ES_INDEX:-news} "
"-H 'Content-Type: application/json' "
"--data-binary @/opt/project/schemas/es_mapping_news.json"
)
)
crawl_gov_bc = BashOperator(
task_id="crawl_gov_bc",
bash_command="cd /opt/project/scrapy_app && scrapy crawl gov_bc"
)
export_done = BashOperator(
task_id="export_done",
bash_command="echo 'batch done at ' $(date -u)"
)
[mongo_indexes, es_index] >> crawl_gov_bc >> export_done
一、什么是 Airflow?
Apache Airflow 是一个工作流编排平台,用于:
- 定时调度任务
- 管理任务依赖关系
- 监控任务执行状态
- 处理失败重试
二、核心概念
1. DAG (有向无环图)
- 定义工作流的结构
- 您的项目中:news_pipeline_mvp 就是一个 DAG
2. Task (任务)
- DAG 中的执行单元
- 您的项目有 4 个任务:
- mongo_indexes - 初始化 MongoDB
- es_index - 初始化 ElasticSearch
- crawl_gov_bc - 爬取新闻
- export_done - 标记完成
3. Operator (操作符)
- 定义任务做什么
- 您使用的是 BashOperator(执行 bash 命令)
4. Scheduler (调度器)
- 按时触发 DAG 运行
- 您设置的是每小时运行一次:"0 * * * *"
三、使用方式
🌐 1. Web UI 界面操作
访问 http://localhost:8080/
- 用户名:admin
- 密码:admin
主要功能:
- DAGs 页面:查看所有 DAG,开关调度
- Graph View:查看任务依赖关系图
- Tree View:查看历史执行记录
- Gantt Chart:查看任务执行时间线
- Task Instance:查看任务详情和日志
🖥️ 2. 命令行操作
# 查看所有 DAG
docker exec news_airflow airflow dags list
# 暂停/启用 DAG
docker exec news_airflow airflow dags pause news_pipeline_mvp
docker exec news_airflow airflow dags unpause news_pipeline_mvp
# 手动触发 DAG
docker exec news_airflow airflow dags trigger news_pipeline_mvp
# 查看 DAG 中的任务
docker exec news_airflow airflow tasks list news_pipeline_mvp
# 查看任务执行状态
docker exec news_airflow airflow tasks states-for-dag-run \
news_pipeline_mvp [execution_date]
# 测试单个任务(不影响数据库)
docker exec news_airflow airflow tasks test \
news_pipeline_mvp crawl_gov_bc 2025-08-25
# 查看 DAG 下次执行时间
docker exec news_airflow airflow dags next-execution news_pipeline_mvp
# 回填历史数据
docker exec news_airflow airflow dags backfill \
-s 2025-08-20 -e 2025-08-25 news_pipeline_mvp
四、您项目中的工作流
[mongo_indexes, es_index] >> crawl_gov_bc >> export_done
执行顺序:
1. mongo_indexes 和 es_index 并行执行
2. 两个都完成后,执行 crawl_gov_bc
3. 最后执行 export_done
五、监控和调试
查看日志
# 查看特定任务的日志
docker exec news_airflow airflow tasks logs \
news_pipeline_mvp crawl_gov_bc 2025-08-25
# 查看 Airflow 系统日志
docker logs news_airflow
查看爬取结果
# MongoDB 数据
docker exec news_mongo mongosh --eval \
"use newsdb; db.articles.countDocuments()"
# ElasticSearch 数据
curl "http://localhost:19200/news/_count"
# 查看导出文件
ls -la exports/
六、常见操作场景
1. 修改爬取频率
编辑 DAG 文件中的 schedule_interval:
- "0 * * * *" - 每小时
- "0 0 * * *" - 每天午夜
- "0 */6 * * *" - 每6小时
- "@daily" - 每天一次
- "@hourly" - 每小时一次
2. 添加新的爬虫
在 DAG 中添加新任务:
crawl_new_site = BashOperator(
task_id="crawl_new_site",
bash_command="cd /opt/project/scrapy_app && scrapy crawl new_spider"
)
3. 处理失败
DAG 已配置自动重试:
- retries: 1 - 失败后重试1次
- retry_delay: 5分钟 - 等待5分钟后重试
4. 清理失败的任务
# 清理失败的任务实例
docker exec news_airflow airflow tasks clear \
news_pipeline_mvp -s 2025-08-25 -e 2025-08-26
七、最佳实践
1. 幂等性:任务应该可以安全地重复执行
2. 原子性:每个任务应该是独立的工作单元
3. 监控告警:定期检查 Web UI 查看失败任务
4. 资源管理:避免任务占用过多资源
5. 版本控制:DAG 文件应该纳入 Git 管理
八、故障排查
如果任务失败:
1. 在 Web UI 查看任务日志
2. 检查容器是否正常运行
3. 验证网络连接(MongoDB/ES)
4. 检查磁盘空间
5. 查看 Airflow 系统日志
您现在可以通过 Web UI 或命令行完全控制新闻爬取流程了!