数据:
zhangsan@163.com	6000	0	2014-02-20
lisi@163.com	2000	0	2014-02-20
lisi@163.com	0	100	2014-02-20
zhangsan@163.com	3000	0	2014-02-20
wangwu@126.com	9000	0	2014-02-20
wangwu@126.com	0	200	2014-02-20
zhangsan@163.com	6000	0	2014-02-20
lisi@163.com	2000	0	2014-02-20
lisi@163.com	0	100	2014-02-20
zhangsan@163.com	3000	0	2014-02-20
wangwu@126.com	9000	0	2014-02-20
wangwu@126.com	0	200	2014-02-20
数据分析
用户 | 收入 | 支出 | 结余 |
lisi@163.com | 2000 | 100 | 1900 |
wangwu@126.com | 9000 | 200 | 8800 |
zhangsan@163.com | 9000 | 0 | 9000 |
用户 | 收入 | 支出 | 结余 |
zhangsan@163.com | 9000 | 0 | 9000 |
wangwu@126.com | 9000 | 200 | 8800 |
lisi@163.com | 2000 | 100 | 1900 |
InfoBean类
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Writable bean holding one account's income ({@code incom}), expenses
 * ({@code zhichu}) and balance ({@code jieyu}).
 *
 * <p>The natural ordering is: income descending, then expenses ascending, then
 * account name ascending. The account tie-break makes the ordering a proper
 * total order, so {@code compareTo} returns 0 only for beans with the same
 * account, income and expenses. {@code equals} and {@code hashCode} are
 * consistent with that ordering and depend only on field values.
 */
public class InfoBean implements WritableComparable<InfoBean> {
    private String accout;
    private double incom;
    private double zhichu;
    private double jieyu;

    /**
     * Serializes the fields in the order account, income, expenses, balance.
     *
     * @param out sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF throws NullPointerException on null, so an unset account is
        // written as the empty string.
        out.writeUTF(accountOrEmpty());
        out.writeDouble(incom);
        out.writeDouble(zhichu);
        out.writeDouble(jieyu);
    }

    /**
     * Restores the fields in the same order {@link #write(DataOutput)} emits them.
     *
     * @param in source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.accout = in.readUTF();
        this.incom = in.readDouble();
        this.zhichu = in.readDouble();
        this.jieyu = in.readDouble();
    }

    /**
     * Orders beans by income (largest first), then by expenses (smallest
     * first), then by account name.
     *
     * <p>Uses {@link Double#compare(double, double)}, so unlike {@code ==} it
     * gives NaN and signed zeros a well-defined position.
     *
     * @param infoBean the bean to compare against
     * @return negative, zero or positive as this bean sorts before, equal to
     *         or after {@code infoBean}
     */
    @Override
    public int compareTo(InfoBean infoBean) {
        // Arguments are swapped on purpose: higher income sorts first.
        int byIncome = Double.compare(infoBean.getIncom(), this.incom);
        if (byIncome != 0) {
            return byIncome;
        }
        int byExpenses = Double.compare(this.zhichu, infoBean.getZhichu());
        if (byExpenses != 0) {
            return byExpenses;
        }
        // Final tie-break keeps distinct accounts distinct and the order stable.
        return accountOrEmpty().compareTo(infoBean.accountOrEmpty());
    }

    /**
     * Two beans are equal when {@link #compareTo(InfoBean)} reports 0 for them.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof InfoBean)) {
            return false;
        }
        return compareTo((InfoBean) obj) == 0;
    }

    /**
     * Hash built from the same fields {@link #equals(Object)} looks at, so it
     * does not depend on object identity.
     */
    @Override
    public int hashCode() {
        int result = accountOrEmpty().hashCode();
        result = 31 * result + Double.valueOf(incom).hashCode();
        result = 31 * result + Double.valueOf(zhichu).hashCode();
        return result;
    }

    public String getAccout() {
        return accout;
    }

    public void setAccout(String accout) {
        this.accout = accout;
    }

    public double getIncom() {
        return incom;
    }

    public void setIncom(double incom) {
        this.incom = incom;
    }

    public double getZhichu() {
        return zhichu;
    }

    public void setZhichu(double zhichu) {
        this.zhichu = zhichu;
    }

    public double getJieyu() {
        return jieyu;
    }

    public void setJieyu(double jieyu) {
        this.jieyu = jieyu;
    }

    /**
     * Returns income, expenses and balance separated by tabs. The account is
     * not part of the text.
     */
    @Override
    public String toString() {
        return incom + "\t" + zhichu + "\t" + jieyu;
    }

    /**
     * Sets account, income and expenses in one call and derives the balance as
     * {@code incom - zhichu}.
     *
     * @param accout account identifier
     * @param incom  income amount
     * @param zhichu expenses amount
     */
    public void set(String accout, double incom, double zhichu) {
        this.accout = accout;
        this.incom = incom;
        this.zhichu = zhichu;
        this.jieyu = incom - zhichu;
    }

