永发信息网

hadoop怎么使用算法

答案:1  悬赏:20  手机版
解决时间 2021-03-08 05:48
  • 提问者网友:轮囘Li巡影
  • 2021-03-07 05:15
hadoop怎么使用算法
最佳答案
  • 五星知识达人网友:孤独的牧羊人
  • 2021-03-07 06:02
实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SumStep运行之后结果如下:


SortStep运行之后结果为上图根据结余从大到小排序。

代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable{  
private String account;  
private double income;  
private double expenses;  
private double surplus;  
public void set(String account, double income, double expenses){  
this.account = account;  
this.income = income;  
this.expenses = expenses;  
this.surplus = income - expenses;  
}  
@Override  
public String toString() {  
return this.income + " " + this.expenses + " " + this.surplus;  
}  
  
public void write(DataOutput out) throws IOException {  
out.writeUTF(account);  
out.writeDouble(income);  
out.writeDouble(expenses);  
out.writeDouble(surplus);  
}  
  
public void readFields(DataInput in) throws IOException {  
this.account = in.readUTF();  
this.income = in.readDouble();  
this.expenses = in.readDouble();  
this.surplus = in.readDouble();  
}  
public int compareTo(InfoBean o) {  
if(this.income == o.getIncome()){  
return this.expenses > o.getExpenses() ? 1 : -1;   
} else {  
return this.income > o.getIncome() ? -1 : 1;  
}  
}  
public String getAccount() {  
return account;  
}  
public void setAccount(String account) {  
this.account = account;  
}  
public double getIncome() {  
return income;  
}  
public void setIncome(double income) {  
this.income = income;  
}  
public double getExpenses() {  
return expenses;  
}  
public void setExpenses(double expenses) {  
this.expenses = expenses;  
}  
public double getSurplus() {  
return surplus;  
}  
public void setSurplus(double surplus) {  
this.surplus = surplus;  
}  
}  
[java] view plain copy
public class SumStep {  
public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration();  
Job job = Job.getInstance(conf);  
job.setJarByClass(SumStep.class);  
job.setMapperClass(SumMapper.class);  
job.setMapOutputKeyClass(Text.class);  
job.setMapOutputValueClass(InfoBean.class);  
FileInputFormat.setInputPaths(job, new Path(args[0]));  
job.setReducerClass(SumReducer.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(InfoBean.class);  
FileOutputFormat.setOutputPath(job, new Path(args[1]));  
job.waitForCompletion(true);  
}  
public static class SumMapper extends Mapper{  
private InfoBean bean = new InfoBean();  
private Text k = new Text();  
@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
// split   
String line = value.toString();  
String[] fields = line.split(" ");  
// get useful field  
String account = fields[0];  
double income = Double.parseDouble(fields[1]);  
double expenses = Double.parseDouble(fields[2]);  
k.set(account);  
bean.set(account, income, expenses);  
context.write(k, bean);  
}  
}  
public static class SumReducer extends Reducer{  
private InfoBean bean = new InfoBean();  
@Override  
protected void reduce(Text key, Iterable v2s, Context context)  
throws IOException, InterruptedException {  
double in_sum = 0;  
double out_sum = 0;  
for(InfoBean bean : v2s){  
in_sum += bean.getIncome();  
out_sum += bean.getExpenses();  
}  
bean.set("", in_sum, out_sum);  
context.write(key, bean);  
}  
}  
}  

此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。

