hadoop怎么使用算法
答案:1 悬赏:20 手机版
解决时间 2021-03-08 05:48
- 提问者网友:轮囘Li巡影
- 2021-03-07 05:15
hadoop怎么使用算法
最佳答案
- 五星知识达人网友:孤独的牧羊人
- 2021-03-07 06:02
实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:
SumStep运行之后结果如下:
SortStep运行之后结果为上图根据结余从大到小排序。
代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable{
private String account;
private double income;
private double expenses;
private double surplus;
public void set(String account, double income, double expenses){
this.account = account;
this.income = income;
this.expenses = expenses;
this.surplus = income - expenses;
}
@Override
public String toString() {
return this.income + " " + this.expenses + " " + this.surplus;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(expenses);
out.writeDouble(surplus);
}
public void readFields(DataInput in) throws IOException {
this.account = in.readUTF();
this.income = in.readDouble();
this.expenses = in.readDouble();
this.surplus = in.readDouble();
}
public int compareTo(InfoBean o) {
if(this.income == o.getIncome()){
return this.expenses > o.getExpenses() ? 1 : -1;
} else {
return this.income > o.getIncome() ? -1 : 1;
}
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public double getIncome() {
return income;
}
public void setIncome(double income) {
this.income = income;
}
public double getExpenses() {
return expenses;
}
public void setExpenses(double expenses) {
this.expenses = expenses;
}
public double getSurplus() {
return surplus;
}
public void setSurplus(double surplus) {
this.surplus = surplus;
}
}
[java] view plain copy
/**
 * First MapReduce job: sums income and expenses per account.
 *
 * <p>Each input line is expected to hold {@code account income expenses} separated by
 * whitespace. The output key is the account and the output value is an {@code InfoBean}
 * carrying the totals.
 */
public class SumStep {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SumStep <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SumStep.class);

        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Parses one input line and emits (account, bean). */
    public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        // Reused across map() calls; context.write serializes the current contents.
        private InfoBean bean = new InfoBean();
        private Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // nothing to parse on a blank line
            }
            // Split on any run of whitespace so space- and tab-separated input both work.
            String[] fields = line.split("\\s+");
            if (fields.length < 3) {
                throw new IOException("Expected 'account income expenses' but got: " + line);
            }
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account);
            bean.set(account, income, expenses);
            context.write(k, bean);
        }
    }

    /** Adds up all income and expenses seen for one account. */
    public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
        private InfoBean total = new InfoBean();

        @Override
        protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)
                throws IOException, InterruptedException {
            double inSum = 0;
            double outSum = 0;
            for (InfoBean item : v2s) {
                inSum += item.getIncome();
                outSum += item.getExpenses();
            }
            // The account travels in the output key, so the bean's account stays empty.
            total.set("", inSum, outSum);
            context.write(key, total);
        }
    }
}
此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。
[java] view plain copy
/**
 * Second MapReduce job: sorts per-account totals by using {@code InfoBean} as the map
 * output key, so the shuffle orders records with {@code InfoBean.compareTo}.
 *
 * <p>The input is the text output of the summing job, one line per account holding
 * the account followed by income, expenses and surplus.
 */
public class SortStep {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: SortStep <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortStep.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /** Rebuilds an {@code InfoBean} from one line and emits it as the key. */
    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        // Reused across map() calls; context.write serializes the current contents.
        private InfoBean bean = new InfoBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // nothing to parse on a blank line
            }
            // With Hadoop's default text output the key and value are tab-separated while
            // InfoBean.toString() uses spaces; splitting on a single space would leave the
            // account and income fused in fields[0]. Split on any whitespace run instead.
            String[] fields = line.split("\\s+");
            if (fields.length < 3) {
                throw new IOException("Expected 'account income expenses ...' but got: " + line);
            }
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            bean.set(account, income, expenses); // surplus is recomputed by set()
            context.write(bean, NullWritable.get());
        }
    }

    /** Writes the sorted beans back out as (account, bean). */
    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        private Text k = new Text();

        @Override
        protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Emit once per grouped value so records whose keys compare equal are not
            // dropped. Hadoop refreshes the key object while the values are iterated.
            for (NullWritable ignored : v2s) {
                k.set(bean.getAccount());
                context.write(k, bean);
            }
        }
    }
}
实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段
<0,"hello tom">
....
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello "a.txt->5 b.txt->3"
tom "a.txt->2 b.txt->1"
kitty "a.txt->1"
.......
代码如下:
[java] view plain copy
public class InverseIndex {
/**
 * Configures and runs the inverted-index job.
 *
 * @param args {@code args[0]} is the input path (e.g. words.txt), {@code args[1]} the output path
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: InverseIndex <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // Locate the job jar through this class.
    job.setJarByClass(InverseIndex.class);

    // Mapper settings.
    job.setMapperClass(IndexMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Reducer settings.
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setCombinerClass(IndexCombiner.class);

    // Submit, wait, and propagate job failure instead of always exiting with 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
/**
 * Emits {@code ("word->fileName", "1")} for every word of every input line, where
 * fileName is the name of the file the current split belongs to.
 */
public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();

        v.set("1"); // the count is the same for every token, so set it once
        for (String word : line.split(" ")) {
            if (word.isEmpty()) {
                // Blank lines and repeated spaces yield empty tokens; indexing them
                // would create a bogus "->fileName" entry.
                continue;
            }
            k.set(word + "->" + name);
            context.write(k, v);
        }
    }
}
/**
 * Collapses the mapper's {@code ("word->file", "1")} records into
 * {@code ("word", "file->count")}.
 *
 * <p>NOTE(review): this combiner rewrites the key. Hadoop does not guarantee how many
 * times a combiner runs, so the reducer must still cope with un-combined input; that
 * cannot be verified from this block. To stay safe when the combiner is applied to its
 * own output, values that already have the {@code file->count} shape are passed through
 * unchanged instead of being parsed as numbers.
 */
