forked from dmwm/CRABServer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRucioUtils.py
121 lines (106 loc) · 5.07 KB
/
RucioUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
""" a small set of utilities to work with Rucio used in various places """
import logging
from TaskWorker.WorkerExceptions import TaskWorkerException
from rucio.client import Client
from rucio.common.exception import RSENotFound, RuleNotFound
def getNativeRucioClient(config=None, logger=None):
    """
    instantiates a Rucio python Client for use in CRAB TaskWorker
    :param config: a TaskWorker configuration object in which
                   at least the variables used below are defined
                   (Services.Rucio_host, Services.Rucio_authUrl, Services.Rucio_caPath,
                   Services.Rucio_account and either Services.Rucio_cert/Rucio_key or
                   TaskWorker.cmscert/cmskey)
    :param logger: a valid logger instance; a module-level logger is used if None
    :return: a Rucio Client object, already verified with ping() and whoami()
    """
    if logger is None:
        # the signature advertises None as a valid default, so honour it
        logger = logging.getLogger(__name__)
    logger.info("Initializing native Rucio client")
    rucioLogger = logging.getLogger('RucioClient')
    rucioLogger.setLevel(logging.INFO)
    # silence a few noisy components used by rucio
    for noisyLoggerName in ('urllib3', 'dogpile', 'charset_normalizer'):
        logging.getLogger(noisyLoggerName).setLevel(logging.ERROR)
    # Prefer the Rucio-specific credentials and fall back to the generic TaskWorker
    # ones only when they are not configured. The fallback is looked up lazily so that
    # a configuration defining only Rucio_cert/Rucio_key does not also need cmscert/cmskey.
    services = config.Services
    rucioCert = services.Rucio_cert if hasattr(services, "Rucio_cert") else config.TaskWorker.cmscert
    rucioKey = services.Rucio_key if hasattr(services, "Rucio_key") else config.TaskWorker.cmskey
    logger.debug("Using cert [%s]\n and key [%s] for rucio client.", rucioCert, rucioKey)
    nativeClient = Client(
        rucio_host=services.Rucio_host,
        auth_host=services.Rucio_authUrl,
        ca_cert=services.Rucio_caPath,
        account=services.Rucio_account,
        creds={"client_cert": rucioCert, "client_key": rucioKey},
        auth_type='x509',
        logger=rucioLogger
    )
    # contact the server right away so that configuration/auth problems surface here
    ret = nativeClient.ping()
    logger.info("Rucio server v.%s contacted", ret['version'])
    ret = nativeClient.whoami()
    logger.info("Rucio client initialized for %s in status %s", ret['account'], ret['status'])
    return nativeClient
def getWritePFN(rucioClient=None, siteName='', lfn='',
                operations=('third_party_copy_write', 'write'), logger=None):
    """
    convert a single LFN into a PFN which can be used for Writing via Rucio
    Rucio supports the possibility that at some point in the future sites may
    require different protocols or hosts for read or write operations
    :param rucioClient: Rucio python client, e.g. the object returned by getNativeRucioClient above
    :param siteName: e.g. 'T2_CH_CERN'
    :param lfn: a CMS-style LFN
    :param operations: Rucio protocol operations to try, in order of preference;
                       the first one for which lfns2pfns succeeds is used
    :param logger: a valid logger instance; a module-level logger is used if None
    :return: a CMS-style PFN
    :raises TaskWorkerException: if siteName is not found by Rucio, or if no
                                 operation resolved the LFN into a PFN
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    # add a scope to turn LFN into Rucio DID syntax
    did = 'cms:' + lfn
    # we prefer to do ASO via FTS which uses 3rd party copy, fall back to protocols defined
    # for other operations in case that fails, order matters here !
    # "third_party_copy_write": provides the PFN to be used with FTS
    # "write": provides the PFN to be used with gfal
    # 2022-08: dario checked with felipe that every sane RSE has non-zero value
    # for the third_party_copy_write column, which means that it is available.
    exceptionString = ""
    didDict = None
    for operation in operations:
        try:
            logger.info('Try Rucio lfn2pfn with operation %s', operation)
            didDict = rucioClient.lfns2pfns(siteName, [did], operation=operation)
            break
        except RSENotFound as ex:
            # no point in trying other operations for a site Rucio does not know
            msg = f"Site {siteName} not found in CMS site list"
            # chain from the caught instance (not the class) to keep the real cause
            raise TaskWorkerException(msg) from ex
        except Exception as ex:  # pylint: disable=broad-except
            msg = 'Rucio lfn2pfn resolution for %s failed with:\n%s\nTry next one.'
            logger.warning(msg, operation, str(ex))
            exceptionString += f"operation: {operation}, exception: {ex}\n"
    # lfns2pfns returns a dictionary with did as key and pfn as value:
    # https://rucio.readthedocs.io/en/latest/api/rse.html
    # {u'cms:/store/user/rucio': u'gsiftp://eoscmsftp.cern.ch:2811/eos/cms/store/user/rucio'}
    # A missing key would otherwise surface as a bare KeyError; report it like any
    # other resolution failure instead.
    if not didDict or did not in didDict:
        msg = f"lfn2pfn resolution with Rucio failed for site: {siteName} LFN: {lfn}"
        msg += f" with exception(s) :\n{exceptionString}"
        raise TaskWorkerException(msg)
    pfn = didDict[did]
    logger.info("Will use %s as stageout location", pfn)
    return pfn
def getRuleQuota(rucioClient=None, ruleId=None):
    """
    compute how much quota a replication rule needs
    :param rucioClient: Rucio python client
    :param ruleId: id of the replication rule
    :return: sum of the 'bytes' of all files in the rule's DID, in Bytes;
             0 if Rucio reports the rule as not found
    """
    try:
        replicationRule = rucioClient.get_replication_rule(ruleId)
    except RuleNotFound:
        # a rule Rucio cannot find is counted as using no space
        return 0
    totalBytes = 0
    fileList = rucioClient.list_files(scope=replicationRule['scope'], name=replicationRule['name'])
    for fileInfo in fileList:
        totalBytes += fileInfo['bytes']
    return totalBytes
def getTapeRecallUsage(rucioClient=None, account=None):  # pylint: disable=unused-argument
    """
    size of ongoing tape recalls, in Bytes
    :param rucioClient: Rucio python client
    :param account: Rucio account whose recalls should be counted.
                    NOTE: currently ignored, see the testing override below
    :return: sum of getRuleQuota() over the matching rules which are in state
             REPLICATING, STUCK or SUSPENDED
    """
    # NOTE(review): temporary testing override ("for testing before Rahul's code is ready").
    # The intended query is
    #     {'account': account, 'activity': 'Analysis Tape Recall'}
    # but for now the fixed 'crab_tape_recall' account and 'Analysis Input' activity
    # are used, so the `account` argument has no effect.
    ruleFilters = {'account': 'crab_tape_recall', 'activity': 'Analysis Input'}
    ongoingStates = ('REPLICATING', 'STUCK', 'SUSPENDED')
    totalBytes = 0
    for rule in rucioClient.list_replication_rules(filters=ruleFilters):
        if rule['state'] not in ongoingStates:
            continue
        totalBytes += getRuleQuota(rucioClient, rule['id'])
    return totalBytes