From f8edb1155d50501812f7e51e1c2dd754fe73b698 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 17:17:57 -0400 Subject: [PATCH 01/13] fix(cli): synchronize async stdio/file reads and writes --- cli/tests/testdata/stdout_write_all.out | 99 +++++++++++++++++++ cli/tests/testdata/stdout_write_all.ts | 15 ++- runtime/ops/fs.rs | 126 ++++++------------------ runtime/ops/io.rs | 44 ++++++--- runtime/ops/tty.rs | 48 +++++---- 5 files changed, 189 insertions(+), 143 deletions(-) diff --git a/cli/tests/testdata/stdout_write_all.out b/cli/tests/testdata/stdout_write_all.out index af5626b4a114ab..7c386a5924b6d7 100644 --- a/cli/tests/testdata/stdout_write_all.out +++ b/cli/tests/testdata/stdout_write_all.out @@ -1 +1,100 @@ Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! +Hello, world! diff --git a/cli/tests/testdata/stdout_write_all.ts b/cli/tests/testdata/stdout_write_all.ts index 623bd8f5381e67..1d27302561208d 100644 --- a/cli/tests/testdata/stdout_write_all.ts +++ b/cli/tests/testdata/stdout_write_all.ts @@ -1,8 +1,13 @@ const encoder = new TextEncoder(); -const pending = [ - Deno.stdout.write(encoder.encode("Hello, ")), - Deno.stdout.write(encoder.encode("world!")), -]; + +const pending = []; + +// do this a bunch of times to ensure it doesn't race +// and everything happens in order +for (let i = 0; i < 100; i++) { + pending.push(Deno.stdout.write(encoder.encode("Hello, "))); + pending.push(Deno.stdout.write(encoder.encode("world!"))); + pending.push(Deno.stdout.write(encoder.encode("\n"))); +} await Promise.all(pending); -await Deno.stdout.write(encoder.encode("\n")); diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index fdfdff6b436e17..37e874c09e3129 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -322,10 +322,9 @@ fn seek_helper(args: SeekArgs) -> Result<(u32, SeekFrom), AnyError> { #[op] fn op_seek_sync(state: &mut OpState, args: SeekArgs) -> Result { let (rid, seek_from) = seek_helper(args)?; - let pos = StdFileResource::with_file(state, rid, |std_file| { + StdFileResource::with_file(state, rid, |std_file| { std_file.seek(seek_from).map_err(AnyError::from) - })?; - Ok(pos) + }) } #[op] @@ -335,19 +334,10 @@ async fn op_seek_async( ) -> Result { let (rid, seek_from) = seek_helper(args)?; - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || { - let mut std_file = std_file.lock(); - std_file.seek(seek_from) + StdFileResource::with_file_async(state, rid, move |std_file| { + std_file.seek(seek_from).map_err(AnyError::from) }) - .await? - .map_err(AnyError::from) + .await } #[op] @@ -357,8 +347,7 @@ fn op_fdatasync_sync( ) -> Result<(), AnyError> { StdFileResource::with_file(state, rid, |std_file| { std_file.sync_data().map_err(AnyError::from) - })?; - Ok(()) + }) } #[op] @@ -366,27 +355,17 @@ async fn op_fdatasync_async( state: Rc>, rid: ResourceId, ) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || { - let std_file = std_file.lock(); - std_file.sync_data() + StdFileResource::with_file_async(state, rid, move |std_file| { + std_file.sync_data().map_err(AnyError::from) }) - .await? - .map_err(AnyError::from) + .await } #[op] fn op_fsync_sync(state: &mut OpState, rid: ResourceId) -> Result<(), AnyError> { StdFileResource::with_file(state, rid, |std_file| { std_file.sync_all().map_err(AnyError::from) - })?; - Ok(()) + }) } #[op] @@ -394,19 +373,10 @@ async fn op_fsync_async( state: Rc>, rid: ResourceId, ) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || { - let std_file = std_file.lock(); - std_file.sync_all() + StdFileResource::with_file_async(state, rid, move |std_file| { + std_file.sync_all().map_err(AnyError::from) }) - .await? - .map_err(AnyError::from) + .await } #[op] @@ -425,19 +395,11 @@ async fn op_fstat_async( state: Rc>, rid: ResourceId, ) -> Result { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - let metadata = tokio::task::spawn_blocking(move || { - let std_file = std_file.lock(); - std_file.metadata() - }) - .await? - .map_err(AnyError::from)?; + let metadata = + StdFileResource::with_file_async(state, rid, move |std_file| { + std_file.metadata().map_err(AnyError::from) + }) + .await?; Ok(get_stat(metadata)) } @@ -469,15 +431,7 @@ async fn op_flock_async( use fs3::FileExt; super::check_unstable2(&state, "Deno.flock"); - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || -> Result<(), AnyError> { - let std_file = std_file.lock(); + StdFileResource::with_file_async(state, rid, move |std_file| { if exclusive { std_file.lock_exclusive()?; } else { @@ -485,7 +439,7 @@ async fn op_flock_async( } Ok(()) }) - .await? + .await } #[op] @@ -510,19 +464,11 @@ async fn op_funlock_async( use fs3::FileExt; super::check_unstable2(&state, "Deno.funlock"); - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || -> Result<(), AnyError> { - let std_file = std_file.lock(); + StdFileResource::with_file_async(state, rid, move |std_file| { std_file.unlock()?; Ok(()) }) - .await? + .await } #[op] @@ -1592,19 +1538,11 @@ async fn op_ftruncate_async( let rid = args.rid; let len = args.len as u64; - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - - tokio::task::spawn_blocking(move || { - let std_file = std_file.lock(); - std_file.set_len(len) + StdFileResource::with_file_async(state, rid, move |std_file| { + std_file.set_len(len)?; + Ok(()) }) - .await? - .map_err(AnyError::from) + .await } #[derive(Deserialize)] @@ -1884,19 +1822,11 @@ async fn op_futime_async( let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - let std_file = resource.std_file(); - tokio::task::spawn_blocking(move || { - let std_file = std_file.lock(); - filetime::set_file_handle_times(&std_file, Some(atime), Some(mtime))?; + StdFileResource::with_file_async(state, rid, move |std_file| { + filetime::set_file_handle_times(std_file, Some(atime), Some(mtime))?; Ok(()) }) .await - .unwrap() } #[derive(Deserialize)] diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index bef33e9fad083f..b76f442b5f3aee 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -322,7 +322,7 @@ impl StdFileResourceInner { StdFileResourceInner::File(Arc::new(Mutex::new(fs_file))) } - pub fn with_file(&self, mut f: impl FnMut(&mut StdFile) -> R) -> R { + pub fn with_file(&self, f: impl FnOnce(&mut StdFile) -> R) -> R { match self { Self::File(file) | Self::Stdin(file) => { let mut file = file.lock(); @@ -383,6 +383,9 @@ impl Write for StdFileResourceInner { pub struct StdFileResource { inner: StdFileResourceInner, + /// This is used to help synchronize asynchronous reads and writes + /// so they happen in order. Tokio's mutex is FIFO. + order_sync: tokio::sync::Mutex<()>, metadata: RefCell, name: String, } @@ -391,6 +394,7 @@ impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { inner, + order_sync: Default::default(), metadata: Default::default(), name: name.to_string(), } @@ -399,24 +403,12 @@ impl StdFileResource { pub fn fs_file(fs_file: StdFile) -> Self { Self { inner: StdFileResourceInner::file(fs_file), + order_sync: Default::default(), metadata: Default::default(), name: "fsFile".to_string(), } } - pub fn std_file(&self) -> Arc> { - match &self.inner { - StdFileResourceInner::File(fs_file) - | StdFileResourceInner::Stdin(fs_file) => fs_file.clone(), - StdFileResourceInner::Stdout => { - Arc::new(Mutex::new(STDOUT_HANDLE.try_clone().unwrap())) - } - StdFileResourceInner::Stderr => { - Arc::new(Mutex::new(STDERR_HANDLE.try_clone().unwrap())) - } - } - } - pub fn metadata_mut(&self) -> std::cell::RefMut { self.metadata.borrow_mut() } @@ -426,6 +418,7 @@ impl StdFileResource { mut buf: ZeroCopyBuf, ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut inner = self.inner.clone(); + let _sync = self.order_sync.lock().await; tokio::task::spawn_blocking( move || -> Result<(usize, ZeroCopyBuf), AnyError> { Ok((inner.read(&mut buf)?, buf)) @@ -436,6 +429,7 @@ impl StdFileResource { async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { let mut inner = self.inner.clone(); + let _sync = self.order_sync.lock().await; tokio::task::spawn_blocking(move || inner.write_and_maybe_flush(&buf)) .await? .map_err(AnyError::from) @@ -459,12 +453,32 @@ impl StdFileResource { f: F, ) -> Result where - F: FnMut(&mut StdFile) -> Result, + F: FnOnce(&mut StdFile) -> Result, { let resource = state.resource_table.get::(rid)?; resource.inner.with_file(f) } + pub async fn with_file_async( + state: Rc>, + rid: ResourceId, + f: F, + ) -> Result + where + F: (FnOnce(&mut StdFile) -> Result) + Send + 'static, + { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + + let inner = resource.inner.clone(); + let _sync = resource.order_sync.lock().await; + tokio::task::spawn_blocking(move || inner.with_file(f)) + .await + .unwrap() + } + pub fn clone_file( state: &mut OpState, rid: ResourceId, diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index 62a7717a62b283..814c0a225215c0 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -82,37 +82,35 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> { use winapi::shared::minwindef::FALSE; use winapi::um::{consoleapi, handleapi}; - let resource = state.resource_table.get::(rid)?; - if cbreak { return Err(deno_core::error::not_supported()); } - let std_file = resource.std_file(); - let std_file = std_file.lock(); // hold the lock - let handle = std_file.as_raw_handle(); + StdFileResource::with_file(state, rid, move |std_file| { + let handle = std_file.as_raw_handle(); - if handle == handleapi::INVALID_HANDLE_VALUE { - return Err(Error::last_os_error().into()); - } else if handle.is_null() { - return Err(custom_error("ReferenceError", "null handle")); - } - let mut original_mode: DWORD = 0; - if unsafe { consoleapi::GetConsoleMode(handle, &mut original_mode) } - == FALSE - { - return Err(Error::last_os_error().into()); - } - let new_mode = if is_raw { - original_mode & !RAW_MODE_MASK - } else { - original_mode | RAW_MODE_MASK - }; - if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE { - return Err(Error::last_os_error().into()); - } + if handle == handleapi::INVALID_HANDLE_VALUE { + return Err(Error::last_os_error().into()); + } else if handle.is_null() { + return Err(custom_error("ReferenceError", "null handle")); + } + let mut original_mode: DWORD = 0; + if unsafe { consoleapi::GetConsoleMode(handle, &mut original_mode) } + == FALSE + { + return Err(Error::last_os_error().into()); + } + let new_mode = if is_raw { + original_mode & !RAW_MODE_MASK + } else { + original_mode | RAW_MODE_MASK + }; + if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE { + return Err(Error::last_os_error().into()); + } - Ok(()) + Ok(()) + }) } #[cfg(unix)] { From f73a066fb1198715f7443862905560d7a56d8fac Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 17:33:42 -0400 Subject: [PATCH 02/13] More tests and comments. --- cli/tests/integration/run_tests.rs | 10 ++++++++-- cli/tests/testdata/run/stdin_read_all.out | 1 + cli/tests/testdata/run/stdin_read_all.ts | 17 +++++++++++++++++ .../testdata/{ => run}/stdout_write_all.out | 0 .../testdata/{ => run}/stdout_write_all.ts | 0 runtime/ops/io.rs | 3 +++ 6 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 cli/tests/testdata/run/stdin_read_all.out create mode 100644 cli/tests/testdata/run/stdin_read_all.ts rename cli/tests/testdata/{ => run}/stdout_write_all.out (100%) rename cli/tests/testdata/{ => run}/stdout_write_all.ts (100%) diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index 1cd1db0ef6b801..3d051817808129 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -6,8 +6,14 @@ use test_util as util; use test_util::TempDir; itest!(stdout_write_all { - args: "run --quiet stdout_write_all.ts", - output: "stdout_write_all.out", + args: "run --quiet run/stdout_write_all.ts", + output: "run/stdout_write_all.out", +}); + +itest!(stdin_read_all { + args: "run --quiet run/stdin_read_all.ts", + output: "run/stdin_read_all.out", + input: Some("01234567890123456789012345678901234567890123456789"), }); itest!(_001_hello { diff --git a/cli/tests/testdata/run/stdin_read_all.out b/cli/tests/testdata/run/stdin_read_all.out new file mode 100644 index 00000000000000..2f0dfb71a128e6 --- /dev/null +++ b/cli/tests/testdata/run/stdin_read_all.out @@ -0,0 +1 @@ +01234567890123456789012345678901234567890123456789 diff --git a/cli/tests/testdata/run/stdin_read_all.ts b/cli/tests/testdata/run/stdin_read_all.ts new file mode 100644 index 00000000000000..d683a2bf60fe91 --- /dev/null +++ b/cli/tests/testdata/run/stdin_read_all.ts @@ -0,0 +1,17 @@ +const encoder = new TextEncoder(); + +const pending = []; + +// do this a bunch of times to ensure it doesn't race +// and everything happens in order +for (let i = 0; i < 50; i++) { + const buf = new Uint8Array(1); + pending.push( + Deno.stdin.read(buf).then(() => { + return Deno.stdout.write(buf); + }), + ); +} + +await Promise.all(pending); +await Deno.stdout.write(encoder.encode("\n")); diff --git a/cli/tests/testdata/stdout_write_all.out b/cli/tests/testdata/run/stdout_write_all.out similarity index 100% rename from cli/tests/testdata/stdout_write_all.out rename to cli/tests/testdata/run/stdout_write_all.out diff --git a/cli/tests/testdata/stdout_write_all.ts b/cli/tests/testdata/run/stdout_write_all.ts similarity index 100% rename from cli/tests/testdata/stdout_write_all.ts rename to cli/tests/testdata/run/stdout_write_all.ts diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index b76f442b5f3aee..44e1a99e8f5ecb 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -385,6 +385,9 @@ pub struct StdFileResource { inner: StdFileResourceInner, /// This is used to help synchronize asynchronous reads and writes /// so they happen in order. Tokio's mutex is FIFO. + /// We use a separate sync primitive here instead of putting the + /// StdFile inside this async mutex because we still need to be + /// able to access it synchronously. order_sync: tokio::sync::Mutex<()>, metadata: RefCell, name: String, From 38e3c7e6da63891384dccb5122bd4f8490634d37 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 18:04:11 -0400 Subject: [PATCH 03/13] Maybe fix linux --- runtime/ops/tty.rs | 68 +++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index 814c0a225215c0..e98e12d977210b 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -116,46 +116,46 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> { { use std::os::unix::io::AsRawFd; - let resource = state.resource_table.get::(rid)?; - let std_file = resource.std_file(); - let raw_fd = std_file.lock().as_raw_fd(); - let mut meta_data = resource.metadata_mut(); - let maybe_tty_mode = &mut meta_data.tty.mode; - - if is_raw { - if maybe_tty_mode.is_none() { - // Save original mode. - let original_mode = termios::tcgetattr(raw_fd)?; - maybe_tty_mode.replace(original_mode); - } + StdFileResource::with_file(state, rid, move |std_file| { + let raw_fd = std_file.as_raw_fd(); + let mut meta_data = resource.metadata_mut(); + let maybe_tty_mode = &mut meta_data.tty.mode; + + if is_raw { + if maybe_tty_mode.is_none() { + // Save original mode. + let original_mode = termios::tcgetattr(raw_fd)?; + maybe_tty_mode.replace(original_mode); + } - let mut raw = maybe_tty_mode.clone().unwrap(); + let mut raw = maybe_tty_mode.clone().unwrap(); - raw.input_flags &= !(termios::InputFlags::BRKINT - | termios::InputFlags::ICRNL - | termios::InputFlags::INPCK - | termios::InputFlags::ISTRIP - | termios::InputFlags::IXON); + raw.input_flags &= !(termios::InputFlags::BRKINT + | termios::InputFlags::ICRNL + | termios::InputFlags::INPCK + | termios::InputFlags::ISTRIP + | termios::InputFlags::IXON); - raw.control_flags |= termios::ControlFlags::CS8; + raw.control_flags |= termios::ControlFlags::CS8; - raw.local_flags &= !(termios::LocalFlags::ECHO - | termios::LocalFlags::ICANON - | termios::LocalFlags::IEXTEN); - if !cbreak { - raw.local_flags &= !(termios::LocalFlags::ISIG); - } - raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; - raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; - } else { - // Try restore saved mode. - if let Some(mode) = maybe_tty_mode.take() { - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; + raw.local_flags &= !(termios::LocalFlags::ECHO + | termios::LocalFlags::ICANON + | termios::LocalFlags::IEXTEN); + if !cbreak { + raw.local_flags &= !(termios::LocalFlags::ISIG); + } + raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; + raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; + } else { + // Try restore saved mode. + if let Some(mode) = maybe_tty_mode.take() { + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; + } } - } - Ok(()) + Ok(()) + }) } } From 7a50bec87b065a3a62b01501aecf20303d439ec8 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 18:50:30 -0400 Subject: [PATCH 04/13] Fix linux. --- runtime/ops/io.rs | 17 +++++++--- runtime/ops/tty.rs | 81 ++++++++++++++++++++++++---------------------- 2 files changed, 56 insertions(+), 42 deletions(-) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 44e1a99e8f5ecb..16624c07bb6aeb 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -412,10 +412,6 @@ impl StdFileResource { } } - pub fn metadata_mut(&self) -> std::cell::RefMut { - self.metadata.borrow_mut() - } - async fn read( self: Rc, mut buf: ZeroCopyBuf, @@ -462,6 +458,19 @@ impl StdFileResource { resource.inner.with_file(f) } + pub fn with_file_and_metadata( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result + where + F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result, + { + let resource = state.resource_table.get::(rid)?; + let mut meta_data = resource.metadata.borrow_mut(); + resource.inner.with_file(|file| f(file, &mut meta_data)) + } + pub async fn with_file_async( state: Rc>, rid: ResourceId, diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index e98e12d977210b..ab9553025db145 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -116,46 +116,51 @@ fn op_set_raw(state: &mut OpState, args: SetRawArgs) -> Result<(), AnyError> { { use std::os::unix::io::AsRawFd; - StdFileResource::with_file(state, rid, move |std_file| { - let raw_fd = std_file.as_raw_fd(); - let mut meta_data = resource.metadata_mut(); - let maybe_tty_mode = &mut meta_data.tty.mode; - - if is_raw { - if maybe_tty_mode.is_none() { - // Save original mode. - let original_mode = termios::tcgetattr(raw_fd)?; - maybe_tty_mode.replace(original_mode); - } - - let mut raw = maybe_tty_mode.clone().unwrap(); - - raw.input_flags &= !(termios::InputFlags::BRKINT - | termios::InputFlags::ICRNL - | termios::InputFlags::INPCK - | termios::InputFlags::ISTRIP - | termios::InputFlags::IXON); - - raw.control_flags |= termios::ControlFlags::CS8; - - raw.local_flags &= !(termios::LocalFlags::ECHO - | termios::LocalFlags::ICANON - | termios::LocalFlags::IEXTEN); - if !cbreak { - raw.local_flags &= !(termios::LocalFlags::ISIG); - } - raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; - raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; - } else { - // Try restore saved mode. - if let Some(mode) = maybe_tty_mode.take() { - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; + StdFileResource::with_file_and_metadata( + state, + rid, + move |std_file, meta_data| { + let raw_fd = std_file.as_raw_fd(); + let maybe_tty_mode = &mut meta_data.tty.mode; + + if is_raw { + if maybe_tty_mode.is_none() { + // Save original mode. + let original_mode = termios::tcgetattr(raw_fd)?; + maybe_tty_mode.replace(original_mode); + } + + let mut raw = maybe_tty_mode.clone().unwrap(); + + raw.input_flags &= !(termios::InputFlags::BRKINT + | termios::InputFlags::ICRNL + | termios::InputFlags::INPCK + | termios::InputFlags::ISTRIP + | termios::InputFlags::IXON); + + raw.control_flags |= termios::ControlFlags::CS8; + + raw.local_flags &= !(termios::LocalFlags::ECHO + | termios::LocalFlags::ICANON + | termios::LocalFlags::IEXTEN); + if !cbreak { + raw.local_flags &= !(termios::LocalFlags::ISIG); + } + raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = + 1; + raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = + 0; + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; + } else { + // Try restore saved mode. + if let Some(mode) = maybe_tty_mode.take() { + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; + } } - } - Ok(()) - }) + Ok(()) + }, + ) } } From 009f5c0b3af2057b45f308e1d63a77602186982f Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 19:47:14 -0400 Subject: [PATCH 05/13] Add incrementing count to output. --- cli/tests/testdata/run/stdout_write_all.out | 200 ++++++++++---------- cli/tests/testdata/run/stdout_write_all.ts | 2 +- 2 files changed, 101 insertions(+), 101 deletions(-) diff --git a/cli/tests/testdata/run/stdout_write_all.out b/cli/tests/testdata/run/stdout_write_all.out index 7c386a5924b6d7..d0e667fd4cd20b 100644 --- a/cli/tests/testdata/run/stdout_write_all.out +++ b/cli/tests/testdata/run/stdout_write_all.out @@ -1,100 +1,100 @@ -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! -Hello, world! +Hello, world! 0 +Hello, world! 1 +Hello, world! 2 +Hello, world! 3 +Hello, world! 4 +Hello, world! 5 +Hello, world! 6 +Hello, world! 7 +Hello, world! 8 +Hello, world! 9 +Hello, world! 10 +Hello, world! 11 +Hello, world! 12 +Hello, world! 13 +Hello, world! 14 +Hello, world! 15 +Hello, world! 16 +Hello, world! 17 +Hello, world! 18 +Hello, world! 19 +Hello, world! 20 +Hello, world! 21 +Hello, world! 22 +Hello, world! 23 +Hello, world! 24 +Hello, world! 25 +Hello, world! 26 +Hello, world! 27 +Hello, world! 28 +Hello, world! 29 +Hello, world! 30 +Hello, world! 31 +Hello, world! 32 +Hello, world! 33 +Hello, world! 34 +Hello, world! 35 +Hello, world! 36 +Hello, world! 37 +Hello, world! 38 +Hello, world! 39 +Hello, world! 40 +Hello, world! 41 +Hello, world! 42 +Hello, world! 43 +Hello, world! 44 +Hello, world! 45 +Hello, world! 46 +Hello, world! 47 +Hello, world! 48 +Hello, world! 49 +Hello, world! 50 +Hello, world! 51 +Hello, world! 52 +Hello, world! 53 +Hello, world! 54 +Hello, world! 55 +Hello, world! 56 +Hello, world! 57 +Hello, world! 58 +Hello, world! 59 +Hello, world! 60 +Hello, world! 61 +Hello, world! 62 +Hello, world! 63 +Hello, world! 64 +Hello, world! 65 +Hello, world! 66 +Hello, world! 67 +Hello, world! 68 +Hello, world! 69 +Hello, world! 70 +Hello, world! 71 +Hello, world! 72 +Hello, world! 73 +Hello, world! 74 +Hello, world! 75 +Hello, world! 76 +Hello, world! 77 +Hello, world! 78 +Hello, world! 79 +Hello, world! 80 +Hello, world! 81 +Hello, world! 82 +Hello, world! 83 +Hello, world! 84 +Hello, world! 85 +Hello, world! 86 +Hello, world! 87 +Hello, world! 88 +Hello, world! 89 +Hello, world! 90 +Hello, world! 91 +Hello, world! 92 +Hello, world! 93 +Hello, world! 94 +Hello, world! 95 +Hello, world! 96 +Hello, world! 97 +Hello, world! 98 +Hello, world! 99 diff --git a/cli/tests/testdata/run/stdout_write_all.ts b/cli/tests/testdata/run/stdout_write_all.ts index 1d27302561208d..cfb2981e401de0 100644 --- a/cli/tests/testdata/run/stdout_write_all.ts +++ b/cli/tests/testdata/run/stdout_write_all.ts @@ -6,7 +6,7 @@ const pending = []; // and everything happens in order for (let i = 0; i < 100; i++) { pending.push(Deno.stdout.write(encoder.encode("Hello, "))); - pending.push(Deno.stdout.write(encoder.encode("world!"))); + pending.push(Deno.stdout.write(encoder.encode(`world! ${i}`))); pending.push(Deno.stdout.write(encoder.encode("\n"))); } From 26b1860f20e502706d337db760b20e4e11afc186 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 5 Jul 2022 20:00:19 -0400 Subject: [PATCH 06/13] Use a semaphore instead (tokio's Mutex uses that internally anyway) --- runtime/ops/io.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 16624c07bb6aeb..4ec6eee6056ad0 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -28,6 +28,7 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; +use tokio::sync::Semaphore; #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -384,11 +385,11 @@ impl Write for StdFileResourceInner { pub struct StdFileResource { inner: StdFileResourceInner, /// This is used to help synchronize asynchronous reads and writes - /// so they happen in order. Tokio's mutex is FIFO. + /// so they happen in order. Tokio's semaphore is FIFO. /// We use a separate sync primitive here instead of putting the - /// StdFile inside this async mutex because we still need to be + /// StdFile inside an async mutex because we still need to be /// able to access it synchronously. - order_sync: tokio::sync::Mutex<()>, + order_semaphore: tokio::sync::Semaphore, metadata: RefCell, name: String, } @@ -397,7 +398,7 @@ impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { inner, - order_sync: Default::default(), + order_semaphore: Semaphore::new(1), metadata: Default::default(), name: name.to_string(), } @@ -406,7 +407,7 @@ impl StdFileResource { pub fn fs_file(fs_file: StdFile) -> Self { Self { inner: StdFileResourceInner::file(fs_file), - order_sync: Default::default(), + order_semaphore: Semaphore::new(1), metadata: Default::default(), name: "fsFile".to_string(), } @@ -417,7 +418,7 @@ impl StdFileResource { mut buf: ZeroCopyBuf, ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut inner = self.inner.clone(); - let _sync = self.order_sync.lock().await; + let _permit = self.order_semaphore.acquire().await.unwrap(); tokio::task::spawn_blocking( move || -> Result<(usize, ZeroCopyBuf), AnyError> { Ok((inner.read(&mut buf)?, buf)) @@ -428,7 +429,7 @@ impl StdFileResource { async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { let mut inner = self.inner.clone(); - let _sync = self.order_sync.lock().await; + let _permit = self.order_semaphore.acquire().await.unwrap(); tokio::task::spawn_blocking(move || inner.write_and_maybe_flush(&buf)) .await? .map_err(AnyError::from) @@ -485,7 +486,7 @@ impl StdFileResource { .get::(rid)?; let inner = resource.inner.clone(); - let _sync = resource.order_sync.lock().await; + let _permit = resource.order_semaphore.acquire().await.unwrap(); tokio::task::spawn_blocking(move || inner.with_file(f)) .await .unwrap() From f6e1f99e5d944eb738274f9842fe377478e4f7e7 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Wed, 6 Jul 2022 12:48:51 -0400 Subject: [PATCH 07/13] Add task queue --- core/lib.rs | 3 + core/task_queue.rs | 142 +++++++++++++++++++++++++++++++++++++++++++++ runtime/ops/io.rs | 100 +++++++++++++++++++------------ 3 files changed, 209 insertions(+), 36 deletions(-) create mode 100644 core/task_queue.rs diff --git a/core/lib.rs b/core/lib.rs index 064c15fc13d5e7..6c001b1ca09e02 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,6 +18,7 @@ mod ops_metrics; mod resources; mod runtime; mod source_map; +mod task_queue; // Re-exports pub use anyhow; @@ -101,6 +102,8 @@ pub use crate::runtime::RuntimeOptions; pub use crate::runtime::SharedArrayBufferStore; pub use crate::runtime::Snapshot; pub use crate::source_map::SourceMapGetter; +pub use crate::task_queue::TaskQueue; +pub use crate::task_queue::TaskQueuePermit; pub use deno_ops::op; pub fn v8_version() -> &'static str { diff --git a/core/task_queue.rs b/core/task_queue.rs new file mode 100644 index 00000000000000..6c66cac053746b --- /dev/null +++ b/core/task_queue.rs @@ -0,0 +1,142 @@ +use std::collections::LinkedList; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use crate::futures::task::AtomicWaker; +use crate::futures::Future; +use crate::parking_lot::Mutex; + +#[derive(Default)] +struct TaskQueueTaskWaker { + is_ready: AtomicBool, + waker: AtomicWaker, +} + +#[derive(Default)] +struct TaskQueueTasks { + is_running: bool, + wakers: LinkedList>, +} + +/// A queue that executes tasks sequentially one after the other +/// ensuring order and that no task runs at the same time as another. +#[derive(Clone, Default)] +pub struct TaskQueue { + tasks: Arc>, +} + +impl TaskQueue { + pub fn new() -> Self { + Default::default() + } + + pub async fn queue(&self, future: impl Future) -> R { + let _permit = self.acquire().await; + future.await + } + + pub async fn acquire(&self) -> TaskQueuePermit { + let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); + acquire.await; + TaskQueuePermit { + tasks: self.tasks.clone(), + } + } +} + +/// A permit that when dropped will allow another task to proceed. +pub struct TaskQueuePermit { + tasks: Arc>, +} + +impl Drop for TaskQueuePermit { + fn drop(&mut self) { + let next_item = { + let mut tasks = self.tasks.lock(); + let next_item = tasks.wakers.pop_front(); + tasks.is_running = next_item.is_some(); + next_item + }; + if let Some(next_item) = next_item { + next_item.is_ready.store(true, Ordering::SeqCst); + next_item.waker.wake(); + } + } +} + +struct TaskQueuePermitAcquire { + tasks: Arc>, + initialized: AtomicBool, + waker: Arc, +} + +impl TaskQueuePermitAcquire { + pub fn new(tasks: Arc>) -> Self { + Self { + tasks, + initialized: Default::default(), + waker: Default::default(), + } + } +} + +impl Future for TaskQueuePermitAcquire { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // update with the latest waker + self.waker.waker.register(cx.waker()); + + // ensure this is initialized + if !self.initialized.swap(true, Ordering::SeqCst) { + let mut tasks = self.tasks.lock(); + if !tasks.is_running { + tasks.is_running = true; + return std::task::Poll::Ready(()); + } + tasks.wakers.push_back(self.waker.clone()); + return std::task::Poll::Pending; + } + + // check if we're ready to run + if self.waker.is_ready.load(Ordering::SeqCst) { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::TaskQueue; + use crate::parking_lot::Mutex; + + #[tokio::test] + async fn task_queue_runs_one_after_other() { + let task_queue = TaskQueue::new(); + let mut tasks = Vec::new(); + let data = Arc::new(Mutex::new(0)); + for i in 0..100 { + let data = data.clone(); + tasks.push(task_queue.queue(async move { + tokio::task::spawn_blocking(move || { + let mut data = data.lock(); + if *data != i { + panic!("Value was not equal."); + } + *data = i + 1; + }) + .await + .unwrap(); + })); + } + futures::future::join_all(tasks).await; + } +} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 4ec6eee6056ad0..9f206dfa9d3be8 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -13,6 +13,7 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::TaskQueue; use deno_core::ZeroCopyBuf; use once_cell::sync::Lazy; use std::borrow::Cow; @@ -28,7 +29,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; -use tokio::sync::Semaphore; #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -43,35 +43,47 @@ use { // alive for the duration of the application since the last handle/fd // being dropped will close the corresponding pipe. #[cfg(unix)] -static STDIN_HANDLE: Lazy = Lazy::new(|| { +static STDIN_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stdin - unsafe { StdFile::from_raw_fd(0) } + unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(0))) } }); #[cfg(unix)] -static STDOUT_HANDLE: Lazy = Lazy::new(|| { +static STDOUT_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stdout - unsafe { StdFile::from_raw_fd(1) } + unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(1))) } }); #[cfg(unix)] -static STDERR_HANDLE: Lazy = Lazy::new(|| { +static STDERR_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stderr - unsafe { StdFile::from_raw_fd(2) } + unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(2))) } }); #[cfg(windows)] -static STDIN_HANDLE: Lazy = Lazy::new(|| { +static STDIN_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stdin - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) } + unsafe { + Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( + winbase::STD_INPUT_HANDLE, + )))) + } }); #[cfg(windows)] -static STDOUT_HANDLE: Lazy = Lazy::new(|| { +static STDOUT_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stdout - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) } + unsafe { + Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( + winbase::STD_OUTPUT_HANDLE, + )))) + } }); #[cfg(windows)] -static STDERR_HANDLE: Lazy = Lazy::new(|| { +static STDERR_HANDLE: Lazy>> = Lazy::new(|| { // SAFETY: corresponds to OS stderr - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) } + unsafe { + Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( + winbase::STD_ERROR_HANDLE, + )))) + } }); pub fn init() -> Extension { @@ -126,9 +138,9 @@ pub fn init_stdio(stdio: Stdio) -> Extension { let t = &mut state.resource_table; t.add(StdFileResource::stdio( match stdio.stdin { - StdioPipe::Inherit => StdFileResourceInner::Stdin(Arc::new( - Mutex::new(STDIN_HANDLE.try_clone().unwrap()), - )), + StdioPipe::Inherit => { + StdFileResourceInner::Stdin(STDIN_HANDLE.clone()) + } StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdin", @@ -329,8 +341,14 @@ impl StdFileResourceInner { let mut file = file.lock(); f(&mut file) } - Self::Stdout => f(&mut STDOUT_HANDLE.try_clone().unwrap()), - Self::Stderr => f(&mut STDERR_HANDLE.try_clone().unwrap()), + Self::Stdout => { + let mut stdout = STDOUT_HANDLE.lock(); + f(&mut stdout) + } + Self::Stderr => { + let mut stderr = STDERR_HANDLE.lock(); + f(&mut stderr) + } } } @@ -385,11 +403,11 @@ impl Write for StdFileResourceInner { pub struct StdFileResource { inner: StdFileResourceInner, /// This is used to help synchronize asynchronous reads and writes - /// so they happen in order. Tokio's semaphore is FIFO. + /// so they happen in order. /// We use a separate sync primitive here instead of putting the /// StdFile inside an async mutex because we still need to be /// able to access it synchronously. - order_semaphore: tokio::sync::Semaphore, + task_queue: TaskQueue, metadata: RefCell, name: String, } @@ -398,7 +416,7 @@ impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { inner, - order_semaphore: Semaphore::new(1), + task_queue: TaskQueue::new(), metadata: Default::default(), name: name.to_string(), } @@ -407,7 +425,7 @@ impl StdFileResource { pub fn fs_file(fs_file: StdFile) -> Self { Self { inner: StdFileResourceInner::file(fs_file), - order_semaphore: Semaphore::new(1), + task_queue: TaskQueue::new(), metadata: Default::default(), name: "fsFile".to_string(), } @@ -418,21 +436,29 @@ impl StdFileResource { mut buf: ZeroCopyBuf, ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut inner = self.inner.clone(); - let _permit = self.order_semaphore.acquire().await.unwrap(); - tokio::task::spawn_blocking( - move || -> Result<(usize, ZeroCopyBuf), AnyError> { - Ok((inner.read(&mut buf)?, buf)) - }, - ) - .await? + self + .task_queue + .queue(async { + tokio::task::spawn_blocking( + move || -> Result<(usize, ZeroCopyBuf), AnyError> { + Ok((inner.read(&mut buf)?, buf)) + }, + ) + .await? + }) + .await } async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { let mut inner = self.inner.clone(); - let _permit = self.order_semaphore.acquire().await.unwrap(); - tokio::task::spawn_blocking(move || inner.write_and_maybe_flush(&buf)) - .await? - .map_err(AnyError::from) + self + .task_queue + .queue(async { + tokio::task::spawn_blocking(move || inner.write_and_maybe_flush(&buf)) + .await? + .map_err(AnyError::from) + }) + .await } fn with_inner( @@ -486,10 +512,12 @@ impl StdFileResource { .get::(rid)?; let inner = resource.inner.clone(); - let _permit = resource.order_semaphore.acquire().await.unwrap(); - tokio::task::spawn_blocking(move || inner.with_file(f)) + resource + .task_queue + .queue(async { + tokio::task::spawn_blocking(move || inner.with_file(f)).await? + }) .await - .unwrap() } pub fn clone_file( From a6268c5f1f627558317ed564dd38f9a0c1eb8ae1 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 12 Jul 2022 12:57:30 -0400 Subject: [PATCH 08/13] Rename method and move task_queue to runtime. --- core/lib.rs | 3 --- runtime/lib.rs | 1 + runtime/ops/fs.rs | 16 ++++++++-------- runtime/ops/io.rs | 5 +++-- {core => runtime}/task_queue.rs | 10 +++++----- 5 files changed, 17 insertions(+), 18 deletions(-) rename {core => runtime}/task_queue.rs (94%) diff --git a/core/lib.rs b/core/lib.rs index 330913b77d9ca5..ab22392c492067 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,7 +18,6 @@ mod ops_metrics; mod resources; mod runtime; mod source_map; -mod task_queue; // Re-exports pub use anyhow; @@ -103,8 +102,6 @@ pub use crate::runtime::RuntimeOptions; pub use crate::runtime::SharedArrayBufferStore; pub use crate::runtime::Snapshot; pub use crate::source_map::SourceMapGetter; -pub use crate::task_queue::TaskQueue; -pub use crate::task_queue::TaskQueuePermit; pub use deno_ops::op; pub fn v8_version() -> &'static str { diff --git a/runtime/lib.rs b/runtime/lib.rs index 543d3a0a21e85c..b006fe74f8e3f3 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -27,5 +27,6 @@ pub mod tokio_util; pub mod web_worker; pub mod worker; +mod task_queue; mod worker_bootstrap; pub use worker_bootstrap::BootstrapOptions; diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index 37e874c09e3129..5c9c5c99b0e4f1 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -334,7 +334,7 @@ async fn op_seek_async( ) -> Result { let (rid, seek_from) = seek_helper(args)?; - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.seek(seek_from).map_err(AnyError::from) }) .await @@ -355,7 +355,7 @@ async fn op_fdatasync_async( state: Rc>, rid: ResourceId, ) -> Result<(), AnyError> { - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.sync_data().map_err(AnyError::from) }) .await @@ -373,7 +373,7 @@ async fn op_fsync_async( state: Rc>, rid: ResourceId, ) -> Result<(), AnyError> { - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.sync_all().map_err(AnyError::from) }) .await @@ -396,7 +396,7 @@ async fn op_fstat_async( rid: ResourceId, ) -> Result { let metadata = - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.metadata().map_err(AnyError::from) }) .await?; @@ -431,7 +431,7 @@ async fn op_flock_async( use fs3::FileExt; super::check_unstable2(&state, "Deno.flock"); - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { if exclusive { std_file.lock_exclusive()?; } else { @@ -464,7 +464,7 @@ async fn op_funlock_async( use fs3::FileExt; super::check_unstable2(&state, "Deno.funlock"); - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.unlock()?; Ok(()) }) @@ -1538,7 +1538,7 @@ async fn op_ftruncate_async( let rid = args.rid; let len = args.len as u64; - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { std_file.set_len(len)?; Ok(()) }) @@ -1822,7 +1822,7 @@ async fn op_futime_async( let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - StdFileResource::with_file_async(state, rid, move |std_file| { + StdFileResource::with_file_blocking_task(state, rid, move |std_file| { filetime::set_file_handle_times(std_file, Some(atime), Some(mtime))?; Ok(()) }) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 9f206dfa9d3be8..11bd923f8fc136 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -13,7 +13,6 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; -use deno_core::TaskQueue; use deno_core::ZeroCopyBuf; use once_cell::sync::Lazy; use std::borrow::Cow; @@ -30,6 +29,8 @@ use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; +use crate::task_queue::TaskQueue; + #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -498,7 +499,7 @@ impl StdFileResource { resource.inner.with_file(|file| f(file, &mut meta_data)) } - pub async fn with_file_async( + pub async fn with_file_blocking_task( state: Rc>, rid: ResourceId, f: F, diff --git a/core/task_queue.rs b/runtime/task_queue.rs similarity index 94% rename from core/task_queue.rs rename to runtime/task_queue.rs index 6c66cac053746b..4c10edf03fcc58 100644 --- a/core/task_queue.rs +++ b/runtime/task_queue.rs @@ -1,12 +1,11 @@ +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::Future; +use deno_core::parking_lot::Mutex; use std::collections::LinkedList; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use crate::futures::task::AtomicWaker; -use crate::futures::Future; -use crate::parking_lot::Mutex; - #[derive(Default)] struct TaskQueueTaskWaker { is_ready: AtomicBool, @@ -113,10 +112,11 @@ impl Future for TaskQueuePermitAcquire { #[cfg(test)] mod tests { + use deno_core::futures; + use deno_core::parking_lot::Mutex; use std::sync::Arc; use super::TaskQueue; - use crate::parking_lot::Mutex; #[tokio::test] async fn task_queue_runs_one_after_other() { From 736ea9c9def2cbb03890dce55c659b574c6c959f Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 12 Jul 2022 13:06:42 -0400 Subject: [PATCH 09/13] Reduce locking in write_and_maybe_flush --- runtime/ops/io.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 11bd923f8fc136..e3c6fd8b837e64 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,5 +1,6 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::op; use deno_core::parking_lot::Mutex; @@ -357,17 +358,28 @@ impl StdFileResourceInner { &mut self, buf: &[u8], ) -> Result { - let nwritten = self.write(buf)?; - if !matches!(self, StdFileResourceInner::File(_)) { - // Rust will line buffer and we don't want that behavior - // (see https://github.com/denoland/deno/issues/948), so flush. - // Although an alternative solution could be to bypass Rust's std by - // using the raw fds/handles, it will cause encoding issues on Windows - // that we get solved for free by using Rust's stdio wrappers (see - // std/src/sys/windows/stdio.rs in Rust's source code). - self.flush()?; + // Rust will line buffer and we don't want that behavior + // (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr. + // Although an alternative solution could be to bypass Rust's std by + // using the raw fds/handles, it will cause encoding issues on Windows + // that we get solved for free by using Rust's stdio wrappers (see + // std/src/sys/windows/stdio.rs in Rust's source code). + match self { + Self::File(file) => Ok(file.lock().write(buf)?), + Self::Stdin(_) => bail!("cannot write to stdin."), + Self::Stdout => { + let mut stdout = std::io::stdout().lock(); + let nwritten = stdout.write(buf)?; + stdout.flush()?; + Ok(nwritten) + } + Self::Stderr => { + let mut stderr = std::io::stderr().lock(); + let nwritten = stderr.write(buf)?; + stderr.flush()?; + Ok(nwritten) + } } - Ok(nwritten) } } From 724fdd49e86ab69a792df63e5e47ec16aecebb8c Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 12 Jul 2022 14:53:53 -0400 Subject: [PATCH 10/13] Use AsyncRefCell and remove Mutexes --- runtime/lib.rs | 1 - runtime/ops/io.rs | 259 ++++++++++++++++++++++-------------------- runtime/task_queue.rs | 142 ----------------------- 3 files changed, 134 insertions(+), 268 deletions(-) delete mode 100644 runtime/task_queue.rs diff --git a/runtime/lib.rs b/runtime/lib.rs index b006fe74f8e3f3..543d3a0a21e85c 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -27,6 +27,5 @@ pub mod tokio_util; pub mod web_worker; pub mod worker; -mod task_queue; mod worker_bootstrap; pub use worker_bootstrap::BootstrapOptions; diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index e3c6fd8b837e64..9ace5ab6ec586f 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,9 +1,9 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. use deno_core::anyhow::bail; +use deno_core::error::resource_unavailable; use deno_core::error::AnyError; use deno_core::op; -use deno_core::parking_lot::Mutex; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -23,15 +23,12 @@ use std::io::ErrorKind; use std::io::Read; use std::io::Write; use std::rc::Rc; -use std::sync::Arc; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; -use crate::task_queue::TaskQueue; - #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -45,47 +42,35 @@ use { // alive for the duration of the application since the last handle/fd // being dropped will close the corresponding pipe. #[cfg(unix)] -static STDIN_HANDLE: Lazy>> = Lazy::new(|| { +static STDIN_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stdin - unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(0))) } + unsafe { StdFile::from_raw_fd(0) } }); #[cfg(unix)] -static STDOUT_HANDLE: Lazy>> = Lazy::new(|| { +static STDOUT_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stdout - unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(1))) } + unsafe { StdFile::from_raw_fd(1) } }); #[cfg(unix)] -static STDERR_HANDLE: Lazy>> = Lazy::new(|| { +static STDERR_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stderr unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(2))) } }); #[cfg(windows)] -static STDIN_HANDLE: Lazy>> = Lazy::new(|| { +static STDIN_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stdin - unsafe { - Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( - winbase::STD_INPUT_HANDLE, - )))) - } + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) } }); #[cfg(windows)] -static STDOUT_HANDLE: Lazy>> = Lazy::new(|| { +static STDOUT_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stdout - unsafe { - Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( - winbase::STD_OUTPUT_HANDLE, - )))) - } + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) } }); #[cfg(windows)] -static STDERR_HANDLE: Lazy>> = Lazy::new(|| { +static STDERR_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stderr - unsafe { - Arc::new(Mutex::new(StdFile::from_raw_handle(GetStdHandle( - winbase::STD_ERROR_HANDLE, - )))) - } + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) } }); pub fn init() -> Extension { @@ -141,7 +126,7 @@ pub fn init_stdio(stdio: Stdio) -> Extension { t.add(StdFileResource::stdio( match stdio.stdin { StdioPipe::Inherit => { - StdFileResourceInner::Stdin(STDIN_HANDLE.clone()) + StdFileResourceInner::Stdin(STDIN_HANDLE.try_clone().unwrap()) } StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, @@ -149,14 +134,18 @@ pub fn init_stdio(stdio: Stdio) -> Extension { )); t.add(StdFileResource::stdio( match stdio.stdout { - StdioPipe::Inherit => StdFileResourceInner::Stdout, + StdioPipe::Inherit => { + StdFileResourceInner::Stdout(STDOUT_HANDLE.try_clone().unwrap()) + } StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stdout", )); t.add(StdFileResource::stdio( match stdio.stderr { - StdioPipe::Inherit => StdFileResourceInner::Stderr, + StdioPipe::Inherit => { + StdFileResourceInner::Stderr(STDERR_HANDLE.try_clone().unwrap()) + } StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), }, "stderr", @@ -320,37 +309,28 @@ impl Resource for ChildStderrResource { } } -#[derive(Clone)] enum StdFileResourceInner { - File(Arc>), - Stdin(Arc>), - // Ideally we would store stdio as an StdFile, but we get some Windows - // specific functionality for free by using Rust std's wrappers. So we - // take a bit of a complexity hit here in order to not have to duplicate - // the functionality in Rust's std/src/sys/windows/stdio.rs - Stdout, - Stderr, + File(StdFile), + Stdin(StdFile), + // For stdout and stderr, we sometimes instead use std::io::stdout() directly, + // because we get some Windows specific functionality for free by using Rust + // std's wrappers. So we take a bit of a complexity hit in order to not + // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs + Stdout(StdFile), + Stderr(StdFile), } impl StdFileResourceInner { pub fn file(fs_file: StdFile) -> Self { - StdFileResourceInner::File(Arc::new(Mutex::new(fs_file))) + StdFileResourceInner::File(fs_file) } - pub fn with_file(&self, f: impl FnOnce(&mut StdFile) -> R) -> R { + pub fn with_file(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { match self { - Self::File(file) | Self::Stdin(file) => { - let mut file = file.lock(); - f(&mut file) - } - Self::Stdout => { - let mut stdout = STDOUT_HANDLE.lock(); - f(&mut stdout) - } - Self::Stderr => { - let mut stderr = STDERR_HANDLE.lock(); - f(&mut stderr) - } + Self::File(file) + | Self::Stdin(file) + | Self::Stdout(file) + | Self::Stderr(file) => f(file), } } @@ -365,15 +345,15 @@ impl StdFileResourceInner { // that we get solved for free by using Rust's stdio wrappers (see // std/src/sys/windows/stdio.rs in Rust's source code). match self { - Self::File(file) => Ok(file.lock().write(buf)?), + Self::File(file) => Ok(file.write(buf)?), Self::Stdin(_) => bail!("cannot write to stdin."), - Self::Stdout => { + Self::Stdout(_) => { let mut stdout = std::io::stdout().lock(); let nwritten = stdout.write(buf)?; stdout.flush()?; Ok(nwritten) } - Self::Stderr => { + Self::Stderr(_) => { let mut stderr = std::io::stderr().lock(); let nwritten = stderr.write(buf)?; stderr.flush()?; @@ -386,9 +366,8 @@ impl StdFileResourceInner { impl Read for StdFileResourceInner { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match self { - Self::File(file) | Self::Stdin(file) => file.lock().read(buf), - Self::Stdout => Err(ErrorKind::Unsupported.into()), - Self::Stderr => Err(ErrorKind::Unsupported.into()), + Self::File(file) | Self::Stdin(file) => file.read(buf), + Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()), } } } @@ -396,31 +375,27 @@ impl Read for StdFileResourceInner { impl Write for StdFileResourceInner { fn write(&mut self, buf: &[u8]) -> std::io::Result { match self { - Self::File(file) => file.lock().write(buf), + Self::File(file) => file.write(buf), Self::Stdin(_) => Err(ErrorKind::Unsupported.into()), - Self::Stdout => std::io::stdout().write(buf), - Self::Stderr => std::io::stderr().write(buf), + // ignore the file and write using Rust's helpers + // (see comment at definition) + Self::Stdout(_) => std::io::stdout().write(buf), + Self::Stderr(_) => std::io::stderr().write(buf), } } fn flush(&mut self) -> std::io::Result<()> { match self { - Self::File(file) => file.lock().flush(), + Self::File(file) => file.flush(), Self::Stdin(_) => Err(ErrorKind::Unsupported.into()), - Self::Stdout => std::io::stdout().flush(), - Self::Stderr => std::io::stderr().flush(), + Self::Stdout(_) => std::io::stdout().flush(), + Self::Stderr(_) => std::io::stderr().flush(), } } } pub struct StdFileResource { - inner: StdFileResourceInner, - /// This is used to help synchronize asynchronous reads and writes - /// so they happen in order. - /// We use a separate sync primitive here instead of putting the - /// StdFile inside an async mutex because we still need to be - /// able to access it synchronously. - task_queue: TaskQueue, + inner: AsyncRefCell>, metadata: RefCell, name: String, } @@ -428,8 +403,7 @@ pub struct StdFileResource { impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { - inner, - task_queue: TaskQueue::new(), + inner: AsyncRefCell::new(Some(inner)), metadata: Default::default(), name: name.to_string(), } @@ -437,53 +411,75 @@ impl StdFileResource { pub fn fs_file(fs_file: StdFile) -> Self { Self { - inner: StdFileResourceInner::file(fs_file), - task_queue: TaskQueue::new(), + inner: AsyncRefCell::new(Some(StdFileResourceInner::file(fs_file))), metadata: Default::default(), name: "fsFile".to_string(), } } + fn with_inner( + self: Rc, + action: impl FnOnce(&mut StdFileResourceInner) -> Result, + ) -> Result { + match RcRef::map(&self, |r| &r.inner).try_borrow_mut() { + Some(mut inner_borrow) => { + let mut inner = inner_borrow.take().unwrap(); + let result = action(&mut inner); + inner_borrow.replace(inner); + result + } + None => Err(resource_unavailable()), + } + } + + async fn with_inner_blocking_task( + self: Rc, + action: F, + ) -> R + where + F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, + { + let mut inner_borrow = RcRef::map(&self, |r| &r.inner).borrow_mut().await; + let mut inner = inner_borrow.take().unwrap(); + let (inner, result) = tokio::task::spawn_blocking(move || { + let result = action(&mut inner); + (inner, result) + }) + .await + .unwrap(); + inner_borrow.replace(inner); + result + } + async fn read( self: Rc, mut buf: ZeroCopyBuf, ) -> Result<(usize, ZeroCopyBuf), AnyError> { - let mut inner = self.inner.clone(); self - .task_queue - .queue(async { - tokio::task::spawn_blocking( - move || -> Result<(usize, ZeroCopyBuf), AnyError> { - Ok((inner.read(&mut buf)?, buf)) - }, - ) - .await? - }) + .with_inner_blocking_task( + move |inner| -> Result<(usize, ZeroCopyBuf), AnyError> { + Ok((inner.read(&mut buf)?, buf)) + }, + ) .await } async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { - let mut inner = self.inner.clone(); self - .task_queue - .queue(async { - tokio::task::spawn_blocking(move || inner.write_and_maybe_flush(&buf)) - .await? - .map_err(AnyError::from) - }) + .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) .await } - fn with_inner( + fn with_resource( state: &mut OpState, rid: ResourceId, - mut f: F, + f: F, ) -> Result where - F: FnMut(StdFileResourceInner) -> Result, + F: FnOnce(Rc) -> Result, { let resource = state.resource_table.get::(rid)?; - f(resource.inner.clone()) + f(resource) } pub fn with_file( @@ -494,8 +490,9 @@ impl StdFileResource { where F: FnOnce(&mut StdFile) -> Result, { - let resource = state.resource_table.get::(rid)?; - resource.inner.with_file(f) + Self::with_resource(state, rid, move |resource| { + resource.with_inner(move |inner| inner.with_file(f)) + }) } pub fn with_file_and_metadata( @@ -508,7 +505,15 @@ impl StdFileResource { { let resource = state.resource_table.get::(rid)?; let mut meta_data = resource.metadata.borrow_mut(); - resource.inner.with_file(|file| f(file, &mut meta_data)) + match RcRef::map(&resource, |r| &r.inner).try_borrow_mut() { + Some(mut inner_borrow) => { + let mut inner = inner_borrow.take().unwrap(); + let result = inner.with_file(|file| f(file, &mut meta_data)); + inner_borrow.replace(inner); + result + } + None => Err(resource_unavailable()), + } } pub async fn with_file_blocking_task( @@ -524,12 +529,8 @@ impl StdFileResource { .resource_table .get::(rid)?; - let inner = resource.inner.clone(); resource - .task_queue - .queue(async { - tokio::task::spawn_blocking(move || inner.with_file(f)).await? - }) + .with_inner_blocking_task(move |inner| inner.with_file(f)) .await } @@ -546,12 +547,14 @@ impl StdFileResource { state: &mut OpState, rid: u32, ) -> Result { - Self::with_inner(state, rid, |inner| match inner { - StdFileResourceInner::File(file) => { - let file = file.lock().try_clone()?; - Ok(file.into()) - } - _ => Ok(std::process::Stdio::inherit()), + Self::with_resource(state, rid, |resource| { + resource.with_inner(|inner| match inner { + StdFileResourceInner::File(file) => { + let file = file.try_clone()?; + Ok(file.into()) + } + _ => Ok(std::process::Stdio::inherit()), + }) }) } } @@ -581,10 +584,12 @@ pub fn op_print( is_err: bool, ) -> Result<(), AnyError> { let rid = if is_err { 2 } else { 1 }; - StdFileResource::with_inner(state, rid, move |mut inner| { - inner.write_all(msg.as_bytes())?; - inner.flush().unwrap(); - Ok(()) + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner(|inner| { + inner.write_all(msg.as_bytes())?; + inner.flush().unwrap(); + Ok(()) + }) }) } @@ -594,11 +599,13 @@ fn op_read_sync( rid: ResourceId, mut buf: ZeroCopyBuf, ) -> Result { - StdFileResource::with_inner(state, rid, move |mut inner| { - inner - .read(&mut buf) - .map(|n: usize| n as u32) - .map_err(AnyError::from) + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner(|inner| { + inner + .read(&mut buf) + .map(|n: usize| n as u32) + .map_err(AnyError::from) + }) }) } @@ -608,10 +615,12 @@ fn op_write_sync( rid: ResourceId, buf: ZeroCopyBuf, ) -> Result { - StdFileResource::with_inner(state, rid, move |mut inner| { - inner - .write_and_maybe_flush(&buf) - .map(|nwritten: usize| nwritten as u32) - .map_err(AnyError::from) + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner(|inner| { + inner + .write_and_maybe_flush(&buf) + .map(|nwritten: usize| nwritten as u32) + .map_err(AnyError::from) + }) }) } diff --git a/runtime/task_queue.rs b/runtime/task_queue.rs deleted file mode 100644 index 4c10edf03fcc58..00000000000000 --- a/runtime/task_queue.rs +++ /dev/null @@ -1,142 +0,0 @@ -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::Future; -use deno_core::parking_lot::Mutex; -use std::collections::LinkedList; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; - -#[derive(Default)] -struct TaskQueueTaskWaker { - is_ready: AtomicBool, - waker: AtomicWaker, -} - -#[derive(Default)] -struct TaskQueueTasks { - is_running: bool, - wakers: LinkedList>, -} - -/// A queue that executes tasks sequentially one after the other -/// ensuring order and that no task runs at the same time as another. -#[derive(Clone, Default)] -pub struct TaskQueue { - tasks: Arc>, -} - -impl TaskQueue { - pub fn new() -> Self { - Default::default() - } - - pub async fn queue(&self, future: impl Future) -> R { - let _permit = self.acquire().await; - future.await - } - - pub async fn acquire(&self) -> TaskQueuePermit { - let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); - acquire.await; - TaskQueuePermit { - tasks: self.tasks.clone(), - } - } -} - -/// A permit that when dropped will allow another task to proceed. -pub struct TaskQueuePermit { - tasks: Arc>, -} - -impl Drop for TaskQueuePermit { - fn drop(&mut self) { - let next_item = { - let mut tasks = self.tasks.lock(); - let next_item = tasks.wakers.pop_front(); - tasks.is_running = next_item.is_some(); - next_item - }; - if let Some(next_item) = next_item { - next_item.is_ready.store(true, Ordering::SeqCst); - next_item.waker.wake(); - } - } -} - -struct TaskQueuePermitAcquire { - tasks: Arc>, - initialized: AtomicBool, - waker: Arc, -} - -impl TaskQueuePermitAcquire { - pub fn new(tasks: Arc>) -> Self { - Self { - tasks, - initialized: Default::default(), - waker: Default::default(), - } - } -} - -impl Future for TaskQueuePermitAcquire { - type Output = (); - - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - // update with the latest waker - self.waker.waker.register(cx.waker()); - - // ensure this is initialized - if !self.initialized.swap(true, Ordering::SeqCst) { - let mut tasks = self.tasks.lock(); - if !tasks.is_running { - tasks.is_running = true; - return std::task::Poll::Ready(()); - } - tasks.wakers.push_back(self.waker.clone()); - return std::task::Poll::Pending; - } - - // check if we're ready to run - if self.waker.is_ready.load(Ordering::SeqCst) { - std::task::Poll::Ready(()) - } else { - std::task::Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use deno_core::futures; - use deno_core::parking_lot::Mutex; - use std::sync::Arc; - - use super::TaskQueue; - - #[tokio::test] - async fn task_queue_runs_one_after_other() { - let task_queue = TaskQueue::new(); - let mut tasks = Vec::new(); - let data = Arc::new(Mutex::new(0)); - for i in 0..100 { - let data = data.clone(); - tasks.push(task_queue.queue(async move { - tokio::task::spawn_blocking(move || { - let mut data = data.lock(); - if *data != i { - panic!("Value was not equal."); - } - *data = i + 1; - }) - .await - .unwrap(); - })); - } - futures::future::join_all(tasks).await; - } -} From fc79eaa5d4828f9adaf91535109722199990385a Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 12 Jul 2022 16:23:10 -0400 Subject: [PATCH 11/13] Cleanup. --- runtime/ops/io.rs | 124 ++++++++++++++++++++++++---------------------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 9ace5ab6ec586f..6a3f3c1e7c60eb 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -54,7 +54,7 @@ static STDOUT_HANDLE: Lazy = Lazy::new(|| { #[cfg(unix)] static STDERR_HANDLE: Lazy = Lazy::new(|| { // SAFETY: corresponds to OS stderr - unsafe { Arc::new(Mutex::new(StdFile::from_raw_fd(2))) } + unsafe { StdFile::from_raw_fd(2) } }); #[cfg(windows)] @@ -348,12 +348,14 @@ impl StdFileResourceInner { Self::File(file) => Ok(file.write(buf)?), Self::Stdin(_) => bail!("cannot write to stdin."), Self::Stdout(_) => { + // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); let nwritten = stdout.write(buf)?; stdout.flush()?; Ok(nwritten) } Self::Stderr(_) => { + // bypass the file and use std::io::stderr() let mut stderr = std::io::stderr().lock(); let nwritten = stderr.write(buf)?; stderr.flush()?; @@ -361,71 +363,78 @@ impl StdFileResourceInner { } } } -} -impl Read for StdFileResourceInner { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + pub fn write_all_and_maybe_flush( + &mut self, + buf: &[u8], + ) -> Result<(), AnyError> { + // this method exists instead of using a `Write` implementation + // so that we can acquire the locks once and do both actions match self { - Self::File(file) | Self::Stdin(file) => file.read(buf), - Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()), + Self::File(file) => Ok(file.write_all(buf)?), + Self::Stdin(_) => bail!("cannot write to stdin."), + Self::Stdout(_) => { + // bypass the file and use std::io::stdout() + let mut stdout = std::io::stdout().lock(); + stdout.write_all(buf)?; + stdout.flush()?; + Ok(()) + } + Self::Stderr(_) => { + // bypass the file and use std::io::stderr() + let mut stderr = std::io::stderr().lock(); + stderr.write_all(buf)?; + stderr.flush()?; + Ok(()) + } } } } -impl Write for StdFileResourceInner { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - match self { - Self::File(file) => file.write(buf), - Self::Stdin(_) => Err(ErrorKind::Unsupported.into()), - // ignore the file and write using Rust's helpers - // (see comment at definition) - Self::Stdout(_) => std::io::stdout().write(buf), - Self::Stderr(_) => std::io::stderr().write(buf), - } - } - - fn flush(&mut self) -> std::io::Result<()> { +impl Read for StdFileResourceInner { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match self { - Self::File(file) => file.flush(), - Self::Stdin(_) => Err(ErrorKind::Unsupported.into()), - Self::Stdout(_) => std::io::stdout().flush(), - Self::Stderr(_) => std::io::stderr().flush(), + Self::File(file) | Self::Stdin(file) => file.read(buf), + Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()), } } } pub struct StdFileResource { - inner: AsyncRefCell>, - metadata: RefCell, name: String, + cell: AsyncRefCell>, } impl StdFileResource { fn stdio(inner: StdFileResourceInner, name: &str) -> Self { Self { - inner: AsyncRefCell::new(Some(inner)), - metadata: Default::default(), + cell: AsyncRefCell::new(Some((inner, Default::default()))), name: name.to_string(), } } pub fn fs_file(fs_file: StdFile) -> Self { Self { - inner: AsyncRefCell::new(Some(StdFileResourceInner::file(fs_file))), - metadata: Default::default(), + cell: AsyncRefCell::new(Some(( + StdFileResourceInner::file(fs_file), + Default::default(), + ))), name: "fsFile".to_string(), } } - fn with_inner( + fn with_inner_and_metadata( self: Rc, - action: impl FnOnce(&mut StdFileResourceInner) -> Result, + action: impl FnOnce( + &mut StdFileResourceInner, + &mut FileMetadata, + ) -> Result, ) -> Result { - match RcRef::map(&self, |r| &r.inner).try_borrow_mut() { - Some(mut inner_borrow) => { - let mut inner = inner_borrow.take().unwrap(); - let result = action(&mut inner); - inner_borrow.replace(inner); + match RcRef::map(&self, |r| &r.cell).try_borrow_mut() { + Some(mut cell) => { + let mut file = cell.take().unwrap(); + let result = action(&mut file.0, &mut file.1); + cell.replace(file); result } None => Err(resource_unavailable()), @@ -439,15 +448,17 @@ impl StdFileResource { where F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, { - let mut inner_borrow = RcRef::map(&self, |r| &r.inner).borrow_mut().await; - let mut inner = inner_borrow.take().unwrap(); - let (inner, result) = tokio::task::spawn_blocking(move || { - let result = action(&mut inner); - (inner, result) + // we take the value out of the cell, use it on a blocking task, + // then put it back into the cell when we're done + let mut cell = RcRef::map(&self, |r| &r.cell).borrow_mut().await; + let mut file = cell.take().unwrap(); + let (file, result) = tokio::task::spawn_blocking(move || { + let result = action(&mut file.0); + (file, result) }) .await .unwrap(); - inner_borrow.replace(inner); + cell.replace(file); result } @@ -491,7 +502,7 @@ impl StdFileResource { F: FnOnce(&mut StdFile) -> Result, { Self::with_resource(state, rid, move |resource| { - resource.with_inner(move |inner| inner.with_file(f)) + resource.with_inner_and_metadata(move |inner, _| inner.with_file(f)) }) } @@ -503,17 +514,11 @@ impl StdFileResource { where F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result, { - let resource = state.resource_table.get::(rid)?; - let mut meta_data = resource.metadata.borrow_mut(); - match RcRef::map(&resource, |r| &r.inner).try_borrow_mut() { - Some(mut inner_borrow) => { - let mut inner = inner_borrow.take().unwrap(); - let result = inner.with_file(|file| f(file, &mut meta_data)); - inner_borrow.replace(inner); - result - } - None => Err(resource_unavailable()), - } + Self::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(move |inner, metadata| { + inner.with_file(move |file| f(file, metadata)) + }) + }) } pub async fn with_file_blocking_task( @@ -548,7 +553,7 @@ impl StdFileResource { rid: u32, ) -> Result { Self::with_resource(state, rid, |resource| { - resource.with_inner(|inner| match inner { + resource.with_inner_and_metadata(|inner, _| match inner { StdFileResourceInner::File(file) => { let file = file.try_clone()?; Ok(file.into()) @@ -585,9 +590,8 @@ pub fn op_print( ) -> Result<(), AnyError> { let rid = if is_err { 2 } else { 1 }; StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner(|inner| { - inner.write_all(msg.as_bytes())?; - inner.flush().unwrap(); + resource.with_inner_and_metadata(|inner, _| { + inner.write_all_and_maybe_flush(msg.as_bytes())?; Ok(()) }) }) @@ -600,7 +604,7 @@ fn op_read_sync( mut buf: ZeroCopyBuf, ) -> Result { StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner(|inner| { + resource.with_inner_and_metadata(|inner, _| { inner .read(&mut buf) .map(|n: usize| n as u32) @@ -616,7 +620,7 @@ fn op_write_sync( buf: ZeroCopyBuf, ) -> Result { StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner(|inner| { + resource.with_inner_and_metadata(|inner, _| { inner .write_and_maybe_flush(&buf) .map(|nwritten: usize| nwritten as u32) From b07616df6ef14d255dd49f07129960a722d96bb6 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Tue, 12 Jul 2022 17:11:37 -0400 Subject: [PATCH 12/13] Ooops... bail was temp during refactoring. Revert back to ErrorKind::Unsupported. --- runtime/ops/io.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 6a3f3c1e7c60eb..2dcb9964a69220 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -1,6 +1,5 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. -use deno_core::anyhow::bail; use deno_core::error::resource_unavailable; use deno_core::error::AnyError; use deno_core::op; @@ -346,7 +345,9 @@ impl StdFileResourceInner { // std/src/sys/windows/stdio.rs in Rust's source code). match self { Self::File(file) => Ok(file.write(buf)?), - Self::Stdin(_) => bail!("cannot write to stdin."), + Self::Stdin(_) => { + Err(Into::::into(ErrorKind::Unsupported).into()) + } Self::Stdout(_) => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); @@ -372,7 +373,9 @@ impl StdFileResourceInner { // so that we can acquire the locks once and do both actions match self { Self::File(file) => Ok(file.write_all(buf)?), - Self::Stdin(_) => bail!("cannot write to stdin."), + Self::Stdin(_) => { + Err(Into::::into(ErrorKind::Unsupported).into()) + } Self::Stdout(_) => { // bypass the file and use std::io::stdout() let mut stdout = std::io::stdout().lock(); From 6d2d64cd6b368b27cb55e8e7fbc730926640a2da Mon Sep 17 00:00:00 2001 From: David Sherret Date: Wed, 13 Jul 2022 10:13:03 -0400 Subject: [PATCH 13/13] Add unit test --- cli/tests/unit/files_test.ts | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/cli/tests/unit/files_test.ts b/cli/tests/unit/files_test.ts index d15f1f5385c0f7..5fb590d726e6d2 100644 --- a/cli/tests/unit/files_test.ts +++ b/cli/tests/unit/files_test.ts @@ -263,6 +263,38 @@ Deno.test( }, ); +Deno.test( + { permissions: { write: true } }, + async function writeSyncWhileAsyncFails() { + const tempDir = await Deno.makeTempDir(); + try { + const filePath = tempDir + "/file.txt"; + const file = await Deno.open(filePath, { create: true, write: true }); + const rid = file.rid; + try { + // set a file lock so the async write will be held up + await Deno.flock(rid, true); + let p: Promise | undefined; + try { + p = Deno.write(rid, new TextEncoder().encode("test")); + assertThrows( + () => Deno.writeSync(rid, new TextEncoder().encode("test")), + Error, + "Resource is unavailable because it is in use by a promise", + ); + } finally { + await Deno.funlock(rid); + } + await p; + } finally { + file.close(); + } + } finally { + Deno.removeSync(tempDir, { recursive: true }); + } + }, +); + Deno.test(async function openOptions() { const filename = "cli/tests/testdata/fixture.json"; await assertRejects(