After a lot of trial and error, I figured out the answer to this.
- onNext is invoked asynchronously. I added log statements and confirmed that onNext gets invoked for ALL available events, and it doesn't wait for any event's processing to complete.
- I even tried forcing an await like this:
.subscribe(
  async (x) => await processEvent(state, x), // any action onNext event
  (e) => logger.error("onError: " + e.message), // any action onError
  () => logger.log("onCompleted") // action onComplete
);
- But the events are still consumed asynchronously, not in strict sequential order: subscribe does not wait for the promise returned by an async onNext callback, so there is no guarantee that the previous event completes before the next one fires. If one event fails, the next events have already been consumed, so the index has moved ahead.
- The proper way to guarantee sequential execution is this:
import { filter, concatMap } from "rxjs/operators";
import { of } from "rxjs";
journalObservable
  .pipe(
    filter((evt) => evt.position && evt.event.data),
    concatMap((evt) => {
      return of(evt).pipe(
        concatMap((event) => processEvent(event).then(() => event))
      );
    })
  )
  .subscribe(
    async (x) => await saveEventIndexToState(state, x), // any action onNext event
    (e) => logger.error("onError: " + e.message), // any action onError
    () => logger.log("onCompleted") // action onComplete
  );
- In the code above, the concatMap operators ensure that processEvent is called sequentially: the outer concatMap does not move on to the next event until the inner observable for the current event has completed, and the inner concatMap waits for the promise returned by processEvent to resolve.
- In the inner concatMap, once processEvent has finished, the original event is returned so that it gets passed on to subscribe.
- subscribe then receives the original event (x) and passes it to saveEventIndexToState.
This code guarantees that the events are processed sequentially.
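To make the difference concrete, here is a minimal, dependency-free sketch of the two behaviours using plain promises. It is not RxJS and not the code from my project: processEvent here is a hypothetical stand-in that just sleeps, and fireAndForget and oneAtATime are made-up helper names. fireAndForget models an emitter that calls an async callback and ignores the returned promise (what happens with an async onNext), while oneAtATime models waiting for each promise before starting the next event (the role concatMap plays in the pipeline above).

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for processEvent: records when work starts and ends.
async function processEvent(id, log) {
  log.push(`start ${id}`);
  await sleep(10);
  log.push(`end ${id}`);
}

// Models an async onNext callback: every event's handler is started right
// away, and nothing waits for the promise returned by the previous one.
async function fireAndForget(events) {
  const log = [];
  const pending = events.map((id) => processEvent(id, log));
  await Promise.all(pending); // only so the demo can inspect the final log
  return log;
}

// Models the concatMap behaviour: the next event is not started until the
// promise for the previous event has resolved.
async function oneAtATime(events) {
  const log = [];
  for (const id of events) {
    await processEvent(id, log);
  }
  return log;
}
```

With events [1, 2, 3], fireAndForget logs all three "start" entries before any "end" entry, while oneAtATime logs "start 1", "end 1", "start 2", "end 2", "start 3", "end 3". The first pattern is why the index could move ahead of an event that had not finished (or had failed).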