
@[TOC] flink 1.14 异步 join 思考
背景flink 1.14 还没提供异步sql 版本jdbc join ,同时也没提供自定义传入SQL 查询结果集,然后再join的功能,但是他们提供了相关接口,恰好这两个功能对提升join 性能,以及SQL的灵活性上有需求,实现了一个版本。其中遇到一个问题,“异步join 的时候,如何保证顺序”?
你需要知道CustomSqlRowDataAsyncLookupFunction extends AsyncTableFunction{ public void eval(CompletableFuture > future, Object... keys) { // 异步实现逻辑 // 通过继承 AsyncTableFunction 实现自己的异步函数 } }
其次写过原生datastream 进行异步join的得知道有两个类:
// 这是异步以前用这个实现异步join 的方法,以及参数:AsyncDataStream public static其次SingleOutputStreamOperator unorderedWait( DataStream in, AsyncFunction func, long timeout, TimeUnit timeUnit) { return addOperator(, OutputMode.UNORDERED); } public static SingleOutputStreamOperator orderedWait( DataStream in, AsyncFunction func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(, OutputMode.ORDERED); } // 核心参数,也就是Operator 通过这个区分了是否有序 OutputMode.UNORDERED OutputMode.ORDERED
flink 不至于用多套逻辑实现异步,flink-sql 是翻译成算子,也是调用了datastream api 实现这个逻辑,因此我们来看看。
对于async 的 Operator 会有自己的工厂方法:
public AsyncWaitOperatorFactory(
AsyncFunction asyncFunction,
long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
// 定义的function,包括自定义的function。
// 本质上我们的 *** 作都是一个function进行处理的
this.asyncFunction = asyncFunction;
// 异步的情况一般是需要队列+异步完成时间实现的
// 默认timeout:180000 capacity:100
this.timeout = timeout;
this.capacity = capacity;
// 这就是我们的是否顺序参数
this.outputMode = outputMode;
// 默认算子会合并在一起执行
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public > T createStreamOperator(
StreamOperatorParameters parameters) {
// outputMode 是 Orderded
AsyncWaitOperator asyncWaitOperator =
new AsyncWaitOperator(
asyncFunction,
timeout,
capacity,
outputMode,
processingTimeService,
getMailboxExecutor());
// 对asyncWaitOperator 内部进行初始化
// 看看内部做了啥
asyncWaitOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
return (T) asyncWaitOperator;
}
在这里,如果你用自定义的asyncLookupFunction 启动sql任务,本地进行debug到这里就会发现:
outputMode 参数值是:“ordered” 也就是保证顺序的。也就是说即使我们用asyncFunction,那么
内部还是给我们做了保证顺序。
思考一下:
让你们实现一个异步处理数据的程序,但是要保证顺序输出,也就是进来的顺序和输出的顺序相同。
相信大家很容易想到用队列实现就行,为了简单,我简单画图说明一下:
1.数据流 1,2,3,4顺序到达
2.优先进入顺序队列queue ,同时异步线程(比如:CompletableFuture) 进行处理,返回future
3.程序优先从队列 queue 取出来,循环判断 future.isDone
这样必须等队列数据1 处理完成,才会去获取1的结果。但是 2 3 会异步进行处理,相当于提高了并发。当然需要控制队列长度,以及获取的超时时间。
看看flink怎么做继续 进入AsyncWaitOperator#setup 看实现
// Queue, into which to store the currently in-flight stream elements // 开始就定义了具体的队列存储 private transient StreamElementQueuequeue; public void setup() { switch (outputMode) { case ORDERED: // 初始化元素队列(有序) queue = new OrderedStreamElementQueue<>(capacity); break; case UNORDERED: queue = new UnorderedStreamElementQueue<>(capacity); break; default: throw new IllegalStateException("Unknown async mode: " + outputMode + '.'); } this.timestampedCollector = new TimestampedCollector<>(super.output); }
在operator 内部有个核心处理数据的方法:
// 当数据真正进入算子内部
public void processElement(StreamRecord record) throws Exception {
StreamRecord element;
// 先加入队列
final ResultFuture entry = addToWorkQueue(element);
// ResultHandler 实现了 ResultFuture,内部有回调结果的 *** 作
final ResultHandler resultHandler = new ResultHandler(element, entry);
if (timeout > 0L) {
// 注册一个超时的东西
resultHandler.registerTimeout(getProcessingTimeService(), timeout);
}
// 调用我们的函数进行处理
userFunction.asyncInvoke(element.getValue(), resultHandler);
}
而在addToWorkQueue 内部,简单看看如何 *** 作
private ResultFutureaddToWorkQueue(StreamElement streamElement) throws InterruptedException { Optional > queueEntry; // 调用tryPut进行存放元素 while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { mailboxExecutor.yield(); } return queueEntry.get(); }
然后看看 OrderedStreamElementQueue 的结构和 tryPut 方法
// 内部就是一个ArrayDeque,元素是StreamElementQueueEntry private final Queue> queue; public OrderedStreamElementQueue(int capacity) { this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } // 创建元素 private StreamElementQueueEntry createEntry(StreamElement streamElement) {} // 存储元素 public Optional > tryPut(StreamElement streamElement) { //略 StreamElementQueueEntry queueEntry = createEntry(streamElement); // Queue > queue 存放值得地方 queue.add(queueEntry); }
这里看到,创建元素之后会返回一个ResultFuture 的异步对象,因为 StreamElementQueueEntry 是继承了 ResultFuture 的接口
interface StreamElementQueueEntry数据进入队列后,我们是join完成?extends ResultFuture { boolean isDone(); void emitResult(TimestampedCollector output); StreamElement getInputElement(); default void completeExceptionally(Throwable error) {} }
当我们异步处理完成数据之后,肯定会调用:
public void eval(CompletableFuture> future, Object... keys) throws InterruptedException { // 返回数据 future.complete(rowData); }
这里实际上是会回调的 ResultFuture 的 实现下到 AsyncWaitOperator#complete
public void complete(Collection results) {
if (!completed.compareAndSet(false, true)) {
return;
}
processInMailbox(results);
}
// processInMailbox -> 直到这里
private void processResults(Collection results) {
// 取消定时器超时控制
if (timeoutTimer != null) {
// canceling in mailbox thread avoids
// https://issues.apache.org/jira/browse/Flink-13635
timeoutTimer.cancel(true);
}
// update the queue entry with the result
resultFuture.complete(results);
// 输出已经异步函数里面返回的元素
outputCompletedElement();
}
然后进入刚才的队列 OrderedStreamElementQueue
private void outputCompletedElement() {
if (queue.hasCompletedElements()) {
// emit only one element to not block the mailbox thread unnecessarily
queue.emitCompletedElement(timestampedCollector);
}
}
public void emitCompletedElement(TimestampedCollector output) {
if (hasCompletedElements()) {
// 是先取头节点
final StreamElementQueueEntry head = queue.poll();
head.emitResult(output);
}
}
// 这里控制顺序,每次判断是否完成,都是取头部元素进行处理
public boolean hasCompletedElements() {
return !queue.isEmpty() && queue.peek().isDone();
}
public void emitResult(TimestampedCollector output) {
output.setTimestamp(inputRecord);
for (OUT r : completedElements) {
// 发送数据
output.collect(r);
}
}
这里就数据发送完成了。
小结- 我们用tableApi 做 async sql 函数的时候,实际内部用了异步,保顺。注意:这里是partition 保证顺序。
- 基本原理是把元素加入有序队列,每次complate取判断头部元素是否完成,再往下游发送做到保持顺序
- 内部有对接受数据是watermark 以及超时等处理,没仔细分析。有兴趣可以再看看
4.有问题可以留言沟通、指正
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)