Java客户端连接Hbase_java连接hbase_江湖小蟹的博客

CSDN博客 · · 968 次点击 · · 开始浏览    
这是一篇创建时间较早的文章,其中的信息可能已经有所发展或是发生改变。

使用

	下面介绍Java客户端连接HBase的用法。经亲测,1.2.0版本和2.3.4版本中下面用到的API是一致的(因此推测1.x版本和2.x版本应该基本一致)。
用Java连接HBase很简单,在Maven中导入对应的 `hbase-client` 依赖即可。

1.pom配置

    <dependencies>
        <!-- HBase client library: the dependency this article relies on to connect from Java. -->
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <!-- The article reports testing the examples with 1.2.0 and 2.3.4. -->
            <version>1.2.0</version>
        </dependency>
    </dependencies>
    
    <!-- Extra repositories Maven may resolve artifacts from. -->
    <repositories>
        <!-- Cloudera artifact repository. -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <!-- Apache releases repository. -->
        <repository>
            <id>apache release</id>
            <url>https://repository.apache.org/content/repositories/releases/</url>
        </repository>
    </repositories>

语法

0)初始化

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
    /**
     * Demo entry point: configures the ZooKeeper quorum, connects to HBase,
     * opens table {@code FEI:WEN} and runs one sample operation against it.
     *
     * <p>The connection and the table are managed by try-with-resources, so the
     * table is closed before the connection it was obtained from. Any
     * {@link IOException} is printed instead of being rethrown.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Build the client configuration.
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "192.168.100.98");
        config.set("hbase.zookeeper.property.clientPort", "2181");

        // Resources are closed in reverse declaration order: table first, then connection.
        try (Connection conn = ConnectionFactory.createConnection(config);
             Table table = conn.getTable(TableName.valueOf("FEI:WEN"))) {

            // Query all rows of the table (disabled in this demo run).
            // queryAllTableData(table);

            // Query the row with a fixed row key.
            queryRowKey(table);

            // Further sample operations omitted.

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * Prints every cell of a result as one formatted line containing the
     * row key, column family, qualifier and value.
     *
     * <p>All four components are decoded as UTF-8 so the output does not depend
     * on the JVM's default charset.
     *
     * @param result the row whose cells are printed
     * @throws IOException if advancing the cell scanner fails
     */
    private static void output(Result result) throws IOException {
        CellScanner cellScanner = result.cellScanner();
        while (cellScanner.advance()) {
            Cell cell = cellScanner.current();

            // Each get*Array() is paired with an offset and a length; only that
            // slice of the array belongs to the component being decoded.
            String rowKey = new String(cell.getRowArray(),
                    cell.getRowOffset(), cell.getRowLength(), StandardCharsets.UTF_8);
            String family = new String(cell.getFamilyArray(),
                    cell.getFamilyOffset(), cell.getFamilyLength(), StandardCharsets.UTF_8);
            String qualifier = new String(cell.getQualifierArray(),
                    cell.getQualifierOffset(), cell.getQualifierLength(), StandardCharsets.UTF_8);
            String value = new String(cell.getValueArray(),
                    cell.getValueOffset(), cell.getValueLength(), StandardCharsets.UTF_8);

            System.out.printf("|%10s|%10s|%10s|%10s|\n", rowKey, family, qualifier, value);
        }
    }

1)创建命名空间

/**
 * Creates the namespace {@code FEI}.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or creating the namespace fails
 */
private static void createNamespace(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when createNamespace throws.
    try (Admin admin = conn.getAdmin()) {
        NamespaceDescriptor descriptor = NamespaceDescriptor.create("FEI").build();
        admin.createNamespace(descriptor);
    }
}

2)查看命名空间

查看指定命名空间
/**
 * Prints the descriptor, the configuration map and the name of namespace {@code FEI}.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or reading the namespace descriptor fails
 */
private static void queryOneNamespace(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when the lookup throws.
    try (Admin admin = conn.getAdmin()) {
        NamespaceDescriptor descriptor = admin.getNamespaceDescriptor("FEI");
        System.out.println(descriptor);

        Map<String, String> configuration = descriptor.getConfiguration();
        System.out.println(configuration);

        String name = descriptor.getName();
        System.out.println(name);
    }
}

