hadoop
Введение в MapReduce
Поиск…
Синтаксис
Для запуска примера синтаксис команды:
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
Чтобы скопировать данные в HDFS (из локального):
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
замечания
Word Count, используя MapReduce в Hadoop.
Программа подсчета слов (в Java и Python)
Программа подсчета слов похожа на программу Hello World в MapReduce.
Hadoop MapReduce - это программная среда для удобного написания приложений, которые обрабатывают огромное количество данных (многотабайтные наборы данных) в параллельном режиме на больших кластерах (тысячи узлов) товарного оборудования надежным, отказоустойчивым способом.
Задача MapReduce обычно разбивает входные данные на независимые фрагменты, которые обрабатываются задачами карты полностью параллельным образом. Структура сортирует выходные данные карт, которые затем вводятся в задачи сокращения. Как правило, вход и выход задания хранятся в файловой системе. Рамка заботится о планировании задач, контролирует их и повторно выполняет неудавшиеся задачи.
Пример подсчета слов:
Пример WordCount читает текстовые файлы и подсчитывает, как часто встречаются слова. Ввод - текстовые файлы, а вывод - текстовые файлы, каждая строка которых содержит слово и количество, как часто это происходило, разделенные вкладкой.
Каждый картограф берет строку в качестве входных данных и разбивает ее на слова. Затем он испускает пару ключ / значение слова, и каждый редуктор суммирует подсчеты для каждого слова и испускает один ключ / значение со словом и суммой.
В качестве оптимизации редуктор также используется в качестве объединителя на выходах карты. Это уменьшает количество данных, отправляемых по сети, путем объединения каждого слова в одну запись.
Количество слов:
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Для запуска примера синтаксис команды:
bin/hadoop jar hadoop-*-examples.jar wordcount [-m <#maps>] [-r <#reducers>] <in-dir> <out-dir>
Все файлы в каталоге ввода (называемые in-dir в командной строке выше) считываются, а количество слов на входе записывается в выходной каталог (вызывается out-dir выше). Предполагается, что оба входа и выхода хранятся в HDFS. Если ваш вход еще не находится в HDFS, но где-то в локальной файловой системе, вам нужно скопировать данные в HDFS с помощью следующей команды:
bin/hadoop dfs -mkdir <hdfs-dir> //not required in hadoop 0.17.2 and later
bin/hadoop dfs -copyFromLocal <local-dir> <hdfs-dir>
Пример примера Word в Python:
mapper.py
import sys
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
print '%s\t%s' % (word, 1)
reducer.py
import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
# remove leading and trailing whitespaces
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
# count was not a number, so silently
# ignore/discard this line
continue
if current_word == word:
current_count += count
else:
if current_word:
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
if current_word == word:
print '%s\t%s' % (current_word, current_count)
Вышеупомянутая программа может быть запущена с помощью cat filename.txt | python mapper.py | sort -k1,1 | python reducer.py