申请试用
HOT
登录
注册
 

技术内幕 | StarRocks Pipeline 执行框架(下)

StarRocks
/
发布于
/
977
人浏览
欢迎来到 StarRocks 技术内幕系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你快速上手这款明星开源数据库产品。本期 StarRocks 技术内幕将主要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。

作者:冉攀峰,StarRocks 核心研发,知乎账号 satanson

导读:欢迎来到 StarRocks 技术内幕系列文章,我们将为你全方位揭晓 StarRocks 背后的技术原理和实践细节,助你快速上手这款明星开源数据库产品。本期 StarRocks 技术内幕将主要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。

StarRocks Pipeline 执行框架(上)篇中,主要为大家讲解了 Pipeline 执行引擎想解决的问题及一般性原理。关于 Pipeline 执行引擎的实现, BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑,将在本篇中继续与大家分享。


#01

背景介绍

详见 StarRocks Pipeline 执行框架(上)篇


#02

基本概念

详见 StarRocks Pipeline 执行框架(上)篇


#03

源码解析

章节二的基本概念输入完以后,我们开始从以下几个方面解析 StarRocks 的源码:

  1. BE 初始化 Pipeline 执行引擎:主要介绍 BE 启动后,如何初始化 Pipeline 执行引擎的全局资源。
  2. BE 端 Query 生命周期管理:主要介绍在 BE 上,如何用 QueryContext 管理所属的全体 Fragment Instance,以及 Fragment Instance 的准备、执行和销毁逻辑。
  3. 物理算子拆分 Pipeline 算子:算子拆分逻辑在 Pipeline 执行引擎中占比很重,涉及到每个算子的重构,后续添加新算子,需要遵从一定的原则拆分算子,为 Pipeline 算子的接口定义正确的语义。
  4. PipelineDriver 的调度逻辑:主要涉及到 PipelineDriver 的 Ready、Blocked 和 Running 三种状态的转换。

1、BE 初始化 Pipeline 执行引擎

BE 初始化全局对象的方法主要有两种:

  • 将全局对象定义在 ExecEnv 对象中,参考 be/src/runtime/exec_env.h,be/src/runtime/exec_env.cpp文件。
  • 定义全局性的单例(Singleton)对象,例如 be/src/exec/pipeline/query_context.cpp,如果对象本身可以独立完成初始化、不依赖参数设置、不依赖于其他对象的初始化顺序,则可以定义为单例。

Pipeline 执行引擎的全局性对象

PipelineDriver 执行器

定义为 ExecEnv::_driver_executor,类型为 pipeline::GlobalDriverExecutor,主要由执行线程池和轮询线程构成。其中执行线程的数量默认为机器的硬件核数,轮询线程数量为 1。

// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths) 

std::unique_ptr<ThreadPool> driver_executor_thread_pool;
auto max_thread_num = std::thread::hardware_concurrency();
if (config::pipeline_exec_thread_pool_thread_num > 0) {
    max_thread_num = config::pipeline_exec_thread_pool_thread_num;
}
LOG(INFO) << strings::Substitute("[PIPELINE] Exec thread pool: thread_num=$0", max_thread_num);
RETURN_IF_ERROR(ThreadPoolBuilder("pip_executor") // pipeline executor
                        .set_min_threads(0)
                        .set_max_threads(max_thread_num)
                        .set_max_queue_size(1000)
                        .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
                        .build(&driver_executor_thread_pool));
_driver_executor = new pipeline::GlobalDriverExecutor(std::move(driver_executor_thread_pool), false);
_driver_executor->initialize(max_thread_num)


pipeline::GlobalDriverExecutor 的结构如下:

Pipeline IO 线程池

ExecEnv._pipeline_scan_io_thread_pool,主要用于执行 ScanOperator 读数据操作的异步化 IO 任务。IO 线程池队列大小和线程数目,取决于参数:

  • config::pipeline_scan_thread_pool_queue_size
  • config::pipeline_scan_thread_pool_thread_num
