데이터엔지니어 군고구마
  • Airflow 실패여부 slack알람으로 받기 (python)
    2019년 12월 04일 16시 26분 22초에 업로드 된 글입니다.
    작성자: DE 군고구마
    반응형

    Airflow는 ETL스케줄링 오픈소스로 많은 분들이 사용하고 있습니다. 하지만 오픈소스라서 여러가지 불편점이 있는데요.

    그 중 알람의 경우도 그렇습니다. 물론 Dag에 slack 알람을 받을 수 있도록 설정이 가능 한데요.

    이 경우 매우 불편하게 하나씩 설정해야 하는 경우가 있습니다.

     

    그래서 Airflow의 postgreSQL에 있는 데이터를 기반으로 slack 알람을 받을 수 있는 python 스크립트를 만들어봤습니다.

    우선 세팅이 필요한데요. slack으로 알람을 받기 위해서는 2가지가 필요합니다.

    • slack channel ID
    • slack webhook url 

     

    우선 channel ID의 경우 다음과 같이 받을 수 있습니다.

     

    slack을 웹으로 접속 할 경우 위에 url을 확인 할 수 있는데요. 다음과 같이 보입니다.

    https://app.slack.com/client/AAAAAA/CCCCCC  

    client/고유ID/채널ID  

     

    위와 같이 보시면 됩니다. 가장 마지막의 부분이 channel ID 입니다. 이를 복사하여 저장해 둡니다.

     

    그 다음으로 slack webhook url이 있는데요. 자세하게 설명된 블로그가 있어서 링크를 걸어 드립니다.

    https://zeddios.tistory.com/123

     

    왕초보를 위한 Slack webHooks 사용법(incoming webhooks)

    안녕하세요 :) 오늘은 외부에서 Slack(이하 슬랙)에 메세지를 보내는 방법을 알아볼거에요 :) 정말 과정 하나하나 같이 해볼거니까, 차근차근 따라하시면 슬랙에 메세지가 뿅 하고 뜰거에요 XD 그럼 시작할게요!..

    zeddios.tistory.com

     

     

    모든 준비가 끝났으면 이제 코드를 확인해 보겠습니다. 코드는 대단히 간단합니다. 

    postgreSQL에서 테이블을 읽어서 slack으로 전송하는 일밖에 없습니다. 몇군데 하드코딩한 것도 있습니다.

     

    우선 앞에서 발급 받은 코드를 넣어야 합니다. 

    또한 얼마 주기로 가져올지도 설정 해야합니다. 아래에서 설정 할 수 있습니다.

    1
    2
    3
    4
    5
    6
    7
    #slack code here
    slack_channel_id = "slack channel id"
    slack_webhook_url = "slack webhook url"
     
    #slcak alram time here
    start_date = (datetime.now()-timedelta(hours=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
    end_date = (datetime.now()+timedelta(days=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
    cs

     

    중요한것은 아래의 테이블을 가져오는 부분입니다. postgreSQL에 접속해서 다음의 Query를 기반으로 데이터를 가져 옵니다.

    1
    2
    3
    4
    5
    6
    select_Query = ("select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners"
                   "from task_instance as a"
                   "Inner join dag as b"
                   "On a.dag_id = b.dag_id"
                   "where state ='failed' and "+'end_date >='+"'"+start_date+"'"+' and '+'end_date < '+"'"+end_date+"'"
                   )
    cs

     

    이것을 SQL로 보면 실제로 postgreSQL에 다음과 같이 select 합니다.

    1
    2
    3
    4
    5
    select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners
    from task_instance as a
    Inner join dag as b
    On a.dag_id = b.dag_id
    where state ='failed' and end_date >= '2019-12-04 00:00' and end_date < '2019-12-04 02:00'
    cs

     

    select하면 결과가 ...

    안나옵니다. 해당 시간에 실제한 것이 없네요. 쿼리를 살짝 변경해서 failed가 아닌 전부 가져오도록 하겠습니다.

    정상적으로 데이터가 나옵니다. 위의 내용을 읽어서 다음과 같이 slack 메시지를 만듭니다.

    * 주의 하실 부분은 위의 테이블은 갱신형입니다. 갱신이 되므로, 실패 후에 다시 재처리하여 성공할 경우 값이 변경됩니다. (성공으로 변경되고 try_number(시도횟수) 부분의 count가 증가 합니다. )

    * 혹시 이 부분을 기록하시고 싶으실 경우 따로 데이터베이스를 만들어서 저장해야 합니다.

     

    아래의 error_message 부분이 최종적으로 보여지는 slack 메시지 입니다. 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # select postgreSQL row
    for row in records:
        dag_name = row[0]
        Task_name = row[1]
        execution_time = str(row[2])
        duration = str(row[3])
        try_number = str(row[4])
        date_flag = row[5]
        execution_date= str(row[6]-timedelta(hours=9))
        str_execution_date = str(str(execution_date)[0:10])+"T"+str(str(execution_date)[11:13])
        print((dag_name,Task_name,execution_time,duration,try_number,date_flag,str_execution_date))
     
        url = ("http://IP address/admin/airflow/log?"
                +"task_id="+Task_name+"&"
                +"dag_id="+dag_name+"&"
                +"execution_date="+str_execution_date+"%3A00%3A00%2B00%3A00&format=json"
                )
     
        #error message create
        error_message = (":ddube-smile::bangbang:"+" *Airflow Error* :bangbang:\n"
                        +"```"
                        +"Dag_name : "+dag_name+"\n"
                        +"Task_name : "+Task_name+"\n"
                        +"Execution time : "+execution_time+"\n"
                        +"Duration : "+duration+"\n"
                        +"Try_number : "+try_number+"\n"
                        +"URL : "+url+"\n"
                        +"```"
        )
    cs

     

    위의 보시면 url 부분이 있는데요. 이 부분의 경우 클릭하면 해당 오류가 발생한 airflow 로그로 자동 연결 됩니다.

    규칙은 간단합니다. 위의 내용을 참고하셔서 실제 airflow의 로그를 클릭하시고 url을 보시면 단번에 이해가 되실 겁니다.

     

    이 메시지를 실제로 보면 다음과 같이 나옵니다.

     

    전체적인 코드는 아래에서 참고 부탁드립니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    import psycopg2
    import json
    import requests
    from datetime import datetime, timedelta
     
    #slack code here
    slack_channel_id = "slack channel id"
    slack_webhook_url = "slack webhook url"
     
    #slcak alram time here
    start_date = (datetime.now()-timedelta(hours=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
    end_date = (datetime.now()+timedelta(days=1)+timedelta(hours=9)).strftime('%Y-%m-%d %H:00')
     
    if __name__ == '__main__':
        try:
            # connection postgreSQL
            # ====================================================================================================
            pcon = psycopg2.connect(user="user",password="password",host="IP",port="5432",database="airflow")
            pcursor = pcon.cursor()
            # Dag_log table select query  
            select_Query = ("select a.task_id,a.dag_id,a.execution_date,a.start_date,a.end_date,a.duration,a.state,a.try_number,a.job_id,b.owners"
                            "from task_instance as a"
                            "Inner join dag as b"
                            "On a.dag_id = b.dag_id"
                            "where state ='failed' and "+'end_date >='+"'"+start_date+"'"+' and '+'end_date < '+"'"+end_date+"'"
                                  )
            print(select_Query)
     
            pcursor.execute(select_Query)
            records = pcursor.fetchall() 
            # ====================================================================================================
            # select postgreSQL row
            for row in records:
                dag_name = row[0]
                Task_name = row[1]
                execution_time = str(row[2])
                duration = str(row[3])
                try_number = str(row[4])
                date_flag = row[5]
                execution_date= str(row[6]-timedelta(hours=9))
                str_execution_date = str(str(execution_date)[0:10])+"T"+str(str(execution_date)[11:13])
                print((dag_name,Task_name,execution_time,duration,try_number,date_flag,str_execution_date))
     
                url = ("http://IP address/admin/airflow/log?"
                        +"task_id="+Task_name+"&"
                        +"dag_id="+dag_name+"&"
                        +"execution_date="+str_execution_date+"%3A00%3A00%2B00%3A00&format=json"
                        )
     
                #error message create
                error_message = (":ddube-smile::bangbang:"+" *Airflow Error* :bangbang:\n"
                                +"```"
                                +"Dag_name : "+dag_name+"\n"
                                +"Task_name : "+Task_name+"\n"
                                +"Execution time : "+execution_time+"\n"
                                +"Duration : "+duration+"\n"
                                +"Try_number : "+try_number+"\n"
                                +"URL : "+url+"\n"
                                +"```"
                )  
                #slack message send 
                payload = {"channel": slack_channel_id, "text": error_message, "username":"Airflow","icon_emoji":"false"}
                requests.post(slack_webhook_url, data=json.dumps(payload), headers={'Content-Type''application/json'})
        except (Exception, psycopg2.Error) as error :
            print ("Error", error)
     
        finally:
            # Mysql closing database connection.
            if(pcon):
                pcursor.close()
                pcon.close()
                now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print('finally time :' ,now)
     
    cs

     

    아래에 파일도 같이 첨부 드립니다.

    slack.py
    0.00MB

    반응형
    댓글