Thrift方式連線hiveServer2+Kerberos
阿新 • • 發佈:2018-12-09
最近在做一個大資料查詢平臺,後端引擎有部分用了hive,通過thrift的方式連線hiveServer2,由於叢集加了kerberos,所以實現thrift連線hiveServer2的時候需要加上kerberos認證。網上查了很多文章,寫的thrift連線hive都沒有kerberos,分享一下,以供需要通過thrift連線hiveServer2並需要開啟Kerberos認證的同學一個參考,以便能夠快速解決問題。
寫了一個hive-thrift-demo已經放到了github,有需要的同學自行下載完整程式碼。
https://github.com/clj198606061111/hive-thrift-demo
KerberosLogin.java
package com.itclj.kerberos;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Create by ice.chen on 2018/11/16
*/
public class KerberosLogin {

    private Logger logger = LoggerFactory.getLogger(KerberosLogin.class);

    /**
     * Logs the process in to Kerberos from a keytab so that subsequent
     * Hadoop/Hive calls run under the authenticated UGI.
     *
     * <p>Principal, keytab and krb5.conf locations are hard-coded for this demo.
     * A failed login is only logged; later secured calls will then fail downstream.
     */
    public void login() {
        final String principal = "finance";
        final String keytabPath = "D:\\home\\Data\\dsp.itclj.lotest\\kerberos\\finance.keytab";
        final String krb5ConfPath = "D:\\home\\Data\\dsp.itclj.lotest\\kerberos\\krb5.conf";

        // Point the JVM at the Kerberos client configuration before attempting login.
        System.setProperty("java.security.krb5.conf", krb5ConfPath);

        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);

        try {
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        } catch (IOException e) {
            logger.error("Kerberos login fail.", e);
        }
    }
}
QueryTool.java
package com.itclj.hive;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;
import org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TOpenSessionReq;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Create by ice.chen on 2018/11/15
*/
public class QueryTool {

    private static Logger logger = LoggerFactory.getLogger(QueryTool.class);

    // Kerberos primary of the HiveServer2 service principal ("hive" in hive/host@REALM).
    private static final String DEFAULT_SERVER_PRINCIPAL = "hive";
    // Kerberos instance (host part) of the HiveServer2 service principal.
    private static final String DEFAULT_SERVER_FQDN = "gz-cashloan-test-app26-80-43.itclj.host";

    /**
     * Builds a Kerberos-authenticated thrift transport to HiveServer2 using the
     * default service principal ({@code hive/<default-host>@REALM}).
     *
     * @param host HiveServer2 host to connect the socket to
     * @param port HiveServer2 thrift port
     * @return an unopened transport; caller must call {@code open()}
     * @throws IOException if the current UGI cannot be determined
     */
    public static TTransport getSocketInstance(String host, int port)
            throws IOException {
        return getSocketInstance(host, port, DEFAULT_SERVER_PRINCIPAL, DEFAULT_SERVER_FQDN);
    }

    /**
     * Builds a Kerberos-authenticated thrift transport to HiveServer2.
     * Generalized variant: the server's Kerberos principal parts are parameters
     * instead of hard-coded constants.
     *
     * @param host            HiveServer2 host to connect the socket to
     * @param port            HiveServer2 thrift port
     * @param serverPrincipal Kerberos primary of the server principal
     *                        ("myprincipal" in myprincipal/my.server.com@REALM)
     * @param serverFqdn      Kerberos instance of the server principal
     *                        ("my.server.com" in myprincipal/my.server.com@REALM)
     * @return an unopened transport; caller must call {@code open()}
     * @throws IOException if the current UGI cannot be determined
     */
    public static TTransport getSocketInstance(String host, int port,
            String serverPrincipal, String serverFqdn) throws IOException {
        TTransport transport = new TSocket(host, port);
        //transport.setTimeout(100000);

        // Key parameters for Kerberos authentication (Sasl.QOP = "javax.security.sasl.qop",
        // Sasl.SERVER_AUTH = "javax.security.sasl.server.authentication").
        Map<String, String> saslProperties = new HashMap<String, String>();
        saslProperties.put(Sasl.QOP, "auth");
        saslProperties.put(Sasl.SERVER_AUTH, "true");

        logger.info("Security is enabled: {}", UserGroupInformation.isSecurityEnabled());
        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        logger.info("Current user: {}", currentUser);

        TSaslClientTransport saslTransport = new TSaslClientTransport(
                "GSSAPI",        // tell SASL to use GSSAPI, which supports Kerberos
                null,            // authorizationid - null
                serverPrincipal, // kerberos primary for server
                serverFqdn,      // kerberos instance for server
                saslProperties,  // properties set above
                null,            // callback handler - null
                transport);      // underlying transport

        // Run the SASL handshake inside the logged-in UGI's security context.
        return new TUGIAssumingTransport(saslTransport, currentUser);
    }