// 源码文件: be/src/runtime/exec_env.cpp
// 函数: Status ExecEnv::_init(const std::vector<StorePath>& store_paths) 
int num_io_threads = config::pipeline_scan_thread_pool_thread_num <= 0
                             ? std::thread::hardware_concurrency()
                             : config::pipeline_scan_thread_pool_thread_num;

_pipeline_scan_io_thread_pool =
        new PriorityThreadPool("pip_scan_io", // pipeline scan io
                               num_io_threads, config::pipeline_scan_thread_pool_queue_size);

WorkGroup(即资源组 ResourceGroup)执行器

定义为 ExecEnv._wg_driver_executor,WorkGroup 用于 Pipeline 执行引擎的资源隔离,主要的设计动机是为了把不同业务场景的 Workload 划分到相应的 WorkGroup中,每个 WorkGroup 有自己的 CPU、Memory 和并行数量资源 Quota。 每个 WorkGroup 按照资源 Quota 的限制复用计算资源,从而实现隔离性。(WorkGroup 的详细源码分析请参考专门解析文档,此处提及,是为了保证本文的完整性。) WorkGroup 执行器和 PipelineDriver 执行器功能类似,实现了基于 WorkGroup 的调度逻辑。

WorkGroup Scan 执行器

定义为ExecEnv._scan_executor,也用于 WorkGroup 功能,类似 Pipeline IO 线程池,该执行器可以根据 WorkGroup 的资源 Quota 限制,执行 ScanOperator 提交的异步化 IO 任务。

QueryContextManager

QueryContext 管理一个查询在某台执行节点上的全体 Fragment Instance,QueryContextManager 顾名思义就是对 QueryContext 进行操作,主要用于其生命周期的管理。参考源码文件:be/src/exec/pipeline/query_context.cpp。

WorkGroupManager

WorkGroupManager 用于管理 WorkGroup,详见 WorkGroup 相关文档。

2、BE 端 Query 生命周期管理

QueryContext 和 FragmentContext

计算节点 BE 为查询维护下列对象:

  • QueryContext:在 QueryContextManager 中注册,拥有 FragmentContextManager 对象管理 Fragment Instance。
  • FragmentContext:在QueryContext.fragment_mgr 中注册,每个 Fragment Instance 对应一个 FragmentContext。
  • Pipelines:FragmentContext 包含一组 Pipeline,来源于 Fragment Instance 的执行子树的拆解。
  • Drivers:FragmentContext 包含一组 PipelineDriver,PipelineDriver 通过 Pipeline 创建,来自同一个 Pipeline 的 PipelineDriver 的数量,取决于 Pipeline 并行度。
  • MorselQueues:ScanOperator 和 MorselQueue 的映射表,MorselQueue 包含一组 Morsel,Morsel 是 ScanOperator 读取数据的分片。

QueryContext 的生命周期比 FragmentContext 生命周期久,跨所有 Fragment Instance,属于 Query 层面的对象,可以由 QueryContext 管理。比如控制 Query 内存使用 MemTracker,所有 Fragment Instance 共享的 DescriptorTable。FragmentContext 只管理 Fragment Instance 范围的资源,主要包括 Pipelines、PipelineDrivers 和 MorselQueues。

只有当 FragmentContext 中的所有 PipelineDriver 都完成计算,FragmentContext 的生命周期才结束;只有当所有 FragmentContext 的生命周期结束,QueryContext 的生命周期才结束。QueryContext 生命周期结束后,就可以析构并且释放 QueryContext 占用的资源。

Fragment Instance 执行逻辑的入口

BE 收到来自 FE 的 exec_plan_fragment 后,创建 FragmentExecutor 执行该 Fragment Instance,代码如下:

// 文件:be/src/service/internal_service.cpp
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
                                                 const PExecPlanFragmentRequest* request,
                                                 PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
    ClosureGuard closure_guard(done);
    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
    auto st = _exec_plan_fragment(cntl);
    if (!st.ok()) {
        LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
    }
    st.to_protobuf(response->mutable_status());
}

PInternalServiceImpl::exec_plan_fragment 调用 _exec_plan_fragment:

