I want to write data in hbase sink table, I have Hbase version 2.2.0 which is compatible flink version 1.14.4 I defined the sink hbase table as follows:
sink_ddl = """
CREATE TABLE hTable (
datemin STRING,
family2 ROW<datemax STRING>,
family3 ROW<channel_title STRING, channel_id STRING>,
PRIMARY KEY (datemin) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test',
'zookeeper.quorum' = '127.0.0.1:2181'
)
"""
And I write data into it with:
table_env.execute_sql("""
INSERT INTO hTable
SELECT
datemin,
ROW(datemax),
ROW(channel_title, channel_id)
FROM table_api_table
""")
but I got error
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hTable'.
Table options are:
'connector'='hbase-2.2'
'table-name'='test'
'zookeeper.quorum'='127.0.0.1:2181'
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getPhysicalRowDataType()Lorg/apache/flink/table/types/DataType;
at org.apache.flink.connector.hbase2.HBase2DynamicTableFactory.createDynamicTableSink(HBase2DynamicTableFactory.java:95)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:181)
... 28 more
btw: I added connector jar
please any help? what is the cause of this error?
how can I connect flink with hbase
Finally it's working !! I fixed this issue by doing the following:
I edited hbase-env.sh :
# Extra Java CLASSPATH elements. Optional.
export HBASE_CLASSPATH=/home/hadoop/hbase/conf
I edited hbase-site.xml, so I added the following propertie:
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
</property>
Then editing the connector jar , indeed I unpackaged the jar and then I edited hbase-default.xml
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
<description>Set to true to skip the 'hbase.defaults.for.version' check.
Setting this to true can be useful in contexts other than
the other side of a maven generation; i.e. running in an
IDE. You'll want to set this boolean to true to avoid
seeing the RuntimeException complaint: "hbase-default.xml file
seems to be for and old version of HBase (\${hbase.version}), this
version is X.X.X-SNAPSHOT"</description>
</property>
and finally, moving the jar in flink lib folder (it's better than :
table_env.get_config().get_configuration().set_string("pipeline.jars","file:///home/hadoop/hbase/conf/flink-sql-connector-hbase-2.2_2.11-1.14.4.jar")
)
this articles helped me a lot: https://www.cnblogs.com/panfeng412/archive/2012/07/22/hbase-exception-hbase-default-xml-file-seems-to-be-for-and-old-version-of-hbase.html