This website uses cookies. By using the website you agree with our use of cookies. Know more

Technology

Adapt to overcome - A tale on dynamic event ingestion using Apache Spark

Vítor Oliveira
Casual geek, usually found riding the streets on his BMX wearing Levi's apparel.
View All Posts
Adapt to overcome - A tale on dynamic event ingestion using Apache Spark
As the luxury market is getting more and more competitive, becoming a data-driven enterprise is a must if boundaries are to be pushed as fast as possible. As such, we at Farfetch want to be on the front line and base our decisions on data as it allows us to perfect our customer experience.
To get insights from data, it first has to be gathered and made available for use. In the beginning, services produced data into SQL databases and they were replicated into the data platform for further use. Easy work, right? Then, the need for geo-distributed services arose and we started to shift into a Kafka event-based platform that used Cassandra as a storage layer.

"Wait, how do we get the data in, now that the SQL replication is gone?”

No one had previous knowledge of the subject. The first, very naive approach, was to build software that consumed events in a streaming fashion from Kafka and replicated the relational models using HBase, SQLServer, and Spark on Scala. We, the Messaging Team, experimented a bit using this approach but shortly noticed that it was not feasible since models were too complex and data coming as events was a whole new paradigm. We had to maintain the correct and up-to-date state of an entity in real-time while dealing with late events, which means models were not strongly consistent like they were before.
The next approach was to consume raw events coming from event producing services. For each producer service, we built an ingestion service in Spark that consumed Kafka topics, added some processing metadata and made the data available in the data lake for further use.

As the team became more mature and we onboarded more team members, we decided to create a framework to abstract common operations, given that the software we were developing was all very similar. This increased the speed of the development of new ingestion services. Even though the framework eased the work, we were still struggling to deliver as fast as the business requirements kept coming in.

#1 The work was very manual and repetitive

The scope of work was always the same: transport the events from the producer services into the data lake. The only aspects that changed between services were the topics to consume, where to save the data, the serialization format and the schema. In addition to this, we also had to produce manual documentation for the downstream teams.

#2 Manual documentation for events was proven to be error-prone

Farfetch’s core software is developed using .Net and so exchanging message contracts between applications is as easy as installing a NuGet package containing the Data Transfer Objects. That isn’t true for Scala applications, so we needed to rely on manual documentation from other teams to build the integration services.

#3 Code reviews got harder as events complexity increased

Events can get pretty big sometimes, add repetitive work and manual documentation to the equation and you get decreasing quality in code reviews.

#4 Teams use several serialization formats for their events like JSON, Protobuf and their respective GZIPed approaches


Each team used the serialization format that best suited their needs, which made it harder to build a generic solution that just worked.

Overall, the whole situation was very demanding and demotivating. A small team and a constant increase in the number of services would simply not scale.

"We need to change.”

The question was, who needs to change? It is obvious that the overhead of changing dozens of teams is far greater than changing one. With that said, we started aiming for the automation of the data ingestion layer. Even though business requirements were still coming in, we reserved some roadmap space to brainstorm about a potential solution for this problem. 

Solution

We assessed the fit of several technologies and tools to our needs. After the assessment, we concluded that none had any huge advantage over our existing stack. We had a Spark/Scala proficient team and we could reuse our CI/CD pipelines which meant we didn’t have to do any breaking changes to our workflow. These issues had to be addressed as soon as possible given the impact that it was having on the team. 
After some brainstorming sessions, we had a name and the components of the final solution, Pacman. In sum, it was designed to have three major components:

  • A central repository of schemas and app configurations
  • Ingestion instances
  • An orchestrator of ingestion instances
For the repository, we believed that Farfetch’s current version control system was a fit for our needs. As for the orchestrator, the first iteration could be a very simple bash script with start, stop and restart functionalities. The real deal was the ingestion system. 
Given the complexity of the software, the first approach that came to mind was to use code generation as a way to satisfy the needs of all of the stakeholders. We quickly realized that it would be rather expensive to develop all the templates as well as maintain them.
Given that we were getting more and more overwhelmed by all the constant updates in the microservices and also building more and more legacy software as time went by, we had to figure out a simpler, yet functional way of doing it that meant we did not have to reinvent the wheel.

The following diagram depicts the high level deploy flow of the solution:


Central repository

We needed a place to store metadata about applications and Kafka topics. We stored all the data in Farfetch’s current version control system so that we had our own centralized configuration and schema repository with all the historical changes that happened in the past. To make use of the ability to check historical changes we needed a repository that was human-readable. For that reason, in the case of protobuf, we chose to store the proto files directly instead of the descriptor of its messages (it would avoid a compilation before running the applications). A descriptor is nothing less than the (non-human-readable) information about the message definitions that can be used to deserialize the message and perform runtime checks.

Each application, apart from Spark and logging configurations, requires input and output information, for instance, which Kafka topics it will consume, and the location where each event type from each topic should be stored. On the other hand, each topic will have information about what events it contains and their respective schema.
We wanted to easily integrate our solution with every team and avoid errors as much as possible, especially when exchanging schemas. Protobuf, since it has a very well defined schema, was not a problem. As for JSON, the same can’t be said. To tackle this issue, we built a tool that teams can use to output a JSON schema that is compatible with Pacman given their C# Data Transfer Objects. This way we can have a structured valid schema that we can use to validate data and be sure that it contains no errors.

Following are some examples of application configurations as well as Protobuf and JSON serialized Kafka topics configurations:

Simplified version of an application configuration

Schema metadata for a JSON Kafka topic

Schema metadata for a Protobuf Kafka topic

