Skip to content

Commit

Permalink
[fix](scan) catch exceptions thrown in scanner (apache#36101)
Browse files Browse the repository at this point in the history
## Proposed changes

The uncaught exceptions thrown in the scanner will cause the BE to
crash.

<!--Describe your changes.-->
  • Loading branch information
mrhhsg authored Jun 11, 2024
1 parent d443938 commit da7269c
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,17 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
}

scanner_delegate->_scanner->start_wait_worker_timer();
auto s = ctx->thread_token->submit_func(
[this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); });
auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();

if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
});
if (!s.ok()) {
scan_task->set_status(s);
ctx->append_block_to_queue(scan_task);
Expand All @@ -157,16 +166,32 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
is_local ? ctx->get_simple_scan_scheduler() : ctx->get_remote_scan_scheduler();
auto& thread_pool = is_local ? _local_scan_thread_pool : _remote_scan_thread_pool;
if (scan_sched) {
auto work_func = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();

if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
return scan_sched->submit_scan_task(simple_scan_task);
}

PriorityThreadPool::Task task;
task.work_function = [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref);
task.work_function = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
return Status::OK();
}();

if (!status.ok()) {
scanner_ref->set_status(status);
ctx->append_block_to_queue(scanner_ref);
}
};
task.priority = nice;
return thread_pool->offer(task)
Expand Down

0 comments on commit da7269c

Please sign in to comment.