responseObserver.complete() is not called #116
Comments
@christian-schlichtherle I might be missing something here, but I'm fairly sure that the code should be causing the worker to terminate after the input observable has terminated. This is the behaviour observed in the demo site too - a Are you sure that your input observable is completing? |
Yes, because I'm only using For reference, please have a look at my PR - it adds the missing call to |
Also, I'm using |
Right after your |
Here's a simple test case: In the worker module (`./engine-worker`):

    import { DoWork, runWorker } from 'observable-webworker'
    import { concatMap, finalize, interval, map, Observable, take, takeWhile } from 'rxjs'

    export type EngineConfig = {
      limit: number
      rate: number
    }

    class EngineWorker implements DoWork<EngineConfig, number> {
      public work(config$: Observable<EngineConfig>): Observable<number> {
        return config$.pipe(
          take(1), // FIXME: The resulting observable never finishes without this call!
          concatMap((config) => {
            const gen = (function* double(): Generator<number> {
              for (let i = 1; i <= config.limit; i++) {
                yield 2 * i
              }
            })()
            return interval(1000 / config.rate).pipe(
              map(() => gen.next().value),
              takeWhile((item) => item),
              finalize(() => console.log('OVER!'))
            )
          })
        )
      }
    }

    runWorker(EngineWorker)

In the module that consumes it:

    import { fromWorker } from 'observable-webworker'
    import { of, Subscription } from 'rxjs'
    import { EngineConfig } from './engine-worker'

    const engineWorkerFactory = (): Worker => {
      return new Worker(new URL('./engine-worker', import.meta.url), {
        type: 'module'
      })
    }

    fromWorker<EngineConfig, number>(engineWorkerFactory, of({ limit: 30, rate: 30 })).subscribe({
      next(item): void {
        console.log(item)
      },
      complete(): void {
        console.log('OVER AND OUT!')
      }
    })

If you run this unchanged, it will print:
This is because I put the workaround |
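The inner pipeline of the test case above ends on its own: once the generator is exhausted, `gen.next().value` is `undefined`, which is falsy, so `takeWhile((item) => item)` stops. A minimal sketch of that termination condition in plain TypeScript, without RxJS or a worker; the `drain` helper and the `limit` parameter are hypothetical names introduced only for illustration:

```typescript
// Same generator shape as in the test case: yields 2, 4, ..., 2 * limit.
function* double(limit: number): Generator<number> {
  for (let i = 1; i <= limit; i++) {
    yield 2 * i;
  }
}

// Hypothetical helper: pulls values the way map(() => gen.next().value)
// followed by takeWhile((item) => item) would, but synchronously.
function drain(limit: number): number[] {
  const gen = double(limit);
  const items: number[] = [];
  while (true) {
    const item = gen.next().value; // undefined once the generator has returned
    if (!item) break; // the truthiness check that takeWhile applies
    items.push(item);
  }
  return items;
}

const drained = drain(3);
console.log(drained); // [ 2, 4, 6 ]
```

The truthiness check would also stop on a yielded `0`; that cannot happen here because the generator only yields positive even numbers.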
Thanks for the example. Bug confirmed with a live repro on stackblitz. |
What's weird is that it looks like this bit of the code is tested 🤔 EDIT: The content of the worker is mocked in the `beforeEach`, so that may not help |
I have created a minimal repro in our demo app (but I'm not going to commit it) with the following:

    export class BasicWorker implements DoWork<number, number> {
      public work(input$: Observable<number>): Observable<number> {
        return input$.pipe(map(x => x * 10));
      }
    }

    runWorker(BasicWorker);

and

    const input$: Observable<number> = of(1, 2, 3);

    fromWorker<number, number>(() => {
      const worker = new Worker(new URL('../basic.worker', import.meta.url), {
        name: 'md5-multiply-worker',
        type: 'module',
      });
      this.events$.next('Main: basic worker created');
      return worker;
    }, input$).subscribe({
      next(item): void {
        console.log(item);
      },
      complete(): void {
        console.log('OVER AND OUT!');
      },
    });

Do note that there is no
The logs I get:
So the fix works as expected, whereas if I remove my fix I get
I originally thought that the issue was coming from the
I have added a unit test to prove the fix which fails with the following error when the fix isn't applied:
And then passes, along with the 25 other tests. |
🎉 This issue has been resolved in version 6.0.1 🎉 The release is available on: Your semantic-release bot 📦🚀 |
observable-webworker/projects/observable-webworker/src/lib/from-worker.ts
Line 15 in f5fe049
First, thanks for this useful library. When using it for a single web worker, I noticed that my web worker never completes and I had to insert a call to `take(1)` into its pipeline. Looking at your source code, I believe the reason is that `responseObserver.complete()` is never called in the `fromWorker` function - see link.

For the full picture:

Expectation: If I use `fromWorker` and give it an observable which completes after emitting N events as input and use a one-to-one transformation (e.g. the identity function), then the output observable should complete after emitting N events, too.

Actual behavior: The output observable never completes.

Workaround: Insert `take(N)` into the pipeline on the worker side.
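The expectation above amounts to forwarding every notification across the worker boundary, completion included. Here is a minimal sketch of that contract with no real worker and no RxJS, assuming notifications travel as plain messages. `WorkerMessage`, `SimpleObserver`, `simulateWorker`, `forwardToObserver` and `runDemo` are hypothetical names for illustration only; this is not the library's implementation.

```typescript
// Hypothetical message shape: each notification crosses the boundary as plain data.
type WorkerMessage<T> =
  | { kind: 'next'; value: T }
  | { kind: 'error'; error: unknown }
  | { kind: 'complete' };

interface SimpleObserver<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Simulated worker side: identity transformation over a finite input,
// followed by an explicit completion message.
function simulateWorker<T>(inputs: T[]): WorkerMessage<T>[] {
  const messages: WorkerMessage<T>[] = inputs.map(
    (value): WorkerMessage<T> => ({ kind: 'next', value })
  );
  messages.push({ kind: 'complete' });
  return messages;
}

// Simulated main-thread side: translate each message back into an observer call.
function forwardToObserver<T>(message: WorkerMessage<T>, observer: SimpleObserver<T>): void {
  switch (message.kind) {
    case 'next':
      observer.next(message.value);
      break;
    case 'error':
      observer.error(message.error);
      break;
    case 'complete':
      // If this branch were missing, the consumer would never see completion.
      observer.complete();
      break;
  }
}

function runDemo(inputs: number[]): { received: number[]; completed: boolean } {
  const result = { received: [] as number[], completed: false };
  const observer: SimpleObserver<number> = {
    next: (value) => { result.received.push(value); },
    error: (err) => { throw err; },
    complete: () => { result.completed = true; },
  };
  for (const message of simulateWorker(inputs)) {
    forwardToObserver(message, observer);
  }
  return result;
}

const demo = runDemo([1, 2, 3]);
console.log(demo.received, demo.completed); // [ 1, 2, 3 ] true
```

With N inputs the consumer sees N values and then one completion, which is the behavior the expectation describes.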