こんにちは、Kouです。
みなさま、先日E-MapReduceのMetaServiceをご紹介致しましたが、今回引き続きMetaServiceを利用しつつ、E-MapReduceでMapReduceジョブの実行方法をご紹介させて頂きたいと思います。
- 前提
- EMR-3.16.0
- クラスタータイプは Hadoop
- ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
- ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
- 言語はJava 1.8.0_171
- WordCountジョブの実行
ここで気をつけなければいけないのは、オンプレミスのHadoop環境と違い、処理用のJavaプログラムと入力データの用意を全てOSS上で行う必要がありますので、まず、ローカルで処理用ソースをコンパイルして、maven系などのコマンドでjarファイルを作成します。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class EmrWordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(EmrWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
次は、前の手順で準備したjarファイル(mr_job-2.0.jar)及び入力データ(test.txt)をOSSにアップロードします。test.txtデータの内容は以下となります。
Alibaba Cloud offers a integrated suite of cloud products and services that are reliable and secure to help you build cloud infrastructure, data centers in multi regions empower your global business
ジョブのデプロイにおいては、E-MapReduceのデータプラットフォームで行いますので、作業用のMRジョブを作成しました。ジョブの出力先は入力データと同じバケットのoutputディレクトリを指定しています。
上記画面の「実行」ボタンを押すと、ジョブの実行が始まります。ジョブの実行中のログを即座で確認することができます、またエラーが発生した場合も、ランタイムログでエラーメッセージを確認できます。
ジョブの処理結果がOSSに出力されますが、今回のReducerを事前に1つにしたので、出力ログ(part-r-00000)が1つとなります。
出力ログ(part-r-00000)をローカルにダウンロードして開くと、以下の結果となり、テキストファイルの単語数をきちんと数えてくれました。
Alibaba 1 Cloud 1 a 1 and 2 are 1 build 1 business 1 centers 1 cloud 2 data 1 empower 1 global 1 help 1 in 1 infrastructure 1 integrated 1 multi 1 of 1 offers 1 products 1 regions 1 reliable 1 secure 1 services 1 suite 1 that 1 to 1 you 1 your 1
- 最後
いかがでしたでしょうか。MapReduceジョブの実行方法の雰囲気をつかんでいただけましたでしょうか。今後とも、E-MapReduceの実際の利用例を皆さんに紹介していければと考えております。