Observables: Chain Hell

Wer kennst es nicht, ein Aufruf ist noch nicht abgeschlossen, aber die Response wird bereits in einem neuen Aufruf benötigt und es kracht mit dem Hinweis auf ein undefined. Dies ist eine typische Race Condition, die daneben geht.

In diesem Beitrag möchte ich dir zeigen, wie du in Angular (auch in anderen Frameworks, die mit RxJS/ Observables arbeiten) mit ein paar kleinen Helfern deine Race Conditions in den Griff bekommst.

Es gibt mehrere Möglichkeiten, Observables in RxJS nacheinander aufzurufen. Eine Möglichkeit wäre die Verwendung von concat. Diese Operator nimmt eine Liste von Observables und gibt ein neues Observable zurück, das alle Observables der Liste nacheinander ausführt.

const source1 = of(1, 2, 3);
const source2 = of(4, 5, 6);

const example = concat(source1, source2);

example.subscribe(val => console.log(val));

// Output:
// 1
// 2
// 3
// 4
// 5
// 6

Eine andere Möglichkeit wäre die Verwendung von concatMap. Dieser Operator arbeitet ähnlich wie concat, aber er ermöglicht es Ihnen, eine Transformation der Eingabewerte vorzunehmen, bevor sie zum neuen Observable hinzugefügt werden.

const source = of(1, 2, 3);

const example = source.pipe(
  concatMap(val => of(val + 1))
);

example.subscribe(val => console.log(val));

// Output:
// 2
// 3
// 4

Es gibt noch einige andere Operatoren, die man verwenden können, um Observables nacheinander aufzurufen, wie zum Beispiel merge und switchMap. Welcher der am besten geeignete ist, hängt von dem spezifischen Anwendungsfall ab. Es empfiehlt sich immer, sich die Dokumentation und das API-Referenzmaterial anzusehen, um zu verstehen, wie jeder Operator funktioniert und wann er am besten verwendet werden sollte.

Angular, HttpClients und Observables

In Angular kannst Du mehrere HttpClient-Aufrufe hintereinander ausführen, indem du entweder concat oder concatMap verwendest.

Hier ist ein Beispiel mit concatMap:

import { HttpClient } from '@angular/common/http';
import { concatMap } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  this.http.get('/api/endpoint1')
    .pipe(
      concatMap(response1 => this.http.get('/api/endpoint2')),
      concatMap(response2 => this.http.get('/api/endpoint3'))
    )
    .subscribe(response3 => console.log(response3));
}

In diesem Beispiel werden die HttpClient-Aufrufe für /api/endpoint1/api/endpoint2 und /api/endpoint3 nacheinander ausgeführt, wobei jeder Aufruf erst gestartet wird, wenn der vorherige abgeschlossen ist.

Du kannst auch concat anstelle von concatMap verwenden, wie im folgenden Beispiel gezeigt:

import { HttpClient } from '@angular/common/http';
import { concat } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  concat(
    this.http.get('/api/endpoint1'),
    this.http.get('/api/endpoint2'),
    this.http.get('/api/endpoint3')
  ).subscribe(response => console.log(response));
}

Welcher der beiden Operatoren am besten geeignet ist, hängt von Ihrem spezifischen Anwendungsfall ab.

Wenn du allerdings die Responses wiederverwenden möchtest, da die Aufrufe aufeinander aufbauen, könntest du wie folgt vorgehen:

Wir möchten zum Beispiel den Response von endpoint1 in endpoint2 wieder verwenden, dann kannst du concatMap oder mergeMap verwenden, um den Response von endpoint1 als Argument an endpoint2 zu übergeben.

Hier ist ein Beispiel mit concatMap:

import { HttpClient } from '@angular/common/http';
import { concatMap } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  this.http.get('/api/endpoint1')
    .pipe(
      concatMap(response1 => this.http.get('/api/endpoint2', { params: response1 }))
    )
    .subscribe(response2 => console.log(response2));
}

In diesem Beispiel wird der Response von endpoint1 als Query-Parameter an endpoint2 übergeben.

Du kannst aber auch mergeMap anstelle von concatMap verwenden, wie im folgenden Beispiel gezeigt:

import { HttpClient } from '@angular/common/http';
import { mergeMap } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  this.http.get('/api/endpoint1')
    .pipe(
      mergeMap(response1 => this.http.get('/api/endpoint2', { params: response1 }))
    )
    .subscribe(response2 => console.log(response2));
}

Wenn Du eine Property aus der Response von endpoint1 in endpoint3 verwenden möchten, kannst du das in ähnlicher Weise wie im vorherigen Beispiel tun, indem Du entweder concatMap oder mergeMap verwenden.

Hier ist ein Beispiel mit concatMap:

