Skip to content

Commit

Permalink
Fix bug where the RDMA server RX completion queue is blocked if the client …
Browse files Browse the repository at this point in the history
…crashed. (#1978)

Fixes #1976

Signed-off-by: vegetableysm <[email protected]>
  • Loading branch information
vegetableysm authored Aug 9, 2024
1 parent f5721bb commit 6682a5a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
11 changes: 8 additions & 3 deletions src/common/rdma/rdma.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,26 @@ int IRDMA::GetCompletion(fid_cq* cq, int timeout, void** context) {
if (ret > 0) {
break;
} else if (ret < 0 && ret != -FI_EAGAIN) {
return ret;
if (ret == -FI_EAVAIL) {
fi_cq_readerr(cq, &err, 0);
ret = -err.err;
}
break;
} else if (timeout > 0) {
clock_gettime(CLOCK_REALTIME, &end);
if ((end.tv_sec - start.tv_sec) * 1000 +
(end.tv_nsec - start.tv_nsec) / 1000000 >
timeout) {
return -FI_ETIMEDOUT;
ret = -FI_ETIMEDOUT;
break;
}
}
}
if (context) {
*context = err.op_context;
}

return 0;
return ret < 0 ? ret : 0;
}

void IRDMA::FreeInfo(fi_info* info) {
Expand Down
14 changes: 12 additions & 2 deletions src/common/rdma/rdma_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,12 @@ Status RDMAServer::GetRXCompletion(int timeout, void** context) {
continue;
}
} else if (ret < 0) {
return Status::Invalid("GetRXCompletion failed");
if (ret == -FI_ECANCELED) {
// client crashed
return Status::ConnectionError("Client crashed.");
} else {
return Status::Invalid(fi_strerror(-ret));
}
} else {
return Status::OK();
}
Expand All @@ -449,7 +454,12 @@ Status RDMAServer::GetTXCompletion(int timeout, void** context) {
continue;
}
} else if (ret < 0) {
return Status::Invalid("GetTXCompletion failed:" + std::to_string(ret));
if (ret == -FI_ECANCELED) {
// client crashed
return Status::ConnectionError("Client crashed.");
} else {
return Status::Invalid(fi_strerror(-ret));
}
} else {
return Status::OK();
}
Expand Down
20 changes: 12 additions & 8 deletions src/server/async/rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ void RPCServer::doRDMARecv() {
VLOG(100) << "RDMA server stopped!";
return;
}
if (status.IsConnectionError()) {
LOG(ERROR) << "Connection error!" << status.message();
VineyardRecvContext* recv_context =
reinterpret_cast<VineyardRecvContext*>(context);
doVineyardClose(recv_context);
}
VLOG(100) << "Get RX completion failed! Error:" << status.message();
VLOG(100) << "Retry...";
} else {
Expand All @@ -378,6 +384,12 @@ void RPCServer::doRDMARecv() {
VineyardMsg* recv_msg =
reinterpret_cast<VineyardMsg*>(recv_context->attr.msg_buffer);

if (recv_msg->type == VINEYARD_MSG_CLOSE) {
doVineyardClose(recv_context);
delete recv_context;
continue;
}

VineyardRecvContext* recv_context_tmp = new VineyardRecvContext();
VineyardMsg* recv_msg_tmp = new VineyardMsg();
if (recv_msg_tmp == nullptr || recv_context_tmp == nullptr) {
Expand Down Expand Up @@ -407,14 +419,6 @@ void RPCServer::doRDMARecv() {
rdma_server_->Recv(
recv_context->rdma_conn_id, reinterpret_cast<void*>(recv_msg),
sizeof(VineyardMsg), reinterpret_cast<void*>(recv_context));
} else if (recv_msg->type == VINEYARD_MSG_CLOSE) {
boost::asio::post(vs_ptr_->GetIOContext(),
[this, recv_context_tmp, recv_msg_tmp] {
doVineyardClose(recv_context_tmp);
delete recv_context_tmp;
delete recv_msg_tmp;
});
delete recv_context;
} else {
LOG(ERROR) << "Unknown message type: " << recv_msg->type;
rdma_server_->Recv(
Expand Down

0 comments on commit 6682a5a

Please sign in to comment.