From e63aec6e26d036472e96ac694fed3f94b4278d6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Thu, 1 Feb 2024 18:17:22 +0100 Subject: [PATCH] Refactor aggregation and measurement API to fastapi --- api/fastapi/dataapi/routers/aggregation.py | 375 ++++--- api/fastapi/dataapi/routers/measurements.py | 1047 ++++++++----------- api/ooniapi/measurements.py | 175 ++++ 3 files changed, 838 insertions(+), 759 deletions(-) create mode 100644 api/ooniapi/measurements.py diff --git a/api/fastapi/dataapi/routers/aggregation.py b/api/fastapi/dataapi/routers/aggregation.py index 635635ec..ce908995 100644 --- a/api/fastapi/dataapi/routers/aggregation.py +++ b/api/fastapi/dataapi/routers/aggregation.py @@ -3,33 +3,31 @@ The routes are mounted under /api """ -from datetime import datetime, timedelta -from typing import List, Any, Dict +from datetime import datetime, timedelta, date +from typing import List, Any, Dict, Optional, Union import logging -from flask import Blueprint -from flask import current_app, request, make_response, Response -from flask.json import jsonify +from fastapi import APIRouter, Depends, Query, Request +from fastapi.responses import Response +from pydantic import BaseModel +from typing_extensions import Annotated # debdeps: python3-sqlalchemy -from sqlalchemy import and_, select, sql, column - -from ooniapi.config import metrics -from ooniapi.database import query_click, query_click_one_row -from ooniapi.utils import jerror, convert_to_csv -from ooniapi.urlparams import ( +from sqlalchemy.sql.expression import and_, select, column, table +from sqlalchemy.sql.expression import table as sql_table +from sqlalchemy.sql.expression import text as sql_text + +from ..config import settings, metrics +from ..utils import ( + jerror, + convert_to_csv, commasplit, - param_asn_m, - param_date, - param_domain_m, - param_input_or_none, - param_lowercase_underscore, - param_probe_cc_m, - param_test_name_m, - param_uppercase, + query_click, + 
query_click_one_row, ) +from ..dependencies import ClickhouseClient, get_clickhouse_client -aggregation_blueprint = Blueprint("aggregation", "measurements") +router = APIRouter() log = logging.getLogger() @@ -76,7 +74,7 @@ def group_by_date(since, until, time_grain, cols, colnames, group_by): ) fun = gmap[time_grain] tcol = "measurement_start_day" # TODO: support dynamic axis names - cols.append(sql.text(f"{fun}(measurement_start_time) AS {tcol}")) + cols.append(sql_text(f"{fun}(measurement_start_time) AS {tcol}")) colnames.append(tcol) group_by.append(column(tcol)) return time_grain @@ -101,120 +99,160 @@ def add_axis(axis, cols, colnames, group_by): if axis == "blocking_type": # TODO: use blocking_type column t = "JSONExtractString(scores, 'analysis', 'blocking_type') AS blocking_type" - cols.append(sql.text(t)) + cols.append(sql_text(t)) else: validate_axis_name(axis) - cols.append(sql.text(axis)) + cols.append(sql_text(axis)) colnames.append(axis) group_by.append(column(axis)) -@aggregation_blueprint.route("/v1/aggregation") +class DBStats(BaseModel): + row_count: int + bytes: int + total_row_count: int + elapsed_seconds: float + + +class AggregationResult(BaseModel): + anomaly_count: int + confirmed_count: int + failure_count: int + ok_count: int + measurement_count: int + measurement_start_day: Optional[date] = None + blocking_type: Optional[str] = None + category_code: Optional[str] = None + domain: Optional[str] = None + input: Optional[str] = None + probe_cc: Optional[str] = None + probe_asn: Optional[str] = None + test_name: Optional[str] = None + + +class MeasurementAggregation(BaseModel): + v: int + dimension_count: int + db_stats: DBStats + result: List[AggregationResult] + + +@router.get("/v1/aggregation") @metrics.timer("get_aggregated") -def get_aggregated() -> Response: - """Aggregate counters data - --- - parameters: - - name: input - in: query - type: string - minLength: 3 - description: The input (for example a URL or IP address) to search 
measurements for - - name: domain - in: query - type: string - minLength: 3 - description: Domain to search measurements for, comma separated - - name: category_code - in: query - type: string - description: The category code from the citizenlab list - - name: probe_cc - in: query - type: string - description: Two letter capitalized country codes, comma separated - minLength: 2 - - name: probe_asn - in: query - type: string - description: Autonomous system number in the format ASxxx, comma separated - - name: test_name - in: query - type: string - description: Name of the tests, comma separated - - name: ooni_run_link_id - in: query - type: string - description: OONIRun descriptors comma separated - - name: since - in: query - type: string - description: >- - The start date of when measurements were run (ex. - "2016-10-20T10:30:00") - - name: until - in: query - type: string - description: >- - The end date of when measurement were run (ex. - "2016-10-20T10:30:00") - - name: time_grain - in: query - type: string - description: Time granularity. Used only when the X or Y axis represent time. - enum: - - hour - - day - - week - - month - - year - - auto - - name: axis_x - in: query - type: string - description: | - The dimension on the x axis e.g. measurement_start_day - - name: axis_y - in: query - type: string - description: | - The dimension on the y axis e.g. 
probe_cc - - name: format - in: query - type: string - description: | - Output format, JSON (default) or CSV - enum: - - JSON - - CSV - - name: download - in: query - type: boolean - description: If we should be triggering a file download - responses: - '200': - description: Returns aggregated counters - """ +async def get_measurements( + db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)], + response: Response, + request: Request, + input: Annotated[ + Optional[str], + Query( + min_length=3, + description="The input (for example a URL or IP address) to search measurements for", + ), + ] = None, + domain: Annotated[ + Optional[str], + Query( + min_length=3, + description="Domain to search measurements for, comma separated", + ), + ] = None, + category_code: Annotated[ + Optional[str], + Query( + description="The category code from the citizenlab list", + regex=r"^[A-Z]+$", + ), + ] = None, + probe_cc: Annotated[ + Optional[str], + Query( + min_length=2, + description="Two letter capitalized country codes, comma separated", + ), + ] = None, + probe_asn: Annotated[ + Optional[str], + Query( + description="Autonomous system number in the format ASxxx, comma separated" + ), + ] = None, + test_name: Annotated[ + Optional[str], Query(description="Name of the tests, comma separated") + ] = None, + ooni_run_link_id: Annotated[ + Optional[str], Query(description="OONIRun descriptors comma separated") + ] = None, + since: Annotated[ + Optional[date], + Query( + description="""The start date of when measurements were run (ex. "2016-10-20T10:30:00")""" + ), + ] = None, + until: Annotated[ + Optional[date], + Query( + description="""The end date of when measurement were run (ex. "2016-10-20T10:30:00")""" + ), + ] = None, + time_grain: Annotated[ + Optional[str], + Query( + description="Time granularity. 
Used only when the X or Y axis represent time.", + enum=["hour", "day", "week", "month", "year", "auto"], + ), + ] = "auto", + axis_x: Annotated[ + Optional[str], + Query( + description="The dimension on the x axis e.g. measurement_start_day", + regex=r"^[a-z_]+$", + ), + ] = None, + axis_y: Annotated[ + Optional[str], + Query( + description="The dimension on the y axis e.g. probe_cc", + regex=r"^[a-z_]+$", + ), + ] = None, + format: Annotated[ + str, + Query( + description="Output format, JSON (default) or CSV", + enum=["JSON", "CSV", "json", "csv"], + ), + ] = "json", + download: Annotated[ + Optional[bool], Query(description="If we should be triggering a file download") + ] = False, +): # TODO(art): figure out how to define either CSV or JSON data format in the response + """Aggregate counters data""" # TODO: # better split of large dimensions in output? # add limit and warn user - param = request.args.get + test_name_s = [] + if test_name: + test_name_s = commasplit(test_name) + domain_s = [] + if domain: + domain_s = set(commasplit(domain)) + probe_asn_s = [] + if probe_asn: + probe_asn_s = commasplit(probe_asn) + probe_cc_s = [] + if probe_cc: + probe_cc_s = commasplit(probe_cc) + + if since: + since = datetime.combine(since, datetime.min.time()) + if until: + until = datetime.combine(until, datetime.min.time()) + + inp = input or "" try: - axis_x = param_lowercase_underscore("axis_x") - axis_y = param_lowercase_underscore("axis_y") - category_code = param_uppercase("category_code") - test_name_s = param_test_name_m("test_name") - domain_s = param_domain_m() - inp = param_input_or_none() or "" - probe_asn_s = param_asn_m() - probe_cc_s = param_probe_cc_m() - ooni_run_link_id_raw = param("ooni_run_link_id") - since = param_date("since") - until = param_date("until") - time_grain = param("time_grain", "auto").lower() - - resp_format = param("format", "JSON").upper() - download = param("download", "").lower() == "true" + ooni_run_link_id_raw = ooni_run_link_id + 
resp_format = format.upper() assert resp_format in ("JSON", "CSV") if axis_x is not None: @@ -236,60 +274,60 @@ def get_aggregated() -> Response: "measurement_count", ] cols = [ - sql.text( + sql_text( "countIf(anomaly = 't' AND confirmed = 'f' AND msm_failure = 'f') AS anomaly_count" ), - sql.text("countIf(confirmed = 't' AND msm_failure = 'f') AS confirmed_count"), - sql.text("countIf(msm_failure = 't') AS failure_count"), - sql.text( + sql_text("countIf(confirmed = 't' AND msm_failure = 'f') AS confirmed_count"), + sql_text("countIf(msm_failure = 't') AS failure_count"), + sql_text( "countIf(anomaly = 'f' AND confirmed = 'f' AND msm_failure = 'f') AS ok_count" ), - sql.text("COUNT(*) AS measurement_count"), + sql_text("COUNT(*) AS measurement_count"), ] - table = sql.table("fastpath") + table = sql_table("fastpath") where = [] query_params: Dict[str, Any] = {} if domain_s: - where.append(sql.text("domain IN :domains")) + where.append(sql_text("domain IN :domains")) query_params["domains"] = domain_s if inp: - where.append(sql.text("input = :input")) + where.append(sql_text("input = :input")) query_params["input"] = inp if category_code: - where.append(sql.text("citizenlab.category_code = :category_code")) + where.append(sql_text("citizenlab.category_code = :category_code")) query_params["category_code"] = category_code if probe_cc_s: - where.append(sql.text("(citizenlab.cc IN :lccs OR citizenlab.cc = 'ZZ')")) + where.append(sql_text("(citizenlab.cc IN :lccs OR citizenlab.cc = 'ZZ')")) query_params["lccs"] = [cc.lower() for cc in probe_cc_s] else: - where.append(sql.text("citizenlab.cc = 'ZZ'")) + where.append(sql_text("citizenlab.cc = 'ZZ'")) if probe_cc_s: - where.append(sql.text("probe_cc IN :probe_cc_s")) + where.append(sql_text("probe_cc IN :probe_cc_s")) query_params["probe_cc_s"] = probe_cc_s if probe_asn_s: - where.append(sql.text("probe_asn IN :probe_asn_s")) + where.append(sql_text("probe_asn IN :probe_asn_s")) query_params["probe_asn_s"] = probe_asn_s 
if ooni_run_link_id_raw: ooni_run_link_id_s = commasplit(ooni_run_link_id_raw) - where.append(sql.text("ooni_run_link_id IN :ooni_run_link_id_s")) + where.append(sql_text("ooni_run_link_id IN :ooni_run_link_id_s")) query_params["ooni_run_link_id_s"] = ooni_run_link_id_s if since: - where.append(sql.text("measurement_start_time >= :since")) + where.append(sql_text("measurement_start_time >= :since")) query_params["since"] = since if until: - where.append(sql.text("measurement_start_time < :until")) + where.append(sql_text("measurement_start_time < :until")) query_params["until"] = until if test_name_s: - where.append(sql.text("test_name IN :test_name_s")) + where.append(sql_text("test_name IN :test_name_s")) query_params["test_name_s"] = test_name_s group_by: List = [] @@ -312,12 +350,12 @@ def get_aggregated() -> Response: # Join in the citizenlab table if we need to filter on category_code # or perform group-by on it table = table.join( - sql.table("citizenlab"), - sql.text("citizenlab.url = fastpath.input"), + sql_table("citizenlab"), + sql_text("citizenlab.url = fastpath.input"), ) where_expr = and_(*where) - query = select(cols).where(where_expr).select_from(table) + query = select(cols).where(where_expr).select_from(table) # type: ignore # Add group-by for g in group_by: @@ -325,40 +363,45 @@ def get_aggregated() -> Response: try: if dimension_cnt > 0: - r: Any = list(query_click(query, query_params, query_prio=4)) + r: Any = list(query_click(db, query, query_params, query_prio=4)) else: - r = query_click_one_row(query, query_params, query_prio=4) + r = query_click_one_row(db, query, query_params, query_prio=4) - pq = current_app.click.last_query + pq = db.last_query + assert pq msg = f"Stats: {pq.progress.rows} {pq.progress.bytes} {pq.progress.total_rows} {pq.elapsed}" log.info(msg) + if cacheable: + response.headers["Cache-Control"] = f"max_age={3600 * 24}" + + headers = {} if resp_format == "CSV": csv_data = convert_to_csv(r) - response = 
make_response(csv_data) - response.headers["Content-Type"] = "text/csv" if download: - set_dload(response, "ooni-aggregate-data.csv") + headers[ + "Content-Disposition" + ] = f"attachment; filename=ooni-aggregate-data.csv" + + return Response(content=csv_data, media_type="text/csv", headers=headers) else: - resp_d = { - "v": 0, - "dimension_count": dimension_cnt, - "result": r, - "db_stats": { - "row_count": pq.progress.rows, - "bytes": pq.progress.bytes, - "total_row_count": pq.progress.total_rows, - "elapsed_seconds": pq.elapsed, - }, - } - response = jsonify(resp_d) if download: + headers[ + "Content-Disposition" + ] = f"attachment; filename=ooni-aggregate-data.csv" set_dload(response, "ooni-aggregate-data.json") - - if cacheable: - response.cache_control.max_age = 3600 * 24 - return response + return MeasurementAggregation( + v=0, + dimension_count=dimension_cnt, + db_stats=DBStats( + row_count=pq.progress.rows, + bytes=pq.progress.bytes, + total_row_count=pq.progress.total_rows, + elapsed_seconds=pq.elapsed, + ), + result=r, + ) except Exception as e: return jerror(str(e), v=0) diff --git a/api/fastapi/dataapi/routers/measurements.py b/api/fastapi/dataapi/routers/measurements.py index 0a397759..20965667 100644 --- a/api/fastapi/dataapi/routers/measurements.py +++ b/api/fastapi/dataapi/routers/measurements.py @@ -4,9 +4,8 @@ """ from datetime import datetime, timedelta -from dateutil.parser import parse as parse_date from pathlib import Path -from typing import Optional, Any, Dict +from typing import List, Optional, Any, Dict, Union import gzip import json import logging @@ -16,42 +15,38 @@ import ujson # debdeps: python3-ujson import urllib3 # debdeps: python3-urllib3 -from flask import current_app, request, make_response, abort, redirect, Response -from flask.json import jsonify -from werkzeug.exceptions import HTTPException, BadRequest +from fastapi import APIRouter, Depends, Query, HTTPException, Header, Request +from fastapi.responses import Response, 
JSONResponse +from pydantic import BaseModel +from typing_extensions import Annotated # debdeps: python3-sqlalchemy -from sqlalchemy import and_, text, select, sql, column +from sqlalchemy.sql.expression import and_, text, select, column +from sqlalchemy.sql.expression import text as sql_text +from sqlalchemy.sql.expression import table as sql_table from sqlalchemy.exc import OperationalError from psycopg2.extensions import QueryCanceledError # debdeps: python3-psycopg2 from urllib.request import urlopen from urllib.parse import urljoin, urlencode -from ooniapi.auth import role_required, get_account_id_or_none -from ooniapi.config import metrics -from ooniapi.utils import cachedjson, nocachejson, jerror -from ooniapi.database import query_click, query_click_one_row -from ooniapi.urlparams import ( - param_asn, - param_bool, - param_commasplit, - param_date, - param_input_or_none, - param_report_id, - param_report_id_or_none, - param_measurement_uid, +from ..config import settings, metrics +from ..utils import ( + jerror, + cachedjson, + commasplit, + query_click, + query_click_one_row, ) +from ..dependencies import ClickhouseClient, get_clickhouse_client -from flask import Blueprint - -api_msm_blueprint = Blueprint("msm_api", "measurements") +router = APIRouter() FASTPATH_MSM_ID_PREFIX = "temp-fid-" FASTPATH_SERVER = "fastpath.ooni.nu" FASTPATH_PORT = 8000 -log = logging.getLogger() +log = logging.getLogger(__name__) urllib_pool = urllib3.PoolManager() @@ -59,29 +54,26 @@ ostr = Optional[str] -class QueryTimeoutError(HTTPException): - code = 504 - description = "The database query timed out.\nTry changing the query parameters." - - -class MsmtNotFound(HTTPException): - code = 500 - description = "Measurement not found" +class MsmtNotFound(Exception): + pass +""" +TODO(art): do we care to have this redirect in place? 
@api_msm_blueprint.route("/") def show_apidocs(): - """Route to https://api.ooni.io/api/ to /apidocs/""" + Route to https://api.ooni.io/api/ to /apidocs/ return redirect("/apidocs") +""" -@api_msm_blueprint.route("/v1/files") -def list_files() -> Response: +@router.get("/v1/files", tags=["files"]) +def list_files() -> JSONResponse: """List files - unsupported""" return cachedjson("1d", msg="not implemented") -def measurement_uid_to_s3path_linenum(measurement_uid: str): +def measurement_uid_to_s3path_linenum(db: ClickhouseClient, measurement_uid: str): # TODO: cleanup this query = """SELECT s3path, linenum FROM jsonl PREWHERE (report_id, input) IN ( @@ -89,9 +81,9 @@ def measurement_uid_to_s3path_linenum(measurement_uid: str): ) LIMIT 1""" query_params = dict(uid=measurement_uid) - lookup = query_click_one_row(sql.text(query), query_params, query_prio=3) + lookup = query_click_one_row(db, sql_text(query), query_params, query_prio=3) if lookup is None: - raise MsmtNotFound + raise HTTPException(status_code=500, detail="Measurement not found") s3path = lookup["s3path"] linenum = lookup["linenum"] @@ -99,30 +91,18 @@ def measurement_uid_to_s3path_linenum(measurement_uid: str): @metrics.timer("get_measurement") -@api_msm_blueprint.route("/v1/measurement/") -def get_measurement(measurement_uid) -> Response: +@router.get("/v1/measurement/{measurement_uid}") +def get_measurement( + db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)], + measurement_uid: str, + download: bool = False, +) -> Response: """Get one measurement by measurement_id, Returns only the measurement without extra data from the database - --- - parameters: - - name: measurement_uid - in: path - required: true - type: string - - name: download - in: query - type: boolean - description: triggers a file download - responses: - '200': - description: Returns the JSON blob for the specified measurement """ - log = current_app.logger assert measurement_uid - param = request.args.get - download = 
param("download", "").lower() == "true" try: - s3path, linenum = measurement_uid_to_s3path_linenum(measurement_uid) + s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) except MsmtNotFound: return jerror("Incorrect or inexistent measurement_uid") @@ -133,13 +113,13 @@ def get_measurement(measurement_uid) -> Response: log.error(f"Failed to fetch file {s3path} from S3") return jerror("Incorrect or inexistent measurement_uid") - resp = make_response(body) - resp.mimetype = "application/json" - resp.cache_control.max_age = 3600 + headers = {"Cache-Control": "max-age=3600"} if download: - set_dload(resp, f"ooni_measurement-{measurement_uid}.json") + headers[ + "Content-Disposition" + ] = f"attachment; filename=ooni_measurement-{measurement_uid}.json" - return resp + return Response(content=body, media_type="application/json", headers=headers) # # Fetching measurement bodies @@ -150,9 +130,7 @@ def _fetch_jsonl_measurement_body_from_s3( s3path: str, linenum: int, ) -> bytes: - log = current_app.logger - bucket_name = current_app.config["S3_BUCKET_NAME"] - baseurl = f"https://{bucket_name}.s3.amazonaws.com/" + baseurl = f"https://{settings.s3_bucket_name}.s3.amazonaws.com/" url = urljoin(baseurl, s3path) log.info(f"Fetching {url}") r = urlopen(url) @@ -164,12 +142,12 @@ def _fetch_jsonl_measurement_body_from_s3( raise MsmtNotFound -def report_id_input_to_s3path_linenum(report_id: str, input: str): +def report_id_input_to_s3path_linenum(db: ClickhouseClient, report_id: str, input: str): query = """SELECT s3path, linenum FROM jsonl PREWHERE report_id = :report_id AND input = :inp LIMIT 1""" query_params = dict(inp=input, report_id=report_id) - lookup = query_click_one_row(sql.text(query), query_params, query_prio=3) + lookup = query_click_one_row(db, sql_text(query), query_params, query_prio=3) if lookup is None: m = f"Missing row in jsonl table: {report_id} {input}" @@ -184,7 +162,10 @@ def report_id_input_to_s3path_linenum(report_id: str, input: 
str): @metrics.timer("_fetch_jsonl_measurement_body_clickhouse") def _fetch_jsonl_measurement_body_clickhouse( - report_id: str, input: Optional[str], measurement_uid: Optional[str] + db: ClickhouseClient, + report_id: str, + input: Optional[str], + measurement_uid: Optional[str], ) -> Optional[bytes]: """ Fetch jsonl from S3, decompress it, extract single msmt @@ -192,22 +173,23 @@ def _fetch_jsonl_measurement_body_clickhouse( # TODO: switch to _fetch_measurement_body_by_uid if measurement_uid is not None: try: - s3path, linenum = measurement_uid_to_s3path_linenum(measurement_uid) + s3path, linenum = measurement_uid_to_s3path_linenum(db, measurement_uid) except MsmtNotFound: log.error(f"Measurement {measurement_uid} not found in jsonl") return None else: + inp = input or "" # NULL/None input is stored as '' try: - inp = input or "" # NULL/None input is stored as '' - s3path, linenum = report_id_input_to_s3path_linenum(report_id, inp) + s3path, linenum = report_id_input_to_s3path_linenum(db, report_id, inp) except Exception: log.error(f"Measurement {report_id} {inp} not found in jsonl") return None try: log.debug(f"Fetching file {s3path} from S3") - return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) + # TODO(arturo): remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands + return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) # type: ignore except Exception: # pragma: no cover log.error(f"Failed to fetch file {s3path} from S3") return None @@ -243,16 +225,17 @@ def _fetch_measurement_body_on_disk_by_msmt_uid(msmt_uid: str) -> Optional[bytes return ujson.dumps(body).encode() -def _fetch_measurement_body_by_uid(msmt_uid: str) -> bytes: +def _fetch_measurement_body_by_uid(db: ClickhouseClient, msmt_uid: str) -> bytes: """Fetch measurement body from either disk or jsonl on S3""" log.debug(f"Fetching body for UID {msmt_uid}") body = _fetch_measurement_body_on_disk_by_msmt_uid(msmt_uid) if body is not None: - return body + # TODO(arturo): 
remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands + return body # type: ignore log.debug(f"Fetching body for UID {msmt_uid} from jsonl on S3") - s3path, linenum = measurement_uid_to_s3path_linenum(msmt_uid) - return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) + s3path, linenum = measurement_uid_to_s3path_linenum(db, msmt_uid) + return _fetch_jsonl_measurement_body_from_s3(s3path, linenum) # type: ignore @metrics.timer("_fetch_measurement_body_from_hosts") @@ -271,7 +254,7 @@ def _fetch_measurement_body_from_hosts(msmt_uid: str) -> Optional[bytes]: log.info("Error", exc_info=True) return None - for hostname in current_app.config["OTHER_COLLECTORS"]: + for hostname in settings.other_collectors: url = urljoin(f"https://{hostname}/measurement_spool/", path) log.debug(f"Attempt to load {url}") try: @@ -295,7 +278,7 @@ def _fetch_measurement_body_from_hosts(msmt_uid: str) -> Optional[bytes]: @metrics.timer("fetch_measurement_body") def _fetch_measurement_body( - report_id: str, input: Optional[str], measurement_uid: str + db: ClickhouseClient, report_id: str, input: Optional[str], measurement_uid: str ) -> bytes: """Fetch measurement body from either: - local measurement spool dir (.post files) @@ -309,6 +292,7 @@ def _fetch_measurement_body( # 20210124T210009Z_webconnectivity_VE_22313_n1_Ojb new_format = u_count == 5 and measurement_uid + fresh = False if new_format: ts = (datetime.utcnow() - timedelta(hours=1)).strftime("%Y%m%d%H%M") fresh = measurement_uid > ts @@ -319,25 +303,28 @@ def _fetch_measurement_body( _fetch_measurement_body_on_disk_by_msmt_uid(measurement_uid) or _fetch_measurement_body_from_hosts(measurement_uid) or _fetch_jsonl_measurement_body_clickhouse( - report_id, input, measurement_uid + db, report_id, input, measurement_uid ) ) elif new_format and not fresh: body = ( - _fetch_jsonl_measurement_body_clickhouse(report_id, input, measurement_uid) + _fetch_jsonl_measurement_body_clickhouse( + db, report_id, input, 
measurement_uid + ) or _fetch_measurement_body_on_disk_by_msmt_uid(measurement_uid) or _fetch_measurement_body_from_hosts(measurement_uid) ) else: body = _fetch_jsonl_measurement_body_clickhouse( - report_id, input, measurement_uid + db, report_id, input, measurement_uid ) if body: metrics.incr("msmt_body_found") - return body + # TODO(arturo): remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands + return body # type: ignore metrics.incr("msmt_body_not_found") raise MsmtNotFound @@ -345,56 +332,56 @@ def _fetch_measurement_body( def genurl(path: str, **kw) -> str: """Generate absolute URL for the API""" - base = current_app.config["BASE_URL"] + base = settings.base_url return urljoin(base, path) + "?" + urlencode(kw) -@api_msm_blueprint.route("/v1/raw_measurement") +@router.get("/v1/raw_measurement") @metrics.timer("get_raw_measurement") -def get_raw_measurement() -> Response: +async def get_raw_measurement( + db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)], + report_id: Annotated[ + Optional[str], + Query(description="The report_id to search measurements for", min_length=3), + ] = None, + input: Annotated[ + Optional[str], + Query( + description="The input (for example a URL or IP address) to search measurements for", + min_length=3, + ), + ] = None, + measurement_uid: Annotated[ + Optional[str], + Query( + description="The measurement_uid to search measurements for", min_length=3 + ), + ] = None, +) -> Response: """Get raw measurement body by report_id + input - --- - parameters: - - name: report_id - in: query - type: string - description: The report_id to search measurements for - - name: input - in: query - type: string - minLength: 3 - description: The input (for example a URL or IP address) to search measurements for - - name: measurement_uid - in: query - type: string - description: The measurement_uid to search measurements for responses: '200': description: raw measurement body, served as JSON file to be dowloaded """ # 
This is used by Explorer to let users download msmts - try: - msmt_uid = param_measurement_uid() + if measurement_uid: # TODO: uid_cleanup - msmt_meta = _get_measurement_meta_by_uid(msmt_uid) - except Exception: - report_id = param_report_id() - param = request.args.get - input_ = param("input") + msmt_meta = _get_measurement_meta_by_uid(db, measurement_uid) + elif report_id: # _fetch_measurement_body needs the UID - msmt_meta = _get_measurement_meta_clickhouse(report_id, input_) + msmt_meta = _get_measurement_meta_clickhouse(db, report_id, input) + else: + raise Exception("Either report_id or measurement_uid must be provided") + body = "{}" if msmt_meta: + # TODO(arturo): remove ignore once https://github.com/jsocol/pystatsd/pull/184 lands body = _fetch_measurement_body( - msmt_meta["report_id"], msmt_meta["input"], msmt_meta["measurement_uid"] + db, msmt_meta["report_id"], msmt_meta["input"], msmt_meta["measurement_uid"] # type: ignore ) - resp = make_response(body) - else: - resp = make_response({}) - resp.headers.set("Content-Type", "application/json") - resp.cache_control.max_age = 24 * 3600 - return resp + headers = {"Cache-Control": f"max-age={24*3600}"} + return Response(content=body, media_type="application/json", headers=headers) def format_msmt_meta(msmt_meta: dict) -> dict: @@ -418,7 +405,9 @@ def format_msmt_meta(msmt_meta: dict) -> dict: @metrics.timer("get_measurement_meta_clickhouse") -def _get_measurement_meta_clickhouse(report_id: str, input_: Optional[str]) -> dict: +def _get_measurement_meta_clickhouse( + db: ClickhouseClient, report_id: str, input_: Optional[str] +) -> dict: # Given report_id + input, fetch measurement data from fastpath table query = "SELECT * FROM fastpath " if input_ is None: @@ -433,7 +422,7 @@ def _get_measurement_meta_clickhouse(report_id: str, input_: Optional[str]) -> d """ query_params = dict(input=input_, report_id=report_id) query += "LIMIT 1" - msmt_meta = query_click_one_row(sql.text(query), query_params, 
query_prio=3) + msmt_meta = query_click_one_row(db, sql_text(query), query_params, query_prio=3) if not msmt_meta: return {} # measurement not found if msmt_meta["probe_asn"] == 0: @@ -445,14 +434,14 @@ def _get_measurement_meta_clickhouse(report_id: str, input_: Optional[str]) -> d @metrics.timer("get_measurement_meta_by_uid") -def _get_measurement_meta_by_uid(measurement_uid: str) -> dict: +def _get_measurement_meta_by_uid(db: ClickhouseClient, measurement_uid: str) -> dict: query = """SELECT * FROM fastpath LEFT OUTER JOIN citizenlab ON citizenlab.url = fastpath.input WHERE measurement_uid = :uid LIMIT 1 """ query_params = dict(uid=measurement_uid) - msmt_meta = query_click_one_row(sql.text(query), query_params, query_prio=3) + msmt_meta = query_click_one_row(db, sql_text(query), query_params, query_prio=3) if not msmt_meta: return {} # measurement not found if msmt_meta["probe_asn"] == 0: @@ -463,103 +452,81 @@ def _get_measurement_meta_by_uid(measurement_uid: str) -> dict: return format_msmt_meta(msmt_meta) -@api_msm_blueprint.route("/v1/measurement_meta") +class MeasurementMeta(BaseModel): + anomaly: bool + confirmed: bool + category_code: str + failure: bool + input: str + probe_asn: int + probe_cc: str + raw_measurement: str + report_id: str + scores: str + test_name: str + test_start_time: datetime + + +@router.get("/v1/measurement_meta") @metrics.timer("get_measurement_meta") -def get_measurement_meta() -> Response: - """Get metadata on one measurement by measurement_uid or report_id + input - --- - produces: - - application/json - parameters: - - name: measurement_uid - in: query - type: string - description: The measurement ID, mutually exclusive with report_id + input - - name: report_id - in: query - type: string - description: The report_id to search measurements for - example: 20210208T162755Z_ndt_DZ_36947_n1_8swgXi7xNuRUyO9a - - name: input - in: query - type: string - description: The input (for example a URL or IP address) to search measurements 
for - - name: full - in: query - type: boolean - description: Include JSON measurement data - responses: - 200: - description: Returns measurement metadata, optionally including the raw measurement body - schema: - type: object - properties: - anomaly: - type: boolean - category_code: - type: string - confirmed: - type: boolean - failure: - type: boolean - input: - type: string - measurement_start_time: - type: string - probe_asn: - type: integer - probe_cc: - type: string - raw_measurement: - type: string - report_id: - type: string - scores: - type: string - test_name: - type: string - test_start_time: - type: string - example: { - "anomaly": false, - "confirmed": false, - "failure": false, - "input": null, - "measurement_start_time": "2021-02-08T23:31:46Z", - "probe_asn": 36947, - "probe_cc": "DZ", - "report_id": "20210208T162755Z_ndt_DZ_36947_n1_8swgXi7xNuRUyO9a", - "scores": "{}", - "test_name": "ndt", - "test_start_time": "2021-02-08T23:31:43Z" - } - """ +async def get_measurement_meta( + db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)], + response: Response, + measurement_uid: Annotated[ + Optional[str], + Query( + description="The measurement ID, mutually exclusive with report_id + input", + min_length=3, + ), + ] = None, + report_id: Annotated[ + Optional[str], + Query( + description=( + "The report_id to search measurements for example: " + "20210208T162755Z_ndt_DZ_36947_n1_8swgXi7xNuRUyO9a" + ), + min_length=3, + ), + ] = None, + input: Annotated[ + Optional[str], + Query( + description="The input (for example a URL or IP address) to search measurements for", + min_length=3, + ), + ] = None, + full: Annotated[bool, Query(description="Include JSON measurement data")] = False, +) -> MeasurementMeta: + """Get metadata on one measurement by measurement_uid or report_id + input""" # TODO: input can be '' or NULL in the fastpath table - fix it # TODO: see integ tests for TODO items - param = request.args.get - full = param("full", "").lower() in 
("true", "1", "yes") - try: - msmt_uid = param_measurement_uid() - log.info(f"get_measurement_meta {msmt_uid}") - msmt_meta = _get_measurement_meta_by_uid(msmt_uid) - except Exception: - report_id = param_report_id() - input_ = param_input_or_none() - log.info(f"get_measurement_meta {report_id} {input_}") - msmt_meta = _get_measurement_meta_clickhouse(report_id, input_) + if measurement_uid: + log.info(f"get_measurement_meta {measurement_uid}") + msmt_meta = _get_measurement_meta_by_uid(db, measurement_uid) + elif report_id: + log.info(f"get_measurement_meta {report_id} {input}") + msmt_meta = _get_measurement_meta_clickhouse(db, report_id, input) + else: + raise Exception("Either report_id or measurement_uid must be provided") assert isinstance(msmt_meta, dict) if not full: - return cachedjson("1m", **msmt_meta) + response.headers["Cache-Control"] = f"max-age=60" + return MeasurementMeta(**msmt_meta) if msmt_meta == {}: # measurement not found - return cachedjson("1m", raw_measurement="", **msmt_meta) + response.headers["Cache-Control"] = f"max-age=60" + return MeasurementMeta( + raw_measurement="", + **msmt_meta, + ) try: # TODO: uid_cleanup body = _fetch_measurement_body( - msmt_meta["report_id"], msmt_meta["input"], msmt_meta["measurement_uid"] + db, msmt_meta["report_id"], msmt_meta["input"], msmt_meta["measurement_uid"] ) assert isinstance(body, bytes) body = body.decode() @@ -567,184 +534,201 @@ def get_measurement_meta() -> Response: log.error(e, exc_info=True) body = "" - return cachedjson("1m", raw_measurement=body, **msmt_meta) + response.headers["Cache-Control"] = f"max-age=60" + return MeasurementMeta( + raw_measurement=body, + **msmt_meta, + ) # # Listing measurements -@api_msm_blueprint.route("/v1/measurements") +# TODO(art): Isn't this the same as the above MeasurementMeta? 
Check it +class MeasurementMeta2(BaseModel): + measurement_url: str + anomaly: Optional[bool] = None + confirmed: Optional[bool] = None + failure: Optional[bool] = None + input: Optional[str] = None + measurement_start_time: Optional[datetime] = None + measurement_uid: Optional[str] = None + probe_asn: Optional[str] = None + probe_cc: Optional[str] = None + report_id: Optional[str] = None + scores: Optional[dict] = None + test_name: Optional[str] = None + + +class ResultsMetadata(BaseModel): + count: int + current_page: int + limit: int + next_url: Optional[str] + offset: int + pages: int + query_time: float + + +class MeasurementList(BaseModel): + metadata: ResultsMetadata + results: List[MeasurementMeta2] + + +@router.get("/v1/measurements") @metrics.timer("list_measurements") -def list_measurements() -> Response: - """Search for measurements using only the database. Provide pagination. - --- - parameters: - - name: report_id - in: query - type: string - description: Report_id to search measurements for - - name: input - in: query - type: string - minLength: 3 # `input` is handled by pg_trgm - description: Input (for example a URL or IP address) to search measurements for - - name: domain - in: query - type: string - minLength: 3 - description: Domain to search measurements for - - name: probe_cc - in: query - type: string - description: Two letter country code - - name: probe_asn - in: query - type: string - description: Autonomous system number in the format "ASXXX" - - name: test_name - in: query - type: string - description: Name of the test - - name: category_code - in: query - type: string - description: Category code from the citizenlab list - - name: since - in: query - type: string - description: >- - Start date of when measurements were run (ex. - "2016-10-20T10:30:00") - - name: until - in: query - type: string - description: >- - End date of when measurement were run (ex. 
- "2016-10-20T10:30:00") - - - name: confirmed - in: query - type: string - description: | - Set "true" for confirmed network anomalies (we found a blockpage, a middlebox, etc.). - Default: no filtering (show both true and false) - - - name: anomaly - in: query - type: string - description: | - Set "true" for measurements that require special attention (likely to be a case of blocking) - Default: no filtering (show both true and false) - - - name: failure - in: query - type: string - description: | - Set "true" for failed measurements (the control request failed, there was a bug, etc.). - Default: no filtering (show both true and false) - - - name: software_version - in: query - type: string - description: Filter measurements by software version. Comma-separated. - - - name: test_version - in: query - type: string - description: Filter measurements by test version. Comma-separated. - - - name: engine_version - in: query - type: string - description: Filter measurements by engine version. Comma-separated. - - - name: ooni_run_link_id - in: query - type: string - description: Filter measurements by OONIRun ID. 
- - - name: order_by - in: query - type: string - description: 'By which key the results should be ordered by (default: `null`)' - enum: - - test_start_time - - measurement_start_time - - input - - probe_cc - - probe_asn - - test_name - - name: order - in: query - type: string - description: |- - If the order should be ascending or descending (one of: `asc` or `desc`) - enum: - - asc - - desc - - ASC - - DESC - - name: offset - in: query - type: integer - description: 'Offset into the result set (default: 0)' - - name: limit - in: query - type: integer - description: 'Number of records to return (default: 100)' - responses: - '200': - description: Returns the list of measurement IDs for the specified criteria - schema: - $ref: "#/definitions/MeasurementList" - """ +async def list_measurements( + db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)], + response: Response, + request: Request, + report_id: Annotated[ + Optional[str], + Query(description="Report_id to search measurements for", min_length=3), + ] = None, + input: Annotated[ + Optional[str], + Query( + description="Input (for example a URL or IP address) to search measurements for", + min_length=3, + ), + ] = None, + domain: Annotated[ + Optional[str], + Query(description="Domain to search measurements for", min_length=3), + ] = None, + probe_cc: Annotated[ + Optional[str], Query(description="Two letter country code") + ] = None, + probe_asn: Annotated[ + Union[str, int, None], + Query(description='Autonomous system number in the format "ASXXX"'), + ] = None, + test_name: Annotated[Optional[str], Query(description="Name of the test")] = None, + category_code: Annotated[ + Optional[str], Query(description="Category code from the citizenlab list") + ] = None, + since: Annotated[ + Optional[datetime], + Query( + description='Start date of when measurements were run (ex. 
"2016-10-20T10:30:00")' + ), + ] = None, + until: Annotated[ + Optional[datetime], + Query( + description='End date of when measurement were run (ex. "2016-10-20T10:30:00")' + ), + ] = None, + confirmed: Annotated[ + Optional[bool], + Query( + description=( + 'Set "true" for confirmed network anomalies (we found a blockpage, a middlebox, etc.). ' + "Default: no filtering (show both true and false)" + ) + ), + ] = None, + anomaly: Annotated[ + Optional[bool], + Query( + description=( + 'Set "true" for measurements that require special attention (likely to be a case of blocking).' + "Default: no filtering (show both true and false)" + ) + ), + ] = None, + failure: Annotated[ + Optional[bool], + Query( + description=( + 'Set "true" for failed measurements (the control request failed, there was a bug, etc.). ' + "Default: no filtering (show both true and false)" + ) + ), + ] = None, + software_version: Annotated[ + Optional[str], + Query(description="Filter measurements by software version. Comma-separated."), + ] = None, + test_version: Annotated[ + Optional[str], + Query(description="Filter measurements by test version. Comma-separated."), + ] = None, + engine_version: Annotated[ + Optional[str], + Query(description="Filter measurements by engine version. 
Comma-separated."), + ] = None, + ooni_run_link_id: Annotated[ + Optional[str], Query(description="Filter measurements by OONIRun ID.") + ] = None, + order_by: Annotated[ + Optional[str], + Query( + description="By which key the results should be ordered by (default: `null`)", + enum=[ + "test_start_time", + "measurement_start_time", + "input", + "probe_cc", + "probe_asn", + "test_name", + ], + ), + ] = None, + order: Annotated[ + str, + Query( + description="If the order should be ascending or descending (one of: `asc` or `desc`)", + enum=["asc", "desc", "ASC", "DESC"], + ), + ] = "asc", + offset: Annotated[ + int, Query(description="Offset into the result set (default: 0)") + ] = 0, + limit: Annotated[ + int, Query(description="Number of records to return (default: 100)") + ] = 100, + user_agent: Annotated[str | None, Header()] = None, +) -> MeasurementList: + """Search for measurements using only the database. Provide pagination.""" # x-code-samples: # - lang: 'curl' # source: | # curl "https://api.ooni.io/api/v1/measurements?probe_cc=IT&confirmed=true&since=2017-09-01" - param = request.args.get - report_id = param_report_id_or_none() - probe_asn = param_asn("probe_asn") # int / None - probe_cc = param("probe_cc") - test_name = param("test_name") - since = param_date("since") - until = param_date("until") - order_by = param("order_by") - order = param("order", "desc") - offset = int(param("offset", 0)) - limit = int(param("limit", 100)) - failure = param_bool("failure") - anomaly = param_bool("anomaly") - confirmed = param_bool("confirmed") - category_code = param("category_code") - software_versions = param_commasplit("software_version") - test_versions = param_commasplit("test_version") - engine_versions = param_commasplit("engine_version") - ooni_run_link_id = param("ooni_run_link_id") + if ( + probe_asn is not None + and isinstance(probe_asn, str) + and probe_asn.startswith("AS") + ): + probe_asn = int(probe_asn[2:]) + software_versions = None + if 
software_version: + software_versions = commasplit(software_version) + test_versions = None + if test_version: + test_versions = commasplit(test_version) + engine_versions = None + if engine_version: + engine_versions = commasplit(engine_version) # Workaround for https://github.com/ooni/probe/issues/1034 - user_agent = request.headers.get("User-Agent", "") - if user_agent.startswith("okhttp"): - bug_probe1034_response = { - "metadata": { - "count": 1, - "current_page": 1, - "limit": 100, - "next_url": None, - "offset": 0, - "pages": 1, - "query_time": 0.001, - }, - "results": [{"measurement_url": ""}], - } + if user_agent and user_agent.startswith("okhttp"): # Cannot be cached due to user_agent - return nocachejson(**bug_probe1034_response) + return MeasurementList( + metadata=ResultsMetadata( + count=1, + current_page=1, + limit=100, + next_url=None, + offset=0, + pages=1, + query_time=0.001, + ), + results=[MeasurementMeta2(measurement_url="")], + ) # # Prepare query parameters - input_ = request.args.get("input") - domain = request.args.get("domain") - # Set reasonable since/until ranges if not specified. 
try: if until is None: @@ -752,17 +736,17 @@ def list_measurements() -> Response: t = datetime.utcnow() + timedelta(days=1) until = datetime(t.year, t.month, t.day) except ValueError: - raise BadRequest("Invalid until") + raise HTTPException(status_code=400, detail="Invalid until") try: if since is None: if report_id is None and until is not None: since = until - timedelta(days=30) except ValueError: - raise BadRequest("Invalid since") + raise HTTPException(status_code=400, detail="Invalid since") if order.lower() not in ("asc", "desc"): - raise BadRequest("Invalid order") + raise HTTPException(status_code=400, detail="Invalid order") # # Perform query @@ -777,55 +761,61 @@ def list_measurements() -> Response: if since is not None: query_params["since"] = since - fpwhere.append(sql.text("measurement_start_time > :since")) + fpwhere.append(sql_text("measurement_start_time > :since")) if until is not None: query_params["until"] = until - fpwhere.append(sql.text("measurement_start_time <= :until")) + fpwhere.append(sql_text("measurement_start_time <= :until")) if report_id: query_params["report_id"] = report_id - fpwhere.append(sql.text("report_id = :report_id")) + fpwhere.append(sql_text("report_id = :report_id")) if probe_cc: if probe_cc == "ZZ": log.info("Refusing list_measurements with probe_cc set to ZZ") - abort(403) + raise HTTPException( + status_code=403, + detail="Refusing list_measurements with probe_cc set to ZZ", + ) query_params["probe_cc"] = probe_cc - fpwhere.append(sql.text("probe_cc = :probe_cc")) + fpwhere.append(sql_text("probe_cc = :probe_cc")) else: - fpwhere.append(sql.text("probe_cc != 'ZZ'")) + fpwhere.append(sql_text("probe_cc != 'ZZ'")) if probe_asn is not None: if probe_asn == 0: log.info("Refusing list_measurements with probe_asn set to 0") - abort(403) + raise HTTPException( + status_code=403, + detail="Refusing list_measurements with probe_asn set to 0", + ) query_params["probe_asn"] = probe_asn - fpwhere.append(sql.text("probe_asn = 
:probe_asn")) + fpwhere.append(sql_text("probe_asn = :probe_asn")) else: # https://ooni.org/post/2020-ooni-probe-asn-incident-report/ # https://github.com/ooni/explorer/issues/495 - fpwhere.append(sql.text("probe_asn != 0")) + fpwhere.append(sql_text("probe_asn != 0")) if test_name is not None: query_params["test_name"] = test_name - fpwhere.append(sql.text("test_name = :test_name")) + fpwhere.append(sql_text("test_name = :test_name")) if software_versions is not None: query_params["software_versions"] = software_versions - fpwhere.append(sql.text("software_version IN :software_versions")) + fpwhere.append(sql_text("software_version IN :software_versions")) if test_versions is not None: query_params["test_versions"] = test_versions - fpwhere.append(sql.text("test_version IN :test_versions")) + fpwhere.append(sql_text("test_version IN :test_versions")) if engine_versions is not None: query_params["engine_versions"] = engine_versions - fpwhere.append(sql.text("engine_version IN :engine_versions")) + fpwhere.append(sql_text("engine_version IN :engine_versions")) if ooni_run_link_id is not None: query_params["ooni_run_link_id"] = ooni_run_link_id - fpwhere.append(sql.text("ooni_run_link_id = :ooni_run_link_id")) + fpwhere.append(sql_text("ooni_run_link_id = :ooni_run_link_id")) # Filter on anomaly, confirmed and failure: # The database stores anomaly and confirmed as boolean + NULL and stores @@ -835,43 +825,43 @@ def list_measurements() -> Response: # See test_list_measurements_filter_flags_fastpath if anomaly is True: - fpwhere.append(sql.text("fastpath.anomaly = 't'")) + fpwhere.append(sql_text("fastpath.anomaly = 't'")) elif anomaly is False: - fpwhere.append(sql.text("fastpath.anomaly = 'f'")) + fpwhere.append(sql_text("fastpath.anomaly = 'f'")) if confirmed is True: - fpwhere.append(sql.text("fastpath.confirmed = 't'")) + fpwhere.append(sql_text("fastpath.confirmed = 't'")) elif confirmed is False: - fpwhere.append(sql.text("fastpath.confirmed = 'f'")) + 
fpwhere.append(sql_text("fastpath.confirmed = 'f'")) if failure is True: - fpwhere.append(sql.text("fastpath.msm_failure = 't'")) + fpwhere.append(sql_text("fastpath.msm_failure = 't'")) elif failure is False: - fpwhere.append(sql.text("fastpath.msm_failure = 'f'")) + fpwhere.append(sql_text("fastpath.msm_failure = 'f'")) - fpq_table = sql.table("fastpath") + fpq_table = sql_table("fastpath") - if input_: + if input: # input_ overrides domain and category_code - query_params["input"] = input_ - fpwhere.append(sql.text("input = :input")) + query_params["input"] = input + fpwhere.append(sql_text("input = :input")) elif domain or category_code: # both domain and category_code can be set at the same time if domain: query_params["domain"] = domain - fpwhere.append(sql.text("domain = :domain")) + fpwhere.append(sql_text("domain = :domain")) if category_code: query_params["category_code"] = category_code fpq_table = fpq_table.join( - sql.table("citizenlab"), - sql.text("citizenlab.url = fastpath.input"), + sql_table("citizenlab"), + sql_text("citizenlab.url = fastpath.input"), ) - fpwhere.append(sql.text("citizenlab.category_code = :category_code")) + fpwhere.append(sql_text("citizenlab.category_code = :category_code")) fp_query = select("*").where(and_(*fpwhere)).select_from(fpq_table) @@ -890,26 +880,26 @@ def list_measurements() -> Response: iter_start_time = time.time() try: - rows = query_click(query, query_params) + rows = query_click(db, query, query_params) results = [] for row in rows: msmt_uid = row["measurement_uid"] url = genurl("/api/v1/raw_measurement", measurement_uid=msmt_uid) results.append( - { - "measurement_uid": msmt_uid, - "measurement_url": url, - "report_id": row["report_id"], - "probe_cc": row["probe_cc"], - "probe_asn": "AS{}".format(row["probe_asn"]), - "test_name": row["test_name"], - "measurement_start_time": row["measurement_start_time"], - "input": row["input"], - "anomaly": row["anomaly"] == "t", - "confirmed": row["confirmed"] == "t", - 
"failure": row["msm_failure"] == "t", - "scores": json.loads(row["scores"]), - } + MeasurementMeta2( + measurement_uid=msmt_uid, + measurement_url=url, + report_id=row["report_id"], + probe_cc=row["probe_cc"], + probe_asn="AS{}".format(row["probe_asn"]), + test_name=row["test_name"], + measurement_start_time=row["measurement_start_time"], + input=row["input"], + anomaly=row["anomaly"] == "t", # TODO: This is wrong + confirmed=row["confirmed"] == "t", + failure=row["msm_failure"] == "t", + scores=json.loads(row["scores"]), + ) ) except OperationalError as exc: log.error(exc) @@ -917,14 +907,14 @@ def list_measurements() -> Response: # FIXME: this is a postgresql exception! # Timeout due to a slow query. Generate metric and do not feed it # to Sentry. - abort(504) + raise HTTPException(status_code=504) raise exc # Replace the special value INULL for "input" with None for i, r in enumerate(results): - if r["input"] == INULL: - results[i]["input"] = None + if r.input == INULL: + results[i].input = None pages = -1 count = -1 @@ -943,224 +933,95 @@ def list_measurements() -> Response: # pages = math.ceil(count / limit) # current_page = math.ceil(offset / limit) + 1 # query_time += time.time() - count_start_time - next_args = request.args.to_dict() + next_args = dict(request.query_params) next_args["offset"] = str(offset + limit) next_args["limit"] = str(limit) next_url = genurl("/api/v1/measurements", **next_args) query_time = time.time() - iter_start_time - metadata = { - "offset": offset, - "limit": limit, - "count": count, - "pages": pages, - "current_page": current_page, - "next_url": next_url, - "query_time": query_time, - } - return cachedjson("1m", metadata=metadata, results=results[:limit]) - - -def set_dload(resp, fname: str): - """Add header to make response downloadable""" - resp.headers["Content-Disposition"] = f"attachment; filename={fname}" + metadata = ResultsMetadata( + offset=offset, + limit=limit, + count=count, + pages=pages, + 
current_page=current_page,
+        next_url=next_url,
+        query_time=query_time,
+    )
+    response.headers["Cache-Control"] = "max-age=60"
+    return MeasurementList(metadata=metadata, results=results[:limit])
 
 
-@api_msm_blueprint.route("/v1/torsf_stats")
+@router.get("/v1/torsf_stats")
 @metrics.timer("get_torsf_stats")
-def get_torsf_stats() -> Response:
+async def get_torsf_stats(
+    db: Annotated[ClickhouseClient, Depends(get_clickhouse_client)],
+    response: Response,
+    probe_cc: Annotated[Optional[str], Query(description="Two letter country code")] = None,
+    since: Annotated[
+        Optional[datetime],
+        Query(
+            description='Start date of when measurements were run (ex. "2016-10-20T10:30:00")'
+        ),
+    ] = None,
+    until: Annotated[
+        Optional[datetime],
+        Query(
+            description='End date of when measurement were run (ex. "2016-10-20T10:30:00")'
+        ),
+    ] = None,
+) -> Response:
     """Tor Pluggable Transports statistics
