
MapReduce on HBase

Date: 2015-09-30 01:11:14


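Package: org.apache.hadoop.hbase.mapreduce


The mapper extends TableMapper and the reducer extends TableReducer.


By default, each region of the input table is processed by one map task.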
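
The region-to-map relationship can be illustrated without a cluster. The following is a minimal sketch in plain Java, not HBase code: `Region`, `splitsFor` and the sample key ranges are hypothetical, and it assumes the default behavior in which one input split (and therefore one map task) is created for each region that overlaps the scan's row range.

```java
import java.util.ArrayList;
import java.util.List;

public class RegionSplitSketch {

    /** Hypothetical stand-in for a region's key range; "" means unbounded. */
    static final class Region {
        final String startKey;
        final String endKey;

        Region(String startKey, String endKey) {
            this.startKey = startKey;
            this.endKey = endKey;
        }
    }

    /** Returns one split description per region that overlaps [scanStart, scanStop). */
    static List<String> splitsFor(List<Region> regions, String scanStart, String scanStop) {
        List<String> splits = new ArrayList<>();
        for (Region r : regions) {
            // The region's end key is exclusive, so a region ending at scanStart is skipped.
            boolean endsBeforeScan = !r.endKey.isEmpty() && !scanStart.isEmpty()
                    && r.endKey.compareTo(scanStart) <= 0;
            // The scan's stop row is exclusive, so a region starting there is skipped.
            boolean startsAfterScan = !scanStop.isEmpty()
                    && r.startKey.compareTo(scanStop) >= 0;
            if (!endsBeforeScan && !startsAfterScan) {
                splits.add("[" + r.startKey + "," + r.endKey + ")");
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Region> regions = new ArrayList<>();
        regions.add(new Region("", "g"));
        regions.add(new Region("g", "p"));
        regions.add(new Region("p", ""));

        // Full-table scan: three regions, so three splits.
        System.out.println(splitsFor(regions, "", "").size());
        // Scan restricted to [h, k): only the middle region overlaps.
        System.out.println(splitsFor(regions, "h", "k"));
    }
}
```

Under these assumptions a table with three regions yields three map tasks for a full scan, while a scan limited to a narrow row range only starts tasks for the regions it touches.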


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

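    // Must be static: Hadoop instantiates the mapper by reflection, and a
    // non-static inner class has no no-argument constructor.
    public static class MyMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // key is the row key of the current row
            byte[] age = value.getValue(Bytes.toBytes("basicinfo"),
                    Bytes.toBytes("age"));
            if (age == null) {
                return; // skip rows that have no basicinfo:age cell
            }
            // Emit (age, row key) so that the reducer groups row keys by age
            context.write(new Text(Bytes.toString(age)),
                    new Text(Bytes.toString(key.get())));
        }

    }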

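    // Must be static so that Hadoop can instantiate it by reflection.
    public static class MyReducer extends TableReducer<Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // One output row per age; each student row key becomes a column
            // in family "f1" (qualifier and value are both the row key).
            Put put = new Put(Bytes.toBytes(key.toString()));
            for (Text value : values) {
                byte[] rowKey = Bytes.toBytes(value.toString());
                // On HBase 1.0 and later, put.addColumn(...) replaces this
                // overload of put.add(...), which is deprecated there.
                put.add(Bytes.toBytes("f1"), rowKey, rowKey);
            }
            // TableOutputFormat ignores the output key, so null is acceptable
            context.write(null, put);
        }

    }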

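    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "mapreduce on hbase");
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        scan.setCaching(1000);      // rows fetched per RPC round trip
        scan.setCacheBlocks(false); // a full scan should not fill the block cache

        // Read from table "students"; the mapper emits (Text, Text)
        TableMapReduceUtil.initTableMapperJob("students", scan,
                MyMapper.class, Text.class, Text.class, job);
        // Write Puts to table "age", which must already exist with family "f1"
        TableMapReduceUtil.initTableReducerJob("age", MyReducer.class, job);

        // Report failure through the exit code instead of only printing a stack trace
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }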

}
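
To make the data flow of the job easier to follow, here is a minimal in-memory sketch of the same inversion in plain Java, with no HBase involved. The class name `AgeIndexSketch`, the method `invert` and the sample row keys are hypothetical. It assumes the input maps each row key to its basicinfo:age value and that rows without an age are skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AgeIndexSketch {

    /**
     * Simulates map + shuffle + reduce: the input is row key -> age, the
     * output is age -> row keys (each would become a column in family "f1").
     */
    static Map<String, List<String>> invert(Map<String, String> students) {
        Map<String, List<String>> byAge = new TreeMap<>();
        for (Map.Entry<String, String> e : students.entrySet()) {
            String rowKey = e.getKey();
            String age = e.getValue();
            if (age == null) {
                continue; // assumption: rows without an age are skipped
            }
            // The map step emits (age, rowKey); the shuffle groups by age.
            byAge.computeIfAbsent(age, k -> new ArrayList<>()).add(rowKey);
        }
        return byAge;
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for the "students" table.
        Map<String, String> students = new TreeMap<>();
        students.put("s001", "20");
        students.put("s002", "21");
        students.put("s003", "20");

        // The reduce step writes one row per age, one column per student.
        for (Map.Entry<String, List<String>> e : invert(students).entrySet()) {
            System.out.println("row=" + e.getKey() + " f1 columns=" + e.getValue());
        }
    }
}
```

With this sample data the "age" table would end up with two rows, one for 20 holding s001 and s003 and one for 21 holding s002, which is effectively a secondary index from age to student row keys.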


This article comes from the blog "一无所有 QQ:934033381". Please keep this attribution: http://tianxingzhe.blog.51cto.com/3390077/1699284