输出:

{NAME => 'FEI'}
{}
FEI
查看全部命名空间
/**
 * Prints the descriptor of every namespace, one per line.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or listing the namespaces fails
 */
private static void queryAllNamespace(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when the listing throws.
    try (Admin admin = conn.getAdmin()) {
        for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
            System.out.println(descriptor);
        }
    }
}

输出:

{NAME => 'FEI'}
{NAME => 'SYSTEM'}
{NAME => 'TEST'}
{NAME => 'default'}
{NAME => 'hbase'}

3)查看指定命名空间下表

/**
 * Prints the name of every table in namespace {@code FEI}, one per line.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or listing the tables fails
 */
private static void queryTableByNamespace(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when the listing throws.
    try (Admin admin = conn.getAdmin()) {
        HTableDescriptor[] tables = admin.listTableDescriptorsByNamespace("FEI");
        for (HTableDescriptor table : tables) {
            System.out.println(table.getNameAsString());
        }
    }
}

输出:

FEI:EMP
FEI:IMAGES
FEI:STUDENTS
FEI:TWO
FEI:WEN

4)查看所有表

/**
 * Prints the name of every table returned by the admin, one per line.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or listing the table names fails
 */
private static void queryAllTable(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when the listing throws.
    try (Admin admin = conn.getAdmin()) {
        for (TableName tableName : admin.listTableNames()) {
            System.out.println(tableName);
        }
    }
}

输出:

FEI:EMP
FEI:IMAGES
FEI:STUDENTS
FEI:TWO
FEI:WEN
SYSTEM:CATALOG
SYSTEM:FUNCTION
SYSTEM:LOG
SYSTEM:MUTEX
SYSTEM:SEQUENCE
SYSTEM:STATS
US_POPULATION
user

5)查看指定表

/**
 * Prints the metadata of table {@code FEI:WEN}: the table descriptor, its
 * full name, each column family, the table configuration, and the namespace
 * and qualifier parts of the table name.
 *
 * @param conn an open HBase connection; this method does not close it
 * @throws IOException if obtaining the admin or reading the table descriptor fails
 */
private static void queryOneTableMetadata(Connection conn) throws IOException {
    // try-with-resources closes the Admin even when the lookup throws.
    try (Admin admin = conn.getAdmin()) {
        HTableDescriptor descriptor = admin.getTableDescriptor(TableName.valueOf("FEI:WEN"));
        System.out.println(descriptor);

        String name = descriptor.getNameAsString();
        System.out.println("\n命名空间+表名: " + name);

        for (HColumnDescriptor columnFamily : descriptor.getColumnFamilies()) {
            System.out.println("\n列族: " + columnFamily);
        }

        Map<String, String> configuration = descriptor.getConfiguration();
        System.out.println("\n表配置: " + configuration);

        TableName tableName = descriptor.getTableName();
        System.out.println("\n命名空间+表名: " + tableName.getNameAsString());
        System.out.println("\n命名空间: " + tableName.getNamespaceAsString());
        System.out.println("\n表名: " + tableName.getQualifierAsString());
    }
}

输出:

'FEI:WEN', {NAME => 'co', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

命名空间+表名: FEI:WEN

列族: {NAME => 'co', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}

表配置: {}

命名空间+表名: FEI:WEN

命名空间: FEI

表名: WEN

6)创建表

/**
 * Creates table {@code FEI:CLASS} with a single column family {@code base_info}.
 *
 * <p>More column families could be described with additional
 * {@code HColumnDescriptor} objects and registered through {@code addFamily},
 * but the article recommends one column family per table to avoid a
 * performance impact. Any {@link IOException} is printed, not rethrown.
 *
 * @param conn an open HBase connection; this method does not close it
 */
