15

I am working on a file encryption and upload class using Angular. Many of these operations are async and therefore the methods I wrote are returning RxJS Observables.

// 1.
private prepareUpload(file): Observable<T>;

// 2.
private encryptData(data, filekey): Observable<T>;

// 3.
private uploadEncryptedData(formData, token, range): Observable<T>;

// 4.
private completeUpload(updatedFilekey, token): Observable<T>;

I want to encapsulate this logic in a public upload(file) method. I ended up using nested subscriptions, which works, but I know this is an anti-pattern in RxJS for several reasons. Here is a simplified version of the code:

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    this.prepareUpload(file).subscribe(values => {
        const [response, filekey, data] = values;

        this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
            const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name);
            const range = this.getRange(file.size, gen.next().value);

            this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
                if (range.isFinalPart) {
                    this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
                }
            });
        });
    });

}

I tried several combinations of RxJS operators but failed to clean up this code. My goal is to avoid nested subscriptions and instead return a single Observable from the public upload() method that signals when the workflow has completed.

3
  • Maybe you would want to have a look at streams to Commented Mar 29, 2019 at 11:06
  • Maybe you could use the RxJS "toPromise" function so you can call your functions one after another instead of nesting subscriptions Commented Mar 29, 2019 at 11:09
  • I think chaining your observables would do it, you can do it with flatMap maybe - stackoverflow.com/a/37777382/9176461 and stackoverflow.com/questions/34701304/… Commented Mar 29, 2019 at 11:38

4 Answers 4

18

You can chain your calls using the mergeMap and filter operators from RxJS. You will need some function-level variables to share values between the steps of the chain.

import { EMPTY } from 'rxjs';
import { mergeMap, filter, catchError } from 'rxjs/operators';

public upload(file) {
    const gen = this.indexGenerator(); // generator function
    // Function-level variables shared between the steps of the chain.
    let range, token, encryptedDataContainer;
    this.prepareUpload(file)
      .pipe(
        mergeMap((values) => {
          const [response, filekey, data] = values;
          token = response.token;
          return this.encryptData(data, filekey);
        }),
        mergeMap(container => {
          encryptedDataContainer = container; // kept for completeUpload below
          const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name);
          range = this.getRange(file.size, gen.next().value);

          return this.uploadEncryptedData(formData, token, range);
        }),
        // Only continue once the final part has been uploaded.
        filter(() => !!range.isFinalPart),
        mergeMap(() => {
          return this.completeUpload(encryptedDataContainer.updatedFilekey, token);
        }),
        catchError((error) => {
          console.log(error);
          // Handle the error accordingly. catchError must return an Observable;
          // EMPTY completes the stream without emitting anything.
          return EMPTY;
        })
      )
      .subscribe(() => {
        console.log('success');
      });
}

4 Comments

no error handling here - otherwise looks fairly healthy
Added the error handling too. Thanks for the suggestion.
Thanks @MuhammadAhsanAyaz that is what I was looking for! I just needed to declare encryptedDataContainer variable alongside the range and token variable scope to make it work. I also needed to replace mergeMap with concatMap. But this is due to how the encrypted upload works and has nothing to do with the general solution.
Awesome @Benny1158. Glad I could help.
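
For readers who want upload() to return a single Observable, as the question asks, here is a minimal sketch of the same chain with the two changes mentioned in the comments: concatMap instead of mergeMap, and encryptedDataContainer kept in scope (here by nesting the inner pipes instead of using outer variables). The stub functions, the PartRange and EncryptedContainer types, and passing range in as a parameter are all assumptions made so the sketch runs on its own; they are not the asker's real implementation.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap, filter } from 'rxjs/operators';

// Hypothetical shapes; the real types are not shown in the question.
interface PartRange { isFinalPart: boolean; }
interface EncryptedContainer { data: string; updatedFilekey: string; }
type Prepared = [{ token: string }, string, string];

// Hypothetical stand-ins for the four private methods from the question.
function prepareUpload(file: { name: string }): Observable<Prepared> {
  const prepared: Prepared = [{ token: 'token-1' }, `key-for-${file.name}`, 'plain-data'];
  return of(prepared);
}

function encryptData(data: string, filekey: string): Observable<EncryptedContainer> {
  return of({ data: `enc(${data})`, updatedFilekey: `${filekey}-updated` });
}

function uploadEncryptedData(formData: string, token: string, range: PartRange): Observable<string> {
  return of(`uploaded ${formData} with ${token}, final=${range.isFinalPart}`);
}

function completeUpload(updatedFilekey: string, token: string): Observable<string> {
  return of(`completed ${updatedFilekey} with ${token}`);
}

// Returns one Observable and never subscribes internally.
// Nesting the pipes keeps response and container in scope for the later steps.
function upload(file: { name: string }, range: PartRange): Observable<string> {
  return prepareUpload(file).pipe(
    concatMap(([response, filekey, data]) =>
      encryptData(data, filekey).pipe(
        concatMap(container =>
          uploadEncryptedData(container.data, response.token, range).pipe(
            // Only the final part triggers the completion call.
            filter(() => range.isFinalPart),
            concatMap(() => completeUpload(container.updatedFilekey, response.token))
          )
        )
      )
    )
  );
}
```

The caller subscribes once, for example upload(file, range).subscribe(console.log). In this sketch nothing is emitted unless range.isFinalPart is true.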
2

You want to use pipe before subscribing. pipe lets you transform values as they come down the stream, before they reach the subscriber. Also, use mergeMap to flatten the subscribe chain. Here's an overview. This doesn't provide a full solution - not paying me enough ;) - but it is sufficient to point you in the right direction:

import { of } from 'rxjs';
import { tap, map, mergeMap, catchError, filter } from 'rxjs/operators';

this.prepareUpload(file).pipe(
  tap(values => console.log('hello1', values)),
  // destructure the emitted array so data and filekey are available below
  mergeMap(([response, filekey, data]) =>
    // It is essential to catchError on the inner observable. Otherwise an error
    // (e.g. an HTTP error response, if this uses HTTP) terminates the outer
    // stream, which will then never emit again.
    this.encryptData(data, filekey).pipe(
      map(res => res), // do something with res here if you want
      catchError(() => {
        return of(null);
      })
    )
  ),
  // drop the null placeholder produced by catchError
  filter(res => !!res)
  // more mergeMap stuff here
).subscribe(values => {
  console.log(values);
});
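
To illustrate the comment about catchError in the code above, here is a small sketch comparing catchError on the inner observable with catchError on the outer pipe. riskyStep is a hypothetical stand-in for a call such as encryptData that fails for one particular value.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, filter, mergeMap } from 'rxjs/operators';

// Hypothetical step: fails for the value 2, otherwise emits n * 10 and completes.
function riskyStep(n: number): Observable<number> {
  return new Observable<number>(subscriber => {
    if (n === 2) {
      subscriber.error(new Error('step failed'));
    } else {
      subscriber.next(n * 10);
      subscriber.complete();
    }
  });
}

// catchError on the inner observable: only the failing inner stream is replaced.
const innerHandled: number[] = [];
of(1, 2, 3).pipe(
  mergeMap(n => riskyStep(n).pipe(catchError(() => of(null)))),
  filter((v): v is number => v !== null)
).subscribe(v => innerHandled.push(v));

// catchError on the outer pipe: the first error ends the source stream.
const outerHandled: Array<number | null> = [];
of(1, 2, 3).pipe(
  mergeMap(n => riskyStep(n)),
  catchError(() => of(null))
).subscribe(v => outerHandled.push(v));
```

With the inner catchError the failing value is skipped and 3 is still processed, so innerHandled ends up as [10, 30]. With the outer one the source is torn down at the first error, so outerHandled ends up as [10, null].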

Hint: use the tap operator to console.log values as they are passing down the stream

PS: Syntax not checked, may be missing a comma or bracket or 2

PPS: the functions in the pipe are all RxJS operators


0

You can merge those observables using the mergeMap RxJS operator and get rid of the nested subscriptions.

There is one catch, though: because mergeMap maintains multiple active inner subscriptions at once, it's possible to create a memory leak through long-lived inner subscriptions.
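
A minimal sketch of that caveat, assuming a hypothetical long-lived source called ticks: limiting each inner subscription with take(1) lets it complete, so mergeMap can release it and the outer stream can complete too. Without take(1), both inner subscriptions would stay open for as long as ticks lives.

```typescript
import { Subject, of } from 'rxjs';
import { map, mergeMap, take } from 'rxjs/operators';

// Hypothetical long-lived source that never completes on its own.
const ticks = new Subject<number>();
const received: string[] = [];

of('a', 'b').pipe(
  // Each outer value opens an inner subscription to ticks;
  // take(1) closes it again after the first tick.
  mergeMap(label => ticks.pipe(take(1), map(n => `${label}${n}`)))
).subscribe({
  next: v => received.push(v),
  complete: () => received.push('complete')
});

// One tick reaches both inner subscriptions, which then complete.
ticks.next(1);
```

After the single tick, received contains 'a1', 'b1' and 'complete', which shows that no inner subscription is left open.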

For reference and example: https://www.learnrxjs.io/operators/transformation/mergemap.html


0

I think chaining your observables would do it; you could do that with flatMap (an alias for mergeMap) - https://stackoverflow.com/a/37777382/9176461 and RxJS Promise Composition (passing data)

As mentioned in my comment, something like the following should work (pseudo code):

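import { of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

public upload(file) {
    const gen = this.indexGenerator(); // generator function

    // of() replaces the old Rx.Observable.just(); arrow functions keep `this` bound.
    return of(file).pipe(
         mergeMap(f => this.prepareUpload(f)),
         mergeMap(([response, filekey, data]) => this.encryptData(data, filekey)),
         // prepareEncDataUpload is used synchronously in the question, hence map
         map(container => this.prepareEncDataUpload(container.data, file.name)),
         // .... (remaining steps)
    );
}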

1 Comment

using pipe is the new RxJS format - this looks like the old school format - also, isn't flatMap deprecated?
