Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

responseObserver.complete() is not called #116

Closed
christian-schlichtherle opened this issue Mar 25, 2024 · 11 comments · Fixed by #119
Closed

responseObserver.complete() is not called #116

christian-schlichtherle opened this issue Mar 25, 2024 · 11 comments · Fixed by #119
Labels

Comments

@christian-schlichtherle

return new Observable((responseObserver: Observer<Notification<Output>>) => {

First, thanks for this useful library. When using it for a single web worker, I noticed that my web worker never completes and I had to insert a call to take(1) into its pipeline. Looking at your source code, I believe the reason is that responseObserver.complete() is never called in the fromWorker function - see link.

For the full picture:

Expectation: If I use fromWorker and give it an observable which completes after emitting N events as input and use a one-to-one transformation (e.g. the identity function), then the output observable should complete after emitting N events, too.

Actual behavior: The output observable never completes.

Workaround: Insert take(N) into the pipeline on the worker side.

@zakhenry
Copy link
Collaborator

@christian-schlichtherle I might be missing something here, but I'm fairly sure that the code should be causing the worker to terminate after the input observable has terminated. This is the behaviour observed in the demo site too - a fromWorker call is used to hash a file and then the worker terminates once there is no further input.

Are you sure that your input observable is completing?

@christian-schlichtherle
Copy link
Author

christian-schlichtherle commented Mar 26, 2024

Yes, because I'm only using of(item) to create an observable which completes after the given item. Inserting take(1) on the worker side is a possible workaround, but not a good solution.

For reference, please have a look at my PR - it adds the missing call to responseObservable.complete().

@christian-schlichtherle
Copy link
Author

Also, I'm using concatMap on the worker side to transform the single input item into a large (but not infinite) number of output items. Maybe this specific transformation exposes the bug?!

@maxime1992
Copy link
Contributor

Right after your concatMap, try to put a finalize with a console.log to make sure that the observable finishes.
I suspect it may not but could be wrong. Hard to debug and understand without code.
If you could create a minimal repro Stackblitz of the issue using of and concatMap that'd be ideal.

@christian-schlichtherle
Copy link
Author

christian-schlichtherle commented Mar 26, 2024

Here's a simple test case:

In engine-worker.ts:

import { DoWork, runWorker } from 'observable-webworker'
import { concatMap, finalize, interval, map, Observable, take, takeWhile } from 'rxjs'

export type EngineConfig = {
  limit: number
  rate: number
}

class EngineWorker implements DoWork<EngineConfig, string> {
  public work(config$: Observable<EngineConfig>): Observable<string> {
    return config$.pipe(
      take(1), // FIXME: The resulting observable never finishes without this call!
      concatMap((config) => {
        const gen = (function* double(): Generator<number> {
          for (let i = 1; i <= config.limit; i++) {
            yield 2 * i
          }
        })()
        return interval(1000 / config.rate).pipe(
          map(() => gen.next().value),
          takeWhile((item) => item),
          finalize(() => console.log('OVER!'))
        )
      })
    )
  }
}

runWorker(EngineWorker)

In renderer.ts:

import { fromWorker } from 'observable-webworker'
import { of, Subscription } from 'rxjs'
import { EngineConfig } from './engine-worker'

const engineWorkerFactory = (): Worker => {
  return new Worker(new URL('./engine-worker', import.meta.url), {
    type: 'module'
  })
}
fromWorker<EngineConfig, number>(engineWorkerFactory, of({ limit: 30, rate: 30 })).subscribe({
  next(item): void {
    console.log(item)
  },
  complete(): void {
    console.log('OVER AND OUT!')
  }
})

If you run this unchanged, it will print:

2
[...]
60
OVER!
OVER AND OUT!

This is because I put the workaround take(1) into the pipeline on the worker side. However, if you remove this call, then the code will omit the final "OVER AND OUT!" message because the output stream never completes. If you take a closer look at my PR, introducing the call responseObservable.complete() should fix the issue. Calling this method or .error(error) is required according to the interface contract for an Observer.

@maxime1992
Copy link
Contributor

Thanks for the example. Bug confirmed with a live repro on stackblitz.

@maxime1992
Copy link
Contributor

maxime1992 commented Mar 26, 2024

What's weird is that it looks like this bit of the code is tested 🤔

EDIT: The content of the worker is mocked in the before each so that may not help

@maxime1992
Copy link
Contributor

I think I found why here: The input$ is materialized so the complete is never taken into account at this level. I think we could have a different fix than #117 I'll have a look

@maxime1992
Copy link
Contributor

Ok I've got a test highlighting the issue

image

I'll write a fix as well

@maxime1992
Copy link
Contributor

I have created a minimal repro in our demo app (but I'm not going to commit it) with the following:

export class BasicWorker implements DoWork<number, number> {
  public work(input$: Observable<number>): Observable<number> {
    return input$.pipe(map(x => x * 10));
  }
}

runWorker(BasicWorker);

and

const input$: Observable<number> = of(1, 2, 3);

fromWorker<number, number>(() => {
  const worker = new Worker(new URL('../basic.worker', import.meta.url), {
    name: 'md5-multiply-worker',
    type: 'module',
  });
  this.events$.next('Main: basic worker created');
  return worker;
}, input$).subscribe({
  next(item): void {
    console.log(item);
  },
  complete(): void {
    console.log('OVER AND OUT!');
  },
});

Do note that there is no take(1) above.

The logs I get:

image

So the fix works as expected, whereas if I remove my fix I get

image

I originally thought that the issue was coming from the from-worker but after banging my head on this for a bit I figured it came from run-worker.

I have added a unit test to prove the fix which fails with the following error when the fix isn't applied:

image

And then passes, along the 25 other tests.

Copy link

🎉 This issue has been resolved in version 6.0.1 🎉

The release is available on:

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants