[Airflow] oracle connection 등록 후 python으로 부르기
안녕하세요. 주형권입니다.
최근에 개발을 다하고 시간이 조금 남아서 개발하면서 몇 가지 만났던 부분에 대해서 글을 많이 쓰게 되었습니다.
지금은 ELT 파이프라인을 개발하고 있는데요. 저 같은 경우 주로 Airflow를 통해서 스케줄 처리를 하고, PythonOperator를 이용해서 Python Class를 불러와서 데이터를 처리하는 구조로 ELT 파이프라인을 만듭니다.
이렇게 만들면 제가 입맛데로 원하는 가공을 할 수도 있고 여러 가지 기능을 제가 원하는 방향으로 넣을 수 있어서 PythonOperator를 선호하고 있습니다. 이 ELT 파이프라인은 조만간 정리하여 글을 쓰도록 하겠습니다. (거의 완성되었습니다.)
무엇을 하려고 하였는가?
Airflow에 Connection에 Oracle 서버를 등록하고 python에서 cx_oracle을 통해서 oracle을 연결하려고 하였습니다. 그런데, 많은 블로그들이 등록하는 방법에 대해서는 나오지만 이를 불러와서 연결하는 방법에 대해서는 나오지 않아서 여러 가지 방법을 찾아서 해결하였습니다.
어떻게 해결하였는가?
Airflow에 Connection에 Oracle을 등록하는 것은 전부 똑같습니다. Oracle로 Conn Type을 만들고 기존에 다른 Mysql , Postgresql등과 같이 똑같이 입력하면 됩니다. 하지만 한 가지 다른 부분이 있습니다.
cx_oracle을 이용하여 Python에서 Oracle 데이터베이스를 연결할 때 파라미터값으로 dsn을 받습니다. dsn에 Host 및 Port와 스키마까지 모두 들어가기 때문에 Host와 Port 등을 따로 받아서 넣지 않고 연결이 가능합니다. 하지만 dsn을 넣는 곳이 없으므로, 당연히 넣을 수 없습니다. 그리고 dsn의 경우 다음과 같이 구성이 되기 때문에 여러 곳에서 불러와서 조합해서 하는 것도 굉장히 번거롭습니다.
그래서 Python에서 값을 불러와서 처리하기보다는 Extra에 값을 넣어서 불러오면 이것을 바로 dsn 파라미터에 넣어줄 수 있습니다. Airflow의 get_extra()를 이용하면 Extra의 값을 String으로 불러옵니다. 물론 {}으로 dictionary로 만들어서 사용하셔도 됩니다. 하지만 저 같은 경우 딱히 다른 옵션을 쓸게 없어서 그대로 dsn값을 넣고 불러서 사용하였습니다.
rdb = BaseHook.get_connection(rdb_con)
self.ora_con = cx_Oracle.connect(dsn=rdb.get_extra(),
user=rdb.login,
password=rdb.password,
encoding="UTF-8")
self.ora_cursor = self.ora_con.cursor()
보시면 BaseHook을 이용하여 get_connection으로 Connection 정보를 가져오고 get_extra로 Extra에 있는 dsn 정보를 그대로 넣어주고 있습니다. 이렇게 하면 별다른 이슈 없이 연결이 됩니다.
참고
https://stackoverflow.com/questions/45280650/store-and-access-password-using-apache-airflow
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/connection/index.html