When we subscribe to the event journal using getEventsObservableFromJournal, does the subscriber wait for each event callback to complete before moving on to the next event?
Say 500 events are queued in the journal and I invoke getEventsObservableFromJournal.
Are the events consumed synchronously or asynchronously?
After a lot of trial and error, I figured out the answer to this. With a plain subscription like this:

journalObservable.subscribe(
  async (x) => await processEvent(state, x), // action on each next event
  (e) => logger.error("onError: " + e.message), // action on error
  () => logger.log("onCompleted") // action on completion
);

the events are consumed asynchronously, not in strict sequential order. subscribe() does not wait for the promise returned by an async callback, so there is no guarantee that the previous event has finished processing before the next one fires. If one event fails, the following events have already been consumed, so the index moves ahead.
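To make the behavior above concrete, here is a minimal sketch that does not use RxJS or the journal API. The names `sleep`, `emitAll`, and `demoUnawaited` are hypothetical helpers invented for this illustration. `emitAll` stands in for a source that calls the next callback and ignores whatever it returns, which is how a subscribe callback is treated.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds (stands in for real async work).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for an observable source: it calls onNext for every
// item and ignores the return value, so a returned promise is never awaited.
function emitAll(items, onNext) {
  for (const item of items) {
    onNext(item); // promise from an async callback is dropped here
  }
}

async function demoUnawaited() {
  const log = [];
  emitAll([1, 2, 3], async (n) => {
    log.push(`start ${n}`);
    await sleep(10 * (4 - n)); // earlier events take longer to process
    log.push(`end ${n}`);
  });
  await sleep(100); // give every handler time to finish
  return log;
}

demoUnawaited().then((log) => console.log(log.join(", ")));
```

All three handlers start before any of them finishes, and they finish in reverse order because the earlier events take longer. An index saved at the end of each handler would therefore not reflect the order in which the events arrived.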
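To process the events one at a time, pipe the observable through concatMap, which waits for each inner promise to resolve before starting the next event:

import { filter, concatMap } from "rxjs/operators";

journalObservable
  .pipe(
    // skip events that have no position or no payload
    filter((evt) => evt.position && evt.event.data),
    // concatMap starts the next event only after this promise resolves
    concatMap((evt) => processEvent(evt).then(() => evt))
  )
  .subscribe(
    async (x) => await saveEventIndexToState(state, x), // action on each next event (subscribe itself does not await this promise)
    (e) => logger.error("onError: " + e.message), // action on error
    () => logger.log("onCompleted") // action on completion
  );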
This code guarantees that the events are processed sequentially.
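The sequential ordering can also be illustrated without RxJS. The sketch below is a plain promise loop, not concatMap itself, and the names `sleep`, `processSequentially`, and `demoSequential` are hypothetical helpers made up for this example. It only shows the ordering guarantee: the next handler starts after the previous one has resolved.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds (stands in for real async work).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs the handler for one item at a time. The next item is not started
// until the previous handler's promise has resolved. If a handler rejects,
// the loop stops and later items are never started.
async function processSequentially(items, handler) {
  const log = [];
  for (const item of items) {
    await handler(item, log);
  }
  return log;
}

function demoSequential() {
  return processSequentially([1, 2, 3], async (n, log) => {
    log.push(`start ${n}`);
    await sleep(10 * (4 - n)); // earlier events take longer to process
    log.push(`end ${n}`);
  });
}

demoSequential().then((log) => console.log(log.join(", ")));
```

Each event now finishes before the next one starts, even though the earlier events take longer. A failure stops the loop, so an index saved after each event would not move past the event that failed.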