Is event subscription from journal API synchronous or asynchronous? | Community
Skip to main content
sarav_prakash
Community Advisor
Community Advisor
September 23, 2024
Solved

Is event subscription from journal API synchronous or asynchronous?

  • September 23, 2024
  • 1 reply
  • 641 views

When we subscribe to event journal using getEventsObservableFromJournal, does the subscriber WAIT for each event callback to complete before switching to next event?

 

Say if 500 events are queued into the journal, and I invoke getEventsObservableFromJournal.

  1. Do ALL events get consumed parallel and 500 threads are spawned to process each event asynchronously?
  2. OR will the observable, consume events synchronously? Once an event is subscribed, the observable WAITs for callback function to complete fully before moving to next event. 

Are the events consumed synchronously or asynchronously? 

This post is no longer active and is closed to new replies. Need help? Start a new post to ask your question.
Best answer by sarav_prakash

After multiple trial-n-error, figured the answer to this

 

  1. onNext is invoked asynchronously - I added log statements and confirmed onNext function gets invoked for ALL available events and action doesnt wait for any of event completion.
  2. I even forced await like this 

 

.subscribe( async (x) => await processEvent(state, x), // any action onNext event (e) => logger.error("onError: " + e.message), // any action onError () => logger.log("onCompleted")

just the events are consumed asynchronously, not exact sequential order. Does not guarantee if previous event completes before next event fires. So incase one event fails, already the next events are consumed, so index moves ahead. 

 

  •  The proper way to guarantee sequential execution was like this 

 

import { filter, concatMap } from "rxjs/operators"; import { of } from "rxjs"; journalObservable .pipe( filter((evt) => evt.position && evt.event.data), concatMap((evt) => { return of(evt).pipe( concatMap((event) => processEvent(event).then(() => event)) ); }) ) .subscribe( async (x) => await saveEventIndexToState(state, x), // any action onNext event (e) => logger.error("onError: " + e.message), // any action onError () => logger.log("onCompleted") //action onComplete );​

 

  • From above code,The inner concatMap ensures that processEvent is called sequentially for each event.
  • In the inner concatMap, after processing the event with processEvent, return the original event so that it can be passed to the outer subscribe.
  • The outer subscribe now receives the original event (x) and passes it to saveEventIndex.

This code is guaranteeing to execute the events sequentially. 

1 reply

sarav_prakash
Community Advisor
sarav_prakashCommunity AdvisorAuthorAccepted solution
Community Advisor
September 25, 2024

After multiple trial-n-error, figured the answer to this

 

  1. onNext is invoked asynchronously - I added log statements and confirmed onNext function gets invoked for ALL available events and action doesnt wait for any of event completion.
  2. I even forced await like this 

 

.subscribe( async (x) => await processEvent(state, x), // any action onNext event (e) => logger.error("onError: " + e.message), // any action onError () => logger.log("onCompleted")

just the events are consumed asynchronously, not exact sequential order. Does not guarantee if previous event completes before next event fires. So incase one event fails, already the next events are consumed, so index moves ahead. 

 

  •  The proper way to guarantee sequential execution was like this 

 

import { filter, concatMap } from "rxjs/operators"; import { of } from "rxjs"; journalObservable .pipe( filter((evt) => evt.position && evt.event.data), concatMap((evt) => { return of(evt).pipe( concatMap((event) => processEvent(event).then(() => event)) ); }) ) .subscribe( async (x) => await saveEventIndexToState(state, x), // any action onNext event (e) => logger.error("onError: " + e.message), // any action onError () => logger.log("onCompleted") //action onComplete );​

 

  • From above code,The inner concatMap ensures that processEvent is called sequentially for each event.
  • In the inner concatMap, after processing the event with processEvent, return the original event so that it can be passed to the outer subscribe.
  • The outer subscribe now receives the original event (x) and passes it to saveEventIndex.

This code is guaranteeing to execute the events sequentially.