
flink版本:flink1.13.1
codeimport org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.concurrent.TimeUnit;
public class StateBackendTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.设置状态后台
env.setStateBackend(new MemoryStateBackend()); // 本地内存状态后台
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
// 2. 设置检查点 每秒执行1次checkPoint
env.enableCheckpointing(1000);
// checkPoint 参数优化
// checkPoint 模式,精确一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkPoint超时时间,60s
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置两次检查点的最小的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//设置是否优先使用检查点恢复机制。默认为false,即checkpoint和savepoint之间采用就近原则,设为true,则优先使用checkpoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(false);
// 设置可容忍的检查点失败数量,默认是0,即不允许任何checkpoint检查点失败,如果checkpoint失败则任务失败,直接重启
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
// 3. 设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,60000L));
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
env.execute("StateBackend");
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)