From 015428854852ba59156336fd5b0aa0a8e2737920 Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Tue, 2 Oct 2018 20:47:11 +0300 Subject: [PATCH 1/7] Fix README merge conflict Change-Id: Ic878121f9cf76faee564afdd1a3373c4ef40512f --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e73bd1ce..d8babfe7 100755 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Example performance speedup for [HiBench](https://github.com/intel-hadoop/HiBenc Test environment: 16 Spark standalone workers, 2x Intel Xeon E5-2697 v3 @ 2.60GHz, 30 cores per Worker, 256GB RAM, non-flash storage (HDD) -======= + ### Terasort ![TeraSort results](https://user-images.githubusercontent.com/1121987/44670087-6c78bb00-aa2a-11e8-834c-71bc177abd87.png) From d98250184ba5e0bc3f06f2146f035fbdc769607b Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Tue, 2 Oct 2018 22:27:26 +0300 Subject: [PATCH 2/7] Fix merge conflict in README Change-Id: I610ec2e86bb5f5b66da53943b4644ffcd4468e15 --- README.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/README.md b/README.md index d8babfe7..1d32a94c 100755 --- a/README.md +++ b/README.md @@ -5,12 +5,6 @@ performing Shuffle data transfers in Spark jobs. This open-source project is developed, maintained and supported by [Mellanox Technologies](http://www.mellanox.com). ## Performance results -Example performance speedup for [HiBench](https://github.com/intel-hadoop/HiBench) workloads: - -Test environment: - -16 Spark standalone workers, 2x Intel Xeon E5-2697 v3 @ 2.60GHz, 30 cores per Worker, 256GB RAM, non-flash storage (HDD) - ### Terasort ![TeraSort results](https://user-images.githubusercontent.com/1121987/44670087-6c78bb00-aa2a-11e8-834c-71bc177abd87.png) From c340eae3d9c9aafc8c0a8df8b6ff42b6328f2e76 Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Mon, 22 Oct 2018 12:42:26 +0300 Subject: [PATCH 3/7] Use explicit ODP memory registration. 
"In Implicit ODP, applications can create a memory key that covers the entire address space of a process. This relieves the application from the burden of memory registration as it allows it to use a single memory key for all IO accesses." However, SparkRDMA uses mixed memory regions (ODP for mmaped shuffle data, and non-ODP for buffer registration). "In Explicit ODP, applications still register memory buffers for communication, but this operation is used to define access control for IO rather than pin-down the pages." Benchmarks showed that executing pre-fetching verb at registration time doesn't provide any improvements. Next work will be focusing on improving performance when running in ODP mode. Change-Id: I47cbb27081230a611fadeff54abaff10966cd670 --- .../spark/shuffle/rdma/RdmaBufferManager.java | 24 +++++++------------ .../spark/shuffle/rdma/RdmaMappedFile.java | 17 +++++++------ 2 files changed, 16 insertions(+), 25 deletions(-) mode change 100644 => 100755 src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaBufferManager.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaBufferManager.java index b0515721..890e408c 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaBufferManager.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaBufferManager.java @@ -25,9 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import com.ibm.disni.rdma.verbs.IbvMr; import com.ibm.disni.rdma.verbs.IbvPd; -import com.ibm.disni.rdma.verbs.SVCRegMr; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -99,7 +97,7 @@ private void close() { private final ConcurrentHashMap allocStackMap = new ConcurrentHashMap<>(); private IbvPd pd; - private IbvMr odpMr = null; + private final boolean useOdp; private long maxCacheSize; private static final ExecutionContextExecutor globalScalaExecutor = 
ExecutionContext.Implicits$.MODULE$.global(); @@ -110,15 +108,12 @@ private void close() { this.minimumAllocationSize = Math.min(conf.recvWrSize(), MIN_BLOCK_SIZE); this.maxCacheSize = conf.maxBufferAllocationSize(); if (conf.useOdp(pd.getContext())) { - int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | - IbvMr.IBV_ACCESS_REMOTE_READ | IbvMr.IBV_ACCESS_ON_DEMAND; - - SVCRegMr sMr = pd.regMr(0, -1, access).execute(); - this.odpMr = sMr.getMr(); + useOdp = true; if (conf.collectOdpStats()) { odpStats = new OdpStats(conf); } - sMr.free(); + } else { + useOdp = false; } } @@ -217,9 +212,9 @@ private void cleanLRUStacks(long idleBuffersSize) { IbvPd getPd() { return this.pd; } - IbvMr getOdpMr() { return this.odpMr; } + boolean useOdp() { return this.useOdp; } - void stop() throws IOException { + void stop() { logger.info("Rdma buffers allocation statistics:"); for (Integer size : allocStackMap.keySet()) { AllocatorStack allocatorStack = allocStackMap.remove(size); @@ -230,11 +225,8 @@ void stop() throws IOException { } } - if (odpMr != null) { - odpMr.deregMr().execute().free(); - if (odpStats != null) { - odpStats.printODPStatistics(); - } + if (useOdp && odpStats != null) { + odpStats.printODPStatistics(); } } } diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java old mode 100644 new mode 100755 index 43684bae..d577a333 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java @@ -40,9 +40,9 @@ public class RdmaMappedFile { private FileChannel fileChannel; private final IbvPd ibvPd; - private IbvMr odpMr; private final RdmaMapTaskOutput rdmaMapTaskOutput; + private final RdmaBufferManager rdmaBufferManager; public RdmaMapTaskOutput getRdmaMapTaskOutput() { return rdmaMapTaskOutput; } @@ -79,8 +79,7 @@ public RdmaMappedFile(File file, int chunkSize, long[] partitionLengths, 
IllegalAccessException { this.file = file; this.ibvPd = rdmaBufferManager.getPd(); - this.odpMr = rdmaBufferManager.getOdpMr(); - + this.rdmaBufferManager = rdmaBufferManager; final RandomAccessFile backingFile = new RandomAccessFile(file, "rw"); this.fileChannel = backingFile.getChannel(); @@ -136,7 +135,7 @@ private void mapAndRegister(int chunkSize, long[] partitionLengths) throws IOExc curPartition, rdmaFileMapping.address + curLength - partitionLengths[curPartition], (int)partitionLengths[curPartition], - (rdmaFileMapping.ibvMr != null) ? rdmaFileMapping.ibvMr.getLkey() : odpMr.getLkey()); + rdmaFileMapping.ibvMr.getLkey()); curPartition++; } } @@ -157,15 +156,15 @@ private void mapAndRegister(long fileOffset, long length) throws IOException, } IbvMr ibvMr = null; - if (odpMr == null) { + if (!rdmaBufferManager.useOdp()) { SVCRegMr svcRegMr = ibvPd.regMr(address, (int)length, ACCESS).execute(); ibvMr = svcRegMr.getMr(); svcRegMr.free(); } else { - int ret = odpMr.expPrefetchMr(address, (int)length); - if (ret != 0) { - throw new IOException("expPrefetchMr failed with: " + ret); - } + SVCRegMr svcRegMr = ibvPd.regMr(address, (int)length, + ACCESS | IbvMr.IBV_ACCESS_ON_DEMAND).execute(); + ibvMr = svcRegMr.getMr(); + svcRegMr.free(); } rdmaFileMappings.add(new RdmaFileMapping(ibvMr, address, mapAddress, length, alignedLength)); From 5a815033a6547f485d04b59a7c1fc967ad09270d Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Thu, 25 Oct 2018 17:34:05 +0300 Subject: [PATCH 4/7] Statically initialize DirectByteBuffer constructor Java reflection is quite expensive (https://docs.oracle.com/javase/tutorial/reflect/index.html): "Because reflection involves types that are dynamically resolved, certain Java virtual machine optimizations can not be performed. Consequently, reflective operations have slower performance than their non-reflective counterparts, and should be avoided in sections of code which are called frequently in performance-sensitive applications." 
In critical path we call getByteBuffer quite frequently: https://user-images.githubusercontent.com/1121987/47508402-bcce9680-d87c-11e8-99d2-7b55677e4d3e.png So we could statically initialize ByteBuffer constructor reflection, and only instantiate it based on the arguments. Change-Id: Ibca8ddbd67c4239a6e462a453669b7519c89afc8 --- .../apache/spark/shuffle/rdma/RdmaBuffer.java | 30 ++++++++----------- .../spark/shuffle/rdma/RdmaMappedFile.java | 18 +---------- .../shuffle/rdma/RdmaRegisteredBuffer.java | 16 +--------- 3 files changed, 15 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaBuffer.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaBuffer.java index c6ba6e5a..f03e0c2f 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaBuffer.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaBuffer.java @@ -40,7 +40,18 @@ class RdmaBuffer { private final MemoryBlock block; private AtomicInteger refCount; - public static final UnsafeMemoryAllocator unsafeAlloc = new UnsafeMemoryAllocator(); + static final UnsafeMemoryAllocator unsafeAlloc = new UnsafeMemoryAllocator(); + public static final Constructor directBufferConstructor; + + static { + try { + Class classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer"); + directBufferConstructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class); + directBufferConstructor.setAccessible(true); + } catch (Exception e) { + throw new RuntimeException("java.nio.DirectByteBuffer class not found"); + } + } RdmaBuffer(IbvPd ibvPd, int length) throws IOException { block = unsafeAlloc.allocate((long)length); @@ -126,25 +137,10 @@ private void unregister() { } ByteBuffer getByteBuffer() throws IOException { - Class classDirectByteBuffer; - try { - classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer"); - } catch (ClassNotFoundException e) { - throw new IOException("java.nio.DirectByteBuffer class not found"); - } - Constructor constructor; 
- try { - constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class); - } catch (NoSuchMethodException e) { - throw new IOException("java.nio.DirectByteBuffer constructor not found"); - } - constructor.setAccessible(true); - ByteBuffer byteBuffer; try { - byteBuffer = (ByteBuffer)constructor.newInstance(getAddress(), getLength()); + return (ByteBuffer)directBufferConstructor.newInstance(getAddress(), getLength()); } catch (Exception e) { throw new IOException("java.nio.DirectByteBuffer exception: " + e.toString()); } - return byteBuffer; } } diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java index d577a333..7a0d4bc5 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java @@ -200,30 +200,14 @@ public void dispose() throws IOException, InvocationTargetException, IllegalAcce } private ByteBuffer getByteBuffer(long address, int length) throws IOException { - Class classDirectByteBuffer; try { - classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer"); - } catch (ClassNotFoundException e) { - throw new IOException("java.nio.DirectByteBuffer class not found"); - } - Constructor constructor; - try { - constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class); - } catch (NoSuchMethodException e) { - throw new IOException("java.nio.DirectByteBuffer constructor not found"); - } - constructor.setAccessible(true); - ByteBuffer byteBuffer; - try { - byteBuffer = (ByteBuffer)constructor.newInstance(address, length); + return (ByteBuffer)RdmaBuffer.directBufferConstructor.newInstance(address, length); } catch (InvocationTargetException ex) { throw new IOException("java.nio.DirectByteBuffer: " + "InvocationTargetException: " + ex.getTargetException()); } catch (Exception e) { throw new IOException("java.nio.DirectByteBuffer exception: " + 
e.toString()); } - - return byteBuffer; } public ByteBuffer getByteBufferForPartition(int partitionId) throws IOException { diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaRegisteredBuffer.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaRegisteredBuffer.java index 30e37691..ab964d94 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaRegisteredBuffer.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaRegisteredBuffer.java @@ -18,7 +18,6 @@ package org.apache.spark.shuffle.rdma; import java.io.IOException; -import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; @@ -75,22 +74,9 @@ ByteBuffer getByteBuffer(int length) throws IOException { throw new IllegalArgumentException("Exceeded Registered Length!"); } - Class classDirectByteBuffer; - try { - classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer"); - } catch (ClassNotFoundException e) { - throw new IOException("java.nio.DirectByteBuffer class not found"); - } - Constructor constructor; - try { - constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class); - } catch (NoSuchMethodException e) { - throw new IOException("java.nio.DirectByteBuffer constructor not found"); - } - constructor.setAccessible(true); ByteBuffer byteBuffer; try { - byteBuffer = (ByteBuffer)constructor.newInstance( + byteBuffer = (ByteBuffer)RdmaBuffer.directBufferConstructor.newInstance( getRegisteredAddress() + (long)blockOffset, length); blockOffset += length; } catch (Exception e) { From 47c08564d18d3c62e8f400a0023ddcc34d5fdf78 Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Fri, 2 Nov 2018 14:19:22 +0200 Subject: [PATCH 5/7] 3.1 Intermediate release (ODP fix). 
Change-Id: I9e17cca145ba0db7124b373bd8884ce5f90fdc20 --- README.md | 12 ++++++------ pom.xml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1d32a94c..fbce878b 100755 --- a/README.md +++ b/README.md @@ -45,10 +45,10 @@ Please use the ["Releases"](https://github.com/Mellanox/SparkRDMA/releases) page
If you would like to build the project yourself, please refer to the ["Build"](https://github.com/Mellanox/SparkRDMA#build) section below. The pre-built binaries are packed as an archive that contains the following files: -* spark-rdma-3.0-for-spark-2.0.0-jar-with-dependencies.jar -* spark-rdma-3.0-for-spark-2.1.0-jar-with-dependencies.jar -* spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar -* spark-rdma-3.0-for-spark-2.3.0-jar-with-dependencies.jar +* spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar +* spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar +* spark-rdma-3.1-for-spark-2.2.0-jar-with-dependencies.jar +* spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar * libdisni.so libdisni.so **must** be in `java.library.path` on every Spark Master and Worker (usually in /usr/lib) @@ -58,8 +58,8 @@ libdisni.so **must** be in `java.library.path` on every Spark Master and Worker Provide Spark the location of the SparkRDMA plugin jars by using the extraClassPath option. For standalone mode this can be added to either spark-defaults.conf or any runtime configuration file. For client mode this **must** be added to spark-defaults.conf. 
For Spark 2.0.0 (Replace with 2.1.0, 2.2.0 or 2.3.0 according to your Spark version): ``` -spark.driver.extraClassPath /path/to/SparkRDMA/target/spark-rdma-2.0-for-spark-2.0.0-jar-with-dependencies.jar -spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-2.0-for-spark-2.0.0-jar-with-dependencies.jar +spark.driver.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar +spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar ``` ### Running diff --git a/pom.xml b/pom.xml index 2a143b3b..487da6ea 100755 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.github.mellanox spark-rdma - 3.0 + 3.1 ${project.artifactId} SparkRDMA Shuffle Manager Plugin 2017 From 616834d3ffede4eaee66313615c68b994329f1d3 Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Thu, 8 Nov 2018 17:29:25 +0200 Subject: [PATCH 6/7] Use single QP between executors and driver. Fixes the issue of instantiating wrong QP type (RPC requestor/responder) between executors that's running on the same host as driver. Executor instantiates with a driver QP of type RPC, while driver reuse passive channel to send announce messages from other executors. Communication between executors still uses RDMA_READ_REQUESTOR/RDMA_READ_RESPONDER QPs. 
Change-Id: I8acc0ac796ab1a2f16bc1e3c987fadc5bee7a110 --- .../spark/shuffle/rdma/RdmaChannel.java | 37 ++++------ .../apache/spark/shuffle/rdma/RdmaNode.java | 33 ++++----- .../spark/shuffle/rdma/RdmaRpcMsg.scala | 12 ++-- .../rdma/RdmaShuffleFetcherIterator.scala | 4 +- .../shuffle/rdma/RdmaShuffleManager.scala | 70 ++++++++++--------- 5 files changed, 78 insertions(+), 78 deletions(-) diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaChannel.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaChannel.java index 48199def..4da48d6f 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaChannel.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaChannel.java @@ -43,7 +43,7 @@ public class RdmaChannel { private final ConcurrentHashMap> svcPostSendCache = new ConcurrentHashMap(); - enum RdmaChannelType { RPC_REQUESTOR, RPC_RESPONDER, RDMA_READ_REQUESTOR, RDMA_READ_RESPONDER } + enum RdmaChannelType { RPC, RDMA_READ_REQUESTOR, RDMA_READ_RESPONDER } private final RdmaChannelType rdmaChannelType; private final RdmaCompletionListener receiveListener; @@ -130,6 +130,7 @@ private class CompletionInfo { // NOOP_RESERVED_INDEX is used for send operations that do not require a callback private static final int NOOP_RESERVED_INDEX = 0; private final AtomicInteger completionInfoIndex = new AtomicInteger(NOOP_RESERVED_INDEX); + private final RdmaShuffleConf conf; RdmaChannel( RdmaChannelType rdmaChannelType, @@ -152,32 +153,20 @@ private class CompletionInfo { this.receiveListener = receiveListener; this.rdmaBufferManager = rdmaBufferManager; this.cpuVector = cpuVector; + this.conf = conf; switch (rdmaChannelType) { - case RPC_REQUESTOR: - // Requires full-size sends, and receives for credit reports only + case RPC: + // Single bidirectional QP between executors and driver. 
if (conf.swFlowControl()) { - this.recvDepth = RECV_CREDIT_REPORT_RATIO; - this.remoteRecvCredits = new Semaphore(conf.recvQueueDepth(), false); - } else { - this.recvDepth = 0; + this.remoteRecvCredits = new Semaphore( + conf.recvQueueDepth() - RECV_CREDIT_REPORT_RATIO, false); } - this.recvWrSize = 0; - this.sendDepth = conf.sendQueueDepth(); - this.sendBudgetSemaphore = new Semaphore(sendDepth, false); - break; - - case RPC_RESPONDER: - // Requires full-size receives and sends for credit reports only this.recvDepth = conf.recvQueueDepth(); this.recvWrSize = conf.recvWrSize(); - if (conf.swFlowControl()) { - this.sendDepth = RECV_CREDIT_REPORT_RATIO; - } else { - this.sendDepth = 0; - } + this.sendDepth = conf.sendQueueDepth(); + this.sendBudgetSemaphore = new Semaphore(sendDepth - RECV_CREDIT_REPORT_RATIO, false); break; - case RDMA_READ_REQUESTOR: // Requires sends only, no need for any receives this.recvDepth = 0; @@ -322,6 +311,10 @@ void connect(InetSocketAddress socketAddress) throws IOException { setRdmaChannelState(RdmaChannelState.CONNECTED); } + InetSocketAddress getSourceSocketAddress() throws IOException { + return (InetSocketAddress)cmId.getSource(); + } + void accept() throws IOException { RdmaConnParam connParams = new RdmaConnParam(); @@ -778,7 +771,7 @@ private void exhaustCq() throws IOException { } } - if (sendDepth == RECV_CREDIT_REPORT_RATIO) { + if (conf.swFlowControl() && rdmaChannelType == RdmaChannelType.RPC) { // Software-level flow control is enabled localRecvCreditsPendingReport += reclaimedRecvWrs; if (localRecvCreditsPendingReport > (recvDepth / RECV_CREDIT_REPORT_RATIO)) { @@ -895,7 +888,7 @@ void stop() throws InterruptedException, IOException { int ret = cmId.disconnect(); if (ret != 0) { logger.error("disconnect failed with errno: " + ret); - } else if (rdmaChannelType.equals(RdmaChannelType.RPC_REQUESTOR) || + } else if (rdmaChannelType.equals(RdmaChannelType.RPC) || rdmaChannelType.equals(RdmaChannelType.RDMA_READ_REQUESTOR)) { 
try { processRdmaCmEvent(RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal(), diff --git a/src/main/java/org/apache/spark/shuffle/rdma/RdmaNode.java b/src/main/java/org/apache/spark/shuffle/rdma/RdmaNode.java index 68dc6cb7..1de13d52 100755 --- a/src/main/java/org/apache/spark/shuffle/rdma/RdmaNode.java +++ b/src/main/java/org/apache/spark/shuffle/rdma/RdmaNode.java @@ -52,11 +52,12 @@ class RdmaNode { private InetAddress driverInetAddress; private final ArrayList cpuArrayList = new ArrayList<>(); private int cpuIndex = 0; + private final RdmaCompletionListener receiveListener; RdmaNode(String hostName, boolean isExecutor, final RdmaShuffleConf conf, final RdmaCompletionListener receiveListener) throws Exception { this.conf = conf; - + this.receiveListener = receiveListener; try { driverInetAddress = InetAddress.getByName(conf.driverHost()); @@ -147,10 +148,8 @@ class RdmaNode { } RdmaChannel.RdmaChannelType rdmaChannelType; - if (driverInetAddress.equals(inetSocketAddress.getAddress()) || - driverInetAddress.equals(localInetSocketAddress.getAddress())) { - // RPC communication is limited to driver<->executor only - rdmaChannelType = RdmaChannel.RdmaChannelType.RPC_RESPONDER; + if (!isExecutor) { + rdmaChannelType = RdmaChannel.RdmaChannelType.RPC; } else { rdmaChannelType = RdmaChannel.RdmaChannelType.RDMA_READ_RESPONDER; } @@ -162,6 +161,12 @@ class RdmaNode { rdmaChannel.stop(); continue; } + if (!isExecutor) { + RdmaChannel previous = activeRdmaChannelMap.put(inetSocketAddress, rdmaChannel); + if (previous != null) { + previous.stop(); + } + } try { rdmaChannel.accept(); @@ -275,8 +280,8 @@ private int getNextCpuVector() { public RdmaBufferManager getRdmaBufferManager() { return rdmaBufferManager; } - public RdmaChannel getRdmaChannel(InetSocketAddress remoteAddr, boolean mustRetry) - throws IOException, InterruptedException { + public RdmaChannel getRdmaChannel(InetSocketAddress remoteAddr, boolean mustRetry, + RdmaChannel.RdmaChannelType 
rdmaChannelType) throws IOException, InterruptedException { final long startTime = System.nanoTime(); final int maxConnectionAttempts = conf.maxConnectionAttempts(); final long connectionTimeout = maxConnectionAttempts * conf.rdmaCmEventTimeout(); @@ -287,16 +292,12 @@ public RdmaChannel getRdmaChannel(InetSocketAddress remoteAddr, boolean mustRetr do { rdmaChannel = activeRdmaChannelMap.get(remoteAddr); if (rdmaChannel == null) { - RdmaChannel.RdmaChannelType rdmaChannelType; - if (driverInetAddress.equals(remoteAddr.getAddress()) || - driverInetAddress.equals(localInetSocketAddress.getAddress())) { - // RPC communication is limited to driver<->executor only - rdmaChannelType = RdmaChannel.RdmaChannelType.RPC_REQUESTOR; - } else { - rdmaChannelType = RdmaChannel.RdmaChannelType.RDMA_READ_REQUESTOR; + RdmaCompletionListener listener = null; + if (rdmaChannelType == RdmaChannel.RdmaChannelType.RPC) { + // Executor <-> Driver rdma channels need listener on both sides. + listener = receiveListener; } - - rdmaChannel = new RdmaChannel(rdmaChannelType, conf, rdmaBufferManager, null, + rdmaChannel = new RdmaChannel(rdmaChannelType, conf, rdmaBufferManager, listener, getNextCpuVector()); RdmaChannel actualRdmaChannel = activeRdmaChannelMap.putIfAbsent(remoteAddr, rdmaChannel); diff --git a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaRpcMsg.scala b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaRpcMsg.scala index cf7b56bb..d8b85bbf 100755 --- a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaRpcMsg.scala +++ b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaRpcMsg.scala @@ -25,8 +25,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.internal.Logging import org.apache.spark.shuffle.rdma.RdmaRpcMsgType.RdmaRpcMsgType -import org.apache.spark.storage.BlockManagerId - object RdmaRpcMsgType extends Enumeration { type RdmaRpcMsgType = Value @@ -80,14 +78,14 @@ object RdmaRpcMsg extends Logging { } } -class RdmaShuffleManagerHelloRpcMsg(var 
rdmaShuffleManagerId: RdmaShuffleManagerId) - extends RdmaRpcMsg { - private def this() = this(null) // For deserialization only +class RdmaShuffleManagerHelloRpcMsg(var rdmaShuffleManagerId: RdmaShuffleManagerId, + var channelPort: Int) extends RdmaRpcMsg { + private def this() = this(null, 0) // For deserialization only override protected def msgType: RdmaRpcMsgType = RdmaRpcMsgType.RdmaShuffleManagerHello override protected def getLengthInSegments(segmentSize: Int): Array[Int] = { - val serializedLength = rdmaShuffleManagerId.serializedLength + val serializedLength = rdmaShuffleManagerId.serializedLength + 4 require(serializedLength <= segmentSize, "RdmaBuffer RPC segment size is too small") Array.fill(1) { serializedLength } @@ -96,10 +94,12 @@ class RdmaShuffleManagerHelloRpcMsg(var rdmaShuffleManagerId: RdmaShuffleManager override protected def writeSegments(outs: Iterator[(DataOutputStream, Int)]): Unit = { val out = outs.next()._1 rdmaShuffleManagerId.write(out) + out.writeInt(channelPort) } override protected def read(in: DataInputStream): Unit = { rdmaShuffleManagerId = RdmaShuffleManagerId(in) + channelPort = in.readInt() } } diff --git a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleFetcherIterator.scala b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleFetcherIterator.scala index 839c68ff..e1455553 100755 --- a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleFetcherIterator.scala +++ b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleFetcherIterator.scala @@ -169,8 +169,8 @@ private[spark] final class RdmaShuffleFetcherIterator( } try { - val rdmaChannel = rdmaShuffleManager.getRdmaChannel(pendingFetch.rdmaShuffleManagerId, - mustRetry = true) + val rdmaChannel = rdmaShuffleManager.getRdmaChannel( + pendingFetch.rdmaShuffleManagerId, mustRetry = true) rdmaChannel.rdmaReadInQueue(listener, rdmaRegisteredBuffer.getRegisteredAddress, rdmaRegisteredBuffer.getLkey, pendingFetch.rdmaBlockLocations.map(_.length).toArray, 
pendingFetch.rdmaBlockLocations.map(_.address).toArray, diff --git a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleManager.scala index 45af61eb..6e1eff78 100755 --- a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleManager.scala +++ b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleManager.scala @@ -82,36 +82,36 @@ private[spark] class RdmaShuffleManager(val conf: SparkConf, isDriver: Boolean) // Book keep mapping from BlockManagerId to RdmaShuffleManagerId blockManagerIdToRdmaShuffleManagerId.put(helloMsg.rdmaShuffleManagerId.blockManagerId, helloMsg.rdmaShuffleManagerId) - Future { - getRdmaChannel(helloMsg.rdmaShuffleManagerId, mustRetry = false) - }.onSuccess { case rdmaChannel => - rdmaShuffleManagersMap.put(helloMsg.rdmaShuffleManagerId, rdmaChannel) - val buffers = new RdmaAnnounceRdmaShuffleManagersRpcMsg( - rdmaShuffleManagersMap.keys.toSeq).toRdmaByteBufferManagedBuffers( - getRdmaByteBufferManagedBuffer, rdmaShuffleConf.recvWrSize) - - for ((dstRdmaShuffleManagerId, dstRdmaChannel) <- rdmaShuffleManagersMap) { - buffers.foreach(_.retain()) - - val listener = new RdmaCompletionListener { - override def onSuccess(buf: ByteBuffer): Unit = buffers.foreach(_.release()) - override def onFailure(e: Throwable): Unit = { - buffers.foreach(_.release()) - logError("Failed to send RdmaAnnounceExecutorsRpcMsg to executor: " + - dstRdmaShuffleManagerId + ", Exception: " + e) - } + // Since we're reusing executor <-> driver QP - this will be taken from cache. 
+ val rdmaChannel = getRdmaChannel(helloMsg.rdmaShuffleManagerId.host, + helloMsg.channelPort, false, RdmaChannel.RdmaChannelType.RPC) + rdmaShuffleManagersMap.put(helloMsg.rdmaShuffleManagerId, rdmaChannel) + val buffers = new RdmaAnnounceRdmaShuffleManagersRpcMsg( + rdmaShuffleManagersMap.keys.toSeq).toRdmaByteBufferManagedBuffers( + getRdmaByteBufferManagedBuffer, rdmaShuffleConf.recvWrSize) + + for ((dstRdmaShuffleManagerId, dstRdmaChannel) <- rdmaShuffleManagersMap) { + buffers.foreach(_.retain()) + + val listener = new RdmaCompletionListener { + override def onSuccess(buf: ByteBuffer): Unit = buffers.foreach(_.release()) + + override def onFailure(e: Throwable): Unit = { + buffers.foreach(_.release()) + logError("Failed to send RdmaAnnounceExecutorsRpcMsg to executor: " + + dstRdmaShuffleManagerId + ", Exception: " + e) } + } - try { - dstRdmaChannel.rdmaSendInQueue(listener, buffers.map(_.getAddress), - buffers.map(_.getLkey), buffers.map(_.getLength.toInt)) - } catch { - case e: Exception => listener.onFailure(e) - } + try { + dstRdmaChannel.rdmaSendInQueue(listener, buffers.map(_.getAddress), + buffers.map(_.getLkey), buffers.map(_.getLength.toInt)) + } catch { + case e: Exception => listener.onFailure(e) } - // Release the reference taken by the allocation - buffers.foreach(_.release()) } + // Release the reference taken by the allocation + buffers.foreach(_.release()) } case announceMsg: RdmaAnnounceRdmaShuffleManagersRpcMsg => @@ -205,7 +205,8 @@ private[spark] class RdmaShuffleManager(val conf: SparkConf, isDriver: Boolean) Future { getRdmaChannelToDriver(mustRetry = true) }.onSuccess { case rdmaChannel => - val buffers = new RdmaShuffleManagerHelloRpcMsg(localRdmaShuffleManagerId.get). + val port = rdmaChannel.getSourceSocketAddress.getPort + val buffers = new RdmaShuffleManagerHelloRpcMsg(localRdmaShuffleManagerId.get, port). 
toRdmaByteBufferManagedBuffers(getRdmaByteBufferManagedBuffer, rdmaShuffleConf.recvWrSize) val listener = new RdmaCompletionListener { @@ -308,14 +309,19 @@ private[spark] class RdmaShuffleManager(val conf: SparkConf, isDriver: Boolean) } } - private def getRdmaChannel(host: String, port: Int, mustRetry: Boolean): RdmaChannel = - rdmaNode.get.getRdmaChannel(new InetSocketAddress(host, port), mustRetry) + private def getRdmaChannel(host: String, port: Int, mustRetry: Boolean, + rdmaChannelType: RdmaChannel.RdmaChannelType): RdmaChannel = + rdmaNode.get.getRdmaChannel(new InetSocketAddress(host, port), mustRetry, rdmaChannelType) - def getRdmaChannel(rdmaShuffleManagerId: RdmaShuffleManagerId, mustRetry: Boolean): RdmaChannel = - getRdmaChannel(rdmaShuffleManagerId.host, rdmaShuffleManagerId.port, mustRetry) + def getRdmaChannel(rdmaShuffleManagerId: RdmaShuffleManagerId, + mustRetry: Boolean): RdmaChannel = { + getRdmaChannel(rdmaShuffleManagerId.host, rdmaShuffleManagerId.port, mustRetry, + RdmaChannel.RdmaChannelType.RDMA_READ_REQUESTOR) + } def getRdmaChannelToDriver(mustRetry: Boolean): RdmaChannel = getRdmaChannel( - rdmaShuffleConf.driverHost, rdmaShuffleConf.driverPort, mustRetry) + rdmaShuffleConf.driverHost, rdmaShuffleConf.driverPort, mustRetry, + RdmaChannel.RdmaChannelType.RPC) def getRdmaBufferManager: RdmaBufferManager = rdmaNode.get.getRdmaBufferManager From 4545823b82b4fe5afc9fe2e90781d315d3c3ee5e Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Thu, 22 Nov 2018 16:00:09 +0200 Subject: [PATCH 7/7] Spark-2.4 support In spark-2.4 mapOutputTracker.getMapSizesByExecutorId returns iterator, rather than seq. 
Change-Id: I4a65a6e66af34792e29ed758fe81281df2cb908b --- README.md | 7 ++++--- pom.xml | 6 ++++++ .../org/apache/spark/shuffle/rdma/RdmaShuffleReader.scala | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index fbce878b..5bfb8719 100755 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Mellanox ConnectX-5 network adapter with 100GbE RoCE fabric, connected with a Me For more information on configuration, performance tuning and troubleshooting, please visit the [SparkRDMA GitHub Wiki](https://github.com/Mellanox/SparkRDMA/wiki) ## Runtime requirements -* Apache Spark 2.0.0/2.1.0/2.2.0/2.3.0 +* Apache Spark 2.0.0/2.1.0/2.2.0/2.3.0/2.4.0 * Java 8 * An RDMA-supported network, e.g. RoCE or Infiniband @@ -49,6 +49,7 @@ The pre-built binaries are packed as an archive that contains the following file * spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar * spark-rdma-3.1-for-spark-2.2.0-jar-with-dependencies.jar * spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar +* spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar * libdisni.so libdisni.so **must** be in `java.library.path` on every Spark Master and Worker (usually in /usr/lib) @@ -56,7 +57,7 @@ libdisni.so **must** be in `java.library.path` on every Spark Master and Worker ### Configuration Provide Spark the location of the SparkRDMA plugin jars by using the extraClassPath option. For standalone mode this can -be added to either spark-defaults.conf or any runtime configuration file. For client mode this **must** be added to spark-defaults.conf. For Spark 2.0.0 (Replace with 2.1.0, 2.2.0 or 2.3.0 according to your Spark version): +be added to either spark-defaults.conf or any runtime configuration file. For client mode this **must** be added to spark-defaults.conf. 
For Spark 2.0.0 (Replace with 2.1.0, 2.2.0, 2.3.0, 2.4.0 according to your Spark version): ``` spark.driver.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar @@ -76,7 +77,7 @@ Building the SparkRDMA plugin requires [Apache Maven](http://maven.apache.org/) 1. Obtain a clone of [SparkRDMA](https://github.com/Mellanox/SparkRDMA) -2. Build the plugin for your Spark version (either 2.0.0, 2.1.0, 2.2.0 or 2.3.0), e.g. for Spark 2.0.0: +2. Build the plugin for your Spark version (either 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0), e.g. for Spark 2.0.0: ``` mvn -DskipTests clean package -Pspark-2.0.0 ``` diff --git a/pom.xml b/pom.xml index 487da6ea..69168188 100755 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,12 @@ 2.3.0 + + spark-2.4.0 + + 2.4.0 + + diff --git a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleReader.scala b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleReader.scala index 22c4783a..847a3d72 100755 --- a/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleReader.scala +++ b/src/main/scala/org/apache/spark/shuffle/rdma/RdmaShuffleReader.scala @@ -46,7 +46,8 @@ private[spark] class RdmaShuffleReader[K, C]( startPartition, endPartition, handle.shuffleId, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)) + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, + startPartition, endPartition).toSeq) val dummyShuffleBlockId = ShuffleBlockId(0, 0, 0) // Wrap the streams for compression based on configuration