Add long_callback decorator #1702

Merged
merged 35 commits into from
Aug 18, 2021
Changes from 34 commits
0c1a053
Add long_callback decorator
jonmmease Aug 7, 2021
5a004e7
Rework long_callback to avoid disabling interval until all requests a…
jonmmease Aug 11, 2021
f5dfd6b
Add long_callback tests
jonmmease Aug 11, 2021
0b10624
Have the diskcache long_callback manager rely on multiprocess on all …
jonmmease Aug 11, 2021
813fc00
long_callback docstring
jonmmease Aug 12, 2021
4ddecf2
Fix import
jonmmease Aug 12, 2021
1ac543e
flakes
jonmmease Aug 12, 2021
aa676bd
pylint
jonmmease Aug 12, 2021
5ff33cf
Python 3.6 compat
jonmmease Aug 12, 2021
1da38be
Refactor long calblack mangaers and tests
jonmmease Aug 14, 2021
02b36df
Add cache_args_to_skip option to long_callback
jonmmease Aug 14, 2021
a1b8a39
Add dual long_callback test
jonmmease Aug 14, 2021
5d64332
Add dual long_callback test
jonmmease Aug 14, 2021
d50f214
celery tests on circleci (take 1)
jonmmease Aug 14, 2021
809f5da
pylist fixes
jonmmease Aug 14, 2021
9ed1b81
pylist fixes
jonmmease Aug 14, 2021
f5eec9d
CI WIP
jonmmease Aug 14, 2021
f79bde3
CI WIP (2)
jonmmease Aug 14, 2021
cc82b59
Re-enable tests
jonmmease Aug 14, 2021
b92f89f
Support single list input argument
jonmmease Aug 16, 2021
e2bd875
Raise informative error when dependency to long_callback has pattern-…
jonmmease Aug 16, 2021
71227cd
Remove module string from celery task name hash
jonmmease Aug 16, 2021
41520e5
validate that celery app has result backend configured
jonmmease Aug 16, 2021
e5f967e
Test celery manager with multiple celery workers
jonmmease Aug 16, 2021
2405f11
Add long callback manager docstrings
jonmmease Aug 16, 2021
7e0f386
bump up test wait times
jonmmease Aug 16, 2021
4b7ec3c
Don't fail on NoSuchProcess exception
jonmmease Aug 16, 2021
5731edf
Don't fail on NoSuchProcess exception (2)
jonmmease Aug 16, 2021
c07fe63
Merge remote-tracking branch 'origin/dev' into long_callback
jonmmease Aug 16, 2021
56ad571
Add CHANGELOG entry
jonmmease Aug 16, 2021
3733909
Merge remote-tracking branch 'origin/dev' into long_callback
jonmmease Aug 16, 2021
9134207
Add extra components to validation_layout and fix prevent_initial_call
jonmmease Aug 17, 2021
e5d538d
Merge remote-tracking branch 'origin/dev' into long_callback
jonmmease Aug 17, 2021
92d7c04
Increase sleep time to allow final app state to settle
jonmmease Aug 17, 2021
3892ecb
Increase sleep time to allow final app state to settle
jonmmease Aug 18, 2021
4 changes: 4 additions & 0 deletions .circleci/config.yml
@@ -208,6 +208,8 @@ jobs:
PERCY_PARALLEL_TOTAL: -1
PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: True
PYVERSION: python39
REDIS_URL: redis://localhost:6379
- image: circleci/redis
parallelism: 3
steps:
- checkout
@@ -252,6 +254,8 @@ jobs:
environment:
PERCY_ENABLE: 0
PYVERSION: python36
REDIS_URL: redis://localhost:6379
- image: circleci/redis

workflows:
version: 2
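
The `REDIS_URL` variable added to both jobs points at the `circleci/redis` sidecar image. How the tests consume it is not shown in this part of the diff. A minimal sketch, assuming a hypothetical helper named `redis_settings` that falls back to the same local address:

```python
import os
from urllib.parse import urlparse

DEFAULT_REDIS_URL = "redis://localhost:6379"  # same value the CI config sets


def redis_settings(env=None):
    """Hypothetical helper: resolve Redis connection details from REDIS_URL."""
    env = os.environ if env is None else env
    url = env.get("REDIS_URL", DEFAULT_REDIS_URL)
    parsed = urlparse(url)
    # Fall back to the conventional Redis port when the URL omits one.
    return {"url": url, "host": parsed.hostname, "port": parsed.port or 6379}


print(redis_settings({}))
```

Passing the environment as an argument keeps the helper easy to exercise without touching the real process environment.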
3 changes: 1 addition & 2 deletions CHANGELOG.md
@@ -7,10 +7,9 @@ This project adheres to [Semantic Versioning](https://semver.org/).
## Dash and Dash Renderer

### Added
- [#1702](https://github.com/plotly/dash/pull/1702) Added a new `@app.long_callback` decorator to support callback functions that take a long time to run. See the PR and documentation for more information.
- [#1514](https://github.com/plotly/dash/pull/1514) Perform json encoding using the active plotly JSON engine. This will default to the faster orjson encoder if the `orjson` package is installed.



### Changed
- [#1707](https://github.com/plotly/dash/pull/1707) Change the default value of the `compress` argument to the `dash.Dash` constructor to `False`. This change reduces CPU usage, and was made in recognition of the fact that many deployment platforms (e.g. Dash Enterprise) already apply their own compression. If deploying to an environment that does not already provide compression, the Dash 1 behavior may be restored by adding `compress=True` to the `dash.Dash` constructor.

272 changes: 271 additions & 1 deletion dash/dash.py
@@ -27,6 +27,8 @@
    handle_callback_args,
    handle_grouped_callback_args,
    Output,
    State,
    Input,
)
from .development.base_component import ComponentRegistry
from .exceptions import PreventUpdate, InvalidResourceError, ProxyError
@@ -58,6 +60,7 @@
    grouping_len,
)


_flask_compress_version = parse_version(get_distribution("flask-compress").version)

# Add explicit mapping for map files
@@ -258,6 +261,10 @@ class Dash(object):
        Set to None or '' if you don't want the document.title to change or if you
        want to control the document.title through a separate component or
        clientside callback.

    :param long_callback_manager: Long callback manager instance to support the
        ``@app.long_callback`` decorator. Currently one of
        ``DiskcacheLongCallbackManager`` or ``CeleryLongCallbackManager``

Review comment (Collaborator): Even though the first sentence should be clear, the second sentence makes it sound like you can just give the class, like `long_callback_manager=DiskcacheLongCallbackManager`.

Reply (Contributor Author): Good point. Updated in 3892ecb (apologies for the bogus commit message).

    """

    def __init__(
@@ -286,6 +293,7 @@ def __init__(
        plugins=None,
        title="Dash",
        update_title="Updating...",
        long_callback_manager=None,
        **obsolete,
    ):
        _validate.check_obsolete(obsolete)
@@ -409,6 +417,8 @@ def __init__(
        )

        self._assets_files = []
        self._long_callback_count = 0
        self._long_callback_manager = long_callback_manager

        self.logger = logging.getLogger(name)
        self.logger.addHandler(logging.StreamHandler(stream=sys.stdout))
@@ -559,6 +569,8 @@ def serve_layout(self):
        )

    def _config(self):
        from dash_html_components import Div  # pylint: disable=import-outside-toplevel

        # pieces of config needed by the front end
        config = {
            "url_base_pathname": self.config.url_base_pathname,
@@ -576,7 +588,15 @@ def _config(self):
                "max_retry": self._dev_tools.hot_reload_max_retry,
            }
        if self.validation_layout and not self.config.suppress_callback_exceptions:
            config["validation_layout"] = self.validation_layout
            validation_layout = self.validation_layout

            # Add extra components
            if self._extra_components:
                validation_layout = Div(
                    children=[validation_layout] + self._extra_components
                )

            config["validation_layout"] = validation_layout

        return config

@@ -1119,6 +1139,256 @@ def add_context(*args, **kwargs):

        return wrap_func

    def long_callback(self, *_args, **_kwargs):
        """
        Normally used as a decorator, `@app.long_callback` is an alternative to
        `@app.callback` designed for callbacks that take a long time to run,
        without locking up the Dash app or timing out.

        `@long_callback` is designed to support multiple callback managers.
        Two long callback managers are currently implemented:

            - A diskcache manager (`DiskcacheLongCallbackManager`) that runs callback
              logic in a separate process and stores the results to disk using the
              diskcache library. This is the easiest backend to use for local
              development.
            - A Celery manager (`CeleryLongCallbackManager`) that runs callback logic
              in a Celery worker and returns results to the Dash app through a Celery
              broker like RabbitMQ or Redis.

        The following arguments may include any valid arguments to `@app.callback`.
        In addition, `@app.long_callback` supports the following optional
        keyword arguments:

        :Keyword Arguments:
            :param manager:
                A long callback manager instance. Currently one of
                `DiskcacheLongCallbackManager` or `CeleryLongCallbackManager`.
                Defaults to the `long_callback_manager` instance provided to the
                `dash.Dash` constructor.
            :param running:
                A list of 3-element tuples. The first element of each tuple should be
                an `Output` dependency object referencing a property of a component in
                the app layout. The second element is the value that the property
                should be set to while the callback is running, and the third element
                is the value the property should be set to when the callback completes.
            :param cancel:
                A list of `Input` dependency objects that reference a property of a
                component in the app's layout. When the value of this property changes
                while a callback is running, the callback is canceled.
                Note that the value of the property is not significant; any change in
                value will result in the cancellation of the running job (if any).
            :param progress:
                An `Output` dependency grouping that references properties of
                components in the app's layout. When provided, the decorated function
                will be called with an extra argument as the first argument to the
                function. This argument is a function handle that the decorated
                function should call in order to provide updates to the app on its
                current progress. This function accepts a single argument, which
                corresponds to the grouping of properties specified in the provided
                `Output` dependency grouping.
            :param progress_default:
                A grouping of values that should be assigned to the components
                specified by the `progress` argument when the callback is not in
                progress. If `progress_default` is not provided, all the dependency
                properties specified in `progress` will be set to `None` when the
                callback is not running.
            :param cache_args_to_ignore:
                Arguments to ignore when caching is enabled. If callback is configured
                with keyword arguments (Input/State provided in a dict),
                this should be a list of argument names as strings. Otherwise,
                this should be a list of argument indices as integers.
        """
        from dash._callback_context import (  # pylint: disable=import-outside-toplevel
            callback_context,
        )
        import dash_core_components as dcc  # pylint: disable=import-outside-toplevel
        from dash.exceptions import (  # pylint: disable=import-outside-toplevel
            WildcardInLongCallback,
        )

        # Get long callback manager
        callback_manager = _kwargs.pop("manager", self._long_callback_manager)
        if callback_manager is None:
            raise ValueError(
                "The @app.long_callback decorator requires a long callback manager\n"
                "instance. This may be provided to the app using the \n"
                "long_callback_manager argument to the dash.Dash constructor, or\n"
                "it may be provided to the @app.long_callback decorator as the \n"
                "manager argument"
            )

        # Extract special long_callback kwargs
        running = _kwargs.pop("running", ())
        cancel = _kwargs.pop("cancel", ())
        progress = _kwargs.pop("progress", ())
        progress_default = _kwargs.pop("progress_default", None)
        interval_time = _kwargs.pop("interval", 1000)
        cache_args_to_ignore = _kwargs.pop("cache_args_to_ignore", [])

        # Parse remaining args just like app.callback
        (
            output,
            flat_inputs,
            flat_state,
            inputs_state_indices,
            prevent_initial_call,
        ) = handle_grouped_callback_args(_args, _kwargs)
        inputs_and_state = flat_inputs + flat_state
        args_deps = map_grouping(lambda i: inputs_and_state[i], inputs_state_indices)

        # Disallow wildcard dependencies
        for deps in [output, flat_inputs, flat_state]:
            for dep in flatten_grouping(deps):
                if dep.has_wildcard():
                    raise WildcardInLongCallback(
                        f"""
                        @app.long_callback does not support dependencies with
                        pattern-matching ids
                            Received: {repr(dep)}\n"""
                    )

        # Get unique id for this long_callback definition. This increment is not
        # thread safe, but it doesn't need to be because callback definitions
        # happen on the main thread before the app starts
        self._long_callback_count += 1
        long_callback_id = self._long_callback_count

        # Create Interval and Store for long callback and add them to the app's
        # _extra_components list
        interval_id = f"_long_callback_interval_{long_callback_id}"
        interval_component = dcc.Interval(
            id=interval_id, interval=interval_time, disabled=prevent_initial_call
        )
        store_id = f"_long_callback_store_{long_callback_id}"
        store_component = dcc.Store(id=store_id, data=dict())
        self._extra_components.extend([interval_component, store_component])

        # Compute full component plus property name for the cancel dependencies
        cancel_prop_ids = tuple(
            ".".join([dep.component_id, dep.component_property]) for dep in cancel
        )

        def wrapper(fn):
            background_fn = callback_manager.make_job_fn(fn, bool(progress), args_deps)

            def callback(_triggers, user_store_data, user_callback_args):
                # Build result cache key from inputs
                pending_key = callback_manager.build_cache_key(
                    fn, user_callback_args, cache_args_to_ignore
                )
                current_key = user_store_data.get("current_key", None)
                pending_job = user_store_data.get("pending_job", None)

                should_cancel = pending_key == current_key or any(
                    trigger["prop_id"] in cancel_prop_ids
                    for trigger in callback_context.triggered
                )

                # Compute grouping of values to set the progress components to
                # when cleared
                if progress_default is None:
                    clear_progress = (
                        map_grouping(lambda x: None, progress) if progress else ()
                    )
                else:
                    clear_progress = progress_default

                if should_cancel:
                    user_store_data["current_key"] = None
                    user_store_data["pending_key"] = None
                    user_store_data["pending_job"] = None

                    callback_manager.terminate_job(pending_job)

                    return dict(
                        user_callback_output=map_grouping(lambda x: no_update, output),
                        interval_disabled=True,
                        in_progress=[val for (_, _, val) in running],
                        progress=clear_progress,
                        user_store_data=user_store_data,
                    )

                # Look up progress value if a job is in progress
                if pending_job:
                    progress_value = callback_manager.get_progress(pending_key)
                else:
                    progress_value = None

                if callback_manager.result_ready(pending_key):
                    result = callback_manager.get_result(pending_key, pending_job)
                    # Set current key (hash of data stored in client)
                    # to pending key (hash of data requested by client)
                    user_store_data["current_key"] = pending_key

                    # Disable interval if this value was pulled from cache.
                    # If this value was the result of a background calculation, don't
                    # disable yet. If no other calculations are in progress,
                    # interval will be disabled in should_cancel logic above
                    # the next time the interval fires.
                    interval_disabled = pending_job is None
                    return dict(
                        user_callback_output=result,
                        interval_disabled=interval_disabled,
                        in_progress=[val for (_, _, val) in running],
                        progress=clear_progress,
                        user_store_data=user_store_data,
                    )
                elif progress_value:
                    return dict(
                        user_callback_output=map_grouping(lambda x: no_update, output),
                        interval_disabled=False,
                        in_progress=[val for (_, val, _) in running],
                        progress=progress_value or {},
                        user_store_data=user_store_data,
                    )
                else:
                    # Check if there is a running calculation that can now
                    # be canceled
                    old_pending_key = user_store_data.get("pending_key", None)
                    if (
                        old_pending_key
                        and old_pending_key != pending_key
                        and callback_manager.job_running(pending_job)
                    ):
                        callback_manager.terminate_job(pending_job)

                    user_store_data["pending_key"] = pending_key
                    callback_manager.terminate_unhealthy_job(pending_job)
                    if not callback_manager.job_running(pending_job):
                        user_store_data["pending_job"] = callback_manager.call_job_fn(
                            pending_key, background_fn, user_callback_args
                        )

                    return dict(
                        user_callback_output=map_grouping(lambda x: no_update, output),
                        interval_disabled=False,
                        in_progress=[val for (_, val, _) in running],
                        progress=clear_progress,
                        user_store_data=user_store_data,
                    )

            return self.callback(
                inputs=dict(
                    _triggers=dict(
                        n_intervals=Input(interval_id, "n_intervals"),
                        cancel=cancel,
                    ),
                    user_store_data=State(store_id, "data"),
                    user_callback_args=args_deps,
                ),
                output=dict(
                    user_callback_output=output,
                    interval_disabled=Output(interval_id, "disabled"),
                    in_progress=[dep for (dep, _, _) in running],
                    progress=progress,
                    user_store_data=Output(store_id, "data"),
                ),
                prevent_initial_call=prevent_initial_call,
            )(callback)

        return wrapper
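
The polling lifecycle implemented by the inner `callback` can be hard to follow from the branches alone. The following is a simplified, stdlib-only model of it, not the Dash implementation: `InMemoryManager`, `tick`, and `NO_UPDATE` are hypothetical names, and cancellation triggers, progress outputs, and unhealthy-job handling are left out. It shows the three passes a successful job goes through: start the job, deliver the result while keeping the interval alive, then reset the store and disable the interval once the delivered key matches the requested key.

```python
import threading

NO_UPDATE = object()  # stand-in for dash.no_update


class InMemoryManager:
    """Hypothetical manager that runs jobs in threads; not part of Dash."""

    def __init__(self):
        self.results = {}
        self.progress = {}
        self.threads = {}

    def call_job_fn(self, key, fn, args):
        def run():
            def set_progress(value):
                self.progress[key] = value

            # The job receives the progress handle as its first argument.
            self.results[key] = fn(set_progress, *args)

        self.threads[key] = threading.Thread(target=run, daemon=True)
        self.threads[key].start()
        return key  # job handle that would be kept in the client-side store

    def result_ready(self, key):
        return key in self.results


def tick(manager, store, key, fn, args):
    """One interval-triggered pass; returns (output, interval_disabled)."""
    if key == store.get("current_key"):
        # Result already delivered for these inputs: reset and stop polling.
        store.update(current_key=None, pending_key=None, pending_job=None)
        return NO_UPDATE, True
    if manager.result_ready(key):
        store["current_key"] = key
        # Keep the interval alive for one more tick if a job produced this.
        return manager.results[key], store.get("pending_job") is None
    if store.get("pending_job") is None:
        store["pending_key"] = key
        store["pending_job"] = manager.call_job_fn(key, fn, args)
    return NO_UPDATE, False


def slow_square(set_progress, n):
    for step in range(1, 4):
        set_progress((step, 3))  # report (done, total)
    return n * n


manager, store = InMemoryManager(), {}
first = tick(manager, store, "key-1", slow_square, (7,))   # starts the job
manager.threads["key-1"].join()                            # wait for the worker
second = tick(manager, store, "key-1", slow_square, (7,))  # delivers the result
third = tick(manager, store, "key-1", slow_square, (7,))   # disables the interval
```

The explicit `join()` stands in for the time that passes between interval ticks in a real app; it only makes the example deterministic.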

    def dispatch(self):
        body = flask.request.get_json()
        flask.g.inputs_list = inputs = body.get(  # pylint: disable=assigning-non-slot
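
The shapes of the `running` and `cancel` arguments documented in the `long_callback` docstring can be illustrated without Dash. In this sketch `Dep` is a hypothetical stand-in for a dependency object that models only the `component_id` and `component_property` attributes the diff reads, and the component ids are invented for the example.

```python
from collections import namedtuple

# Hypothetical stand-in for a Dash dependency object (Input/Output).
Dep = namedtuple("Dep", ["component_id", "component_property"])

# Each entry: (dependency, value while running, value when complete).
running = [
    (Dep("run-button", "disabled"), True, False),
    (Dep("cancel-button", "disabled"), False, True),
]
cancel = [Dep("cancel-button", "n_clicks")]


def split_running(running):
    """Split the 3-tuples the same way the comprehensions in the diff do."""
    deps = [dep for (dep, _, _) in running]
    while_running = [val for (_, val, _) in running]
    when_done = [val for (_, _, val) in running]
    return deps, while_running, when_done


def cancel_prop_ids(cancel):
    """Build 'component_id.component_property' strings for cancel inputs."""
    return tuple(".".join([d.component_id, d.component_property]) for d in cancel)


def is_cancel_trigger(triggered, prop_ids):
    """True if any triggering property is one of the cancel inputs."""
    return any(t["prop_id"] in prop_ids for t in triggered)


deps, while_running, when_done = split_running(running)
print(while_running, when_done)
print(cancel_prop_ids(cancel))
```

Because only the identity of the triggering property is checked, any change to a cancel input cancels the job, whatever its new value.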
10 changes: 10 additions & 0 deletions dash/dependencies.py
@@ -101,6 +101,16 @@ def _id_matches(self, other):
    def __hash__(self):
        return hash(str(self))

    def has_wildcard(self):
        """
        Return true if id contains a wildcard (MATCH, ALL, or ALLSMALLER)
        """
        if isinstance(self.component_id, dict):
            for v in self.component_id.values():
                if isinstance(v, _Wildcard):
                    return True
        return False


class Output(DashDependency):  # pylint: disable=too-few-public-methods
    """Output of a callback."""
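
A standalone sketch of the check that `has_wildcard` performs, for readers who want to try it outside Dash. The `_Wildcard` class and the three constants below are minimal stand-ins written for this example, not the real objects from `dash.dependencies`, and the check is written as a free function rather than a method.

```python
class _Wildcard:
    """Minimal stand-in for the wildcard marker type used in component ids."""

    def __init__(self, name):
        self._name = name

    def __repr__(self):
        return "<{}>".format(self._name)


MATCH = _Wildcard("MATCH")
ALL = _Wildcard("ALL")
ALLSMALLER = _Wildcard("ALLSMALLER")


def has_wildcard(component_id):
    """Return True if a dict id carries a wildcard in any of its values."""
    if isinstance(component_id, dict):
        return any(isinstance(v, _Wildcard) for v in component_id.values())
    # String ids cannot contain wildcards.
    return False


print(has_wildcard("submit-button"))                  # False
print(has_wildcard({"type": "filter", "index": 3}))   # False
print(has_wildcard({"type": "filter", "index": ALL})) # True
```

Only dict ids are inspected, so a plain string id never counts as pattern-matching.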
4 changes: 4 additions & 0 deletions dash/exceptions.py
@@ -30,6 +30,10 @@ class IDsCantContainPeriods(CallbackException):
    pass


class WildcardInLongCallback(CallbackException):
    pass


# Better error name now that more than periods are not permitted.
class InvalidComponentIdError(IDsCantContainPeriods):
    pass
2 changes: 2 additions & 0 deletions dash/long_callback/__init__.py
@@ -0,0 +1,2 @@
from .managers.celery_manager import CeleryLongCallbackManager # noqa: F401,E402
from .managers.diskcache_manager import DiskcacheLongCallbackManager # noqa: F401,E402
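
The managers exported here provide the `build_cache_key` method that `long_callback` calls with `cache_args_to_ignore`, but their implementation is not part of the diff shown on this page. The following is a hypothetical sketch of how such a key could honor the ignore list for both positional and keyword-style arguments. The hashing scheme (SHA-1 over the function name and the kept arguments) is an assumption made for illustration, not the managers' actual code.

```python
import hashlib


def build_cache_key(fn, args, cache_args_to_ignore=()):
    """Hypothetical cache key: hash the function identity and the kept args."""
    if isinstance(args, dict):
        # Keyword-style callback: ignore entries by argument name.
        kept = sorted(
            (name, value)
            for name, value in args.items()
            if name not in cache_args_to_ignore
        )
    else:
        # Positional callback: ignore entries by argument index.
        kept = [
            value for index, value in enumerate(args)
            if index not in cache_args_to_ignore
        ]
    payload = repr((fn.__module__, fn.__qualname__, kept))
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


def slow_report(n_clicks, dataset):
    return dataset


# n_clicks (index 0) changes on every click but should not bust the cache.
key_a = build_cache_key(slow_report, [1, "sales"], cache_args_to_ignore=[0])
key_b = build_cache_key(slow_report, [2, "sales"], cache_args_to_ignore=[0])
key_c = build_cache_key(slow_report, [2, "costs"], cache_args_to_ignore=[0])
print(key_a == key_b, key_a == key_c)
```

Ignoring a click counter this way lets repeated requests for the same data reuse a cached result while still recomputing when a meaningful input changes.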