Spark on MR3 - A New Way to Run Apache Spark

Aug 18, 2021

Read in 9 minutes

MR3 is a general-purpose execution engine that provides native support for Hadoop and Kubernetes. While Hive on MR3 is its main application, MR3 can easily execute MapReduce/Tez jobs as well. For example, Hive on MR3 supports compaction without relying on MapReduce because the MapReduce jobs performing compaction are sent directly to MR3.

With the release of MR3 1.3, we introduce another major application – Spark on MR3. In a nutshell, it is Apache Spark using MR3 as the execution backend. Spark on MR3 is implemented as an add-on to Spark which exploits MR3 to implement the scheduler backend of Spark. As such, running Spark on MR3 requires no change to Spark, and the user can use a binary distribution of Spark when installing Spark on MR3.

Before we give more details on Spark on MR3, a critical question should be answered – Why do we need an alternative execution backend for Spark? After all, Spark is a mature system that already provides native support for Hadoop and Kubernetes, so it may well appear that Spark on MR3 brings no additional advantage. Spark on MR3, however, is not intended just to demonstrate the capability of MR3 as an execution engine. Rather it addresses an important architectural limitation of Apache Spark.

Motivation

The main motivation for developing Spark on MR3 is to allow multiple Spark applications to share compute resources such as Yarn containers or Kubernetes Pods. In the case of vanilla Spark, different Spark applications must maintain their own sets of executors because Spark has no mechanism for recycling compute resources among Spark applications. Instead, Spark relies on a resource manager (such as Yarn and Kubernetes) to distribute compute resources. Note that Apache Livy is not a solution here because it only allows multiple users to share Spark applications via a REST API.

In the case of Spark on MR3, a single instance of MR3 DAGAppMaster manages all compute resources shared among Spark applications. The following diagram illustrates Spark on MR3 running on Kubernetes where four Spark drivers share a common DAGAppMaster Pod. When a Spark driver terminates, its executor Pods are not released back to Kubernetes immediately. Rather, DAGAppMaster tries to recycle idle Pods, either by reassigning them to Spark drivers that request more compute resources or by keeping them in a reserve pool for future use.

[Figure: Spark on MR3 on Kubernetes, with four Spark drivers sharing a common DAGAppMaster Pod]

Thus Spark on MR3 reduces the overhead of acquiring and releasing compute resources when multiple Spark applications run concurrently. Since MR3 runs on the Java Virtual Machine (JVM), recycling compute resources also reduces the JVM warm-up overhead (which can be surprisingly high). Moreover, Spark on MR3 decouples itself from the scheduling policy of the resource manager. For example, we can enforce fair scheduling among Spark applications regardless of the scheduling policy of the resource manager.

Spark on MR3 can be particularly useful in cloud environments where Spark applications are created and destroyed frequently. In cloud environments, the overhead of provisioning compute resources is far from negligible; for example, it may take a few minutes to provision a new Kubernetes node on Amazon EKS. By allowing DAGAppMaster to keep a reserve pool of compute resources, Spark on MR3 can effectively minimize this overhead, thereby reducing the cloud cost and improving the user experience (as a Spark job starts executing right after submission).

Implementation

Spark on MR3 is implemented as an add-on to Spark. The add-on module, called Spark-MR3, is registered as an external cluster manager and provides an implementation of the scheduler backend of Spark. Spark-MR3 is embedded in the Spark driver and relays messages between the Spark driver and MR3. Internally it provides implementations of TaskScheduler and SchedulerBackend of Spark, and creates MR3Client in order to communicate with MR3. When DAGScheduler of Spark wants to execute a new Spark stage, Spark-MR3 converts the specification of the stage into a DAG (with a single vertex) which is sent to the MR3 DAGAppMaster. The DAG is executed by Spark executors running inside ContainerWorkers, and the result is sent back to DAGScheduler of Spark.

[Figure: Architecture of Spark on MR3, with Spark-MR3 embedded in the Spark driver and communicating with MR3 DAGAppMaster]

Since Spark-MR3 relays all requests from Spark to MR3, MR3 appears to Spark as just another resource manager like Yarn and Kubernetes. This works because Spark is agnostic to the underlying cluster manager and MR3 is capable of supplying compute resources after communicating with the real resource manager. As Spark loads an external cluster manager using the Java ServiceLoader mechanism, running Spark on MR3 requires no change to Spark.
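To make the registration mechanism concrete, here is a minimal sketch of an external cluster manager. The ExternalClusterManager trait and the ServiceLoader discovery are standard Spark machinery, but everything else is an assumption: the package and class names, the "mr3" master URL scheme, and the method bodies are illustrative and do not reflect the actual Spark-MR3 sources (see the User Guide for the real configuration). Spark discovers such a class through a provider-configuration file named META-INF/services/org.apache.spark.scheduler.ExternalClusterManager that contains the fully qualified class name.

```scala
// Hypothetical skeleton of an external cluster manager; the real Spark-MR3
// classes differ. ExternalClusterManager is private[spark], so the add-on
// must live under the org.apache.spark package tree.
package org.apache.spark.scheduler.mr3

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

class MR3ClusterManager extends ExternalClusterManager {
  // Selected by the master URL passed to spark-submit; "mr3" is an assumed scheme.
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mr3")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)   // the actual add-on supplies its own TaskScheduler

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    ???   // the actual add-on returns a SchedulerBackend that creates MR3Client and
          // forwards each Spark stage to the MR3 DAGAppMaster as a one-vertex DAG

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```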

Experiment

Now we present experiment results to demonstrate the performance of Spark on MR3. (Except for task scheduling and resource management, Spark on MR3 behaves in the same way as vanilla Spark.) First we show that Spark on MR3 competes well against vanilla Spark for a single Spark application. Next we show that Spark on MR3 achieves a substantial reduction in running time when multiple Spark applications run concurrently.

In the experiment, we use two clusters, Gold and Grey, both running HDP (Hortonworks Data Platform) 3.1.4. Each worker node runs a single ContainerWorker, or equivalently, a single Spark executor.

For the workload, we use a Spark multiuser benchmark. Each thread in a Spark application submits the same sequence of eight queries of the TPC-DS benchmark (#19, #42, #52, #55, #63, #68, #73, #98). We use a scale factor of 100 (roughly equivalent to 100GB), which is large enough to keep every worker node busy and small enough to clearly highlight the effect of recycling ContainerWorkers. We store the dataset in text format without compression.
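As a rough sketch of this workload (not the benchmark's actual code), the threads can share a single SparkSession and submit the queries concurrently. The number of threads and the location of the query files (queries/qNN.sql) are assumptions here.

```scala
// Minimal sketch of a multiuser workload: several threads share one
// SparkSession and each submits the same sequence of TPC-DS queries.
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

object SparkMultiuserBenchmark {
  val queries = Seq(19, 42, 52, 55, 63, 68, 73, 98)  // TPC-DS queries used above

  def main(args: Array[String]): Unit = {
    val numThreads = if (args.nonEmpty) args(0).toInt else 8
    val spark = SparkSession.builder().appName("spark-multiuser-benchmark").getOrCreate()
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))

    // Assumed file layout: the SQL text of query NN lives in queries/qNN.sql.
    def loadQuery(q: Int): String =
      scala.io.Source.fromFile(s"queries/q$q.sql").mkString

    // Each thread submits the same sequence of eight queries.
    val runs = (1 to numThreads).map { _ =>
      Future { queries.foreach(q => spark.sql(loadQuery(q)).collect()) }
    }
    runs.foreach(Await.result(_, Duration.Inf))
    spark.stop()
  }
}
```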

Single Spark application

Spark on MR3 provides two policies for task scheduling, FIFO scheduling and fair scheduling, which specify how to distribute Spark jobs (submitted by multiple threads) within the same Spark application. Spark on MR3 also implements delay scheduling, similarly to vanilla Spark but in its own way. For testing with a single Spark application, we use a delay of 1 second for delay scheduling (e.g., by setting spark.locality.wait to 1s for vanilla Spark), as shown below.
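For vanilla Spark, this delay is just the standard spark.locality.wait setting, which can be set in spark-defaults.conf or programmatically as in the snippet below (Spark on MR3 configures its own delay scheduling separately; see the User Guide). The application name here is arbitrary.

```scala
import org.apache.spark.sql.SparkSession

// Vanilla Spark: wait up to 1 second for a data-local executor slot
// before relaxing the locality level of a task.
val spark = SparkSession.builder()
  .appName("tpcds-benchmark")            // arbitrary name for illustration
  .config("spark.locality.wait", "1s")   // delay scheduling of 1 second
  .getOrCreate()
```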

The following graph shows the result of running a single Spark application with vanilla Spark and Spark on MR3 in the cluster Gold. The Spark driver creates eight threads, each of which submits the same sequence of eight TPC-DS queries. (Thus it submits a total of 8 * 8 = 64 Spark jobs.) We observe that Spark on MR3 runs as fast as vanilla Spark with FIFO scheduling, but slightly slower with fair scheduling.

[Figure: Running time of a single Spark application, vanilla Spark vs Spark on MR3, cluster Gold]

The result is totally different in the cluster Grey. We observe that Spark on MR3 finishes executing the Spark jobs much faster than vanilla Spark (3034 seconds for Spark on MR3 vs 8042 seconds for vanilla Spark with FIFO scheduling, and 3583 seconds vs 8017 seconds with fair scheduling).

[Figure: Running time of a single Spark application, vanilla Spark vs Spark on MR3, cluster Grey]

The large difference in running time is due to the difference in task scheduling, in particular how delay scheduling works in vanilla Spark and in Spark on MR3. The following graph shows the percentage of Spark tasks with no matching host, i.e., tasks that are executed on hosts not included in their (non-empty) location hints. For example, a Spark task with location hints grey1 and grey2 has no matching host if it is executed on host grey3. We observe that the percentage is much higher for vanilla Spark (1.8 percent for Spark on MR3 vs 22.7 percent for vanilla Spark with FIFO scheduling, and 7.2 percent vs 26.8 percent with fair scheduling). In conjunction with the slow (1 gigabit) network, the high percentage gives rise to many straggler tasks, which in turn increase the running time considerably.

[Figure: Percentage of Spark tasks with no matching host, cluster Grey]

Multiple Spark applications

Independently of task scheduling policies, Spark on MR3 provides two policies, FIFO scheduling and fair scheduling, which specify how to assign ContainerWorkers (i.e., compute resources) among multiple Spark applications. For testing with multiple applications, we disable delay scheduling.

The following graph shows the result of running 12 identical Spark applications with vanilla Spark in the cluster Gold, where Yarn uses fair scheduling. A vertical bar displays the progress of a Spark executor running on the corresponding worker node, and its color indicates the Spark application that owns the Spark executor. We observe that despite the use of fair scheduling, the Spark applications differ considerably in the total amount of compute resources allocated by Yarn. We also see many gaps between vertical bars, which denote idle periods after one Spark executor terminates and before another starts.

[Figure: 12 Spark applications with vanilla Spark under Yarn fair scheduling, cluster Gold]

The following graph shows the result of running the same 12 Spark applications (as in the above experiment for vanilla Spark) with Spark on MR3 using fair scheduling in the cluster Gold. (Incidentally, the scheduling policy of the resource manager itself does not matter for this experiment.) We observe that Spark on MR3 quickly reaches a fair allocation of worker nodes among the Spark applications by recycling ContainerWorkers. Note that Spark on MR3 produces no gaps between vertical bars because a single ContainerWorker is recycled on each worker node throughout the experiment. In our experiment, Spark on MR3 reduces the running time by 25.6% (from 907.7 seconds to 674.6 seconds) in comparison with vanilla Spark.

[Figure: 12 Spark applications with Spark on MR3 using fair scheduling, cluster Gold]

The following graphs show the result in the cluster Gold when both Yarn and Spark on MR3 use FIFO scheduling. We submit the Spark applications at intervals of 60 seconds. Both vanilla Spark and Spark on MR3 run one Spark application at a time and complete all the Spark applications in the order of submission. Note that once the first Spark application terminates, Spark on MR3 finishes the remaining Spark applications much faster because the JVMs have already warmed up. In our experiment, Spark on MR3 reduces the running time by 36.3% (from 1797.3 seconds to 1144.5 seconds) in comparison with vanilla Spark.

[Figure: 12 Spark applications under FIFO scheduling, vanilla Spark vs Spark on MR3, cluster Gold]

Conclusion

Spark on MR3 is easy to adopt, whether on Hadoop or on Kubernetes, because it differs from Apache Spark only in task scheduling and resource management. In addition to recycling ContainerWorkers among Spark applications, the user can also exploit such features of MR3 as fault tolerance, speculative execution, and autoscaling. To try Spark on MR3, see the User Guide of Spark on MR3.