Skip to content

Commit

Permalink
[Turbopack] improve race condition handling of scheduling (#75748)
Browse files Browse the repository at this point in the history
<!-- Thanks for opening a PR! Your contribution is much appreciated.
To make sure your PR is handled as smoothly as possible we request that
you follow the checklist sections below.
Choose the right checklist for the change(s) that you're making:

## For Contributors

### Improving Documentation

- Run `pnpm prettier-fix` to fix formatting issues before opening the
PR.
- Read the Docs Contribution Guide to ensure your contribution follows
the docs guidelines:
https://nextjs.org/docs/community/contribution-guide

### Adding or Updating Examples

- The "examples guidelines" are followed from our contributing doc
https://github.com/vercel/next.js/blob/canary/contributing/examples/adding-examples.md
- Make sure the linting passes by running `pnpm build && pnpm lint`. See
https://github.com/vercel/next.js/blob/canary/contributing/repository/linting.md

### Fixing a bug

- Related issues linked using `fixes #number`
- Tests added. See:
https://github.com/vercel/next.js/blob/canary/contributing/core/testing.md#writing-tests-for-nextjs
- Errors have a helpful link attached, see
https://github.com/vercel/next.js/blob/canary/contributing.md

### Adding a feature

- Implements an existing feature request or RFC. Make sure the feature
request has been accepted for implementation before opening a PR. (A
discussion must be opened, see
https://github.com/vercel/next.js/discussions/new?category=ideas)
- Related issues/discussions are linked using `fixes #number`
- e2e tests added
(https://github.com/vercel/next.js/blob/canary/contributing/core/testing.md#writing-tests-for-nextjs)
- Documentation added
- Telemetry added. In case of a feature if it's used or not.
- Errors have a helpful link attached, see
https://github.com/vercel/next.js/blob/canary/contributing.md


## For Maintainers

- Minimal description (aim for explaining to someone not on the team to
understand the PR)
- When linking to a Slack thread, you might want to share details of the
conclusion
- Link both the Linear (Fixes NEXT-xxx) and the GitHub issues
- Add review comments if necessary to explain to the reviewer the logic
behind a change

### What?

### Why?

### How?

Closes NEXT-
Fixes #

-->
  • Loading branch information
sokra authored Feb 13, 2025
1 parent 5891da2 commit f95d8b9
Showing 1 changed file with 96 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1575,25 +1575,35 @@ impl AggregationUpdateQueue {
let children = get_followers(&follower);
drop(follower);

if !data.is_empty() {
let has_data = !data.is_empty();
if has_data || !is_active {
for upper_id in upper_ids.iter() {
// add data to upper
let mut upper = ctx.task(*upper_id, TaskDataCategory::Meta);
let diff = data.apply(
&mut upper,
ctx.session_id(),
ctx.should_track_activeness(),
self,
);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
if has_data {
let diff = data.apply(
&mut upper,
ctx.session_id(),
ctx.should_track_activeness(),
self,
);
if !diff.is_empty() {
let upper_ids = get_uppers(&upper);
self.push(
AggregatedDataUpdateJob {
upper_ids,
update: diff,
}
.into(),
)
}
}
if !is_active {
// We need to check this again, since this might have changed in the
// meantime due to race conditions
if upper.has_key(&CachedDataItemKey::Activeness {}) {
is_active = true;
}
}
}
}
Expand Down Expand Up @@ -1647,7 +1657,7 @@ impl AggregationUpdateQueue {
let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
(new_follower_id, get_aggregation_number(&follower))
})
.collect::<Vec<_>>();
.collect::<SmallVec<[_; 4]>>();

let mut new_followers_of_upper_uppers = SmallVec::new();
let mut is_active = false;
Expand Down Expand Up @@ -1699,43 +1709,51 @@ impl AggregationUpdateQueue {
}
}

if !followers_with_aggregation_number.is_empty() {
let mut inner_tasks_with_aggregation_number = followers_with_aggregation_number;

if !inner_tasks_with_aggregation_number.is_empty() {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("new inner").entered();
let mut upper_data_updates = Vec::new();
let mut upper_new_followers = SmallVec::new();
for &(follower_id, _) in followers_with_aggregation_number.iter() {
let mut follower = ctx.task(follower_id, TaskDataCategory::Meta);
if update_count!(follower, Upper { task: upper_id }, 1) {
if count!(follower, Upper).is_power_of_two() {
self.push_optimize_task(follower_id);
}
swap_retain(
&mut inner_tasks_with_aggregation_number,
|&mut (inner_id, _)| {
let mut inner = ctx.task(inner_id, TaskDataCategory::Meta);
if update_count!(inner, Upper { task: upper_id }, 1) {
if count!(inner, Upper).is_power_of_two() {
self.push_optimize_task(inner_id);
}

// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut follower);
let children = get_followers(&follower);
let follower_aggregation_number = get_aggregation_number(&follower);
drop(follower);
// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut inner);
let children = get_followers(&inner);
let follower_aggregation_number = get_aggregation_number(&inner);
drop(inner);

if !data.is_empty() {
upper_data_updates.push(data);
}
upper_new_followers.extend(children);

// Balancing is only needed when they are equal (or could have become equal in
// the meantime). This is not perfect from concurrent
// perspective, but we can accept a few incorrect invariants
// in the graph.
if upper_aggregation_number <= follower_aggregation_number
&& !is_root_node(upper_aggregation_number)
{
self.push(AggregationUpdateJob::BalanceEdge {
upper_id,
task_id: follower_id,
})
if !data.is_empty() {
upper_data_updates.push(data);
}
upper_new_followers.extend(children);

// Balancing is only needed when they are equal (or could have become equal
// in the meantime). This is not perfect from
// concurrent perspective, but we can accept a few
// incorrect invariants in the graph.
if upper_aggregation_number <= follower_aggregation_number
&& !is_root_node(upper_aggregation_number)
{
self.push(AggregationUpdateJob::BalanceEdge {
upper_id,
task_id: inner_id,
})
}
true
} else {
false
}
}
}
},
);

if !upper_new_followers.is_empty() {
self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
Expand Down Expand Up @@ -1782,12 +1800,20 @@ impl AggregationUpdateQueue {
);
}
}
if is_active {
self.extend_find_and_schedule_dirty(
followers_with_aggregation_number
.into_iter()
.map(|(id, _)| id),
);
if !inner_tasks_with_aggregation_number.is_empty() {
if !is_active {
// We need to check this again, since this might have changed in the
// meantime due to race conditions
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
is_active = upper.has_key(&CachedDataItemKey::Activeness {});
}
if is_active {
self.extend_find_and_schedule_dirty(
inner_tasks_with_aggregation_number
.into_iter()
.map(|(id, _)| id),
);
}
}
}
if !new_followers_of_upper_uppers.is_empty() {
Expand Down Expand Up @@ -1821,23 +1847,13 @@ impl AggregationUpdateQueue {
) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("process new follower").entered();
let should_track_activeness = ctx.should_track_activeness();

let (follower_aggregation_number, already_active) = {
let follower_aggregation_number = {
let follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
(
get_aggregation_number(&follower),
should_track_activeness && follower.has_key(&CachedDataItemKey::Activeness {}),
)
get_aggregation_number(&follower)
};

let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
if should_track_activeness
&& !already_active
&& upper.has_key(&CachedDataItemKey::Activeness {})
{
self.push_find_and_schedule_dirty(new_follower_id);
}
// decide if it should be an inner or follower
let upper_aggregation_number = get_aggregation_number(&upper);

Expand Down Expand Up @@ -1893,16 +1909,18 @@ impl AggregationUpdateQueue {
let _span = trace_span!("new inner").entered();

// It's an inner node, continue with the list
let mut is_active = upper.has_key(&CachedDataItemKey::Activeness {});
drop(upper);
let mut follower = ctx.task(new_follower_id, TaskDataCategory::Meta);
if update_count!(follower, Upper { task: upper_id }, 1) {
if count!(follower, Upper).is_power_of_two() {

let mut inner = ctx.task(new_follower_id, TaskDataCategory::Meta);
if update_count!(inner, Upper { task: upper_id }, 1) {
if count!(inner, Upper).is_power_of_two() {
self.push_optimize_task(new_follower_id);
}
// It's a new upper
let data = AggregatedDataUpdate::from_task(&mut follower);
let followers = get_followers(&follower);
drop(follower);
let data = AggregatedDataUpdate::from_task(&mut inner);
let followers = get_followers(&inner);
drop(inner);

if !data.is_empty() {
// add data to upper
Expand Down Expand Up @@ -1930,6 +1948,13 @@ impl AggregationUpdateQueue {
new_follower_ids: followers,
});
}
if !is_active {
let upper = ctx.task(upper_id, TaskDataCategory::Meta);
is_active = upper.has_key(&CachedDataItemKey::Activeness {});
}
if is_active {
self.push_find_and_schedule_dirty(new_follower_id);
}
}
}
}
Expand Down

0 comments on commit f95d8b9

Please sign in to comment.