새소식

반응형
Data pipeline/Airflow

Airflow 실패여부 slack알람으로 받기 (python)

  • -
반응형

Airflow는 ETL스케줄링 오픈소스로 많은 분들이 사용하고 있습니다. 하지만 오픈소스라서 여러가지 불편점이 있는데요.

그 중 알람의 경우도 그렇습니다. 물론 Dag에 slack 알람을 받을 수 있도록 설정이 가능 한데요.

이 경우 매우 불편하게 하나씩 설정해야 하는 경우가 있습니다.

 

그래서 Airflow의 postgreSQL에 있는 데이터를 기반으로 slack 알람을 받을 수 있는 python 스크립트를 만들어봤습니다.

우선 세팅이 필요한데요. slack으로 알람을 받기 위해서는 2가지가 필요합니다.

  • slack channel ID
  • slack webhook url 

 

우선 channel ID의 경우 다음과 같이 받을 수 있습니다.

 

slack을 웹으로 접속 할 경우 위에 url을 확인 할 수 있는데요. 다음과 같이 보입니다.

https://app.slack.com/client/AAAAAA/CCCCCC  

client/고유ID/채널ID  

 

위와 같이 보시면 됩니다. 가장 마지막의 부분이 channel ID 입니다. 이를 복사하여 저장해 둡니다.

 

그 다음으로 slack webhook url이 있는데요. 자세하게 설명된 블로그가 있어서 링크를 걸어 드립니다.

https://zeddios.tistory.com/123

 

왕초보를 위한 Slack webHooks 사용법(incoming webhooks)

안녕하세요 :) 오늘은 외부에서 Slack(이하 슬랙)에 메세지를 보내는 방법을 알아볼거에요 :) 정말 과정 하나하나 같이 해볼거니까, 차근차근 따라하시면 슬랙에 메세지가 뿅 하고 뜰거에요 XD 그럼 시작할게요!..

zeddios.tistory.com

 

 

모든 준비가 끝났으면 이제 코드를 확인해 보겠습니다. 코드는 대단히 간단합니다. 

postgreSQL에서 테이블을 읽어서 slack으로 전송하는 일밖에 없습니다. 몇군데 하드코딩한 것도 있습니다.

 

우선 앞에서 발급 받은 코드를 넣어야 합니다. 

또한 얼마 주기로 가져올지도 설정 해야합니다. 아래에서 설정 할 수 있습니다.

1
2
3
4
5
6
7
#slack code here
slack_channel_id = "slack channel id"
slack_webhook_url = "slack webhook url"
 
