-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhealth_check.py
139 lines (128 loc) · 4.4 KB
/
health_check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# health_check.py
import asyncio
import logging
from datetime import datetime
from typing import List
import aiohttp
from requests.exceptions import ConnectTimeout, ReadTimeout
from dtos import HealthResultDTO, HealthCheckDTO, HealthCheckConfigDTO
import to_checks_types as types
class HealthCheck:
HEALTHY_STATUS_CODE = 200
def __init__(
self,
*,
health_check_config: HealthCheckConfigDTO,
):
self.config = health_check_config
async def execute(
self,
*,
to_checks: List[types.ToChecksTypedDict],
current_unhealthy_urls: List[str], # List[urls]
health_check_dto: HealthCheckDTO,
) -> None:
tasks = [
self._health_check(
param=param,
url=to_check["url_base"].format(param=param),
)
for to_check in to_checks
for param in to_check["params"]
]
health_results = await asyncio.gather(*tasks)
for health_result in health_results:
self._add_result_to_group(
health_result=health_result,
health_check_dto=health_check_dto,
current_unhealthy_urls=current_unhealthy_urls,
)
def _add_result_to_group(
self,
*,
health_result: HealthResultDTO,
health_check_dto: HealthCheckDTO,
current_unhealthy_urls: List[str],
) -> None:
is_current_unhealthy = health_result.url in current_unhealthy_urls
is_healthy = health_result.is_healthy
if is_healthy:
{
True: health_check_dto.back_to_healthy,
False: health_check_dto.healthy,
}[
is_current_unhealthy
].append(health_result)
elif not is_healthy:
{
True: health_check_dto.still_unhealthy,
False: health_check_dto.new_unhealthy,
}[is_current_unhealthy].append(health_result)
async def _health_check(
self,
*,
url: str,
param: str,
) -> HealthResultDTO:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=self.config.timeout) as response:
status_code = response.status
health_result = HealthResultDTO(
is_healthy=status_code == self.HEALTHY_STATUS_CODE,
status_code=status_code,
param=param,
url=url,
)
except asyncio.TimeoutError:
health_result = HealthResultDTO(
is_healthy=False,
status_code=408,
param=param,
url=url,
error_message="Client-side timeout",
)
except aiohttp.ClientConnectionError:
health_result = HealthResultDTO(
is_healthy=False,
status_code=-1,
param=param,
url=url,
error_message="Connection error occurred",
)
except aiohttp.ClientResponseError as e:
health_result = HealthResultDTO(
is_healthy=False,
status_code=e.status,
param=param,
url=url,
error_message=str(e.message),
)
except aiohttp.InvalidURL:
health_result = HealthResultDTO(
is_healthy=False,
status_code=-1,
param=param,
url=url,
error_message="Invalid URL",
)
except aiohttp.ClientPayloadError:
health_result = HealthResultDTO(
is_healthy=False,
status_code=-1,
param=param,
url=url,
error_message="Payload error",
)
except aiohttp.ClientError as e:
health_result = HealthResultDTO(
is_healthy=False,
status_code=-1,
param=param,
url=url,
error_message=str(e),
)
logging.info(
f"{datetime.now()} - url: {health_result.url} - param: {health_result.param} - status_code: {health_result.status_code} - is_healthy: {health_result.is_healthy} - error_message: {health_result.error_message}"
)
return health_result