
Tuesday, January 28, 2014

Mustache Template in Talend ETL

This post is about using Mustache templates in ETL/ELT to generate rich textual output such as HTML, JSON, or XML from data in an ETL flow.

First, download and install Talend if you don't have it.

Get the tMustache component from here. Install the component as described in Step #1 of this tutorial. You also need to add the mustache module in TOS, similar to the Elasticsearch module.

Get the sample jobs for TOS v5.0.3 and v5.3.1 from here and import them into Talend to understand how the component is used.

Configuration of tMustache component:


In the schema, add all columns in the flow to both sides, as highlighted in the red rectangle. This makes the row data available to the next components in the flow. If the outgoing row must contain only the output column, as with the Elasticsearch component, place a tFilterColumns component after tMustache.

Add one extra column, e.g. newColumn (highlighted in the green rectangle), to hold the output of the template execution. This column will be empty in the incoming connection and will be written by this component.
Select this column in the "Output Column" parameter.

In "Mustache Template" put any valid Mustache template. All variable names must match the column labels in the schema.
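
For example, assuming the flow has columns named id, title and artist (hypothetical labels; use your own schema's column labels), a template that renders each row as a small JSON document could look like:

{
  "id": {{id}},
  "title": "{{title}}",
  "artist": "{{artist}}"
}

Note that {{...}} HTML-escapes values by default; use the triple-brace form {{{...}}} if you need the raw value.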

Try the sample jobs and let me know if there is any issue.

Wednesday, January 15, 2014

Talend Elasticsearch Indexing Tutorial

We are going to index data into an existing Elasticsearch server as part of an ETL/ELT flow.

Prerequisites:
Make sure the Elasticsearch server is running if you are testing on a local machine. In real deployments it is generally already running and indexing continuously. To run it locally, unzip the distribution and run the elasticsearch sh/bat script.

NOTE:
Elasticsearch version notes 

[Skip this note if using Elasticsearch version <2.0.0]:
Before adding the component to a Job, add the dependent libraries in the Modules view of Talend Studio so that the component can detect the ES version.
ES 2.x and above needs more libraries (as shown in the image below). Load them using a tLibraryLoad component, or via the "Edit Routine Library" option of a Java Routine (right-click any routine used in the Job), as shown in the image.

Additional libraries for ES >2.x


[Skip this note if using Elasticsearch version <1.2.0]:
Elasticsearch 1.2.0 and above requires JDK 7, so make sure you have JDK 7 installed and that Talend Studio is using it.
Talend Studio JDK

See 1.2.0 release notes.

You also need to use Talend Studio version 5.2.1 or later.

Step #0: Set up Elasticsearch: Do any configuration you need and create the indexes, types and mappings you need. Generally, in development you don't have to do anything; as the ETL job starts sending requests, Elasticsearch will create this metadata with defaults, so you can simply import the jobs into Talend and run them.
In production, however, the defaults are not sensible and you will likely need to specify mappings. To do this, create all of them explicitly. For example, to control how a particular field is analyzed or stored:

{
    "tweet" : {
        "properties" : {
            "message" : {"type" : "string", "store" : "yes"},
            "sender" : {"type" : "string", "store" : "yes",
                        "index" : "not_analyzed"}
        }
    }
}

You can do this as a one-time process before running the ETL, or even within the ETL by sending an HTTP request using the Http/Rest components in Talend. Let me know if a separate component would be useful for this.
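
As a sketch of the HTTP approach, the snippet below PUTs the mapping above to the REST API with plain java.net classes. The host, port, index name ("twitter") and the exact endpoint path are assumptions; adjust them to your setup and Elasticsearch version.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class PutMapping {
    public static void main(String[] args) throws Exception {
        //Hypothetical host, index and type; the index must already exist
        URL url = new URL("http://localhost:9200/twitter/_mapping/tweet");
        String mapping = "{\"tweet\":{\"properties\":{"
                + "\"message\":{\"type\":\"string\",\"store\":\"yes\"},"
                + "\"sender\":{\"type\":\"string\",\"store\":\"yes\",\"index\":\"not_analyzed\"}}}}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        OutputStream out = conn.getOutputStream();
        out.write(mapping.getBytes("UTF-8"));
        out.close();
        //A 2xx response code means the mapping was accepted
        System.out.println("Response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}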

After Elasticsearch is set up and running, we can import the sample job into Talend to explore how this component can be used.

Step #1: Download the job from here and import it into Talend. This component can be placed in an ETL flow to index data.
1.1: Get the Elasticsearch extension from Talend Exchange

How to install Talend extension/component from Talend Exchange

  • Click the "Exchange" link.
  • Search for "elasticsearch" in Available Extensions.
  • Check that the tElasticsearchIndex component appears in the palette under the fulltextsearch section. If it is not present, try Ctrl+Shift+F3 to reload the palette.
NOTE: at present talendforge has some issues with adding a new revision, so I am not able to add this revision to talendforge. If your cluster name is not "elasticsearch" (the default), then you need to specify the cluster name in the component. This input was added in the latest version of the component, hosted on GitHub. The latest version, 1.0.3, has been uploaded to Talend Exchange.

1.2: Import the job into Talend Studio. After importing, you may get a compilation error in Talend version 5.3 or higher. To resolve this, go to the "Modules" view and add two jars, elasticsearch and lucene-core. Copy these two jars from your Elasticsearch installation's lib folder into some folder and rename them to remove the version (make them elasticsearch.jar and lucene-core.jar).

Look for elasticsearch.jar and lucene-core.jar and add them as shown in the screenshots:


Browse to the directory where you copied the jars in the previous step and add the two jars for the two modules:
 


Do the same for lucene-core.jar.

1.3: Job flow overview:

This job will:
  • Generate 100 rows.
  • Transform them.
  • Add nested document.
  • Index documents.
Generate Rows:

Generate 100 random rows, with one key column of type int (randomly generated from 0-100) and four randomly generated string columns.

Transform them:
key1 is of type Object and a String array will be passed to it (for nested documents). The next section has more detail on nested documents.

Reform for complex nested documents:
If you need more than two levels of nesting, pass all the data from tMap as a flat single document, like:
key:value
child.key=value1
child.child.key=value2
.....
Then, in tJavaRow, restructure them into nested Maps:


//Code generated according to input schema and output schema
output_row.id = input_row.id;
output_row.key1 = input_row.key1;
output_row.key2 = input_row.key2;
output_row.key3 = input_row.key3;
output_row.key4 = input_row.key4;
output_row.key5 = input_row.key5;

Map<String, Object> nested = new HashMap<String, Object>();
nested.put("child1", "val1");
nested.put("child2", "val2");
Map<String, String> nestedChild = new HashMap<String, String>();
nestedChild.put("nestedChild1", "val1");
nestedChild.put("nestedChild2", "val2");
nested.put("child3", nestedChild);
output_row.key6 = nested;


Add java.util.Map and java.util.HashMap imports in the advanced settings of tJavaRow.
Note: As all flows in Talend are flat rows driven by a schema, nested structures need to be handled separately like this.

Alternatively, for nested documents you can use the tMustache component instead of tJavaRow.

Index data:
Configure the Elasticsearch extension, like:

The column names on the right side will become the keys of the indexed JSON document. Add the configuration parameters:
Host: Elasticsearch server host.
Port: Elasticsearch server port. 9300 is the default transport port used by the Elasticsearch Java client, and 9200 is the REST API port.
Index Name: the equivalent of a database in the SQL world.
Collection Name: the equivalent of a table in a database.
Cluster Name: name of the cluster this host belongs to. The default is "elasticsearch". To check your cluster name:
Hit http://<HOST>:<REST PORT>/_cluster/health?pretty=true in a browser and read the "cluster_name" value, if the REST API (HTTP server) is enabled.
Or, check "cluster.name" in the config/elasticsearch.yml file of the Elasticsearch installation.

Note: There is an additional level of hierarchy, the "mapping", under a table. As documents do not have a strict schema, they are grouped under mappings. If a new document structure is encountered while indexing, a new mapping is created with default settings.
It is better if you know all possible schemas of your documents and create the mappings you need in advance. Also, if documents of the same structure have fields with different data types, you will see weird behavior.

Run the job and see the indexed documents in the "Sense" Google Chrome plugin.

See Indexed Documents:
It shows there are 61 documents: we generated 100 random numbers and designated them as the key, so duplicate keys are updated (upsert). Note that "key6" is nested.

Note: As this plugin is not available anymore, hit http://<HOST>:<REST PORT>/<INDEX_NAME>/<COLLECTION_NAME>/_search instead.

That's it! Let me know if there is any issue.

Count (*) : Cassandra Data Modeling

How to model Cassandra for count queries?

Consider the same "playlist" database from the previous post.

We need the following queries on the tracks table:
  • Count number of tracks. SQL : SELECT COUNT(*) FROM tracks;
  • Count number of tracks for a particular genre. SQL : SELECT COUNT(*) FROM tracks WHERE genre=?;
Step #1:
Create table to store counts:
CREATE TABLE track_count (
   count counter,
   table_name text,
   genre text,
   PRIMARY KEY (table_name, genre)
);

This table can be used to store counts for all tables, hence the table_name column. If counts are needed for only one table, this column can be removed. And if grouping is not required, or the distinct groups are so many that a count-all would have to read all of them into memory, remove the grouping column as well. Then, for a single-table count-all query, there will be only one cell in this table.

Generalized table to store counts for all tables in the database:
CREATE TABLE track_count (
   count counter,
   table_name text,
   <grouping columns>,
   PRIMARY KEY (table_name, <grouping columns>...)
);
 

Step #2:
Update count on adding track:
BEGIN BATCH
INSERT INTO tracks (...) VALUES (...);
UPDATE track_count SET count=count+1 WHERE table_name='tracks' AND genre=?;
APPLY BATCH;

On every insert of a track the count is updated, grouped by genre. As there will be only a small number of genres, it is okay to read all the counter rows into memory and sum the counts to get the total number of records.

Step #3:
CQL queries modeling:

[SQL] : SELECT COUNT(*) FROM tracks;
[CQL] : SELECT count FROM track_count WHERE table_name='tracks';
Sum the counts in application code (see the Java sketch below).

[SQL] : SELECT COUNT(*) FROM tracks WHERE genre=?;
[CQL] : SELECT count FROM track_count WHERE table_name='tracks' AND genre=?;

[SQL] : SELECT COUNT(*) FROM tracks GROUP BY genre;
[CQL] : SELECT count FROM track_count WHERE table_name='tracks';
 
In SQL we query the same table for counts; in CQL we create new table(s) and query those instead.
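
Below is a minimal sketch of the application-side summing with the Datastax Java Driver. It assumes the track_count table above lives in the "playlist" keyspace from the previous post; the host and genre value are placeholders.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TrackCountDAO {

        //Equivalent of SELECT COUNT(*) FROM tracks: read one counter row per genre and sum them
        private static long countAllTracks(Session session) {
               ResultSet result = session.execute("SELECT count FROM track_count WHERE table_name='tracks';");
               long total = 0;
               for (Row row : result) {
                      total += row.getLong("count"); //counter columns are read as long
               }
               return total;
        }

        //Equivalent of SELECT COUNT(*) FROM tracks WHERE genre=?: a single counter row
        private static long countTracksByGenre(Session session, String genre) {
               PreparedStatement ps = session.prepare("SELECT count FROM track_count WHERE table_name='tracks' AND genre=?;");
               Row row = session.execute(ps.bind(genre)).one();
               return row == null ? 0 : row.getLong("count");
        }

        public static void main(String[] args) {
               Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
               Session session = cluster.connect("playlist");
               System.out.println("All tracks: " + countAllTracks(session));
               System.out.println("Tracks in genre g1: " + countTracksByGenre(session, "g1"));
               session.shutdown();
               cluster.shutdown();
        }
}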

Friday, January 10, 2014

Play Framework 2 : WebSocket Continuous Streaming Java Example

Refer to my earlier post to setup Play with Spring.

Create controller class as:


package controllers;

import java.util.concurrent.TimeUnit;

import org.springframework.stereotype.Component;

import play.libs.Akka;
import play.libs.F.Callback;
import play.libs.F.Callback0;
import play.mvc.WebSocket;
import scala.concurrent.duration.Duration;

@Component
public class WebSocketController {

       public WebSocket<String> websocket() {
              return new WebSocket<String>() {
                     public void onReady(final WebSocket.In<String> in, final WebSocket.Out<String> out) {
                           in.onMessage(new Callback<String>() {
                                  public void invoke(String event) {
                                         System.out.println(event);
                                         out.write("Hello! Received "+event);
                                  }
                           });
                           in.onClose(new Callback0() {
                                  public void invoke() {
                                         System.out.println("Disconnected");
                                  }
                           });

                           class StreamNumbers implements Runnable{
                                  private int i = 0;

                                  @Override
                                  public void run() {
                                         out.write(String.valueOf(i));
                                         i++;
                                  }

                           }

                           Akka.system().scheduler().schedule(
                                          Duration.create(0, TimeUnit.MILLISECONDS), //Initial delay of 0 milliseconds, start immediately
                                         Duration.create(30, TimeUnit.SECONDS),     //Frequency 30 seconds
                                         new StreamNumbers(),
                                         Akka.system().dispatcher()
                                         );
                     }
              };
       }
}

Add routes entry in conf/routes file as:
GET        /websocket                       @controllers.WebSocketController.websocket()

Run the Play application and test it with http://www.websocket.org/echo.html by using the location ws://localhost:9000/websocket

Cassandra Data Modeling

Forget everything about relational modeling and model your queries instead.

In Cassandra data modeling we have to model the query results, and all the data for a query needs to reside in a single table; joins are not supported. Also, only columns that are part of the primary key or are indexed can be used in ad-hoc (WHERE clause) queries, and aggregation/GROUP BY is not supported either (an important consideration when modeling query results). Indexes make writes slow, so they should be avoided.

Keys in Cassandra data modeling:
  • Primary Key [Partition Key+Clustering Key+Unique Key]:
    • Decides how data will be partitioned [Partition Key(s)]. The first column is the partition key. If multiple columns need to be used as the partition key, they can be enclosed in parentheses. These columns cannot be updated.
    • Decides how data will be ordered/grouped [Clustering Keys]. All columns in the primary key other than the partition key are clustering keys. Data within a partition will be ordered according to the order of the clustering keys.
    • Uniquely identifies rows. Together, the primary key columns identify data uniquely (at least one column must ensure uniqueness).
    • Syntax:
                            CREATE TABLE table1 (
                                   column1 type1,
                                   column2 type1,
                                   column3 type1,
                                   column4 type1,
                                   .....
                                   PRIMARY KEY ((column1,column2),column3,column4)
                            );
  • An ad-hoc query must include the primary key columns from left to right, leaving out only columns on the right. For example, it can include column1, column2 and column3, but not column1, column3 or column1, column2, column4 (see the example queries after this list).
  • Depending on the queries, we may need to create multiple tables and store the same data with different keys and orderings.
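
For instance, with the primary key ((column1,column2),column3,column4) from the syntax above, these illustrative queries show the rule:

-- Valid: full partition key plus clustering keys taken from left to right
SELECT * FROM table1 WHERE column1=? AND column2=? AND column3=?;

-- Invalid: column2 (part of the partition key) is missing
SELECT * FROM table1 WHERE column1=? AND column3=?;

-- Invalid: skips the clustering key column3
SELECT * FROM table1 WHERE column1=? AND column2=? AND column4=?;
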
Consider the playlist example from the Datastax Virtual Training:
We need to store following music track information:
Id
Title
Album
Artist
Genre
Track length
Rating

And we need following queries:
  • Get a particular track selected (track by id).
  • Get all songs from an album (tracks in an album ordered by rating).
  • Search songs by genre (tracks of a genre, grouped into albums and ordered by rating).
  • Get songs by artist (tracks by an artist, grouped into albums and ordered by rating).
  • GROUP album BY listen_count
  • GROUP album and track BY listen_count
In relational modeling there would be normalized tables like artist_detail, album_detail etc., and the track table would have (N:1) relations with those tables, holding only reference ids and requiring joins to fetch results. In Cassandra we have to store all details in one table (and multiple tables to serve all queries). This is similar to a materialized view in relational databases.

Create a keyspace say "playlist":
 CREATE KEYSPACE playlist WITH REPLICATION = {
            'class':'SimpleStrategy',
            'replication_factor':3
 };

Create Tables:
CREATE TABLE track_by_id (
id int PRIMARY KEY,
title text,
album text,
artist text,
genre text,
track_length int,
rating int
);

CREATE TABLE tracks_by_album (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(album,rating,id)
) WITH CLUSTERING ORDER BY (rating DESC);

CREATE TABLE tracks_by_genre (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(genre,album,rating,id)
) WITH CLUSTERING ORDER BY (rating DESC);

CREATE TABLE tracks_by_artist (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(artist,album,rating,id)
) WITH CLUSTERING ORDER BY (rating DESC);

CREATE TABLE album_listen_count (
album text,
listen_count counter,
PRIMARY KEY(album)
);
Update the album count every time a track is requested from track_by_id. In a counter table, all non-counter columns must be part of the primary key, since counter columns are only ever updated, never inserted.

This table groups the count by each track in an album and can be used as a drilldown for the data in the table above. The keys album and track_id are sufficient, but as the tables are denormalized we also add title for display to the user. We could keep title and remove track_id; this example is only to emphasize denormalization.

CREATE TABLE album_track_listen_count (
album text,
track_id int,
title text,
listen_count counter,
PRIMARY KEY(album,title,track_id)
);
Six tables for six queries. We could reduce the number of tables by creating indexes, but in Cassandra duplication is encouraged rather than paying the performance cost.

Insert data into all four track tables atomically:
BEGIN BATCH
INSERT INTO track_by_id (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_album (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_genre (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_artist (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
APPLY BATCH;

This guarantees atomicity but costs ~30% in performance. All the queries can also be run independently outside the BATCH.

Queries to fetch data:
SELECT * FROM track_by_id WHERE id=1;

Along with this query, always run the two update queries below in application code to populate the group-by tables:

UPDATE album_track_listen_count SET listen_count = listen_count+1 WHERE 
album=? AND track_id=? AND title=?;
UPDATE album_listen_count SET listen_count = listen_count+1 WHERE album=?;

These updates populate the aggregate data in the count tables. Query them like any other table to get the listen counts, the equivalent of GROUP BY + COUNT in SQL.

SELECT * FROM tracks_by_album WHERE album='Album1';
SELECT * FROM tracks_by_genre WHERE genre='g1';
SELECT * FROM tracks_by_artist WHERE artist='Artist1';

Java code to do this using Datastax Java Driver:
package dao;

import java.util.ArrayList;
import java.util.List;
 
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class ArtistsDAO {
        
         private static void insertTrack(Session session, int id, String title, String album,
                       String artist, String genre, int track_length, int rating){
                BatchStatement batch = new BatchStatement();
                PreparedStatement byIdPs = session.prepare("INSERT INTO track_by_id (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byAlbumPs = session.prepare("INSERT INTO tracks_by_album (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byGenrePs = session.prepare("INSERT INTO tracks_by_genre (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byArtistPs = session.prepare("INSERT INTO tracks_by_artist (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
               
                batch.add(byIdPs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byAlbumPs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byGenrePs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byArtistPs.bind(id, title, album, artist, genre, track_length, rating));
               
                session.execute(batch);
         }
        
         private static void getTracksByGenre(Session session, String genre){
                PreparedStatement ps = session.prepare("SELECT * FROM tracks_by_genre WHERE genre=?;");
                //as one genre can have thousands of songs, we need to process the result in chunks (of size 100)
                //setFetchSize sets the chunk size of records used in automatic paging
                ResultSet result = session.execute(ps.bind(genre).setFetchSize(100));
                for(Row row : result){
                       //process row, iteration will iterate whole result in chunks.
                }
         }
        
         public static void main(String[] args) {
              Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
              Session session = cluster.connect("playlist");
              insertTrack(session, 1,"Song1","Album1","Artist1","g1",300,3);
              getTracksByGenre(session, "g1");
              session.shutdown();
              cluster.shutdown();
       }
      
}

This is a simple introduction for folks coming from the relational world.
