|
1 | 1 | # pylint: disable=R0916,R0912,C0121
|
2 | 2 | # Standard Library
|
3 | 3 | import logging
|
4 |
| -from datetime import datetime |
| 4 | +from datetime import datetime, timedelta |
5 | 5 | from typing import List
|
6 | 6 |
|
7 | 7 | # Third Party
|
8 |
| -from sqlalchemy import and_, extract, func, or_ |
| 8 | +from sqlalchemy import and_, extract, func, or_, union |
| 9 | +from sqlalchemy.engine import Row |
9 | 10 | from sqlalchemy.orm import Session
|
10 | 11 |
|
11 | 12 | # First Party
|
@@ -450,6 +451,7 @@ def get_rule_findings_count_by_status(db_connection: Session, rule_pack_versions
|
450 | 451 | "clarification_required": 0,
|
451 | 452 | "total_findings_count": 0
|
452 | 453 | }
|
| 454 | + |
453 | 455 | for status_count in status_counts:
|
454 | 456 | rule_count_dict[status_count[0]]["total_findings_count"] += status_count[2]
|
455 | 457 | if status_count[1] == FindingStatus.NOT_ANALYZED or status_count[1] is None:
|
@@ -640,3 +642,160 @@ def delete_findings_by_vcs_instance_id(db_connection: Session, vcs_instance_id:
|
640 | 642 | model.vcs_instance.DBVcsInstance.id_ == vcs_instance_id) \
|
641 | 643 | .delete(synchronize_session=False)
|
642 | 644 | db_connection.commit()
|
| 645 | + |
| 646 | + |
| 647 | +def get_finding_audit_status_count_over_time(db_connection: Session, status: FindingStatus, weeks: int = 13) -> dict: |
| 648 | + """ |
| 649 | + Retrieve count of true positive findings over time for given weeks |
| 650 | + :param db_connection: |
| 651 | + Session of the database connection |
| 652 | + :param status: |
| 653 | + mandatory, status for which to get the audit counts over time |
| 654 | + :param weeks: |
| 655 | + optional, filter on last n weeks, default 13 |
| 656 | + :return: true_positive_count_over_time |
| 657 | + list of rows containing finding statuses count over time per week |
| 658 | + """ |
| 659 | + all_tables = [] |
| 660 | + for week in range(0, weeks): |
| 661 | + last_nth_week_date_time = datetime.utcnow() - timedelta(weeks=week) |
| 662 | + query = db_connection.query(extract('year', last_nth_week_date_time).label("year"), |
| 663 | + extract('week', last_nth_week_date_time).label("week"), |
| 664 | + model.DBVcsInstance.provider_type.label("provider_type"), |
| 665 | + func.count(model.DBaudit.id_).label("finding_count") |
| 666 | + ) |
| 667 | + max_audit_subquery = db_connection.query(func.max(model.DBaudit.id_).label("audit_id")) \ |
| 668 | + .filter(extract('year', model.DBaudit.timestamp) == extract('year', last_nth_week_date_time)) \ |
| 669 | + .filter(extract('week', model.DBaudit.timestamp) <= extract('week', last_nth_week_date_time)) \ |
| 670 | + .group_by(model.DBaudit.finding_id).subquery() |
| 671 | + query = query.join(max_audit_subquery, max_audit_subquery.c.audit_id == model.DBaudit.id_) |
| 672 | + query = query.join(model.DBfinding, model.DBfinding.id_ == model.DBaudit.finding_id) |
| 673 | + query = query.join(model.DBbranch, model.DBbranch.id_ == model.DBfinding.branch_id) |
| 674 | + query = query.join(model.DBrepository, model.DBrepository.id_ == model.DBbranch.repository_id) |
| 675 | + query = query.join(model.DBVcsInstance, model.DBVcsInstance.id_ == model.DBrepository.vcs_instance) |
| 676 | + query = query.filter(model.DBaudit.status == status) |
| 677 | + query = query.group_by(model.DBVcsInstance.provider_type) |
| 678 | + |
| 679 | + all_tables.append(query) |
| 680 | + |
| 681 | + # union |
| 682 | + unioned_query = union(*all_tables) |
| 683 | + status_count_over_time = db_connection.execute(unioned_query).all() |
| 684 | + return status_count_over_time |
| 685 | + |
| 686 | + |
| 687 | +def get_finding_count_by_vcs_provider_over_time(db_connection: Session, weeks: int = 13) -> list[Row]: |
| 688 | + """ |
| 689 | + Retrieve count findings over time for given weeks |
| 690 | + :param db_connection: |
| 691 | + Session of the database connection |
| 692 | + :param weeks: |
| 693 | + optional, filter on last n weeks, default 13 |
| 694 | + :return: count_over_time |
| 695 | + list of rows containing finding count over time per week |
| 696 | + """ |
| 697 | + all_tables = [] |
| 698 | + for week in range(0, weeks): |
| 699 | + last_nth_week_date_time = datetime.utcnow() - timedelta(weeks=week) |
| 700 | + query = db_connection.query(extract('year', last_nth_week_date_time).label("year"), |
| 701 | + extract('week', last_nth_week_date_time).label("week"), |
| 702 | + model.DBVcsInstance.provider_type.label("provider_type"), |
| 703 | + func.count(model.DBfinding.id_).label("finding_count") |
| 704 | + ) |
| 705 | + max_base_scan = db_connection.query(func.max(model.DBscan.id_).label("scan_id"), |
| 706 | + model.DBscan.branch_id) \ |
| 707 | + .filter(extract('year', model.DBscan.timestamp) == extract('year', last_nth_week_date_time)) \ |
| 708 | + .filter(extract('week', model.DBscan.timestamp) <= extract('week', last_nth_week_date_time)) \ |
| 709 | + .filter(model.DBscan.scan_type == ScanType.BASE) \ |
| 710 | + .group_by(model.DBscan.branch_id).subquery() |
| 711 | + |
| 712 | + query = query.join(model.DBscanFinding, model.DBfinding.id_ == model.DBscanFinding.finding_id) |
| 713 | + query = query.join(model.DBscan, model.DBscan.id_ == model.DBscanFinding.scan_id) |
| 714 | + query = query.join(max_base_scan, and_(max_base_scan.c.branch_id == model.DBscan.branch_id, |
| 715 | + or_(model.DBscan.id_ == max_base_scan.c.scan_id, |
| 716 | + (and_(model.DBscan.id_ > max_base_scan.c.scan_id, |
| 717 | + model.DBscan.scan_type == ScanType.INCREMENTAL, |
| 718 | + extract('week', model.DBscan.timestamp) <= |
| 719 | + extract('week', last_nth_week_date_time), |
| 720 | + extract('year', model.DBscan.timestamp) == |
| 721 | + extract('year', last_nth_week_date_time))) |
| 722 | + ) |
| 723 | + ) |
| 724 | + ) |
| 725 | + query = query.join(model.DBbranch, model.DBbranch.id_ == model.DBscan.branch_id) |
| 726 | + query = query.join(model.DBrepository, model.DBrepository.id_ == model.DBbranch.repository_id) |
| 727 | + query = query.join(model.DBVcsInstance, model.DBVcsInstance.id_ == model.DBrepository.vcs_instance) |
| 728 | + query = query.group_by(model.DBVcsInstance.provider_type) |
| 729 | + |
| 730 | + all_tables.append(query) |
| 731 | + |
| 732 | + # union |
| 733 | + unioned_query = union(*all_tables) |
| 734 | + count_over_time = db_connection.execute(unioned_query).all() |
| 735 | + return count_over_time |
| 736 | + |
| 737 | + |
| 738 | +def get_un_triaged_finding_count_by_vcs_provider_over_time(db_connection: Session, weeks: int = 13) -> list[Row]: |
| 739 | + """ |
| 740 | + Retrieve count of un triaged findings over time for given weeks |
| 741 | + :param db_connection: |
| 742 | + Session of the database connection |
| 743 | + :param weeks: |
| 744 | + optional, filter on last n weeks, default 13 |
| 745 | + :return: count_over_time |
| 746 | + list of rows containing un triaged findings count over time per week |
| 747 | + """ |
| 748 | + all_tables = [] |
| 749 | + for week in range(0, weeks): |
| 750 | + last_nth_week_date_time = datetime.utcnow() - timedelta(weeks=week) |
| 751 | + query = db_connection.query(extract('year', last_nth_week_date_time).label("year"), |
| 752 | + extract('week', last_nth_week_date_time).label("week"), |
| 753 | + model.DBVcsInstance.provider_type.label("provider_type"), |
| 754 | + func.count(model.DBfinding.id_).label("finding_count") |
| 755 | + ) |
| 756 | + max_base_scan = db_connection.query(func.max(model.DBscan.id_).label("scan_id"), |
| 757 | + model.DBscan.branch_id) \ |
| 758 | + .filter(extract('year', model.DBscan.timestamp) == extract('year', last_nth_week_date_time)) \ |
| 759 | + .filter(extract('week', model.DBscan.timestamp) <= extract('week', last_nth_week_date_time)) \ |
| 760 | + .filter(model.DBscan.scan_type == ScanType.BASE) \ |
| 761 | + .group_by(model.DBscan.branch_id).subquery() |
| 762 | + |
| 763 | + max_audit_subquery = db_connection.query(model.DBaudit.finding_id, |
| 764 | + func.max(model.DBaudit.id_).label("audit_id")) \ |
| 765 | + .filter(extract('year', model.DBaudit.timestamp) == extract('year', last_nth_week_date_time)) \ |
| 766 | + .filter(extract('week', model.DBaudit.timestamp) <= extract('week', last_nth_week_date_time)) \ |
| 767 | + .group_by(model.DBaudit.finding_id).subquery() |
| 768 | + |
| 769 | + query = query.join(model.DBscanFinding, model.DBfinding.id_ == model.DBscanFinding.finding_id) |
| 770 | + query = query.join(model.DBscan, model.DBscan.id_ == model.DBscanFinding.scan_id) |
| 771 | + query = query.join(max_base_scan, and_(max_base_scan.c.branch_id == model.DBscan.branch_id, |
| 772 | + or_(model.DBscan.id_ == max_base_scan.c.scan_id, |
| 773 | + (and_(model.DBscan.id_ > max_base_scan.c.scan_id, |
| 774 | + model.DBscan.scan_type == ScanType.INCREMENTAL, |
| 775 | + extract('week', model.DBscan.timestamp) <= |
| 776 | + extract('week', last_nth_week_date_time), |
| 777 | + extract('year', model.DBscan.timestamp) == |
| 778 | + extract('year', last_nth_week_date_time))) |
| 779 | + ) |
| 780 | + ) |
| 781 | + ) |
| 782 | + query = query.join(model.DBbranch, model.DBbranch.id_ == model.DBscan.branch_id) |
| 783 | + query = query.join(model.DBrepository, model.DBrepository.id_ == model.DBbranch.repository_id) |
| 784 | + query = query.join(model.DBVcsInstance, model.DBVcsInstance.id_ == model.DBrepository.vcs_instance) |
| 785 | + |
| 786 | + query = query.join(max_audit_subquery, max_audit_subquery.c.finding_id == model.finding.DBfinding.id_, |
| 787 | + isouter=True) |
| 788 | + query = query.join(model.DBaudit, and_(model.audit.DBaudit.finding_id == model.finding.DBfinding.id_, |
| 789 | + model.audit.DBaudit.id_ == max_audit_subquery.c.audit_id), |
| 790 | + isouter=True) |
| 791 | + query = query.filter( |
| 792 | + or_(model.DBaudit.id_ == None, model.DBaudit.status == FindingStatus.NOT_ANALYZED)) # noqa: E711 |
| 793 | + |
| 794 | + query = query.group_by(model.DBVcsInstance.provider_type) |
| 795 | + |
| 796 | + all_tables.append(query) |
| 797 | + |
| 798 | + # union |
| 799 | + unioned_query = union(*all_tables) |
| 800 | + count_over_time = db_connection.execute(unioned_query).all() |
| 801 | + return count_over_time |
0 commit comments