Sqoopを用いてE-MapReduceにデータを取り込む

こんにちは、Kouです。

既存のSQLテーブルに蓄積された情報をHadoopにインポートするため、Sqoopと呼ばれるツールがよく使われています。今回の記事は、E-MapReduceのHadoopクラスターとRDSまたはオンプレミスDBの間のデータインポートについて、ご説明させて頂きたいと思います。

  • 前提
  • EMR-3.16.0
  • クラスタータイプは Hadoop
  • Sqoop 1.4.7
  • ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
  • ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# sqoop version
19/03/05 21:45:43 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Sqoop 1.4.7
git commit id d2782d7745c561584cc8948b2af90bd6a668bab1
Compiled by willwu on Tue Jun 5 10:04:04 CST 2018
# hadoop version
Hadoop 2.7.2
Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc
Compiled by root on 2018-11-30T09:31Z
Compiled with protoc 2.5.0
From source with checksum 4447ed9f24dcd981df7daaadd5bafc0
This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar
  • データソースの確認

まず、作業開始の前に、E-MapReduceのクラスターと同じリージョンのRDSにtestdbという名前のデータベースを作成しました。SSH を使用してヘッドノードに接続し、下記のsqoopコマンドを実行します。

sqoop eval \
> --connect jdbc:mysql://rm-8dxq35vzblxx01k706o.mysql.rds.aliyuncs.com/testdb \
> --username your-username \
> -P \
> --query "SELECT * FROM people_info LIMIT 10"

このコマンドは、people_infoという名前のテーブルの10行を一覧に表示します。

19/03/06 12:16:57 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
19/03/06 12:16:57 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
19/03/06 12:16:57 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/hive/2.3.3-1.0.3/package/apache-hive-2.3.3-1.0.3-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
-------------------------------------------------------------------------------------------------------------------------------------------------
| dt | lat | lng | gender | age | citycode | id | 
-------------------------------------------------------------------------------------------------------------------------------------------------
| 702 | 36.180109 | 139.296344 | 2 | 30 | 11218 | 1 | 
| 702 | 35.848949 | 139.669779 | 2 | 0 | 11108 | 2 | 
| 702 | 35.94997 | 139.560135 | 1 | 50 | 11219 | 3 | 
| 702 | 35.905132 | 139.711792 | 1 | 30 | 11109 | 4 | 
| 702 | 35.808296 | 139.845308 | 1 | 50 | 11234 | 5 | 
| 702 | 35.893407 | 139.847298 | 2 | 20 | 11243 | 6 | 
| 702 | 35.866713 | 139.398765 | 1 | 20 | 11215 | 7 | 
| 702 | 36.069465 | 139.307003 | 1 | 30 | 11342 | 8 | 
| 702 | 35.978813 | 139.080887 | 1 | 30 | 11207 | 9 | 
| 702 | 35.904396 | 139.407686 | 1 | 60 | 11201 | 10 | 
-------------------------------------------------------------------------------------------------------------------------------------------------
  • HDFSへのインポート

次のコマンドを使用して、HDFSの/user/sqoop_test/ディレクトリにデータをインポートします。

sqoop import \
--connect jdbc:mysql://rm-8dxq35vzblxx01k706o.mysql.rds.aliyuncs.com/testdb \
--username your-username \
-P \
--table people_info \
--target-dir /user/sqoop_test/

インポートが完了したら、次のコマンドを使用して、新しいディレクトリのデータを列挙します。

# hadoop fs -ls /user/sqoop_test/
Found 5 items
-rw-r-----   2 root hadoop          0 2019-03-06 11:45 /user/sqoop_test/_SUCCESS
-rw-r-----   2 root hadoop       3867 2019-03-06 11:42 /user/sqoop_test/part-m-00000
-rw-r-----   2 root hadoop       3964 2019-03-06 11:45 /user/sqoop_test/part-m-00001
-rw-r-----   2 root hadoop       3976 2019-03-06 11:45 /user/sqoop_test/part-m-00002
-rw-r-----   2 root hadoop       3974 2019-03-06 11:42 /user/sqoop_test/part-m-00003

Job Historyを確認すると、デフォルトの4つのMapperが実行されることが分かりました。

  • Hiveへのインポート

Hive の sqoop_testテーブルにデータをインポートするには、次のコマンドを使用します。

sqoop import \
--connect jdbc:mysql://rm-8dxq35vzblxx01k706o.mysql.rds.aliyuncs.com/testdb \
--username your-username \
-P \
--table people_info \
--hive-import \
--hive-database db1 \
--hive-table sqoop_test \
--target-dir /user/foo/ 

インポートが完了したら、Hiveでテーブルデータを確認することもできます。

hive> show tables;
OK
sqoop_test
Time taken: 0.113 seconds, Fetched: 18 row(s)

hive> select * from sqoop_test limit 10;
OK
702 36.180109 139.296344 2 30 11218 1
702 35.848949 139.669779 2 0 11108 2
702 35.94997 139.560135 1 50 11219 3
702 35.905132 139.711792 1 30 11109 4
702 35.808296 139.845308 1 50 11234 5
702 35.893407 139.847298 2 20 11243 6
702 35.866713 139.398765 1 20 11215 7
702 36.069465 139.307003 1 30 11342 8
702 35.978813 139.080887 1 30 11207 9
702 35.904396 139.407686 1 60 11201 10
Time taken: 1.233 seconds, Fetched: 10 row(s)

  • 最後

皆さん、いかがでしたでしょう、RDBMSからHadoopにデータを取り込み場合、Sqoopを使ってデータを取り込むことが一般的です。また、Sqoopを利用して、HDFS、HiveやOSSからRDSへのエクスポートすることも可能ですので、別の機会でご説明させて頂ければと思います。