Write records with condition

DimaKudriavcev

04-08-2020

Hi

 

We are attempting to integrate Adobe with a Kafka stream.

 

I have created a custom connector that invokes the xtk.session.WriteCollection function for every batch of Kafka records.

 

Our Kafka stream contains multiple versions of the same record. Every time one of a record's attributes changes, a new version is published to the stream, and these versions can arrive in quick succession, within one batch or across several consecutive batches. Only the latest version needs to be stored in the Adobe DB, and each record carries a sequence number that identifies which version is the latest.
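To illustrate the "latest version only" requirement, here is a minimal sketch in plain JavaScript that collapses one batch to the record with the highest sequence number per key. The field names customerId and sequenceNumber and the function latestPerKey are assumptions made for the example, not part of any Adobe API, and the sketch only covers duplicates inside a single batch, not across batches.

```javascript
// Keep only the record with the highest sequence number for each key.
// `customerId` and `sequenceNumber` are assumed field names.
function latestPerKey(records) {
  const latest = new Map();
  for (const record of records) {
    const seen = latest.get(record.customerId);
    if (seen === undefined || record.sequenceNumber > seen.sequenceNumber) {
      latest.set(record.customerId, record);
    }
  }
  return Array.from(latest.values());
}

const batch = [
  { customerId: "123", field: "aa", sequenceNumber: 7 },
  { customerId: "123", field: "bb", sequenceNumber: 9 },
  { customerId: "456", field: "cc", sequenceNumber: 3 },
  { customerId: "123", field: "ab", sequenceNumber: 8 },
];

// Keeps sequenceNumber 9 for customer 123 and 3 for customer 456.
console.log(latestPerKey(batch));
```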

 

I cannot figure out how to create a write request with a condition. What I want is to add a condition, something like the one used in xtk.queryDef: <condition expr={"@sequenceNumber < " + currentSequenceNumber}/>. Is something like that possible with xtk.session.WriteCollection, and what would the exact syntax for the XML document be?

 

I have noticed other functions, such as xtk.session.ImportCollection and xtk.queryDef.Update, but I cannot find any decent documentation or examples for them.

 

Is there any way to write a collection with a condition, other than parsing schemas and generating raw SQL queries?

Accepted Solutions (0)

Answers (1)


supratim320

09-08-2020

By default, xtk.session.write performs an insert, but you need an insert-or-update call. Add these attributes in your session.writeCollection call:

_operation="insertOrUpdate" _key="@a,@b", where a and b are the reconciliation columns from your Kafka stream.
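As a rough sketch of what such a document might look like, the plain JavaScript below builds the collection XML as a string with those two attributes set. It assumes the attributes sit on each record element, that @customerId is the only reconciliation key, and that the element and field names (recipient, customerId, field, sequenceNumber) match the schema; buildCollectionXml and escapeXmlAttr are hypothetical helpers, not Adobe functions.

```javascript
// Escape a value for use inside a double-quoted XML attribute.
function escapeXmlAttr(value) {
  return String(value)
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;");
}

// Build the collection document as a string. Assumed layout: the
// _operation and _key attributes are placed on each record element.
function buildCollectionXml(records) {
  const items = records.map((r) =>
    '  <recipient _operation="insertOrUpdate" _key="@customerId"' +
    ` customerId="${escapeXmlAttr(r.customerId)}"` +
    ` field="${escapeXmlAttr(r.field)}"` +
    ` sequenceNumber="${escapeXmlAttr(r.sequenceNumber)}"/>`
  );
  return [
    '<recipient-collection xtkschema="nms:recipient">',
    ...items,
    "</recipient-collection>",
  ].join("\n");
}

console.log(buildCollectionXml([
  { customerId: "123", field: "aa", sequenceNumber: 123 },
]));
```

The resulting string would then be passed to the write call by the connector; how it is sent depends on the connector and is not shown here.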

Hi supratim320, thanks for the response.

 

Unfortunately, that is not the answer I was looking for. I'm aware that I can specify keys, but they must match exactly. That does not work for the sequence number, because the previous sequence number is unknown. Instead, the record should be updated only if the new sequence number is greater than the previous one, so the request should look something like this:

 

xtk.session.WriteCollection(
  <recipient-collection xtkschema="nms:recipient">
    <recipient key="@customerId" customerId="123" field="aa" sequenceNumber="123">
      <where>
        <!-- condition to check that the new sequence number is greater than the old sequence number -->
      </where>
    </recipient>
  </recipient-collection>
)

 

Obviously, that is not the right syntax, so I'm looking for a correct one.

 

I understand that I can look up the current sequence number for each customer using xtk.queryDef, but even then I cannot update it: when an insertOrUpdate statement is used, the key values cannot be changed. They are used to find the record, so there is no way to provide new values for them.
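One possible way around this, offered only as a sketch, is to keep @customerId as the sole key and treat sequenceNumber as an ordinary attribute: look up the stored sequence numbers first, drop every incoming record that is not newer, and write the rest with insertOrUpdate. The plain JavaScript below shows only the filtering step. The storedSeq map is assumed to have been loaded beforehand (for example with xtk.queryDef), selectNewer is a hypothetical helper, and the read-then-write sequence is not atomic, so a concurrent writer could still interleave between the lookup and the write.

```javascript
// storedSeq: Map of customerId -> sequence number currently stored
// (assumed to be loaded beforehand; the lookup itself is not shown).
// Returns only the records that are new or carry a higher sequence number.
function selectNewer(batch, storedSeq) {
  return batch.filter((record) => {
    const current = storedSeq.get(record.customerId);
    return current === undefined || record.sequenceNumber > current;
  });
}

const stored = new Map([["123", 8], ["456", 3]]);
const incoming = [
  { customerId: "123", field: "bb", sequenceNumber: 9 }, // newer: kept
  { customerId: "456", field: "cc", sequenceNumber: 3 }, // not newer: dropped
  { customerId: "789", field: "dd", sequenceNumber: 1 }, // unknown key: kept
];

console.log(selectNewer(incoming, stored).map((r) => r.customerId));
// prints [ '123', '789' ]
```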