Transactional SQL Join operation on Elasticsearch with Pandas

Elasticsearch, a dynamic analytical tool, merges storage architecture with a powerful engine to enable rapid full-text scans, delivering results in milliseconds. Distinct from traditional databases, it excels as a non-relational search engine.

Organizations that want the advantages of both transactional databases and Elasticsearch often face data duplication and complex migration processes. These problems can be mitigated natively in Elasticsearch or by integrating with SQL platforms.

We will start with how to do this with Elasticsearch alone. The typical solution in Elasticsearch is to define parent-child relationships within an index.
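
As a rough illustration of the parent-child option, the sketch below builds a mapping that uses Elasticsearch's `join` field type, a child document body, and a `has_child` query, all as plain Python dictionaries. The field name `relation` and the relation names `small_row` and `big_row` are hypothetical, chosen only for this example; the bodies would be sent with whatever client you use.

```python
# Hypothetical names: "relation", "small_row" (parent) and "big_row" (child).
parent_child_mapping = {
    "mappings": {
        "properties": {
            "relation": {
                "type": "join",
                "relations": {"small_row": "big_row"},
            },
            "join_field": {"type": "long"},
        }
    }
}


def child_document(parent_id, join_value):
    """Body of a child document; it has to be indexed with routing=parent_id."""
    return {
        "join_field": join_value,
        "relation": {"name": "big_row", "parent": parent_id},
    }


# Query body that returns parents having at least one child document.
has_child_query = {
    "query": {
        "has_child": {
            "type": "big_row",
            "query": {"match_all": {}},
        }
    }
}
```

Parent and child documents live in the same index and on the same shard, which is why the child has to be routed by its parent ID.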

Another approach is to take advantage of Elasticsearch’s free-text search capabilities. This approach involves pre-planning the structure and using Pandas for efficient data manipulation, allowing SQL join operations on Elasticsearch as if working with tables in a relational database. The key lies in pre-storing potential join column values, creating a dedicated index for these keys, and using that index to drive join operations across the other indexes in a stable database environment.
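
To make the idea concrete, here is a minimal in-memory sketch of the principle, with no Elasticsearch involved. A dictionary stands in for the dedicated keys index, and the function names are hypothetical. Because each key document gets an ID built from the table name, field name, and value, a table contributes at most one key document per value, so a value that is counted twice exists in both tables.

```python
from collections import Counter


def key_doc_id(table_name, join_field_name, value):
    # Repeated values within one table collapse into a single key document.
    return f"{table_name}{join_field_name}{value}"


def inner_join_candidates(big_values, small_values):
    """Return the values present in both tables, using only the key documents."""
    keys_index = {}  # stand-in for the dedicated keys index: id -> document
    sources = (
        ("big_table", "big_table_join_field", big_values),
        ("small_table", "small_table_join_field", small_values),
    )
    for table, field, values in sources:
        for value in values:
            keys_index[key_doc_id(table, field, value)] = {
                "table_name": table,
                "join_field_name": field,
                "value": value,
            }
    counts = Counter(doc["value"] for doc in keys_index.values())
    # At most one key document per table and value, so a count of 2
    # means the value exists on both sides of the join.
    return sorted(value for value, count in counts.items() if count >= 2)


print(inner_join_candidates([1, 1, 2, 3, 5], [2, 5, 8]))  # -> [2, 5]
```

In Elasticsearch the same counting can be expressed as a terms aggregation with a minimum document count of 2.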

 

Structure

In relational terms, this is the use case of a very large table joined to a small one. "Small" here is not an absolute size, nor is it defined relative to the big table; it depends on the size and type of the Elasticsearch cluster. A good rule of thumb is to treat an index as small if a single request against it returns its results in less than 500 milliseconds.

				
# Create an index template for the bigger table. It is better to divide it into indices by time slice.
# _source stays enabled because the join code below reads document['_source'].

curl -X PUT "localhost:9200/_template/big_table_index?pretty" -H 'Content-Type: application/json' -d'
{
  "index_patterns": ["big_table_*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "big_table_join_field": {
        "type": "long"
      },
      "additional_field": {
        "type": "text"
      },
      "created_at": {
        "type": "date",
        "format": "dd/MMM/yyyy:HH:mm:ss Z"
      }
    }
  }
}
'

# Create index for the smaller table

curl -X PUT "localhost:9200/small_table_index?pretty" -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "number_of_shards" : 1
    },
    "mappings" : {
        "properties" : {
            "small_table_join_field" : { 
              "type" : "long" 
            },
            "additional_field": {
              "type": "text"
            }
        }
    }
}
'


# Create an index for the join keys from both tables. The value field should have the same type as the join field; it stores the actual value of the join field.

curl -X PUT "localhost:9200/join_keys_index?pretty" -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "number_of_shards" : 1
    },
    "mappings" : {
        "properties" : {
            "table_name" : { 
              "type" : "keyword" 
            },
            "join_field_name": {
              "type": "keyword"
            },
            "value": {
              "type": "long"
            },
            "last_update": {
              "type": "date",
              "format": "dd/MMM/yyyy:HH:mm:ss Z"
            }
        }
    }
}
'
				
			

 

Writing to indices:

				
					input { stdin { } }

filter {
	mutate
  {
    add_field => { "table_name" => "big_table" }
    add_field => { "join_field_name" => "big_table_join_field" }
    add_field => { "value" => "%{big_table_join_field_value}" }
  }
}
output {
  elasticsearch {                 
  	hosts => ["localhost:9200"]
	index => "big_table_${INDEX_DATE}"
    document_type => "_doc" 
    }
  elasticsearch {                 
  	hosts => ["localhost:9200"]
	  index => "join_keys_index"
	  document_id => "%{table_name}%{join_field_name}%{value}"
    document_type => "_doc" 
    }
}
				
			
				
					input { stdin { } }

filter {
	mutate
  {
     add_field => { "table_name" => "small_table" }
     add_field => { "join_field_name" => "small_table_join_field" }
     add_field => { "value" => "%{small_table_join_field_value}" }
  }
}

output {
  elasticsearch {                 
  	hosts => ["localhost:9200"]
	index => "small_table_index"
    document_type => "_doc" 
    }
  elasticsearch {                 
  	hosts => ["localhost:9200"]
	index => "join_keys_index"
	document_id => "%{table_name}%{join_field_name}%{value}"
    document_type => "_doc" 
    }
}
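
For readers who load data from Python rather than Logstash, the same dual write can be sketched as a generator of bulk actions. This is only an illustration: `dual_write_actions` is a hypothetical helper, the `last_update` timestamp is omitted for brevity, and no request is sent. The action dictionaries follow the shape accepted by `elasticsearch.helpers.bulk`.

```python
def dual_write_actions(rows, table_name, join_field_name, data_index,
                       keys_index="join_keys_index"):
    """Yield two bulk actions per row: one for the data index, one for the keys index."""
    for row in rows:
        value = row[join_field_name]
        yield {"_index": data_index, "_source": row}
        yield {
            "_index": keys_index,
            # Same ID pattern as document_id in the Logstash output, so a
            # repeated value overwrites its key document instead of duplicating it.
            "_id": f"{table_name}{join_field_name}{value}",
            "_source": {
                "table_name": table_name,
                "join_field_name": join_field_name,
                "value": value,
            },
        }


rows = [
    {"small_table_join_field": 2, "additional_field": "a"},
    {"small_table_join_field": 2, "additional_field": "b"},
]
actions = list(dual_write_actions(
    rows, "small_table", "small_table_join_field", "small_table_index"))
# With the elasticsearch-py client, this list could be passed to
# elasticsearch.helpers.bulk(es, actions); that call is not made here.
```

Both rows share the join value 2, so they produce two data documents but only one distinct key document ID.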
				
			

Querying inner join operation:

				
import pandas as pd
from elasticsearch import Elasticsearch, helpers


def connect_elastic():
    # Placeholder host and credentials; the keyword arguments follow the 7.x Python client.
    return Elasticsearch('127.0.0.1',
                         http_auth=('my_username', 'my_password'),
                         port=9200)


def inner_join(left_table_name, left_table_join_key, right_table_name, right_table_join_key):
    es = connect_elastic()

    # Step 1: find the join values that exist in both tables. Each table stores
    # one key document per value, so a bucket with two documents is a match.
    body_join_potential = {
        "size": 0,
        "aggs": {
            "categories": {
                "filter": {"bool": {"should": [
                    {"bool": {"must": [{"term": {"table_name": left_table_name}},
                                       {"term": {"join_field_name": left_table_join_key}}]}},
                    {"bool": {"must": [{"term": {"table_name": right_table_name}},
                                       {"term": {"join_field_name": right_table_join_key}}]}},
                ]}},
                "aggs": {
                    "inner_join": {
                        # "size" caps the number of returned values (the default is 10).
                        "terms": {"field": "value", "min_doc_count": 2, "size": 10000}
                    }
                }
            }
        }
    }
    # Aggregation results come from a regular search, not from helpers.scan.
    response = es.search(index="join_keys_index", body=body_join_potential)
    buckets = response["aggregations"]["categories"]["inner_join"]["buckets"]
    list_of_join_values = [bucket["key"] for bucket in buckets]

    # Step 2: fetch the matching rows from each table's own index.
    body_big_table = {"query": {"terms": {left_table_join_key: list_of_join_values}}}
    results_big_table_data = helpers.scan(es, query=body_big_table, index="big_table_*")
    df_big_table = pd.DataFrame([document['_source'] for document in results_big_table_data])

    body_small_table = {"query": {"terms": {right_table_join_key: list_of_join_values}}}
    results_small_table_data = helpers.scan(es, query=body_small_table, index="small_table_index")
    df_small_table = pd.DataFrame([document['_source'] for document in results_small_table_data])

    # Step 3: join in pandas; the suffixes separate columns that exist in both tables.
    final_result = df_big_table.set_index(left_table_join_key).join(
        df_small_table.set_index(right_table_join_key),
        how='inner', lsuffix='_big', rsuffix='_small')
    return final_result
				
			

Manage and control

Running the join on chunks of data lets you control the process and its memory usage:
A join like this can, in some cases, be even more efficient than a join in a SQL database. Retrieving the data in chunks by time interval lets you control how much data is loaded into memory. You can also decide in advance which type of instance will run the pandas code, and whether there will be one instance or 100.
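
One way to picture the chunking is the sketch below, which splits a time span into windows and builds a range query body for each one. The helper names are hypothetical, and the field name `created_at` is taken from the big table mapping. The `format` parameter in the range query tells Elasticsearch how to parse the bounds, independently of the format in the field's mapping.

```python
from datetime import datetime, timedelta


def time_windows(start, end, step):
    """Yield half-open (window_start, window_end) pairs covering start..end."""
    current = start
    while current < end:
        upper = min(current + step, end)
        yield current, upper
        current = upper


def window_query(window_start, window_end, field="created_at"):
    """Range query body that restricts a scan to a single time window."""
    fmt = "%Y-%m-%dT%H:%M:%S"
    return {"query": {"range": {field: {
        "gte": window_start.strftime(fmt),
        "lt": window_end.strftime(fmt),
        "format": "strict_date_hour_minute_second",
    }}}}


windows = list(time_windows(datetime(2024, 1, 1), datetime(2024, 1, 2),
                            timedelta(hours=6)))
print(len(windows))  # -> 4
```

Each window's query body can be handed to a separate scan, possibly on a separate worker, so only one chunk needs to be held in memory at a time.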

Additional capabilities

By storing keys from both tables with timestamps, you can create a join between them.
Join multiple keys – add each key to the keys index as a distinct document and update the query bool constraints.
Select which output fields are included in the operation for each table.
You can write ORC or Parquet files to HDFS and use it as an external data source behind an SQL interface such as Athena or BigQuery.
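
Two of the points above, extending the bool constraints to more keys and selecting output fields, can be sketched as small query builders. The helper names are hypothetical. The `_source` entry uses Elasticsearch's source filtering to limit which fields are returned.

```python
def keys_filter(pairs):
    """Build the bool filter for any number of (table_name, join_field_name) pairs."""
    return {"bool": {"should": [
        {"bool": {"must": [
            {"term": {"table_name": table}},
            {"term": {"join_field_name": field}},
        ]}}
        for table, field in pairs
    ]}}


def table_query(join_field, join_values, output_fields):
    """Terms query on one table that returns only the selected fields."""
    return {
        "_source": list(output_fields),
        "query": {"terms": {join_field: list(join_values)}},
    }


constraints = keys_filter([
    ("big_table", "big_table_join_field"),
    ("small_table", "small_table_join_field"),
])
query = table_query("small_table_join_field", [2, 5],
                    ["small_table_join_field", "additional_field"])
```

Adding another key then means adding one more pair to the list rather than editing the query by hand.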

Using SQL platform to join indices

Utilizing SQL platforms for index joining is a seamless process, requiring no alterations to existing data structures. This method bypasses the complexities of Lucene, allowing for direct SQL execution.

Our platform is particularly effective for this purpose, offering high-efficiency operations without the need for full data memory loading. Consequently, it enables query processing on less powerful, cost-efficient hardware, optimizing resource utilization and reducing operational costs.

For a free trial, contact us!
