DIY: Data Deduplication Filter in 10 Minutes


We live in a Big Data world, and data pipelines are gaining popularity as a means of effective data collection, storage, and analysis. As they mature, they are also becoming easier to implement. One of the most common tasks in processing these data flows is data cleaning. It has many aspects, but one of the most useful and, at the same time, most complex is data deduplication, which is today’s focus.

Our task will be to create a data deduplication filter that meets a demanding set of requirements. Tough? It used to be. Now, recent versions of tools such as Apache Kafka and Flink make this exercise child’s play. All you need is a few minutes to spare.

Creating a Data Deduplication Filter

Kafka and Flink make implementing data deduplication very straightforward. Let’s see how, using the example of an end-to-end deduplication filter that reads from one Kafka topic and writes to another, built on the new features introduced in Kafka 0.11 and Flink 1.4.

Must-Have Requirements

We do not want to add too much latency on top of our data pipeline; to keep it near real-time, the filter should add less than 100 ms. Since the purpose of the filter is to remove duplicates, the system itself should provide strong, exactly-once guarantees, so that it does not introduce any new duplicates, even upon failures. On top of that, the filter should be easily scalable.

To conclude, here is the complete spec for our use case:

  • from Kafka topic – to Kafka topic
  • latency < 100ms
  • exactly-once guarantees (fault-tolerant)
  • scalable

Creating Checkpoints in Apache Flink

Meeting all of the above requirements may seem to verge on the impossible, but recent versions of Flink and Kafka save us much of the hassle:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);

env.addSource(kafkaConsumer())
        .keyBy(keySelector())
        .filter(new DedupeFilterFunction<>(keySelector(), CACHE_EXPIRATION_TIME_MS))
        .addSink(kafkaProducer());

env.execute(JOB_NAME);

The above excerpt is an implementation of a Flink job in Java (see the Flink documentation for more information on how to get started with Flink).

In the first two lines, we create a streaming environment object, which we then use to configure the job:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(CHECKPOINTING_INTERVAL_MS);

The environment object is also used to enable checkpointing, i.e., Flink’s lightweight fault-tolerance mechanism.

A checkpoint is a snapshot of the current state of all Flink operators. Periodically, a barrier marker is injected into the stream of ordinary events. When such a marker reaches an operator, a snapshot of that operator’s state is triggered asynchronously. The snapshot can also consist of incremental changes only. When the marker has travelled all the way to the sink, the checkpoint is complete. Apart from the moment when the asynchronous snapshot is triggered, processing remains uninterrupted throughout the whole checkpoint. See the Flink documentation for more details.
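To make the barrier idea more tangible, here is a toy model in plain Java. It is only a sketch under simplifying assumptions: a single operator, a synchronous snapshot, and a made-up marker string. The names BarrierSnapshotSketch, CountingOperator and BARRIER are hypothetical and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of barrier-triggered snapshots; not Flink's actual implementation. */
final class BarrierSnapshotSketch {

    /** Hypothetical marker standing in for a checkpoint barrier in the stream. */
    static final String BARRIER = "#BARRIER#";

    /** A stateful operator that counts events and snapshots its count at each barrier. */
    static final class CountingOperator {
        private long count = 0;
        private final List<Long> snapshots = new ArrayList<>();

        void process(String element) {
            if (BARRIER.equals(element)) {
                // Barrier reached: record a copy of the current state.
                snapshots.add(count);
            } else {
                // Ordinary event: update the operator state.
                count++;
            }
        }

        List<Long> snapshots() {
            return snapshots;
        }
    }

    /** Feeds the whole stream through one operator and returns the snapshots it took. */
    static List<Long> run(List<String> stream) {
        CountingOperator operator = new CountingOperator();
        for (String element : stream) {
            operator.process(element);
        }
        return operator.snapshots();
    }
}
```

For the stream a, b, barrier, c, barrier, the operator records the snapshots 2 and 3: each snapshot reflects exactly the events that arrived before the corresponding barrier.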

Building Data Flows

As the next step, we create the data flow:

env.addSource(kafkaConsumer())
        .keyBy(keySelector())
        .filter(new DedupeFilterFunction<>(keySelector(), CACHE_EXPIRATION_TIME_MS))
        .addSink(kafkaProducer());

There are separate functions for constructing a Kafka consumer:

private static FlinkKafkaConsumer011<String> kafkaConsumer() {
    final Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_ADDRESS);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dedupe-filter-group");
    return new FlinkKafkaConsumer011<>(INPUT_TOPIC,
            new SimpleStringSchema(),
            properties);
}

and a Kafka producer:

private static FlinkKafkaProducer011<String> kafkaProducer() {
    final Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_ADDRESS);
    properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30_000);
    return new FlinkKafkaProducer011<>(OUTPUT_TOPIC,
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            properties,
            Semantic.EXACTLY_ONCE);
}

Note the property Semantic.EXACTLY_ONCE, which we need to specify in the Kafka producer. It is a new feature introduced in Flink 1.4, which allows us to use the exactly-once producer if the broker also supports it (in Kafka, it is implemented starting from version 0.11).

The requirements we set for our deduplication task call for exactly-once semantics. However, other delivery guarantees are available as well:

  • at-most-once – the most basic approach where we send a message and don’t retry on failure or check if it was delivered; a message will be delivered 0 or 1 times, which means that some messages can be dropped;
  • at-least-once – a basic fault tolerance where we make sure that a message is delivered (e.g., using ACK messages); a message can be delivered 1 or more times, which means that no messages are dropped, but there could be duplicates;
  • exactly-once – the hardest to implement; no duplicates, no lost messages.
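The difference between the three guarantees can be illustrated with a small, deterministic simulation. This is a sketch under stated assumptions: failures are given as two fixed sets (messages dropped on the first attempt, and messages whose ACK is lost on the first attempt), a retry always succeeds, and exactly-once is modelled as retries plus receiver-side deduplication by ID. That is only one possible approach and not how Kafka or Flink implement it; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy simulation of delivery guarantees over an unreliable channel. */
final class DeliverySemanticsSketch {

    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /**
     * Returns what the receiver ends up with.
     * droppedOnce: messages lost in transit on the first attempt.
     * ackLostOnce: messages delivered on the first attempt whose ACK is lost.
     */
    static List<String> deliver(List<String> messages, Mode mode,
                                Set<String> droppedOnce, Set<String> ackLostOnce) {
        List<String> received = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String message : messages) {
            // At-most-once never retries; the other modes retry once on a missing ACK.
            int attempts = (mode == Mode.AT_MOST_ONCE) ? 1 : 2;
            for (int attempt = 1; attempt <= attempts; attempt++) {
                boolean firstAttempt = attempt == 1;
                boolean arrives = !(firstAttempt && droppedOnce.contains(message));
                if (arrives) {
                    // Exactly-once mode: the receiver ignores IDs it has already accepted.
                    if (mode != Mode.EXACTLY_ONCE || seen.add(message)) {
                        received.add(message);
                    }
                }
                boolean acked = arrives && !(firstAttempt && ackLostOnce.contains(message));
                if (acked) {
                    break; // the sender saw the ACK, no retry needed
                }
            }
        }
        return received;
    }
}
```

With the messages a, b, c, where b is dropped once and the ACK for c is lost once, the receiver gets [a, c] in at-most-once mode (b is lost), [a, b, c, c] in at-least-once mode (c is duplicated), and [a, b, c] in exactly-once mode.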

We only have to specify the delivery guarantee at the producer’s side, not at the consumer’s.

To get exactly-once guarantees on the consumer side in Flink, we only need to enable checkpointing; that is all. Flink stores its consumer offsets within its fault-tolerant state. The additional complexity on the producer’s side comes from the fact that Kafka consumers track on their own which messages they have processed, while producers need more coordination.
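Why storing the offsets together with the state is enough on the consumer side can be shown with a small model. The sketch below assumes a single in-memory source, a running sum as the operator state, and one simulated crash; the class and method names are hypothetical and do not come from Flink.

```java
import java.util.List;

/** Toy model: source offset and operator state are checkpointed together. */
final class OffsetCheckpointSketch {

    /** Snapshot of the source position and the operator state taken at the same point. */
    static final class Checkpoint {
        final int offset; // index of the next record to read
        final long sum;   // operator state at that offset

        Checkpoint(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums all records, checkpointing every `interval` records and simulating
     * one crash right before the record at index `crashAt` (use -1 for no crash).
     */
    static long sumWithRecovery(List<Integer> records, int interval, int crashAt) {
        Checkpoint last = new Checkpoint(0, 0L);
        int offset = 0;
        long sum = 0L;
        boolean crashed = false;
        while (offset < records.size()) {
            if (!crashed && offset == crashAt) {
                // Failure: progress since the last checkpoint is lost, so roll back both
                // the read position and the state to the last checkpoint and replay.
                crashed = true;
                offset = last.offset;
                sum = last.sum;
                continue;
            }
            sum += records.get(offset);
            offset++;
            if (offset % interval == 0) {
                last = new Checkpoint(offset, sum);
            }
        }
        return sum;
    }
}
```

Because the offset and the state are restored from the same checkpoint, replayed records are applied to a state that has not seen them yet. For the records 1 to 10 the result is 55 both with and without the simulated crash.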

Where the Real Deduplication Happens

But where does all the deduplication magic happen? Right here!

.keyBy(keySelector())
.filter(new DedupeFilterFunction<>(keySelector(), CACHE_EXPIRATION_TIME_MS))

The above uses a Flink deduplication filter created by Jamie Grier, which internally uses Flink’s fault-tolerant state to keep seen IDs in memory. Memory is a limited resource, so we need to size it appropriately and pair it with an expiration time after which seen IDs are dropped.
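The idea of remembering seen IDs only for a limited time can be sketched in plain Java. This is not the DedupeFilterFunction used in the job: it is a hypothetical, simplified stand-in that keeps IDs in an ordinary in-memory map (so it is not fault-tolerant) and assumes that timestamps passed to it never decrease.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical sketch of a deduplication cache with expiration. */
final class ExpiringDedupeSketch<K> {

    private final long expirationMs;
    // Insertion-ordered map: ID -> time it was first seen, oldest entries first.
    private final Map<K, Long> seen = new LinkedHashMap<>();

    ExpiringDedupeSketch(long expirationMs) {
        this.expirationMs = expirationMs;
    }

    /** Returns true if the ID has not been seen within the expiration window. */
    boolean firstSeen(K id, long nowMs) {
        evictExpired(nowMs);
        if (seen.containsKey(id)) {
            return false; // duplicate: filter it out
        }
        seen.put(id, nowMs);
        return true;
    }

    /** Drops entries older than the expiration time, bounding memory use. */
    private void evictExpired(long nowMs) {
        Iterator<Map.Entry<K, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() >= expirationMs) {
                it.remove();
            } else {
                break; // entries are ordered by time, so the rest are newer
            }
        }
    }

    int size() {
        return seen.size();
    }
}
```

The expiration time is a trade-off: a longer window catches duplicates that arrive further apart but keeps more IDs in memory, while a shorter window saves memory at the cost of letting late duplicates through.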

And what about keyBy(keySelector())? All operations in Flink are executed in parallel, on separate stream partitions. This feature, combined with Kafka partitions, is what allows the solution to scale horizontally.

We also need to make sure that if there are duplicates within the incoming data stream, they will be processed on the same machine. This condition is fulfilled by the keyBy() function, which shuffles data between machines, so we are always processing the same IDs on the same machines. In our keySelector() function, we return an ID, and we will be filtering out events when they happen to have the same ID.
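The routing guarantee can be illustrated with a minimal sketch. It assumes a simplified scheme in which the partition is the key’s hash code modulo the parallelism; Flink’s real key distribution is more involved, and the names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of key-based routing; not Flink's actual partitioning scheme. */
final class KeyPartitioningSketch {

    /** Maps a key to one of `parallelism` partitions; equal keys always map to the same one. */
    static int partitionFor(String key, int parallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Groups events by the partition their key is routed to. */
    static List<List<String>> route(List<String> keys, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, parallelism)).add(key);
        }
        return partitions;
    }
}
```

Since the partition depends only on the key, both copies of a duplicated ID end up in the same partition, where a local cache of seen IDs is enough to detect the duplicate.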

Putting the Deduplication Filter Bricks Together

We have just put together a nice and neat implementation, which fulfills all the specified requirements. In only a few lines (and probably in 10 minutes or less), we have crafted a piece of software that is ultra-fast, scalable and has strong consistency guarantees. That is all a decent deduplication filter needs.
