Tuesday 3 December 2013

Copy Specific Records Between HBase Tables

Copy Specific Records Between HBase Tables

package copyrecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.StringTokenizer;

/**
 * Created with IntelliJ IDEA.
 * User: Hadoop Share
 * Date: 2/12/13
 * Time: 11:58 AM
 * To change this template use File | Settings | File Templates.
 */
public class CopySpecificRecordsBetweenTables {
    private static Configuration configuration = HBaseConfiguration.create();
    static{
        configuration.set("hbase.zookeeper.quorum","localhost");
        configuration.set("hbase.zookeeper.property.port","2181");
    }



    private void copyRecords(String sourceTable, String sourceDate, String targetTable, String targetDate) throws IOException {
        System.out.println("Begin Copy");
        HTableInterface source = new HTable(configuration,sourceTable);
        HTableInterface target = new HTable(configuration,targetTable);
        RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(sourceDate));
        Scan scan = new Scan();
        scan.setFilter(filter);

        ResultScanner resultScanner = source.getScanner(scan);
        List<Put> putList = new ArrayList<Put>();
        for(Result result:resultScanner){
            System.out.println("  Row Key To Copy: " + Bytes.toString(result.getRow()));
            StringTokenizer tokenizer = new StringTokenizer(Bytes.toString(result.getRow()),":");
            tokenizer.nextToken();//skip date
            String newKey = targetDate + tokenizer.nextToken();
            Put put = new Put(newKey.getBytes());
            System.out.println("Row Key to Insert: " + Bytes.toString(put.getRow()));
            NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyCellMap = result.getNoVersionMap();
            for(byte[] family:familyCellMap.keySet()){
                NavigableMap<byte[], byte[]> cell = familyCellMap.get(family);
                for (byte[] qualifier: cell.keySet()){
                   // System.out.println("Family: " + Bytes.toString(family) + " Qualifier: " + Bytes.toString(qualifier) + " Value: " + Bytes.toString(cell.get(qualifier)));
                    put.add(family,qualifier,cell.get(qualifier));
                }
            }
            putList.add(put);
        }
        target.put(putList);
        System.out.println("End of Copy");
    }

    public static void main(String[] args) throws IOException {
        /*
        * Row key format assumed to be date:key*/
        CopySpecificRecordsBetweenTables copyRecordFromDateToDate = new CopySpecificRecordsBetweenTables();
        copyRecordFromDateToDate.copyRecords("FROM_TABLE", "20131011:", "TO_TABLE", "20131012:");
    }


}

No comments:

Post a Comment