public static class IndexCombiner extends Reducer<Text, Text, Text, Text> {
    private static final String SEPARATOR = "->";

    private Text k = new Text();
    private Text v = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String composite = key.toString();
        long sum = 0;
        boolean sawRawCount = false;
        for (Text t : values) {
            String s = t.toString();
            if (s.contains(SEPARATOR)) {
                // Already combined by an earlier pass: forward as-is.
                context.write(key, t);
            } else {
                sum += Long.parseLong(s);
                sawRawCount = true;
            }
        }
        if (!sawRawCount) {
            return;
        }
        // Split at the LAST separator so a word that itself contains "->" stays intact.
        int cut = composite.lastIndexOf(SEPARATOR);
        if (cut < 0) {
            throw new IOException("Expected key of the form 'word->file' but got: " + composite);
        }
        k.set(composite.substring(0, cut));
        v.set(composite.substring(cut + SEPARATOR.length()) + SEPARATOR + sum);
        context.write(k, v);
    }
}
public static class IndexReducer extends Reducer{
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable values,
SumStep运行之后结果如下:
SortStep运行之后结果为上图根据结余从大到小排序。
代码如下:
[java] view plain copy
public class InfoBean implements WritableComparable
private String account;
private double income;
private double expenses;
private double surplus;
public void set(String account, double income, double expenses){
this.account = account;
this.income = income;
this.expenses = expenses;
this.surplus = income - expenses;
}
@Override
public String toString() {
return this.income + " " + this.expenses + " " + this.surplus;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(expenses);
out.writeDouble(surplus);
}
public void readFields(DataInput in) throws IOException {
this.account = in.readUTF();
this.income = in.readDouble();
this.expenses = in.readDouble();
this.surplus = in.readDouble();
}
public int compareTo(InfoBean o) {
if(this.income == o.getIncome()){
return this.expenses > o.getExpenses() ? 1 : -1;
} else {
return this.income > o.getIncome() ? -1 : 1;
}
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public double getIncome() {
return income;
}
public void setIncome(double income) {
this.income = income;
}
public double getExpenses() {
return expenses;
}
public void setExpenses(double expenses) {
this.expenses = expenses;
}
public double getSurplus() {
return surplus;
}
public void setSurplus(double surplus) {
this.surplus = surplus;
}
}
[java] view plain copy
public class SumStep {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SumMapper extends Mapper
private InfoBean bean = new InfoBean();
private Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// split
String line = value.toString();
String[] fields = line.split(" ");
// get useful field
String account = fields[0];
double income = Double.parseDouble(fields[1]);
double expenses = Double.parseDouble(fields[2]);
k.set(account);
bean.set(account, income, expenses);
context.write(k, bean);
}
}
public static class SumReducer extends Reducer
private InfoBean bean = new InfoBean();
@Override
protected void reduce(Text key, Iterable
throws IOException, InterruptedException {
double in_sum = 0;
double out_sum = 0;
for(InfoBean bean : v2s){
in_sum += bean.getIncome();
out_sum += bean.getExpenses();
}
bean.set("", in_sum, out_sum);
context.write(key, bean);
}
}
}
此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。
[java] view plain copy
public class SortStep {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SortMapper extends Mapper
private InfoBean bean = new InfoBean();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
String account = fields[0];
double income = Double.parseDouble(fields[1]);
double expenses = Double.parseDouble(fields[2]);
bean.set(account, income, expenses);
context.write(bean, NullWritable.get());
}
}
public static class SortReducer extends Reducer
private Text k = new Text();
@Override
protected void reduce(InfoBean bean, Iterable
throws IOException, InterruptedException {
String account = bean.getAccount();
k.set(account);
context.write(k, bean);
}
}
}
实例二、倒排索引,过程如下:
[plain] view plain copy
Map阶段
<0,"hello tom">
....
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello "a.txt->5 b.txt->3"
tom "a.txt->2 b.txt->1"
kitty "a.txt->1"
.......
代码如下:
[java] view plain copy
public class InverseIndex {
/**
 * Configures and runs the inverted-index job.
 *
 * @param args {@code args[0]} is the input path (e.g. words.txt), {@code args[1]} the output path
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: InverseIndex <input path> <output path>");
        System.exit(2);
    }
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // Locate the job jar through this class.
    job.setJarByClass(InverseIndex.class);

    // Mapper settings.
    job.setMapperClass(IndexMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Reducer settings.
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setCombinerClass(IndexCombiner.class);

    // Submit, wait, and propagate job failure instead of always exiting with 0.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class IndexMapper extends Mapper
private Text k = new Text();
private Text v = new Text();
@Override
protected void map(LongWritable key, Text value,
Mapper
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
Path path = inputSplit.getPath();
String name = path.getName();
for(String f : fields){
k.set(f + "->" + name);
v.set("1");
context.write(k, v);
}
}
}
public static class IndexCombiner extends Reducer
private Text k = new Text();
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable
Reducer
throws IOException, InterruptedException {
String[] fields = key.toString().split("->");
long sum = 0;
for(Text t : values){
sum += Long.parseLong(t.toString());
}
k.set(fields[0]);
v.set(fields[1] + "->" + sum);
context.write(k, v);
}
}
public static class IndexReducer extends Reducer
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable
我要举报
如以上回答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
点此我要举报以上问答信息
大家都在看
推荐资讯