Sunday, July 30, 2017

Sqoop2 Server

Sqoop CLI limitations
  • The user running the Sqoop CLI needs the connectors installed on the same machine.
  • The user needs to know the database credentials (an admin may not want to share them with everyone).
  • Sqoop jobs are stored locally and cannot be shared with other team members.
  • People can unknowingly create load on the RDBMS.
Hence a new version, called Sqoop2 server, is under development:
  • sqoop2-shell only submits jobs to the Sqoop2 server.
  • The Sqoop2 server does all the work.
  • Communication with the Sqoop2 server is via JSON over REST.
  • The client doesn't need access to Hadoop or the RDBMS (only the server does).

Sqoop : Export


sqoop export --connect jdbc:mysql://localhost/db --username root \
--table employee --export-dir /emp/emp_data

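  • The target table must already exist in the database.
  • Input files are read and parsed according to the delimiters and options the user provides.
  • By default Sqoop generates INSERT statements; in update mode (--update-key) it generates UPDATE statements instead.
  • If an INSERT violates a constraint (for example a duplicate primary key), that map task fails and so does the export job, but the failure is only partial: rows that were already committed stay in the database.
  • Inserts are performed by multiple writers in parallel; each writer uses a separate connection and its own transaction.
  • Sqoop uses multi-row INSERTs (up to 100 records per statement) and commits every 100 statements, i.e. roughly every 10,000 rows per writer. If one writer fails, only its open transaction is rolled back; its earlier commits, and those of the other writers, remain.
  • So export is not an atomic operation: some commits become visible before others.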
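The transaction behaviour above can be pictured with a toy simulation. This is not Sqoop code: it assumes each statement carries one row, that each writer gets a contiguous chunk of the input, that writers are simulated one after another (in reality they run in parallel), and that a writer stops at its first failing row. The function name and the `bad_row` parameter are hypothetical.

```python
from typing import List, Optional


def simulate_export(
    rows: List[int],
    num_writers: int,
    statements_per_commit: int,
    bad_row: Optional[int] = None,
) -> List[int]:
    """Toy model of a parallel export; returns the rows that end up committed."""
    chunk = -(-len(rows) // num_writers)  # ceiling division: rows per writer
    committed: List[int] = []
    for w in range(num_writers):  # writers simulated sequentially
        pending: List[int] = []  # statements in this writer's open transaction
        for row in rows[w * chunk:(w + 1) * chunk]:
            if row == bad_row:
                pending.clear()  # e.g. duplicate primary key: roll back the open transaction
                break            # this writer (map task) fails; earlier commits stay durable
            pending.append(row)
            if len(pending) == statements_per_commit:
                committed.extend(pending)  # COMMIT
                pending.clear()
        else:
            committed.extend(pending)  # writer finished normally: final commit
    return committed


if __name__ == "__main__":
    print(simulate_export(list(range(1, 21)), num_writers=2,
                          statements_per_commit=3, bad_row=15))
```

With 20 rows, 2 writers and a commit every 3 statements, a bad row at 15 leaves rows 1 to 13 committed even though the export as a whole failed.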

sqoop export --connect jdbc:mysql://localhost/DBNAME --username root \
--password root --export-dir /input/abc --table test \
--fields-terminated-by "," --columns "id,name,age"

You can use --update-key as well:

sqoop export --connect jdbc:mysql://localhost/DBNAME --username root \
--password root --export-dir /input/abc --table test --fields-terminated-by "," \
--columns "id,name,age" --update-key id

Sqoop : Incremental Imports

Incremental import is a technique that imports only the rows that are new (or, in lastmodified mode, updated) since the previous import. It is controlled by the --incremental, --check-column, and --last-value options.

Syntax:
--incremental <mode> : how Sqoop decides which rows are new; either append or lastmodified.
--check-column <column name> : the column examined to decide which rows to import.
--last-value <last check column value> : the maximum value of the check column from the previous import.

If you run the sqoop import in 'lastmodified' mode, there is an assumption that the table has a last-modified column (a date or timestamp) that is updated whenever a row changes.


Consider a table with 3 records which you have already imported to HDFS using Sqoop:

| sid  | city       | state    | rank | rDate      |
|  101 | Chicago    | Illinois |    1 | 2014-01-25 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |

sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P

Now the table has additional records, but no existing records were updated:
| sid  | city       | state    | rank | rDate      |
|  101 | Chicago    | Illinois |    1 | 2014-01-25 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |
|  103 | Charlotte  | NC       |    9 | 2013-04-22 |
|  103 | Greenville | SC       |    9 | 2013-05-12 |
|  103 | Atlanta    | GA       |   11 | 2013-08-21 |
Here you should use --incremental append, with --check-column naming the column to be examined when determining which rows to import.
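A rough Python model of append mode, as a sketch rather than Sqoop's implementation: keep only the rows whose check column is greater than --last-value, and remember the highest value seen for the next run. The `yloc` rows copy the table above (sid, city and rank only); the function name is hypothetical.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def incremental_append(rows: List[Row], check_column: str,
                       last_value: Any) -> Tuple[List[Row], Any]:
    """Return the rows whose check column exceeds last_value, plus the value for the next run."""
    new_rows = [row for row in rows if row[check_column] > last_value]
    # The next run should start from the highest value imported so far.
    next_last_value = max((row[check_column] for row in new_rows), default=last_value)
    return new_rows, next_last_value


# The yloc table after the new rows were added.
yloc = [
    {"sid": 101, "city": "Chicago", "rank": 1},
    {"sid": 101, "city": "Schaumburg", "rank": 3},
    {"sid": 101, "city": "Columbus", "rank": 7},
    {"sid": 103, "city": "Charlotte", "rank": 9},
    {"sid": 103, "city": "Greenville", "rank": 9},
    {"sid": 103, "city": "Atlanta", "rank": 11},
]

if __name__ == "__main__":
    rows, last = incremental_append(yloc, "rank", 7)
    print([row["city"] for row in rows], last)
```

Append mode fits columns that only grow, such as an auto-increment id. In the later version of the table, row 104 (Dallas) has rank 4, so an append run with --last-value 11 would never pick it up.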
sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root \
-P --check-column rank --incremental append --last-value 7

The command above imports every new row whose rank is greater than the last value (7). Now consider a second case, where existing rows are also updated:

| sid  | city       | state    | rank | rDate      |
|  101 | Chicago    | Illinois |    1 | 2015-01-01 |
|  101 | Schaumburg | Illinois |    3 | 2014-01-25 |
|  101 | Columbus   | Ohio     |    7 | 2014-01-25 |
|  103 | Charlotte  | NC       |    9 | 2013-04-22 |
|  103 | Greenville | SC       |    9 | 2013-05-12 |
|  103 | Atlanta    | GA       |   11 | 2013-08-21 |
|  104 | Dallas     | Texas    |    4 | 2015-02-02 |
|  105 | Phoenix    | Arizona  |   17 | 2015-02-24 |

Here we use --incremental lastmodified, which fetches the rows whose rDate is more recent than the --last-value date, i.e. the updated and the newly added rows.
sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P \
--check-column rDate --incremental lastmodified --last-value 2014-01-25 --target-dir yloc/loc
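The same idea for lastmodified mode, again as a hypothetical sketch: it keeps the rows whose rDate is more recent than --last-value (the wording of the Sqoop user guide; the real tool's exact boundary handling may differ) and rejects check values that are not dates, loosely mirroring the error shown below.

```python
from datetime import date
from typing import Any, Dict, List

Row = Dict[str, Any]


def incremental_lastmodified(rows: List[Row], check_column: str,
                             last_value: date) -> List[Row]:
    """Return the rows whose check column holds a date more recent than last_value."""
    for row in rows:
        # Only dates make sense here; Sqoop itself refuses non-date/timestamp columns.
        if not isinstance(row[check_column], date):
            raise TypeError(f"{check_column} is neither a timestamp nor a date")
    return [row for row in rows if row[check_column] > last_value]


# The updated yloc table from above (sid, city and rDate only).
yloc = [
    {"sid": 101, "city": "Chicago", "rDate": date(2015, 1, 1)},
    {"sid": 101, "city": "Schaumburg", "rDate": date(2014, 1, 25)},
    {"sid": 101, "city": "Columbus", "rDate": date(2014, 1, 25)},
    {"sid": 103, "city": "Charlotte", "rDate": date(2013, 4, 22)},
    {"sid": 103, "city": "Greenville", "rDate": date(2013, 5, 12)},
    {"sid": 103, "city": "Atlanta", "rDate": date(2013, 8, 21)},
    {"sid": 104, "city": "Dallas", "rDate": date(2015, 2, 2)},
    {"sid": 105, "city": "Phoenix", "rDate": date(2015, 2, 24)},
]

if __name__ == "__main__":
    changed = incremental_lastmodified(yloc, "rDate", date(2014, 1, 25))
    print([row["city"] for row in changed])
```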
Example # 2

sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba \
--password cloudera --table products --target-dir /lav/sqoop/retail_db1 \
--check-column product_id --incremental lastmodified --last-value 10

Running in lastmodified mode with a check column that is not a date or timestamp (product_id) fails with a runtime exception:

17/07/30 04:10:48 ERROR manager.SqlManager: Column type is neither timestamp nor date!
17/07/30 04:10:48 ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.RuntimeException: Column type is neither timestamp nor date!
java.lang.RuntimeException: Column type is neither timestamp nor date!
 at org.apache.sqoop.manager.ConnManager.datetimeToQueryString(
 at org.apache.sqoop.tool.ImportTool.initIncrementalConstraints(
 at org.apache.sqoop.tool.ImportTool.importTable(
 at org.apache.sqoop.Sqoop.runSqoop(
 at org.apache.sqoop.Sqoop.runTool(
 at org.apache.sqoop.Sqoop.runTool(
 at org.apache.sqoop.Sqoop.main(

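Sqoop Jobs
You can use 'sqoop job --create' to save an incremental import as a job, so that it can be re-run later without tracking the last value by hand.

sqoop job --create job_name_1 -- import --connect jdbc:mysql://localhost/retail_db \
--username retail_dba -P --table products \
--check-column product_id --incremental append --last-value 10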

Running this command does not actually import or export any data; it only creates (saves) the job.
To see the list of saved Sqoop jobs:
sqoop job --list

To see the details of a job:
sqoop job --show job_name_1

To execute a job:
sqoop job --exec job_name_1

Sqoop remembers the highest check-column value from the last run and, on the next run, automatically imports only rows beyond it. It stores that value in the saved job's metadata.

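Where does it store the last value?

By default in a private job repository under $HOME/.sqoop/ on the local file system of the machine running Sqoop, not on HDFS. (A shared metastore, started with sqoop-metastore and reached via --meta-connect, lets other users share jobs.)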
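A toy model of how a saved job carries the last value from one run to the next. The `JobStore` class and its JSON file are hypothetical stand-ins; Sqoop's own repository under $HOME/.sqoop/ is an HSQLDB database, not a JSON file, and it is updated by Sqoop rather than by user code.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

Row = Dict[str, Any]


class JobStore:
    """Hypothetical job repository kept as a JSON file on the local file system."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def _load(self) -> Dict[str, Dict[str, Any]]:
        return json.loads(self.path.read_text()) if self.path.exists() else {}

    def _save(self, jobs: Dict[str, Dict[str, Any]]) -> None:
        self.path.write_text(json.dumps(jobs, indent=2))

    def create(self, name: str, check_column: str, last_value: Any) -> None:
        """Like 'sqoop job --create': saves the job definition, imports nothing."""
        jobs = self._load()
        jobs[name] = {"check_column": check_column, "last_value": last_value}
        self._save(jobs)

    def exec_job(self, name: str, rows: List[Row]) -> List[Row]:
        """Like 'sqoop job --exec': import rows beyond last_value, then store the new maximum."""
        jobs = self._load()
        job = jobs[name]
        col = job["check_column"]
        new_rows = [r for r in rows if r[col] > job["last_value"]]
        if new_rows:
            job["last_value"] = max(r[col] for r in new_rows)
        self._save(jobs)
        return new_rows


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        store = JobStore(Path(tmp) / "jobs.json")
        store.create("job_name_1", "product_id", 10)
        table = [{"product_id": i} for i in range(1, 16)]
        print(len(store.exec_job("job_name_1", table)))  # first run: ids 11-15
        table.append({"product_id": 16})
        print(store.exec_job("job_name_1", table))       # second run: only id 16
```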

Sqoop : Hive

The Sqoop CLI can automatically create the Hive metadata (table definition):

  • Hive must be installed and configured (including its metastore); the imported data ends up in the Hive warehouse directory, e.g. /user/hive/warehouse/emp_table.
  • Sqoop automatically executes a CREATE TABLE command in Hive.
  • It automatically executes a LOAD DATA INPATH command in Hive to move the data into the Hive table.
  • To import into Hive, just add the --hive-import option to the import command.
  • You can also specify --hive-overwrite to replace the data in an existing Hive table.
  • If you want the Hive table name to differ from the source table name, use the --hive-table option.
  • For all Hive-related commands, Sqoop runs the local Hive client on the machine where Sqoop is running; use --hive-home to point to the Hive installation directory (it overrides $HIVE_HOME).
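A small Python sketch that assembles a Hive import command from the options listed above, plus the import, --connect, --username, -P and --table options used elsewhere in these notes. The helper name is hypothetical; it only builds the argument list, and actually running it would mean passing the list to subprocess.run on a machine with Sqoop and Hive installed.

```python
import shlex
from typing import List, Optional


def hive_import_args(
    connect: str,
    username: str,
    table: str,
    hive_table: Optional[str] = None,
    overwrite: bool = False,
    hive_home: Optional[str] = None,
) -> List[str]:
    """Build the argv for a Sqoop import that also loads the data into Hive."""
    args = ["sqoop", "import", "--connect", connect, "--username", username,
            "-P", "--table", table, "--hive-import"]
    if overwrite:
        args.append("--hive-overwrite")       # replace data in an existing Hive table
    if hive_table:
        args += ["--hive-table", hive_table]  # Hive name differs from the source table
    if hive_home:
        args += ["--hive-home", hive_home]    # overrides $HIVE_HOME
    return args


if __name__ == "__main__":
    argv = hive_import_args("jdbc:mysql://localhost/userdb", "root", "emp",
                            hive_table="emp_table", overwrite=True)
    print(shlex.join(argv))
    # To actually run it: subprocess.run(argv, check=True)
```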

Sqoop : CLI

Sqoop is used to copy data between an RDBMS and Hadoop, in either direction (import into Hadoop, export back to the RDBMS).

Use Case # 1 : ELT

Use Case # 2 : ETL for DWH (data warehousing)
Use Case # 3 : Data Analysis
RDBMSs do not scale out easily; Hadoop does.

Use Case # 4 : Data Archival
Use Case # 5 : Move Reports to Hadoop
Reports that are too demanding for the RDBMS.
Reports on archival data.

Note that if you create Hive tables with the same schema as the RDBMS tables, existing reports may keep working with little more than a change of connection string.

Use Case # 6 : Data Consolidation
Save all data of an org in Hadoop.

Sqoop can be deployed in two ways:
  1. Client-based model: execute the command-line tool.
  2. Server model: deploy it as a service or server.
Here is how Sqoop works:
  1. Execute the Sqoop CLI with the RDBMS connection details, Hadoop details and table names.
  2. Sqoop examines the structure of the tables in the RDBMS.
  3. It generates a Java class to import the data from the RDBMS into Hadoop.
  4. It executes a map-only MapReduce job.
  • sqoop import --connect jdbc:mysql://mysql_server/db_name --username foo --table myTable1
  • sqoop --options-file /user/loc/abc.txt --table my_table1
  • Use -P instead of --password to read the password from the console instead of passing it on the command line.
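A simplified Python model of how an options file is expanded in place: one option or value per line, with blank lines and lines starting with # ignored. Backslash continuations and quoted strings, which Sqoop also supports, are left out. The file contents and function names are hypothetical.

```python
from typing import List


def expand_options_file(text: str) -> List[str]:
    """Turn options-file text into argv tokens: one token per non-blank, non-comment line."""
    tokens = []
    for line in text.splitlines():
        token = line.strip()  # leading/trailing spaces are ignored
        if not token or token.startswith("#"):
            continue          # skip blank lines and comments
        tokens.append(token)
    return tokens


def sqoop_argv(options_text: str, extra_args: List[str]) -> List[str]:
    """Mimic 'sqoop --options-file FILE <extra args>' with the file given first."""
    return ["sqoop"] + expand_options_file(options_text) + extra_args


# Hypothetical contents of /user/loc/abc.txt
ABC_TXT = """\
# Tool to invoke
import

# Connection details
--connect
jdbc:mysql://mysql_server/db_name
--username
foo
"""

if __name__ == "__main__":
    print(sqoop_argv(ABC_TXT, ["--table", "my_table1"]))
```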
sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba -P --table products --target-dir <hdfs-location>
If you don't give --target-dir, the data lands by default in a directory named after the table under your HDFS home directory.
To check the result: hdfs dfs -cat /dir/name/part-m-*

You can import selected columns of a table:
sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba -P --table products --target-dir <hdfs-location> --columns "one,two,three"

You can import a subset of rows using a where condition:

$ sqoop import \
--connect jdbc:mysql://localhost/userdb \
--username root \
--table emp_add \
-m 1 \
--where "city ='sec-bad'" \
--target-dir /folder/name1

Where Clause
A free-form query import (--query) can also contain a WHERE clause, but the query must contain the token '$CONDITIONS' in its WHERE clause. (The --where option shown above does not need it.)

If you give a query like:

sqoop import --connect jdbc:teradata://192.168.xx.xx/DBS_PORT=1025,DATABASE=ds_tbl_db \
--driver com.teradata.jdbc.TeraDriver --username dbc --password dbc \
--query 'select * from reason where id>20' --hive-import \
--hive-table reason_hive --target-dir <hdfs-location> -m 1

you get the error:
Query [select * from reason where id>20] must contain '$CONDITIONS' in WHERE clause

You have to add AND \$CONDITIONS (inside double quotes, the backslash stops the shell from expanding $CONDITIONS):
--query "select * from reason where id>20 AND \$CONDITIONS"

Sqoop needs to access the table's metadata, for example the column types. The placeholder $CONDITIONS is first set to '1 = 0', so that Sqoop receives only type information and no rows. So:
  1. After you execute the sqoop command, the first query fired uses this default $CONDITIONS to read the column types.
  2. Later, $CONDITIONS is substituted with different values defining different ranges, based on the number of mappers (-m), the --split-by column or the --boundary-query, so that the entire data set is divided into slices (chunks) that can be imported in parallel with as much concurrency as is available.
  3. Each map task then transfers only the slice selected by its own generated condition.
To avoid a parallel run, use a single mapper (-m 1, i.e. --num-mappers 1). The query must still contain $CONDITIONS, but the whole import then runs as one sequential task.
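The substitution can be sketched as plain string replacement. This is illustrative only: the helper names are hypothetical, the split ranges are passed in by hand, and the exact text of Sqoop's generated conditions may differ. The sketch uses an inclusive lower bound and an exclusive upper bound, except for the last slice, which includes the maximum.

```python
from typing import List, Tuple

PLACEHOLDER = "$CONDITIONS"


def metadata_query(query: str) -> str:
    """First query: '(1 = 0)' matches no rows, so only column metadata comes back."""
    if PLACEHOLDER not in query:
        raise ValueError(f"Query [{query}] must contain '{PLACEHOLDER}' in WHERE clause")
    return query.replace(PLACEHOLDER, "(1 = 0)")


def task_queries(query: str, split_by: str, bounds: List[Tuple[int, int]]) -> List[str]:
    """One query per map task, each restricted to its own range of split_by values."""
    queries = []
    for i, (lo, hi) in enumerate(bounds):
        upper = "<=" if i == len(bounds) - 1 else "<"  # last slice includes the maximum
        condition = f"( {split_by} >= {lo} ) AND ( {split_by} {upper} {hi} )"
        queries.append(query.replace(PLACEHOLDER, condition))
    return queries


if __name__ == "__main__":
    q = "select * from reason where id>20 AND $CONDITIONS"
    print(metadata_query(q))
    for sql in task_queries(q, "id", [(21, 50), (50, 80)]):
        print(sql)
```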

--split-by selects the column used to slice your data into parallel tasks. For a table import it defaults to the table's primary key; a free-form --query run with more than one mapper needs an explicit --split-by.

Sqoop CLI Parallel Import

This can greatly improve the performance of the import.
By default the import runs as 4 parallel map tasks.
It relies on a splitting column to divide the work.

But do not request more parallel tasks than there are available resources (cluster capacity, database connections); this hurts performance instead of helping it.

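What Sqoop does:
  • Identify the primary key (or the --split-by column) of the table.
  • Get its low and high values from the DB (by default with SELECT MIN(col), MAX(col); --boundary-query overrides this).
  • Divide that range into 4 (or -m) select queries, each with its own minimum and maximum bounds.
  • Each mapper gets an equal-width range of key values; if the values are unevenly distributed, the mappers get unevenly sized slices of data.
  • Sqoop cannot split on more than 1 column.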
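The range arithmetic can be sketched as below, assuming an integer split column whose MIN and MAX are already known. How the boundaries are rounded is this sketch's own choice, not necessarily Sqoop's; the second function counts rows per slice to show how skewed keys give unbalanced mappers.

```python
from typing import List, Tuple


def integer_splits(low: int, high: int, num_mappers: int = 4) -> List[Tuple[int, int]]:
    """Split [low, high] into equal-width ranges, one per mapper.

    Each range is [lo, hi) except the last, which is [lo, hi].
    """
    if high < low or num_mappers < 1:
        raise ValueError("need low <= high and at least one mapper")
    # Use no more ranges than the width of the key range, so that no range is empty.
    n = max(1, min(num_mappers, high - low))
    edges = [low + (high - low) * i // n for i in range(n + 1)]
    return [(edges[i], edges[i + 1]) for i in range(n)]


def rows_per_split(keys: List[int], splits: List[Tuple[int, int]]) -> List[int]:
    """Count how many keys fall into each split (the last split is closed on the right)."""
    counts = []
    for i, (lo, hi) in enumerate(splits):
        last = i == len(splits) - 1
        counts.append(sum(1 for k in keys if lo <= k and (k <= hi if last else k < hi)))
    return counts


if __name__ == "__main__":
    ids = list(range(1, 101))
    splits = integer_splits(min(ids), max(ids))
    print(splits)                       # [(1, 25), (25, 50), (50, 75), (75, 100)]
    print(rows_per_split(ids, splits))  # [24, 25, 25, 26]
    skewed = list(range(1, 91)) + [1000]
    print(rows_per_split(skewed, integer_splits(min(skewed), max(skewed))))  # [90, 0, 0, 1]
```

With evenly spread ids the four mappers get almost equal work; with one outlier key of 1000, almost every row lands in the first slice.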