When we subscribe to the event journal using getEventsObservableFromJournal, does the subscriber wait for each event callback to complete before moving on to the next event?
Say 500 events are queued in the journal and I invoke getEventsObservableFromJournal.
Are the events consumed synchronously or asynchronously?
After some trial and error, I found the answer. With a plain subscribe like this:
journalObservable.subscribe(
  async (x) => await processEvent(state, x), // onNext: returns a promise that subscribe() does not wait for
  (e) => logger.error("onError: " + e.message), // onError
  () => logger.log("onCompleted") // onComplete
);
the events are consumed asynchronously, not in strict sequence. subscribe() ignores the promise returned by the async callback, so there is no guarantee that one event finishes processing before the next one fires. If one event fails, the following events have already been consumed, so the index has already moved ahead.
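To see why the callbacks overlap, here is a small dependency-free sketch. It does not use the journal SDK or RxJS. The names `runUnawaited`, `onNext`, and `sleep` are hypothetical stand-ins, and the loop only imitates an observable handing queued events to an async callback without awaiting it.

```javascript
// Hypothetical helper: simulates async work such as an HTTP call.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runUnawaited(events) {
  const log = [];
  const pending = [];

  // Stand-in for the async onNext callback passed to subscribe().
  const onNext = async (evt) => {
    log.push(`start ${evt}`);
    await sleep(10);
    log.push(`end ${evt}`);
  };

  // Each call returns a promise immediately; nothing waits for it
  // before the next event is delivered.
  for (const evt of events) {
    pending.push(onNext(evt));
  }

  // Awaited here only so the demo can inspect the final log.
  await Promise.all(pending);
  return log;
}

runUnawaited([1, 2, 3]).then((log) => console.log(log.join(", ")));
// start 1, start 2, start 3, end 1, end 2, end 3
```

Every event is started before any of them finishes, which matches what I saw with the plain subscribe.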
import { filter, concatMap } from "rxjs/operators";

journalObservable
  .pipe(
    // Skip events that have no position or no payload.
    filter((evt) => evt.position && evt.event.data),
    // concatMap waits for the promise of the current event to resolve
    // before it starts processing the next one.
    concatMap((evt) => processEvent(evt).then(() => evt))
  )
  .subscribe(
    async (x) => await saveEventIndexToState(state, x), // onNext: runs only after processEvent succeeded
    (e) => logger.error("onError: " + e.message), // onError
    () => logger.log("onCompleted") // onComplete
  );
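This code guarantees that the events are processed sequentially: concatMap does not start processing the next event until the promise for the current one has resolved.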
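For anyone who wants to see the ordering without pulling in RxJS, the same one-at-a-time behaviour can be imitated with a plain promise chain. This is only a rough sketch of the idea, not how concatMap is implemented; `runSequential`, `handler`, and `sleep` are hypothetical names.

```javascript
// Hypothetical helper: simulates async work such as an HTTP call.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runSequential(events, handler) {
  const log = [];
  // The chain acts as a queue: each step starts only after the
  // previous step has resolved.
  let chain = Promise.resolve();
  for (const evt of events) {
    chain = chain.then(async () => {
      log.push(`start ${evt}`);
      await handler(evt);
      log.push(`end ${evt}`);
    });
  }
  try {
    await chain;
  } catch (err) {
    // A failure stops the chain, so later events are never started.
    log.push(`error ${err.message}`);
  }
  return log;
}

// Usage: the handler fails on event 2, so event 3 is never processed.
const handler = async (evt) => {
  await sleep(5);
  if (evt === 2) throw new Error("failed on 2");
};

runSequential([1, 2, 3], handler).then((log) => console.log(log.join(", ")));
// start 1, end 1, start 2, error failed on 2
```

Because a failed event stops everything behind it, the saved index never moves past an event that was not processed.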