Skip to content

Commit

Permalink
ZStream scalajs from File (zio#8853)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaplan-shaked authored May 21, 2024
1 parent 83940e8 commit d9ffa16
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@ import zio.{Cause, Chunk, Promise, Ref, Schedule, ZIO, ZIOBaseSpec}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import java.nio.charset.StandardCharsets
import zio.test.Assertion.containsString

object ZStreamPlatformSpecificSpec extends ZIOBaseSpec {

private def getTempDir: String = {
import scalajs.js.Dynamic.{global => g}
val os = g.require("os")
os.tmpdir().toString
}

def spec = suite("ZStream JS")(
test("async")(check(Gen.chunkOf(Gen.int)) { chunk =>
val s = ZStream.async[Any, Throwable, Int](k => chunk.foreach(a => k(ZIO.succeed(Chunk.single(a)))))
Expand Down Expand Up @@ -214,6 +224,24 @@ object ZStreamPlatformSpecificSpec extends ZIOBaseSpec {
} yield assert(isDone)(isFalse) &&
assert(exit.untraced)(failsCause(containsCause(Cause.interrupt(selfId))))
}
)
),
test("FromFile should work") {
val content = "Hello World"
val path = getTempDir + "/zio-streams-test.txt"

val s = ZStream.fromFile(path)
for {
_ <- ZIO.writeFile(path, content)
res <- s.runCollect.map(chunk => new String(chunk.toArray, StandardCharsets.UTF_8))
} yield assert(res)(equalTo(content))
},
test("FromFile should fail on non existing file") {
val path = getTempDir + "/zio-streams-test-2.txt"
val s = ZStream.fromFile(path)
for {

res <- s.runCollect.flip
} yield assert(res.getMessage())(containsString(s"Error: ENOENT: no such file or directory"))
}
)
}
29 changes: 29 additions & 0 deletions streams/js/src/main/scala/zio/stream/platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package zio.stream
import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

import scala.scalajs.js.typedarray._
import scala.concurrent.Future
import scala.scalajs.js

private[stream] trait ZStreamPlatformSpecificConstructors {
self: ZStream.type =>
Expand Down Expand Up @@ -167,6 +169,33 @@ private[stream] trait ZStreamPlatformSpecificConstructors {
asyncInterrupt(k => register(k).toRight(ZIO.unit), outputBuffer)

trait ZStreamConstructorPlatformSpecific extends ZStreamConstructorLowPriority1

def fromFile(file: => String, chunkSize: => Int = ZStream.DefaultChunkSize)(implicit
trace: Trace
): ZStream[Any, Throwable, Byte] = {
import scalajs.js.Dynamic.{global => g}
val fs = g.require("fs")
val reader = fs.createReadStream(
file,
new js.Object {
val highWaterMark = chunkSize
}
)
ZStream.async[Any, Throwable, Byte] { cb =>
reader
.on(
"data",
(data: js.typedarray.ArrayBuffer) =>
cb(
ZIO.succeed(
Chunk.fromArray(new Int8Array(data).toArray)
)
)
)
.on("end", () => cb(ZIO.fail(None)))
.on("error", (err: js.Dynamic) => cb(ZIO.fail(Some(new Throwable(err.toString)))))
}
}
}

private[stream] trait ZSinkPlatformSpecificConstructors
Expand Down

0 comments on commit d9ffa16

Please sign in to comment.