Copy Specific Records Between HBase Tables
package copyrecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.StringTokenizer;
/**
* Created with IntelliJ IDEA.
* User: Hadoop Share
* Date: 2/12/13
* Time: 11:58 AM
* To change this template use File | Settings | File Templates.
*/
public class CopySpecificRecordsBetweenTables {
private static Configuration configuration = HBaseConfiguration.create();
static{
configuration.set("hbase.zookeeper.quorum","localhost");
configuration.set("hbase.zookeeper.property.port","2181");
}
private void copyRecords(String sourceTable, String sourceDate, String targetTable, String targetDate) throws IOException {
System.out.println("Begin Copy");
HTableInterface source = new HTable(configuration,sourceTable);
HTableInterface target = new HTable(configuration,targetTable);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(sourceDate));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner resultScanner = source.getScanner(scan);
List<Put> putList = new ArrayList<Put>();
for(Result result:resultScanner){
System.out.println(" Row Key To Copy: " + Bytes.toString(result.getRow()));
StringTokenizer tokenizer = new StringTokenizer(Bytes.toString(result.getRow()),":");
tokenizer.nextToken();//skip date
String newKey = targetDate + tokenizer.nextToken();
Put put = new Put(newKey.getBytes());
System.out.println("Row Key to Insert: " + Bytes.toString(put.getRow()));
NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyCellMap = result.getNoVersionMap();
for(byte[] family:familyCellMap.keySet()){
NavigableMap<byte[], byte[]> cell = familyCellMap.get(family);
for (byte[] qualifier: cell.keySet()){
// System.out.println("Family: " + Bytes.toString(family) + " Qualifier: " + Bytes.toString(qualifier) + " Value: " + Bytes.toString(cell.get(qualifier)));
put.add(family,qualifier,cell.get(qualifier));
}
}
putList.add(put);
}
target.put(putList);
System.out.println("End of Copy");
}
public static void main(String[] args) throws IOException {
/*
* Row key format assumed to be date:key*/
CopySpecificRecordsBetweenTables copyRecordFromDateToDate = new CopySpecificRecordsBetweenTables();
copyRecordFromDateToDate.copyRecords("FROM_TABLE", "20131011:", "TO_TABLE", "20131012:");
}
}
package copyrecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.StringTokenizer;
/**
* Created with IntelliJ IDEA.
* User: Hadoop Share
* Date: 2/12/13
* Time: 11:58 AM
* To change this template use File | Settings | File Templates.
*/
public class CopySpecificRecordsBetweenTables {
private static Configuration configuration = HBaseConfiguration.create();
static{
configuration.set("hbase.zookeeper.quorum","localhost");
configuration.set("hbase.zookeeper.property.port","2181");
}
private void copyRecords(String sourceTable, String sourceDate, String targetTable, String targetDate) throws IOException {
System.out.println("Begin Copy");
HTableInterface source = new HTable(configuration,sourceTable);
HTableInterface target = new HTable(configuration,targetTable);
RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(sourceDate));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner resultScanner = source.getScanner(scan);
List<Put> putList = new ArrayList<Put>();
for(Result result:resultScanner){
System.out.println(" Row Key To Copy: " + Bytes.toString(result.getRow()));
StringTokenizer tokenizer = new StringTokenizer(Bytes.toString(result.getRow()),":");
tokenizer.nextToken();//skip date
String newKey = targetDate + tokenizer.nextToken();
Put put = new Put(newKey.getBytes());
System.out.println("Row Key to Insert: " + Bytes.toString(put.getRow()));
NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyCellMap = result.getNoVersionMap();
for(byte[] family:familyCellMap.keySet()){
NavigableMap<byte[], byte[]> cell = familyCellMap.get(family);
for (byte[] qualifier: cell.keySet()){
// System.out.println("Family: " + Bytes.toString(family) + " Qualifier: " + Bytes.toString(qualifier) + " Value: " + Bytes.toString(cell.get(qualifier)));
put.add(family,qualifier,cell.get(qualifier));
}
}
putList.add(put);
}
target.put(putList);
System.out.println("End of Copy");
}
public static void main(String[] args) throws IOException {
/*
* Row key format assumed to be date:key*/
CopySpecificRecordsBetweenTables copyRecordFromDateToDate = new CopySpecificRecordsBetweenTables();
copyRecordFromDateToDate.copyRecords("FROM_TABLE", "20131011:", "TO_TABLE", "20131012:");
}
}