- GCP - all open 방화벽 체크2021년 01월 04일 16시 02분 24초에 업로드 된 글입니다.작성자: DE 군고구마반응형
지난 포스트에서 GCP의 방화벽을 만드는 법을 작성하였습니다. ( GCP방화벽 만들기 )
방화벽을 이용해서 최소한의 보안을 할 수 있습니다.
그런데 방화벽을 만들기 전에 0.0.0.0/0으로 All open으로 열려 있는 부분을 하나씩 보면서 체크 하긴 매우 어렵습니다.
지난 포스트에도 언급 하였지만 클라우드의 경우 불특정 다수의 사용자가 사용하다 보니 많은 부분에서 관리가 안 되는 경우가 있습니다. 그래서 운영을 할 때 만들때 전자결재를 받거나 매일 수동으로 체크하는 것은 매우 어렵다고 생각합니다.
기존의 인프라의 경우 이러한 작업을 할 경우 전자결재를 올리고 검토 이후에 서버에 추가하는 경우가 일반적이었습니다.
하지만 클라우드 환경에서는 다수의 사용자가 그렇게 할 경우 매우 불편하고 (만약 규모가 크고 인원이 많다면 괜찮겠지만) 담당자의 업무 부하가 너무 큽니다.
그래서 다음과 같이 0.0.0.0/0으로 열려 있는 방화벽이 있는지를 체크하도록 하는 python 코드를 만들었습니다.
또한 체크에만 그치지 않고 이를 Slack으로 발송하여 실시간으로 체크가 가능하도록 하였습니다.
최종적으로 받을 수 있는 것
Slack에는 다음과 같이 표 형태로 예쁘게 나오도록 만들었습니다.
Slack 메시지 샘플 코드 뜯어보기
python 관련 구글에서 제공하는 API를 참고해서 만들었습니다. ( 바로가기 )
12345678910111213141516171819202122232425262728293031323334353637"""BEFORE RUNNING:---------------1. If not already done, enable the Compute Engine APIand check the quota for your project athttps://console.developers.google.com/apis/api/compute2. This sample uses Application Default Credentials for authentication.If not already done, install the gcloud CLI fromhttps://cloud.google.com/sdk and run`gcloud beta auth application-default login`.For more information, seehttps://developers.google.com/identity/protocols/application-default-credentials3. Install the Python client library for Google APIs by running`pip install --upgrade google-api-python-client`"""from pprint import pprintfrom googleapiclient import discoveryfrom oauth2client.client import GoogleCredentialscredentials = GoogleCredentials.get_application_default()service = discovery.build('compute', 'v1', credentials=credentials)# Project ID for this request.project = 'my-project' # TODO: Update placeholder value.request = service.firewalls().list(project=project)while request is not None:response = request.execute()for firewall in response['items']:# TODO: Change code below to process each `firewall` resource:pprint(firewall)request = service.firewalls().list_next(previous_request=request, previous_response=response)cs 추가적으로 이 링크도 참고하여 만들었습니다. ( 바로가기 )
일단 이 코드를 통해서 0.0.0.0/0으로 열려 있는 방화벽이 무엇이 있는지 간단하게 확인할 수 있습니다.
12345678910111213141516171819from pprint import pprintfrom googleapiclient import discoveryfrom oauth2client.client import GoogleCredentialscredentials = GoogleCredentials.get_application_default()service = discovery.build("compute", "v1", credentials=credentials)project = "입력 해주세요." # TODO: Update placeholder value.request = service.firewalls().list(project=project)while request is not None:response = request.execute()for firewall in response["items"]:sourceRanges = firewall["sourceRanges"]if "0.0.0.0/0" in sourceRanges:pprint(firewall)cs sourceRanges 코드를 보시면 items에서 sourceRanges에 0.0.0.0/0으로 있는 경우만 필터 해서 가져오도록 만들었습니다.
자세한 항목에 관해서 위의 구글 링크에서 확인을 해주세요. (회사의 네트워크라서 샘플 공개가 어렵습니다.)
최종 코드
위에서 필요한 몇 가지 항목을 뽑아내서 Slack으로 보낼 메시지를 만들어 줍니다. 그렇게 할 경우 최종적으로 다음과 같이 코드가 나옵니다.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113import jsonfrom datetime import datetime, timedeltaimport requestsfrom googleapiclient import discoveryfrom oauth2client.client import GoogleCredentialsdef __main__():fcs = FirewallCheckSlack()try:print("start main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))check_result = fcs.get_allopen_firewalllist()error_list = ["----------------------------------------------------------------------------------------------------------","| 방화벽 ID | 방화벽 이름 | 방화벽 생성일 |","----------------------------------------------------------------------------------------------------------",]for result_row in check_result:id = result_row[0] + " "id = "| " + id[0:26] + "| "name = result_row[1] + " "name = name[0:50] + "| "create_time = (result_row[2])[0:19].replace("T", " ") + " |"error_list.append((id + name + create_time))error_list.append("----------------------------------------------------------------------------------------------------------",)error_list = "\n".join(error_list)fcs.slack_send(error_list)except Exception as ex:print("Error code: ", ex)finally:print("End main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))print("finally!!")class FirewallCheckSlack:passdef get_allopen_firewalllist(self):credentials = GoogleCredentials.get_application_default()service = discovery.build("compute", "v1", credentials=credentials)# Project ID for this request.project = "입력 해주세요." # TODO: Update placeholder value.request = service.firewalls().list(project=project)result_list = []while request is not None:response = request.execute()for firewall in response["items"]:sourceRanges = firewall["sourceRanges"]if "0.0.0.0/0" in sourceRanges:firewall_id = firewall["id"]firewall_name = firewall["name"]create_time = firewall["creationTimestamp"]sourceRanges = firewall["sourceRanges"]result_list.append((firewall_id, firewall_name, create_time, sourceRanges))request = service.firewalls().list_next(previous_request=request, previous_response=response)return result_listdef slack_send(self, text_result):if text_result != "":slack_channel_id = "입력 해주세요."slack_webhook_url = "입력 해주세요."message_result = ("GCP 인스턴스 방화벽에 0.0.0.0/0 으로 오픈 된 방화벽 정책이 있습니다.\n\n"+ "```"+ text_result+ "```"+ "\n")slack_message = ":bell:" + " *방화벽 모니터링* \n" + message_result# slack message sendpayload = {"channel": slack_channel_id,"text": slack_message,"username": "네트워크 방화벽 모니터링","icon_emoji": "false",}requests.post(slack_webhook_url,data=json.dumps(payload),headers={"Content-Type": "application/json"},)else:print("empty error")# =========================================================================if __name__ == "__main__":now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")__main__()cs Slack 보내는 부분 관련하여...
혹시 위에서 Slack 메시지 보내는 곳에서 이 부분이 이해가 가지 않을 경우 링크를 참고해주세요.( 바로가기 )
12345def slack_send(self, text_result):if text_result != "":slack_channel_id = "입력 해주세요."slack_webhook_url = "입력 해주세요."cs 마지막으로 최종 코드를 첨부합니다.
firewall_check_blog.py0.00MB반응형'GCP > 운영관련 개발' 카테고리의 다른 글
[Cloud Build] GCS <-> github 동기화 실패 (failed: step exited with non-zero status: 1) (0) 2022.12.06 GCP 방화벽(firewall) 설정 (0) 2020.12.31 How to save big query cost (0) 2020.09.29 GCP-stackdriver python api 사용하기(list_time_series) (0) 2020.03.27 다음글이 없습니다.이전글이 없습니다.댓글