
Wednesday, January 15, 2014

Talend Elasticsearch Indexing Tutorial

In this tutorial we index data into an existing Elasticsearch server as part of an ETL/ELT flow.

Make sure the Elasticsearch server is running if you are testing on a local machine. In deployments it is generally already running and indexing continuously. To start it locally, unzip the distribution and run the elasticsearch sh/bat script.

Elasticsearch version notes 

[Skip this note if using Elasticsearch version <2.0.0]:
Before adding the component to a Job, add the dependent libraries in the Modules view of Talend Studio so that the component can detect the ES version.
ES 2.x and later needs more libraries (as shown in the image below). Load them using the tLibraryLoad component or the "Edit Routine Library" option of a Java routine (right-click any routine used in the Job), as shown in the image.

Additional libraries for ES >2.x

[Skip this note if using Elasticsearch version <1.2.0]:
Elasticsearch 1.2.0 and above requires JDK 7, so make sure JDK 7 is installed and that Talend Studio is using it.
Talend Studio JDK

See 1.2.0 release notes.

You also need Talend Studio version 5.2.1 or later.

Step #0: Set up Elasticsearch: Do any configuration you need and create the indexes, types and mappings you need. In development you generally don't have to do anything: when the ETL job starts sending requests, Elasticsearch adds this metadata with defaults, so you can import the jobs in Talend and run them.
In production, however, the defaults are often not sensible, for example when you need to specify mappings. In that case create all of them explicitly. For example, to control whether a particular field is analyzed or stored:

    "tweet" : {
        "properties" : {
            "message" : {"type" : "string", "store" : "yes"},
            "sender" : {"type" : "string", "store" : "yes",
                        "index" : "not_analyzed"}
        }
    }

You can do this as a one-time process before running the ETL, or even inside the ETL by sending an HTTP request with the Http/Rest components in Talend. Let me know if a separate component for this would be useful.
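As a rough illustration of the one-time HTTP request, here is a minimal Java sketch that sends the mapping above with a PUT. The index name "twitter", the host and port, and the class and method names are all hypothetical, and the /<index>/_mapping/<type> endpoint layout is an assumption based on the Elasticsearch 1.x REST API; check it against your version. The index must already exist before the mapping is sent.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class MappingRequestSketch {

    // Mapping body for the "tweet" type, same fields as the example above.
    static String tweetMappingJson() {
        return "{\"tweet\":{\"properties\":{"
             + "\"message\":{\"type\":\"string\",\"store\":\"yes\"},"
             + "\"sender\":{\"type\":\"string\",\"store\":\"yes\",\"index\":\"not_analyzed\"}"
             + "}}}";
    }

    // Assumed ES 1.x endpoint layout: /<index>/_mapping/<type> on the REST port.
    static String mappingUrl(String host, int restPort, String index, String type) {
        return "http://" + host + ":" + restPort + "/" + index + "/_mapping/" + type;
    }

    // Sends the JSON body with an HTTP PUT and returns the HTTP status code.
    static int putMapping(String url, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    public static void main(String[] args) throws Exception {
        // "twitter" is a hypothetical index name used only for this sketch.
        String url = mappingUrl("localhost", 9200, "twitter", "tweet");
        System.out.println("PUT " + url);
        System.out.println(tweetMappingJson());
        // Uncomment once a server is running and the index exists:
        // System.out.println(putMapping(url, tweetMappingJson()));
    }
}
```

The same request can be sent from a Talend Http/Rest component; the sketch only shows which URL and body are involved.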

Once Elasticsearch is set up and running, we can import the sample job in Talend to explore how this component can be used.

Step #1: Download the job from here and import it in Talend. This component can be placed in an ETL flow to index data.
1.1: Get the Elasticsearch extension from Talend Exchange

How to install Talend extension/component from Talend Exchange

  • Click the "Exchange" link.
  • Search for "elasticsearch" in Available Extensions.
  • Check that the tElasticsearchIndex component appears in the palette under the fulltextsearch section. If it is not present, try Ctrl+Shift+F3 to reload the palette.
NOTE: At present talendforge has some issues with adding a new revision, so I am not able to add this revision there. If your cluster name is not "elasticsearch" (the default), you need to specify the cluster name in the component. This input was added in the latest version of the component, which is hosted on Github. Latest version 1.0.3 has been uploaded to Talend Exchange.

1.2: Import the job in Talend Studio. After importing you may get a compilation error in Talend version 5.3 or higher. To resolve this, go to the "Modules" view and add two jars, elasticsearch and lucene-core. Copy these two jars from the lib folder of your Elasticsearch installation into some folder and rename them to remove the version (so they become elasticsearch.jar and lucene-core.jar).

Look for elasticsearch.jar and lucene-core.jar and add them as shown in the screenshots:

Browse to the directory you copied the jars to in the previous step and add the two jars for the two modules:

Do the same for lucene-core.jar.

1.3: Job flow overview:

This job will:
  • Generate 100 rows.
  • Transform them.
  • Add nested document.
  • Index documents.
Generate Rows:

Generate 100 random rows with one key column of type int (randomly generated from 0-100) and four randomly generated string columns.

Transform them:
Key1 is of type Object, and a String array is passed to it (nested documents). The next section covers nested documents in more detail.

Reform for complex nested documents:
If you need more than two levels of nesting, pass all the data from tMap as a single flat document like:
Then restructure it into a nested Map in tJavaRow:

// Code generated according to input schema and output schema
output_row.key1 = input_row.key1;
output_row.key2 = input_row.key2;
output_row.key3 = input_row.key3;
output_row.key4 = input_row.key4;
output_row.key5 = input_row.key5;

Map<String, Object> nested = new HashMap<String, Object>();
nested.put("child1", "val1");
nested.put("child2", "val2");
Map<String, String> nestedChild = new HashMap<String, String>();
nestedChild.put("nestedChild1", "val1");
nestedChild.put("nestedChild2", "val2");
nested.put("child3", nestedChild);
output_row.key6 = nested;

Add the java.util.Map and java.util.HashMap imports in the advanced settings of tJavaRow.
Note: All flows in Talend are flat rows driven by a schema, so nested structures need to be handled separately like this.
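To make the effect of the nested Map more concrete, the sketch below renders the same key6 structure as a JSON object. How the component itself serializes the Map is not shown in this post, so the toJson helper here is purely illustrative (a hypothetical helper that handles only string values and nested maps, with no escaping), and LinkedHashMap is used only to keep the output order predictable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NestedJsonSketch {

    // Recursively renders a Map whose values are Strings or nested Maps as JSON.
    // Illustration only: no escaping, no arrays, no numbers.
    static String toJson(Map<String, ?> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            if (!first) {
                sb.append(",");
            }
            first = false;
            sb.append("\"").append(entry.getKey()).append("\":");
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                sb.append(toJson(child));
            } else {
                sb.append("\"").append(value).append("\"");
            }
        }
        return sb.append("}").toString();
    }

    // Builds the same structure that the tJavaRow code assigns to key6.
    static Map<String, Object> buildKey6() {
        Map<String, Object> nested = new LinkedHashMap<String, Object>();
        nested.put("child1", "val1");
        nested.put("child2", "val2");
        Map<String, String> nestedChild = new LinkedHashMap<String, String>();
        nestedChild.put("nestedChild1", "val1");
        nestedChild.put("nestedChild2", "val2");
        nested.put("child3", nestedChild);
        return nested;
    }

    public static void main(String[] args) {
        // prints {"child1":"val1","child2":"val2","child3":{"nestedChild1":"val1","nestedChild2":"val2"}}
        System.out.println(toJson(buildKey6()));
    }
}
```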

Index data:
Configure the Elasticsearch extension like this:

The column names on the right side become the keys of the indexed JSON document. For nested documents you can use the tMustache component instead of tJavaRow. Add the configuration parameters:
Host: Elasticsearch server host.
Port: Elasticsearch server port. 9300 is the default port of the Elasticsearch server itself (the transport port); 9200 is the default port of the REST API.
Index Name: The equivalent of a database in the SQL world.
Collection Name: The equivalent of a table in a database (an Elasticsearch type).
Cluster Name: Name of the cluster this host belongs to. The default is "elasticsearch". To check your cluster name:
Open http://<HOST>:<REST PORT>/_cluster/health?pretty=true in a browser and read the "cluster_name" value, if the REST API (HTTP server) is enabled.
Or check "" in the config/elasticsearch.yml file of the Elasticsearch installation.
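The browser check for the cluster name can also be scripted. The sketch below builds the health URL from the text above and pulls "cluster_name" out of the response with a regular expression. The class and method names are hypothetical, and the sample response in main is a shortened, made-up example rather than output captured from a real server.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ClusterNameSketch {

    private static final Pattern CLUSTER_NAME =
            Pattern.compile("\"cluster_name\"\\s*:\\s*\"([^\"]*)\"");

    // Builds the cluster health URL on the REST port (9200 by default).
    static String healthUrl(String host, int restPort) {
        return "http://" + host + ":" + restPort + "/_cluster/health?pretty=true";
    }

    // Returns the value of "cluster_name" from a health response, or null if absent.
    static String extractClusterName(String healthJson) {
        Matcher matcher = CLUSTER_NAME.matcher(healthJson);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened response body used only for illustration.
        String sample = "{\n  \"cluster_name\" : \"elasticsearch\",\n  \"status\" : \"green\"\n}";
        System.out.println(healthUrl("localhost", 9200));
        System.out.println(extractClusterName(sample));
    }
}
```

A regular expression is enough for this single field; for anything more involved a real JSON parser would be the safer choice.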

Note: There is an additional level of hierarchy under the table, the "mapping". Because documents do not have a strict schema, they are grouped by mapping. If a new document structure is encountered during indexing, a new mapping is created with default settings.
It is better to know all the possible schemas of your documents and create the mappings you need in advance. Also, if documents with the same structure carry different data types for the same field, indexing can behave unpredictably.

Run the job and view the indexed documents in the "Sense" Google Chrome plugin.

See Indexed Documents:
It shows 61 documents: we generated 100 random numbers and used them as the key, so rows with duplicate keys were updated rather than added (upsert). Note that "key6" is nested.
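The document count above can be reproduced without Elasticsearch. The sketch below is a small simulation, not the component's code: a HashMap stands in for the index, and writing a row under an existing key overwrites it, as an upsert does. It assumes the keys are drawn uniformly from 0 to 100 inclusive; the class name, method name and seed are hypothetical. Under that assumption the expected number of distinct keys for 100 rows is 101 * (1 - (100/101)^100), roughly 64, so a count like 61 is in the normal range.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class UpsertCountSketch {

    // Simulates indexing `rows` documents whose id is a random int in [0, 100].
    // A row with an id that already exists replaces the earlier document.
    static int distinctDocuments(int rows, long seed) {
        Random random = new Random(seed);
        Map<Integer, String> index = new HashMap<Integer, String>();
        for (int i = 0; i < rows; i++) {
            int key = random.nextInt(101); // assumed range: 0..100 inclusive
            index.put(key, "row-" + i);    // same key: overwrite (upsert)
        }
        return index.size();
    }

    public static void main(String[] args) {
        // The exact count depends on the seed; it is always at most 100.
        System.out.println(distinctDocuments(100, 42L));
    }
}
```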

Note: As this plugin is no longer available, open http://<HOST>:<REST PORT>/<INDEX_NAME>/<COLLECTION_NAME>/_search in a browser instead.

That's it! Let me know if you run into any issues.
