
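In the previous post, Flink Study Notes (11): Using Flink KeyedState, we showed how to use state to perform a sum operation. Data streams, however, usually run for a long time, so the amount of stored state keeps growing. How can we deal with this?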
1. Flink State Time-To-Live (TTL)
Flink provides the StateTtlConfig mechanism for this. First, let's look at the policies it offers:
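- TTL update type (default OnCreateAndWrite, which refreshes the TTL only when state is created or written; the alternative OnReadAndWrite also refreshes it on reads)
- State visibility (default NeverReturnExpired, which never returns an expired value; the alternative ReturnExpiredIfNotCleanedUp may return an expired value that has not been cleaned up yet)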
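To make the two policies concrete, here is a minimal plain-Java model of a single TTL-protected value. It does not use Flink: the class TtlSemanticsDemo, its nested TtlValue holder and the local enums are hypothetical names for this sketch, and timestamps are passed in explicitly as milliseconds. The read path is modelled on how Flink treats an expired entry on access (cleared, then either hidden or returned once), but it is a simplification, not Flink's implementation.

```java
// Plain-Java model of Flink's TTL update types and visibility rules (no Flink dependency).
public class TtlSemanticsDemo {

    // Local enums named after StateTtlConfig.UpdateType / StateVisibility values (simulation only).
    public enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    public enum Visibility { NeverReturnExpired, ReturnExpiredIfNotCleanedUp }

    /** Hypothetical holder for one state value plus the time its TTL clock last restarted. */
    public static final class TtlValue<T> {
        private final long ttlMillis;
        private final UpdateType updateType;
        private final Visibility visibility;
        private T value;        // null means "no state"
        private long timestamp; // last time the TTL clock was (re)started

        public TtlValue(long ttlMillis, UpdateType updateType, Visibility visibility) {
            this.ttlMillis = ttlMillis;
            this.updateType = updateType;
            this.visibility = visibility;
        }

        /** Every create or write restarts the TTL clock, whatever the update type. */
        public void update(T newValue, long now) {
            value = newValue;
            timestamp = now;
        }

        /** Reads the value at time {@code now}, applying expiry and the visibility rule. */
        public T value(long now) {
            if (value == null) {
                return null;
            }
            if (timestamp + ttlMillis <= now) {      // expired
                T stale = value;
                value = null;                        // the expired entry is cleared on this access
                return visibility == Visibility.ReturnExpiredIfNotCleanedUp ? stale : null;
            }
            if (updateType == UpdateType.OnReadAndWrite) {
                timestamp = now;                     // reads also restart the clock
            }
            return value;
        }
    }

    public static void main(String[] args) {
        // Default policy with a 2000 ms TTL: reads do not extend the lifetime.
        TtlValue<Integer> byWrite =
                new TtlValue<>(2000, UpdateType.OnCreateAndWrite, Visibility.NeverReturnExpired);
        byWrite.update(1, 0);
        System.out.println(byWrite.value(1500)); // 1
        System.out.println(byWrite.value(2500)); // null: expired at 2000

        // OnReadAndWrite: the read at 1500 restarts the clock, so the value survives until 3500.
        TtlValue<Integer> byRead =
                new TtlValue<>(2000, UpdateType.OnReadAndWrite, Visibility.NeverReturnExpired);
        byRead.update(1, 0);
        System.out.println(byRead.value(1500)); // 1
        System.out.println(byRead.value(2500)); // 1
    }
}
```

In real Flink code, the alternatives are selected on the builder with `.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)` and `.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)`.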
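For details, see the state TTL section of the official Flink documentation, which covers the topic in more depth, including the supported state types, cleanup strategies, and related examples.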
2. Example
We reuse the example from the previous article.
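StateTtlConfig is configured on the state used in the keyed stream, as shown below. Once a piece of state is more than two seconds old, it expires and the count for that key is computed afresh: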
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
        new ValueStateDescriptor<>("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
stateDescriptor.enableTimeToLive(ttlConfig);
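To see what a two-second TTL does to the running sum, here is a plain-Java simulation of the keyed fruit counter. It has no Flink dependency: FruitTtlSimulation and its process method are hypothetical names, it hard-codes the policy above (OnCreateAndWrite plus NeverReturnExpired), and it takes explicit millisecond timestamps instead of reading the clock. It mirrors the logic of the map function, not Flink's internals.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of a per-key running count whose state expires 2 s after its last write.
public class FruitTtlSimulation {

    static final long TTL_MILLIS = 2000;

    /** Per-key state: the running count and the time it was last written. */
    private static final class Entry {
        final int count;
        final long lastWrite;

        Entry(int count, long lastWrite) {
            this.count = count;
            this.lastWrite = lastWrite;
        }
    }

    private final Map<String, Entry> state = new HashMap<>();

    /** Processes one (fruit, 1) record at time {@code now} and returns the emitted count. */
    public int process(String fruit, long now) {
        Entry current = state.get(fruit);
        // NeverReturnExpired: an expired entry behaves exactly like missing state.
        if (current != null && current.lastWrite + TTL_MILLIS <= now) {
            state.remove(fruit);
            current = null;
        }
        int base = (current == null) ? 0 : current.count; // same role as "initialize the ValueState"
        int updated = base + 1;
        state.put(fruit, new Entry(updated, now));         // OnCreateAndWrite: the write restarts the TTL
        return updated;
    }

    public static void main(String[] args) {
        FruitTtlSimulation sim = new FruitTtlSimulation();
        // Records for the same key at 0, 1000, 2000 and 5000 ms: counts 1, 2, 3, then reset to 1.
        for (long t : new long[] {0, 1000, 2000, 5000}) {
            System.out.println(t + " ms -> (apple," + sim.process("apple", t) + ")");
        }
    }
}
```

Because the source in the complete program emits one randomly chosen fruit per second out of eight, consecutive records for the same fruit are often two or more seconds apart, so many printed counts are likely to start over at 1.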
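Expired state can also be cleaned up incrementally with cleanupIncrementally. Its first argument is the number of state entries checked each time cleanup is triggered, and its second argument, when true, additionally triggers cleanup for every processed record. For example: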
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanupIncrementally(10, true)
        .build();
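The idea behind cleanupIncrementally(10, true) can be sketched as a state table with a lazy cursor: each trigger checks a bounded number of entries, starting where the previous trigger stopped, and removes the expired ones. The sketch below is a simplified model, not Flink's implementation; IncrementalCleanupSketch, write, onRecord and the wrap-around cursor are assumptions made for illustration. According to the Flink documentation (1.x releases), incremental cleanup is only implemented for the heap state backend, and expired state persists if the state is never accessed and no records are processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of incremental TTL cleanup: bounded checks per trigger via a lazy cursor.
public class IncrementalCleanupSketch {
    private final long ttlMillis;
    private final int cleanupSize;                   // like the first argument of cleanupIncrementally
    private final boolean runCleanupForEveryRecord;  // like the second argument
    private final Map<String, Long> lastWrite = new HashMap<>();
    private final List<String> keys = new ArrayList<>(); // iteration order for the cursor
    private int cursor = 0;                               // where the next cleanup step resumes

    public IncrementalCleanupSketch(long ttlMillis, int cleanupSize, boolean runCleanupForEveryRecord) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
        this.runCleanupForEveryRecord = runCleanupForEveryRecord;
    }

    /** A state write: stores the timestamp, then triggers one cleanup step (cleanup runs on state access). */
    public void write(String key, long now) {
        if (!lastWrite.containsKey(key)) {
            keys.add(key);
        }
        lastWrite.put(key, now);
        cleanupStep(now);
    }

    /** Called once per processed record; cleans up only if runCleanupForEveryRecord is set. */
    public void onRecord(long now) {
        if (runCleanupForEveryRecord) {
            cleanupStep(now);
        }
    }

    /** Checks at most cleanupSize entries, resuming from the previous position and wrapping around. */
    private void cleanupStep(long now) {
        for (int checked = 0; checked < cleanupSize && !keys.isEmpty(); checked++) {
            if (cursor >= keys.size()) {
                cursor = 0;
            }
            String key = keys.get(cursor);
            if (lastWrite.get(key) + ttlMillis <= now) {
                keys.remove(cursor);   // the next key shifts into this slot
                lastWrite.remove(key);
            } else {
                cursor++;
            }
        }
    }

    public int size() {
        return lastWrite.size();
    }

    public boolean contains(String key) {
        return lastWrite.containsKey(key);
    }

    public static void main(String[] args) {
        for (int cleanupSize : new int[] {2, 10}) {
            IncrementalCleanupSketch table = new IncrementalCleanupSketch(2000, cleanupSize, true);
            for (String key : new String[] {"a", "b", "c", "d", "e"}) {
                table.write(key, 0);
            }
            table.write("f", 3000); // "a".."e" are expired by now
            System.out.println("cleanupSize=" + cleanupSize + " -> entries left: " + table.size());
        }
    }
}
```

Running main prints 4 entries left for cleanupSize=2 (only two expired entries were reached) and 1 for cleanupSize=10. A larger batch reclaims space faster but makes each state access more expensive.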
Here is the complete code:
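import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class TestStateTtlConfig {
    private static final String[] FRUIT = {"苹果", "梨", "西瓜", "葡萄", "火龙果", "橘子", "桃子", "香蕉"};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Source: emits one (randomFruit, 1) record per second until cancelled
        DataStream<Tuple2<String, Integer>> fruit = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(FRUIT[random.nextInt(FRUIT.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        // Key by fruit name (field f0; replaces the deprecated keyBy(0)) and keep a running count per key
        fruit.keyBy(value -> value.f0).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Tuple2<String, Integer>> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                // State expires 2 s after it was created or last written
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(Time.seconds(2))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .cleanupIncrementally(10, true)
                        .build();
                ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor =
                        new ValueStateDescriptor<>("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
                stateDescriptor.enableTimeToLive(ttlConfig);
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {
                Tuple2<String, Integer> currentState = valueState.value();
                // Initialize the ValueState: it is null when missing or expired (NeverReturnExpired)
                if (null == currentState) {
                    currentState = new Tuple2<>(tuple2.f0, 0);
                }
                Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + tuple2.f1);
                // Update the ValueState; with OnCreateAndWrite this also restarts the TTL
                valueState.update(newState);
                return Tuple2.of(newState.f0, newState.f1);
            }
        }).print();
        env.execute("fruit");
    }
}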
Execution result