airflow
-
안녕하세요. 주형권입니다. 최근에 개발을 다하고 시간이 조금 남아서 개발하면서 몇 가지 만났던 부분에 대해서 글을 많이 쓰게 되었습니다. 지금은 ELT 파이프라인을 개발하고 있는데요. 저 같은 경우 주로 Airflow를 통해서 스케줄 처리를 하고, PythonOperator를 이용해서 Python Class를 불러와서 데이터를 처리하는 구조로 ELT 파이프라인을 만듭니다. 이렇게 만들면 제가 입맛데로 원하는 가공을 할 수도 있고 여러 가지 기능을 제가 원하는 방향으로 넣을 수 있어서 PythonOperator를 선호하고 있습니다. 이 ELT 파이프라인은 조만간 정리하여 글을 쓰도록 하겠습니다. (거의 완성되었습니다.) 무엇을 하려고 하였는가? Airflow에 Connection에 Oracle 서버를 등록..
[Airflow] oracle connection 등록 후 python으로 부르기안녕하세요. 주형권입니다. 최근에 개발을 다하고 시간이 조금 남아서 개발하면서 몇 가지 만났던 부분에 대해서 글을 많이 쓰게 되었습니다. 지금은 ELT 파이프라인을 개발하고 있는데요. 저 같은 경우 주로 Airflow를 통해서 스케줄 처리를 하고, PythonOperator를 이용해서 Python Class를 불러와서 데이터를 처리하는 구조로 ELT 파이프라인을 만듭니다. 이렇게 만들면 제가 입맛데로 원하는 가공을 할 수도 있고 여러 가지 기능을 제가 원하는 방향으로 넣을 수 있어서 PythonOperator를 선호하고 있습니다. 이 ELT 파이프라인은 조만간 정리하여 글을 쓰도록 하겠습니다. (거의 완성되었습니다.) 무엇을 하려고 하였는가? Airflow에 Connection에 Oracle 서버를 등록..
2023.07.20 -
안녕하세요. 해당 글을 MySQL 문법을 사용 하였습니다. 지난번에 Query를 통해서 Airflow의 스케줄 시간을 파싱 하는 글을 작성하였는데요. ( 바로가기 ) 이번에는 파싱하여 일반 사용자가 보기 편하게 나타내는 Query를 만들었습니다. 위와 같이 Crontab시간을 사용자가 보기 편한 형태로 보여 주도록 하였습니다. 아무래도 규치적을 주고 파싱을 하다 보니 영어 표현과 어순이 조금 어색하거나 안 맞는 경우가 있습니다. 이 부분은 감안해 주세요. (도저히.. 어떻게 바꿀지 모르겠습니다.) SELECT schedule_interval ,CASE WHEN schedule_interval = '"@once"' THEN 'Schedule once and only once' WHEN schedule_in..
[Airflow] 스케줄 시간 (Crontab) Query 파싱안녕하세요. 해당 글을 MySQL 문법을 사용 하였습니다. 지난번에 Query를 통해서 Airflow의 스케줄 시간을 파싱 하는 글을 작성하였는데요. ( 바로가기 ) 이번에는 파싱하여 일반 사용자가 보기 편하게 나타내는 Query를 만들었습니다. 위와 같이 Crontab시간을 사용자가 보기 편한 형태로 보여 주도록 하였습니다. 아무래도 규치적을 주고 파싱을 하다 보니 영어 표현과 어순이 조금 어색하거나 안 맞는 경우가 있습니다. 이 부분은 감안해 주세요. (도저히.. 어떻게 바꿀지 모르겠습니다.) SELECT schedule_interval ,CASE WHEN schedule_interval = '"@once"' THEN 'Schedule once and only once' WHEN schedule_in..
2023.05.03 -
안녕하세요. 최근 이직으로 인해서 오랜만에 인사를 드립니다. 이직을 하면서 새로운 환경에서 새로운 데이터 파이프라인을 만들다 보니 처음 접하는 도구를 사용하는 일이 많아졌습니다. GCP에 새로운 기능이 많이 생겼고 인원이 없는 경우 Saas를 이용해서 빠르게 무언가를 구축해야 하는 경우가 있어서 이번에는 GCP의 Composer를 사용하게 되었습니다. Composer란? https://cloud.google.com/composer/docs/composer-2/run-apache-airflow-dag 빠른 시작: Cloud Composer 2에서 Apache Airflow DAG 실행 | Google Cloud Cloud Composer 환경을 만들고 Cloud Composer 2에서 Apache Airf..
[GCP] Composer를 Git repo에 연동안녕하세요. 최근 이직으로 인해서 오랜만에 인사를 드립니다. 이직을 하면서 새로운 환경에서 새로운 데이터 파이프라인을 만들다 보니 처음 접하는 도구를 사용하는 일이 많아졌습니다. GCP에 새로운 기능이 많이 생겼고 인원이 없는 경우 Saas를 이용해서 빠르게 무언가를 구축해야 하는 경우가 있어서 이번에는 GCP의 Composer를 사용하게 되었습니다. Composer란? https://cloud.google.com/composer/docs/composer-2/run-apache-airflow-dag 빠른 시작: Cloud Composer 2에서 Apache Airflow DAG 실행 | Google Cloud Cloud Composer 환경을 만들고 Cloud Composer 2에서 Apache Airf..
2022.09.22 -
Airflow의 스케줄 시간을 지표로 표현할 일이 있어서 Airflow의 스케줄 시간을 mysql을 통해서 Datetime 형태로 parsing 해야 하는 이슈가 있었습니다. 여기저기 찾아봤지만 정상적으로 안되어서 직접 Query를 작성하였습니다. 우선 한 가지 아쉬운 것은 모든 스케줄 시간을 parsing 하지는 못 하였습니다. 스케줄 시간에서 매일 발생하는 스케줄 시간만 표현하였고, 나머지 요일별, 월별, 주별 이런 내용은 parsing 하지 못하였습니다. 차후에 이 부분도 추가하여 글을 작성하도록 하겠습니다. 우선 저희 쪽에서 사용하는 부분은 일별 스케줄이라서, 이 부분만 작성 한 점 죄송합니다. 원본 데이터 원본 데이터의 경우 다음과 같은 형태로 있습니다. 여기서 @once도 제외하였습니다. (단..
MySQL을 이용하여 Airflow(Crontab) 스케줄 시간 parsingAirflow의 스케줄 시간을 지표로 표현할 일이 있어서 Airflow의 스케줄 시간을 mysql을 통해서 Datetime 형태로 parsing 해야 하는 이슈가 있었습니다. 여기저기 찾아봤지만 정상적으로 안되어서 직접 Query를 작성하였습니다. 우선 한 가지 아쉬운 것은 모든 스케줄 시간을 parsing 하지는 못 하였습니다. 스케줄 시간에서 매일 발생하는 스케줄 시간만 표현하였고, 나머지 요일별, 월별, 주별 이런 내용은 parsing 하지 못하였습니다. 차후에 이 부분도 추가하여 글을 작성하도록 하겠습니다. 우선 저희 쪽에서 사용하는 부분은 일별 스케줄이라서, 이 부분만 작성 한 점 죄송합니다. 원본 데이터 원본 데이터의 경우 다음과 같은 형태로 있습니다. 여기서 @once도 제외하였습니다. (단..
2021.05.21 -
기존에 링크드인에 공유한 포토폴리오 형식으로 제작 된 데이터 파이프라인 제작 관련 자료 입니다. 관련하여, 다운로드를 받으시려면 가장 아래에 첨부 파일을 확인해 주세요. 데이터 파이프라인 PDF
데이터 파인프라인 제작기기존에 링크드인에 공유한 포토폴리오 형식으로 제작 된 데이터 파이프라인 제작 관련 자료 입니다. 관련하여, 다운로드를 받으시려면 가장 아래에 첨부 파일을 확인해 주세요. 데이터 파이프라인 PDF
2021.03.16 -
오랜만에 글을 쓰는거 같습니다. 한동안 서버 세팅을 하면서 바쁜 시간을 보내고 재택 근무를 하면서 이런저런 적응을 하다보니 글을 안쓰게 되었습니다. 그러다 이번에 새롭게 Airflow2.0을 실제로 세팅 하면서 이런 저런것을 정리해야겠다 싶어서 글을 씁니다. 들어가며... 이 글을 읽고 참고 하시면 도움이 될 분들은 이런 분들 입니다. 스케줄 처리를 해야하는데, Airflow를 선택 하였다. 하지만 나는 딥하게 알지는 못한다. 대규모 처리가 필요한게 아닌 적당한 처리를 할 예정이다. 일단 실무에 급하게 반영해야 하는데 봐도 잘 모르겠다. 튜토리얼이 아닌 진짜 실무에서 쓸수 있는 Airflow 시스템을 구축 하려고 한다. 반면 읽어도 큰 도움이 안 될 분들은 이런 분들 입니다. 대규모 작업이 필요하며, 빠..
docker기반 Airflow 2.0 설치오랜만에 글을 쓰는거 같습니다. 한동안 서버 세팅을 하면서 바쁜 시간을 보내고 재택 근무를 하면서 이런저런 적응을 하다보니 글을 안쓰게 되었습니다. 그러다 이번에 새롭게 Airflow2.0을 실제로 세팅 하면서 이런 저런것을 정리해야겠다 싶어서 글을 씁니다. 들어가며... 이 글을 읽고 참고 하시면 도움이 될 분들은 이런 분들 입니다. 스케줄 처리를 해야하는데, Airflow를 선택 하였다. 하지만 나는 딥하게 알지는 못한다. 대규모 처리가 필요한게 아닌 적당한 처리를 할 예정이다. 일단 실무에 급하게 반영해야 하는데 봐도 잘 모르겠다. 튜토리얼이 아닌 진짜 실무에서 쓸수 있는 Airflow 시스템을 구축 하려고 한다. 반면 읽어도 큰 도움이 안 될 분들은 이런 분들 입니다. 대규모 작업이 필요하며, 빠..
2021.02.23 -
airflow에서 dag을 만들고 실행을 시켰는데 task는 분명히 뜨고 해당 task도 실행중으로 나오는데, 상태값만 변하고 실행되지 않는 경우가 있습니다. 다음과 같은 경우가 그 현상에 해당 합니다. 아래와 같이 Task의 상태값이 분명히 실행으로 변경되었는데, 아래의 네모칸에 실행중(초록색)으로 변경되지 않습니다. 아무리 기다려도 역시 그대로 입니다. 그래서 어떻게 할지 고민했는데, airflow에서 강제로 "Run"을 시킬 수 있습니다. 강제로 queue에 넣어주는 방법입니다. 가끔씩 should가 느리게 동작해서 반응이 없을 경우가 있는데 이렇게 강제로 Run을 시켜서 동작 시키는 경우가 많기 때문에 같은 증상으로 보고 똑같이 실행해 봤습니다. 다음과 같이 강제로 "Run"으로 변경 해봤습니다...
airflow dag의 task를 실행하고 동작하지 않는 현상airflow에서 dag을 만들고 실행을 시켰는데 task는 분명히 뜨고 해당 task도 실행중으로 나오는데, 상태값만 변하고 실행되지 않는 경우가 있습니다. 다음과 같은 경우가 그 현상에 해당 합니다. 아래와 같이 Task의 상태값이 분명히 실행으로 변경되었는데, 아래의 네모칸에 실행중(초록색)으로 변경되지 않습니다. 아무리 기다려도 역시 그대로 입니다. 그래서 어떻게 할지 고민했는데, airflow에서 강제로 "Run"을 시킬 수 있습니다. 강제로 queue에 넣어주는 방법입니다. 가끔씩 should가 느리게 동작해서 반응이 없을 경우가 있는데 이렇게 강제로 Run을 시켜서 동작 시키는 경우가 많기 때문에 같은 증상으로 보고 똑같이 실행해 봤습니다. 다음과 같이 강제로 "Run"으로 변경 해봤습니다...
2020.11.03 -
Hello. Many people seem to know the part of optimizing cost and performance by fetching data without reading unnecessary parts in big queries using partition columns. There are many advantages to using partitioned columns as above, but there may be cases where you cannot. If there are several date columns in the table, the case where the date column to be used is not partitioned is as follows. B..
How to save big query costHello. Many people seem to know the part of optimizing cost and performance by fetching data without reading unnecessary parts in big queries using partition columns. There are many advantages to using partitioned columns as above, but there may be cases where you cannot. If there are several date columns in the table, the case where the date column to be used is not partitioned is as follows. B..
2020.09.29 -
Airflow를 세팅하고 CPU를 보면 아무것도 하지 않고 있는데, CPU를 40~50% 정도 점유하는 경우가 있습니다. 하지만 이는 몇 가지 옵션만 변경한다면 쉽게 CPU 점유율을 낮출 수 있습니다. 또한 운영상에 큰 지장이 없습니다. 저의 경우 GCP 위에서 docker를 통해서 airflow를 운영하고 있으며, 옵션은 worker,scheduler,webserver 이렇게 3개의 컨테이너에 적용하였습니다. 모두 동일하게 적용하였습니다. 1 2 3 4 5 6 7 8 9 10 11 # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). Thi..
airflow CPU가 높게 점유되는 현상Airflow를 세팅하고 CPU를 보면 아무것도 하지 않고 있는데, CPU를 40~50% 정도 점유하는 경우가 있습니다. 하지만 이는 몇 가지 옵션만 변경한다면 쉽게 CPU 점유율을 낮출 수 있습니다. 또한 운영상에 큰 지장이 없습니다. 저의 경우 GCP 위에서 docker를 통해서 airflow를 운영하고 있으며, 옵션은 worker,scheduler,webserver 이렇게 3개의 컨테이너에 적용하였습니다. 모두 동일하게 적용하였습니다. 1 2 3 4 5 6 7 8 9 10 11 # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). Thi..
2020.08.11 -
Airflow는 ETL스케줄링 오픈소스로 많은 분들이 사용하고 있습니다. 하지만 오픈소스라서 여러가지 불편점이 있는데요. 그 중 알람의 경우도 그렇습니다. 물론 Dag에 slack 알람을 받을 수 있도록 설정이 가능 한데요. 이 경우 매우 불편하게 하나씩 설정해야 하는 경우가 있습니다. 그래서 Airflow의 postgreSQL에 있는 데이터를 기반으로 slack 알람을 받을 수 있는 python 스크립트를 만들어봤습니다. 우선 세팅이 필요한데요. slack으로 알람을 받기 위해서는 2가지가 필요합니다. slack channel ID slack webhook url 우선 channel ID의 경우 다음과 같이 받을 수 있습니다. slack을 웹으로 접속 할 경우 위에 url을 확인 할 수 있는데요. 다음..
Airflow 실패여부 slack알람으로 받기 (python)Airflow는 ETL스케줄링 오픈소스로 많은 분들이 사용하고 있습니다. 하지만 오픈소스라서 여러가지 불편점이 있는데요. 그 중 알람의 경우도 그렇습니다. 물론 Dag에 slack 알람을 받을 수 있도록 설정이 가능 한데요. 이 경우 매우 불편하게 하나씩 설정해야 하는 경우가 있습니다. 그래서 Airflow의 postgreSQL에 있는 데이터를 기반으로 slack 알람을 받을 수 있는 python 스크립트를 만들어봤습니다. 우선 세팅이 필요한데요. slack으로 알람을 받기 위해서는 2가지가 필요합니다. slack channel ID slack webhook url 우선 channel ID의 경우 다음과 같이 받을 수 있습니다. slack을 웹으로 접속 할 경우 위에 url을 확인 할 수 있는데요. 다음..
2019.12.04