Skip to content

Commit da10075

Browse files
authored
Merge pull request #493 from xchem/m2ms-1226-xca-uploader
Target upload tweaks for better cooperation with XCA uploader
2 parents e9b039d + 7e1c0d2 commit da10075

File tree

2 files changed

+49
-19
lines changed

2 files changed

+49
-19
lines changed

viewer/target_loader.py

+15-10
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class UploadState(str, Enum):
7272
REPORTING = "REPORTING"
7373
SUCCESS = "SUCCESS"
7474
FAILED = "FAILED"
75+
CANCELED = "CANCELED"
7576

7677

7778
class Level(str, Enum):
@@ -150,13 +151,14 @@ def log(self, level: Level, message: str) -> None:
150151
self.stack.append(UploadReportEntry(level=level, message=message))
151152
self._update_task(message)
152153

153-
def final(self, archive_name):
154-
if self.upload_state == UploadState.PROCESSING:
155-
self.upload_state = UploadState.SUCCESS
156-
message = f"{archive_name} uploaded successfully."
154+
def final(self, message, upload_state=None):
155+
if upload_state:
156+
self.upload_state = upload_state
157157
else:
158-
self.upload_state = UploadState.FAILED
159-
message = f"Uploading {archive_name} failed."
158+
if self.upload_state == UploadState.PROCESSING:
159+
self.upload_state = UploadState.SUCCESS
160+
else:
161+
self.upload_state = UploadState.FAILED
160162

161163
self.stack.append(UploadReportEntry(message=message))
162164
self._update_task(self.json())
@@ -167,6 +169,7 @@ def json(self):
167169
def _update_task(self, message: str | list) -> None:
168170
if self.task:
169171
try:
172+
logger.debug("taskstuff %s", dir(self.task))
170173
self.task.update_state(
171174
state=self.upload_state,
172175
meta={
@@ -1134,7 +1137,9 @@ def process_metadata(
11341137
# moved this bit from init
11351138
self.target, target_created = Target.objects.get_or_create(
11361139
title=self.target_name,
1137-
display_name=self.target_name,
1140+
defaults={
1141+
"display_name": self.target_name,
1142+
},
11381143
)
11391144

11401145
# TODO: original target loader's function get_create_projects
@@ -1153,7 +1158,7 @@ def process_metadata(
11531158
# remove uploaded file
11541159
Path(self.bundle_path).unlink()
11551160
msg = f"{self.bundle_name} already uploaded, skipping."
1156-
self.report.log(Level.INFO, msg)
1161+
self.report.final(msg, upload_state=UploadState.CANCELED)
11571162
raise FileExistsError(msg)
11581163

11591164
if project_created and committer.pk == settings.ANONYMOUS_USER:
@@ -1596,7 +1601,7 @@ def load_target(
15961601
)
15971602
except IntegrityError as exc:
15981603
logger.error(exc, exc_info=True)
1599-
target_loader.report.final(target_loader.data_bundle)
1604+
target_loader.report.final(f"Uploading {target_loader.data_bundle} failed")
16001605
target_loader.experiment_upload.message = target_loader.report.json()
16011606
raise IntegrityError from exc
16021607

@@ -1618,7 +1623,7 @@ def load_target(
16181623

16191624
set_directory_permissions(target_loader.abs_final_path, 0o755)
16201625

1621-
target_loader.report.final(target_loader.data_bundle)
1626+
target_loader.report.final(f"{target_loader.data_bundle} uploaded successfully")
16221627
target_loader.experiment_upload.message = target_loader.report.json()
16231628

16241629
# logger.debug("%s", upload_report)

viewer/views.py

+34-9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import pandas as pd
1414
import pytz
15+
from celery import Celery
1516
from celery.result import AsyncResult
1617
from dateutil.parser import parse
1718
from django.conf import settings
@@ -1578,15 +1579,39 @@ def get(self, request, task_id, *args, **kwargs):
15781579

15791580
# task_id is (will be) a UUID, but Celery expects a string
15801581
task_id_str = str(task_id)
1581-
try:
1582-
result = AsyncResult(task_id_str)
1583-
except TimeoutError:
1584-
error = {'error': 'Task result query timed out'}
1585-
return Response(error, status=status.HTTP_408_REQUEST_TIMEOUT)
1586-
1587-
if result.state == 'PENDING':
1588-
error = {'error': 'Unknown task'}
1589-
return Response(error, status=status.HTTP_400_BAD_REQUEST)
1582+
1583+
celery_app = Celery("fragalysis")
1584+
inspect = celery_app.control.inspect()
1585+
ping = inspect.ping()
1586+
1587+
if ping:
1588+
active_tasks = inspect.active()
1589+
1590+
# active_tasks.values is a list of tasks for every worker
1591+
if task_id_str in [
1592+
k['id'] for worker in active_tasks.values() for k in worker
1593+
]:
1594+
# celery confirms task exists
1595+
try:
1596+
result = AsyncResult(task_id_str)
1597+
except TimeoutError:
1598+
error = {'error': 'Task result query timed out'}
1599+
return Response(error, status=status.HTTP_408_REQUEST_TIMEOUT)
1600+
else:
1601+
# no such task
1602+
error = {'error': 'Unknown task'}
1603+
return Response(error, status=status.HTTP_400_BAD_REQUEST)
1604+
1605+
else:
1606+
# task scheduler not running. This may be the case in local
1607+
# development, but this means there's really no way to
1608+
# validate whether the task exists
1609+
logger.warning('Celery not running!')
1610+
try:
1611+
result = AsyncResult(task_id_str)
1612+
except TimeoutError:
1613+
error = {'error': 'Task result query timed out'}
1614+
return Response(error, status=status.HTTP_408_REQUEST_TIMEOUT)
15901615

15911616
# Extract messages (from task info)
15921617
# Assuming the task has some info.

0 commit comments

Comments
 (0)