새소식

반응형
GCP/운영관련 개발

GCP - all open 방화벽 체크

  • -
반응형

지난 포스트에서 GCP의 방화벽을 만드는 법을 작성하였습니다. ( GCP방화벽 만들기 )

방화벽을 이용해서 최소한의 보안을 할 수 있습니다.

 

그런데 방화벽을 만들기 전에 0.0.0.0/0으로 All open으로 열려 있는 부분을 하나씩 보면서 체크 하긴 매우 어렵습니다.

지난 포스트에도 언급 하였지만 클라우드의 경우 불특정 다수의 사용자가 사용하다 보니 많은 부분에서 관리가 안 되는 경우가 있습니다. 그래서 운영을 할 때 만들때 전자결재를 받거나 매일 수동으로 체크하는 것은 매우 어렵다고 생각합니다.

 

기존의 인프라의 경우 이러한 작업을 할 경우 전자결재를 올리고 검토 이후에 서버에 추가하는 경우가 일반적이었습니다.

하지만 클라우드 환경에서는 다수의 사용자가 그렇게 할 경우 매우 불편하고 (만약 규모가 크고 인원이 많다면 괜찮겠지만) 담당자의 업무 부하가 너무 큽니다. 

 

그래서 다음과 같이 0.0.0.0/0으로 열려 있는 방화벽이 있는지를 체크하도록 하는 python 코드를 만들었습니다.

또한 체크에만 그치지 않고  이를 Slack으로 발송하여 실시간으로 체크가 가능하도록 하였습니다.

 

 

최종적으로 받을 수 있는 것

Slack에는 다음과 같이 표 형태로 예쁘게 나오도록 만들었습니다.

 

Slack 메시지 샘플

 

 

코드 뜯어보기

python 관련 구글에서 제공하는 API를 참고해서 만들었습니다. ( 바로가기 )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""
BEFORE RUNNING:
---------------
1. If not already done, enable the Compute Engine API
   and check the quota for your project at
   https://console.developers.google.com/apis/api/compute
2. This sample uses Application Default Credentials for authentication.
   If not already done, install the gcloud CLI from
   https://cloud.google.com/sdk and run
   `gcloud beta auth application-default login`.
   For more information, see
   https://developers.google.com/identity/protocols/application-default-credentials
3. Install the Python client library for Google APIs by running
   `pip install --upgrade google-api-python-client`
"""
from pprint import pprint
 
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
 
credentials = GoogleCredentials.get_application_default()
 
service = discovery.build('compute''v1', credentials=credentials)
 
# Project ID for this request.
project = 'my-project'  # TODO: Update placeholder value.
 
request = service.firewalls().list(project=project)
while request is not None:
    response = request.execute()
 
    for firewall in response['items']:
        # TODO: Change code below to process each `firewall` resource:
        pprint(firewall)
 
    request = service.firewalls().list_next(previous_request=request, previous_response=response)
 
cs

추가적으로 이 링크도 참고하여 만들었습니다. ( 바로가기 )

 

일단 이 코드를 통해서 0.0.0.0/0으로 열려 있는 방화벽이 무엇이 있는지 간단하게 확인할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pprint import pprint
 
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
 
credentials = GoogleCredentials.get_application_default()
 
service = discovery.build("compute""v1", credentials=credentials)
 
project = "입력 해주세요."  # TODO: Update placeholder value.
 
request = service.firewalls().list(project=project)
while request is not None:
    response = request.execute()
 
    for firewall in response["items"]:
        sourceRanges = firewall["sourceRanges"]
        if "0.0.0.0/0" in sourceRanges:
            pprint(firewall)
cs
sourceRanges

 

코드를 보시면 items에서 sourceRanges에 0.0.0.0/0으로 있는 경우만 필터 해서 가져오도록 만들었습니다.

자세한 항목에 관해서 위의 구글 링크에서 확인을 해주세요. (회사의 네트워크라서 샘플 공개가 어렵습니다.)

 

 

최종 코드 

위에서 필요한 몇 가지 항목을 뽑아내서 Slack으로 보낼 메시지를 만들어 줍니다. 그렇게 할 경우 최종적으로 다음과 같이 코드가 나옵니다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import json
from datetime import datetime, timedelta
 
import requests
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
 
 
def __main__():
 
    fcs = FirewallCheckSlack()
 
    try:
        print("start main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        check_result = fcs.get_allopen_firewalllist()
 
        error_list = [
            "----------------------------------------------------------------------------------------------------------",
            "| 방화벽 ID                   | 방화벽 이름                                          | 방화벽 생성일             |",
            "----------------------------------------------------------------------------------------------------------",
        ]
 
        for result_row in check_result:
            id = result_row[0+ "                 "
            id = "| " + id[0:26+ "| "
            name = result_row[1+ "                                "
            name = name[0:50+ "| "
            create_time = (result_row[2])[0:19].replace("T"" "+ "    |"
 
            error_list.append((id + name + create_time))
            error_list.append(
                "----------------------------------------------------------------------------------------------------------",
            )
 
        error_list = "\n".join(error_list)
 
        fcs.slack_send(error_list)
 
    except Exception as ex:
        print("Error code: ", ex)
 
    finally:
        print("End main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        print("finally!!")
 
 
class FirewallCheckSlack:
    pass
 
    def get_allopen_firewalllist(self):
        credentials = GoogleCredentials.get_application_default()
 
        service = discovery.build("compute""v1", credentials=credentials)
 
        # Project ID for this request.
        project = "입력 해주세요."  # TODO: Update placeholder value.
 
        request = service.firewalls().list(project=project)
        result_list = []
        while request is not None:
            response = request.execute()
 
            for firewall in response["items"]:
                sourceRanges = firewall["sourceRanges"]
                if "0.0.0.0/0" in sourceRanges:
                    firewall_id = firewall["id"]
                    firewall_name = firewall["name"]
                    create_time = firewall["creationTimestamp"]
                    sourceRanges = firewall["sourceRanges"]
 
                    result_list.append((firewall_id, firewall_name, create_time, sourceRanges))
 
            request = service.firewalls().list_next(
                previous_request=request, previous_response=response
            )
 
        return result_list
 
    def slack_send(self, text_result):
 
        if text_result != "":
            slack_channel_id = "입력 해주세요."
            slack_webhook_url = "입력 해주세요."
 
            message_result = (
                "GCP 인스턴스 방화벽에 0.0.0.0/0 으로 오픈 된 방화벽 정책이 있습니다.\n\n"
                + "```"
                + text_result
                + "```"
                + "\n"
            )
            slack_message = ":bell:" + " *방화벽 모니터링* \n" + message_result
 
            # slack message send
            payload = {
                "channel": slack_channel_id,
                "text": slack_message,
                "username""네트워크 방화벽 모니터링",
                "icon_emoji""false",
            }
            requests.post(
                slack_webhook_url,
                data=json.dumps(payload),
                headers={"Content-Type""application/json"},
            )
        else:
            print("empty error")
 
 
# =========================================================================
if __name__ == "__main__":
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    __main__()
cs

 

 

Slack 보내는 부분 관련하여...

혹시 위에서 Slack 메시지 보내는 곳에서 이 부분이 이해가 가지 않을 경우 링크를 참고해주세요.( 바로가기

1
2
3
4
5
 def slack_send(self, text_result):
 
        if text_result != "":
            slack_channel_id = "입력 해주세요."
            slack_webhook_url = "입력 해주세요."
cs

 

마지막으로 최종 코드를 첨부합니다.

 

firewall_check_blog.py
0.00MB

 

 

 

반응형
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.