Airflow 完整使用指南

news_pipeline_mvp.py代码:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os

default_args = {"owner":"airflow","retries":1,"retry_delay":timedelta(minutes=5)}

with DAG(
    dag_id="news_pipeline_mvp",
    default_args=default_args,
    start_date=datetime(2025,8,24),
    schedule_interval="0 * * * *",   # 每小时
    catchup=False,
    max_active_runs=1,
    tags=["news-matrix"]
) as dag:

    # 初始化 Mongo 索引(幂等)
    mongo_indexes = BashOperator(
        task_id="mongo_indexes",
        bash_command="mongo --host news_mongo --eval 'load(\"/opt/project/schemas/mongo_indexes.js\")' || true"
    )

    # 初始化 ES 索引(幂等,pipelines 也会做一次兜底)
    es_index = BashOperator(
        task_id="es_index",
        bash_command=(
        "curl -s -o /dev/null -w '%{http_code}' http://news_es:9200/${ES_INDEX:-news} | grep -q 200 "
        "|| curl -X PUT http://news_es:9200/${ES_INDEX:-news} "
        "-H 'Content-Type: application/json' "
        "--data-binary @/opt/project/schemas/es_mapping_news.json"
    )
    )

    crawl_gov_bc = BashOperator(
        task_id="crawl_gov_bc",
        bash_command="cd /opt/project/scrapy_app && scrapy crawl gov_bc"
    )

    export_done = BashOperator(
        task_id="export_done",
        bash_command="echo 'batch done at ' $(date -u)"
    )

    [mongo_indexes, es_index] >> crawl_gov_bc >> export_done

一、什么是 Airflow?

Apache Airflow 是一个工作流编排平台,用于:

- 定时调度任务

- 管理任务依赖关系

- 监控任务执行状态

- 处理失败重试

二、核心概念

1. DAG (有向无环图)

- 定义工作流的结构

- 您的项目中:news_pipeline_mvp 就是一个 DAG

2. Task (任务)

- DAG 中的执行单元

- 您的项目有 4 个任务:

- mongo_indexes - 初始化 MongoDB

- es_index - 初始化 ElasticSearch

- crawl_gov_bc - 爬取新闻

- export_done - 标记完成

3. Operator (操作符)

- 定义任务做什么

- 您使用的是 BashOperator(执行 bash 命令)

4. Scheduler (调度器)

- 按时触发 DAG 运行

- 您设置的是每小时运行一次:"0 * * * *"

三、使用方式

🌐 1. Web UI 界面操作

访问 http://localhost:8080/

- 用户名:admin

- 密码:admin

主要功能:

- DAGs 页面:查看所有 DAG,开关调度

- Graph View:查看任务依赖关系图

- Tree View:查看历史执行记录

- Gantt Chart:查看任务执行时间线

- Task Instance:查看任务详情和日志

🖥️ 2. 命令行操作

# 查看所有 DAG

docker exec news_airflow airflow dags list

# 暂停/启用 DAG

docker exec news_airflow airflow dags pause news_pipeline_mvp

docker exec news_airflow airflow dags unpause news_pipeline_mvp

# 手动触发 DAG

docker exec news_airflow airflow dags trigger news_pipeline_mvp

# 查看 DAG 中的任务

docker exec news_airflow airflow tasks list news_pipeline_mvp

# 查看任务执行状态

docker exec news_airflow airflow tasks states-for-dag-run \

news_pipeline_mvp [execution_date]

# 测试单个任务(不影响数据库)

docker exec news_airflow airflow tasks test \

news_pipeline_mvp crawl_gov_bc 2025-08-25

# 查看 DAG 下次执行时间

docker exec news_airflow airflow dags next-execution news_pipeline_mvp

# 回填历史数据

docker exec news_airflow airflow dags backfill \

-s 2025-08-20 -e 2025-08-25 news_pipeline_mvp

四、您项目中的工作流

[mongo_indexes, es_index] >> crawl_gov_bc >> export_done

执行顺序:

1. mongo_indexes 和 es_index 并行执行

2. 两个都完成后,执行 crawl_gov_bc

3. 最后执行 export_done

五、监控和调试

查看日志

# 查看特定任务的日志

docker exec news_airflow airflow tasks logs \

news_pipeline_mvp crawl_gov_bc 2025-08-25

# 查看 Airflow 系统日志

docker logs news_airflow

查看爬取结果

# MongoDB 数据

docker exec news_mongo mongosh --eval \

"use newsdb; db.articles.countDocuments()"

# ElasticSearch 数据

curl "http://localhost:19200/news/_count"

# 查看导出文件

ls -la exports/

六、常见操作场景

1. 修改爬取频率

编辑 DAG 文件中的 schedule_interval:

- "0 * * * *" - 每小时

- "0 0 * * *" - 每天午夜

- "0 */6 * * *" - 每6小时

- "@daily" - 每天一次

- "@hourly" - 每小时一次

2. 添加新的爬虫

在 DAG 中添加新任务:

crawl_new_site = BashOperator(

task_id="crawl_new_site",

bash_command="cd /opt/project/scrapy_app && scrapy crawl new_spider"

)

3. 处理失败

DAG 已配置自动重试:

- retries: 1 - 失败后重试1次

- retry_delay: 5分钟 - 等待5分钟后重试

4. 清理失败的任务

# 清理失败的任务实例

docker exec news_airflow airflow tasks clear \

news_pipeline_mvp -s 2025-08-25 -e 2025-08-26

七、最佳实践

1. 幂等性:任务应该可以安全地重复执行

2. 原子性:每个任务应该是独立的工作单元

3. 监控告警:定期检查 Web UI 查看失败任务

4. 资源管理:避免任务占用过多资源

5. 版本控制:DAG 文件应该纳入 Git 管理

八、故障排查

如果任务失败:

1. 在 Web UI 查看任务日志

2. 检查容器是否正常运行

3. 验证网络连接(MongoDB/ES)

4. 检查磁盘空间

5. 查看 Airflow 系统日志

您现在可以通过 Web UI 或命令行完全控制新闻爬取流程了!