Airflow from Beginner to Master - 04 - Enhancing DAG Robustness
1. Sending Email with Airflow
1.1 Configure the SMTP Mail Service
$ cd /root/airflow
$ vi airflow.cfg
Edit the [email] and [smtp] sections of airflow.cfg:
...
[email]
# Configuration email backend and whether to
# send email alerts on retry or failure
# Email backend to use
email_backend = airflow.utils.email.send_email_smtp
# Whether email alerts should be sent when a task is retried
default_email_on_retry = True
# Whether email alerts should be sent when a task failed
default_email_on_failure = True
# File that will be used as the template for Email subject (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: subject_template = /path/to/my_subject_template_file
# subject_template =
# File that will be used as the template for Email content (which will be rendered using Jinja2).
# If not set, Airflow uses a base template.
# Example: html_content_template = /path/to/my_html_content_template_file
# html_content_template =
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.126.com
smtp_starttls = False
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user = demo@126.com
# Example: smtp_password = airflow
smtp_password = demosjiofjoisdji
smtp_port = 25
smtp_mail_from = demo@126.com
smtp_timeout = 30
smtp_retry_limit = 5
[sentry]
# Sentry (https://docs.sentry.io) integration. Here you can supply
# additional configuration options based on the Python platform. See:
# https://docs.sentry.io/error-reporting/configuration/?platform=python.
...
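Before adding an email task to the DAG, it is worth a quick sanity check that these SMTP settings actually work. A minimal sketch using Airflow's built-in helper, run from a Python shell on the Airflow host (the recipient address is just a placeholder):
# Send a test message through the [smtp] settings configured above
from airflow.utils.email import send_email

send_email(
    to='demo@126.com',  # placeholder recipient
    subject='Airflow SMTP test',
    html_content='<p>SMTP settings look good.</p>'
)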
1.2 Test the Mail Service
After saving, add an email task to download_stock_price_v2.py:
# Import the EmailOperator
from airflow.operators.email import EmailOperator
...
email_task = EmailOperator(
    task_id='send_email',
    to='407544577@qq.com',
    subject='Stock Price is downloaded',
    html_content="""<h2>Airflow Email Test.</h2>""",
    dag=dag
)
download_task >> save_to_mysql_task >> mysql_task >> email_task
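You can also smoke-test just the new task without running the whole DAG, e.g. airflow tasks test download_stock_price_v2 send_email 2021-10-01 (any date works); tasks test runs the task locally without checking dependencies or recording state.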
Kill the Airflow processes and restart:
# Kill all airflow processes in one go
ps -ef | grep airflow | grep -v grep | awk '{print "kill -9 "$2}' | sh
# Remove the stale pid files, otherwise the restart will fail
[root@quant airflow]# rm -rf airflow-webserver-monitor.pid
[root@quant airflow]# rm -rf airflow-scheduler.*
[root@quant airflow]# rm -rf airflow-webserver.*
# Restart
# Start the web server
airflow webserver -p 8080 -D
# Start the scheduler
airflow scheduler -D
Then trigger the DAG manually. As you can see, the send_email task runs successfully, and the message arrives in the mailbox ^_^

View the task log:
View the email:
Note: when setting up the mailbox, double-check the configuration. If you see the following error:
smtplib.SMTPNotSupportedError: STARTTLS extension not supported by server
then the server does not support the STARTTLS extension, and you need to set smtp_starttls = False in the [smtp] section.
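If you are unsure what the server actually supports, you can probe it with Python's standard smtplib before editing airflow.cfg. A small diagnostic sketch (host and port taken from the [smtp] section above):
# Print what the SMTP server advertises in its EHLO response
import smtplib

server = smtplib.SMTP('smtp.126.com', 25, timeout=30)
server.ehlo()
# has_extn() checks the EHLO response for a given extension
print('STARTTLS supported:', server.has_extn('starttls'))
server.quit()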
2. Email Alerts on Task Failure
To receive an email alert when a task fails, configure it in the default arguments:
default_args = {
    'owner': 'kaiyi',
    'email': ['407544577@qq.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}
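With retries=1 and both flags enabled, a failing task first sends an "up for retry" notification; the failure email only goes out after the retry has failed as well.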
Then change the save path to a directory that does not exist, deliberately introducing a bug, to see whether Airflow sends a failure email:
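For example, a minimal sketch of the deliberate break, replacing get_file_path with a variant that points at a made-up directory (the path below is hypothetical and does not exist):
def get_file_path(ticker):
    # Deliberately broken: this directory does not exist, so open()
    # inside download_price() raises FileNotFoundError and the task fails
    return f'/no_such_dir/{ticker}.csv'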
Then trigger the DAG manually; the task fails:

As you can see, the failure-alert email for the task has arrived.
3. Configuring DagRun Re-runs
If multiple DAG runs execute in parallel, they can deadlock (for example, by writing to the same staging table concurrently), so set catchup=False and max_active_runs=1 when defining the DAG:
# [START instantiate_dag]
with DAG(
    dag_id='download_stock_price_v2',
    default_args=default_args,
    description='download stock price and save to local csv files and save to database',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['quantdata'],
    catchup=False,       # new: don't backfill runs for past schedule intervals
    max_active_runs=1,   # new: allow only one active DagRun at a time
) as dag:
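Note that the two parameters address different problems: catchup=False keeps the scheduler from backfilling a run for every schedule interval between start_date and now, while max_active_runs=1 caps concurrency so that a second DagRun, scheduled or manually triggered, waits until the current one finishes.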
The complete download_stock_price_v2.py file:
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
from textwrap import dedent
import yfinance as yf
import mysql.connector
from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators.mysql_operator import MySqlOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
def download_price(*args, **context):
stock_list = get_tickers(context)
# 新增正常的股票(没有退市的或不存在的)
valid_tickers = []
for ticker in stock_list:
dat = yf.Ticker(ticker)
hist = dat.history(period="1mo")
# print(type(hist))
# print(hist.shape)
# print(os.getcwd())
if hist.shape[0] > 0:
valid_tickers.append(ticker)
else:
continue
with open(get_file_path(ticker), 'w') as writer:
hist.to_csv(writer, index=True)
print("Finished downloading price data for " + ticker)
# 增加返回值(用于任务之间数据的传递)
return valid_tickers
def get_file_path(ticker):
# NOT SAVE in distributed system
return f'./{ticker}.csv'
def load_price_data(ticker):
with open(get_file_path(ticker), 'r') as reader:
lines = reader.readlines()
return [[ticker] + line.split(',')[:5] for line in lines if line[:4] != 'Date']
def get_tickers(context):
# 获取配置的变量Variables
stock_list = Variable.get("stock_list_json", deserialize_json=True)
# 如果有配置参数,则使用配置参数的数据(Trigger DAG with parameters)
stocks = context["dag_run"].conf.get("stocks")
if stocks:
stock_list = stocks
return stock_list
def save_to_mysql_stage(*args, **context):
# tickers = get_tickers(context)
# Pull the return_value XCOM from "pulling_task"
tickers = context['ti'].xcom_pull(task_ids='download_prices')
print(f"received tickers:{tickers}")
"""
# 连接数据库(硬编码方式连接)
mydb = mysql.connector.connect(
host="98.12.13.14",
user="root",
password="Quan988",
database="demodb",
port=3306
)
"""
# 使用airflow 的 Connections 动态获取配置信息
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('demodb')
mydb = mysql.connector.connect(
host=conn.host,
user=conn.login,
password=conn.password,
database=conn.schema,
port=conn.port
)
mycursor = mydb.cursor()
for ticker in tickers:
val = load_price_data(ticker)
print(f"{ticker} length={len(val)} {val[1]}")
sql = """INSERT INTO stock_prices_stage
(ticker, as_of_date, open_price, high_price, low_price, close_price)
VALUES (%s,%s,%s,%s,%s,%s)"""
mycursor.executemany(sql, val)
mydb.commit()
print(mycursor.rowcount, "record inserted.")
default_args = {
'owner': 'kaiyi',
'email': ['407544577@qq.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(seconds=30)
}
# [START instantiate_dag]
with DAG(
dag_id='download_stock_price_v2',
default_args=default_args,
description='download stock price and save to local csv files and save to database',
schedule_interval=None,
start_date=days_ago(2),
tags=['quantdata'],
catchup=False, # 新增,一个任务执行完了再执行下一个DAG任务
max_active_runs=1, # 一个任务执行完了再执行下一个DAG任务
) as dag:
# [END instantiate_dag]
dag.doc_md = """
This DAG download stock price
"""
download_task = PythonOperator(
task_id="download_prices",
python_callable=download_price,
provide_context=True
)
save_to_mysql_task = PythonOperator(
task_id="save_to_database",
python_callable=save_to_mysql_stage,
provide_context=True
)
mysql_task = MySqlOperator(
task_id="merge_stock_price",
mysql_conn_id='demodb',
sql="merge_stock_price.sql",
dag=dag,
)
email_task = EmailOperator(
task_id='send_email',
to='407544577@qq.com',
subject='Stock Price is downloaded',
html_content="""<h2>Airflow Email Test.</h2>""",
dag=dag
)
download_task >> save_to_mysql_task >> mysql_task >> email_task
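Since get_tickers() checks dag_run.conf first, the stock list can be overridden per run: trigger with configuration from the UI ("Trigger DAG w/ config"), or from the CLI, e.g. airflow dags trigger download_stock_price_v2 --conf '{"stocks": ["AAPL", "MSFT"]}' (the tickers here are only examples).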