Since the application’s configuration is the only manual task and we wanted to avoid the same mistakes we did in the past, we built a validator that checks the structure as well as some predefined rules to avoid the deployment of wrong configurations. 

Ingestion instances orchestrator

We have suffered a lot in the past when network or infrastructure issues took down our applications and we had to manually restart all of them. Our current version of the orchestrator is rather simple: a bash application that can start, stop and restart applications one at a time or in bulk. We decided not to focus our efforts on this part. We prioritized the development of the ingestion instances since it was the most important piece of the whole project. We plan to evaluate the best fitting existing orchestrator in the near future.

Ingestion instance

Ingestion instances are the most crucial and non-trivial piece of the whole system. Before we started the development we designed the following requirements:
  • Ingest data from multiple Kafka topics
  • Deal with different serialization formats
  • Support deserialization of messages with custom message envelopes
  • Validate the ingested data with the defined schema with custom error handling
  • Augment the data with processing metadata
  • Be able to run additional transformations by configuration
  • Save data and final schema into a configured location by event type
We had multiple challenges while trying to develop ingestion instances. Below we will explain the main challenges and how we dealt with them.

#1 Define the necessary steps to process the data

Even before thinking deeply about the subject we imagined that something like a pipeline would fit perfectly in this scenario. We needed a sequence of fixed specialized modules and some pluggable ones that would be defined by configuration.
We defined our pipeline as a set of predefined stages that have a goal, for instance, transform A to C. A to C could be accomplished by composing a set of tasks that lead from A to C, for instance, B to C composed with A to B. An example of the pipeline building blocks is illustrated below:

Where StageFunction is a function that transforms an A into B


A task or several tasks will satisfy a given Stage goal.

With this architecture, we can satisfy several requirements like augmenting the data with processing metadata, have various sinks like an output schema saving task or output data saving task that satisfies an output stage.

#2 Deserialize events with different formats into the same common language

It makes sense that the steps after deserialization should be common for all input formats. Since we are using Spark, the most logical approach was to choose something that would easily integrate with Spark internals. Since Spark uses StructType to internally give a schema to the data it holds on Dataframes, we took advantage of that. 
We had two serialization formats that were required to cover all our current use cases, namely JSON and Protobuf. 


Example of a JSON schema


Example of a protobuf schema

The end goal for each format was to, given a message identifier, have a matching lambda function that transformed the received bytes from Kafka into a Spark Row with a schema defined by a StructType that was built using the serialization format schema. Having this in place we could then use the message identifier from the received message envelopes to get the respective parser and deserialize the whole message. The deserialization has to be made in two steps, the first to identify the message and the latter deserializes the whole message.
The matching between message types and schemas are fetched from the configuration repository.

Spark can read JSON data into DataFrames, so we knew that we could tweak some of its capabilities to build our parsers. After a brief investigation, we found that Spark had an internal Jackson row conversion module that is capable of creating rows from bytes given a JSON Schema. Having this, it is straightforward to get the executors to deserialize the events. All we need is to parallelize the JSON schema across executors and have each one creating its own parsers for the events.

As for Protobuf, the work was not so simple. We investigated ScalaPB, which is a Scala protobuf compiler plugin that contains several utility tools to deal with protobuf files. 
The first step was to compile the protobuf files. ScalaPB has a protoc-bridge module that enables running protoc in the JVM to generate Scala code. It was a deliberate decision to compile the files in the startup of applications instead of doing it beforehand so that they didn’t depend on upstream processes and the development impact was minimized.
The next step, after having the generated case classes, parsers and serializers for the protocol buffers, was to distribute them across Spark executors, compile to bytecode and inject it during the startup so that they have the ability to deserialize the received messages. We used scala.tools compiler module to compile the files and load them into the executor’s classloader.
After gathering all the needed information to deserialize messages all that was left was to generate our parsers. ScalaPB has a SparkSQL module that contains SparkSQL utility methods for ScalaPB. With this module, we could then use the generated parsers to build our parser index.

With this, we were able, with a small initialization penalty, to save data at the same speed as before, while maintaining full compatibility with what we had.

#3 Validate the ingested data against its schema

After completing #2 it was really easy to move on to #3 as Jackson has several validation properties to deal with JSON data given a schema. In the case of Protobu, validation comes for free.
We can specify the behavior in the application configuration when validation fails. Either by incrementing a validation metric, just skip the message or even stop the consumption.

What now?

This whole project has been a fun challenge due to its technical details. Now that the main challenges have been conquered, we are not only more motivated due to how fun it was to develop Pacman but more importantly due to how much more alleviated we feel. 
It is really important to note that what we presented here was achieved in under three months of part-time development by a small team. Today, we can say that what in the past took probably a week of boring work, now takes minutes to go into production with a much lower development and maintenance cost, which were our main goals.

Now that we are on track on the ingestion side the next steps include improving the ingestion instances with more features. Abstract away from Kafka as the only source, implement more sinks as well as application wise custom metrics. In addition to this, moving on from simple bash scripts on the orchestrator side is a priority, as well as improving the central repository configurations with more automatic validations and a cleaner structure so that they can be deployed by practically anyone. Stay tuned for more updates!

Sometimes it is not easy to take a step back and slow down for a while, but in the end, the result is worth the effort. In our case, not only did we get stronger as a team, but we also had a lot of fun while substantially improving our work. Facing new challenges every day is what makes it great to work and live Farfetch’s values. #BeRevolutionary #BeBrilliant #TodosJuntos

Authored by Vítor Oliveira on behalf of the Data Messaging team: João Silva, Jorge Arada and Renato Ayres.
Related Articles