Skip to content

Commit 30a4c28

Browse files
committed
Use a ThreadPool to clean up API throttling in Fixer
Also fix mutex unwrap() in executor. Addresses #1519
1 parent fb93861 commit 30a4c28

File tree

3 files changed

+29
-31
lines changed

3 files changed

+29
-31
lines changed

qlty-check/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ globset.workspace = true
3232
ignore.workspace = true
3333
indicatif.workspace = true
3434
itertools.workspace = true
35+
lazy_static.workspace = true
3536
lzma-rs.workspace = true
3637
num_cpus.workspace = true
3738
once_cell.workspace = true

qlty-check/src/executor.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
ui::{Progress, ProgressBar},
1616
};
1717
use crate::{cache::IssuesCacheHit, planner::Plan, Results};
18-
use anyhow::{bail, Context, Result};
18+
use anyhow::{anyhow, bail, Context, Result};
1919
use chrono::Utc;
2020
pub use driver::Driver;
2121
use ignore::{DirEntry, WalkBuilder, WalkState};
@@ -640,7 +640,10 @@ fn run_invocation(
640640
file_result.issues.len(),
641641
MAX_ISSUES_PER_FILE
642642
);
643-
issue_limit_reached.lock().unwrap().insert(PathBuf::from(&file_result.path));
643+
match issue_limit_reached.lock() {
644+
Ok(mut limit) => limit.insert(file_result.path.clone().into()),
645+
Err(_) => { debug!("Poison error in thread"); false },
646+
};
644647
file_result.issues.truncate(MAX_ISSUES_PER_FILE);
645648
file_result.issues.shrink_to_fit();
646649
return;
@@ -671,7 +674,9 @@ fn run_invocation(
671674
progress.increment(plan.workspace_entries.len() as u64);
672675
task.clear();
673676

674-
let issue_limit_reached = issue_limit_reached.lock().unwrap();
677+
let issue_limit_reached = issue_limit_reached
678+
.lock()
679+
.map_err(|_| anyhow!("Posion error in thread"))?;
675680
if !issue_limit_reached.is_empty() {
676681
result.push_message(
677682
MessageLevel::Error,

qlty-check/src/llm/fixer.rs

+20-28
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,35 @@ use crate::source_reader::SourceReader;
33
use crate::ui::ProgressBar as _;
44
use crate::{executor::staging_area::StagingArea, Progress};
55
use anyhow::{bail, Result};
6+
use lazy_static::lazy_static;
67
use qlty_cloud::Client;
78
use qlty_config::issue_transformer::IssueTransformer;
89
use qlty_types::analysis::v1::{Issue, Suggestion};
10+
use rayon::{ThreadPool, ThreadPoolBuilder};
911
use std::collections::HashMap;
1012
use std::sync::atomic::{AtomicUsize, Ordering};
1113
use std::sync::{Arc, Mutex};
12-
use std::thread::sleep;
13-
use std::time::Duration;
14-
use tracing::{debug, warn};
15-
use tracing::{info, trace};
14+
use tracing::{debug, info, warn};
1615
use ureq::json;
1716

1817
const MAX_FIXES: usize = 500;
1918
const MAX_FIXES_PER_FILE: usize = 30;
2019
const MAX_CONCURRENT_FIXES: usize = 10;
2120

21+
lazy_static! {
22+
static ref API_THREAD_POOL: ThreadPool = ThreadPoolBuilder::new()
23+
.num_threads(MAX_CONCURRENT_FIXES)
24+
.build()
25+
.unwrap();
26+
}
27+
2228
#[derive(Clone, Debug)]
2329
pub struct Fixer {
2430
progress: Progress,
2531
staging_area: StagingArea,
2632
r#unsafe: bool,
2733
attempts_per_file: Arc<Mutex<HashMap<Option<String>, AtomicUsize>>>,
2834
total_attempts: Arc<AtomicUsize>,
29-
api_concurrency_lock: Arc<AtomicUsize>,
30-
api_concurrency_guard: Arc<Mutex<()>>,
3135
}
3236

3337
impl IssueTransformer for Fixer {
@@ -55,8 +59,6 @@ impl Fixer {
5559
r#unsafe: plan.settings.r#unsafe,
5660
attempts_per_file: Arc::new(Mutex::new(HashMap::new())),
5761
total_attempts: Arc::new(AtomicUsize::new(0)),
58-
api_concurrency_lock: Arc::new(AtomicUsize::new(0)),
59-
api_concurrency_guard: Arc::new(Mutex::new(())),
6062
}
6163
}
6264

@@ -129,18 +131,18 @@ impl Fixer {
129131
if let Some(path) = issue.path() {
130132
let client = Client::authenticated()?;
131133
let content = self.staging_area.read(issue.path().unwrap().into())?;
132-
self.try_fix_barrier();
133-
let response = client.post("/fixes").send_json(json!({
134-
"issue": issue.clone(),
135-
"files": [{ "content": content, "path": path }],
136-
"options": {
137-
"unsafe": self.r#unsafe
138-
},
139-
}));
140-
self.api_concurrency_lock.fetch_sub(1, Ordering::SeqCst);
134+
let response = API_THREAD_POOL.scope(|_| {
135+
client.post("/fixes").send_json(json!({
136+
"issue": issue.clone(),
137+
"files": [{ "content": content, "path": path }],
138+
"options": {
139+
"unsafe": self.r#unsafe
140+
},
141+
}))
142+
})?;
141143

142144
let response_debug = format!("{:?}", &response);
143-
let suggestions: Vec<Suggestion> = response?.into_json()?;
145+
let suggestions: Vec<Suggestion> = response.into_json()?;
144146
debug!("{} with {} suggestions", response_debug, suggestions.len());
145147

146148
let mut issue = issue.clone();
@@ -151,14 +153,4 @@ impl Fixer {
151153
bail!("Issue {} has no path", issue.id);
152154
}
153155
}
154-
155-
fn try_fix_barrier(&self) {
156-
let guard = self.api_concurrency_guard.lock().unwrap();
157-
while self.api_concurrency_lock.load(Ordering::SeqCst) >= MAX_CONCURRENT_FIXES {
158-
sleep(Duration::from_millis(100));
159-
}
160-
let value = self.api_concurrency_lock.fetch_add(1, Ordering::SeqCst);
161-
trace!("API request made with {} concurrent fixes", value);
162-
drop(guard);
163-
}
164156
}

0 commit comments

Comments
 (0)