
zipkin + sleuth: Generating the Global Call Chain with a Scheduled Job (Part 12)

Problem Background

The approach provided on GitHub is a zipkin-dependencies-xxx.jar: you launch the jar and it generates the call-chain (dependency-link) data. The problem is that once started, the jar performs a single run and then exits on its own, which is unacceptable in production, where the links need to be rebuilt on a schedule.
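
For reference, the upstream jar is normally driven by environment variables, along the lines of STORAGE_TYPE=elasticsearch ES_HOSTS=http://localhost:9200 java -jar zipkin-dependencies.jar (the host URL here is a placeholder, not a value from this project); it aggregates one day's spans into dependency links and then terminates, so something external has to rerun it every day.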

Preparation

Download the latest release source code from GitHub.

Locate the folder containing the Elasticsearch support, zipkin-dependencies-release-1.9.1\elasticsearch\src\main\java\zipkin\dependencies\elasticsearch, and copy out these two classes:

ElasticsearchDependenciesJob.java

TraceIdAndJsonToDependencyLinks.java

Then locate the log-initialization class under the main module, zipkin-dependencies-release-1.9.1\main\src\main\java\zipkin\dependencies:

LogInitializer.java

pom Changes

<properties>
    <spark.version>2.1.1</spark.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-codec</groupId>
                    <artifactId>commons-codec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
    </dependency>
    <!-- avoids compile error: Could not access type DataFrame in package org.apache.spark.sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
        <version>6.0.0-beta2</version>
    </dependency>
    <dependency>
        <groupId>org.assertj</groupId>
        <artifactId>assertj-core</artifactId>
        <version>${assertj.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- latest version at the time of writing: 1.1.3-SNAPSHOT -->
    <dependency>
        <groupId>com.enmonster.platform</groupId>
        <artifactId>monster-rocketmq-spring-boot-starter</artifactId>
        <version>1.1.3-SNAPSHOT</version>
    </dependency>
</dependencies>

Creating the config class

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("zipkin.storage.elasticsearch")
@Data
@Component
public class ElasticSearchConfig {
    // Elasticsearch cluster addresses
    private String hosts;
    // cluster name
    private String cluster;
    // index name
    private String index;
}
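
With @ConfigurationProperties("zipkin.storage.elasticsearch"), Spring Boot binds the three fields from your application configuration. A minimal sketch of the matching application.properties entries follows; the host addresses and index name are placeholders, not values from this project:

zipkin.storage.elasticsearch.hosts=http://es-node1:9200,http://es-node2:9200
zipkin.storage.elasticsearch.cluster=elasticsearch
zipkin.storage.elasticsearch.index=zipkin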

Creating the scheduled task


import java.time.LocalDateTime;
import java.time.ZoneOffset;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import zipkin.dependencies.LogInitializer;
import zipkin.dependencies.elasticsearch.ElasticsearchDependenciesJob;

// Listener and Message come from monster-rocketmq-spring-boot-starter; their imports are omitted here
@Component // registered as a Spring bean so the @Resource injection below works
public class TimedJobListener implements Listener<String> {

    @Resource
    private ElasticSearchConfig elasticSearchConfig;

    @Override
    public void onMessage(Message<String> message) throws Exception {
        Runnable logInitializer = LogInitializer.create("info"); // log level
        logInitializer.run(); // start the log-printing thread
        ElasticsearchDependenciesJob.builder()
                .logInitializer(logInitializer) // attach the log initializer
                // aggregate yesterday's spans; the day is converted to epoch millis in UTC+8
                .day(LocalDateTime.now().plusDays(-1L).toInstant(ZoneOffset.of("+8")).toEpochMilli())
                .hosts(elasticSearchConfig.getHosts())  // Elasticsearch cluster addresses
                .index(elasticSearchConfig.getIndex())  // Elasticsearch index name
                .build()
                .run();
    }
}

I run this on a distributed scheduling platform built from xxl-job + MQ, so only the code inside onMessage matters here; adapt the trigger to whatever scheduler your own stack provides. If you have no such platform, a plain Spring @Scheduled task works just as well, as sketched below.
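
A minimal sketch of that alternative, assuming the same copied job classes and the ElasticSearchConfig above; the class name and cron expression are illustrative, not from the original project:

import java.time.LocalDateTime;
import java.time.ZoneOffset;

import javax.annotation.Resource;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import zipkin.dependencies.LogInitializer;
import zipkin.dependencies.elasticsearch.ElasticsearchDependenciesJob;

@Component
public class DependencyLinkJob { // hypothetical class name

    @Resource
    private ElasticSearchConfig elasticSearchConfig;

    // run shortly after midnight and aggregate yesterday's spans,
    // mirroring the body of onMessage above
    @Scheduled(cron = "0 10 0 * * *")
    public void rebuildYesterdayLinks() {
        Runnable logInitializer = LogInitializer.create("info");
        logInitializer.run();
        ElasticsearchDependenciesJob.builder()
                .logInitializer(logInitializer)
                .day(LocalDateTime.now().plusDays(-1L).toInstant(ZoneOffset.of("+8")).toEpochMilli())
                .hosts(elasticSearchConfig.getHosts())
                .index(elasticSearchConfig.getIndex())
                .build()
                .run();
    }
}

Note that @EnableScheduling must be present on a configuration class for the cron trigger to fire.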