hadoop
Einführung in MapReduce
Suche…
Syntax
Um das Beispiel auszuführen, lautet die Befehlssyntax:
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
So kopieren Sie Daten in HDFS (von lokal):
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
Bemerkungen
Word Count-Programm mit MapReduce in Hadoop.
Word Count-Programm (in Java und Python)
Das Wortzählprogramm ist wie das Programm "Hello World" in MapReduce.
Hadoop MapReduce ist ein Software-Framework für das einfache Schreiben von Anwendungen, das große Mengen von Daten (Datensätze mit mehreren Terabyte) parallel auf großen Clustern (Tausende von Knoten) von Standardhardware zuverlässig und fehlertolerant verarbeitet.
Ein MapReduce-Job teilt normalerweise den Eingabedatensatz in unabhängige Blöcke auf, die von den Map-Tasks vollständig parallel verarbeitet werden. Das Framework sortiert die Ausgaben der Karten, die dann in die reduzierten Aufgaben eingegeben werden. In der Regel werden sowohl die Eingabe als auch die Ausgabe des Jobs in einem Dateisystem gespeichert. Das Framework sorgt für die Planung von Aufgaben, deren Überwachung und führt die fehlgeschlagenen Aufgaben erneut aus.
Word Count Beispiel:
Das WordCount-Beispiel liest Textdateien und zählt, wie oft Wörter auftreten. Die Eingabe sind Textdateien und die Ausgabe sind Textdateien, von denen jede ein Wort und die Anzahl der Vorkommen enthält, die durch ein Tabulatorzeichen getrennt sind.
Jeder Mapper nimmt eine Zeile als Eingabe und zerlegt diese in Worte. Es gibt dann ein Schlüssel / Wert-Paar des Wortes aus, und jeder Reduzierer summiert die Zählungen für jedes Wort und gibt einen einzelnen Schlüssel / Wert mit dem Wort und der Summe aus.
Zur Optimierung wird der Reduzierer auch als Combiner für die Kartenausgänge verwendet. Dies reduziert die Datenmenge, die über das Netzwerk gesendet wird, indem jedes Wort zu einem einzigen Datensatz zusammengefasst wird.
Word Count Code:
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Um das Beispiel auszuführen, lautet die Befehlssyntax:
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
Alle Dateien im Eingabeverzeichnis (in der Befehlszeile oben in-dir genannt) werden gelesen, und die Anzahl der Wörter in der Eingabe wird in das Ausgabeverzeichnis (oben in das Ausgabeverzeichnis) geschrieben. Es wird davon ausgegangen, dass sowohl Eingaben als auch Ausgaben in HDFS gespeichert sind. Wenn Ihre Eingabe nicht bereits in HDFS, sondern in einem lokalen Dateisystem vorhanden ist, müssen Sie die Daten mit einem Befehl wie folgt in HDFS kopieren:
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later
bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
Word Count-Beispiel in Python:
mapper.py
import sys
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
print '%s\t%s' % (word, 1)
reducer.py
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
# remove leading and trailing whitespaces
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
if current_word == word:
current_count += count
else:
if current_word:
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Das obige Programm kann mit cat filename.txt | python mapper.py | sort -k1,1 | python reducer.py