GCP - all open 방화벽 체크
- -
지난 포스트에서 GCP의 방화벽을 만드는 법을 작성하였습니다. ( GCP방화벽 만들기 )
방화벽을 이용해서 최소한의 보안을 할 수 있습니다.
그런데 방화벽을 만들기 전에 0.0.0.0/0으로 All open으로 열려 있는 부분을 하나씩 보면서 체크 하긴 매우 어렵습니다.
지난 포스트에도 언급 하였지만 클라우드의 경우 불특정 다수의 사용자가 사용하다 보니 많은 부분에서 관리가 안 되는 경우가 있습니다. 그래서 운영을 할 때 만들때 전자결재를 받거나 매일 수동으로 체크하는 것은 매우 어렵다고 생각합니다.
기존의 인프라의 경우 이러한 작업을 할 경우 전자결재를 올리고 검토 이후에 서버에 추가하는 경우가 일반적이었습니다.
하지만 클라우드 환경에서는 다수의 사용자가 그렇게 할 경우 매우 불편하고 (만약 규모가 크고 인원이 많다면 괜찮겠지만) 담당자의 업무 부하가 너무 큽니다.
그래서 다음과 같이 0.0.0.0/0으로 열려 있는 방화벽이 있는지를 체크하도록 하는 python 코드를 만들었습니다.
또한 체크에만 그치지 않고 이를 Slack으로 발송하여 실시간으로 체크가 가능하도록 하였습니다.
최종적으로 받을 수 있는 것
Slack에는 다음과 같이 표 형태로 예쁘게 나오도록 만들었습니다.
코드 뜯어보기
python 관련 구글에서 제공하는 API를 참고해서 만들었습니다. ( 바로가기 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
"""
BEFORE RUNNING:
---------------
1. If not already done, enable the Compute Engine API
and check the quota for your project at
https://console.developers.google.com/apis/api/compute
2. This sample uses Application Default Credentials for authentication.
If not already done, install the gcloud CLI from
https://cloud.google.com/sdk and run
`gcloud beta auth application-default login`.
For more information, see
https://developers.google.com/identity/protocols/application-default-credentials
3. Install the Python client library for Google APIs by running
`pip install --upgrade google-api-python-client`
"""
from pprint import pprint
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()
service = discovery.build('compute', 'v1', credentials=credentials)
# Project ID for this request.
project = 'my-project' # TODO: Update placeholder value.
request = service.firewalls().list(project=project)
while request is not None:
response = request.execute()
for firewall in response['items']:
# TODO: Change code below to process each `firewall` resource:
pprint(firewall)
request = service.firewalls().list_next(previous_request=request, previous_response=response)
|
cs |
추가적으로 이 링크도 참고하여 만들었습니다. ( 바로가기 )
일단 이 코드를 통해서 0.0.0.0/0으로 열려 있는 방화벽이 무엇이 있는지 간단하게 확인할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from pprint import pprint
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()
service = discovery.build("compute", "v1", credentials=credentials)
project = "입력 해주세요." # TODO: Update placeholder value.
request = service.firewalls().list(project=project)
while request is not None:
response = request.execute()
for firewall in response["items"]:
sourceRanges = firewall["sourceRanges"]
if "0.0.0.0/0" in sourceRanges:
pprint(firewall)
|
cs |
코드를 보시면 items에서 sourceRanges에 0.0.0.0/0으로 있는 경우만 필터 해서 가져오도록 만들었습니다.
자세한 항목에 관해서 위의 구글 링크에서 확인을 해주세요. (회사의 네트워크라서 샘플 공개가 어렵습니다.)
최종 코드
위에서 필요한 몇 가지 항목을 뽑아내서 Slack으로 보낼 메시지를 만들어 줍니다. 그렇게 할 경우 최종적으로 다음과 같이 코드가 나옵니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import json
from datetime import datetime, timedelta
import requests
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
def __main__():
fcs = FirewallCheckSlack()
try:
print("start main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
check_result = fcs.get_allopen_firewalllist()
error_list = [
"----------------------------------------------------------------------------------------------------------",
"| 방화벽 ID | 방화벽 이름 | 방화벽 생성일 |",
"----------------------------------------------------------------------------------------------------------",
]
for result_row in check_result:
id = result_row[0] + " "
id = "| " + id[0:26] + "| "
name = result_row[1] + " "
name = name[0:50] + "| "
create_time = (result_row[2])[0:19].replace("T", " ") + " |"
error_list.append((id + name + create_time))
error_list.append(
"----------------------------------------------------------------------------------------------------------",
)
error_list = "\n".join(error_list)
fcs.slack_send(error_list)
except Exception as ex:
print("Error code: ", ex)
finally:
print("End main Proccess...", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("finally!!")
class FirewallCheckSlack:
pass
def get_allopen_firewalllist(self):
credentials = GoogleCredentials.get_application_default()
service = discovery.build("compute", "v1", credentials=credentials)
# Project ID for this request.
project = "입력 해주세요." # TODO: Update placeholder value.
request = service.firewalls().list(project=project)
result_list = []
while request is not None:
response = request.execute()
for firewall in response["items"]:
sourceRanges = firewall["sourceRanges"]
if "0.0.0.0/0" in sourceRanges:
firewall_id = firewall["id"]
firewall_name = firewall["name"]
create_time = firewall["creationTimestamp"]
sourceRanges = firewall["sourceRanges"]
result_list.append((firewall_id, firewall_name, create_time, sourceRanges))
request = service.firewalls().list_next(
previous_request=request, previous_response=response
)
return result_list
def slack_send(self, text_result):
if text_result != "":
slack_channel_id = "입력 해주세요."
slack_webhook_url = "입력 해주세요."
message_result = (
"GCP 인스턴스 방화벽에 0.0.0.0/0 으로 오픈 된 방화벽 정책이 있습니다.\n\n"
+ "```"
+ text_result
+ "```"
+ "\n"
)
slack_message = ":bell:" + " *방화벽 모니터링* \n" + message_result
# slack message send
payload = {
"channel": slack_channel_id,
"text": slack_message,
"username": "네트워크 방화벽 모니터링",
"icon_emoji": "false",
}
requests.post(
slack_webhook_url,
data=json.dumps(payload),
headers={"Content-Type": "application/json"},
)
else:
print("empty error")
# =========================================================================
if __name__ == "__main__":
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__main__()
|
cs |
Slack 보내는 부분 관련하여...
혹시 위에서 Slack 메시지 보내는 곳에서 이 부분이 이해가 가지 않을 경우 링크를 참고해주세요.( 바로가기 )
1
2
3
4
5
|
def slack_send(self, text_result):
if text_result != "":
slack_channel_id = "입력 해주세요."
slack_webhook_url = "입력 해주세요."
|
cs |
마지막으로 최종 코드를 첨부합니다.
'GCP > 운영관련 개발' 카테고리의 다른 글
[Cloud Build] GCS <-> github 동기화 실패 (failed: step exited with non-zero status: 1) (0) | 2022.12.06 |
---|---|
GCP 방화벽(firewall) 설정 (0) | 2020.12.31 |
How to save big query cost (0) | 2020.09.29 |
GCP-stackdriver python api 사용하기(list_time_series) (0) | 2020.03.27 |
소중한 공감 감사합니다