Automatically Updating OpenTracing Sampling and Reporting Configuration in .NET Applications
Preface
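OpenTracing is an open specification for distributed tracing, and an open-source .NET implementation already exists: opentracing-csharp. It supports both .NET Framework and .NET Core. GitHub: https://github.com/opentracing/opentracing-csharp
The library supports several tracing patterns, but it provides only the most basic functionality, and using it in a real project requires a good deal of extension. Fortunately, someone has already done that work in another open-source project, opentracing-contrib. GitHub: https://github.com/opentracing-contrib/csharp-netcore
opentracing-contrib integrates a library called Jaeger, which implements sampling and reporting of trace data and can upload that data to Jaeger for analysis and statistics.
To protect performance while still tracing the critical data, it is important to be able to adjust the sampling rate remotely, and Jaeger itself supports remotely configured sampling rates.
However, the Alibaba Cloud tracing service I use does not support this, and the way that configuration is designed is not what I wanted either. So I built my own dynamic update for the sampling and reporting configuration, which is how this article came about.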
Approach
Initializing a Tracer with Jaeger looks roughly like this:
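    using Jaeger;            // Tracer
    using OpenTracing.Util;  // GlobalTracer

    var tracer = new Tracer.Builder(serviceName)
        .WithSampler(sampler)
        .WithReporter(reporter)
        .Build();

    // Make the tracer available process-wide.
    GlobalTracer.Register(tracer);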
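First you supply the name of the current service, then a sampler, then a reporter. Build produces an ITracer instance, which is finally registered globally.
It follows that updating the sampling and reporting configuration means updating the sampler and the reporter.
But Tracer offers no UpdateSampler or UpdateReporter method, so we are stuck. What now?
As mentioned earlier, Jaeger supports dynamic adjustment of the sampling rate, so let's see how it does that: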
    private RemoteControlledSampler(Builder builder)
    {
        ...
        _pollTimer = new Timer(_ => UpdateSampler(), null, TimeSpan.Zero, builder.PollingInterval);
    }

    /// <summary>
    /// Updates <see cref="Sampler"/> to a new sampler when it is different.
    /// </summary>
    internal void UpdateSampler()
    {
        try
        {
            SamplingStrategyResponse response = _samplingManager.GetSamplingStrategyAsync(_serviceName)
                .ConfigureAwait(false).GetAwaiter().GetResult();
            ...
            UpdateRateLimitingOrProbabilisticSampler(response);
        }
        catch (Exception ex)
        {
            ...
        }
    }

    private void UpdateRateLimitingOrProbabilisticSampler(SamplingStrategyResponse response)
    {
        ...
        lock (_lock)
        {
            if (!Sampler.Equals(sampler))
            {
                Sampler.Close();
                Sampler = sampler;
                ...
            }
        }
    }
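Only the key code is kept here. The core idea is clear: a Timer periodically fetches the sampling strategy, and the existing Sampler is replaced when the new one differs.
This approach is easy to understand, so the rest of the article follows it.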
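The polling-and-swap idea described above can be sketched without any Jaeger dependency. This is a minimal illustration, not Jaeger's implementation: `Strategy`, `PollingStrategyHolder`, and the `fetch` delegate are hypothetical names introduced here, and the "remote lookup" is just a delegate supplied by the caller.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a sampling strategy; not a Jaeger type.
public sealed class Strategy : IEquatable<Strategy>
{
    public double Rate { get; }
    public Strategy(double rate) { Rate = rate; }
    public bool Equals(Strategy other) => other != null && Rate.Equals(other.Rate);
    public override bool Equals(object obj) => Equals(obj as Strategy);
    public override int GetHashCode() => Rate.GetHashCode();
}

// Polls a source on a timer and swaps the current strategy when it differs.
public sealed class PollingStrategyHolder : IDisposable
{
    private readonly object _lock = new object();
    private readonly Func<Strategy> _fetch; // hypothetical remote lookup
    private readonly Timer _timer;
    private Strategy _current;

    public PollingStrategyHolder(Strategy initial, Func<Strategy> fetch, TimeSpan interval)
    {
        _current = initial ?? throw new ArgumentNullException(nameof(initial));
        _fetch = fetch ?? throw new ArgumentNullException(nameof(fetch));
        // The first tick fires after one interval, then repeats.
        _timer = new Timer(_ => PollOnce(), null, interval, interval);
    }

    public Strategy Current
    {
        get { lock (_lock) { return _current; } }
    }

    // Fetches the latest strategy and replaces the current one if it changed.
    // Returns true when a swap happened.
    public bool PollOnce()
    {
        Strategy latest;
        try
        {
            latest = _fetch();
        }
        catch (Exception)
        {
            // Keep the old strategy if the lookup fails.
            return false;
        }
        if (latest == null) return false;

        lock (_lock)
        {
            if (_current.Equals(latest)) return false;
            _current = latest;
            return true;
        }
    }

    public void Dispose() => _timer.Dispose();
}
```

The equality check before the swap matters: without it, every poll would discard and recreate the current strategy even when nothing changed.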
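Solution
Provide an updatable Sampler and an updatable Reporter, and use these two classes when building the Tracer. The two classes below follow the way the open-source project creates its Samplers and Reporters.
The updatable Sampler: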
    // Namespaces below assume the Jaeger C# client package layout.
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using Jaeger;
    using Jaeger.Metrics;
    using Jaeger.Samplers;
    using Jaeger.Util;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Logging.Abstractions;

    internal class UpdatableSampler : ValueObject, ISampler
    {
        public const string Type = "updatable";

        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
        private readonly string _serviceName;
        private readonly ILoggerFactory _loggerFactory;
        private readonly ILogger _logger;
        private readonly IMetrics _metrics;

        // The sampler currently in effect; replaced by UpdateSampler.
        internal ISampler Sampler { get; private set; }

        private UpdatableSampler(Builder builder)
        {
            _serviceName = builder.ServiceName;
            _loggerFactory = builder.LoggerFactory;
            _logger = _loggerFactory.CreateLogger<UpdatableSampler>();
            _metrics = builder.Metrics;
            Sampler = builder.InitialSampler;
        }

        /// <summary>
        /// Updates <see cref="Sampler"/> to a new sampler when it is different.
        /// </summary>
        public void UpdateSampler(ISampler sampler)
        {
            // Take the lock before the try block so that finally only
            // releases a lock that was actually acquired.
            _lock.EnterWriteLock();
            try
            {
                if (!Sampler.Equals(sampler))
                {
                    Sampler.Close();
                    Sampler = sampler;
                    _metrics.SamplerUpdated.Inc(1);
                }
            }
            catch (System.Exception ex)
            {
                _logger.LogWarning(ex, "Updating sampler failed");
                _metrics.SamplerQueryFailure.Inc(1);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }

        // Sampling is the hot path, so it only takes the shared read lock.
        public SamplingStatus Sample(string operation, TraceId id)
        {
            _lock.EnterReadLock();
            try
            {
                return Sampler.Sample(operation, id);
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        public override string ToString()
        {
            _lock.EnterReadLock();
            try
            {
                return $"{nameof(UpdatableSampler)}(Sampler={Sampler})";
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        public void Close()
        {
            _lock.EnterWriteLock();
            try
            {
                Sampler.Close();
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }

        protected override IEnumerable<object> GetAtomicValues()
        {
            yield return Sampler;
        }

        public sealed class Builder
        {
            internal string ServiceName { get; }
            internal ILoggerFactory LoggerFactory { get; private set; }
            internal ISampler InitialSampler { get; private set; }
            internal IMetrics Metrics { get; private set; }

            public Builder(string serviceName)
            {
                ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
            }

            public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
            {
                LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
                return this;
            }

            public Builder WithInitialSampler(ISampler initialSampler)
            {
                InitialSampler = initialSampler ?? throw new ArgumentNullException(nameof(initialSampler));
                return this;
            }

            public Builder WithMetrics(IMetrics metrics)
            {
                Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
                return this;
            }

            // Fills in defaults for anything the caller did not set.
            public UpdatableSampler Build()
            {
                if (LoggerFactory == null)
                {
                    LoggerFactory = NullLoggerFactory.Instance;
                }
                if (InitialSampler == null)
                {
                    InitialSampler = new ProbabilisticSampler();
                }
                if (Metrics == null)
                {
                    Metrics = new MetricsImpl(NoopMetricsFactory.Instance);
                }
                return new UpdatableSampler(this);
            }
        }
    }
The updatable Reporter:
    // Namespaces below assume the Jaeger C# client package layout.
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Jaeger;
    using Jaeger.Metrics;
    using Jaeger.Reporters;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Logging.Abstractions;

    internal class UpdatableReporter : IReporter
    {
        public const string Type = "updatable";

        private readonly string _serviceName;
        private readonly ILoggerFactory _loggerFactory;
        private readonly ILogger _logger;
        private readonly IMetrics _metrics;
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();

        // The reporter currently in effect; replaced by UpdateReporter.
        internal IReporter Reporter { get; private set; }

        private UpdatableReporter(Builder builder)
        {
            _serviceName = builder.ServiceName;
            _loggerFactory = builder.LoggerFactory;
            _logger = _loggerFactory.CreateLogger<UpdatableReporter>();
            _metrics = builder.Metrics;
            Reporter = builder.InitialReporter;
        }

        /// <summary>
        /// Updates <see cref="Reporter"/> to a new reporter when it is different.
        /// </summary>
        public void UpdateReporter(IReporter reporter)
        {
            // Take the lock before the try block so that finally only
            // releases a lock that was actually acquired.
            _lock.EnterWriteLock();
            try
            {
                if (!Reporter.Equals(reporter))
                {
                    // Blocks synchronously, so the lock stays on this thread.
                    Reporter.CloseAsync(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
                    Reporter = reporter;
                }
            }
            catch (System.Exception ex)
            {
                _logger.LogWarning(ex, "Updating reporter failed");
                _metrics.ReporterFailure.Inc(1);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }

        // Reporting is the hot path, so it only takes the shared read lock.
        public void Report(Span span)
        {
            _lock.EnterReadLock();
            try
            {
                Reporter.Report(span);
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        public override string ToString()
        {
            _lock.EnterReadLock();
            try
            {
                return $"{nameof(UpdatableReporter)}(Reporter={Reporter})";
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        public async Task CloseAsync(CancellationToken cancellationToken)
        {
            // ReaderWriterLockSlim has thread affinity, so it must not be held
            // across an await: the continuation may resume on another thread,
            // where releasing the lock would throw. Capture the reference
            // under the lock and await outside it.
            IReporter reporter;
            _lock.EnterReadLock();
            try
            {
                reporter = Reporter;
            }
            finally
            {
                _lock.ExitReadLock();
            }
            await reporter.CloseAsync(cancellationToken).ConfigureAwait(false);
        }

        public sealed class Builder
        {
            internal string ServiceName { get; }
            internal ILoggerFactory LoggerFactory { get; private set; }
            internal IReporter InitialReporter { get; private set; }
            internal IMetrics Metrics { get; private set; }

            public Builder(string serviceName)
            {
                ServiceName = serviceName ?? throw new ArgumentNullException(nameof(serviceName));
            }

            public Builder WithLoggerFactory(ILoggerFactory loggerFactory)
            {
                LoggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
                return this;
            }

            public Builder WithInitialReporter(IReporter initialReporter)
            {
                InitialReporter = initialReporter ?? throw new ArgumentNullException(nameof(initialReporter));
                return this;
            }

            public Builder WithMetrics(IMetrics metrics)
            {
                Metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
                return this;
            }

            // Fills in defaults for anything the caller did not set.
            public UpdatableReporter Build()
            {
                if (LoggerFactory == null)
                {
                    LoggerFactory = NullLoggerFactory.Instance;
                }
                if (InitialReporter == null)
                {
                    InitialReporter = new NoopReporter();
                }
                if (Metrics == null)
                {
                    Metrics = new MetricsImpl(NoopMetricsFactory.Instance);
                }
                return new UpdatableReporter(this);
            }
        }
    }
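Note the use of a reader-writer lock here. The update has to happen without stopping the service, and the vast majority of accesses are reads, so a plain lock would needlessly serialize every reader.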
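To make the locking choice concrete, here is a minimal sketch of the same pattern reduced to a generic holder. `Swappable<T>` is a hypothetical name introduced for illustration; it is not part of Jaeger or of the classes in this article. Readers share the lock, and a replacement waits for in-flight readers before swapping.

```csharp
using System;
using System.Threading;

// Hypothetical generic holder illustrating the read-mostly locking pattern.
public sealed class Swappable<T> : IDisposable where T : class
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private T _value;

    public Swappable(T initial)
    {
        _value = initial ?? throw new ArgumentNullException(nameof(initial));
    }

    // Many threads may run Read at the same time.
    public TResult Read<TResult>(Func<T, TResult> use)
    {
        _lock.EnterReadLock();
        try
        {
            return use(_value);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    // Replace waits for in-flight readers, then swaps exclusively.
    // It returns the previous value so the caller can clean it up
    // after the lock has been released.
    public T Replace(T next)
    {
        if (next == null) throw new ArgumentNullException(nameof(next));
        _lock.EnterWriteLock();
        try
        {
            T previous = _value;
            _value = next;
            return previous;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose() => _lock.Dispose();
}
```

A caller would use it as `holder.Read(v => v.Length)` on the hot path and `holder.Replace(newValue)` when configuration changes. With the default lock settings (no recursion), the delegate passed to `Read` must not call `Replace` on the same holder.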
Initializing the Tracer now looks roughly like this:
    sampler = new UpdatableSampler.Builder(serviceName)
        .WithInitialSampler(BuildSampler(configuration))
        .Build();

    reporter = new UpdatableReporter.Builder(serviceName)
        .WithInitialReporter(BuildReporter(configuration))
        .Build();

    var tracer = new Tracer.Builder(serviceName)
        .WithSampler(sampler)
        .WithReporter(reporter)
        .Build();
When the configuration changes, call the update methods on the sampler and the reporter:
    private void OnTracingConfigurationChanged(TracingConfiguration newConfiguration, TracingConfigurationChangedInfo changedInfo)
    {
        ...
        ((UpdatableReporter)_reporter).UpdateReporter(BuildReporter(newConfiguration));
        ((UpdatableSampler)_sampler).UpdateSampler(BuildSampler(newConfiguration));
        ...
    }
How to listen for configuration changes is not covered here. A Timer, a blocking (long-polling) query, or anything similar will do.
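Whichever way changes are detected, it can help to work out which part of the configuration actually changed before rebuilding anything. The update methods rely on Equals to skip a no-op swap, and a type that does not override Equals falls back to reference equality, so a freshly built instance would always count as different. The sketch below is one possible way to do that comparison. `TracingSettings`, `TracingChange`, and `TracingSettingsDiff` are hypothetical names; the article's own TracingConfiguration and TracingConfigurationChangedInfo types are not shown, so their real shape may differ.

```csharp
using System;

// Hypothetical settings type; stands in for the article's TracingConfiguration.
public sealed class TracingSettings
{
    public double SamplingRate { get; set; }
    public string ReporterEndpoint { get; set; } = "";
}

// Hypothetical flags describing which parts of the settings changed.
[Flags]
public enum TracingChange
{
    None = 0,
    Sampler = 1,
    Reporter = 2
}

public static class TracingSettingsDiff
{
    // Reports which parts differ so that only those parts are rebuilt.
    public static TracingChange Compare(TracingSettings oldSettings, TracingSettings newSettings)
    {
        if (oldSettings == null) throw new ArgumentNullException(nameof(oldSettings));
        if (newSettings == null) throw new ArgumentNullException(nameof(newSettings));

        var change = TracingChange.None;
        if (!oldSettings.SamplingRate.Equals(newSettings.SamplingRate))
        {
            change |= TracingChange.Sampler;
        }
        if (!string.Equals(oldSettings.ReporterEndpoint, newSettings.ReporterEndpoint, StringComparison.Ordinal))
        {
            change |= TracingChange.Reporter;
        }
        return change;
    }
}
```

A change handler could then call UpdateSampler only when the `Sampler` flag is set and UpdateReporter only when the `Reporter` flag is set, which avoids closing and recreating a reporter when only the sampling rate moved.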
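Afterword
The opentracing-contrib project supports only .NET Core. Using it with .NET Framework takes some extra work of your own. That will be covered in a separate article, so it is not described here.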