聊聊rocketmq的AclClientRPCHook
序
本文主要研究一下rocketmq的AclClientRPCHook
RPCHook
rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/RPCHook.java
public interface RPCHook {
void doBeforeRequest(final String remoteAddr,final RemotingCommand request);
void doAfterResponse(final String remoteAddr,final RemotingCommand request,final RemotingCommand response);
}
複製程式碼
- RPCHook定義了doBeforeRequest、doAfterResponse方法
AclClientRPCHook
rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclClientRPCHook.java
public class AclClientRPCHook implements RPCHook {
private final SessionCredentials sessionCredentials;
protected ConcurrentHashMap<Class<? extends CommandCustomHeader>,Field[]> fieldCache =
new ConcurrentHashMap<Class<? extends CommandCustomHeader>,Field[]>();
public AclClientRPCHook(SessionCredentials sessionCredentials) {
this.sessionCredentials = sessionCredentials;
}
@Override
public void do BeforeRequest(String remoteAddr,RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,parseRequestContent(request,sessionCredentials.getAccessKey(),sessionCredentials.getSecurityToken()));
String signature = AclUtils.calSignature(total,sessionCredentials.getSecretKey());
request.addExtField(SIGNATURE,signature);
request.addExtField(ACCESS_KEY,sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN,sessionCredentials.getSecurityToken());
}
}
@Override
public void doAfterResponse(String remoteAddr,RemotingCommand request,RemotingCommand response) {
}
protected SortedMap<String,String> parseRequestContent(RemotingCommand request,String ak,String securityToken) {
CommandCustomHeader header = request.readCustomHeader();
// Sort property
SortedMap<String,String> map = new TreeMap<String,String>();
map.put(ACCESS_KEY,ak);
if (securityToken != null) {
map.put(SECURITY_TOKEN,securityToken);
}
try {
// Add header properties
if (null != header) {
Field[] fields = fieldCache.get(header.getClass());
if (null == fields) {
fields = header.getClass().getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
}
Field[] tmp = fieldCache.putIfAbsent(header.getClass(),fields);
if (null != tmp) {
fields = tmp;
}
}
for (Field field : fields) {
Object value = field.get(header);
if (null != value && !field.isSynthetic()) {
map.put(field.getName(),value.toString());
}
}
}
return map;
} catch (Exception e) {
throw new RuntimeException("incompatible exception.",e);
}
}
public SessionCredentials getSessionCredentials() {
return sessionCredentials;
}
}
複製程式碼
- AclClientRPCHook實現了RPCHook介面,其構造器接收SessionCredentials引數;其doBeforeRequest首先通過parseRequestContent從request讀取CommandCustomHeader,將其field連同accessKey、securityToken放到一個SortedMap,再通過AclUtils.combineRequestContent計算要傳送的請求內容;然後通過AclUtils.calSignature計算出signature,最後往request的extFields新增SIGNATURE、ACCESS_KEY;若設定securityToken,則會往request的extFields新增SECURITY_TOKEN
SessionCredentials
rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/SessionCredentials.java
public class SessionCredentials {
public static final Charset CHARSET = Charset.forName("UTF-8");
public static final String ACCESS_KEY = "AccessKey";
public static final String SECRET_KEY = "SecretKey";
public static final String SIGNATURE = "Signature";
public static final String SECURITY_TOKEN = "SecurityToken";
public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",System.getProperty("user.home") + File.separator + "key");
private String accessKey;
private String secretKey;
private String securityToken;
private String signature;
public SessionCredentials() {
String keyContent = null;
try {
keyContent = MixAll.file2String(KEY_FILE);
} catch (IOException ignore) {
}
if (keyContent != null) {
Properties prop = MixAll.string2Properties(keyContent);
if (prop != null) {
this.updateContent(prop);
}
}
}
public SessionCredentials(String accessKey,String secretKey) {
this.accessKey = accessKey;
this.secretKey = secretKey;
}
public SessionCredentials(String accessKey,String secretKey,String securityToken) {
this(accessKey,secretKey);
this.securityToken = securityToken;
}
public void updateContent(Properties prop) {
{
String value = prop.getProperty(ACCESS_KEY);
if (value != null) {
this.accessKey = value.trim();
}
}
{
String value = prop.getProperty(SECRET_KEY);
if (value != null) {
this.secretKey = value.trim();
}
}
{
String value = prop.getProperty(SECURITY_TOKEN);
if (value != null) {
this.securityToken = value.trim();
}
}
}
//......
}
複製程式碼
- SessionCredentials提供了三個構造器,一個無參構造器從KEY_FILE載入keyContent然後解析為Properties再通過updateContent方法給accessKey、secretKey、securityToken賦值;一個是accessKey,secretKey的構造器;還有一個是accessKey,secretKey,securityToken的構造器
AclUtils
rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclUtils.java
public class AclUtils {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static byte[] combineRequestContent(RemotingCommand request,SortedMap<String,String> fieldsMap) {
try {
StringBuilder sb = new StringBuilder("");
for (Map.Entry<String,String> entry : fieldsMap.entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
sb.append(entry.getValue());
}
}
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET),request.getBody());
} catch (Exception e) {
throw new RuntimeException("Incompatible exception.",e);
}
}
public static byte[] combineBytes(byte[] b1,byte[] b2) {
int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
byte[] total = new byte[size];
if (null != b1)
System.arraycopy(b1,total,b1.length);
if (null != b2)
System.arraycopy(b2,b1.length,b2.length);
return total;
}
public static String calSignature(byte[] data,String secretKey) {
String signature = AclSigner.calSignature(data,secretKey);
return signature;
}
//......
}
複製程式碼
- combineRequestContent首先將fieldsMap拼接為字串,然後通過AclUtils.combineBytes將其與request.getBody()結合在一起;calSignature方法內部是委託給AclSigner.calSignature(data,secretKey)來實現
AclSigner
rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclSigner.java
public class AclSigner {
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
private static final int CAL_SIGNATURE_FAILED = 10015;
private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";
public static String calSignature(String data,String key) throws AclException {
return calSignature(data,key,DEFAULT_ALGORITHM,DEFAULT_CHARSET);
}
public static String calSignature(String data,String key,SigningAlgorithm algorithm,Charset charset) throws AclException {
return signAndBase64Encode(data,algorithm,charset);
}
private static String signAndBase64Encode(String data,Charset charset)
throws AclException {
try {
byte[] signature = sign(data.getBytes(charset),key.getBytes(charset),algorithm);
return new String(Base64.encodeBase64(signature),DEFAULT_CHARSET);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG,CAL_SIGNATURE_FAILED,e.getMessage());
log.error(message,e);
throw new AclException("CAL_SIGNATURE_FAILED",message,e);
}
}
private static byte[] sign(byte[] data,byte[] key,SigningAlgorithm algorithm) throws AclException {
try {
Mac mac = Mac.getInstance(algorithm.toString());
mac.init(new SecretKeySpec(key,algorithm.toString()));
return mac.doFinal(data);
} catch (Exception e) {
String message = String.format(CAL_SIGNATURE_FAILED_MSG,e);
}
}
public static String calSignature(byte[] data,DEFAULT_CHARSET);
}
public static String calSignature(byte[] data,charset);
}
private static String signAndBase64Encode(byte[] data,Charset charset)
throws AclException {
try {
byte[] signature = sign(data,e);
}
}
}
複製程式碼
- calSignature預設使用的是SigningAlgorithm.HmacSHA1及Charset.forName("UTF-8")字符集來簽名;signAndBase64Encode方法首先通過sign方法簽名,然後將其轉為Base64的字串
小結
AclClientRPCHook實現了RPCHook介面,其構造器接收SessionCredentials引數;其doBeforeRequest首先通過parseRequestContent從request讀取CommandCustomHeader,將其field連同accessKey、securityToken放到一個SortedMap,再通過AclUtils.combineRequestContent計算要傳送的請求內容;然後通過AclUtils.calSignature計算出signature,最後往request的extFields新增SIGNATURE、ACCESS_KEY;若設定securityToken,則會往request的extFields新增SECURITY_TOKEN