-    Average / percentiles / total_count grouped by day
-    Either group-by or filter by probe_cc
-    Returns a format similar to get_aggregated
-    ---
-    parameters:
-      - name: probe_cc
-        in: query
-        type: string
-        description: The two letter country code
-        minLength: 2
-      - name: since
-        in: query
-        type: string
-        description: >-
-          The start date of when measurements were run (ex.
-          "2016-10-20T10:30:00")
-      - name: until
-        in: query
-        type: string
-        description: >-
-          The end date of when measurement were run (ex. 
- "2016-10-20T10:30:00") + Average / percentiles / total_count grouped by day + Either group-by or filter by probe_cc + Returns a format similar to get_aggregated responses: - '200': - description: Returns aggregated counters + '200': + description: Returns aggregated counters """ - param = request.args.get - probe_cc = param("probe_cc") - since = param("since") - until = param("until") cacheable = False cols = [ - sql.text("toDate(measurement_start_time) AS measurement_start_day"), + sql_text("toDate(measurement_start_time) AS measurement_start_day"), column("probe_cc"), - sql.text("countIf(anomaly = 't') AS anomaly_count"), - sql.text("countIf(confirmed = 't') AS confirmed_count"), - sql.text("countIf(msm_failure = 't') AS failure_count"), + sql_text("countIf(anomaly = 't') AS anomaly_count"), + sql_text("countIf(confirmed = 't') AS confirmed_count"), + sql_text("countIf(msm_failure = 't') AS failure_count"), ] - table = sql.table("fastpath") - where = [sql.text("test_name = 'torsf'")] + table = sql_table("fastpath") + where = [sql_text("test_name = 'torsf'")] query_params: Dict[str, Any] = {} if probe_cc: - where.append(sql.text("probe_cc = :probe_cc")) + where.append(sql_text("probe_cc = :probe_cc")) query_params["probe_cc"] = probe_cc if since: - where.append(sql.text("measurement_start_time > :since")) - query_params["since"] = str(parse_date(since)) + where.append(sql_text("measurement_start_time > :since")) + query_params["since"] = since if until: - until_td = parse_date(until) - where.append(sql.text("measurement_start_time <= :until")) - query_params["until"] = str(until_td) - cacheable = until_td < datetime.now() - timedelta(hours=72) + where.append(sql_text("measurement_start_time <= :until")) + query_params["until"] = until + cacheable = until < datetime.now() - timedelta(hours=72) # Assemble query where_expr = and_(*where) - query = select(cols).where(where_expr).select_from(table) + query = select(cols).where(where_expr).select_from(table) # type: 
ignore
     query = query.group_by(column("measurement_start_day"), column("probe_cc"))
     query = query.order_by(column("measurement_start_day"), column("probe_cc"))
     try:
-        q = query_click(query, query_params)
+        q = query_click(db, query, query_params)
         result = []
         for row in q:
             row = dict(row)
             row["anomaly_rate"] = row["anomaly_count"] / row["measurement_count"]
             result.append(row)
-        response = jsonify({"v": 0, "result": result})
         if cacheable:
-            response.cache_control.max_age = 3600 * 24
-        return response
+            response.headers["Cache-Control"] = f"max-age={3600 * 24}"
+        return {"v": 0, "result": result}
     except Exception as e:
         return jerror(str(e), v=0)
-
-
-# # measurement feedback
-
-from ooniapi.database import insert_click
-
-
-"""
-CREATE TABLE msmt_feedback
-(
-    `measurement_uid` String,
-    `account_id` String,
-    `status` String,
-    `update_time` DateTime64(3) MATERIALIZED now64()
-)
-ENGINE = ReplacingMergeTree
-ORDER BY (measurement_uid, account_id)
-SETTINGS index_granularity = 4
-"""
-
-valid_feedback_status = [
-    "blocked",
-    "blocked.blockpage",
-    "blocked.blockpage.http",
-    "blocked.blockpage.dns",
-    "blocked.blockpage.server_side",
-    "blocked.blockpage.server_side.captcha",
-    "blocked.dns",
-    "blocked.dns.inconsistent",
-    "blocked.dns.nxdomain",
-    "blocked.tcp",
-    "blocked.tls",
-    "ok",
-    "down",
-    "down.unreachable",
-    "down.misconfigured",
-]
-
-
-@api_msm_blueprint.route("/_/measurement_feedback/<measurement_uid>")
-@metrics.timer("get_msmt_feedback")
-def get_msmt_feedback(measurement_uid) -> Response:
-    """Get measurement for the curred logged user for a given measurement
-    ---
-    produces:
-      - application/json
-    parameters:
-      - name: measurement_uid
-        in: path
-        type: string
-        description: Measurement ID
-        minLength: 5
-        required: true
-    responses:
-      200:
-        description: status summary
-    """
-    account_id = get_account_id_or_none()
-    query = """SELECT status, account_id = :aid AS is_mine, count() AS cnt
-        FROM msmt_feedback FINAL
-        WHERE measurement_uid = :muid
-        GROUP 
BY status, is_mine - """ - qp = dict(aid=account_id, muid=measurement_uid) - rows = query_click(sql.text(query), qp) - out: Dict[str, Any] = dict(summary={}) - for row in rows: - status = row["status"] - if row["is_mine"]: - out["user_feedback"] = status - out["summary"][status] = out["summary"].get(status, 0) + row["cnt"] - - return cachedjson("0s", **out) - - -@api_msm_blueprint.route("/_/measurement_feedback", methods=["POST"]) -@metrics.timer("submit_msmt_feedback") -@role_required(["admin", "user"]) -def submit_msmt_feedback() -> Response: - """Submit measurement feedback. Only for registered users. - --- - produces: - - application/json - consumes: - - application/json - parameters: - - in: body - required: true - schema: - type: object - properties: - measurement_uid: - type: string - description: Measurement ID - status: - type: string - description: Measurement status - minLength: 2 - responses: - 200: - description: Submission or update accepted - """ - - def jparam(name): - return request.json.get(name, "").strip() - - account_id = get_account_id_or_none() - status = jparam("status") - if status not in valid_feedback_status: - return jerror("Invalid status") - measurement_uid = jparam("measurement_uid") - - query = "INSERT INTO msmt_feedback (measurement_uid, account_id, status) VALUES" - query_params = [measurement_uid, account_id, status] - insert_click(query, [query_params]) - return cachedjson("0s") diff --git a/api/ooniapi/measurements.py b/api/ooniapi/measurements.py new file mode 100644 index 00000000..2c174374 --- /dev/null +++ b/api/ooniapi/measurements.py @@ -0,0 +1,175 @@ +""" +Measurements API +The routes are mounted under /api +""" + +from datetime import datetime, timedelta +from dateutil.parser import parse as parse_date +from pathlib import Path +from typing import Optional, Any, Dict +import gzip +import json +import logging +import math +import time + +import ujson # debdeps: python3-ujson +import urllib3 # debdeps: python3-urllib3 + 
+from flask import current_app, request, make_response, abort, redirect, Response +from flask.json import jsonify +from werkzeug.exceptions import HTTPException, BadRequest + +# debdeps: python3-sqlalchemy +from sqlalchemy import and_, text, select, sql, column +from sqlalchemy.exc import OperationalError +from psycopg2.extensions import QueryCanceledError # debdeps: python3-psycopg2 + +from urllib.request import urlopen +from urllib.parse import urljoin, urlencode + +from ooniapi.auth import role_required, get_account_id_or_none +from ooniapi.config import metrics +from ooniapi.utils import cachedjson, nocachejson, jerror +from ooniapi.database import query_click, query_click_one_row +from ooniapi.urlparams import ( + param_asn, + param_bool, + param_commasplit, + param_date, + param_input_or_none, + param_report_id, + param_report_id_or_none, + param_measurement_uid, +) + +from flask import Blueprint + +api_msm_blueprint = Blueprint("msm_api", "measurements") + +FASTPATH_MSM_ID_PREFIX = "temp-fid-" +FASTPATH_SERVER = "fastpath.ooni.nu" +FASTPATH_PORT = 8000 + +log = logging.getLogger() + +urllib_pool = urllib3.PoolManager() + +# type hints +ostr = Optional[str] + + +# # measurement feedback + +from ooniapi.database import insert_click + + +""" +CREATE TABLE msmt_feedback +( + `measurement_uid` String, + `account_id` String, + `status` String, + `update_time` DateTime64(3) MATERIALIZED now64() +) +ENGINE = ReplacingMergeTree +ORDER BY (measurement_uid, account_id) +SETTINGS index_granularity = 4 +""" + +valid_feedback_status = [ + "blocked", + "blocked.blockpage", + "blocked.blockpage.http", + "blocked.blockpage.dns", + "blocked.blockpage.server_side", + "blocked.blockpage.server_side.captcha", + "blocked.dns", + "blocked.dns.inconsistent", + "blocked.dns.nxdomain", + "blocked.tcp", + "blocked.tls", + "ok", + "down", + "down.unreachable", + "down.misconfigured", +] + + +@api_msm_blueprint.route("/_/measurement_feedback/") +@metrics.timer("get_msmt_feedback") +def 
get_msmt_feedback(measurement_uid) -> Response: + """Get measurement for the curred logged user for a given measurement + --- + produces: + - application/json + parameters: + - name: measurement_uid + in: path + type: string + description: Measurement ID + minLength: 5 + required: true + responses: + 200: + description: status summary + """ + account_id = get_account_id_or_none() + query = """SELECT status, account_id = :aid AS is_mine, count() AS cnt + FROM msmt_feedback FINAL + WHERE measurement_uid = :muid + GROUP BY status, is_mine + """ + qp = dict(aid=account_id, muid=measurement_uid) + rows = query_click(sql.text(query), qp) + out: Dict[str, Any] = dict(summary={}) + for row in rows: + status = row["status"] + if row["is_mine"]: + out["user_feedback"] = status + out["summary"][status] = out["summary"].get(status, 0) + row["cnt"] + + return cachedjson("0s", **out) + + +@api_msm_blueprint.route("/_/measurement_feedback", methods=["POST"]) +@metrics.timer("submit_msmt_feedback") +@role_required(["admin", "user"]) +def submit_msmt_feedback() -> Response: + """Submit measurement feedback. Only for registered users. + --- + produces: + - application/json + consumes: + - application/json + parameters: + - in: body + required: true + schema: + type: object + properties: + measurement_uid: + type: string + description: Measurement ID + status: + type: string + description: Measurement status + minLength: 2 + responses: + 200: + description: Submission or update accepted + """ + + def jparam(name): + return request.json.get(name, "").strip() + + account_id = get_account_id_or_none() + status = jparam("status") + if status not in valid_feedback_status: + return jerror("Invalid status") + measurement_uid = jparam("measurement_uid") + + query = "INSERT INTO msmt_feedback (measurement_uid, account_id, status) VALUES" + query_params = [measurement_uid, account_id, status] + insert_click(query, [query_params]) + return cachedjson("0s")