For an arbitrary promise implementation, the deferred pattern (not to be confused with the antipattern) may look like:

const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

The deferred object holds an unsettled promise that can be passed to other function scopes by reference. All promise chains are executed on promise settlement; it doesn't matter whether deferred.promise was settled before or after chaining with then. The state of the promise cannot be changed once it has settled.
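
To make the expected behaviour concrete, here is a minimal sketch of such a Deferred, assuming the native Promise as the underlying implementation. The `Deferred` class and the `log` array are illustrative names only, not part of any library.

```javascript
// Minimal Deferred built on the native Promise (an illustrative sketch,
// not part of any library).
class Deferred {
  constructor() {
    // Capture the executor's resolve/reject so they can be called later,
    // from any scope that holds a reference to this object.
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

const deferred = new Deferred();
const log = [];

// Handler attached before settlement.
const early = deferred.promise.then((result) => log.push(`early: ${result}`));

deferred.resolve('one');
// Neither call affects the already-settled promise.
deferred.resolve('two');
deferred.reject(new Error('ignored'));

// Handler attached after settlement still receives the first value.
const late = deferred.promise.then((result) => log.push(`late: ${result}`));

const done = Promise.all([early, late]).then(() => {
  console.log(log.join(', ')); // early: one, late: one
  return log;
});
```

Both handlers see the first value, and the later resolve and reject calls are silently ignored. This is the behaviour the observable-based version is expected to reproduce.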


As the answer suggests, the initial choices are ReplaySubject and AsyncSubject.

For the given setup (a demo)

var subject = new Rx.AsyncSubject;
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

This results in desirable behaviour:

subject.error('one');
subject.next('two');

Early error one

Late error one

This results in undesirable behaviour:

subject.error('one');
subject.next('two');
subject.complete();

Early error one

Late result two

This results in undesirable behaviour:

subject.next('two');
subject.complete();
subject.next('three');

Early result two

Late result three

The results from ReplaySubject differ but are still inconsistent with the expected results. next values and error errors are treated separately, and complete doesn't prevent the observers from receiving new data. This may work for a single next/error; the problem is that next or error may be called multiple times unintentionally.

first() is used because the subscriptions are one-time subscriptions, and I would like to remove them to avoid leaks.

How should it be implemented with RxJS observables?

2 Answers

You are probably looking for a Rx.ReplaySubject(1) (or an Rx.AsyncSubject() depending on your use case).

For a more detailed explanation of subjects, see What are the semantics of different RxJS subjects?.

Basically, a subject can be passed around by reference, like a deferred. You can emit values to it as long as you hold that reference (resolve would be a 'next' (Rxjs v5) or 'onNext' (Rxjs v4), followed by 'complete' or 'onCompleted()').

You can have any number of subscribers to a subject, similar to then on a deferred. If you use a ReplaySubject(1), any subscriber will receive the last emitted value, which should address your point that "it doesn't matter if deferred.promise was settled before chaining with then or after". In Rxjs v4, a ReplaySubject will emit its last value to a subscriber subscribing after it has completed. I am not sure about the behaviour in Rxjs v5.
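
The semantics described above (only the last value is kept, calls after completion are ignored, late subscribers still get the cached result) can be modelled without RxJS. The sketch below is a toy model under those assumptions; `TinyAsyncSubject` is a hypothetical name and this is not how RxJS implements its subjects.

```javascript
// Toy model of the AsyncSubject semantics described above.
// `TinyAsyncSubject` is a hypothetical name; this is not RxJS source code.
class TinyAsyncSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false;
    this.hasValue = false;
    this.hasError = false;
    this.value = undefined;
    this.thrown = undefined;
  }

  next(value) {
    if (this.isStopped) return; // ignored after complete() or error()
    this.value = value;         // only the last value is remembered
    this.hasValue = true;
  }

  error(err) {
    if (this.isStopped) return;
    this.isStopped = true;
    this.hasError = true;
    this.thrown = err;
    this.observers.splice(0).forEach((o) => o.error(err));
  }

  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.splice(0).forEach((o) => this.deliver(o));
  }

  subscribe(next, error = () => {}) {
    const observer = { next, error };
    if (!this.isStopped) {
      this.observers.push(observer); // wait for complete() or error()
    } else if (this.hasError) {
      observer.error(this.thrown);   // late subscriber gets the cached error
    } else {
      this.deliver(observer);        // late subscriber gets the cached value
    }
  }

  deliver(observer) {
    if (this.hasValue) observer.next(this.value);
  }
}

const output = [];
const subject = new TinyAsyncSubject();
subject.subscribe((v) => output.push(`First result ${v}`));
subject.next('one');
subject.complete();
subject.next('two'); // ignored: the subject is already stopped
subject.subscribe((v) => output.push(`Second result ${v}`));
console.log(output.join('\n'));
```

In this model both subscribers receive 'one', which is the promise-like outcome the question asks for.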

Update

The following code executed with Rxjs v4:

var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

produces the following output:

First result one
Second result one

However, the same code executed with Rxjs v5 does not:

First result one
Second result four

So basically that means that subjects' semantics have changed in Rxjs v5! That really is a breaking change to be aware of. Anyway, you could consider moving back to Rxjs v4, or use the workaround suggested by artur grzesiak in his answer. You could also file an issue on the GitHub site. I believe the change is intentional, but in the event it is not, filing the issue might help clarify the situation. In any case, whichever behaviour is chosen should definitely be documented properly.

The question about subjects' semantics features a link showing the async subject in relation to multiple and late subscriptions.

10 Comments

Thank you, finally, a use case for subjects. So the only difference between replay and async in this case is that replay subscriptions will be triggered synchronously, and async will be triggered on the next tick, isn't it? I've tried replay subject here, added first to clear subscriptions. The problem I see is that deferred resolution can be (accidentally) changed with next, it would be different with promises. How can it be treated? Please, update the answer if possible.
I already answered that: the equivalent of 'resolving', which by the way is terminology that does not fit observables, would be next followed by complete. Read the link on subjects, it helps understanding. Once a subject is complete, all the following next calls are ignored. For the difference between async and replay, read the link and the documentation.
I'm 99% sure the behavior is a bug in RxJS 5; I created an issue: github.com/ReactiveX/rxjs/issues/1800. Still, to mirror the resolve-reject-promise structure of a deferred properly you need some sort of additional control mechanism.
Alright, I saw that the issue seems to be solved already. Now I would be curious to know if that impacts other areas of Rxjs v5. Anyway, back to the additional control mechanism you mention, why would that be needed? Once a subject is completed ('resolved'), anything else you send through it will be ignored. Same for erroring ('reject'). Errors are final: if you send another error or anything else, it won't go through. And you don't have to use 'first' as it is done here, because calling complete does that already. So basically the control mechanism you mention is already built in.
No - you still need it, as you do not want to pass the whole deferred around. You create the deferred and share only the observable/promise. Only those who have access to the deferred itself can resolve/reject it. With good discipline you could use .asObservable() and pass that around, but definitely not the whole AsyncSubject.

As @user3743222 wrote, AsyncSubject may be used in a deferred implementation, but it has to be private and guarded against multiple resolves/rejects.

Below is a possible implementation mirroring the resolve-reject-promise structure:

const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resolved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>