    /** Returns the account, substituting the empty string when it is unset. */
    private String accountOrEmpty() {
        return accout == null ? "" : accout;
    }
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class InfoBean implements WritableComparable<InfoBean>{
private String accout;
private double incom;
private double zhichu;
private double jieyu;
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(accout);
out.writeDouble(incom);
out.writeDouble(zhichu);
out.writeDouble(jieyu);
}
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.accout=in.readUTF();
this.incom=in.readDouble();
this.zhichu=in.readDouble();
this.jieyu=in.readDouble();
}
public int compareTo(InfoBean infoBean) {
if(this.incom == infoBean.getIncom()){
return this.zhichu > infoBean.getZhichu() ? 1 : -1;
}else{
return this.incom > infoBean.getIncom() ? -1 : 1;
}
}
public String getAccout() {
return accout;
}
public void setAccout(String accout) {
this.accout = accout;
}
public double getIncom() {
return incom;
}
public void setIncom(double incom) {
this.incom = incom;
}
public double getZhichu() {
return zhichu;
}
public void setZhichu(double zhichu) {
this.zhichu = zhichu;
}
public double getJieyu() {
return jieyu;
}
public void setJieyu(double jieyu) {
this.jieyu = jieyu;
}
public String toString() {
return incom +"\t" + zhichu +"\t" + jieyu;
}
public void set(String accout,double incom,double zhichu){
this.accout=accout;
this.incom=incom;
this.zhichu=zhichu;
this.jieyu=incom - zhichu;
}
}
SumStep类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumStep {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setReducerClass(sumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
private Text k2 = new Text();
private InfoBean v2 = new InfoBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[] hang=line.split("\t");
String accout=hang[0];
double incom=Double.parseDouble(hang[1]);
double zhichu=Double.parseDouble(hang[2]);
k2.set(accout);
v2.set(accout, incom, zhichu);
context.write(k2, v2);
}
}
public static class sumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
private InfoBean v3 = new InfoBean();
@Override
protected void reduce(Text k2, Iterable<InfoBean> v2,
Reducer<Text, InfoBean, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
double sumIncom = 0;
double sumZhichu = 0;
for(InfoBean infoBean : v2){
sumIncom += infoBean.getIncom();
sumZhichu += infoBean.getZhichu();
}
v3.set("", sumIncom, sumZhichu);
context.write(k2, v3);
}
}
}
x
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumStep {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setReducerClass(sumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
private Text k2 = new Text();
private InfoBean v2 = new InfoBean();
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
String line=value.toString();
String[] hang=line.split("\t");
String accout=hang[0];
double incom=Double.parseDouble(hang[1]);
double zhichu=Double.parseDouble(hang[2]);
k2.set(accout);
v2.set(accout, incom, zhichu);
context.write(k2, v2);
}
}
public static class sumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
private InfoBean v3 = new InfoBean();
protected void reduce(Text k2, Iterable<InfoBean> v2,
Reducer<Text, InfoBean, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
double sumIncom = 0;
double sumZhichu = 0;
for(InfoBean infoBean : v2){
sumIncom += infoBean.getIncom();
sumZhichu += infoBean.getZhichu();
}
v3.set("", sumIncom, sumZhichu);
context.write(k2, v3);
}
}
}
SortStep类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that sorts per-account totals by using {@code InfoBean} as the
 * map output key, so the shuffle orders records by the bean's
 * {@code compareTo}.
 *
 * <p>Each input line is split on tabs; column 0 is the account, column 1 the
 * income and column 2 the expenses. Further columns are ignored.
 * NOTE(review): the input is presumably the output of the summing job; confirm
 * against how the jobs are chained.
 */
public class SortStep {
    /** Counter group used to report input lines that could not be parsed. */
    private static final String COUNTER_GROUP = "SortStep";
    /** Counter name for skipped input lines. */
    private static final String MALFORMED_LINES = "MALFORMED_LINES";

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IllegalArgumentException if fewer than two arguments are given
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: SortStep <input path> <output path>");
        }
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(SortStep.class);

        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Surface a failed job to the calling shell instead of exiting with 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /** Turns every well-formed input line into an {@code InfoBean} key. */
    public static class SortMapper extends
            Mapper<LongWritable, Text, InfoBean, NullWritable> {
        // Reused across calls; context.write serializes it immediately.
        private final InfoBean k2 = new InfoBean();

        /**
         * Parses one tab-separated line and writes the bean as key with a null
         * value. Lines with fewer than three columns or non-numeric amounts are
         * counted and skipped rather than failing the task.
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] hang = value.toString().split("\t");
            if (hang.length < 3) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            double incom;
            double zhichu;
            try {
                incom = Double.parseDouble(hang[1]);
                zhichu = Double.parseDouble(hang[2]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_LINES).increment(1);
                return;
            }
            k2.set(hang[0], incom, zhichu);
            context.write(k2, NullWritable.get());
        }
    }

    /** Writes the sorted beans back out, keyed by account. */
    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        private final Text k3 = new Text();

        /**
         * Writes one output record per incoming value, so keys that compare
         * equal and were grouped together are not collapsed into one line.
         */
        @Override
        protected void reduce(InfoBean k2, Iterable<NullWritable> v2,
                Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
                throws IOException, InterruptedException {
            for (NullWritable ignored : v2) {
                k3.set(k2.getAccout());
                context.write(k3, k2);
            }
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortStep {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Job job=Job.getInstance(new Configuration());
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SortMapper extends
Mapper<LongWritable, Text, InfoBean, NullWritable>{
private InfoBean k2 = new InfoBean();
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] hang=line.split("\t");
String accout = hang[0];
double incom = Double.parseDouble(hang[1]);
double zhichu = Double.parseDouble(hang[2]);
k2.set(accout, incom, zhichu);
context.write(k2,NullWritable.get());
}
}
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
private Text k3=new Text();
protected void reduce(InfoBean k2, Iterable<NullWritable> v2,
Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
k3.set(k2.getAccout());
context.write(k3, k2);
}
}
}
原文:https://www.cnblogs.com/TiePiHeTao/p/a7d96f7c86d7ab52eb021dcc2b0ff8d5.html