private static void createTable(Connection conn) {
    Admin tableAdmin = null;
    try {
        // Admin is the handle used for table management.
        tableAdmin = conn.getAdmin();

        // Describe the column family; the descriptor carries the family's settings.
        HColumnDescriptor family = new HColumnDescriptor("base_info");
        family.setMaxVersions(3); // keep up to 3 versions per cell in this family

        // Describe the table and attach the family to it.
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("FEI:CLASS"));
        descriptor.addFamily(family);

        tableAdmin.createTable(descriptor);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (tableAdmin != null) {
            try {
                tableAdmin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

7)修改表

新增列族
    /**
     * Adds column family {@code test_info}, with a ROWCOL bloom filter, to
     * table {@code FEI:CLASS}. Any {@link IOException} is printed, not rethrown.
     *
     * @param conn an open HBase connection; this method does not close it
     */
    public static void updateTable(Connection conn) {
        Admin tableAdmin = null;
        try {
            tableAdmin = conn.getAdmin();

            TableName target = TableName.valueOf("FEI:CLASS");
            HTableDescriptor descriptor = tableAdmin.getTableDescriptor(target);

            // Describe the new family and choose its bloom filter type.
            HColumnDescriptor addedFamily = new HColumnDescriptor("test_info".getBytes());
            addedFamily.setBloomFilterType(BloomType.ROWCOL);
            descriptor.addFamily(addedFamily);

            // Push the modified descriptor back to the table.
            tableAdmin.modifyTable(target, descriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (tableAdmin != null) {
                try {
                    tableAdmin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
删除列族
    /**
     * Removes column family {@code test_info} from table {@code FEI:CLASS}.
     * Any {@link IOException} is printed, not rethrown.
     *
     * @param conn an open HBase connection; this method does not close it
     */
    public static void updateTable(Connection conn) {
        Admin tableAdmin = null;
        try {
            tableAdmin = conn.getAdmin();

            TableName target = TableName.valueOf("FEI:CLASS");
            HTableDescriptor descriptor = tableAdmin.getTableDescriptor(target);

            // Drop the family from the descriptor, then push the descriptor back.
            byte[] familyName = "test_info".getBytes();
            descriptor.removeFamily(familyName);

            tableAdmin.modifyTable(target, descriptor);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (tableAdmin != null) {
                try {
                    tableAdmin.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

8)删除表

/**
 * Deletes table {@code FEI:CLASS}.
 *
 * <p>A table must be disabled before it can be deleted. The table is only
 * disabled when it is still enabled, so a table that was already disabled is
 * deleted as well instead of failing at the disable step. Any
 * {@link IOException} is printed, not rethrown.
 *
 * @param conn an open HBase connection; this method does not close it
 */
public static void deleteTable(Connection conn) {
    TableName tableName = TableName.valueOf("FEI:CLASS");

    // try-with-resources closes the Admin on every path.
    try (Admin admin = conn.getAdmin()) {
        // disableTable rejects a table that is not enabled (see the HBase Admin
        // API), which previously left such a table undeleted.
        if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.deleteTable(tableName);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

9)查询表全部数据

    /**
     * Scans the whole table and prints every cell, preceded by a header line.
     * Any {@link IOException} is printed, not rethrown.
     *
     * @param table the table to scan; this method does not close it
     */
    private static void queryAllTableData(Table table) {
        // Close the scanner when done, including when printing a row fails.
        try (ResultScanner scanner = table.getScanner(new Scan())) {
            System.out.printf("|%10s|%10s|%10s|%10s|\n", "row key", "family", "qualifier", "value");
            for (Result result : scanner) {
                output(result);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

输出:

|   row key|    family| qualifier|     value|
|         1|        co|       age|        28|
|         1|        co|        id|         1|
|         1|        co|      name|      jack|
|         2|        co|       age|        33|
|         2|        co|        id|         2|
|         2|        co|      name|      roes|

9)查询指定rowkey的数据

/**
 * Fetches the row with row key {@code "1"} and prints its cells below a
 * header line. Any {@link IOException} is printed, not rethrown.
 *
 * @param table the table to read from; this method does not close it
 */
public static void queryRowKey(Table table) {
    try {
        // The Get is addressed by the UTF-8 bytes of the row key.
        byte[] rowKey = "1".getBytes(StandardCharsets.UTF_8);
        Result row = table.get(new Get(rowKey));

        System.out.printf("|%10s|%10s|%10s|%10s|\n", "row key", "family", "qualifier", "value");
        output(row);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

输出:

|   row key|    family| qualifier|     value|
|         1|        co|       age|        28|
|         1|        co|        id|         1|
|         1|        co|      name|      jack|

10)根据rowkey查询指定列

    /**
     * Reads column {@code co:name} of the row with row key {@code "1"} and
     * prints its value decoded as UTF-8.
     *
     * <p>Prints {@code null} when the row has no such cell instead of failing
     * with a {@link NullPointerException}. Any {@link IOException} is printed,
     * not rethrown.
     *
     * @param table the table to read from; this method does not close it
     */
    public static void queryValueByRowKey(Table table) {
        try {
            Get get = new Get("1".getBytes(StandardCharsets.UTF_8));
            Result result = table.get(get);

            byte[] value = result.getValue(
                    "co".getBytes(StandardCharsets.UTF_8),
                    "name".getBytes(StandardCharsets.UTF_8));

            // getValue yields null when the cell is absent; guard before decoding.
            String text = (value == null) ? null : new String(value, StandardCharsets.UTF_8);
            System.out.println(text);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

输出:

jack

11)插入数据

插入单条数据
/**
 * Inserts a single cell: row {@code "3"}, column {@code co:class}.
 *
 * <p>Row key, family, qualifier and value are encoded as UTF-8 explicitly;
 * the value is non-ASCII, so relying on the platform default charset would
 * store different bytes on different machines. Any {@link IOException} is
 * printed, not rethrown.
 *
 * @param table the table to write to; this method does not close it
 */
private static void insertOneData(Table table) {
    Put put = new Put("3".getBytes(StandardCharsets.UTF_8));
    put.addColumn(
            "co".getBytes(StandardCharsets.UTF_8),
            "class".getBytes(StandardCharsets.UTF_8),
            "一班".getBytes(StandardCharsets.UTF_8));

    try {
        table.put(put);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
批量插入数据
/**
 * Inserts ten rows (row keys {@code "0"} to {@code "9"}) in one batch, each
 * with the columns {@code co:id}, {@code co:name} and {@code co:age}.
 *
 * <p>All strings are encoded as UTF-8 explicitly; the names are non-ASCII, so
 * relying on the platform default charset would store different bytes on
 * different machines. Any {@link IOException} is printed, not rethrown.
 *
 * @param table the table to write to; this method does not close it
 */
private static void insertBatchData(Table table) {
    byte[] family = "co".getBytes(StandardCharsets.UTF_8);
    byte[] idQualifier = "id".getBytes(StandardCharsets.UTF_8);
    byte[] nameQualifier = "name".getBytes(StandardCharsets.UTF_8);
    byte[] ageQualifier = "age".getBytes(StandardCharsets.UTF_8);

    List<Put> puts = new ArrayList<>(10);
    for (int i = 0; i < 10; i++) {
        Put put = new Put(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
        put.addColumn(family, idQualifier, String.valueOf(i + 3).getBytes(StandardCharsets.UTF_8));
        put.addColumn(family, nameQualifier, ("张" + i).getBytes(StandardCharsets.UTF_8));
        put.addColumn(family, ageQualifier, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
        puts.add(put);
    }

    try {
        table.put(puts);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

12)修改数据

通过Put覆盖,格式和插入一样

/**
 * Overwrites column {@code co:name} of rows {@code "9"}, {@code "8"},
 * {@code "7"} and {@code "6"} in one batch. An update is an ordinary Put
 * that overwrites the existing cell.
 *
 * <p>All strings are encoded as UTF-8 explicitly; the names are non-ASCII, so
 * relying on the platform default charset would store different bytes on
 * different machines.
 *
 * @param table the table to write to; this method does not close it
 * @throws IOException if the batch put fails
 */
private static void updateData(Table table) throws IOException {
    byte[] family = "co".getBytes(StandardCharsets.UTF_8);
    byte[] qualifier = "name".getBytes(StandardCharsets.UTF_8);

    // {row key, new name} pairs, in the order they are submitted.
    String[][] updates = {
        {"9", "刘胡兰"},
        {"8", "王伟"},
        {"7", "金素荣"},
        {"6", "小日本"},
    };

    List<Put> puts = new ArrayList<>(updates.length);
    for (String[] update : updates) {
        Put put = new Put(update[0].getBytes(StandardCharsets.UTF_8));
        put.addColumn(family, qualifier, update[1].getBytes(StandardCharsets.UTF_8));
        puts.add(put);
    }

    table.put(puts);
}

13)删除数据

/**
 * Deletes the rows with row keys {@code "1"}, {@code "2"}, {@code "3"} and
 * {@code "4"} in one batch.
 *
 * @param table the table to delete from; this method does not close it
 * @throws IOException if the batch delete fails
 */
private static void deleteData(Table table) throws IOException {
    String[] rowKeys = {"1", "2", "3", "4"};

    // One Delete per row key, submitted together.
    List<Delete> batch = new ArrayList<>();
    for (String rowKey : rowKeys) {
        batch.add(new Delete(rowKey.getBytes()));
    }

    table.delete(batch);
}

14)图片转为Base64,存入Hbase

/**
 * Reads an image file, encodes it as Base64 text and stores the text in
 * column {@code image:test} of row {@code "1"}.
 *
 * <p>Uses {@link java.util.Base64} instead of the JDK-internal
 * {@code sun.misc.BASE64Encoder}. The MIME encoder is used, so the stored text
 * is line-wrapped. If the file cannot be read, the error is printed and
 * nothing is stored.
 *
 * @param table the table to write to; this method does not close it
 * @throws IOException if writing to the table fails
 */
private static void imageToBase64(Table table) throws IOException {
    String imageFile = "C:\\Users\\fei\\Desktop\\superset处理500页面.png";

    String encoded;
    try {
        // readAllBytes reads until end of file; sizing a buffer with available()
        // and issuing a single read() may yield a truncated image.
        byte[] data = Files.readAllBytes(Paths.get(imageFile));
        encoded = Base64.getMimeEncoder().encodeToString(data);
    } catch (IOException e) {
        // Best effort, as before: report the read failure and store nothing.
        e.printStackTrace();
        return;
    }

    // Store the Base64 text in HBase.
    Put put = new Put("1".getBytes(StandardCharsets.UTF_8));
    put.addColumn(
            "image".getBytes(StandardCharsets.UTF_8),
            "test".getBytes(StandardCharsets.UTF_8),
            encoded.getBytes(StandardCharsets.UTF_8));
    table.put(put);
}

15)从Hbase获取base64,转换为图片

/**
 * Reads the Base64 text stored in column {@code image:test} of row
 * {@code "1"}, decodes it and writes the bytes to an image file.
 *
 * <p>Uses {@link java.util.Base64} instead of the JDK-internal
 * {@code sun.misc.BASE64Decoder}. Decoding or file-write failures are printed,
 * not rethrown.
 *
 * @param table the table to read from; this method does not close it
 * @throws IOException if reading from the table fails, or if the row has no
 *                     {@code image:test} cell
 */
private static void base64ToImage(Table table) throws IOException {
    // Fetch the Base64 text from HBase.
    Get get = new Get("1".getBytes(StandardCharsets.UTF_8));
    Result result = table.get(get);
    byte[] value = result.getValue(
            "image".getBytes(StandardCharsets.UTF_8),
            "test".getBytes(StandardCharsets.UTF_8));
    if (value == null) {
        // Previously this surfaced as a NullPointerException while decoding.
        throw new IOException("No Base64 image stored in column image:test of row 1");
    }

    String outFile = "D:\\test.png";
    try {
        // The MIME decoder skips line separators, so both line-wrapped and
        // unwrapped Base64 text decode. The former per-byte "+= 256" adjustment
        // is dropped: on a byte it wraps back to the same value.
        byte[] image = Base64.getMimeDecoder().decode(value);
        Files.write(Paths.get(outFile), image);
    } catch (IOException | IllegalArgumentException e) {
        // Best effort, as before: report the failure without rethrowing.
        e.printStackTrace();
    }
}

本文来自:CSDN博客

感谢作者:CSDN博客

查看原文:Java客户端连接Hbase_java连接hbase_江湖小蟹的博客

968 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传