1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
| import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.util.Iterator;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.cn.smart.SmartChineseAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.util.CharArraySet;
import org.apache.lucene.util.Version;
public class WordCountOrder {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
/*
String[] self_stop_words = { "的", "在","了", "呢", "是"};
CharArraySet cas = new CharArraySet(0, true);
for(int i = 0; i < self_stop_words.length; i++) {
cas.add(self_stop_words[i]);
}
// 加入系统默认停用词
Iterator<Object> itor = SmartChineseAnalyzer.getDefaultStopSet().iterator();
while (itor.hasNext()) {
cas.add(itor.next());
}
*/
// 中英文混合分词器
SmartChineseAnalyzer sca = new SmartChineseAnalyzer();
//SmartChineseAnalyzer sca = new SmartChineseAnalyzer(cas);
TokenStream ts = sca.tokenStream("field", value.toString());
CharTermAttribute ch = ts.addAttribute(CharTermAttribute.class);
ts.reset();
while (ts.incrementToken()) {
word.set(ch.toString());
context.write(word, one);
}
ts.end();
ts.close();
}
}
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Path tempDir = new Path("wordcount-temp-" + Integer.toString(
new Random().nextInt(Integer.MAX_VALUE))); //定义一个临时目录
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountOrder.class);
try{
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, tempDir);
//先将词频统计任务的输出结果写到临时目录中, 下一个排序任务以临时目录为输入目录。
job.setOutputFormatClass(SequenceFileOutputFormat.class);
if(job.waitForCompletion(true)){ //当word count结束
Job sortJob = Job.getInstance(conf, "sort");
sortJob.setJarByClass(WordCountOrder.class);
FileInputFormat.addInputPath(sortJob, tempDir);
sortJob.setInputFormatClass(SequenceFileInputFormat.class);
sortJob.setMapperClass(InverseMapper.class);
//InverseMapper作用是实现map()之后的数据对的key和value交换
sortJob.setNumReduceTasks(1);
// Reducer 的个数限定为1, 最终输出的结果文件就是一个
FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
sortJob.setOutputKeyClass(IntWritable.class);
sortJob.setOutputValueClass(Text.class);
/* Hadoop 默认对 IntWritable 按升序排序,而我们需要的是按降序排列。
* 因此实现了 IntWritableDecreasingComparator 类,
* 并指定使用这个自定义的 Comparator 类对输出结果中的 key (词频)进行排序*/
sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);
System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
}
}finally{
FileSystem.get(conf).deleteOnExit(tempDir);
}
}
}
|