Sunday, December 23, 2012

Hadoop : Was the job really successful?

An accurate determination of success is critical. 

The check for success primarily involves ensuring that the number of records output is roughly the same as the number of records input. Hadoop jobs generally deal with bulk real-world data, which is never 100% clean, so a small error rate is generally acceptable.

It is good practice to wrap your map and reduce methods in a try block that catches Throwable and reports on what was caught.

Each call on the reporter object or the output collector provides a heartbeat to the framework:
                reporter.incrCounter( "Input", "total records", 1 );
                reporter.incrCounter( "Input", "parsed records", 1 );
                reporter.incrCounter( "Input", "number format", 1 );
                reporter.incrCounter( "Input", "Exception", 1 );
                // better to use enums to avoid spelling mistakes or trailing spaces
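The enum idea can be sketched in plain Java (no Hadoop needed; all names here are made up for illustration). An enum key can't be misspelled or pick up a stray trailing space the way a string can, and Hadoop's old API accepts an enum directly in reporter.incrCounter:

```java
import java.util.EnumMap;

public class CounterSketch {
    // With an enum, "parsed records " (trailing space) simply cannot happen.
    enum Input { TOTAL_RECORDS, PARSED_RECORDS, NUMBER_FORMAT, EXCEPTION }

    static final EnumMap<Input, Long> counters = new EnumMap<>(Input.class);

    // stand-in for reporter.incrCounter(Enum<?>, long)
    static void incrCounter(Input key, long amount) {
        counters.merge(key, amount, Long::sum);
    }

    public static void main(String[] args) {
        incrCounter(Input.TOTAL_RECORDS, 1);
        incrCounter(Input.TOTAL_RECORDS, 1);
        incrCounter(Input.PARSED_RECORDS, 1);
        System.out.println(counters.get(Input.TOTAL_RECORDS)); // 2
    }
}
```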

if (format != 0) {
    logger.warn( "There were " + format + " keys that were not transformable to long values" );
}

/** Check to see if we had any unexpected exceptions. This usually indicates
 * some significant problem, either with the machine running the task that had
 * the exception, or the map or reduce function code. Log an error for each
 * type of exception with the count. */
if (exceptions > 0) {
    Counters.Group exceptionGroup = jobCounters.getGroup(
        TransformKeysToLongMapper.EXCEPTIONS );
    for (Counters.Counter counter : exceptionGroup) {
        logger.error( "There were " + counter.getCounter()
            + " exceptions of type " + counter.getDisplayName() );
    }
}

if (total == parsed) {
    logger.info( "The job completed successfully." );
    System.exit( 0 );
}

// We had some failures in handling the input records. Did enough records
// process for this to be a successful job -- is 90% good enough?
if (total * .9 <= parsed) {
    logger.warn( "The job completed with some errors, "
        + (total - parsed) + " out of " + total );
    System.exit( 0 );
}

logger.error( "The job did not complete successfully,"
    + " too many errors processing the input, only "
    + parsed + " of " + total + " records completed" );
System.exit( 1 );

Hadoop : Map Reduce

It's an old design pattern that is more relevant now. You can have different parts of a calculation run on different machines that talk to each other, and hence run the calculation faster.

Google has its own implementation, which the Cloudera implementation is similar to. Then Amazon has its "Hadoop Lite", and CouchDB and MongoDB have MapReduce implementations running inside them. Each one is solving specific problems.

MapReduce is basically made up of MAP and REDUCE, where MAP is stateless and independent, while for REDUCE we need to think about how it fits into the overall flow... so there is a separation. Think of a sorting problem: each node sorts its own data and feeds it to the reducer.

Let's suppose a cluster of N nodes holds data and runs a piece of code F(x) (like sum, avg, etc.). There is no separation of data -- it's not the case that one node has Sept data and another has Oct data, nor is data separated by product type; data is evenly distributed. AND the data is independent.

Each of the nodes will output key-value pairs -- for example, date and dollar amount (sales).

You can have more than one reducer. All the key-value pairs for a key come to the same reducer, which gives you an aggregated key-value pair, i.e. sales on any given day.
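A minimal sketch of that flow in plain Java (a toy simulation, not Hadoop code; the record format and names are invented): each "node" maps its raw records to (date, amount) pairs independently, and the reducer sums every value that arrives for the same key:

```java
import java.util.*;

public class SalesMapReduce {
    // map: one node's raw records -> (date, amount) pairs
    static List<Map.Entry<String, Double>> map(List<String> records) {
        List<Map.Entry<String, Double>> pairs = new ArrayList<>();
        for (String r : records) {                 // e.g. "2012-12-01,99.50"
            String[] f = r.split(",");
            pairs.add(Map.entry(f[0], Double.parseDouble(f[1])));
        }
        return pairs;
    }

    // reduce: all pairs with the same key land together; sum them
    static Map<String, Double> reduce(List<Map.Entry<String, Double>> allPairs) {
        Map<String, Double> salesByDate = new TreeMap<>();
        for (Map.Entry<String, Double> p : allPairs)
            salesByDate.merge(p.getKey(), p.getValue(), Double::sum);
        return salesByDate;
    }

    public static void main(String[] args) {
        List<String> node1 = List.of("2012-12-01,100.0", "2012-12-02,50.0");
        List<String> node2 = List.of("2012-12-01,25.0");
        List<Map.Entry<String, Double>> all = new ArrayList<>();
        all.addAll(map(node1));   // each node maps independently
        all.addAll(map(node2));
        System.out.println(reduce(all)); // {2012-12-01=125.0, 2012-12-02=50.0}
    }
}
```

With more nodes you just add more map() calls; nothing in reduce() changes -- that independence is where the scalability comes from.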

You can do similar stuff in SQL, so what's the advantage here? First, this is not running on enterprise architecture -- think in terms of clustered architecture. Second, the advantage is scalability: independent portions don't have to wait for anyone else to finish.

Example :- suppose 6 nodes and we have 2 reducers, one for the 1st quarter Jan - Mar and another for the 2nd quarter, so each reducer gets data from all of the nodes.

Map is like data warehouse code that extracts, transforms, and loads.

It's not necessary to always write a reduce; 6 out of 10 times you can skip the reduce step.
You can also chain MapReduce jobs.

Incremental Map Reduce :- CouchDB does this: once you ask it a question ( avg sale this quarter ) it keeps computing it for you. This makes it different from Google. CouchDB is interested in 1 to 100 DB servers, while Google is interested in petabyte-plus info, like how to scale the web. CouchDB is a document-based database where documents are represented as JSON when interacting with it, but stored as binary.

Suppose I want to find the biggest sale: so sorting happens on each node. Sort is done according to the key, so you need to put the right thing in the key. Basically ask yourself: how do I want to query this thing later on? That defines the key.

You can do a lot of different things with emit(amount, null): if the date is in June then emit; if the date is in the last 30 days then emit; else don't.

Things like count , sum , min , max , avg , std dev etc are written to the DB.

Ex :- we want to know the total number of sales over a certain amount. So basically in the mapper we return a (k, v) pair where k should be the amount... in the mapper code we can have the logic: if greater than x then return (k, v) => then in the reducer we can count the values. Hence we are returning (amount, count) pairs.
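That logic can be sketched in plain Java (a toy simulation, not Hadoop; the threshold and names are made up): the mapper emits an (amount, 1) pair only for sales above x, and the reducer counts the values per key:

```java
import java.util.*;

public class BigSalesCount {
    // map: emit (amount, 1) only for sales above the threshold
    static List<double[]> map(List<Double> sales, double threshold) {
        List<double[]> pairs = new ArrayList<>();
        for (double amount : sales)
            if (amount > threshold) pairs.add(new double[] { amount, 1 });
        return pairs;
    }

    // reduce: count how many 1s arrived for each distinct amount
    static Map<Double, Integer> reduce(List<double[]> pairs) {
        Map<Double, Integer> counts = new TreeMap<>();
        for (double[] p : pairs) counts.merge(p[0], (int) p[1], Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        List<Double> sales = List.of(5.0, 150.0, 150.0, 200.0);
        System.out.println(reduce(map(sales, 100.0))); // {150.0=2, 200.0=1}
    }
}
```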

Ex :- if we are counting the number of items per sale, then the mapper can return a pair per sale and in the reducer we use count. A reducer will be called once for each key.

Time series aggregation 
The key can be complex -- not necessarily a single string, int, or date.
 emit ( [year,month,day,location] , dollarAmount )
then in reduce we can use _sum ( again, it is applied to the value that is emitted ).
You can have keys like [location,yr,month,day], from most significant to least.
This way we can STRUCTURE the aggregation.
Try NOT to write your own reducer; most of the time you can use a standard reducer.
We can group by location only or year only... there is group_level, a number between 0 and the length of the key array.
If I say group level 3 THEN I get info for [AZ,2010,6]
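group_level can be simulated in plain Java (a sketch of the idea, not CouchDB's implementation): grouping at level n just truncates each [location, year, month, day]-style key to its first n components before summing:

```java
import java.util.*;

public class GroupLevel {
    // grouping at level n means truncating every key to its first
    // n components before summing the values
    static Map<List<String>, Double> sumAtLevel(
            Map<List<String>, Double> rows, int level) {
        Map<List<String>, Double> out = new HashMap<>();
        for (Map.Entry<List<String>, Double> e : rows.entrySet()) {
            List<String> k = e.getKey().subList(0, level);
            out.merge(k, e.getValue(), Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<List<String>, Double> rows = new HashMap<>();
        rows.put(List.of("AZ", "2010", "6", "1"), 10.0);
        rows.put(List.of("AZ", "2010", "6", "2"), 15.0);
        rows.put(List.of("AZ", "2011", "1", "5"), 7.0);
        // level 3 rolls both June 2010 sales into the key [AZ, 2010, 6]
        System.out.println(sumAtLevel(rows, 3).get(List.of("AZ", "2010", "6"))); // 25.0
    }
}
```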

Note: the output of map & reduce is simple text, which keeps communication fast.

Also note that all this is happening in real time -- you don't need a system that moves data from one DB to another overnight.

Parts of a Map Reduce Job

Job Conf
This represents a Job that will be completed via a Map Reduce paradigm
                              JobConf conf = new JobConf(MapReduceIntro.class);
Now that you have a JobConf object, conf, you need to set the required parameters for the job. These include the input and output directory locations, the format of the input and output, and the mapper and reducer classes.

It is good practice to pass in a class that is contained in the JAR file that has your map and reduce functions. This ensures that the framework will make the JAR available to the map and reduce tasks run for your job.

 Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.

Configuring the Reduce Phase
To configure the reduce phase, the user must supply the framework with five pieces of information:
• The number of reduce tasks; if zero, no reduce phase is run
• The class supplying the reduce method
• The input key and value types for the reduce task; by default, the same as the reduce output
• The output key and value types for the reduce task
• The output file type for the reduce task output
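Those five pieces map onto JobConf calls roughly as follows (a sketch against the old org.apache.hadoop.mapred API; it won't compile without the Hadoop jars on the classpath, and MyReducer plus the Text/LongWritable types are placeholders for your own choices):

```java
JobConf conf = new JobConf(MapReduceIntro.class);

conf.setNumReduceTasks(1);                        // 0 would skip the reduce phase
conf.setReducerClass(MyReducer.class);            // class supplying reduce()
conf.setMapOutputKeyClass(Text.class);            // reduce input key type
conf.setMapOutputValueClass(LongWritable.class);  // reduce input value type
conf.setOutputKeyClass(Text.class);               // reduce output key type
conf.setOutputValueClass(LongWritable.class);     // reduce output value type
conf.setOutputFormat(TextOutputFormat.class);     // output file type
```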

Hadoop : HDFS

Hadoop Distributed File System

As the name suggests, it's a distributed file system. It stores your files in a distributed way.

It has a master-slave architecture. There are two types of nodes:
1.       Master :- Name Node & Job Tracker :- single (single point of failure)
2.       Slave :- Data Node , Task Tracker :- multiple
In 2nd-gen Hadoop there can be more than one Name Node.
The Name Node keeps everything in RAM and has nothing on disk. If it crashes, everything is lost, so a secondary node comes to the rescue. The secondary node contacts the name node every hour, takes that data out, and cleans and reshuffles it into a file for you. Don't think of the secondary node as high availability for the name node.

  • Files are not stored on the name node; it just keeps the metadata. As shown in the figure, the name node is keeping track of two files: the 1st is broken into 3 blocks, the 2nd into 2 blocks.
  • File permissions are stored. 
  •  The last access time is stored.
  •  Disk space quotas tracked here.

Here the replication factor is 3.

If one of the data nodes goes down, then the heartbeat from that data node to the name node ceases too. After 10 minutes the name node will consider the data node dead, and the blocks on the dead node will be re-replicated elsewhere.
There is also the possibility that one of the nodes is slow ( maybe going through a garbage collection cycle ); in that case the same task can be speculatively run on one more node to help achieve the task on time.
Now let's see how the second file is stored...

There is some intelligence to how this distribution happens.

The name node also has a web server that hosts a basic web site with statistics about the file system: you can see the configured capacity, how many data nodes are configured, the number of live nodes, the number of dead nodes, usage capacity, etc.

You don't format the underlying hard drives with HDFS itself; on each slave machine you format them with a regular local file system such as ext3, ext4, or xfs. Underlying file system options:
  • ext3 - used by yahoo
  • ext4 - used by google
  • xfs -

HDFS has Linux-like commands.

Name Node is Rack Aware

In the above figure, file.txt is broken into two blocks, which are stored on different data nodes ( replication factor is 3 ). Also notice that the Name Node is rack-aware: it knows which rack has which nodes. ( The rack mapping needs to be configured manually, e.g. via a Python script. )

If there were no rack awareness in Hadoop, there is a chance that all replicas land on one rack, and if that rack goes down the file is lost :- hence the intelligent storing.

For a smaller configuration ( fewer than 20 nodes ) you may not want to make it rack aware and can assume that all nodes are on a default rack.

How Write Happens

The sequence of communication is like:
  • The client issues a copyFromLocal command ( or runs a map reduce job ) and everything happens by itself.
  • This goes to the Name Node, which replies "OK, write it to Data Nodes 1, 7, 9".
  • The client directly contacts Data Node 1 with a "TCP ready" command and asks it to also get DN 7 & 9 ready. DN1 hops the same command to DN7 & DN9. Then a ready ack goes from 9 to 7 to 1 to the client. Hence a TCP pipeline is ready to take the block.
  • Note that the client doesn't have to go through the name node any more. The Name Node is used only to learn where in the cluster the block will be written; then the client contacts those nodes directly.
  • The client sends the block to DN1, which forwards it to 7, which forwards it to 9.
  • Then each of the DNs (individually) lets the Name Node know it received the block successfully, and hence the Name Node builds up its metadata.
  • AND the 1st DN lets the client know the operation was successful.
  • Then the client moves on to write block B, which may be stored on an entirely different set of racks and data nodes.
  • NOTE :- we are spanning 2 racks ... the 1st rack is randomly chosen, then the name node chooses a different rack which gets 2 DNs (always? I think yes, because from DN1 to DN7 you go through 3 different switches -- one on the 1st rack, one in the network, and one on the 2nd rack -- but from 7 to 9 you go through only 2 switches, hence keeping latency low). The client needs to write to all 3 DNs for the write to be considered successful.

A write with replication=1 will be 5-10 times faster.
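The pipelined write and the ack chain above can be modeled in a few lines of plain Java (a toy simulation, not HDFS code; the node names and structure are arbitrary):

```java
import java.util.*;

public class WritePipeline {
    // Simplified model: the client sends the block to the first data node,
    // each node stores it and forwards downstream, and the ack travels back
    // up the chain only after every downstream node has the block.
    static boolean writeBlock(String block, List<Set<String>> pipeline, int i) {
        pipeline.get(i).add(block);                       // this DN stores the block
        boolean downstreamOk = (i == pipeline.size() - 1) // last node in the chain?
                || writeBlock(block, pipeline, i + 1);    // else forward, wait for ack
        return downstreamOk;                              // ack flows back toward client
    }

    public static void main(String[] args) {
        // three data nodes, e.g. DN1 -> DN7 -> DN9
        List<Set<String>> dataNodes =
                List.of(new HashSet<>(), new HashSet<>(), new HashSet<>());
        boolean ok = writeBlock("blockA", dataNodes, 0);
        long replicas = dataNodes.stream()
                .filter(d -> d.contains("blockA")).count();
        System.out.println(ok + " replicas=" + replicas); // true replicas=3
    }
}
```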

Reading a file
1. The client asks for the block locations of a file.
2. The name node has the locations of blocks A & B for the file.
3. The name node responds: there are two blocks for this file, A[1,4,5] and B[9,1,2].
4. The client will directly connect with the data nodes to read the blocks. The list is ordered: it will first try to read from DN 1, and if that is not possible, read from DN 4.
5. The name node is like the bookkeeper in a library: in a large library you can't go shelf to shelf to find a book; instead you go to the bookkeeper, who will tell you the rest, as he has organized the books via index cards ( by author , publish date etc ).

A file comes in, you chop it into blocks ( default 64 MB ), store those blocks on the infrastructure, and replicate them.

Why a uniform size? :- Otherwise the time taken to read would depend on the largest block.
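The chopping arithmetic is just ceiling division (a plain-Java sketch using the 64 MB default mentioned above):

```java
public class BlockMath {
    static final long BLOCK_SIZE = 64L * 1024 * 1024; // default 64 MB

    // how many blocks a file of the given size is chopped into
    static long numBlocks(long fileSize) {
        return (fileSize + BLOCK_SIZE - 1) / BLOCK_SIZE; // ceiling division
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        System.out.println(numBlocks(oneGb));     // 16
        System.out.println(numBlocks(oneGb + 1)); // 17 -- the last block is tiny
        // with replication factor 3, the cluster stores 3x that many block copies
    }
}
```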

It's not a fully POSIX-compliant file system; it's optimized for
·         Throughput
·         Put / Get / Delete
·         Appends
·         NOT for latency
·         CAN'T insert stuff in the middle of a file
·         BUT can do Append
·         Compression, if needed, is available at a higher layer ( GFS from Google ), not at this layer

Hadoop : What it is meant to do What it is not

Hadoop's architecture
Hadoop was not designed for Enterprise Architecture; it was designed for Clustered Architecture.
So think in the right context.

Hadoop is designed to run on a large number of machines that don’t share any memory or disks. That means you can buy a whole bunch of commodity servers, slap them in a rack, and run the Hadoop software on each one.

Hadoop is Not meant to
  1. Replace RDBMSs
  2. Do transactions
  3. It is generally used with unstructured data, though it may be used with structured data as well
Hadoop is meant to
  1. Index building, pattern recognition,
  2. Creating recommendation engines,
  3. Sentiment analysis -- all situations where data is generated at high volume,
  4. Read a 1 TB file in minutes rather than hours

Hadoop Vs RDBMS
  1. Hadoop can serve interactive queries, but not at the same speed
  2. Hadoop can't do ACID transactions
  3. Hadoop is best for working with unstructured data; an RDBMS is best for working with structured data

Tuesday, December 18, 2012

Hadoop On Windows 7 With Eclipse

Here I will try to install Hadoop on my Windows 7 machine and post along the progress and/or issues I face during the same.

Pre Requisite
I already had following installed on my machine
  1. jdk1.7.0_07
  2. Eclipse Juno
 Instead of going with lower versions of either, I decided to go ahead with the above versions.

Next Step : Install Cygwin
What is it?
The Cygwin tools are ports of the popular GNU development tools for Microsoft Windows. They run thanks to the Cygwin library which provides the POSIX system calls and environment these programs expect.

With these tools installed, it is possible to write Windows console or GUI applications that make use of significant parts of the POSIX API. As a result, it is possible to easily port many Unix programs without the need for extensive changes to the source code.

Is it free software?

Where can I get it?

What versions of Windows are supported?
Cygwin can be expected to run on all modern 32 bit versions of Windows. This includes, as of the time of writing this, Windows 2000, Windows XP, Windows Server 2003, Windows Vista, Windows Server 2008, Windows 7, as well as the WOW64 32 bit environment on released 64 bit versions of Windows (XP/2003/Vista/2008/7/2008 R2).

There you go ... I have a 64-bit machine ... let's see how it goes!!

Steps to install
Check out this post for more details

Next Step : Set Environment Variables
  1. The next step is to set up the PATH environment variable so that Eclipse IDE can access Cygwin commands
  2. Find "My Computer" icon either on the desktop or in the start menu, right-click on it and select Properties item from the menu.
  3. When you see the Properties dialog box, click on the Environment Variables button as shown below
  4. When Environment Variables dialog shows up, click on the Path variable located in the System Variables box and then click the Edit button
  5. When Edit dialog appears append the following text to the end of the Variable value field:
  6. ;c:\cygwin\bin;c:\cygwin\usr\bin 
 Note: If you installed Cygwin in a non-standard location, correct the above value accordingly.

Close all three dialog boxes by pressing OK button of each dialog box.

Next Step : Setup SSH Daemon
  1. Open the Cygwin command prompt.
    •  Note :- While configuring SSH, you may need to run the cygwin.bat script. While running cygwin.bat in Microsoft Windows Server 2008 and Microsoft Windows Vista or 7, ensure that you invoke it in administrator mode. To do this, right-click the cygwin.bat file and select Run as administrator.
  2. Execute the following command: "ssh-host-config"
  3. give the following answers

It will try to create an account and will also ask you to give a password.
Next : Start SSH daemon
  1.     Open Services and Applications in the left-hand panel then select the Services item.
  2.     Find the CYGWIN sshd item in the main section and right-click on it.
  3.     Select Start from the context menu.
On doing so I got this message

 I checked the C:\cygwin\var\log\sshd.log file and found the message "Privilege separation user sshd does not exist"
To solve this I will need to create an sshd user; I'm looking for ways to do the same on Windows 7. See this article and this article


How to install CYGWin on Windows 7

To install the CYGWin environment follow these steps:
  1.     Download CYGWin installer from
  2.     Run the downloaded file. You will see the window shown on the screenshots below
Step 1
 Step 2
 Step 3
 Step 4
 Step 7 :- I ignored this for obvious reasons
 Step 8 :- You get a console like this; using the search box, check if OpenSSH & OpenSSL are installed
 Check for OpenSSL; it should be showing Skip as below
make sure you select Install for it

Similarly check for OpenSSH; by default it should be showing Skip as below
make sure you select Install for it

Step 9 :- Now click Next; it took 35-40 minutes to install on a 512MBPS connection
How do I know if I installed Cygwin correctly?
After you finish installation
1) The file C:\cygwin\bin\cygwin1.dll should exist
2) Mount.exe should exist in the same folder
3) From the start menu you should be able to run Cygwin successfully
4) After launching the shell, run the "mount" command. If the only output from the mount command is of the "noumount" variety, then Cygwin isn't installed in any meaningful way.


If you do not do step 8 correctly, then you may see an error like

What is Hadoop

Best way to understand it is via a simple analogy

What is an Operating System? What does it do at its core?
      Simple -- it just does two things at its core ...
  • Ability to store files
  • Ability to run apps on top of those files
Then come device drivers, security, libraries -- and all of these things sit on top of those two.

Similarly, you can think of Hadoop as a "modern-day operating system" which gives you similar services; the only difference is that it does so across many, many machines. In fact it's an abstraction above them: it leverages Windows and Linux to do the same.

Why such a strange name ? Is it an acronym ?
Hadoop was created by Doug Cutting, who decided to name the project after his 3-year-old son's favorite toy :- a stuffed elephant that his son called Hadoop. So it's not an acronym; it doesn't mean anything ... it's just a name.

And now his son is 12 years old and is proud of his achievement :)


Why was it made ?
The initial work was done by Google, which wanted to index the entire web on a daily basis. Doug Cutting got the inspiration for Hadoop after reading Google's papers on the Google File System and Google MapReduce.
Then Google + Yahoo + Apache joined hands to solve this problem.

Hadoop was inspired by Google's release of 3 white papers
  1. Google File System :- a distributed file system
  2. Google Map Reduce :- a combination of Mapper and Reducer function
  3. Google BigTable - search quickly across large data (this is technically not part of core)
Doug Cutting went through the above and implemented the same, which later, with the help of Yahoo, Apache, and Google, became Hadoop.
Component Details
1.  Hadoop Distributed File System :-
Used to manage data across clusters: split, scatter, replicate, and manage this data across nodes. Takes care of cases where a node is unavailable or down.

2. Hadoop Map Reduce :-
The computational mechanism. It executes an application in parallel by dividing it into tasks, co-locating these tasks with parts of the data, collecting and distributing intermediate results, and managing failures across the cluster.

3. Apache HBase :-

This is technically not part of core Hadoop. It's an implementation of a paper from Google called "BigTable". It shows how to hide the latency of the HDFS layer so that we can do very quick lookups.