Skip to content

Commit

Permalink
Merge pull request #13085 from bbockelm/xrootd_qmf_thread_safety
Browse files Browse the repository at this point in the history
Fix Xrootd thread-safety issue in quality metric factory
  • Loading branch information
davidlange6 committed Jan 31, 2016
2 parents a7e19cf + 556e64e commit 3c25366
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 23 deletions.
4 changes: 3 additions & 1 deletion Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class MakerResponseHandler : public XrdCl::ResponseHandler
virtual void HandleResponse( XrdCl::XRootDStatus *status,
XrdCl::AnyObject *response )
{
if (response) delete response;
// Note: Prepare call has a response object.
delete response;
delete status;
}

};
Expand Down
26 changes: 15 additions & 11 deletions Utilities/XrdAdaptor/src/QualityMetric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,29 @@ QualityMetric::get()
return m_value;
}

QualityMetricFactory * QualityMetricFactory::m_instance = new QualityMetricFactory();

[[cms::thread_safe]] QualityMetricFactory QualityMetricFactory::m_instance;


std::unique_ptr<QualityMetricSource>
QualityMetricFactory::get(timespec now, const std::string &id)
{
MetricMap::const_iterator it = m_instance->m_sources.find(id);
QualityMetricUniqueSource *source;
if (it == m_instance->m_sources.end())
{
source = new QualityMetricUniqueSource(now);
m_instance->m_sources[id] = source;
}
else
auto itFound = m_instance.m_sources.find(id);
if (itFound == m_instance.m_sources.end())
{
source = it->second;
// try to make a new one
std::unique_ptr<QualityMetricUniqueSource> source(new QualityMetricUniqueSource(now));
auto insertResult = m_instance.m_sources.insert(std::make_pair(id, source.get()));
itFound = insertResult.first;
if (insertResult.second)
{ // Insert was successful; release our reference.
source.release();
} // Otherwise, we raced with a different thread and they won; we will delete our new QM source.
}
return source->newSource(now);
return itFound->second->newSource(now);
}


QualityMetricSource::QualityMetricSource(QualityMetricUniqueSource &parent, timespec now, int default_value)
: QualityMetric(now, default_value),
m_parent(parent)
Expand Down
6 changes: 3 additions & 3 deletions Utilities/XrdAdaptor/src/QualityMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include <mutex>
#include <memory>
#include <unordered_map>

#include "tbb/concurrent_unordered_map.h"
#include <boost/utility.hpp>

#include "FWCore/Utilities/interface/propagate_const.h"
Expand Down Expand Up @@ -66,9 +66,9 @@ friend class Source;
static
std::unique_ptr<QualityMetricSource> get(timespec now, const std::string &id);

static QualityMetricFactory *m_instance;
[[cms::thread_safe]] static QualityMetricFactory m_instance;

typedef std::unordered_map<std::string, QualityMetricUniqueSource*> MetricMap;
typedef tbb::concurrent_unordered_map<std::string, QualityMetricUniqueSource*> MetricMap;
MetricMap m_sources;
};

Expand Down
7 changes: 4 additions & 3 deletions Utilities/XrdAdaptor/src/XrdRequest.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

#include <atomic>
#include <iostream>

#include "FWCore/MessageLogger/interface/MessageLogger.h"
Expand All @@ -11,10 +12,10 @@ using namespace XrdAdaptor;
// If you define XRD_FAKE_ERROR, 1/5 read requests should fail.
#ifdef XRD_FAKE_ERROR
#define FAKE_ERROR_COUNTER 5
int g_fakeError = 0;
static std::atomic<int> g_fakeError {0};
#else
#define FAKE_ERROR_COUNTER 0
int g_fakeError = 0;
static std::atomic<int> g_fakeError {0};
#endif

XrdAdaptor::ClientRequest::~ClientRequest() {}
Expand Down Expand Up @@ -82,7 +83,7 @@ XrdAdaptor::ClientRequest::HandleResponse(XrdCl::XRootDStatus *stat, XrdCl::AnyO
m_promise.set_exception(std::current_exception());
edm::LogWarning("XrdAdaptorInternal") << "Caught a CMSSW exception when running connection recovery.";
}
catch (...)
catch (std::exception)
{
edm::Exception ex(edm::errors::FileReadError);
ex << "XrdRequestManager::handle(name='" << m_manager.getFilename()
Expand Down
7 changes: 6 additions & 1 deletion Utilities/XrdAdaptor/src/XrdRequestManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ class SendMonitoringInfoHandler : boost::noncopyable, public XrdCl::ResponseHand
{
XrdCl::Buffer *buffer = nullptr;
response->Get(buffer);
response->Set(static_cast<int*>(nullptr));
delete buffer;
}
// Send Info has a response object; we must delete it.
delete response;
delete status;
}
};

SendMonitoringInfoHandler nullHandler;
[[cms::thread_safe]] SendMonitoringInfoHandler nullHandler;


static void
Expand Down Expand Up @@ -1065,6 +1069,7 @@ XrdAdaptor::RequestManager::OpenHandler::~OpenHandler()
void
XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
{
// NOTE: as in XrdCl::File (synchronous), we ignore the response object.
// Make sure that we set m_outstanding_open to false on exit from this function.
std::unique_ptr<char, std::function<void(char*)>> outstanding_guard(nullptr, [&](char*){m_outstanding_open=false;});

Expand Down
10 changes: 6 additions & 4 deletions Utilities/XrdAdaptor/src/XrdSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#define _GLIBCXX_USE_NANOSLEEP
#include <thread>
#include <chrono>
#include <atomic>
#include <iostream>
#include <assert.h>
#include <netdb.h>
Expand All @@ -24,9 +25,9 @@
//#define XRD_DELAY 5140
#define XRD_DELAY 1000
#define XRD_SLOW_RATE 2
int g_delayCount = 0;
std::atomic<int> g_delayCount {0};
#else
int g_delayCount = 0;
std::atomic<int> g_delayCount {0};
#endif

using namespace XrdAdaptor;
Expand Down Expand Up @@ -59,14 +60,15 @@ class DelayedClose : boost::noncopyable, public XrdCl::ResponseHandler

virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override
{
if (!status->IsOK())
if (status && !status->IsOK())
{

edm::LogWarning("XrdFileWarning") << "Source delayed close failed with error '" << status->ToStr()
<< "' (errno=" << status->errNo << ", code=" << status->code << ", server=" << m_id << ", site=" << m_site << ")";
}
delete status;
delete hostList;
// NOTE: we do not delete response (copying behavior from XrdCl).
delete this;
}

Expand Down Expand Up @@ -190,7 +192,6 @@ Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string
m_id("(unknown)"),
m_exclude(exclude),
m_fh(std::move(fh)),
m_qm(QualityMetricFactory::get(now, m_id)),
m_stats(nullptr)
#ifdef XRD_FAKE_SLOW
, m_slow(++g_delayCount % XRD_SLOW_RATE == 0)
Expand All @@ -207,6 +208,7 @@ Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string
}
if (!m_exclude.size()) {m_exclude = m_id;}
}
m_qm = QualityMetricFactory::get(now, m_id);
m_prettyid = m_id + " (unknown site)";
std::string domain_id;
if (getDomain(m_id, domain_id)) {m_site = domain_id;}
Expand Down

0 comments on commit 3c25366

Please sign in to comment.