Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] fix be core in highly concurrent queries #47411

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_min, "16");
DEFINE_Int32(fragment_mgr_asynic_work_pool_thread_num_max, "512");
DEFINE_Int32(fragment_mgr_asynic_work_pool_queue_size, "4096");

// Fragment thread pool for prepare
DEFINE_Int32(fragment_mgr_prepare_work_pool_thread_num, "16");
DEFINE_Int32(fragment_mgr_prepare_work_pool_queue_size, "512");

// Control the number of disks on the machine. If 0, this comes from the system settings.
DEFINE_Int32(num_disks, "0");
// The read size is the size of the reads sent to os.
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,10 @@ DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_min);
DECLARE_Int32(fragment_mgr_asynic_work_pool_thread_num_max);
DECLARE_Int32(fragment_mgr_asynic_work_pool_queue_size);

// Fragment thread pool for prepare
DECLARE_Int32(fragment_mgr_prepare_work_pool_thread_num);
DECLARE_Int32(fragment_mgr_prepare_work_pool_queue_size);

// Control the number of disks on the machine. If 0, this comes from the system settings.
DECLARE_Int32(num_disks);
// The read size is the size of the reads sent to os.
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
}

Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
FifoThreadPool* thread_pool_for_prepare) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
Expand Down Expand Up @@ -348,7 +348,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
// 6. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool_for_prepare));
}

_init_next_report_time();
Expand All @@ -358,7 +358,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
}

Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool) {
FifoThreadPool* thread_pool_for_prepare) {
_total_tasks = 0;
const auto target_size = request.local_params.size();
_tasks.resize(target_size);
Expand Down Expand Up @@ -524,15 +524,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
std::condition_variable cv;
int prepare_done = 0;
for (int i = 0; i < target_size; i++) {
RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
thread_pool_for_prepare->offer([&, i]() {
SCOPED_ATTACH_TASK(_query_ctx.get());
prepare_status[i] = pre_and_submit(i, this);
std::unique_lock<std::mutex> lock(m);
prepare_done++;
if (prepare_done == target_size) {
cv.notify_one();
}
}));
});
}
std::unique_lock<std::mutex> lock(m);
if (prepare_done != target_size) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class PipelineFragmentContext : public TaskExecutionContext {
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _query_ctx->is_cancelled(); }

Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);
Status prepare(const doris::TPipelineFragmentParams& request,
FifoThreadPool* thread_pool_for_prepare);

Status submit();

Expand Down Expand Up @@ -168,7 +169,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
const std::map<int, int>& shuffle_idx_to_instance_idx);

Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
ThreadPool* thread_pool);
FifoThreadPool* thread_pool_for_prepare);
void _close_fragment_instance();
void _init_next_report_time();

Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
.set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max)
.set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size)
.build(&_thread_pool);

_thread_pool_for_prepare = std::make_unique<FifoThreadPool>(
config::fragment_mgr_prepare_work_pool_thread_num,
config::fragment_mgr_prepare_work_pool_queue_size, "for task prepare");

CHECK(s.ok()) << s.to_string();
}

Expand Down Expand Up @@ -849,8 +854,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
prepare_st);
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
prepare_st = context->prepare(params, _thread_pool_for_prepare.get()), prepare_st);

if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ class FragmentMgr : public RestMonitorIface {
scoped_refptr<Thread> _cancel_thread;
// every job is a pool
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<FifoThreadPool> _thread_pool_for_prepare;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it will core in the future???
You should explain it in the PR description.


std::shared_ptr<MetricEntity> _entity;
UIntGauge* timeout_canceled_fragment_count = nullptr;
Expand Down
Loading