[java] view plain copy
public class SortStep {  
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
Configuration conf = new Configuration();  
Job job = Job.getInstance(conf);  
job.setJarByClass(SortStep.class);  
job.setMapperClass(SortMapper.class);  
job.setMapOutputKeyClass(InfoBean.class);  
job.setMapOutputValueClass(NullWritable.class);  
FileInputFormat.setInputPaths(job, new Path(args[0]));  
job.setReducerClass(SortReducer.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(InfoBean.class);  
FileOutputFormat.setOutputPath(job, new Path(args[1]));  
job.waitForCompletion(true);  
}  
public static class SortMapper extends Mapper{  
private InfoBean bean = new InfoBean();  
@Override  
protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {  
String line = value.toString();  
String[] fields = line.split(" ");  
String account = fields[0];  
double income = Double.parseDouble(fields[1]);  
double expenses = Double.parseDouble(fields[2]);  
bean.set(account, income, expenses);  
context.write(bean, NullWritable.get());  
}  
}  
public static class SortReducer extends Reducer{  
private Text k = new Text();  
@Override  
protected void reduce(InfoBean bean, Iterable v2s, Context context)  
throws IOException, InterruptedException {  
String account = bean.getAccount();  
k.set(account);  
context.write(k, bean);  
}  
}  
}  

实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段  
<0,"hello tom">  
....  
context.write("hello->a.txt",1);  
context.write("hello->a.txt",1);  
context.write("hello->a.txt",1);  
context.write("hello->a.txt",1);  
context.write("hello->a.txt",1);  
context.write("hello->b.txt",1);  
context.write("hello->b.txt",1);  
context.write("hello->b.txt",1);  
--------------------------------------------------------  
combiner阶段  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->a.txt",1>  
<"hello->b.txt",1>  
<"hello->b.txt",1>  
<"hello->b.txt",1>  
context.write("hello","a.txt->5");  
context.write("hello","b.txt->3");  
--------------------------------------------------------  
Reducer阶段  
<"hello",{"a.txt->5","b.txt->3"}>  
context.write("hello","a.txt->5 b.txt->3");  
-------------------------------------------------------  
hello   "a.txt->5 b.txt->3"  
tom     "a.txt->2 b.txt->1"  
kitty   "a.txt->1"  
.......  
代码如下:
[java] view plain copy
public class InverseIndex {  
public static void main(String[] args) throws Exception {  
Configuration conf = new Configuration();  
Job job = Job.getInstance(conf);  
//设置jar  
job.setJarByClass(InverseIndex.class);  
//设置Mapper相关的属性  
job.setMapperClass(IndexMapper.class);  
job.setMapOutputKeyClass(Text.class);  
job.setMapOutputValueClass(Text.class);  
FileInputFormat.setInputPaths(job, new Path(args[0]));//words.txt  
//设置Reducer相关属性  
job.setReducerClass(IndexReducer.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(Text.class);  
FileOutputFormat.setOutputPath(job, new Path(args[1]));  
job.setCombinerClass(IndexCombiner.class);  
//提交任务  
job.waitForCompletion(true);  
}  
public static class IndexMapper extends Mapper{  
private Text k = new Text();  
private Text v = new Text();  
@Override  
protected void map(LongWritable key, Text value,  
Mapper.Context context)  
throws IOException, InterruptedException {  
String line = value.toString();  
String[] fields = line.split(" ");  
FileSplit inputSplit = (FileSplit) context.getInputSplit();  
Path path = inputSplit.getPath();  
String name = path.getName();  
for(String f : fields){  
k.set(f + "->" + name);  
v.set("1");  
context.write(k, v);  
}  
}  
}  
public static class IndexCombiner extends Reducer{  
private Text k = new Text();  
private Text v = new Text();  
@Override  
protected void reduce(Text key, Iterable values,  
Reducer.Context context)  
throws IOException, InterruptedException {  
String[] fields = key.toString().split("->");  
long sum = 0;  
for(Text t : values){  
sum += Long.parseLong(t.toString());  
}  
k.set(fields[0]);  
v.set(fields[1] + "->" + sum);  
context.write(k, v);  
}  
}  
public static class IndexReducer extends Reducer{  
private Text v = new Text();  
@Override  
protected void reduce(Text key, Iterable values,  
我要举报
如以上回答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
点此我要举报以上问答信息
大家都在看
推荐资讯