// 文件:be/src/service/internal_service.cpp
template <typename T>
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
    auto ser_request = cntl->request_attachment().to_string();
    TExecPlanFragmentParams t_request;
    {
        const uint8_t* buf = (const uint8_t*)ser_request.data();
        uint32_t len = ser_request.size();
        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, TProtocolType::BINARY, &t_request));
    }
    bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
    LOG(INFO) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
              << ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
              << ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
    if (is_pipeline) {
        auto fragment_executor = std::make_unique<starrocks::pipeline::FragmentExecutor>();
        auto status = fragment_executor->prepare(_exec_env, t_request);
        if (status.ok()) {
            return fragment_executor->execute(_exec_env);
        } else {
            return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
        }
    } else {
        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
    }
}

当采用 Pipeline 执行引擎时,创建 FragmentExecutor,完成下列操作:

  • 调用 FragmentExecutor::prepare 函数, 初始化 Fragment Instance 的执行环境,创建和注册 QueryContex、FragmentContext,将Fragment Instance 拆分成 Pipelines,创建 PipelineDrivers。
  • 调用 FragmentExecutor::execute 函数,向 Pipeline 执行线程提交 PipelineDrivers 运行。

FragmentExecutor::prepare 函数

参考 be/src/exec/pipeline/fragment_executor.cpp,主要的逻辑如下:

  1. 判断 Fragment Instance 是否为重复投递,如果是,直接返回错误状态 Status::DuplicateRpcInvocation。
  2. 注册或获得已有的 QueryContext,处理 Query 的第一个 Fragment Instance 时,注册 QueryContext,后续到达的 Fragment Instance 复用已注册的 QueryContext。设置 QueryContext 需要处理的 Fragment Instance 的数量和 Query 过期时间等参数,Query 过期时间用于自动取消长期得不到执行的 Query;如果 Query 有大量的 Fragment Instance,先到达的部分 Fragment Instance 完成执行而退出,在没有活跃的 Fragment Instance 的情况下,QueryContext 依然需要保留一段时间,等到后续 Fragment Instance 全部达到或者自动过期而取消执行。
  3. 创建和初始化 FragmentContext 对象,FragmentContext 需要注册到 QueryContext.fragment_mgr 中,注册的时机为 FragmentContext::prepare 函数的末尾。因为有的异步逻辑(比如 Global Runtime Filter 的投递),需要访问 FragmentContext 的成员变量,在 FragmentContext 未完成所有的初始化之前注册,会对异步逻辑暴露 FragmentContext,导致访问未初始化的成员变量而出错。
  4. 调用函数 Exec::create_tree 生成 Non-pipeline 执行树,使用 PipelineBuilder 将执行树拆解成为 Pipeline。
  5. 调用 convert_scan_range_to_morsel 函数将 ScanNode 需要访问的 TScanRangeParams 转换为 ScanOperator 可访问的 Morsel。
  6. 将 Non-pipeline 执行引擎的 DataSink 转换为 Pipeline 引擎的 SinkOperator,调用 _decompose_data_sink_to_operator
  7. 根据 DOP(degree-of-parallelism),为 Pipeline 创建 PipelineDriver,并且将 MorselQueue 和相应的 ScanOperator 关联。
  8. 完成其他的必要的初始化,并且注册 FragmentContext。

FragmentExecutor::execute函数

//文件: be/src/exec/pipeline/fragment_executor.cpp
Status FragmentExecutor::execute(ExecEnv* exec_env) {
    for (const auto& driver : _fragment_ctx->drivers()) {
        RETURN_IF_ERROR(driver->prepare(_fragment_ctx->runtime_state()));
    }

    if (_fragment_ctx->enable_resource_group()) {
        for (const auto& driver : _fragment_ctx->drivers()) {
            exec_env->wg_driver_executor()->submit(driver.get());
        }
    } else {
        for (const auto& driver : _fragment_ctx->drivers()) {
            exec_env->driver_executor()->submit(driver.get());
        }
    }

    return Status::OK();
}

FragmentExecutor::execute 函数的主要操作如下:

1. 变量 FragmentContext 中的所有 PipelineDriver,执行 PipelineDriver::prepare 函数。 该函数主要完成 PipelineDriver 范围的 profile 注册、调用每个算子的 prepare 函数、设置 Driver 之间前置等待条件,比如 HashJoin 左侧的 PipelineDriver 需要等待右侧 PipelineDriver 完成,消费 RuntimeFilter 的 PipelineDriver 需要等待生产 RuntimeFilter 的 PipelineDriver 完成。

2. 把 PipelineDriver 提交给 Pipeline 执行线程。 PipelineDriver 提交后,FragmentExecutor 的生命周期结束,FragmentExecutor 是临时性的,禁止在 FragmentExecutor 中定义 PipelineDriver 可引用的对象。

3、PipelineBuilder 拆分 pipeline

BE 上的 PipelineBuilder 会把 PlanFragment 拆分成多个 Pipeline,拆分过程中,PlanFragment 中物理算子会转化为 Pipeline 算子。

物理算子

物理算子是 ExecNode 的子类,FE 投递给 BE 的 Fragment Instance 中,包含构成所属 PlanFragment 的物理算子, 物理算子如下图所示:

另外,在 Fragment Instance 中,一般用 DataSink 的子类描述该 Fragment Instance 计算结果的去向,比如 DataStreamSink 会把计算结果发给下游 Fragment Instance的ExchangeNode。在 Pipeline 执行引擎中,DataStreamSink 和 ExchangeNode 会分别转化为 ExchangeSinkOperator 和 ExchangeSourceOperator。

Pipeline 算子

​Pipeline 算子的数量比物理算子的数量多,这是因为,Pipeline 算子最多只有一路输入和一路输出,多路输入的物理算子和全量物化的物理算子,会拆解成多个 Pipeline 算子。Pipeline 算子接口定义,请参考 be/src/exec/pipeline/operator.h,部分接口定义如下:

  • pull_chunk:从算子中拉取 chunk,一般计算时,从一对算子的前置算子拉取 chunk,然后推给后继算子。
  • push_chunk:向算子推 chunk。
  • has_output:表示状态,当前算子可输出,可以执行 pull_chunk。
  • need_input:表示状态,当前算子可输入,可以执行 push_chunk。
  • is_finished:当前算子已经结束,不能执行 pull_chunk/push_chunk。
  • prepare:prepare 和 open 表达式和调用其他内部数据结构的 prepare 函数。
  • close:close 表达式和调用其他内部数据结构的 close 函数。
  • set_finishing:关闭输入,执行 set_finishing 之后,算子的 need_input 始终返回 false,不可调用 push_chunk,但算子内部可能有缓存的计算结果,has_output 可能返回 true,可以调用 pull_chunk。
  • set_finished:关闭输入和输出,调用后,is_finished、has_output、need_input 都返回 false,pull_chunk 和 push_chunk 不可调用,当 Pipeline 中 HashJoinProbeOperator 和 LimitOperator 算子产生短路并且提前结束时,需要调用前置算子 set_finished 函数。如果两个算子之间通过专门的 Context 交换数据,则 set_finished 函数中,需要正确地重置 Context 状态,一个算子需要感知到另外一个算子的 set_finished 函数调用。比如 LocalExchangeSinkOperator 和 LocalExchangeSourceOperator。
  • set_canceled:类似 set_finished,但表示算子异常结束,如果算子需要区分正常或者异常结束,则需要重载 set_canceled 函数,目前只有 ExchangeSinkOperator 用到该函数。
  • pending_finish:表示状态,当算子实现了异步化,算子结束时,异步化任务尚未完成,算子需要等待异步化任务结束后,才能销毁所在的 PipelineDriver。提前销毁 PipelineDriver 可能会导致异步化任务延后执行引用算子中的已销毁对象。

一个算子会经过 prepare -> finishing -> finished -> [cancelled] -> closed 的转换,Pipeline 执行引擎根据算子的状态, 执行相应的接口。

Pipeline 执行引擎中,每个算子有一个 OperatorFactory 类,Pipeline 由 OperatorFactory 组成,PipelineDriver 是 Pipeline 的实例,PipelineDriver 由 Operator 构成,OperatorFactory 创建 Operator 对象,从 Pipeline 创建 PipelineDriver 时,遍历 Pipeline 中的 OperatorFactory,调用 OperatorFactory::create 方法。

