In the realm of technology and data management, a significant portion of a company’s budget is often allocated to data storage and retrieval. A common issue faced by many companies is the presence of duplicate data. By identifying and removing these redundant, outdated, or irrelevant records, organizations can substantially cut costs.
Moreover, cleansing the data of duplicates makes it easier to manage and accelerates query response times.
To initiate this process, the first step is identifying duplicate records in Elasticsearch. This can be challenging, especially when a single record may appear across multiple Elasticsearch indexes.
SQL Approach to Removing Duplicates
Let’s first consider how we would handle this situation using SQL. Imagine we want to find duplicate data across different indexes in Elasticsearch. In SQL, we might use the INTERSECT operator between the two datasets. For instance, with two datasets, the query would look like this:
select distinct country, name_norm from trainstations
INTERSECT
select distinct country, name_norm from trainstations_de
In this example, we’re dealing with a dataset of train stations across the European Union. We have an index named “trainstations” for all EU records and additional indexes for each country (like “trainstations_de”). Each record is stored twice: once in the general index and once in a country-specific index.
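To make this layout concrete, here is a small hypothetical sketch using the Python Elasticsearch client (assuming the 8.x client and a local cluster; the sample values are illustrative, while the index and field names match the queries that follow):

from elasticsearch import Elasticsearch

# Illustrative assumption: elasticsearch-py 8.x against a local cluster.
es = Elasticsearch("http://localhost:9200")

# The same station is written both to the EU-wide index and to its country index,
# which is why every record exists at least twice.
station = {"country": "DE", "name_norm": "Aalen"}
es.index(index="trainstations", document=station)
es.index(index="trainstations_de", document=station)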
Elasticsearch, however, doesn’t support an INTERSECT operator directly, so the query above can’t be translated one-to-one.
Adapting SQL Logic for Elasticsearch
We can rewrite the INTERSECT query using building blocks that Elasticsearch does offer: a UNION ALL of the two index selections, followed by a GROUP BY with a HAVING filter. Since each SELECT DISTINCT emits a given (country, name_norm) pair at most once per index, any pair that appears more than once in the combined result must exist in both indexes, which is exactly the intersection. The rewritten SQL looks like this:
with relevant_data as (
select distinct country, name_norm from trainstations
union all
select distinct country, name_norm from trainstations_de
)
select country, name_norm
from relevant_data
group by 1,2
having count(*)>1
Elasticsearch Query for Finding Duplicates
Now, let’s translate this logic into an Elasticsearch query:
GET /trainstation*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "terms": {
          "_index": [
            "trainstations",
            "trainstations_de"
          ]
        }
      }
    }
  },
  "aggs": {
    "duplicates": {
      "composite": {
        "size": 65535,
        "sources": [
          {
            "country_value": {
              "terms": {
                "field": "country.keyword",
                "order": "asc"
              }
            }
          },
          {
            "name_value": {
              "terms": {
                "field": "name_norm.keyword",
                "order": "asc"
              }
            }
          }
        ]
      },
      "aggs": {
        "having": {
          "bucket_selector": {
            "buckets_path": {
              "count_path": "_count"
            },
            "script": {
              "source": "params.count_path>1",
              "lang": "painless"
            },
            "gap_policy": "skip"
          }
        }
      }
    }
  }
}
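Running this request returns no document hits (we set “size” to 0) and a list of buckets, one per (country, name_norm) pair that occurs more than once: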
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 7855,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "duplicates" : {
      "after_key" : {
        "country_value" : "DE",
        "name_value" : "Zwotental"
      },
      "buckets" : [
        {
          "key" : {
            "country_value" : "DE",
            "name_value" : "Aalen"
          },
          "doc_count" : 2
        },
        {
          "key" : {
            "country_value" : "DE",
            "name_value" : "Aglasterhausen"
          },
          "doc_count" : 2
        },
        ...
Handling Large-Scale Duplicates
When dealing with a vast number of duplicates, it’s important to remember that Elasticsearch limits how many aggregation buckets a single request can return (the search.max_buckets cluster setting). The composite aggregation works around this limit through pagination: each response includes an “after_key” that tells the next query where to resume. Automating this process, for example with Python (optionally with Pandas or PySpark), streamlines the operation: a loop runs until no more results are returned, passing the “after_key” from each response into the “after” parameter of the next request, as in the query below and the Python sketch that follows it.
GET /trainstation*/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "terms": {
          "_index": [...]
        }
      }
    }
  },
  "aggs": {
    "duplicates": {
      "composite": {
        "size": 65535,
        "sources": [...],
        "after": {
          "country_value": "DE",
          "name_value": "Zwotental"
        }
      },
      "aggs": {...
Innovative SQL Solutions for Elasticsearch Duplicate Detection
Many platforms let you run SQL queries against Elasticsearch. What sets our platform apart is that it executes such queries efficiently without loading the complete dataset into memory, so complex queries can run on modest, cost-effective hardware, making better use of computational resources and significantly reducing operational costs.