
Event-Based Triggering on Adobe Experience Platform Orchestration Service using Apache Airflow


Authors: Raman Gupta and Vardan Gupta


Adobe Experience Platform’s Orchestration Service provides APIs to author and manage multi-step workflows. Behind the scenes, the service leverages Apache Airflow as its workflow execution engine. It supports two types of workflow triggering/execution mechanisms:

  • Time-based triggering on a cron schedule
  • On-demand/ad-hoc triggering via APIs
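
As a rough illustration, since the Orchestration Service runs Apache Airflow underneath, these two mechanisms correspond to a cron schedule on a DAG and an ad-hoc run started through an API call. The sketch below is illustrative only: the endpoint, payload, and DAG are placeholders, not the Orchestration Service's actual API.

```python
# Illustrative sketch of the two existing trigger mechanisms, in Airflow terms.
# The Orchestration Service wraps these behind its own APIs; the endpoint below is a placeholder.
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.bash import BashOperator

# 1. Time-based trigger: the workflow runs on a cron schedule.
with DAG(
    dag_id="sample_scheduled_workflow",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 2 * * *",  # every day at 02:00 UTC
    catchup=False,
):
    BashOperator(task_id="step_one", bash_command="echo 'scheduled run'")


def trigger_ad_hoc_run(workflow_id: str, token: str) -> None:
    """2. On-demand/ad-hoc trigger: start a single run immediately via an API call."""
    requests.post(
        f"https://orchestration.example.com/workflows/{workflow_id}/runs",  # placeholder URL
        headers={"Authorization": f"Bearer {token}"},
        json={"conf": {"triggered_by": "ad-hoc request"}},
        timeout=30,
    ).raise_for_status()
```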

However, there are use cases that require triggering a workflow based on events, such as triggering a data-prep or data-warehousing job on the arrival of new data in Adobe Experience Platform. What is missing is the capability to trigger a workflow based on pipeline events.

The AEP Orchestration Service is used by multiple solutions, and event-based requirements are coming from most of them. Instead of every service writing its own event listener and custom rule engine for identifying matching events, we decided to build event-based triggering as part of the orchestration service offering, so that different solutions can leverage it to trigger workflows based on events. Solutions only have to define event-matching rules and the corresponding action via the Orchestration Service API; the rest, such as publishing rules to the event listener, rule matching, error handling, and workflow triggering, is taken care of by the Platform Orchestration Service (POS) itself.

Design Details

Architecture

The Orchestration Service leverages different Adobe Experience Platform components, such as Pipeline Smarts and Profile Query Language (PQL), together with Pipeline, to provide end-to-end event-based triggering functionality.

Figure 1: Control Flow for Event-Based Triggering

  • 1A. Provide Rules — Solutions register rules with the Orchestration Service via the POST /rules API.
  • 1B. Cold Reload: Initialise with Rules — When Smarts is deployed, its init block fetches the latest state of the rules in POS via the GET /rules API.
  • 1C. Push Events — The Orchestration Watcher listens to the source topics for incoming events.
  • 2. Push Rule Updates — The Orchestration Service publishes rule-update events to Pipeline.
  • 3. Hot Reload: Push Rules — Any change to the rule set made by solutions (addition of a new rule or deletion of an existing one) is refreshed via an auxiliary event callback.
  • 4A. PQL SDK — The Orchestration Watcher uses the PQL SDK to match rules against events coming from the source topics.
  • 4B. Observability SDK — The Orchestration Watcher sends metrics to Observability.
  • 5A. Trigger Actions — Actions are performed for matched events.
  • 5B. Push Failure — If an action fails, the event is sent to the failure topic for later reprocessing.
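
The watcher side of this control flow can be summarized as a consume/match/act loop. The sketch below is a simplification covering steps 1C, 4A, 5A, and 5B; pipeline, pql, and orchestration are hypothetical client objects, not the production SDKs.

```python
# Simplified sketch of the Orchestration Watcher loop (steps 1C, 4A, 5A, 5B above).
# `pipeline`, `pql`, and `orchestration` are hypothetical client objects.

def watcher_loop(pipeline, pql, orchestration, rules):
    for event in pipeline.consume("source-topic"):            # 1C. events from source topics
        for rule in rules:
            if pql.matches(rule.condition, event.payload):     # 4A. rule matching via PQL SDK
                try:
                    orchestration.trigger(rule.action, event)  # 5A. perform the configured action
                except Exception as err:                       # 5B. push failure for reprocessing
                    pipeline.produce("failure-topic", {
                        "event": event.payload,
                        "rule_id": rule.id,
                        "error": str(err),
                    })
```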

Orchestration Service Enhancements — Run-as Semantics

Pipeline Smarts acts as a multi-tenant service sitting in front of the Orchestration Service, responsible for triggering workflows or performing other workflow actions. These operations are performed via an admin client that impersonates the workflow owner and delegates the request on the owner's behalf, since it would not be legitimate to persist credentials for the various workflow owners. We added two headers to the request, x-acp-orchestrate-as-clientid and referrer: the first carries the name of the client on whose behalf the request is delegated, and the second carries information about where the request was delegated from (there could be more routes in the future where delegation may happen). There is a POST /users API through which the workflow owner authorizes a clientId to perform operations on its behalf.
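
A delegated call could then look roughly like the following sketch. Only the two headers (x-acp-orchestrate-as-clientid and referrer) and the POST /users authorization step come from the design above; the host, paths, and payload shapes are placeholders.

```python
import requests

BASE_URL = "https://orchestration.example.com"  # placeholder host

# One-time step: the workflow owner authorizes the admin client to act on its behalf.
requests.post(
    f"{BASE_URL}/users",
    headers={"Authorization": "Bearer <workflow-owner-token>"},
    json={"authorizedClientId": "orchestration-watcher-admin"},  # payload shape is assumed
    timeout=30,
).raise_for_status()

# Later: the admin client triggers a workflow while delegating as the owner.
requests.post(
    f"{BASE_URL}/workflows/my_workflow/runs",  # placeholder path
    headers={
        "Authorization": "Bearer <admin-client-token>",
        "x-acp-orchestrate-as-clientid": "workflow-owner-client-id",  # client the call is on behalf of
        "referrer": "pipeline-smarts",  # where the delegation originated
    },
    timeout=30,
).raise_for_status()
```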

Profile Query Language

PQL provides a SQL-like interface and supports 40+ operators that can be leveraged to write complex queries, e.g. fetching an array element within a nested array. It is quite generic and can be used to define rules on JSON objects. The Orchestration Service leverages PQL to define event rules: it provides an API to register an event rule written in PQL, which is matched against pipeline events, and if there is a hit the corresponding action is performed.

The rule below filters all dishes of type donut that have ppu > 0.5 and have a chocolate batter and chocolate topping.

  • Profile Query Language Query: type="donut" AND ppu > 0.5 AND batters.batters[type = "Chocolate"] AND topping[type = "Chocolate"]

Figure 2: Sample Event
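
Since the PQL SDK is internal, the rule's logic can be sketched as an equivalent plain-Python predicate over a JSON event shaped like the one in Figure 2. The field names (type, ppu, batters, topping) come from the rule above; the concrete values are illustrative.

```python
# Plain-Python equivalent of the PQL rule above, for illustration only
# (the real matching is done by the PQL SDK against pipeline events).

def rule_matches(event: dict) -> bool:
    return (
        event.get("type") == "donut"
        and event.get("ppu", 0) > 0.5
        and any(b.get("type") == "Chocolate" for b in event.get("batters", {}).get("batters", []))
        and any(t.get("type") == "Chocolate" for t in event.get("topping", []))
    )

# A minimal event shaped like the sample in Figure 2 (values are illustrative).
sample_event = {
    "type": "donut",
    "ppu": 0.55,
    "batters": {"batters": [{"type": "Chocolate"}, {"type": "Regular"}]},
    "topping": [{"type": "Chocolate"}, {"type": "Glazed"}],
}

assert rule_matches(sample_event)
```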

Pipeline Smarts

AEP Pipeline Smarts is leveraged to listen to source topics and to match rules against received events; when a rule matches, the corresponding workflow action is executed. Pipeline Smarts sets up pipeline consumers out of the box, unifies message encoding from different formats into JSON, provides basic transformations along with interfaces for advanced transformations such as custom functions and filters, offers callbacks to load rules dynamically, and supports constructs like branching out of the box.

  • Message Deduplication — In Pipeline, a message could be read more than once, but the corresponding action should not be performed multiple times. We achieve this by embedding the unique messageId that comes with every message into the runId; if the same message is seen again, a DagRunAlreadyExists exception is raised and handled accordingly (see the sketch below).
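
A minimal sketch of this deduplication, assuming Airflow 2.2+ module paths: the pipeline messageId is embedded in the run_id, so a re-delivered message collides with the existing DagRun and is skipped.

```python
# Minimal deduplication sketch, assuming Airflow 2.2+ module paths.
# The pipeline messageId is embedded in run_id, so re-delivered messages collide
# with the existing DagRun and raise DagRunAlreadyExists.
import logging

from airflow.api.common.trigger_dag import trigger_dag
from airflow.exceptions import DagRunAlreadyExists

log = logging.getLogger(__name__)


def trigger_once(dag_id: str, message_id: str, payload: dict) -> None:
    run_id = f"pipeline__{message_id}"  # unique per pipeline message
    try:
        trigger_dag(dag_id=dag_id, run_id=run_id, conf=payload)
    except DagRunAlreadyExists:
        # Duplicate delivery of the same message: the workflow was already triggered.
        log.info("Skipping duplicate message %s for DAG %s", message_id, dag_id)
```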

Figure 3: Smarts Definition

Rules API

The Orchestration Service provides APIs to manage rules and pipeline topics.

Figure 4: Rules API
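
A rule registration (step 1A) could look roughly like the sketch below. Only the POST /rules path and the PQL condition come from this design; the host and the payload field names (name, condition, action, windowInterval) are assumptions for illustration.

```python
import requests

ORCHESTRATION_URL = "https://orchestration.example.com"  # placeholder host

rule = {
    # Field names below are illustrative; the PQL condition is the one shown above.
    "name": "chocolate-donut-rule",
    "condition": 'type="donut" AND ppu > 0.5 AND batters.batters[type = "Chocolate"] '
                 'AND topping[type = "Chocolate"]',
    "action": {"type": "TRIGGER_WORKFLOW", "workflowId": "my_workflow"},
    "windowInterval": "PT10M",  # optional windowing parameter (see the Airflow section below)
}

response = requests.post(
    f"{ORCHESTRATION_URL}/rules",
    headers={"Authorization": "Bearer <token>", "Content-Type": "application/json"},
    json=rule,
    timeout=30,
)
response.raise_for_status()
print(response.json())
```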

Sequence Diagram

The following sequence diagram shows the services involved in the scenario and the sequence of messages exchanged between them to carry out the end-to-end functionality.

Figure 5: Sequence Diagram

  • 1A. Create Workflow — The solution creates a workflow.
  • 1B. Consume Events — The Orchestration Watcher consumes events from Pipeline.
  • 2A. Add Rule — The solution creates a rule.
  • 2B. Validate Rule — The Orchestration Service validates the rule.
  • 2C. Publish Rule — After validation, the rule is sent to the Orchestration Watcher via the Kafka-based Pipeline.
  • 2D. Consume Rule Events — The Orchestration Watcher consumes rule events from Pipeline.
  • 3. Refresh Rule Cache — The Orchestration Watcher updates its rule cache.
  • 4. Match Events — The Orchestration Watcher starts matching incoming events against the rule.
  • 5A. Trigger — Once an event matches, the workflow-triggering action is sent to the Orchestration Service.
  • 5B. Failure Management — If triggering the action fails, the event is sent to the failure topic for reprocessing at a later time.
  • 5C. Notify (Email & Pipeline) — After workflow completion, a success/failure notification is sent to the workflow owner.

Error handling

The Adobe Experience Platform Orchestration Service also provisions an error topic to which all recoverable failures (such as 5xx errors) are pushed for retries via the Smarts branching feature. Non-recoverable errors are published to Observability, where monitoring and alerting take appropriate action. Each error is categorized by an error code, and this information, along with a retry_count, is enriched into the original message before it is published to the error topic. Each time an error message is republished its retry_count is incremented, up to max_retry_count, to restrict how many times an error message can be consumed. Smarts listens to this topic as well as the solution-requested topics.
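
A sketch of that enrichment and retry-capping logic is shown below; producer and observability are hypothetical clients, and only the error-code categorization, retry_count, and max_retry_count behaviour come from the design above.

```python
# Sketch of error-topic enrichment and retry capping; `producer` is a hypothetical
# Pipeline client with a produce(topic, message) method, `observability` a metrics client.
MAX_RETRY_COUNT = 5


def handle_failure(producer, observability, original_message: dict, error_code: str) -> None:
    retries = original_message.get("retry_count", 0)

    if error_code.startswith("5") and retries < MAX_RETRY_COUNT:
        # Recoverable (5xx) failure: enrich the original message and push it to the
        # error topic so the Smarts branching feature can retry it later.
        enriched = {**original_message, "error_code": error_code, "retry_count": retries + 1}
        producer.produce("orchestration-error-topic", enriched)
    else:
        # Non-recoverable error (or retries exhausted): publish to observability so
        # monitoring and alerting can take over.
        observability.emit("orchestration.rule_action.failed", {
            "error_code": error_code,
            "retry_count": retries,
        })
```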

Airflow Configuration, Changes, and Contribution

We are leveraging some Apache Airflow constructs, like pools and runId, to provide certain guarantees and functionality for event-based triggering. We also built upon existing Airflow capabilities, like cluster pools, to provide more transparency in workflow management.

  • Restriction on Number of Workflow Runs — We acknowledge that Apache Airflow is not meant for streaming use cases, and our customers understand this. Most of the events discussed here are control-plane events, but there could be thousands of events per second or even more. To keep this from degrading Airflow cluster performance, we offer an optional windowing parameter: when registering an event rule, a user can define a time interval within which the workflow action is executed at most once, regardless of how many matching events arrive in that interval. Since the windowing parameter is optional, there can be situations where it is not configured and a flood of matched events triggers many workflow runs, potentially degrading cluster performance. To handle this, we bind an Airflow pool to every workflow entering our system, so that other workflows are not starved of resources.
  • Audit Trail — We log event metadata (messageId) both for events that resulted in a workflow action and for events that did not, for example because of message deduplication.
  • Inter-DAG Dependency — We have seen requirements where a workflow needs to be triggered based on the success or failure of another workflow. Airflow's ExternalTaskSensor operator covers this, but it has its challenges; one of them is that both the dependent and depending workflows need to be in sync with respect to the execution date: either they must share the same execution date or it must be known beforehand. This limitation does not fit our use cases very well.

To overcome this, we started pushing task/DAG success, failure, and retry events to Pipeline by leveraging Airflow's cluster policy, and based on those events, one can configure another workflow to be triggered. With the cluster policy, we override the airflow_local_settings.py file to extend the success/retry/failure callbacks for tasks and DAGs, so they now also produce a Pipeline message.
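
A minimal sketch of such a cluster policy, assuming Airflow 2.1+ policy hooks (task_policy and dag_policy) and a hypothetical send_pipeline_event helper that produces the message to Pipeline:

```python
# airflow_local_settings.py -- minimal cluster-policy sketch, assuming Airflow 2.1+.
# send_pipeline_event is a hypothetical helper that produces a message to Pipeline.
from my_pipeline_client import send_pipeline_event  # hypothetical module


def _wrap(existing_callback, event_type):
    def callback(context):
        if existing_callback:
            existing_callback(context)  # keep whatever the DAG author configured
        send_pipeline_event({
            "event_type": event_type,  # e.g. task_success / task_failure / dag_failure
            "dag_id": context["dag"].dag_id,
            "task_id": context.get("task_instance") and context["task_instance"].task_id,
            "execution_date": str(context.get("execution_date")),
        })
    return callback


def task_policy(task):
    """Applied by the scheduler to every task: also publish lifecycle events to Pipeline."""
    task.on_success_callback = _wrap(task.on_success_callback, "task_success")
    task.on_failure_callback = _wrap(task.on_failure_callback, "task_failure")
    task.on_retry_callback = _wrap(task.on_retry_callback, "task_retry")


def dag_policy(dag):
    """Applied to every DAG: publish DAG-level success/failure events as well."""
    dag.on_success_callback = _wrap(dag.on_success_callback, "dag_success")
    dag.on_failure_callback = _wrap(dag.on_failure_callback, "dag_failure")
```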

Next Steps

  1. This capability is not meant for high-frequency events, such as hundreds of events per minute, and there is a restriction on the number of workflows that can execute concurrently. We are working on giving users more visibility when events start piling up so that appropriate corrective action can be taken.
  2. Currently, we support only workflow triggering as a valid action, but in the future we may need to support workflow creation/update based on certain events as well.
  3. There are scenarios where we need to trigger a specific workflow task based on an event. Apache Airflow provides sensors for this, but they are based on a polling mechanism that uses compute and network resources to check for events. We can extend the event-based triggering mechanism to execute a specific workflow task based on certain events.

Follow the Adobe Experience Platform Community Blog for more developer stories and resources, and check out Adobe Developers on Twitter for the latest news and developer products. Sign up here for future Adobe Experience Platform Meetups.

References

  1. Adobe Experience Platform — https://www.adobe.com/experience-platform.html
  2. Apache Airflow — https://airflow.apache.org/
  3. Apache Kafka — https://docs.confluent.io/current/getting-started.html

Additional Reading

  1. Adobe Experience Platform Orchestration Service with Apache Airflow
  2. Adobe Experience Platform’s Setup for Apache Airflow on Kubernetes
  3. Adobe Experience Platform Insights on Achieving High Scale Using Apache Airflow
  4. Adobe Experience Platform Pipeline Cost Management: A Case Study
  5. Rate Limiting in Pipeline: High Quality of Service at Millions of Requests/Second
  6. Adopting Modern CI/CD Practices for Adobe Experience Platform Pipeline
  7. How Adobe Experience Platform Pipeline Became the Cornerstone of In-Flight Processing for Adobe
  8. Creating the Adobe Experience Platform Pipeline with Kafka

Originally published: Oct 1, 2020