0

I am having some issues with working with the data from RxJs and other data in a service in Angular.

I am working with Angular 15 and standalone components, but also trying to follow some best practices for working with data.

My component is to show data from various sources. The issue I have is combining all of that data into a response from my service. I believe my issue is simply in not fully understanding the RxJs pipe, subscribe, map functionality.

In my example service, I can get the different parts working correctly, but I cannot combine the results into what I want.

I will omit a lot of the service that is not relevant to this task. Basically, my code is fetching data from a web service back end using Http, and getting that response as an observable of data in the database.

My application then has a local copy of data, in an array, that may have some of the same records, different records, etc. I am trying to combine the results into an internal BehaviorSubject that would then be used by my component to show the data.

    private ApiEndPoint: string = '/timeentry';
    private Data$_: BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>> = new BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>>(new Array<LIB_TimeEntry_ResponseDTO>());
    private MergedData$_: BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>> = new BehaviorSubject<Array<LIB_TimeEntry_ResponseDTO>>(new Array<LIB_TimeEntry_ResponseDTO>());

    constructor(
        private Http_: HttpClient,
    ) {}

    // component uses this with async pipe and can loop through and show data
    get Entries$(): Observable<Array<LIB_TimeEntry_ResponseDTO>> {
        return this.Data$_.asObservable();
    }

    // component shows this value using async pipe, which is correct, so the Entries$ does have the data from the backend
    get Count$(): Observable<number> {
        return this.Entries$.pipe(
            map( (Entries: Array<LIB_TimeEntry_ResponseDTO>) => {
                return Entries.length;
            })
        );
    }

    get MergedData$(): Observable<Array<LIB_TimeEntry_ResponseDTO>> {
        return this.MergedData$_.asObservable();
    }
    
    // component calls this to load intiial data when starting
    public Initialize(BaseURL: string, UserId: string, EntryDate: number): void {
        const AccessEndPoint: string = `user/${UserId}/${EntryDate}`;
        this.Subscription$_ = this.Http_.get<Array<LIB_TimeEntry_ResponseDTO>>(`${BaseURL}${this.ApiEndPoint}/${AccessEndPoint}`, { observe: 'response' }).pipe(
            catchError(() => {
                console.log('Error received in TimeSheet Service');
                return throwError(() => {
                    console.log('Error rethrown by TimeSheet Service');
                    return new Error(`Unable to load Time Sheet data`);
                })
            })
        ).subscribe(data => {
            this.Data$_.next(data.body as Array<LIB_TimeEntry_ResponseDTO>); // Loads into internal BehaviorSubject
        });
    }

    // Getting the data from the backend seems to work.  Now I am trying to loop through this data, and combine it with an array
    // of data I have into a new BehaviorSubject.  In my testing I call this as a public function from the component but it doesn't work
    public MergeDatabaseDataWithUploadedData_(LocalData: Array<LIB_TimeEntry_ResponseDTO>): void {
        const DatabaseData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Create the array to build the data into
        
        this.Entries$.pipe( // Follow Count$() call to get the Array (Data) to work with
        map ( Data => {
            Data.forEach( (OneEntry: LIB_TimeEntry_ResponseDTO) => { // Loop through each record and add it to my Array for subsequent use.
                DatabaseData.push(OneEntry); // Data does not appear into my array. Trying things, as I did want this to be a Map, but array illustrates the point.
                // const KeyValue = this.BuildUniqueKey(OneEntry); 
                // Map.set(KeyValue, OneEntry);
            });
        })
        // ).subscribe(Data => { // HAd tried this thinking it was the problem?
        //  return Data;
        // })
        ;
        console.log(DatabaseData); // No Data???

        // At this point, I will process through my LocalData updating information
        LocalData.forEach( (OneRecord: LIB_TimeEntry_ResponseDTO) => {
            ... // Determine if record exists
            const LocalKey = this.BuildUniqueKey(OneRecord);
            if (Map.has(LocalKey)) {
                ... // Make changes to OneRecord
                Map.set(LocalKey, OneRecord); // Update Map entry from DB with Local Results
            } else {
                ... // Make OneRecord indicate this is a new record
                Map.set(LocalKey, OneRecord); 
            }
            ... // Additional logic to indicate records to be deleted
        });
        const MergedData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Make array to process Map into
        Map.forEach( (OneMapRecord) => {
            MergedData.push(OneMapRecord);
        });
        this.MergedData$_.next(MergedData); // This would be the Map data that has been built
        
    }

I have more code trying to merge data etc., but as I cannot simply convert the data into an internal array, the remainder of the code will obviously not work until this first issue is resolved.

Once I can populate the Array(Map in the future) from the observable, I will then loop through my local array data, and update the records in the Array from the observable. This updated

1
  • In my opinion, although I don't feel an expert in a field, I think that what you're missing is streamlined dataflow. RxJS retrieve data on "subscribe call", so no wonder that console.log(DatabaseData); might've returned empty array, just because Initialize haven't retrieved data from API yet. It's just a matter of async data flow in your class/service. It might be handy if you could provide a "dummy component" in which you call your service functions and in what manner. Also I am assuming that you've registered your Service as "root", otherwise it might be a problem. Commented Feb 2, 2023 at 23:26

1 Answer 1

1

In reference to this part of your code:

01    public MergeDatabaseDataWithUploadedData_(LocalData: Array<LIB_TimeEntry_ResponseDTO>): void {
02        const DatabaseData: Array<LIB_TimeEntry_ResponseDTO> = new Array<LIB_TimeEntry_ResponseDTO>(); // Create the array to build the data into
03        
04        this.Entries$.pipe( // Follow Count$() call to get the Array (Data) to work with
05        map ( Data => {
06            Data.forEach( (OneEntry: LIB_TimeEntry_ResponseDTO) => { // Loop through each record and add it to my Array for subsequent use.
07                DatabaseData.push(OneEntry); // Data does not appear into my array. Trying things, as I did want this to be a Map, but array illustrates the point.
08                // const KeyValue = this.BuildUniqueKey(OneEntry); 
09                // Map.set(KeyValue, OneEntry);
10            });
11        })
12        // ).subscribe(Data => { // HAd tried this thinking it was the problem?
13        //  return Data;
14        // });
15        console.log(DatabaseData); // No Data???
...

There are two reasons the console.log() on line 15 shows an empty array:

  • You are not subscribing. If you don't subscribe, the logic within the .pipe() is never executed.
  • Even if you do subscribe, DatabaseData is still empty because the code is executed asynchronously
    • line 04 simply defines an observable
    • line 12 creates a subscription
    • line 15 is executed, but the logic within the pipe hasn't happened yet, therefore your array is still empty

Hopefully that makes sense.


You have a lot of code above, so I'm going to use a more simplified scenario to show you a general example of how to compose observable streams from one another. This typically involves defining a new observable that begins with another observable source, and pipes its emissions through some operators to produce the desired output. (ex: newObs$ = obs$.pipe(...);). You won't typically create a method that returns an observable, you just define the observable.

RxJS provides a plethora of operators and static functions to help us create observables with well-defined behavior. It's important to think about the "what" and the "when" aspect of the observable you are defining.

  • What do you want it to emit?
  • When do you want it to emit?

In the case of wanting to merge the emissions from an observable source with an array of values:

  • What: the latest emission from the observable combined with the array
  • When: whenever the source observable emits
private data$ = new BehaviorSubject<Data[]>([]);
private localData: Data[];

mergedData$ = this.data$.pipe(
    map(data => data.concat(this.localData))
);

The above mergedData$ observable will emit the combined array whenever data$ emits a value.


I'm not sure about your particular use case though. Where does your LocalData come from? Is it the result of some other observable? If so, the approach would be different, because in that case we'd want our mergedData$ to emit whenever either of our two sources emit.

RxJS provides a static function that is perfect for this scenario called combineLatest. combineLatest will take multiple sources and emit the latest value from each source whenever any of them emit. Here's an example:

private data$ = new BehaviorSubject<Data[]>([]);
private otherData$ = new BehaviorSubject<Data[]>([]);

mergedData$ = combineLatest([this.data$, this.otherData]).pipe(
    map(([data, otherData]) => data.concat(otherData))
);

Here, mergedData$ will emit:

  • What - the concatenation of the latest emission of data$ and otherData$
  • When - either data$ or otherData$ emit a value

If your merge logic was more complicated, you'd simply need to modify the result of the map operator:

mergedData$ = combineLatest([this.data$, this.data2$, this.data3$]).pipe(
    map(([data, data2, data3]) => {
        // return something here...
    })
);
Sign up to request clarification or add additional context in comments.

5 Comments

Thanks for the detailed explanation. I had tried the subscribe, thinking that was the issue, since the basic observables are 'cold' observables, the subscribe is what causes them to run. However, there is likely another issue in my code, as that did not fix the problem. Currently, LocalData is an array, passed into the component from local storage, that is to be used to update the database data coming back from the service. However, I can convert this into an observable (from, of) and then use the combineLatest as you indicated, which should simplify this code (hopefully).
If one source come from observable and the other source comes from a synchronous source, like localStorage, there’s no need for combineLatest, simply use the first example above.
My merge is a little more complicated, not only does it combine both data feeds, it is to make 1 feed with a status. So if the DB data has [1, 2, 3], and local has [2, 3 ,4], then the data returned should be [1, 2, 3, 4], but it also compares the data in 2 and 3 which exist in both feeds, to determine if there was a change or not. As such, the response is then 1 was deleted, 2 has not changed, 3 has been updated, 4 has been added.
I am not sure the concat works for the status, but I think I get the gist of what you are suggesting, and instead of the concat, simply do the processing there, and return the data.
Yep I get it. Your logic is more complicated. Sounds like you have a direction to go in. Good luck!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.