sentinel实时数据统计源码分析
前言
上一篇分析了sentinel的使用及slotChain的创建过程,对于每一个Slot都有对应的作用,本篇主要学习关于StatisticSlot如果做实时数据统计,以便后面进行各种场景的流控。
数据统计
图来自sentinel架构图的一部分,sentinel数据统计部分采用滑动时间窗口优化算法
算法分析
1、固定时间窗口限流
问题:无法控制窗口边界突发流量
2、滑动时间窗口限流
问题:两个请求可能需要重复统计相交部分,如下图所示:
3、滑动时间窗口改进
思考:与其说改进不如说是一种取舍,减少重复统计,也可能牺牲掉严格滑动窗口的精准度。当样本大小为1时就是固定时间窗口,样本大小为请求阈值时,就是滑动时间窗口。sentinel默认是2个样本窗口(500ms一个样本窗口长度),为了可以使用更少的内存及计算和更好的性能。
源码分析
统计核心类:
// com.alibaba.csp.sentinel.node.StatisticNode
/**
* 秒级别的统计数据,注意不是每秒的数量,而是单位为秒
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 分钟级别的统计数据
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private LongAdder curThreadNum = new LongAdder();
通过ArrayMetric进行统计,ArrayMetric 为 Metric 的实现:
// com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric
/**
* 包括了一个LeapArray对象,泛型为MetricBucket,用于记录样本窗口数据
*
* 注意类名字是Array结尾,但它不是一个数组,是该类内部有一个数组的成员变量
*/
private final LeapArray<MetricBucket> data;
/**
* 构造函数
* @param sampleCount:样本窗口数量,默认2
* @param intervalInMs:总时间间隔,默认1000ms
*/
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray
/**
* 样本窗口长度,时间毫秒,默认 1000/2 = 500毫秒
*/
protected int windowLengthInMs;
/**
* 样本窗口数量,默认2,越大精准度越高
*/
protected int sampleCount;
/**
* 毫秒级别,总时间间隔,默认1000ms,越小精准度越高
*/
protected int intervalInMs;
/**
* 秒级别,总时间间隔:intervalInMs / 1000.0,限流时用于计算QPS
*/
private double intervalInSecond;
/**
* 真正的数组,所有的样本窗口组成的
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* 构造函数,初始化上面的数据
* @param sampleCount:样本窗口数量,默认2
* @param intervalInMs:毫秒级别时间,默认1000ms
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
// 数组长度为样本窗口数量
this.array = new AtomicReferenceArray<>(sampleCount);
}
样本窗口定义:
// com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap
/**
* 样本窗口的长度,毫秒,默认500ms
*/
private final long windowLengthInMs;
/**
* 窗口的开始时间,毫秒,动态的
*/
private long windowStart;
/**
* 数据,范型类:MetricBucket
*/
private T value;
真正存数值的类MetricBucket:
// private final LongAdder[] counters;
/**
* 每个样本窗口里的统计数据,包括MetricEvent枚举中所有的类型,包括PASS、BLOCK、EXCEPTION、SUCCESS、RT、OCCUPIED_PASS
*/
private final LongAdder[] counters;
/**
* 最小响应时间
*/
private volatile long minRt;
/**
* 构造counters数组
*/
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
// 初始化minRt
initMinRt();
}
熟悉了关键类,看下统计的核心流程:
接下里通过源码串一下核心流程,从各插槽代码入口:
// com.alibaba.csp.sentinel.CtSph
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 进入各个插槽slot
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
进入统计插槽入口:
// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
// 进入后续的插槽 -> ParamFlowSlot -> SystemSlot等,通过则进入下一句,不通过则进入catch部分
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
// 请求通过,进入此处,增加线程数(访问结束调用相关api会decrease)
node.increaseThreadNum();
// 请求通过,进入此处,增加通过请求数
node.addPassRequest(count);
// ...
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
// ...
} catch (BlockException e) {
// 进入此处,增加阻塞Qps数
node.increaseBlockQps(count);
// ...
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
通过node.addPassRequest(count);接着调用DefaultNode
// com.alibaba.csp.sentinel.node.DefaultNode
@Override
public void addPassRequest(int count) {
// 添加当前资源的通过的统计数据
super.addPassRequest(count);
// 添加当前资源关联的clusterNode下的通过的统计数据,流程和上面一样
this.clusterNode.addPassRequest(count);
}
调用super.addPassRequest(count); 进入StatisticNode
this.clusterNode同理
// com.alibaba.csp.sentinel.node.StatisticNode
@Override
public void addPassRequest(int count) {
// 增加pass指标是通过一个叫 Metric 的接口进行操作的,并且是通过 ArrayMetric 实现类实现统计
// 秒级统计
rollingCounterInSecond.addPass(count);
// 分钟级别统计
rollingCounterInMinute.addPass(count);
}
关注rollingCounterInSecond.addPass(count);进入ArrayMetric的addPass(count);
// com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric
@Override
public void addPass(int count) {
// 1、获取当前窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 2、添加通过数
wrap.value().addPass(count);
}
因为1为算法核心,先看2,通过第1步获取到当前样本窗口,wrap.value().addPass(count);添加通过数,wrap.value()进入到对应的MetricBucket,从上文分析它是最终记录数值的地方,如下:
// com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket
public void addPass(int n) {
// MetricEvent.PASS类型
add(MetricEvent.PASS, n);
}
/**
* counters成员变量,数组类型,获取对应下表后进行add n
*/
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
最后看下获取当前窗口的代码逻辑:ArrayMetric#data.currentWindow();
// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray
// 调用入口data的定义是ArrayMetric类的成员变量
// private final LeapArray<MetricBucket> data;
/**
* 获取当前窗口
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
// 核心方法,获取当前样本时间窗口!!!
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间所在的样本窗口下标,见下面方法解释
int idx = calculateTimeIdx(timeMillis);
// 计算当前样本窗口的开始时间,见下面方法解释
long windowStart = calculateWindowStart(timeMillis);
// 循环处理
while (true) {
// 根据下标获取一个样本窗口
WindowWrap<T> old = array.get(idx);
// 为null,说明当前窗口还没创建
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*/
// 创建新的样本窗口,windowLengthInMs窗口长度;windowStart窗口开始时间;newEmptyBucket空的样本窗口数值对象
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// cas 将计算出的样本窗口下标,改成新WindowWrap
if (array.compareAndSet(idx, null, window)) {
// 成功的话,返回
return window;
} else {
// 当前线程不成功的话,说明其它线程更新成功了。
// 让当前线程从运行状态 转为 就绪状态,以允许具有相同优先级的其他线程获得运行机会
// 无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*/
// 如果计算出的样本窗口开始时间timeMillis和通过下标获取的窗口的开始时间相等,说明样本窗口没有发生滑动
return old;
} else if (windowStart > old.windowStart()) {
// 下面这个图,可以看出,old是上一轮的一个窗口,过时了
// 已经是第二轮了,出现NULL,因为上一轮的时间窗口内并无访问
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*/
// 说明,窗口向前滑动了,old已经过期失效,而且当前窗口没有被reset过
// (这种情况比较少出现,所以大多数情况下不会影响性能)
// 1、哪些线程会有竞争?共享Node的资源会有竞争
// 2、会什么要锁?因为resetWindowTo不是原子操作
if (updateLock.tryLock()) {
try {
// 更新样本窗口
// 1、将样本窗口开始时间更新为计算得到的windowStart
// 2、重置里面的时间
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 一般不会出现,当前计算的样本窗口开始时间比下标样本窗口的开始时间小
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
/**
* 根据时间计算样本窗口数组的下标
* @param timeMillis
* @return
*/
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 当前时间戳取整样本窗口时间长度,默认windowLengthInMs为500ms
// (该处也可以看出,如果是第一次创建第一个样本窗口,当前时间不是开始时间,因为可能除不尽)
long timeId = timeMillis / windowLengthInMs;
// 根据timeId获取样本窗口数组下标,每经过500ms,timeId就会加1
// (默认样本时间窗口为2,如果timeId % array.length()原来是0,过500ms后变成1,再过500ms又变成0,依次循环)
return (int)(timeId % array.length());
}
/**
* 计算窗口起始时间
* @param timeMillis 当前时间戳
* @return
*/
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// timeMillis % windowLengthInMs 从当前窗口开始点到当前时间的长度
return timeMillis - timeMillis % windowLengthInMs;
}
至此,完成了数据的统计,包括DefaultNode和ClusterNode。
流量控制,便是使用的ClusterNode的PassQps。
总结
1、了解时间窗口算法,同时了解Sentinel对滑动时间窗口的改进思想
2、对Sentinel的实时数据统计原理大致了解,知其然知其所以然,并能以此深入其它模块
看到最后,给大家推荐个小程序吧
(变有钱记账本,让我变有钱)