hadoop
MapReduceの紹介
サーチ…
構文
この例を実行するには、コマンドの構文は次のとおりです。
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
HDFSにデータをコピーするには(ローカルから):
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
備考
HadoopでMapReduceを使用するワードカウントプログラム。
ワードカウントプログラム(JavaおよびPythonで)
単語カウントプログラムはMapReduceの "Hello World"プログラムに似ています。
Hadoop MapReduceは、信頼性の高いフォールトトレラントな方法で、大規模なクラスタ(数千ノード)のコモディティハードウェアに並行して膨大な量のデータ(マルチテラバイトのデータセット)を処理するアプリケーションを簡単に作成するためのソフトウェアフレームワークです。
MapReduceジョブは、通常、入力データセットを完全な並列方法でマップタスクによって処理される独立したチャンクに分割します。フレームワークはマップの出力をソートし、次にそれらを縮小タスクに入力します。通常、ジョブの入力と出力の両方がファイルシステムに格納されます。フレームワークはタスクのスケジューリング、タスクの監視、失敗したタスクの再実行を行います。
ワードカウントの例:
WordCountの例では、テキストファイルを読み込み、単語の出現頻度を数えます。入力はテキストファイルで、出力はテキストファイルで、各行には単語とその発生頻度のカウントがタブで区切られています。
各マッパーは行を入力として取り、それを単語に分解します。次に、単語のキー/値のペアを出力し、各レデューサは各単語のカウントを合計し、単語と合計で単一のキー/値を出力します。
最適化として、減速器はマップ出力上のコンバイナとしても使用されます。これにより、各単語を1つのレコードにまとめることで、ネットワーク経由で送信されるデータ量が削減されます。
ワードカウントコード:
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
この例を実行するには、コマンドの構文は次のとおりです。
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
入力ディレクトリ内のすべてのファイル(上のコマンドラインのin-dir)が読み込まれ、入力内の単語の数が出力ディレクトリに書き出されます(上記のout-dirと呼ばれます)。入力と出力の両方がHDFSに格納されていると仮定します。入力がまだHDFSにはなく、ローカルファイルシステムにある場合は、次のようなコマンドを使用してHDFSにデータをコピーする必要があります。
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later
bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
Pythonの単語数の例:
mapper.py
import sys
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
print '%s\t%s' % (word, 1)
reducer.py
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
# remove leading and trailing whitespaces
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
if current_word == word:
current_count += count
else:
if current_word:
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print '%s\t%s' % (current_word, current_count)
上記のプログラムは、 cat filename.txt | python mapper.py | sort -k1,1 | python reducer.py
を使用して実行できcat filename.txt | python mapper.py | sort -k1,1 | python reducer.py