1. 程式人生 > 程式設計 >聊聊rocketmq的AclClientRPCHook

聊聊rocketmq的AclClientRPCHook

本文主要研究一下rocketmq的AclClientRPCHook

RPCHook

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/RPCHook.java

public interface RPCHook {
    void doBeforeRequest(final String remoteAddr,final RemotingCommand request);

    void doAfterResponse(final String remoteAddr,final RemotingCommand request,final RemotingCommand response);
}
複製程式碼
  • RPCHook定義了doBeforeRequest、doAfterResponse方法

AclClientRPCHook

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclClientRPCHook.java

public class AclClientRPCHook implements RPCHook {
    private final SessionCredentials sessionCredentials;
    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>,Field[]> fieldCache =
        new ConcurrentHashMap<Class<? extends CommandCustomHeader>,Field[]>();

    public AclClientRPCHook(SessionCredentials sessionCredentials) {
        this.sessionCredentials = sessionCredentials;
    }

    @Override
    public void do
BeforeRequest(String remoteAddr,RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request,parseRequestContent(request,sessionCredentials.getAccessKey(),sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total,sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE,signature); request.addExtField(ACCESS_KEY,sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if
(sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN,sessionCredentials.getSecurityToken()); } } @Override public void doAfterResponse(String remoteAddr,RemotingCommand request,RemotingCommand response) { } protected SortedMap<String,String> parseRequestContent(RemotingCommand request,String ak,String securityToken) { CommandCustomHeader header = request.readCustomHeader(); // Sort property SortedMap<String,String> map = new TreeMap<String,String>(); map.put(ACCESS_KEY,ak); if (securityToken != null) { map.put(SECURITY_TOKEN,securityToken); } try { // Add header properties if (null != header) { Field[] fields = fieldCache.get(header.getClass()); if (null == fields) { fields = header.getClass().getDeclaredFields(); for (Field field : fields) { field.setAccessible(true); } Field[] tmp = fieldCache.putIfAbsent(header.getClass(),fields); if (null != tmp) { fields = tmp; } } for (Field field : fields) { Object value = field.get(header); if (null != value && !field.isSynthetic()) { map.put(field.getName(),value.toString()); } } } return map; } catch (Exception e) { throw new RuntimeException("incompatible exception.",e); } } public SessionCredentials getSessionCredentials() { return sessionCredentials; } } 複製程式碼
  • AclClientRPCHook實現了RPCHook介面,其構造器接收SessionCredentials引數;其doBeforeRequest首先通過parseRequestContent從request讀取CommandCustomHeader,將其field連同accessKey、securityToken放到一個SortedMap,再通過AclUtils.combineRequestContent計算要傳送的請求內容;然後通過AclUtils.calSignature計算出signature,最後往request的extFields新增SIGNATURE、ACCESS_KEY;若設定securityToken,則會往request的extFields新增SECURITY_TOKEN

SessionCredentials

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/SessionCredentials.java

public class SessionCredentials {
    public static final Charset CHARSET = Charset.forName("UTF-8");
    public static final String ACCESS_KEY = "AccessKey";
    public static final String SECRET_KEY = "SecretKey";
    public static final String SIGNATURE = "Signature";
    public static final String SECURITY_TOKEN = "SecurityToken";

    public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",System.getProperty("user.home") + File.separator + "key");

    private String accessKey;
    private String secretKey;
    private String securityToken;
    private String signature;

    public SessionCredentials() {
        String keyContent = null;
        try {
            keyContent = MixAll.file2String(KEY_FILE);
        } catch (IOException ignore) {
        }
        if (keyContent != null) {
            Properties prop = MixAll.string2Properties(keyContent);
            if (prop != null) {
                this.updateContent(prop);
            }
        }
    }

    public SessionCredentials(String accessKey,String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    public SessionCredentials(String accessKey,String secretKey,String securityToken) {
        this(accessKey,secretKey);
        this.securityToken = securityToken;
    }

    public void updateContent(Properties prop) {
        {
            String value = prop.getProperty(ACCESS_KEY);
            if (value != null) {
                this.accessKey = value.trim();
            }
        }
        {
            String value = prop.getProperty(SECRET_KEY);
            if (value != null) {
                this.secretKey = value.trim();
            }
        }
        {
            String value = prop.getProperty(SECURITY_TOKEN);
            if (value != null) {
                this.securityToken = value.trim();
            }
        }
    }

    //......
}
複製程式碼
  • SessionCredentials提供了三個構造器,一個無參構造器從KEY_FILE載入keyContent然後解析為Properties再通過updateContent方法給accessKey、secretKey、securityToken賦值;一個是accessKey,secretKey的構造器;還有一個是accessKey,secretKey,securityToken的構造器

AclUtils

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclUtils.java

public class AclUtils {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    public static byte[] combineRequestContent(RemotingCommand request,SortedMap<String,String> fieldsMap) {
        try {
            StringBuilder sb = new StringBuilder("");
            for (Map.Entry<String,String> entry : fieldsMap.entrySet()) {
                if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
                    sb.append(entry.getValue());
                }
            }

            return AclUtils.combineBytes(sb.toString().getBytes(CHARSET),request.getBody());
        } catch (Exception e) {
            throw new RuntimeException("Incompatible exception.",e);
        }
    }

    public static byte[] combineBytes(byte[] b1,byte[] b2) {
        int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
        byte[] total = new byte[size];
        if (null != b1)
            System.arraycopy(b1,total,b1.length);
        if (null != b2)
            System.arraycopy(b2,b1.length,b2.length);
        return total;
    }

    public static String calSignature(byte[] data,String secretKey) {
        String signature = AclSigner.calSignature(data,secretKey);
        return signature;
    }

    //......
}
複製程式碼
  • combineRequestContent首先將fieldsMap拼接為字串,然後通過AclUtils.combineBytes將其與request.getBody()結合在一起;calSignature方法內部是委託給AclSigner.calSignature(data,secretKey)來實現

AclSigner

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclSigner.java

public class AclSigner {
    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
    public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
    private static final int CAL_SIGNATURE_FAILED = 10015;
    private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";

    public static String calSignature(String data,String key) throws AclException {
        return calSignature(data,key,DEFAULT_ALGORITHM,DEFAULT_CHARSET);
    }

    public static String calSignature(String data,String key,SigningAlgorithm algorithm,Charset charset) throws AclException {
        return signAndBase64Encode(data,algorithm,charset);
    }

    private static String signAndBase64Encode(String data,Charset charset)
        throws AclException {
        try {
            byte[] signature = sign(data.getBytes(charset),key.getBytes(charset),algorithm);
            return new String(Base64.encodeBase64(signature),DEFAULT_CHARSET);
        } catch (Exception e) {
            String message = String.format(CAL_SIGNATURE_FAILED_MSG,CAL_SIGNATURE_FAILED,e.getMessage());
            log.error(message,e);
            throw new AclException("CAL_SIGNATURE_FAILED",message,e);
        }
    }

    private static byte[] sign(byte[] data,byte[] key,SigningAlgorithm algorithm) throws AclException {
        try {
            Mac mac = Mac.getInstance(algorithm.toString());
            mac.init(new SecretKeySpec(key,algorithm.toString()));
            return mac.doFinal(data);
        } catch (Exception e) {
            String message = String.format(CAL_SIGNATURE_FAILED_MSG,e);
        }
    }

    public static String calSignature(byte[] data,DEFAULT_CHARSET);
    }

    public static String calSignature(byte[] data,charset);
    }

    private static String signAndBase64Encode(byte[] data,Charset charset)
        throws AclException {
        try {
            byte[] signature = sign(data,e);
        }
    }

}
複製程式碼
  • calSignature預設使用的是SigningAlgorithm.HmacSHA1及Charset.forName("UTF-8")字符集來簽名;signAndBase64Encode方法首先通過sign方法簽名,然後將其轉為Base64的字串

小結

AclClientRPCHook實現了RPCHook介面,其構造器接收SessionCredentials引數;其doBeforeRequest首先通過parseRequestContent從request讀取CommandCustomHeader,將其field連同accessKey、securityToken放到一個SortedMap,再通過AclUtils.combineRequestContent計算要傳送的請求內容;然後通過AclUtils.calSignature計算出signature,最後往request的extFields新增SIGNATURE、ACCESS_KEY;若設定securityToken,則會往request的extFields新增SECURITY_TOKEN

doc