import { HttpClient } from '@angular/common/http';
import { concatMap } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  this.http.get('/api/endpoint1')
    .pipe(
      concatMap(response1 => this.http.get('/api/endpoint2', { params: response1.property }))
    )
    .pipe(
      concatMap(response2 => this.http.get('/api/endpoint3', { params: response2.property }))
    )
    .subscribe(response3 => console.log(response3));
}

In diesem Beispiel wird die Property property aus der Response von endpoint1 als Query-Parameter an endpoint2 übergeben, und die Property property aus der Response von endpoint2 wird als Query-Parameter an endpoint3 übergeben.

Du könntest auch mergeMap anstelle von concatMap verwenden, wie im folgenden Beispiel gezeigt:

import { HttpClient } from '@angular/common/http';
import { mergeMap } from 'rxjs/operators';

constructor(private http: HttpClient) {}

makeHttpCalls() {
  this.http.get('/api/endpoint1')
    .pipe(
      mergeMap(response1 => this.http.get('/api/endpoint2', { params: response1.property }))
    )
    .pipe(
      mergeMap(response2 => this.http.get('/api/endpoint3', { params: response2.property }))
    )
    .subscribe(response3 => console.log(response3));
}

In Angular können somit 2 oder mehr Observables mithilfe des mergeMap-Operator kombiniert werden. Der mergeMap-Operator führt eine Funktion aus, für jedes Element, das von einem Observable emittiert wird, und gibt das Ergebnis als Observable zurück.

Hier ist ein Beispiel, wie Sie 2 Observables mithilfe von mergeMap kombinieren könnten:

import { mergeMap } from 'rxjs/operators';

// Observable 1
const obs1 = this.http.get('/api/endpoint1');

// Observable 2
const obs2 = this.http.get('/api/endpoint2');

obs1.pipe(
  mergeMap(response1 => {
    // Do something with response1
    return obs2.pipe(
      map(response2 => {
        // Do something with response2
        return combinedResponse;
      })
    );
  })
).subscribe(combinedResponse => {
  // Do something with the combined response
});

Hier ist, was in dem obigen Code passiert:

  1. Wir rufen das erste Observable auf und speichern das Ergebnis in der obs1-Variablen.
  2. Wir rufen das zweite Observable auf und speichern das Ergebnis in der obs2-Variablen.
  3. Wir abonnieren obs1 und verwenden den mergeMap-Operator, um auf jedes emittierte Element von obs1 zu reagieren. Für jedes Element von obs1 führt der mergeMap-Operator eine Funktion aus, in der wir auf obs2 abonnieren und den map-Operator verwenden, um auf jedes emittierte Element von obs2 zu reagieren.
  4. In der Funktion, die vom map-Operator ausgeführt wird, können wir die Antworten von response1 und response2 verarbeiten und eine kombinierte Antwort erstellen, die wir als Observable zurückgeben.
  5. Schließlich abonnieren wir das kombinierte Observable und verarbeiten die kombinierte Antwort in der subscribe-Funktion.

Dieses Beispiel können wir sogar mit Exceptions ganz einfach erweitern.

Um die Exceptions, bzw. Fehler, die von einem Observable emittiert werden, zu behandeln, kannst Du den catchError-Operator verwenden.

Hier ist ein Beispiel, wie Sie die catchError-Operator verwenden könnten, um Fehler von beiden Observables zu behandeln:

import { catchError, mergeMap } from 'rxjs/operators';

// Observable 1
const obs1 = this.http.get('/api/endpoint1').pipe(
  catchError(error => {
    // Handle error for obs1
    return of(null);
  })
);

// Observable 2
const obs2 = this.http.get('/api/endpoint2').pipe(
  catchError(error => {
    // Handle error for obs2
    return of(null);
  })
);

obs1.pipe(
  mergeMap(response1 => {
    // Do something with response1
    return obs2.pipe(
      map(response2 => {
        // Do something with response2
        return combinedResponse;
      })
    );
  })
).subscribe(
  combinedResponse => {
    // Do something with the combined response
  },
  error => {
    // Handle combined error
  }
);

In diesem Beispiel fügen wir den catchError-Operator zu beiden Observables hinzu, um jedes Observable zu einzeln zu behandeln, wenn es einen Fehler emittiert. Wir können auch einen catchError-Operator für das kombinierte Observable hinzufügen, um Fehler zu behandeln, die von beiden Observables emittiert werden.

Wie du siehst, gibt es keine konkrete Antwort. Es führen mal wieder mehrere Wege nach Rom. Daher betrachte deine Anforderung genau und entscheide dann, welchen Weg du für deine Anwenden gehen möchtest, oder sogar vielleicht gehen musst.

Ich hoffe, ich konnte dir ein wenig aus der „Chain Hell“ in Observables helfen. Solltest du Fragen oder Anregungen haben, melde dich gerne bei mir!