HBase教程
作者: 时海 风自在
完整示例代码

参考:

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Static helpers for reading rows from HBase tables as {@code Map<String, String>}
 * (column qualifier -> value, plus the row key stored under {@code Constants.hbaseRowKey}).
 *
 * <p>All methods share one {@link Connection} created when the class is loaded. Every
 * {@link Table} and {@link ResultScanner} opened by a method is closed before it returns.
 */
public class HbaseTest {
    /** Connection shared by all methods; stays {@code null} if creation failed at class load. */
    static Connection connection;

    /** Matches a {@code pre<digits>_} prefix at the start of a column qualifier. */
    private static final Pattern QUALIFIER_PREFIX = Pattern.compile("^pre\\d+_");

    /** Maximum number of rows skipped per scan while seeking to the requested page. */
    private static final long SKIP_BATCH_SIZE = 1000;

    static {
        try {
            Configuration conf = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            // Keep the class loadable; openTable() reports the missing connection
            // as an IOException on first use instead of a bare NullPointerException.
            e.printStackTrace();
        }
    }

    /**
     * Fetches a single row.
     *
     * <p>Cells whose qualifier or value is blank, or whose value is the literal
     * {@code "null"}, are skipped. A leading {@code pre<digits>_} is removed from qualifiers.
     *
     * @param tableName name of the table to read
     * @param rowKey    row key, encoded as UTF-8
     * @return the row as a map, or {@code null} when the row does not exist
     * @throws IOException if the connection is unavailable or the read fails
     */
    public static Map<String, String> getData(String tableName, String rowKey) throws IOException {
        try (Table table = openTable(tableName)) {
            Result result = table.get(new Get(rowKey.getBytes(StandardCharsets.UTF_8)));
            if (result.getRow() == null) {
                return null;
            }
            return toMap(result, true, true);
        }
    }

    /**
     * Fetches one page of rows in row-key order.
     *
     * <p>The rows before the requested page are skipped with key-only scans of at most
     * {@code SKIP_BATCH_SIZE} rows each. Cells with a blank qualifier or value are skipped
     * and a leading {@code pre<digits>_} is removed from qualifiers; unlike
     * {@link #getData}, the literal value {@code "null"} is kept.
     *
     * @param tableName name of the table to read
     * @param pageNum   1-based page number; values below 1 are treated as the first page
     * @param pageSize  number of rows per page; a non-positive size yields an empty list
     * @return the rows of the page, empty when the page lies beyond the end of the table
     * @throws IOException if the connection is unavailable or a scan fails
     */
    public static List<Map<String, String>> listCommunityByPage(String tableName, long pageNum, long pageSize) throws IOException {
        List<Map<String, String>> returnResult = new ArrayList<>();
        if (pageSize <= 0) {
            return returnResult;
        }

        try (Table table = openTable(tableName)) {
            long skip = pageSize * (pageNum - 1);

            // Row key of the first row of the requested page; null means "start of table".
            // Kept as raw bytes so that non-text row keys survive the round trip.
            byte[] startRow = null;
            while (skip > 0) {
                long step = Math.min(skip, SKIP_BATCH_SIZE);
                skip -= step;

                startRow = seekForward(table, startRow, step);
                if (startRow == null) {
                    // Fewer rows than requested: the page is past the end of the table.
                    return returnResult;
                }
            }

            Scan scan = new Scan();
            scan.setFilter(new PageFilter(pageSize));
            if (startRow != null) {
                scan.setStartRow(startRow);
            }

            try (ResultScanner rs = table.getScanner(scan)) {
                for (Result result : rs) {
                    // PageFilter is evaluated per region server, so the scan can return
                    // more than pageSize rows; enforce the limit on the client side.
                    if (returnResult.size() >= pageSize) {
                        break;
                    }
                    returnResult.add(toMap(result, false, true));
                }
            }
        }

        return returnResult;
    }

    /**
     * Reads every row of a table.
     *
     * <p>Cells whose qualifier or value is blank, or whose value is the literal
     * {@code "null"}, are skipped. Qualifiers are returned unchanged (no prefix removal).
     *
     * @param tableName name of the table to read
     * @return all rows of the table in scan order
     * @throws IOException if the connection is unavailable or the scan fails
     */
    public static List<Map<String, String>> queryTable(String tableName) throws IOException {
        List<Map<String, String>> returnResult = new ArrayList<>();

        try (Table table = openTable(tableName);
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                returnResult.add(toMap(result, true, false));
            }
        }

        return returnResult;
    }

    /** Opens the named table on the shared connection, failing clearly if there is none. */
    private static Table openTable(String tableName) throws IOException {
        if (connection == null) {
            throw new IOException("HBase connection is not available (creation failed at class load)");
        }
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Scans keys only from {@code startRow} (inclusive, or the table start when
     * {@code null}) and returns the key of the row that follows {@code rowsToSkip} rows,
     * or {@code null} when the scan ends before reaching it.
     */
    private static byte[] seekForward(Table table, byte[] startRow, long rowsToSkip) throws IOException {
        long target = rowsToSkip + 1;

        List<Filter> filters = new ArrayList<>();
        filters.add(new PageFilter(target));
        filters.add(new KeyOnlyFilter());

        Scan scan = new Scan();
        scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, filters));
        if (startRow != null) {
            scan.setStartRow(startRow);
        }

        long seen = 0;
        try (ResultScanner rs = table.getScanner(scan)) {
            for (Result result : rs) {
                seen++;
                if (seen == target) {
                    return result.getRow();
                }
            }
        }
        return null;
    }

    /**
     * Converts one result into a qualifier -> value map that also carries the row key
     * under {@code Constants.hbaseRowKey}. Cells with a blank qualifier or value are dropped.
     *
     * @param skipNullLiteral when true, cells whose trimmed value equals {@code "null"} are dropped
     * @param stripPrefix     when true, a leading {@code pre<digits>_} is removed from qualifiers
     */
    private static Map<String, String> toMap(Result result, boolean skipNullLiteral, boolean stripPrefix) {
        Map<String, String> item = new HashMap<>();
        item.put(Constants.hbaseRowKey, new String(result.getRow(), StandardCharsets.UTF_8));

        List<Cell> cells = result.listCells();
        if (cells == null) {
            return item;
        }

        for (Cell cell : cells) {
            String key = new String(cell.getQualifierArray(), cell.getQualifierOffset(),
                    cell.getQualifierLength(), StandardCharsets.UTF_8);
            String value = new String(cell.getValueArray(), cell.getValueOffset(),
                    cell.getValueLength(), StandardCharsets.UTF_8);

            if (StringUtils.isBlank(key) || StringUtils.isBlank(value)) {
                continue;
            }
            if (skipNullLiteral && "null".equals(value.trim())) {
                continue;
            }

            if (stripPrefix) {
                // lookingAt() matches at the start only; String.matches() would require the
                // whole qualifier to be the prefix and so never strip a real column name.
                Matcher prefix = QUALIFIER_PREFIX.matcher(key);
                if (prefix.lookingAt()) {
                    key = key.substring(prefix.end());
                }
            }

            item.put(key, value);
        }

        return item;
    }

}

标签: cell、tablename、returnresult、scan、import
一个创业中的苦逼程序员
  • 回复
隐藏