// 文件:be/src/exec/pipeline/pipeline.h
// 函数: Pipeline::create_operators
Operators create_operators(int32_t degree_of_parallelism, int32_t i) {
    Operators operators;
    for (const auto& factory : _op_factories) {
        operators.emplace_back(factory->create(degree_of_parallelism, i));
    }
    return operators;
}

OperatorFactory 如下:

对于某些算子,比如 ScanOperator,来自同一个 OperatorFactory 的多个算子,会共享一份表达式,共享的表达式放置在 OperatorFactory 中,通过 OperatorFactory::prepare函数调用表达式的 prepare/open 函数,通过 OperatorFactory::close 函数调用表达式的 close 函数。

如果表达式不可重入,在计算时,算子在多个线程中执行,存在线程不安全问题。因此新添加的表达式,要保证:


  1. 表达式是线程安全的。
  2. 线程不安全的表达式,不在 OperatorFactory 中共享,每个算子有自己私有的副本。

Pipeline 拆分

当 BE 收到 FE 发来的 Fragment Instance 时,会创建一个 FragmentExecutor 对象初始化 Fragment Instance 的执行环境。FragmentExecutor::prepare 函数使用 PipelineBuilder 将 PlanFragment 拆分成Pipeline。

// file: be/src/exec/pipeline/fragment_executor.cpp
// function: FragmentExecutor::prepare
ExecNode* plan = nullptr;
RETURN_IF_ERROR(ExecNode::create_tree(runtime_state, obj_pool, fragment.plan, *desc_tbl, &plan))
// ...
PipelineBuilderContext context(_fragment_ctx, degree_of_parallelism);
PipelineBuilder builder(context);
_fragment_ctx->set_pipelines(builder.build(*_fragment_ctx, plan))
  • 首先通过 ExecNode::create_tree 函数获得 PlanFragment 的物理算子构成执行树。
  • 初始化 PipelineBuilderContext 对象,传入 degree_of_parallelism 参数。
  • 构建 PipelineBuilder 对象,调用 PipelineBuilder::build 拆分 Pipeline。

PipelineBuilder::build 主要从执行树 root 节点开始,递归调用 decompose_to_pipeline 函数。

Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {
    pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
    _context.add_pipeline(operators);
    _context.get_pipelines().back()->set_root();
    return _context.get_pipelines();
}

物理算子需要重载 ExecNode 的 decompose_to_pipeline 函数。

​decompose_to_pipeline 函数递归地调用,完成算子的拆分。以 ProjectNode 为例,ProjectNode 调用decompose_to_pipeline 函数对 _children[0] 先完成 Pipeline 拆解,并返回 OperatorFactory 数组,然后 ProjectNode 自身转变为 ProjectOperatorFactory,追加 OperatorFactory 数组的末尾,参考下面代码:

pipeline::OpFactories ProjectNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
    using namespace pipeline;
    OpFactories operators = _children[0]->decompose_to_pipeline(context);
    // Create a shared RefCountedRuntimeFilterCollector
    auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(1, std::move(this->runtime_filter_collector()));

    operators.emplace_back(std::make_shared<ProjectOperatorFactory>(
            context->next_operator_id(), id(), std::move(_slot_ids), std::move(_expr_ctxs),
            std::move(_type_is_nullable), std::move(_common_sub_slot_ids), std::move(_common_sub_expr_ctxs)));
    // Initialize OperatorFactory's fields involving runtime filters.
    this->init_runtime_filter_for_operator(operators.back().get(), context, rc_rf_probe_collector);
    if (limit() != -1) {
        operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
    }
    return operators;
}

复杂算子的拆分可能会用到 LocalExchange 算子,目前 LocalExchange 算子支持 Passthrough、broadcast 和 shuffle 模式。更多复杂的算子拆分可以参考针对这些算子的详细源码解析。

DataSink 的拆分和 ExecNode 不同,可以参考函数 FragmentExecutor::_decompose_data_sink_to_operator,此处不再赘述。

