[Flink Series, Part 5] The HiveMetastoreClient used by flink-connector-hive does not support Kerberos proxy users
阿新 • Published: 2021-01-08
Flink-hive-connector
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
</dependency>
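For context, a Hive catalog backed by this connector is typically registered from the Flink SQL client roughly as follows. This is only a sketch: the catalog name `myhive` and the `hive-conf-dir` path are illustrative placeholders, not values from this project.

```sql
-- Register a Hive catalog in the Flink SQL client.
-- 'myhive' and '/opt/hive/conf' are placeholder values.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf'
);
USE CATALOG myhive;
```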
The problem: our project currently connects to Hive 1.1.0, while HADOOP_PROXY_USER is only supported starting from Hive 2.3.0 (HIVE-COMMIT).
There you can see that the following code was added to HiveMetaStoreClient.java:
//If HADOOP_PROXY_USER is set in env or property,
//then need to create metastore client that proxies as that user.
String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";
String proxyUser = System.getenv(HADOOP_PROXY_USER);
if (proxyUser == null) {
  proxyUser = System.getProperty(HADOOP_PROXY_USER);
}
//if HADOOP_PROXY_USER is set, create DelegationToken using real user
if (proxyUser != null) {
  LOG.info(HADOOP_PROXY_USER + " is set. Using delegation "
      + "token for HiveMetaStore connection.");
  try {
    UserGroupInformation.getLoginUser().getRealUser().doAs(
        new PrivilegedExceptionAction<Void>() {
          @Override
          public Void run() throws Exception {
            open();
            return null;
          }
        });
    String delegationTokenPropString = "DelegationTokenForHiveMetaStoreServer";
    String delegationTokenStr = getDelegationToken(proxyUser, proxyUser);
    Utils.setTokenStr(UserGroupInformation.getCurrentUser(), delegationTokenStr,
        delegationTokenPropString);
    this.conf.setVar(ConfVars.METASTORE_TOKEN_SIGNATURE, delegationTokenPropString);
    close();
  } catch (Exception e) {
    LOG.error("Error while setting delegation token for " + proxyUser, e);
    if (e instanceof MetaException) {
      throw (MetaException) e;
    } else {
      throw new MetaException(e.getMessage());
    }
  }
}
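The control flow above can be reduced to a small JDK-only sketch: resolve the proxy user from the environment first, then from a JVM system property, and only then run the connection logic as a privileged action. Everything here is illustrative: `ProxyUserSketch`, the `etl_user` value, and the stand-in `openAction` are invented for the sketch, and the real client runs the action through Hadoop's `UserGroupInformation.doAs` rather than invoking it directly.

```java
import java.security.PrivilegedExceptionAction;

public class ProxyUserSketch {

    static final String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";

    // Same lookup order as the HiveMetaStoreClient excerpt:
    // environment variable first, then JVM system property.
    static String resolveProxyUser() {
        String proxyUser = System.getenv(HADOOP_PROXY_USER);
        if (proxyUser == null) {
            proxyUser = System.getProperty(HADOOP_PROXY_USER);
        }
        return proxyUser;
    }

    public static void main(String[] args) throws Exception {
        // Simulate a job submitter configuring the proxy user via -D.
        System.setProperty(HADOOP_PROXY_USER, "etl_user");
        String proxyUser = resolveProxyUser();
        System.out.println("proxy user = " + proxyUser);

        // The real client passes this action to realUser.doAs(...);
        // this sketch simply runs it directly.
        PrivilegedExceptionAction<String> openAction =
            () -> "opened metastore connection as " + proxyUser;
        System.out.println(openAction.run());
    }
}
```

The key point the excerpt (and this sketch) encodes is that the proxy user is purely an env/property convention: a Hive 1.1.0 client never performs this lookup, so setting HADOOP_PROXY_USER has no effect there.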