From c1f03498b6816dd2254e28c7c167e0cdc79827ca Mon Sep 17 00:00:00 2001 From: Fantix King Date: Tue, 19 Feb 2019 17:41:10 -0600 Subject: [PATCH 1/6] feat(uniqueKeys): handle unique key error --- sheepdog/errors.py | 4 ++ sheepdog/transactions/upload/__init__.py | 3 ++ sheepdog/transactions/upload/transaction.py | 44 ++++++++++++++++++++- 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/sheepdog/errors.py b/sheepdog/errors.py index 6b140172..693f0430 100644 --- a/sheepdog/errors.py +++ b/sheepdog/errors.py @@ -23,3 +23,7 @@ def __init__(self, file_id): file_id ) self.code = 400 + + +class HandledIntegrityError(Exception): + pass diff --git a/sheepdog/transactions/upload/__init__.py b/sheepdog/transactions/upload/__init__.py index 24cf8556..60facf19 100644 --- a/sheepdog/transactions/upload/__init__.py +++ b/sheepdog/transactions/upload/__init__.py @@ -13,6 +13,7 @@ from sheepdog import auth from sheepdog import utils from sheepdog.errors import ParsingError, SchemaError, UnsupportedError, UserError +from sheepdog.errors import HandledIntegrityError from sheepdog.globals import FLAG_IS_ASYNC, PROJECT_SEED from sheepdog.transactions.upload.transaction import ( BulkUploadTransaction, @@ -31,6 +32,8 @@ def single_transaction_worker(transaction, *doc_args): transaction.flush() transaction.post_validate() transaction.commit() + except HandledIntegrityError: + pass except UserError as e: transaction.record_user_error(e) raise diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 4afbb69c..4d3eba82 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -2,16 +2,18 @@ Define the ``UploadTransaction`` class. """ +import re from collections import Counter # Validating Entity Existence in dbGaP from datamodelutils import validators +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.attributes import flag_modified from sheepdog.auth import dbgap from sheepdog import models from sheepdog import utils -from sheepdog.errors import UserError +from sheepdog.errors import UserError, HandledIntegrityError from sheepdog.globals import ( case_cache_enabled, TX_LOG_STATE_ERRORED, @@ -23,6 +25,10 @@ from sheepdog.transactions.upload.entity_factory import UploadEntityFactory +KEYS_REGEXP = re.compile(r"_props ->> '([^']+)+'::text") +VALUES_REGEXP = re.compile(r"=\(([^\(\)]+)\)") + + class UploadTransaction(TransactionBase): """ An UploadTransaction should be used as a context manager. This way, we can @@ -172,7 +178,41 @@ def flush(self): """ for entity in self.valid_entities: entity.flush_to_session() - self.session.flush() + try: + self.session.flush() + except IntegrityError as e: + # don't handle non-unique constraint errors + if 'duplicate key value violates unique constraint' not in e.message: + raise + values = VALUES_REGEXP.findall(e.message) + if not values: + raise + values = [v.strip() for v in values[0].split(',')] + keys = KEYS_REGEXP.findall(e.message) + if len(keys) == len(values): + values = dict(zip(keys, values)) + entities = [] + label = None + for en in self.valid_entities: + for k, v in values.items(): + if getattr(en.node, k, None) != v: + break + else: + if label and label != en.node.label: + break + entities.append(en) + label = en.node.label + else: + for entity in entities: + entity.record_error( + '{} with {} already exists in the GDC' + .format(entity.node.label, values), + keys=keys, + ) + if entities: + raise HandledIntegrityError() + self.record_error('{} already exists in the GDC'.format(values)) + raise HandledIntegrityError() @property def status_code(self): From 530c4028a12be10601a6c3589bff1ea02208e086 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Tue, 19 Feb 2019 23:25:10 -0600 Subject: [PATCH 2/6] Black --- sheepdog/transactions/upload/transaction.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 4d3eba82..8b45ef93 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -182,12 +182,12 @@ def flush(self): self.session.flush() except IntegrityError as e: # don't handle non-unique constraint errors - if 'duplicate key value violates unique constraint' not in e.message: + if "duplicate key value violates unique constraint" not in e.message: raise values = VALUES_REGEXP.findall(e.message) if not values: raise - values = [v.strip() for v in values[0].split(',')] + values = [v.strip() for v in values[0].split(",")] keys = KEYS_REGEXP.findall(e.message) if len(keys) == len(values): values = dict(zip(keys, values)) @@ -205,13 +205,14 @@ def flush(self): else: for entity in entities: entity.record_error( - '{} with {} already exists in the GDC' - .format(entity.node.label, values), + "{} with {} already exists in the GDC".format( + entity.node.label, values + ), keys=keys, ) if entities: raise HandledIntegrityError() - self.record_error('{} already exists in the GDC'.format(values)) + self.record_error("{} already exists in the GDC".format(values)) raise HandledIntegrityError() @property From be89f96189f04bd592117560463a22dff804c10b Mon Sep 17 00:00:00 2001 From: Fantix King Date: Tue, 19 Feb 2019 23:56:44 -0600 Subject: [PATCH 3/6] Fix test --- .../datadictwithobjid/submission/test_upload.py | 4 ++-- tests/integration/datadictwithobjid/submission/utils.py | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/integration/datadictwithobjid/submission/test_upload.py b/tests/integration/datadictwithobjid/submission/test_upload.py index 90862654..de564abe 100644 --- a/tests/integration/datadictwithobjid/submission/test_upload.py +++ b/tests/integration/datadictwithobjid/submission/test_upload.py @@ -654,7 +654,7 @@ def test_data_file_update_url_invalid_id( assert new_url not in document.urls # response - assert_negative_response(resp) + assert_negative_response(resp, on_entity=False) assert_single_entity_from_response(resp) @@ -868,7 +868,7 @@ def test_data_file_update_url_id_provided_different_file_already_indexed( assert new_url not in different_file_matching_hash_and_size.urls # response - assert_negative_response(resp) + assert_negative_response(resp, on_entity=False) assert_single_entity_from_response(resp) diff --git a/tests/integration/datadictwithobjid/submission/utils.py b/tests/integration/datadictwithobjid/submission/utils.py index ac964396..b852a2e3 100644 --- a/tests/integration/datadictwithobjid/submission/utils.py +++ b/tests/integration/datadictwithobjid/submission/utils.py @@ -96,13 +96,14 @@ def assert_positive_response(resp): assert resp.json["success"] is True -def assert_negative_response(resp): +def assert_negative_response(resp, on_entity=True): assert resp.status_code != 200, resp.data entities = resp.json["entities"] # check if at least one entity has an error - entity_errors = [entity["errors"] for entity in entities if entity["errors"]] - assert entity_errors + if on_entity: + entity_errors = [entity["errors"] for entity in entities if entity["errors"]] + assert entity_errors assert resp.json["success"] is False From 31486462f152e09ae73003c5a187cbaec613ec34 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Wed, 17 Apr 2019 15:55:59 -0500 Subject: [PATCH 4/6] add test for race condition --- .../datadict/submission/test_endpoints.py | 80 ++++++++++++++++++- 1 file changed, 77 insertions(+), 3 deletions(-) diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index 0b94c1a7..62e2eb77 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -5,22 +5,25 @@ # pylint: disable=superfluous-parens # pylint: disable=no-member import contextlib +import csv import json import os import uuid -import csv from StringIO import StringIO import boto import pytest +from datamodelutils import models as md from flask import g from moto import mock_s3 -from datamodelutils import models as md +from sheepdog.errors import HandledIntegrityError +from sheepdog.globals import ROLES from sheepdog.transactions.upload import UploadTransaction +from sheepdog.utils import get_external_proxies +from sheepdog.utils.transforms import TSVToJSONConverter from tests.integration.datadict.submission.utils import data_fnames - #: Do we have a cache case setting and should we do it? CACHE_CASES = False BLGSP_PATH = '/v0/submission/CGCI/BLGSP/' @@ -815,3 +818,74 @@ def test_submit_export_encoding(client, pg_driver, cgci_blgsp, submitter): path, headers=submitter) assert len(r.json) == 1 + + +def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): + """ + Make sure that concurrent transactions don't cause duplicate submission. + """ + + data = { + "type": "experiment", + "submitter_id": "BLGSP-71-06-00019", + "projects.id": "daa208a7-f57a-562c-a04a-7a7c77542c98" + } + + # convert to TSV (save to file) + file_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "data/experiment_tmp.tsv" + ) + with open(file_path, "w") as f: + dw = csv.DictWriter(f, sorted(data.keys()), delimiter="\t") + dw.writeheader() + dw.writerow(data) + + # read the TSV data + data = None + with open(file_path, "r") as f: + data = f.read() + os.remove(file_path) # clean up (delete file) + assert data + + program, project = BLGSP_PATH.split('/')[3:5] + tsv_data = TSVToJSONConverter().convert(data)[0] + doc_args = [None, 'tsv', data, tsv_data] + utx1, utx2 = [UploadTransaction( + program=program, + project=project, + role=ROLES['UPDATE'], + logger=app.logger, + flask_config=app.config, + signpost=app.signpost, + external_proxies=get_external_proxies(), + db_driver=pg_driver, + ) for _ in range(2)] + with pg_driver.session_scope(can_inherit=False) as s1, utx1: + utx1.parse_doc(*doc_args) + with pg_driver.session_scope(can_inherit=False) as s2, utx2: + utx2.parse_doc(*doc_args) + + with pg_driver.session_scope(session=s2): + utx2.flush() + + with pg_driver.session_scope(session=s2): + utx2.post_validate() + + with pg_driver.session_scope(session=s2): + utx2.commit() + + try: + with pg_driver.session_scope(session=s1): + utx1.flush() + + with pg_driver.session_scope(session=s1): + utx1.post_validate() + + with pg_driver.session_scope(session=s1): + utx1.commit() + except HandledIntegrityError: + pass + + with pg_driver.session_scope(): + assert pg_driver.nodes(md.Experiment).count() == 1 From eccaa1d4f8512cb2c8b5f7325533d9c3ecd938cc Mon Sep 17 00:00:00 2001 From: Fantix King Date: Wed, 17 Apr 2019 16:59:04 -0500 Subject: [PATCH 5/6] fix pylint issues --- sheepdog/transactions/upload/transaction.py | 3 +- .../datadict/submission/test_endpoints.py | 40 ++++++++++--------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 8b45ef93..39f39e34 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -202,7 +202,8 @@ def flush(self): break entities.append(en) label = en.node.label - else: + else: # pylint: disable=useless-else-on-loop + # https://github.com/PyCQA/pylint/pull/2760 for entity in entities: entity.record_error( "{} with {} already exists in the GDC".format( diff --git a/tests/integration/datadict/submission/test_endpoints.py b/tests/integration/datadict/submission/test_endpoints.py index 62e2eb77..cfeab2cf 100644 --- a/tests/integration/datadict/submission/test_endpoints.py +++ b/tests/integration/datadict/submission/test_endpoints.py @@ -861,31 +861,33 @@ def test_duplicate_submission(app, pg_driver, cgci_blgsp, submitter): external_proxies=get_external_proxies(), db_driver=pg_driver, ) for _ in range(2)] - with pg_driver.session_scope(can_inherit=False) as s1, utx1: - utx1.parse_doc(*doc_args) - with pg_driver.session_scope(can_inherit=False) as s2, utx2: - utx2.parse_doc(*doc_args) + with pg_driver.session_scope(can_inherit=False) as s1: + with utx1: + utx1.parse_doc(*doc_args) + with pg_driver.session_scope(can_inherit=False) as s2: + with utx2: + utx2.parse_doc(*doc_args) - with pg_driver.session_scope(session=s2): - utx2.flush() + with pg_driver.session_scope(session=s2): + utx2.flush() - with pg_driver.session_scope(session=s2): - utx2.post_validate() + with pg_driver.session_scope(session=s2): + utx2.post_validate() - with pg_driver.session_scope(session=s2): - utx2.commit() + with pg_driver.session_scope(session=s2): + utx2.commit() - try: - with pg_driver.session_scope(session=s1): - utx1.flush() + try: + with pg_driver.session_scope(session=s1): + utx1.flush() - with pg_driver.session_scope(session=s1): - utx1.post_validate() + with pg_driver.session_scope(session=s1): + utx1.post_validate() - with pg_driver.session_scope(session=s1): - utx1.commit() - except HandledIntegrityError: - pass + with pg_driver.session_scope(session=s1): + utx1.commit() + except HandledIntegrityError: + pass with pg_driver.session_scope(): assert pg_driver.nodes(md.Experiment).count() == 1 From e81bed5bcbfe77be092fdfeb3ddbbde8a53145bc Mon Sep 17 00:00:00 2001 From: Fantix King Date: Fri, 15 Feb 2019 12:06:44 -0600 Subject: [PATCH 6/6] CRF: remove GDC * bump deps version --- requirements.txt | 4 ++-- sheepdog/transactions/upload/transaction.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index fc493d14..0f9e72ff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,7 +23,7 @@ more-itertools==5.0.0 -e git+https://git@github.com/uc-cdis/authutils.git@3.0.1#egg=authutils -e git+https://git@github.com/uc-cdis/cdis_oauth2client.git@0.1.3#egg=cdis_oauth2client cdispyutils==0.2.13 --e git+https://git@github.com/uc-cdis/datamodelutils.git@0.4.1#egg=datamodelutils +datamodelutils==0.4.3 psqlgraph==1.2.3 -e git+https://git@github.com/NCI-GDC/signpost.git@v1.1#egg=signpost # required for gdcdatamodel, not required for sheepdog @@ -31,7 +31,7 @@ psqlgraph==1.2.3 -e git+https://git@github.com/uc-cdis/cdislogging.git@master#egg=cdislogging -e git+https://git@github.com/NCI-GDC/cdisutils.git@f54e393c89939b2200dfae45c6235cbe2bae1206#egg=cdisutils -e git+https://git@github.com/uc-cdis/datadictionary.git@0.2.1#egg=gdcdictionary -gdcdatamodel==1.3.11 +gdcdatamodel==1.3.12 -e git+https://git@github.com/uc-cdis/indexclient.git@1.5.7#egg=indexclient -e git+https://git@github.com/NCI-GDC/python-signpostclient.git@ca686f55772e9a7f839b4506090e7d2bb0de5f15#egg=signpostclient -e git+https://git@github.com/uc-cdis/storage-client.git@0.1.7#egg=storageclient diff --git a/sheepdog/transactions/upload/transaction.py b/sheepdog/transactions/upload/transaction.py index 39f39e34..06999f63 100644 --- a/sheepdog/transactions/upload/transaction.py +++ b/sheepdog/transactions/upload/transaction.py @@ -206,14 +206,14 @@ def flush(self): # https://github.com/PyCQA/pylint/pull/2760 for entity in entities: entity.record_error( - "{} with {} already exists in the GDC".format( + "{} with {} already exists".format( entity.node.label, values ), keys=keys, ) if entities: raise HandledIntegrityError() - self.record_error("{} already exists in the GDC".format(values)) + self.record_error("{} already exists".format(values)) raise HandledIntegrityError() @property