#slcak alram time here
start_date = (datetime.now()-timedelta(hours=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
end_date = (datetime.now()+timedelta(days=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
cs

 

중요한것은 아래의 테이블을 가져오는 부분입니다. postgreSQL에 접속해서 다음의 Query를 기반으로 데이터를 가져 옵니다.

1
2
3
4
5
6
select_Query = ("select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners"
               "from task_instance as a"
               "Inner join dag as b"
               "On a.dag_id = b.dag_id"
               "where state ='failed' and "+'end_date >='+"'"+start_date+"'"+' and '+'end_date < '+"'"+end_date+"'"
               )
cs

 

이것을 SQL로 보면 실제로 postgreSQL에 다음과 같이 select 합니다.

1
2
3
4
5
select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners
from task_instance as a
Inner join dag as b
On a.dag_id = b.dag_id
where state ='failed' and end_date >= '2019-12-04 00:00' and end_date < '2019-12-04 02:00'
cs

 

select하면 결과가 ...

안나옵니다. 해당 시간에 실제한 것이 없네요. 쿼리를 살짝 변경해서 failed가 아닌 전부 가져오도록 하겠습니다.

정상적으로 데이터가 나옵니다. 위의 내용을 읽어서 다음과 같이 slack 메시지를 만듭니다.

* 주의 하실 부분은 위의 테이블은 갱신형입니다. 갱신이 되므로, 실패 후에 다시 재처리하여 성공할 경우 값이 변경됩니다. (성공으로 변경되고 try_number(시도횟수) 부분의 count가 증가 합니다. )

* 혹시 이 부분을 기록하시고 싶으실 경우 따로 데이터베이스를 만들어서 저장해야 합니다.

 

아래의 error_message 부분이 최종적으로 보여지는 slack 메시지 입니다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# select postgreSQL row
for row in records:
    dag_name = row[0]
    Task_name = row[1]
    execution_time = str(row[2])
    duration = str(row[3])
    try_number = str(row[4])
    date_flag = row[5]
    execution_date= str(row[6]-timedelta(hours=9))
    str_execution_date = str(str(execution_date)[0:10])+"T"+str(str(execution_date)[11:13])
    print((dag_name,Task_name,execution_time,duration,try_number,date_flag,str_execution_date))
 
    url = ("http://IP address/admin/airflow/log?"
            +"task_id="+Task_name+"&"
            +"dag_id="+dag_name+"&"
            +"execution_date="+str_execution_date+"%3A00%3A00%2B00%3A00&format=json"
            )
 
    #error message create
    error_message = (":ddube-smile::bangbang:"+" *Airflow Error* :bangbang:\n"
                    +"```"
                    +"Dag_name : "+dag_name+"\n"
                    +"Task_name : "+Task_name+"\n"
                    +"Execution time : "+execution_time+"\n"
                    +"Duration : "+duration+"\n"
                    +"Try_number : "+try_number+"\n"
                    +"URL : "+url+"\n"
                    +"```"
    )
cs

 

위의 보시면 url 부분이 있는데요. 이 부분의 경우 클릭하면 해당 오류가 발생한 airflow 로그로 자동 연결 됩니다.

규칙은 간단합니다. 위의 내용을 참고하셔서 실제 airflow의 로그를 클릭하시고 url을 보시면 단번에 이해가 되실 겁니다.

 

이 메시지를 실제로 보면 다음과 같이 나옵니다.

 

전체적인 코드는 아래에서 참고 부탁드립니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import psycopg2
import json
import requests
from datetime import datetime, timedelta
 
#slack code here
slack_channel_id = "slack channel id"
slack_webhook_url = "slack webhook url"
 
#slcak alram time here
start_date = (datetime.now()-timedelta(hours=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
end_date = (datetime.now()+timedelta(days=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
 
if __name__ == '__main__':
    try:
        # connection postgreSQL
        # ====================================================================================================
        pcon = psycopg2.connect(user="user",password="password",host="IP",port="5432",database="airflow")
        pcursor = pcon.cursor()
        # Dag_log table select query  
        select_Query = ("select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners"
                        "from task_instance as a"
                        "Inner join dag as b"
                        "On a.dag_id = b.dag_id"
                        "where state ='failed' and "+'end_date >='+"'"+start_date+"'"+' and '+'end_date < '+"'"+end_date+"'"
                              )
        print(select_Query)
 
        pcursor.execute(select_Query)
        records = pcursor.fetchall() 
        # ====================================================================================================
        # select postgreSQL row
        for row in records:
            dag_name = row[0]
            Task_name = row[1]
            execution_time = str(row[2])
            duration = str(row[3])
            try_number = str(row[4])
            date_flag = row[5]
            execution_date= str(row[6]-timedelta(hours=9))
            str_execution_date = str(str(execution_date)[0:10])+"T"+str(str(execution_date)[11:13])
            print((dag_name,Task_name,execution_time,duration,try_number,date_flag,str_execution_date))
 
            url = ("http://IP address/admin/airflow/log?"
                    +"task_id="+Task_name+"&"
                    +"dag_id="+dag_name+"&"
                    +"execution_date="+str_execution_date+"%3A00%3A00%2B00%3A00&format=json"
                    )
 
            #error message create
            error_message = (":ddube-smile::bangbang:"+" *Airflow Error* :bangbang:\n"
                            +"```"
                            +"Dag_name : "+dag_name+"\n"
                            +"Task_name : "+Task_name+"\n"
                            +"Execution time : "+execution_time+"\n"
                            +"Duration : "+duration+"\n"
                            +"Try_number : "+try_number+"\n"
                            +"URL : "+url+"\n"
                            +"```"
            )  
            #slack message send 
            payload = {"channel": slack_channel_id, "text": error_message, "username":"Airflow","icon_emoji":"false"}
            requests.post(slack_webhook_url, data=json.dumps(payload), headers={'Content-Type''application/json'})
    except (Exception, psycopg2.Error) as error :
        print ("Error", error)
 
    finally:
        # Mysql closing database connection.
        if(pcon):
            pcursor.close()
            pcon.close()
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('finally time :' ,now)
 
cs

 

아래에 파일도 같이 첨부 드립니다.

slack.py
0.00MB

반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.