|
11 | 11 | ERROR_MESSAGE_500,
|
12 | 12 | ERROR_MESSAGE_503,
|
13 | 13 | METRICS_TAG,
|
| 14 | + RWS_ROUTE_AUDIT_COUNT_BY_AUDITOR_OVER_TIME, |
14 | 15 | RWS_ROUTE_AUDITED_COUNT_OVER_TIME,
|
15 | 16 | RWS_ROUTE_COUNT_PER_VCS_PROVIDER_BY_WEEK,
|
16 | 17 | RWS_ROUTE_METRICS,
|
17 | 18 | RWS_ROUTE_UN_TRIAGED_COUNT_OVER_TIME
|
18 | 19 | )
|
19 | 20 | from resc_backend.db.connection import Session
|
| 21 | +from resc_backend.resc_web_service.crud import audit as audit_crud |
20 | 22 | from resc_backend.resc_web_service.crud import finding as finding_crud
|
21 | 23 | from resc_backend.resc_web_service.dependencies import get_db_connection
|
| 24 | +from resc_backend.resc_web_service.schema.audit_count_over_time import AuditCountOverTime |
22 | 25 | from resc_backend.resc_web_service.schema.finding_count_over_time import FindingCountOverTime
|
23 | 26 | from resc_backend.resc_web_service.schema.finding_status import FindingStatus
|
24 | 27 | from resc_backend.resc_web_service.schema.vcs_provider import VCSProviders
|
@@ -142,3 +145,48 @@ def convert_rows_to_finding_count_over_time(count_over_time: dict, weeks: int) -
|
142 | 145 |
|
143 | 146 | output.append(week_data)
|
144 | 147 | return output
|
| 148 | + |
| 149 | + |
| 150 | +@router.get(f"{RWS_ROUTE_AUDIT_COUNT_BY_AUDITOR_OVER_TIME}", |
| 151 | + response_model=list[AuditCountOverTime], |
| 152 | + summary="Get count of Audits by Auditor over time for given weeks", |
| 153 | + status_code=status.HTTP_200_OK, |
| 154 | + responses={ |
| 155 | + 200: {"description": "Retrieve count of Audits by Auditor over time for given weeks"}, |
| 156 | + 500: {"description": ERROR_MESSAGE_500}, |
| 157 | + 503: {"description": ERROR_MESSAGE_503} |
| 158 | + }) |
| 159 | +def get_audit_count_by_auditor_over_time(db_connection: Session = Depends(get_db_connection), |
| 160 | + weeks: Optional[int] = Query(default=13, ge=1)) \ |
| 161 | + -> list[AuditCountOverTime]: |
| 162 | + """ |
| 163 | + Retrieve count of Audits by Auditor over time for given weeks |
| 164 | + - **db_connection**: Session of the database connection |
| 165 | + - **weeks**: Nr of weeks for which to retrieve the audit counts |
| 166 | + - **return**: [AuditCountOverTime] |
| 167 | + The output will contain a list of AuditCountOverTime type objects |
| 168 | + """ |
| 169 | + audit_counts = audit_crud.get_audit_count_by_auditor_over_time(db_connection=db_connection, weeks=weeks) |
| 170 | + |
| 171 | + # get the unique auditors from the data |
| 172 | + auditors_default = {} |
| 173 | + for audit in audit_counts: |
| 174 | + auditors_default[audit['auditor']] = 0 |
| 175 | + |
| 176 | + # default to 0 per auditor for all weeks in range |
| 177 | + weekly_audit_counts = {} |
| 178 | + for week in range(0, weeks): |
| 179 | + nth_week = datetime.utcnow() - timedelta(weeks=week) |
| 180 | + week = f"{nth_week.isocalendar().year} W{nth_week.isocalendar().week:02d}" |
| 181 | + weekly_audit_counts[week] = AuditCountOverTime(time_period=week, audit_by_auditor_count=dict(auditors_default)) |
| 182 | + weekly_audit_counts = dict(sorted(weekly_audit_counts.items())) |
| 183 | + |
| 184 | + # set the counts based on the data from the database |
| 185 | + for audit in audit_counts: |
| 186 | + audit_week = f"{audit['year']} W{audit['week']:02d}" |
| 187 | + weekly_audit_counts.get(audit_week).audit_by_auditor_count[audit['auditor']] = audit['audit_count'] |
| 188 | + weekly_audit_counts.get(audit_week).total += audit['audit_count'] |
| 189 | + |
| 190 | + sorted_weekly_audit_counts = dict(sorted(weekly_audit_counts.items())) |
| 191 | + output = list(sorted_weekly_audit_counts.values()) |
| 192 | + return output |
0 commit comments