前言

当 sql 经过 planning 阶段后, 已经完成逻辑执行计划的生成和优化, 切分以及分散式计划的生成, 所有与逻辑执行计划相关的上下文已经基本完备, 后面就到了如何调度 sub plan 到各个 worker 上以及分发 split 到 task 的流程, 本文主要介绍 presto 是如何 schedule task 的?

上下文构建

开始调度前, 会先初始化各个相关对象, 生成整个 scheduling 的上下文环境. presto 调度的主类是 SqlQueryScheduler, 其内部会构建 SqlStageExecution, StageScheduler, StageLinkage 等对象辅助完成整个 task 和 split 的下发.初始化 SqlQueryScheduler 对象时, 会在遍历 stage plan 树的过程中, 递归调用 createStages 来创建 SqlStageExecution 对象, 每个 stage plan 都由一个 SqlStageExecution 负责调度其 task, split.

而在调用 createStages 创建 SqlStageExecution 时, 很重要的一步是计算出 NodePartitionMap 对象, 该对象包含了 splitToBucket, bucketToPartition, partitionToNode 三组映射关系, 与 task 和 split 的调度紧密相关.

NodePartitionMap 对象的计算与当前 plan fragment 的分区策略有关, 具体分为两类:
  • 对于非源头 fragment, 其分区策略一般是 SystemPartitioningHandle, 根据其分区策略, 当前集群可用节点以及 hash partition count 配置来共同确定该 fragment 需要的计算节点个数后, 再调用 NodeSelector 选出.

  • 对于源头 fragment, 需要通过 partitioningHandle 找到其对应 Connector 提供的 NodePartitioningProvider 来计算 bucketToNode, 这里以 hive connector 的实现举例.

如果 presto 对接的 connector 是多复本设计的存储系统, 那么在计算 NodePartitionMap 时可以在其 connector 的 NodePartitioningProvider 介面中实现数据分区的复本删选, 分区裁剪, 负载均衡等控制逻辑.

tasks 调度

初始化调度相关的上下文后, 就调用各个 stage 的调度器来开始下发 task 和 split.

调度 stage 是会涉及到相应的调度顺序 ExecutionSchedule, 这个对象提供的介面很简单, 但控制逻辑可能很复杂, 对要调度的 stage 归类, 内部可能有序, 每次给出一批要先调度的 stage 集合.

presto 提供 all-at-once 和 phased 两种策略:
  • all-at-once: 顾名思义, 就是一次调度所有 stage, 虽然是一次性调度, 但各个 stage 之间是有序的, 从整个 sub plan 树的叶子节点开始, 自底向上的去调度其对应 stage, 最先调度源头 stage, 最后调度到 root stage.
  • phased: 遍历整个 sub plan 树, 根据一定规则切分为多个 plan fragment 的集合, 分为多个阶段去调度, 具体切分 phase 规则这里不展开了.

确定先调度哪个 stage 后, 再使用其对应调度器 StageScheduler 下发其 task 和 split, 在构建 SqlStageExecution 时就已经根据当前 stage 的分区策略确定其 stage 调度器了. 对于源头 stage 使用 FixedSourcePartitionedScheduler 或 SourcePartitionedScheduler, 而非源头 stage 使用 FixedCountScheduler.

具体在调度某个 stage 时下发多少 task 到 worker, 其实在计算 NodePartitionMap 对象就基本已经确定了, 各个调度器都是取其 partitionToNode 为依据, 按每个 partition 下发一个 task, 这里可以看出其实一个 stage 是可能分配多个 task 在同个 worker 上的, 只是目前各个 connector 以及 SystemPartitioningHandle 都实现为 partition 与 node 相对应.

splits 下发

调度 split 时, split 调度器提供 split 批量调度的机制, 控制每次只下发一批 splits, 调度器在确定一个 split 应该下发到哪个 worker 上时, 会用到前面提到的 NodePartitionMap 对象, 通过其提供的三组映射关系, 以 split 为 key 获取该 split 需要被下发到哪个 worker 上.调度 split 时还涉及到摆放策略 SplitPlacementPolicy, 但与其命名不那么贴切的是它并不是设计 split 摆放到哪个 worker的, 而是实现了 node 的 split 负载统计以及 split 延迟调度等控制机制.

stage 调度器每完成一次调度, 都会产生对应的调度结果 ScheduleResult, 其中包含哪些新创建的 task 以及哪些 split 被 block 住了, 由于自身 stage 的变化, 导致 presto 需要用这个调度结果对上对下都做一些调整适配, 包括 add exchange location 到 parent stage, 再 add output buffer 到 child stage.

对于源头 stage, 消费的是具体某个 connector 提供的 split, 这些 splits 其实在 planning 阶段就准备好, 并包装在 stage plan 的 splitSource 里了.

但对于非源头 stage, 消费的是上游 stage 提供的 remote split, 而不是具体某个 connector split, 因此需要在上游 stage 调度时获取其 task 摆放信息, 以此创建 remote split 喂给下游 stage, 这样每个 task 就都知道去哪些上游 task pull 数据了.

但是由于前文中提到的每个 ExecutionSchedule 的调度策略不同, 上游 stage 被调度时, 下游 task 可能都没有下发到 worker 上, 这是会暂时把上游 task 的摆放信息先保存在 exchangeLocations 中, 等调度到下游 stage 后, 先检查其 exchangeLocations 成员, 再将其初始化为 remote split 塞给其各个 task, 这样保证了不管调度顺序如何, 都不会发生 remote split 泄露.

结语

本文从 presto scheduling 细节入手, 总结了 presto 调度 stage, 下发 task 和 split 过程中的核心链路, 希望对后续进行 presto 执行层面优化的同学有所帮助.
推荐阅读:
相关文章