    /**
     * Opens a session as an explicit user. Requires HiveServer2 impersonation
     * (hive.server2.enable.doAs = true) to take effect.
     *
     * @param client thrift CLI service client over an opened transport
     * @param user   username to run as
     * @param pwd    password (ignored with Kerberos transports)
     * @return the open-session response containing the session handle
     * @throws TException on thrift errors
     */
    public static TOpenSessionResp openSession(TCLIService.Client client, String user, String pwd)
            throws TException {
        TOpenSessionReq openSessionReq = new TOpenSessionReq();
        openSessionReq.setUsername(user);
        openSessionReq.setPassword(pwd);
        openSessionReq.setUsernameIsSet(true);
        return client.OpenSession(openSessionReq);
    }

    /**
     * Opens a session as the Kerberos-authenticated caller (no explicit user).
     *
     * @param client thrift CLI service client over an opened transport
     * @return the open-session response containing the session handle
     * @throws TException on thrift errors
     */
    public static TOpenSessionResp openSession(TCLIService.Client client) throws TException {
        TOpenSessionReq openSessionReq = new TOpenSessionReq();
        return client.OpenSession(openSessionReq);
    }
}
QueryInstance.java
package com.itclj.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.thrift.TCLIService;
import org.apache.hive.service.cli.thrift.TCancelOperationReq;
import org.apache.hive.service.cli.thrift.TColumn;
import org.apache.hive.service.cli.thrift.TColumnDesc;
import org.apache.hive.service.cli.thrift.TExecuteStatementReq;
import org.apache.hive.service.cli.thrift.TExecuteStatementResp;
import org.apache.hive.service.cli.thrift.TFetchOrientation;
import org.apache.hive.service.cli.thrift.TFetchResultsReq;
import org.apache.hive.service.cli.thrift.TFetchResultsResp;
import org.apache.hive.service.cli.thrift.TGetOperationStatusReq;
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TGetResultSetMetadataReq;
import org.apache.hive.service.cli.thrift.TGetResultSetMetadataResp;
import org.apache.hive.service.cli.thrift.TOpenSessionResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
import org.apache.hive.service.cli.thrift.TOperationState;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
import org.apache.hive.service.cli.thrift.TRowSet;
import org.apache.hive.service.cli.thrift.TSessionHandle;
import org.apache.hive.service.cli.thrift.TTableSchema;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Create by ice.chen on 2018/11/15
*/
public class QueryInstance {
private static Logger logger = LoggerFactory.getLogger(QueryInstance.class);
// Connection target: HiveServer2 thrift endpoint (hard-coded for this demo).
private static String host = "10.250.80.43";
private static int port = 10000;
// Credentials for the user/password session variant (commented out in submitQuery);
// unused when the session is opened over the Kerberos transport.
private static String username = "hive";
private static String passsword = "hive";
// Shared transport/client, created once in the static initializer below.
private static TTransport transport;
private static TCLIService.Client client;
// Last operation state observed by getQueryHandleStatus (cached between calls).
private TOperationState tOperationState = null;
private Map<String, Object> resultMap = new HashMap<String, Object>();
static {
try {
// Build the Kerberos-wrapped transport and open it once for the whole class.
transport = QueryTool.getSocketInstance(host, port);
client = new TCLIService.Client(new TBinaryProtocol(transport));
transport.open();
} catch (TTransportException | IOException e) {
// NOTE(review): on failure 'client' stays null and every later call will NPE —
// consider failing fast instead of only logging.
logger.error("hive collection error!", e);
}
}
/**
 * Opens a fresh Hive session and submits {@code command} for asynchronous execution.
 *
 * @param command HiveQL statement to execute
 * @return the operation handle used to poll status and fetch results/logs
 * @throws Exception when the server returns no handle; the message is taken
 *         from the response status
 */
public TOperationHandle submitQuery(String command) throws Exception {
    // Alternative: open as an explicit user (needs hive.server2.enable.doAs=true):
    // QueryTool.openSession(client, username, passsword)
    TSessionHandle session = QueryTool.openSession(client).getSessionHandle();

    TExecuteStatementReq request = new TExecuteStatementReq(session, command);
    request.setRunAsync(true); // execute asynchronously; status is polled separately

    TExecuteStatementResp response = client.ExecuteStatement(request);
    TOperationHandle handle = response.getOperationHandle();
    if (handle == null) {
        // On execution failure the server reports details via the response status.
        throw new Exception(response.getStatus().getErrorMessage());
    }
    return handle;
}
/*
// 獲取查詢日誌 hive.version: 0.12.0-cdh5.0.1(5.3.10前,使用此方法獲取查詢日誌)
@Override
public String getQueryLog(TOperationHandle tOperationHandle) throws Exception {
if(tOperationHandle!=null){
TGetLogReq tGetLogReq = new TGetLogReq(tOperationHandle);
TGetLogResp logResp = client.GetLog(tGetLogReq);
log = logResp.getLog();
}
return log;
}*/
/**
 * Fetches the operation log for a query (CDH 5.4.0 and later, where logs are
 * retrieved via FetchResults with fetchType = 1; older CDH used TGetLogReq).
 *
 * @param tOperationHandle handle of the operation whose log is requested
 * @return log lines joined with '\n', or "" when the handle is null or no rows came back
 * @throws Exception on thrift transport/protocol errors
 */
public String getQueryLog(TOperationHandle tOperationHandle) throws Exception {
    if (tOperationHandle == null) {
        return "";
    }
    TFetchResultsReq fetchReq = new TFetchResultsReq(tOperationHandle,
            TFetchOrientation.FETCH_NEXT, 1000);
    // fetchType 1 selects the operation log instead of the query result set.
    fetchReq.setFetchType((short) 1);
    TFetchResultsResp resp = client.FetchResults(fetchReq);
    TRowSet rs = resp.getResults();
    // StringBuilder over StringBuffer: local accumulation needs no synchronization.
    StringBuilder sbLog = new StringBuilder();
    if (null != rs) {
        RowSet rowSet = RowSetFactory.create(rs, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
        for (Object[] row : rowSet) {
            sbLog.append(String.valueOf(row[0])).append("\n");
        }
    }
    return sbLog.toString();
}
/**
 * Polls the server for the current state of an operation.
 *
 * <p>Possible states (TOperationState): INITIALIZED_STATE(0), RUNNING_STATE(1),
 * FINISHED_STATE(2), CANCELED_STATE(3), CLOSED_STATE(4), ERROR_STATE(5),
 * UKNOWN_STATE(6), PENDING_STATE(7).
 *
 * @param tOperationHandle handle returned by submitQuery; may be null
 * @return the freshly fetched state, or the previously cached state when the
 *         handle is null
 * @throws Exception on thrift errors
 */
public TOperationState getQueryHandleStatus(
        TOperationHandle tOperationHandle) throws Exception {
    if (tOperationHandle != null) {
        TGetOperationStatusReq request = new TGetOperationStatusReq(tOperationHandle);
        TGetOperationStatusResp response = client.GetOperationStatus(request);
        // NOTE(review): state is cached in an instance field, so a null handle
        // silently returns the last observed state — confirm this is intended.
        tOperationState = response.getOperationState();
    }
    return tOperationState;
}
/**
 * Returns the column names of the result set produced by an operation.
 *
 * @param tOperationHandle handle of a completed query
 * @return the column names in schema order, or null when the server returns no schema
 * @throws Throwable on thrift errors
 */
public List<String> getColumns(TOperationHandle tOperationHandle)
        throws Throwable {
    TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(tOperationHandle);
    TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq);

    TTableSchema schema = metadataResp.getSchema();
    if (schema == null) {
        return null;
    }

    List<String> names = new ArrayList<String>();
    for (TColumnDesc desc : schema.getColumns()) {
        names.add(desc.getColumnName());
    }
    return names;
}
/**
 * Fetches up to 1000 rows of the result set of a SELECT statement.
 *
 * <p>Hive returns results column-oriented: the returned list contains one entry
 * per column, each entry being that column's list of values.
 *
 * @param tOperationHandle handle of a finished query
 * @return one value-list per column, or null when the server returns no row set
 * @throws Throwable on thrift errors
 */
public List<Object> getResults(TOperationHandle tOperationHandle) throws Throwable {
    TFetchResultsReq fetchReq = new TFetchResultsReq();
    fetchReq.setOperationHandle(tOperationHandle);
    fetchReq.setMaxRows(1000);
    TFetchResultsResp re = client.FetchResults(fetchReq);
    TRowSet rowSet = re.getResults();
    if (rowSet == null) {
        return null;
    }
    List<Object> list_row = new ArrayList<Object>();
    // Each TColumn is a union; exactly one of the typed value lists is set.
    for (TColumn field : rowSet.getColumns()) {
        if (field.isSetStringVal()) {
            list_row.add(field.getStringVal().getValues());
        } else if (field.isSetDoubleVal()) {
            list_row.add(field.getDoubleVal().getValues());
        } else if (field.isSetI16Val()) {
            list_row.add(field.getI16Val().getValues());
        } else if (field.isSetI32Val()) {
            list_row.add(field.getI32Val().getValues());
        } else if (field.isSetI64Val()) {
            list_row.add(field.getI64Val().getValues());
        } else if (field.isSetBoolVal()) {
            list_row.add(field.getBoolVal().getValues());
        } else if (field.isSetByteVal()) {
            list_row.add(field.getByteVal().getValues());
        } else if (field.isSetBinaryVal()) {
            // Previously missing: binary columns were silently dropped.
            list_row.add(field.getBinaryVal().getValues());
        }
    }
    return list_row;
}
/**
* 轉換查詢結果,由原來的列轉為行
*
* @param objs
* @return
*/
public List<Object> toResults(List<Object> objs) {
List<Object> rets = new ArrayList<Object>();
if (objs != null) {
List row = null;
List list = (List) objs.get(0);
int rowCnt = list.size();
for (int i = 0; i < rowCnt; i++) {
rets.add(new ArrayList());
}
for (int i = 0; i < objs.size(); i++) {
list = (List) objs.get(i);
for (int j = 0; j < rowCnt; j++) {
((List) rets.get(j)).add(list.get(j)