From e8668cc0371e8727617e4047c18064d06608ac59 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 16 Aug 2022 16:07:55 +0200 Subject: [PATCH 1/5] Implement thread locking for caches --- lumen/sources/base.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/lumen/sources/base.py b/lumen/sources/base.py index 851b7df67..46626f810 100644 --- a/lumen/sources/base.py +++ b/lumen/sources/base.py @@ -5,6 +5,7 @@ import re import shutil import sys +import threading from concurrent import futures from functools import wraps @@ -25,7 +26,7 @@ from ..util import get_dataframe_schema, is_ref, merge_schemas -def cached(with_query=True): +def cached(with_query=True, locks={}): """ Adds caching to a Source.get query. @@ -44,13 +45,20 @@ def cached(with_query=True): def _inner_cached(method): @wraps(method) def wrapped(self, table, **query): + lock_key = (self, table) + if lock_key in locks: + lock = locks[lock_key] + else: + locks[lock_key] = lock = threading.Lock() cache_query = query if with_query else {} - df, no_query = self._get_cache(table, **cache_query) + with lock: + df, no_query = self._get_cache(table, **cache_query) if df is None: if not with_query and (hasattr(self, 'dask') or hasattr(self, 'use_dask')): cache_query['__dask'] = True df = method(self, table, **cache_query) - self._set_cache(df, table, **cache_query) + with lock: + self._set_cache(df, table, **cache_query) filtered = df if (not with_query or no_query) and query: filtered = FilterTransform.apply_to( @@ -63,10 +71,15 @@ def wrapped(self, table, **query): return _inner_cached -def cached_schema(method): +def cached_schema(method, locks={}): @wraps(method) def wrapped(self, table=None): - schema = self._get_schema_cache() + if self in locks: + main_lock = locks[self] + else: + locks[self] = main_lock = threading.Lock() + with main_lock: + schema = self._get_schema_cache() if schema is None or (table is not None and table not in schema): schema = schema or {} if table is None: @@ -77,8 +90,15 @@ def wrapped(self, table=None): else: missing_tables = [table] for missing_table in missing_tables: - schema[missing_table] = method(self, missing_table) - self._set_schema_cache(schema) + lock_key = (self, missing_table) + if lock_key in locks: + lock = locks[lock_key] + else: + locks[lock_key] = lock = threading.Lock() + with lock: + schema[missing_table] = method(self, missing_table) + with main_lock: + self._set_schema_cache(schema) if table is None: return schema return schema[table] From c6e4366ed023ea873e318724fd63ec6330cc8986 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 16 Aug 2022 16:09:03 +0200 Subject: [PATCH 2/5] Avoid multiple schema lookups on Pipeline --- lumen/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lumen/pipeline.py b/lumen/pipeline.py index af56439d3..5d938bb67 100644 --- a/lumen/pipeline.py +++ b/lumen/pipeline.py @@ -179,7 +179,7 @@ def from_spec( params['filters'] = filters = [] filter_specs = spec.pop('filters', {}) if filter_specs: - schema = source.get_schema(table) + params['schema'] = schema = source.get_schema(table) for filt_spec in (filter_specs.items() if isinstance(filter_specs, dict) else filter_specs): if isinstance(filt_spec, tuple): filt_spec = dict(filt_spec[1], table=table, name=filt_spec[0]) From ccafcb5adcb3cca02b26952fc85eddda272922be Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 16 Aug 2022 16:25:18 +0200 Subject: [PATCH 3/5] Delete locks --- lumen/sources/base.py | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/lumen/sources/base.py b/lumen/sources/base.py index 46626f810..6b8b05053 100644 --- a/lumen/sources/base.py +++ b/lumen/sources/base.py @@ -6,6 +6,7 @@ import shutil import sys import threading +import weakref from concurrent import futures from functools import wraps @@ -26,7 +27,7 @@ from ..util import get_dataframe_schema, is_ref, merge_schemas -def cached(with_query=True, locks={}): +def cached(with_query=True, locks=weakref.WeakKeyDictionary()): """ Adds caching to a Source.get query. @@ -45,11 +46,16 @@ def cached(with_query=True, locks={}): def _inner_cached(method): @wraps(method) def wrapped(self, table, **query): - lock_key = (self, table) - if lock_key in locks: - lock = locks[lock_key] + if self in locks: + main_lock = locks[self]['main'] else: - locks[lock_key] = lock = threading.Lock() + main_lock = threading.Lock() + locks[self] = {'main': main_lock} + with main_lock: + if table in locks: + lock = locks[self][table] + else: + locks[self][table] = lock = threading.Lock() cache_query = query if with_query else {} with lock: df, no_query = self._get_cache(table, **cache_query) @@ -59,6 +65,9 @@ def wrapped(self, table, **query): df = method(self, table, **cache_query) with lock: self._set_cache(df, table, **cache_query) + with main_lock: + if not lock.locked() and table in locks[self]: + del locks[self][table] filtered = df if (not with_query or no_query) and query: filtered = FilterTransform.apply_to( @@ -71,13 +80,14 @@ def wrapped(self, table, **query): return _inner_cached -def cached_schema(method, locks={}): +def cached_schema(method, locks=weakref.WeakKeyDictionary()): @wraps(method) def wrapped(self, table=None): if self in locks: - main_lock = locks[self] + main_lock = locks[self]['main'] else: - locks[self] = main_lock = threading.Lock() + main_lock = threading.Lock() + locks[self] = {'main': main_lock} with main_lock: schema = self._get_schema_cache() if schema is None or (table is not None and table not in schema): @@ -90,13 +100,16 @@ def wrapped(self, table=None): else: missing_tables = [table] for missing_table in missing_tables: - lock_key = (self, missing_table) - if lock_key in locks: - lock = locks[lock_key] - else: - locks[lock_key] = lock = threading.Lock() + with main_lock: + if missing_table in locks[self]: + lock = locks[self][missing_table] + else: + locks[self][missing_table] = lock = threading.Lock() with lock: schema[missing_table] = method(self, missing_table) + with main_lock: + if not lock.locked() and missing_table in locks[self]: + del locks[self][missing_table] with main_lock: self._set_schema_cache(schema) if table is None: From 686817d0794aa5afe4e99a297b4cd0c5ae9e80b8 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 16 Aug 2022 16:25:32 +0200 Subject: [PATCH 4/5] Do not pass pipelines to Facet --- lumen/target.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lumen/target.py b/lumen/target.py index 15ff24069..aabec4b7f 100644 --- a/lumen/target.py +++ b/lumen/target.py @@ -147,7 +147,7 @@ def _update_options(self): self._sort_widget.options = self.param.sort.objects @classmethod - def from_spec(cls, spec, schema, pipelines={}): + def from_spec(cls, spec, schema): """ Creates a Facet object from a schema and a set of fields. """ @@ -161,7 +161,7 @@ def from_spec(cls, spec, schema, pipelines={}): f = by_spec by.append(f) sort = spec.pop('sort', [b.field for b in by]) - sorter = cls(by=by, pipelines=pipelines, **spec) + sorter = cls(by=by, **spec) sorter.param.sort.objects = sort return sorter From 31e0190b5f033cd0830a393be4b509e76f363615 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 16 Aug 2022 19:00:55 +0200 Subject: [PATCH 5/5] Improve locking --- lumen/sources/base.py | 51 +++++++++++++++++++------------------------ 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/lumen/sources/base.py b/lumen/sources/base.py index 6b8b05053..fd4bb9239 100644 --- a/lumen/sources/base.py +++ b/lumen/sources/base.py @@ -49,13 +49,13 @@ def wrapped(self, table, **query): if self in locks: main_lock = locks[self]['main'] else: - main_lock = threading.Lock() + main_lock = threading.RLock() locks[self] = {'main': main_lock} with main_lock: if table in locks: lock = locks[self][table] else: - locks[self][table] = lock = threading.Lock() + locks[self][table] = lock = threading.RLock() cache_query = query if with_query else {} with lock: df, no_query = self._get_cache(table, **cache_query) @@ -65,9 +65,6 @@ def wrapped(self, table, **query): df = method(self, table, **cache_query) with lock: self._set_cache(df, table, **cache_query) - with main_lock: - if not lock.locked() and table in locks[self]: - del locks[self][table] filtered = df if (not with_query or no_query) and query: filtered = FilterTransform.apply_to( @@ -86,35 +83,31 @@ def wrapped(self, table=None): if self in locks: main_lock = locks[self]['main'] else: - main_lock = threading.Lock() + main_lock = threading.RLock() locks[self] = {'main': main_lock} with main_lock: - schema = self._get_schema_cache() - if schema is None or (table is not None and table not in schema): - schema = schema or {} - if table is None: - missing_tables = [ - table for table in self.get_tables() - if table not in schema - ] - else: - missing_tables = [table] - for missing_table in missing_tables: - with main_lock: - if missing_table in locks[self]: - lock = locks[self][missing_table] - else: - locks[self][missing_table] = lock = threading.Lock() - with lock: - schema[missing_table] = method(self, missing_table) + schema = self._get_schema_cache() or {} + tables = self.get_tables() if table is None else [table] + if all(table in schema for table in tables): + return schema if table is None else schema[table] + for missing in tables: + if missing in schema: + continue + with main_lock: + if missing in locks[self]: + lock = locks[self][missing] + else: + locks[self][missing] = lock = threading.RLock() + with lock: with main_lock: - if not lock.locked() and missing_table in locks[self]: - del locks[self][missing_table] + new_schema = self._get_schema_cache() or {} + if missing in new_schema: + schema[missing] = new_schema[missing] + else: + schema[missing] = method(self, missing) with main_lock: self._set_schema_cache(schema) - if table is None: - return schema - return schema[table] + return schema if table is None else schema[table] return wrapped