4、PipelineDriver 的调度逻辑

PipelineDriver 的调度主要涉及下面几个函数:

  • GlobalDriverExecutor::worker_thread:Pipeline 引擎执行线程的入口函数,该函数持续从就绪 Driver 队列获取 PipelineDriver,执行 PipelineDriver::process 函数。在 PipelineDriver 阻塞或者时间片用完时,主动 yield,换其他就绪 PipelineDriver 执行。
  • PipelineDriver::process 函数:调用 Operator::pull_chunk/push_chunk 函数进行计算,判断 PipelineDriver 是否阻塞或者需要 yield。
  • PipelineDriverPoller::run_internal:阻塞 PipelineDriver 的轮询线程的函数,遍历阻塞 PipelineDriver,将已经解除阻塞的 PipelineDriver 放回就绪队列。

GlobalDriverExecutor::worker_thread

该函数的主要功能是:

  • 从就绪队列取 PipelineDriver。
  • 执行 PipelineDriver::process 函数
  • PipelineDriver 执行完一轮之后,判断 PipelineDriver 的当前状态

PipelineDriver 正常结束,异常结束或者计算出错,则调用 PipelineDriver::finalize_driver 函数完成 PipelineDriver 的清理;

PipelineDriver 仍然处于 RUNNING 状态,则设置其状态为 READY,放回就绪 Driver 队列;

PipelineDriver 处于阻塞状态,则调用 PipelineDriverPoller->add_blocked_driver 函数,将 PipelineDriver 加入到阻塞 Driver 队列中。

  • 就绪 Driver 队列采用多级反馈队列(mlfq)实现,小查询优先调度,同时避免大查询饥饿。
  • 请参考 https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver_executor.cpp

PipelineDriver::process

该函数的功能主要有:

  • 遍历 PipelineDriver 中的相邻算子对,只有当两个算子 is_finished() 返回 false,前置算子 has_output() 返回 true,后置算子 need_input() 返回 true 时,调用前置算子的 pull_chunk 获得 chunk,调用后置算子的 push_chunk,将 chunk 推给它,从而完成 chunk 的转移。请参考: https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver.cpp
  • 当 PipelineDriver 中转移的 chunk 数量超过 100 个,本轮累积执行时间超过 100ms,则主动 yield,退出当前 process,返回就绪队列,换其他 PipelineDriver 执行。
  • 当 PipelineDriver 中当前无 chunk 可以移动,则说明 PipelineDriver 处于阻塞状态,退出当前 process,放回阻塞队列。

PipelineDriverPoller::run_internal

该函数遍历阻塞 Driver 队列,唤醒解除阻塞态的 PipelineDriver,放入就绪 Driver 队列,等待 Pipeline 执行线程调度。具体参考见:https://github.com/StarRocks/starrocks/blob/main/be/src/exec/pipeline/pipeline_driver_poller.cpp


#04

总结

本文主要讲解了 Pipeline 执行引擎想解决的问题及一般性原理。针对 Pipeline 执行引擎的实现,着重说明了 BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑。

想要深入学习 StarRocks 的执行引擎,还需要研究 MPP 调度和向量化执行,后续我们会继续撰文与大家分享。

读到这里,好学的你是不是又产生了一些新思考与启发?

扫描下方用户群二维码加入 StarRocks 社区一起自由交流!

关于 StarRocks

面世两年多来,StarRocks 一直专注打造世界顶级的新一代极速全场景 MPP 数据库,帮助企业建立“极速统一”的数据分析新范式,助力企业全面数字化经营。

当前已经帮助腾讯、携程、顺丰、Airbnb 、滴滴、京东、众安保险等超过 170 家大型用户构建了全新的数据分析能力,生产环境中稳定运行的 StarRocks 服务器数目达数千台。

2021 年 9 月,StarRocks 源代码开放,在 GitHub 上的星数已超过 3400 个。StarRocks 的全球社区飞速成长,至今已有超百位贡献者,社群用户突破 7000 人,吸引几十家国内外行业头部企业参与共建。


分享
0 点赞
0 收藏
确认
3秒后跳转登录页面
去登陆