
SOLVED

Is event subscription from journal API synchronous or asynchronous?


When we subscribe to the event journal using getEventsObservableFromJournal, does the subscriber wait for each event callback to complete before moving on to the next event?

 

Say 500 events are queued in the journal and I invoke getEventsObservableFromJournal.

  1. Are all events consumed in parallel, with 500 threads spawned to process each event asynchronously?
  2. Or does the observable consume events synchronously, so that once an event is delivered it waits for the callback function to complete fully before moving to the next event?

Are the events consumed synchronously or asynchronously?


1 Accepted Solution


After some trial and error, I figured out the answer to this.

 

  1. onNext is invoked asynchronously. I added log statements and confirmed that onNext gets invoked for all available events, and that it does not wait for any event to finish processing.
  2. I even tried to force a wait with await, like this:

 

.subscribe(
      async (x) => await processEvent(state, x), // any action onNext event
      (e) => logger.error("onError: " + e.message), // any action onError
      () => logger.log("onCompleted") // action onComplete
    );

Even so, the events are still consumed asynchronously and not in strict sequential order: subscribe ignores the promise returned by the async callback, so there is no guarantee that the previous event completes before the next one fires. If one event fails, the following events have already been consumed and the index has moved ahead.
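The behaviour described above can be reproduced without the journal at all. The following is a minimal sketch, not the SDK's actual implementation: emitAll is a hypothetical stand-in for an event source that, like a subscriber callback, calls the handler for each event and drops whatever the handler returns. processEvent and sleep here are also illustrative helpers.

```javascript
// Hypothetical stand-in for an event source: it calls the handler for every
// event and ignores the handler's return value, as a subscriber callback does.
function emitAll(events, onNext) {
  for (const evt of events) {
    onNext(evt); // the returned promise is dropped, so nothing waits on it
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical async handler that records when it starts and when it ends.
async function processEvent(log, evt) {
  log.push(`start ${evt}`);
  await sleep(10);
  log.push(`end ${evt}`);
}

async function demo() {
  const log = [];
  emitAll([1, 2, 3], async (evt) => await processEvent(log, evt));
  await sleep(50); // give the in-flight handlers time to finish
  return log;
}

const demoResult = demo();
demoResult.then((log) => console.log(log.join(", ")));
// prints "start 1, start 2, start 3, end 1, end 2, end 3"
```

Every handler starts before any of them ends, which matches what the log statements showed: the await only pauses inside the callback, not the source that invokes it.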

 

  •  The proper way to guarantee sequential execution was like this:

 

import { filter, concatMap } from "rxjs/operators";
import { of } from "rxjs";

journalObservable
    .pipe(
      // skip events that have no position or no payload
      filter((evt) => evt.position && evt.event.data),
      // outer concatMap: handle one event at a time, in arrival order
      concatMap((evt) => {
        return of(evt).pipe(
          // inner concatMap: wait for processEvent, then emit the original event
          concatMap((event) => processEvent(event).then(() => event))
        );
      })
    )
    .subscribe(
      async (x) => await saveEventIndexToState(state, x), // any action onNext event
      (e) => logger.error("onError: " + e.message), // any action onError
      () => logger.log("onCompleted") // action onComplete
    );

 

  • In the code above, the inner concatMap calls processEvent for the event, and the outer concatMap waits for that to complete before starting on the next event, so processEvent runs sequentially.
  • In the inner concatMap, after processEvent finishes, the original event is returned so that it can be passed on to subscribe.
  • subscribe then receives the original event (x) and passes it to saveEventIndexToState.

This code guarantees that the events are processed sequentially.

