-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathapi.py
168 lines (143 loc) · 5.83 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# pylint: disable=C0413,W0611,W0404
# Standard Library
import logging.config
# Third Party
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from starlette.status import HTTP_302_FOUND
from tenacity import RetryError
# First Party
from resc_backend.common import get_package_version
from resc_backend.constants import RWS_VERSION_PREFIX
from resc_backend.db.connection import Session, engine
from resc_backend.helpers.environment_wrapper import validate_environment
from resc_backend.resc_web_service.configuration import (
AUTHENTICATION_REQUIRED,
CORS_ALLOWED_DOMAINS,
ENABLE_CORS,
WEB_SERVICE_ENV_VARS
)
from resc_backend.resc_web_service.dependencies import (
check_db_initialized,
requires_auth,
requires_no_auth,
add_security_headers
)
from resc_backend.resc_web_service.endpoints import (
common,
detailed_findings,
findings,
health,
metrics,
repositories,
rules,
rule_packs,
scans,
vcs_instances
)
from resc_backend.resc_web_service.helpers.exception_handler import add_exception_handlers
from resc_backend.resc_web_service.cache_manager import CacheManager
# Check and load environment variables
env_variables = validate_environment(WEB_SERVICE_ENV_VARS)
def generate_logger_config(log_file_path, debug=True):
"""A function to generate the global logger config dictionary
Arguments:
log_file_path {string} -- Path where the logs are to be stored
Keyword Arguments:
debug {bool} -- Whether the logging level should be set to DEBUG or INFO (default: {True})
Returns:
Dict -- A dictionary containing the logger configuration
"""
logging_level = "DEBUG" if debug else "INFO"
logging_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"generic-log-formatter": {
"format": "[%(levelname)s] [%(name)s] [%(asctime)s] %(message)s"
},
},
"handlers": {
"console": {
"level": logging_level,
"class": "logging.StreamHandler",
"formatter": "generic-log-formatter",
},
"file": {
"level": logging_level,
"class": "logging.handlers.RotatingFileHandler",
"formatter": "generic-log-formatter",
"filename": log_file_path,
"maxBytes": 100 * 1024 * 1024,
"backupCount": 5
}
},
"loggers": {
"": {
"handlers": ["console", "file"],
"level": logging_level,
"propagate": True
},
}
}
return logging_config
logging.config.dictConfig(generate_logger_config('local_logs.log'))
logger = logging.getLogger(__name__)
tags_metadata = [
{"name": "health", "description": "Checks health for API"},
{"name": "resc-common", "description": "Manage common information"},
{"name": "resc-rules", "description": "Manage rule information"},
{"name": "resc-rule-packs", "description": "Manage rule pack information"},
{"name": "resc-repositories", "description": "Manage repository information"},
{"name": "resc-scans", "description": "Manage scan information"},
{"name": "resc-findings", "description": "Manage findings information"},
{"name": "resc-vcs-instances", "description": "Manage vcs instance information"},
{"name": "resc-metrics", "description": "Retrieve metrics"},
]
# Check if authentication is required for api endpoints
auth_disabled = env_variables[AUTHENTICATION_REQUIRED].lower() in ["false"]
AUTH = [Depends(requires_no_auth)] if auth_disabled else [Depends(requires_auth)]
app = FastAPI(title="Repository Scanner (RESC)",
description="RESC API helps you to perform several operations upon findings "
"obtained from multiple source code repositories.",
version=get_package_version(),
openapi_tags=tags_metadata)
if env_variables[ENABLE_CORS].lower() in ["true"]:
origins = env_variables[CORS_ALLOWED_DOMAINS].split(', ')
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(health.router, prefix=RWS_VERSION_PREFIX)
app.include_router(common.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(rules.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(rule_packs.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(findings.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(detailed_findings.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(repositories.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(scans.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(vcs_instances.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
app.include_router(metrics.router, prefix=RWS_VERSION_PREFIX, dependencies=AUTH)
# Apply the security headers to the app in the form of middleware
app.middleware("http")(add_security_headers)
# Add exception handlers
add_exception_handlers(app=app)
@app.on_event("startup")
def app_startup():
CacheManager.initialize_cache(env_variables=env_variables)
try:
_ = Session(bind=engine)
check_db_initialized()
logger.info("Database is connected, expected table(s) found")
except RetryError as exc:
raise SystemExit("Error while connecting to the database, retry timed out") from exc
@app.on_event("shutdown")
async def app_shutdown():
await CacheManager.clear_all_cache()
@app.get("/")
def view_docs():
return RedirectResponse(url="/docs", status_code=HTTP_302_FOUND)