Authors: Yeshwanth Vijaykumar, Zhengping Wang (@zp_wang), and Jayna Patel
In this blog post, we are going to go over how the data lakehouse architecture at Adobe Experience Platform combines with the Real-time Customer Profile architecture to increase the throughput of our Apache Spark batch workloads and reduce costs while maintaining functionality.
Adobe Experience Platform enables you to drive coordinated, consistent, and relevant experiences for your customers no matter where or when they interact with your brand. With Adobe Experience Platform Real-time Customer Profile, you can see a holistic view of each individual customer that combines data from multiple channels, including online, offline, CRM, and third-party data. It allows you to consolidate your disparate customer data into a unified view offering an actionable, timestamped account of every customer interaction.
Because Adobe Experience Platform assimilates data from many sources, Real-time Customer Profile acts as the central hub: it ingests data from these sources, stitches it together with Adobe Identity Service, and ships it to various marketing destinations to enable new experiences.
The relationship between Adobe Experience Platform Real-time Customer Profile and other services within Adobe Experience Platform is highlighted in the following diagram:
When data is ingested into the Profile Hot Store (Azure Cosmos DB) from various sources, each mutation to the profile store generates a change feed message that is sent on a Kafka topic. We want to create a merged view of the Real-time Customer Profile on the data lake, which will enable us to write queries that segment the population into useful, targetable audiences. Running such queries on the Hot Store is extremely expensive and not optimized, for the following reasons:
Various sources, online or offline, contribute data to Adobe Experience Platform. Data can flow in batches or be streaming in nature; an example of streaming data would be clicks and page views flowing in as soon as the event happens. Data from each source can have its own schema, and each source can have a different frequency. As a system, we need to be able to scale up and handle these various load types during ingestion.
As the data makes it into Real-time Customer Profile, it is simultaneously ingested by the Unified Identity Service, which manages the creation, update, and deletion of linkages between various identities, as shown in Figure 3.
Before we go into optimizing our workload, let us take a look at a toy data example, as shown in Figure 4. In the table, each entity has a primaryId, and any event generated by the entity is sent with that primaryId. Along with that, if there is any extra information about linkages, the relatedIds field is also populated. Focusing on the last 2 rows, we see that ids 789 and 101 are linked to each other. This tells us ALL records pertaining to 101 or 789 need to have linkage information. Fields 1 to 1000 are data fields indicating some attribute about the event recorded in that row.
Since this is a dynamic system, we want to see how the data evolves as new data flows in, especially when linkages change. Figure 5 shows an example where a new event came in for id 101; interestingly enough, it also contains linkage information showing it is linked to 789 and 101! This causes a cascading change in the records that came before.
While this update amplification might raise some eyebrows, it is a sane way to optimize for the groupBy clause we need for our bread-and-butter use case, as explained in the next section.
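To make the cascade concrete, here is a minimal sketch in plain Python of what happens when a new event carries linkage information. It assumes, for illustration only, that relatedIds stores the full linked cluster including the row's own primaryId; the toy ids follow Figure 4, but the new event's link to 103 is hypothetical, chosen so the result matches the merged view of 101, 789 and 103 described in the next section. The function name link_event is likewise hypothetical.

```python
from typing import Dict, List

# Hypothetical toy rows: relatedIds holds the full linked cluster, including the row's own primaryId.
Row = Dict[str, object]


def link_event(rows: List[Row], event: Row) -> int:
    """Append `event` and rewrite relatedIds on every earlier row in the newly linked cluster.

    Returns how many existing rows had to be rewritten (the update amplification).
    """
    cluster = {event["primaryId"], *event["relatedIds"]}
    changed = True
    while changed:  # transitive closure: absorb any existing cluster that touches ours
        changed = False
        for row in rows:
            ids = {row["primaryId"], *row["relatedIds"]}
            if ids & cluster and not ids <= cluster:
                cluster |= ids
                changed = True

    rewritten = 0
    for row in rows:
        if row["primaryId"] in cluster and set(row["relatedIds"]) != cluster:
            row["relatedIds"] = sorted(cluster)
            rewritten += 1
    rows.append({**event, "relatedIds": sorted(cluster)})
    return rewritten


rows = [
    {"primaryId": 123, "relatedIds": [123], "field1": "a"},
    {"primaryId": 789, "relatedIds": [101, 789], "field1": "b"},
    {"primaryId": 101, "relatedIds": [101, 789], "field1": "c"},
]
# Hypothetical new event for 101 that also links it to 103.
print(link_event(rows, {"primaryId": 101, "relatedIds": [101, 103], "field1": "d"}))  # prints 2
```

One incoming event rewrote two older rows; that write cost is what buys a trivial groupBy at read time.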
Figure 6 goes over an access pattern where we want to run a bunch of queries over a merged view of the entities. From our previous examples, the merged view of the entity with primaryId 101 would be the assimilation of all events of its relatedIds as well. So the merged view would have ALL events of ids 101, 789 and 103.
In order to get the merged view, we need to first group by the relatedIds field. Once we do that, we get the list of records within the aggregation context and can execute queries on top of them.
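Because every row already carries its full relatedIds cluster, the merged view reduces to a plain group-by on that field. The sketch below models this in Python on a list of dicts; the pageViews field, the segment predicate, and the helper names are hypothetical. In Spark the analogous step would be a groupBy on the relatedIds column followed by an aggregation, which we have not reproduced here.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]


def merged_views(rows: List[Row]) -> Dict[Tuple[int, ...], List[Row]]:
    """Group rows by their (already amplified) relatedIds: one group per merged entity."""
    groups: Dict[Tuple[int, ...], List[Row]] = defaultdict(list)
    for row in rows:
        groups[tuple(sorted(row["relatedIds"]))].append(row)
    return dict(groups)


def run_query(rows: List[Row], predicate: Callable[[List[Row]], bool]) -> List[Tuple[int, ...]]:
    """Evaluate a segment-style predicate once per merged view; return the matching entities."""
    return [key for key, events in merged_views(rows).items() if predicate(events)]


rows = [
    {"primaryId": 123, "relatedIds": [123], "pageViews": 1},
    {"primaryId": 789, "relatedIds": [101, 103, 789], "pageViews": 4},
    {"primaryId": 101, "relatedIds": [101, 103, 789], "pageViews": 2},
    {"primaryId": 103, "relatedIds": [101, 103, 789], "pageViews": 3},
]
# Hypothetical segment: merged entities with at least 5 page views in total.
print(run_query(rows, lambda events: sum(e["pageViews"] for e in events) >= 5))
# prints [(101, 103, 789)]
```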
Quoting directly from the horse’s mouth, the Delta Lake docs:
“Delta Lake is an open source project that enables building a Lakehouse architecture on top of data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.”
Specifically, Delta Lake offers:
Figure 7 shows some examples of Delta Lake usage. The most common use case, saving data as a Parquet-based table, becomes as easy as doing a .save("path/to/data"). More importantly, the example showing the upsert of the newData DataFrame into an existing oldData DataFrame shows the potential of now being able to modify a Parquet-based table. The SQL-compatible bindings are just the icing on the cake.
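The upsert boils down to MERGE semantics: rows of newData that match a row of oldData on a key replace it, and the rest are inserted. The sketch below models only those semantics in plain Python on lists of dicts; the key column id and the sample rows are hypothetical. On an actual Delta table, the delta-spark Python API expresses the same idea as DeltaTable.forPath(spark, path).alias("old").merge(newData.alias("new"), "old.id = new.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute().

```python
from typing import Dict, List

Row = Dict[str, object]


def upsert(old_data: List[Row], new_data: List[Row], key: str = "id") -> List[Row]:
    """Upsert new_data into old_data on `key`: matching rows are replaced, others inserted."""
    merged = {row[key]: dict(row) for row in old_data}
    for row in new_data:
        merged[row[key]] = dict(row)  # matched -> update all columns, not matched -> insert
    return list(merged.values())


old_data = [{"id": 1, "city": "San Jose"}, {"id": 2, "city": "Austin"}]
new_data = [{"id": 2, "city": "Seattle"}, {"id": 3, "city": "Lehi"}]
print(upsert(old_data, new_data))
# prints [{'id': 1, 'city': 'San Jose'}, {'id': 2, 'city': 'Seattle'}, {'id': 3, 'city': 'Lehi'}]
```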
Delta Lake is not a magic solution. We still need to take certain design precautions to avoid pitfalls.
The table above gives a good idea of which operations conflict with each other. The takeaway is that appends are safe with multi-cluster writers. We DO need to take precautions, in spite of the optimistic concurrency control provided by Delta Lake, in the case of DELETE operations issued from multiple Spark jobs.
When an individual column exceeds 2 GB, we see degraded writes or out-of-memory (OOM) errors.
Figure 8 shows how every application that makes a mutation/change on the Real-time Customer Profile Store waits for the Hot Store to ACK the change and then emits a change feed message on the CDC topic. Each message on the CDC topic captures what action was performed (PATCH/PUT/DELETE) along with the payload of the change and various other metadata like source of the data, record type, and timestamp if it’s an event or record that is a function of time.
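For readers who want something concrete to hold on to, the sketch below models one change feed message as a Python dataclass and parses it from JSON. The post only tells us that a message carries the action (PATCH/PUT/DELETE), the payload, and metadata such as source, record type, and an optional timestamp; every field name here (tenantId, recordId, recordType, timestamp) is a hypothetical stand-in, not the actual wire format.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class Action(Enum):
    PATCH = "PATCH"
    PUT = "PUT"
    DELETE = "DELETE"


@dataclass
class CdcMessage:
    """Hypothetical shape of one change feed message; field names are illustrative."""
    tenant_id: str
    record_id: str
    action: Action
    payload: Dict[str, Any]
    source: str
    record_type: str
    timestamp_ms: Optional[int] = None  # only present for time-series events/records

    @classmethod
    def from_json(cls, raw: str) -> "CdcMessage":
        d = json.loads(raw)
        return cls(
            tenant_id=d["tenantId"],
            record_id=d["recordId"],
            action=Action(d["action"]),  # raises ValueError on an unknown action
            payload=d.get("payload", {}),
            source=d["source"],
            record_type=d["recordType"],
            timestamp_ms=d.get("timestamp"),
        )


msg = CdcMessage.from_json(
    '{"tenantId": "t1", "recordId": "r1", "action": "PATCH",'
    ' "payload": {"email": "a@b.com"}, "source": "crm", "recordType": "profile"}'
)
print(msg.action, msg.timestamp_ms)  # prints Action.PATCH None
```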
We use these change feed messages to recreate a row-by-row copy of the Cosmos DB Hot Store on the data lake. As shown in Figure 9, a long-running Spark Structured Streaming application called the CDC Dumper consumes the change feed notifications and writes them to a multi-tenant, global Staging table in APPEND ONLY mode. This lets us increase the parallelism of the CDC Dumper without having to worry about the write conflicts we looked at in the previous sections. The Staging table is also a Delta Lake table, partitioned by tenantId and 15-minute time intervals.
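The partitioning scheme can be illustrated by computing the partition a message lands in: the tenant plus the start of its 15-minute interval. This is a minimal sketch; the string bucket format and the function name are hypothetical, and the post does not say which timestamp the bucket is derived from. In Spark, the two resulting columns would be the arguments to DataFrameWriter.partitionBy on an append-mode Delta write.

```python
from datetime import datetime, timezone
from typing import Tuple

BUCKET_MINUTES = 15


def staging_partition(tenant_id: str, ts: datetime) -> Tuple[str, str]:
    """Return the (tenantId, bucket) partition for a CDC message with timestamp `ts`.

    `ts` should be timezone-aware; it is converted to UTC and floored to its 15-minute interval.
    """
    utc = ts.astimezone(timezone.utc)
    start = utc.replace(minute=utc.minute - utc.minute % BUCKET_MINUTES, second=0, microsecond=0)
    return tenant_id, start.strftime("%Y-%m-%dT%H:%M")


print(staging_partition("tenantA", datetime(2021, 6, 17, 10, 44, 59, tzinfo=timezone.utc)))
# prints ('tenantA', '2021-06-17T10:30')
```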
Figure 10 shows a logical view of how the Staging table holds records within the time intervals. We also keep a ProgressMap in Redis that tracks how many records per tenant were written vs. processed.
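A minimal in-memory model of the ProgressMap follows: the CDC Dumper bumps a per-tenant "written" counter, the Processor bumps a "processed" counter, and the difference says which tenants have work pending. The class and method names are hypothetical, and the post does not describe the Redis layout; one plausible backing would be HINCRBY on per-tenant hash fields.

```python
from collections import defaultdict
from typing import DefaultDict, List


class ProgressMap:
    """In-memory stand-in for the Redis-held ProgressMap: per-tenant written vs processed counts."""

    def __init__(self) -> None:
        self.written: DefaultDict[str, int] = defaultdict(int)
        self.processed: DefaultDict[str, int] = defaultdict(int)

    def record_written(self, tenant: str, n: int) -> None:  # called by the CDC Dumper
        self.written[tenant] += n

    def record_processed(self, tenant: str, n: int) -> None:  # called by the Processor
        self.processed[tenant] += n

    def pending(self, tenant: str) -> int:
        return self.written.get(tenant, 0) - self.processed.get(tenant, 0)

    def tenants_with_work(self) -> List[str]:
        return sorted(t for t in self.written if self.pending(t) > 0)


pm = ProgressMap()
pm.record_written("tenantA", 10)
pm.record_written("tenantB", 5)
pm.record_processed("tenantB", 5)
print(pm.tenants_with_work(), pm.pending("tenantA"))  # prints ['tenantA'] 10
```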
A Processor Spark Job is a long-running batch Spark app that keeps checking whether there is any work to be done, i.e., whether any records need to be taken from the Staging table, transformed, and written into the destination Delta Lake table, which we will call the Raw table.
The transformations are based on the operation type of the CDC Message itself.
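The dispatch on operation type can be sketched as follows, against a toy raw table that maps recordId to a JSON string, mirroring the Raw table's JSON-string storage. This is a simplification under stated assumptions: the tuple shape is hypothetical, PUT is taken to mean insert-or-replace, and PATCH here does only a shallow merge, whereas the real conflict resolution must merge nested objects.

```python
import json
from typing import Dict, Iterable, Tuple

Change = Tuple[str, str, dict]  # (action, recordId, payload)


def apply_changes(raw_table: Dict[str, str], changes: Iterable[Change]) -> Dict[str, str]:
    """Apply CDC changes in order to a raw table that maps recordId -> JSON string."""
    for action, record_id, payload in changes:
        if action == "PUT":
            raw_table[record_id] = json.dumps(payload)  # insert or fully replace the row
        elif action == "PATCH":
            current = json.loads(raw_table.get(record_id, "{}"))
            current.update(payload)  # shallow merge; a real merge must recurse into nested objects
            raw_table[record_id] = json.dumps(current)
        elif action == "DELETE":
            raw_table.pop(record_id, None)
        else:
            raise ValueError(f"unknown CDC action: {action!r}")
    return raw_table


table = apply_changes({}, [
    ("PUT", "r1", {"name": "Ada", "tier": "silver"}),
    ("PATCH", "r1", {"tier": "gold"}),
    ("PUT", "r2", {"name": "Bob"}),
    ("DELETE", "r2", {}),
])
print(table)  # prints {'r1': '{"name": "Ada", "tier": "gold"}'}
```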
Before starting to process the records for a tenant, the processor will try to obtain a lock for that tenant and proceed only if it is successful. This again is to avoid conflicting UPDATE/DELETE operations from multiple sources.
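The try-lock-then-process pattern can be sketched with an in-memory lock table that has expiry and owner tokens, so a crashed Processor cannot hold a tenant forever and a slow one cannot release a lock someone else has since taken. The post does not say how the lock is implemented; the class name, the 30-second TTL, and the suggestion that a Redis-backed version might use SET with NX and PX are all assumptions.

```python
import threading
import time
import uuid
from typing import Dict, Optional, Tuple


class TenantLocks:
    """In-memory stand-in for a distributed per-tenant lock with expiry.

    A Redis-backed version could use `SET lock:<tenant> <token> NX PX <ttl_ms>`.
    """

    def __init__(self, ttl_s: float = 30.0) -> None:
        self.ttl_s = ttl_s
        self._mutex = threading.Lock()
        self._held: Dict[str, Tuple[str, float]] = {}  # tenant -> (token, expires_at)

    def try_acquire(self, tenant: str) -> Optional[str]:
        """Return a token if the lock was obtained, or None if another holder still owns it."""
        now = time.monotonic()
        with self._mutex:
            holder = self._held.get(tenant)
            if holder is not None and holder[1] > now:
                return None
            token = uuid.uuid4().hex
            self._held[tenant] = (token, now + self.ttl_s)
            return token

    def release(self, tenant: str, token: str) -> bool:
        """Release only if `token` still owns the lock (it may have expired and been re-taken)."""
        with self._mutex:
            holder = self._held.get(tenant)
            if holder is not None and holder[0] == token:
                del self._held[tenant]
                return True
            return False


locks = TenantLocks()
token = locks.try_acquire("tenantA")
if token is not None:
    try:
        pass  # process this tenant's staged records here
    finally:
        locks.release("tenantA", token)
```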
A key callout is that the Raw table stores values not as StructTypes, as is normal with Delta Lake or Parquet, but as JSON strings. We go into detail about this very important design decision in the following section.
UPSERT is supported with the latest version of Delta Lake. For mutable data, we want to apply conflict resolution before upserting; this is the case when we get a PATCH message for an existing row. In the Hot Store, we have stored procedures at the partition level to handle this atomically.
** An example would be functions
** Spark UDFs are strict on types; with the plethora of different schemas per tenant, it is crazy to manage UDFs per org in a multi-tenant fashion for a function like this. The out-of-the-box Spark merge functions were not enough to merge complex structs with each other.
** We also considered writing a native Spark UDF and recompiling Spark with it, but that was deemed too complex a deployment model on the Databricks SaaS.
** Now we just have a simple JSON merge UDF.
** We use json-iter, which is very efficient at loading partial bits of JSON and manipulating them.
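The appeal of storing JSON strings is that one schema-agnostic merge function works for every tenant. The sketch below shows such a deep merge in Python using the standard json module rather than json-iter, which is a JVM library; the merge rules (nested objects merge recursively, everything else is replaced by the incoming value) are our assumption, not the post's specification. In PySpark a function like this could be wrapped with pyspark.sql.functions.udf(merge_json, StringType()), though a Python UDF would not match the performance of the JVM approach described above.

```python
import json
from typing import Any


def _deep_merge(existing: Any, patch: Any) -> Any:
    if isinstance(existing, dict) and isinstance(patch, dict):
        merged = dict(existing)
        for key, value in patch.items():
            merged[key] = _deep_merge(existing[key], value) if key in existing else value
        return merged
    # Scalars, arrays and type mismatches: the incoming value wins.
    # Unlike RFC 7396 JSON Merge Patch, a null here is stored rather than treated as a delete.
    return patch


def merge_json(existing: str, patch: str) -> str:
    """Schema-agnostic merge of two JSON object strings, suitable for wrapping in a UDF."""
    return json.dumps(_deep_merge(json.loads(existing), json.loads(patch)), separators=(",", ":"))


print(merge_json('{"a":1,"b":{"c":2,"d":[1]}}', '{"b":{"d":[2],"e":3},"f":null}'))
# prints {"a":1,"b":{"c":2,"d":[2],"e":3},"f":null}
```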
Our ingestion pipeline has gone from having a single sink, Azure Cosmos DB, to having 2 additional sinks: the Staging Table and the per-tenant Raw Table. Instrumenting the data pipeline gives readers of the data more confidence in understanding how consistent their reads are.
To aid the end-user and the downstream components, we track 2 types of lag.
CDC lag from Kafka tells us how much more work we need to do to catch up on writing to the Staging Table: how many minutes/seconds behind we are, from the time a CDC message was produced to its being dumped into the Staging Table.
TimeStamp) in CDC per tenant.
TSKEY) processed in Processor.
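Both lags are simple differences of timestamps. The sketch below assumes, since the descriptions above are partially cut off, that the second lag compares the latest CDC TimeStamp seen per tenant with the latest TSKEY the Processor has applied for that tenant; the function names and the millisecond units are hypothetical.

```python
from typing import Dict


def cdc_lag_seconds(produced_ms: int, staged_ms: int) -> float:
    """CDC lag: time between a CDC message being produced and it landing in the Staging Table."""
    return max(0, staged_ms - produced_ms) / 1000.0


def processor_lag_seconds(
    latest_cdc_ms: Dict[str, int], latest_processed_ms: Dict[str, int]
) -> Dict[str, float]:
    """Per-tenant lag between the newest CDC timestamp and the newest timestamp the Processor applied."""
    return {
        tenant: max(0, ts - latest_processed_ms.get(tenant, 0)) / 1000.0
        for tenant, ts in latest_cdc_ms.items()
    }


print(cdc_lag_seconds(1_000, 4_500))  # prints 3.5
print(processor_lag_seconds({"tenantA": 90_000, "tenantB": 60_000},
                            {"tenantA": 30_000, "tenantB": 60_000}))
# prints {'tenantA': 60.0, 'tenantB': 0.0}
```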
In the introductory sections, and specifically in Figure 5, we emphasized the update amplification that can occur with our use case. We put that to the test in the 2 scenarios shown in Figure 13, and we see impressive UPSERT performance even with a minimal 24-core cluster.
Two important Databricks-only settings we needed to set to achieve this throughput and latency:
OPTIMIZE job (with 128 MB file sizes instead of the 1 GB file size used in the standard OPTIMIZE) to further compact files for the partitions that have the most small files.
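Choosing where to run that extra compaction pass amounts to ranking partitions by how many of their files fall below the 128 MB target. The sketch below does that ranking over a hypothetical listing of file sizes per partition; how the post's job actually selects partitions is not stated. On Databricks, we assume (the post does not confirm it) that the smaller target could be set through the spark.databricks.delta.optimize.maxFileSize configuration, whose default is 1 GB, and that the chosen partitions could be targeted with OPTIMIZE ... WHERE on partition columns.

```python
from typing import Dict, List

TARGET_FILE_BYTES = 128 * 1024 * 1024  # the 128 MB target of the extra compaction pass


def partitions_to_compact(files_by_partition: Dict[str, List[int]], top_n: int = 2) -> List[str]:
    """Return up to top_n partitions with the most files smaller than the target size."""
    small = {
        part: sum(1 for size in sizes if size < TARGET_FILE_BYTES)
        for part, sizes in files_by_partition.items()
    }
    ranked = sorted((p for p, n in small.items() if n > 0), key=lambda p: (-small[p], p))
    return ranked[:top_n]


files = {
    "tenantId=A": [4 << 20, 9 << 20, 200 << 20],  # two small files
    "tenantId=B": [1 << 20],                      # one small file
    "tenantId=C": [1 << 30],                      # already compacted
}
print(partitions_to_compact(files))  # prints ['tenantId=A', 'tenantId=B']
```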
Now comes the critical part where we execute and test our Main Access Pattern shown in Figure 6 on top of the data in the Delta Lake in the format we have chosen.
Figure 15 shows the massive reduction (80%+) in time taken for the exact same workload on the Hot Store versus the Delta Lake Raw table, for a 1 TB load.
This is due to faster IO from Delta Lake for the scan workload, along with the data compression we get on the lake, from 1 TB down to 64 GB. We read less data spread across more partitions, which in turn helps us use the Spark cores in the cluster more efficiently.
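A quick back-of-the-envelope reading of these numbers, taking 1 TB as 1024 GB: the lake scans about 16 times fewer bytes, and an 80% reduction in wall-clock time corresponds to at least a 5x speedup.

```latex
\begin{aligned}
\text{compression ratio} &= \frac{1\,\text{TB}}{64\,\text{GB}} = \frac{1024\,\text{GB}}{64\,\text{GB}} = 16 \\
t_{\text{lake}} \le (1 - 0.8)\, t_{\text{hot}} = 0.2\, t_{\text{hot}} &\;\Rightarrow\; \frac{t_{\text{hot}}}{t_{\text{lake}}} \ge 5
\end{aligned}
```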
The main takeaways are:
Read from Hot Store.
We expect to share more lessons as we port more scan-intensive workloads to the data lake architecture and continue to evolve this.
Originally published: Jun 17, 2021