hdfs auditlog(審計日誌)
hdfs審計日誌(Auditlog)記錄了使用者針對hdfs的所有操作,詳細資訊包括操作成功與否、使用者名稱稱、客戶機地址、操作命令、操作的目錄等。對於使用者的每一個操作,namenode都會將這些資訊以key-value對的形式組織成固定格式的一條日誌,然後記錄到audit.log檔案中。通過審計日誌,我們可以實時檢視hdfs的各種操作狀況、可以追蹤各種誤操作、可以做一些指標監控等等。
hdfs的審計日誌功能是可插拔的,使用者可以通過實現預設介面擴展出滿足自己所需的外掛來替換hdfs預設提供的審計日誌功能,或者與之並用。
啟用審計日誌
如果僅僅只啟用預設的AuditLogger(DefaultAuditLogger),則在log4j.properties新增如下配置(hdfs.audit.logger必須配置為INFO級別)即可,審計日誌會與namenode的系統日誌獨立開來儲存,log4j.appender.RFAAUDIT.File可配置儲存的位置及檔案。 FSNamesystem根據log4j.properties中hdfs.audit.logger是否為INFO,以及是否配置了DefaultAuditLogger之外的其他AuditLogger,來決定是否啟用審計日誌功能。
# # hdfs audit logging # hdfs.audit.logger=INFO,NullAppender hdfs.audit.log.maxfilesize=256MB hdfs.audit.log.maxbackupindex=20 log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger} log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize} log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}
審計日誌的介面及實現
Namenode開放了AuditLogger介面,並定義抽象類HdfsAuditLogger 實現AuditLogger,預設提供實現類DefaultAuditLogger。構造FSNamesystem時通過initAuditLoggers(Configuration conf)方法建立AuditLogger列表。在記錄使用者操作時,會將操作資訊逐一傳給列表中的每一個AuditLogger,由其做對應的審計處理。
通過實現Auditloger介面或者擴充套件HdfsAuditLogger類,使用者可以實現自己的AuditLogger來滿足所需,例如有針對性的記錄審計日誌(當叢集、訪問量上規模之後瘋狂刷日誌必然對namenode有影響,有針對性的記錄有必要的日誌是緩解此狀況的一種可選方案)、擴充套件功能、將日誌接入實時系統做實時分析或監控等。使用者通過配置項dfs.namenode.audit.loggers在hdfs-site.xml中配置Auditloger的實現類,多個實現可以通過逗號分開,更改配置後重啟namenode介面生效。FSNamesystem的initAuditLoggers(Configuration conf)方法通過該配置項載入並例項化實現類,初始化後存入集合。如果使用者沒有配置,那麼預設使用DefaultAuditLogger。如果啟動了nntop功能,還會使用TopAuditLogger。
FSNamesystem 初始化所有的AuditLogger:
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
//DFS_NAMENODE_AUDIT_LOGGERS_KEY=dfs.namenode.audit.loggers
Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
List<AuditLogger> auditLoggers = Lists.newArrayList();
if (alClasses != null && !alClasses.isEmpty()) {
for (String className : alClasses) {
try {
AuditLogger logger;
if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
logger = new DefaultAuditLogger();
} else {
logger = (AuditLogger) Class.forName(className).newInstance();
}
logger.initialize(conf);
auditLoggers.add(logger);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
// Make sure there is at least one logger installed.
// 如果使用者沒有提供AuditLoggers,則預設使用DefaultAuditLogger
if (auditLoggers.isEmpty()) {
auditLoggers.add(new DefaultAuditLogger());
}
// Add audit logger to calculate top users
// 預設topConf.isEnabled是開啟的,用於指標聚合、上報
// TopAuditLogger類似 top命令
if (topConf.isEnabled) {
topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
auditLoggers.add(new TopAuditLogger(topMetrics));
}
return Collections.unmodifiableList(auditLoggers);
}
DefaultAuditLogger記錄日誌:
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) {
final StringBuilder sb = auditBuffer.get();
sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t");
sb.append("ugi=").append(userName).append("\t");
sb.append("ip=").append(addr).append("\t");
sb.append("cmd=").append(cmd).append("\t");
sb.append("src=").append(src).append("\t");
sb.append("dst=").append(dst).append("\t");
if (null == status) {
sb.append("perm=null");
} else {
sb.append("perm=");
sb.append(status.getOwner()).append(":");
sb.append(status.getGroup()).append(":");
sb.append(status.getPermission());
}
if (logTokenTrackingId) {
sb.append("\t").append("trackingId=");
String trackingId = null;
if (ugi != null && dtSecretManager != null
&& ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
if (tid instanceof DelegationTokenIdentifier) {
DelegationTokenIdentifier dtid =
(DelegationTokenIdentifier)tid;
trackingId = dtSecretManager.getTokenTrackingId(dtid);
break;
}
}
}
sb.append(trackingId);
}
sb.append("\t").append("proto=");
sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
logAuditMessage(sb.toString());
}
}
public void logAuditMessage(String message) {
auditLog.info(message);
}
審計過程
客戶端對hdfs的所有操作,不管成功與否都會由FSNamesystem記錄下。以刪除操作為例,FSNamesystem在正常刪除給定src後呼叫logAuditEvent(true, "delete", src)記錄此次成功的delete操作,如果刪除失敗丟擲異常,則呼叫logAuditEvent(false, "delete", src)記錄此次失敗的delete操作。
boolean delete(String src, boolean recursive, boolean logRetryCache)
throws IOException {
waitForLoadingFSImage();
BlocksMapUpdateInfo toRemovedBlocks = null;
writeLock();
boolean ret = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete " + src);
toRemovedBlocks = FSDirDeleteOp.delete(
this, src, recursive, logRetryCache);
ret = toRemovedBlocks != null;
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
if (toRemovedBlocks != null) {
removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
}
logAuditEvent(true, "delete", src);
return ret;
}
//判斷是否是外部呼叫,只對rpc呼叫和webHdfs呼叫做審計
boolean isExternalInvocation() {
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
}
//判斷是否啟用審計日誌功能
public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
//succeeded:操作是否成功 cmd:操作命令 src:操作物件
private void logAuditEvent(boolean succeeded, String cmd, String src)
throws IOException {
logAuditEvent(succeeded, cmd, src, null, null);
}
private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, HdfsFileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
cmd, src, dst, stat);
}
}
//獲取操作物件的資訊,呼叫所有的auditloger 做審計
private void logAuditEvent(boolean succeeded,
UserGroupInformation ugi, InetAddress addr, String cmd, String src,
String dst, HdfsFileStatus stat) {
FileStatus status = null;
if (stat != null) {
Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
Path path = dst != null ? new Path(dst) : new Path(src);
status = new FileStatus(stat.getLen(), stat.isDir(),
stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), symlink, path);
}
for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
status, ugi, dtSecretManager);
} else {
logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status);
}
}
}
審計日誌接入實時系統的方法
- 方法1:擴充套件Log4J的appender,由appender將日誌傳送到kafka。
- 方法2:直接讓kafka的producer讀取日誌檔案。