Using Docker to Build an IPython-driven Spark Deployment

TL;DR: Our ipython-spark-docker repo is a way to deploy an Apache Spark cluster driven by IPython notebooks, with each component running in its own Docker container. The project uses Bash scripts to build each node type from a common Docker image that contains all necessary packages, enables data access from a Hadoop cluster, and runs on dedicated hosts. With IPython as the interface, you can tackle a variety of data processing, machine learning, and visualization tasks using the following tools and libraries:
HDFS, HBase, Hive, Oozie, Pig, Hue, Pandas, NLTK, NumPy, SciPy, SymPy, Scikit-Learn, Cython, Numba, Biopython, 0MQ, Pattern, Seaborn, Matplotlib, Statsmodels, Beautiful Soup, NetworkX, LLVM, MDP, Bokeh, Vincent

Generating a few amps

Here at Lab41, we build open source prototypes using the latest big data, infrastructure, and data science technologies. In a perfect world, these tools would magically work together. The reality is that they usually require a lot of effort just to install and configure properly. And when someone else comes along to actually use them — especially if they are a newly-minted teammate or someone unfamiliar with the myriad command-line switches and gotchas — the experience can transform these tools into shark repellent for sysadmins and end users alike.

If the above sounds familiar, or if you’re interested in using IPython notebooks to perform non-trivial* data analytics with Apache Spark, then please continue…
*defined roughly as being able to use data in our 32-node Hadoop cluster

Sparked interest

This effort started when I became interested in Apache Spark, which has quickly become the heir apparent to MapReduce’s big data throne. By most measures, this data processing engine is living up to its claims of better performance and usable APIs backed by powerful algorithmic libraries. Add in its support for interactive and iterative development, plus data-scientist- and developer-friendly languages like Python, and it’s no surprise that so many have fallen for this relative newcomer.

Out of the box, Spark includes a number of powerful capabilities, including the ability to write SQL queries, perform streaming analytics, run machine learning algorithms, and even tackle graph-parallel computations. Those features enable Spark to compete with a number of tools from mature ecosystems like Hadoop, but what really stands out is its usability. In short, its interactive shells (in both Scala and Python) present an approachable way to kick the tires. In my book, that’s a huge win that should help pull in curious developers (like me). After going through Spark’s cut-and-paste examples, as well as a few more involved tutorials, I had seen enough to want to begin using this platform. Anticipating that the rest of our team would benefit from its capabilities, I also became interested in supporting their data analysis needs.

Usability a driving need

Within our team, we have developers, data scientists, and analysts with varying skills and experiences. Providing a solution that everyone could use was a key goal, which led to the following objectives:

  • We needed the Spark cluster to handle decent-sized workloads. Our 32-node Hadoop cluster is a representative size.
  • We needed an “easy-to-use” interface and language bindings that everyone would have a shot at learning. Python would be good. Something with collaboration features that wasn’t driven by the command line would be great.
  • We needed to run analytics against “non-trivial” data, which I’ll define as being able to access and process data from our Hadoop cluster.

After giving it some thought, I realized IPython would address that short list nicely and would be a familiar interface for our team. I decided to try to build something that looks like:

Definitions
  • Spark Master: the Spark node that receives jobs, organizes the workflow, and doles out work
  • Spark Worker: any number of Spark nodes that receive tasks from the master and do the actual analysis
  • IPython Driver: the process running the application’s main function; in “client” mode the Python shell submits jobs from outside the cluster, which is why I also refer to it as the remote client or client driver

A straightforward path?

The first step, deploying the Spark cluster, seemed trivial since Lab41 uses a CDH5 cluster and Cloudera includes Spark in their distribution. However, I also had to design around the fact that end users won’t be able to log in or SSH directly to the Spark cluster for their analytics. Most of our partners are very security-conscious, so adding a client node that can remotely connect and drive analytics on the cluster became the next must-have. “Easy,” I thought. “I’ll just set up a remote node to drive Spark analysis within the cluster.” I assumed the steps would be straightforward (and probably already solved); a rough command sketch follows the list:

  1. Start the master
  2. Connect workers to the master node
  3. Configure a remote client connected to the master
  4. Deploy an IPython notebook server on the client
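In command form, the plan looked roughly like the sketch below. This is a hedged outline using Spark’s standalone scripts and the pyspark launcher; the hostnames, ports, and the IPYTHON_OPTS notebook trick (from Spark 1.x’s pyspark script) are illustrative assumptions, not the repo’s actual scripts.

```bash
# Rough sketch of the four-step plan (standalone Spark 1.x; hostnames and ports are illustrative)

# 1. Start the master (on the master host)
$SPARK_HOME/sbin/start-master.sh

# 2. Connect workers to the master (run from the master; reads conf/slaves)
$SPARK_HOME/sbin/start-slaves.sh

# 3. From the remote client, point a pyspark shell at the master
$SPARK_HOME/bin/pyspark --master spark://master-fqdn:7077

# 4. Same idea, but served through an IPython notebook on the client
IPYTHON_OPTS="notebook --ip=0.0.0.0" $SPARK_HOME/bin/pyspark --master spark://master-fqdn:7077
```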

Starting the master and worker nodes in our CDH5 cluster via the Cloudera Manager was straightforward. Building a client node was also easy since the Spark team graciously provides source and pre-built packages for several recent releases. With a quick download and install, my client was ready to drive the cluster.

To initially test the client driver (the end goal being IPython), I started with a pyspark shell connected to the master, holding off on IPython integration to isolate any potential misconfigurations. Based on tutorials, connecting the remote client to the cluster initially appeared as easy as specifying ./bin/pyspark --master spark://ip:port. However, I immediately ran into a couple of errors related to library mismatches:

incompatible spark driver - java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 583745679236071411, local class serialVersionUID = 7674242335164700840
java.lang.ClassCastException (cannot assign instance of org.apache.spark.rdd.RDD$$anonfun$17 to field org.apache.spark.SparkContext$$anonfun$runJob$4.func$1 of type scala.Function1 in instance of org.apache.spark.SparkContext$$anonfun$runJob$4)

After a few rounds of Googling, I found out these errors are caused by incompatible Spark driver libraries. Understandably, the client driver node needs to use libraries compatible with our cluster nodes, and the driver’s v1.2.1 was apparently incompatible with our cluster’s v1.2.0. With that quick reminder to always verify build versions, I downloaded and installed the correct one on the client. Problem fixed!
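A quick sanity check that would have saved me the first round of Googling: print the client’s Spark version before pointing it at the cluster and confirm it matches what the cluster runs. A hedged one-liner, assuming your Spark build supports the --version flag:

```bash
# On the client: confirm the local Spark build matches the cluster's (ours runs 1.2.0)
$SPARK_HOME/bin/spark-submit --version
```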

Down the rabbit hole I went…

With those library mismatch errors behind me, I soon encountered another error:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

These kinds of errors scare me more than most since they give just enough to identify the general cause (“registering” the client and/or workers), but not enough to figure out exactly where to look. After poking around the master and worker logs (/var/log/spark/spark-<master|worker>-<hostname>.log), it looked like the client successfully connected to the master, but something after that failed to complete the Spark initialization. Errors like the following highlighted that it had something to do with my network configuration:

WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkDriver@sandbox:41615]. Address is now gated for 60000 ms, all messages to this address will be delivered to dead letters.

Since “unreachable addresses” can of course be caused by several factors, I’ll save you the nitty-gritty and jump straight to the important point: connecting a remote client node requires several network settings, some expected and some non-obvious:

  • Expected Node-to-Node Communication: The master and workers must be reachable. This requirement is satisfied by several well-documented configurations:
    • Firewall rules must allow traffic to the Spark service (default: 7077 for master, random for worker)
    • Firewall rules must allow traffic to the Spark UIs (default: 8080 for master, 8081 for worker)
  • Unexpected Cluster->Driver Communication: The Configuring Ports for Network Security page shows that important functions such as communicating state changes and serving files require connections from the Spark nodes back to the driver on a random port. That architecture means the remote client node opens randomly-selected ports for callbacks from nodes in the Spark cluster. This design forces two important updates to our network configuration (a sketch of pinning these ports follows the list):
    • Firewall rules must allow traffic from the Spark master and workers to a range of random ports on the client
    • The client must be network-addressable by the master and worker nodes, which means the sparkDriver host in the akka.tcp address above needed to be a fully-qualified domain name (FQDN) resolvable on the network.
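If you would rather not open a wide swath of ports, most of the “random” ones can be pinned. A hedged sketch follows, using property names from the Spark 1.x port-security documentation; the port values themselves are arbitrary examples, not recommendations:

```bash
# Pin the otherwise-random ports so firewall rules can stay narrow
# (property names from the Spark 1.x docs; values are arbitrary examples)
cat >> "$SPARK_HOME/conf/spark-defaults.conf" <<'EOF'
spark.driver.port        7001
spark.fileserver.port    7002
spark.broadcast.port     7003
spark.blockManager.port  7004
EOF

# Give workers a fixed service port as well
echo 'export SPARK_WORKER_PORT=7078' >> "$SPARK_HOME/conf/spark-env.sh"
```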

Compatibility is key

Whereas I could easily open the potential range of random ports on master, worker, and client nodes, adding a network-addressable client to the cluster felt like a step too far for this initial test setup. At this point, I decided to stop using our primary Hadoop cluster and instead virtualize a test Spark cluster within our internal instance of OpenStack. As before, using the pre-built Spark packages made it easy to create master and worker nodes for a standalone Spark installation. Running the startup scripts ./sbin/start-<master|slaves|all>.sh fired up and registered the master and workers, providing me with a throwaway cluster I could use for experimentation more comfortably.

I had now spun up a small virtualized Spark cluster, added a client node on the network, ensured it was reachable via FQDN, and opened all necessary OpenStack security rules and ports for each node. For good measure, I ensured each node’s /etc/hosts contained entries for the cluster’s nodes (e.g. 10.1.2.3 spark-node1.internal-domain), leaving me confident all necessary traffic would reach its intended destination.

With the network configurations behind me, the quest led me into another set of environment mismatch errors:

INFO scheduler.TaskSetManager: Lost task 613.3 in stage 0.0 (TID 2261) on executor spark-node1.internal-domain: java.io.IOException (Cannot run program "python2.7": error=2, No such file or directory)

“Hmmm, strange,” I thought. “All of my OpenStack images include Python…what’s the deal?” Well, when I provisioned the OpenStack instances, I used different host images for the cluster nodes and the client driver as a way to better mimic that real-world possibility. It turns out the older worker nodes only had python2.6, whereas the client (and Spark’s default options) explicitly specified python2.7. Updating the client environment to export PYSPARK_PYTHON=python propagated through Spark’s configuration and let each node rely on its native Python build. This setup clearly won’t work for a production deployment, but I was at the stage of wanting to move past errors and could re-build environments and configurations later.
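For reference, the fix was a one-liner in the client’s environment (spark-env.sh or a shell profile):

```bash
# Use whatever "python" resolves to on each node instead of forcing python2.7
export PYSPARK_PYTHON=python
```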

Next, I ran into the strange situation where my client would accept the examples I had created, but when it submitted jobs to workers, they seemed to be missing things and would fail with messages such as:

ImportError: No module named numpy

and

version 'GLIBC_2.14' not found

Of course! In my previous rounds of yak-shaving fixes, I forgot one obvious requirement: All Spark nodes clearly need the same/compatible environment to effectively distribute analysis (aka “not fail”) across the cluster. It wasn’t sufficient to add things like numpy and GLIBC to my client; every node in the Spark cluster also needed those same modules and libraries.
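Before the Docker pivot, the only real fix was to install the same libraries on every node, something like the hedged loop below; the node names and package list are illustrative, not what we actually ran:

```bash
# Crude pre-Docker fix: push the same Python libraries to every Spark node
# (node names and package list are illustrative)
for node in spark-node1 spark-node2 spark-node3; do
  ssh "$node" 'sudo pip install numpy scipy scikit-learn'
done
```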

Whale of a pivot

I made a crucial decision at this point. I did not like the idea of continuing to tweak and tune the configurations, environments, and libraries for each master, worker, and client host. While I was beginning to understand things, I knew nobody else would be able (or want) to replicate my work. If only there was a technology focused on transparent repeatability and portability of infrastructure…

Enter Docker!

Yes, the fantastic “build once, run anywhere” container not only enables development of portable apps, but also can be a godsend to sysadmins in this type of situation. From their website:

Sysadmins use Docker to provide standardized environments for their development, QA, and production teams, reducing “works on my machine” finger-pointing. By “Dockerizing” the app platform and its dependencies, sysadmins abstract away differences in OS distributions and underlying infrastructure.

Perfect! This benefit, I knew, would enable me to package all the configuration options within a common environment I could then deploy as master, worker, and client nodes. Caveat emptor, though. I have used Docker enough to know my intended IPythonized-Spark (or is it Sparkified-IPython?) setup would require a decent amount of customization, especially the network configuration pieces. But I also knew it was possible, and since a combination of Dockerfiles and scripting would lead to a repeatable build, I made the call to Dockerize the entire setup.

Dockerization

Since others already figured out how to run Spark in Docker, I first turned to those images and tutorials. After using them, I learned a few important things:

  1. The images usually run the entire Spark cluster in a single container, which is great for kicking the tires, but not realistic for running actual workloads. I needed to run each master/worker/client as a separate container, ideally with each on a dedicated node to support large workloads.
  2. The images usually include default configurations for accessing Hadoop nodes that spin up within the container. While I could manually specify the longhand hdfs://<hadoop-namenode>/path/to/hdfs/file to access our Hadoop cluster, I’m lazy and wanted our hadoop-namenode to serve as the container’s default HDFS endpoint. To enable that default connectivity, I added our Hadoop configuration to the container. As an added measure for data locality, the ideal deployment would run these containers inside our Hadoop nodes and thereby avoid sending large amounts of data across the network. Keep in mind this setup means you’ll have to ensure library version compatibility between the containers and your Hadoop nodes.
  3. When run, Docker by default sets each container’s hostname to its container ID, which causes issues related to Spark’s use of FQDNs for network traffic. For example, a hostname of d5d3225d06c4 would cause Spark workers to attempt sending traffic to that host, which of course wouldn’t exist on the network. By passing the host node’s hostname to the container at runtime, the container effectively “thinks” it is the host and can broadcast the appropriate destination address (this item and the previous one are sketched in the docker run command after the list).
  4. The images provide a very basic Python environment. We need several additional data wrangling, machine learning, and visualization tools.
  5. They’ve all done great work I can build on for our needs.
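For items 2 and 3 above, the runtime tweaks boil down to a couple of docker run flags. This is a hedged sketch: the flags are standard Docker options, but the image name and Hadoop config path are illustrative rather than what the repo actually uses.

```bash
# Hand the container the host's FQDN so Spark advertises a resolvable address,
# and mount the host's Hadoop client config so hdfs:// paths default to the
# real namenode (image name and config path are illustrative)
docker run -d \
    -h "$(hostname -f)" \
    -v /etc/hadoop/conf:/etc/hadoop/conf:ro \
    lab41/spark-base
```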
Refining my original architecture, I was looking to build something like:

Refined concept for IPython-Spark-HDFS using Docker

The base image

Since I’ve used similar bits and pieces in other work, I knew where I wanted to start for building the foundation. I started with the following Docker images:

Enhancing the base

Borrowing from those two Docker images to build the common base, I layered a few important changes within the Dockerfile:

  1. The image updates the Spark libraries to the latest pre-built standalone packages.
  2. The image updates Spark configuration options for PYSPARK_PYTHON, SPARK_SSH_PORT and SPARK_SSH_OPTS (sketched after this list). The latter two force Spark to communicate over SSH via a non-standard port (I chose 2122). I made this change to the containers’ SSH daemons so I could still SSH in “normally” via port 22 on the host machines.
  3. I added SSH keys to enable Spark master-worker communication, which I strongly recommend re-generating before your build if you decide to try this out.
  4. I added specific configuration details for HDFS access, which you’ll need to update to connect to your cluster.
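A hedged sketch of what change #2 might look like inside spark-env.sh; the variable names are the ones mentioned above, while the StrictHostKeyChecking option is my own illustrative addition, not necessarily what the repo sets:

```bash
# Route Spark's master<->worker SSH traffic over the non-standard port 2122,
# leaving port 22 free for "normal" SSH into the host machines
export SPARK_SSH_PORT=2122
export SPARK_SSH_OPTS="-p ${SPARK_SSH_PORT} -o StrictHostKeyChecking=no"
```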

Creating role-based images

Building on that base image, I created Docker images for the master, worker, and client node types. Each image uses a bootstrap.sh script to start runit, leaving each node type to implement different startup services (a runit sketch follows the list):

  • The master image runs an SSH daemon and spark-master process. This setup violates Docker’s “one-process-per-container” philosophy, but is necessary since master and workers communicate via SSH (as noted before, via port 2122)
  • Similarly, the worker images start up an SSH daemon and spark-worker process
  • The client image runs an IPython notebook using a custom pyspark profile, which I configured by following guides such as How to use IPython notebook with Apache Spark
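As a concrete, hedged illustration of the runit pattern, a master image might register its service like this; the Spark install path and service name are assumptions, and the actual bootstrap.sh in the repo may differ:

```bash
# Register a runit service that keeps the Spark master process running
# (install path and service name are assumptions)
mkdir -p /etc/service/spark-master
cat > /etc/service/spark-master/run <<'EOF'
#!/bin/bash
exec /usr/local/spark/bin/spark-class org.apache.spark.deploy.master.Master
EOF
chmod +x /etc/service/spark-master/run
```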

Running the containers

I wrote a few Bash scripts to start up each container type. If you plan to use these, keep in mind two important details:

  • Since I wanted each container living on a dedicated node, you’ll have to manually start up each container type within a provisioned node. Within OpenStack, I simply start each instance with an after-build command to run that node type’s startup script (e.g. ./3-run-spark-worker.sh spark://master-fqdn:port). If provisioning on bare metal and/or within your HDFS cluster, you could use something heavyweight like Puppet, a lighter deployment tool like Fabric, or even a simple series of ssh commands.
  • Given the required network traffic between master, workers, and client, there is a wide range of default ports that each host needs to forward to its respective container. Simply put, each host needs to transparently forward all Spark ports to its container. I could have achieved this by EXPOSEing ports in the Dockerfile and later publishing each port/range at runtime, but that method can cause iptables to run out of memory. Plus, it makes the container metadata (and docker ps output) unreadable with so many mapped ports. Instead, I had the host create a new iptables chain with custom PREROUTING rules (a rough sketch follows). If you don’t want iptables as a dependency, or if you just want to handle networking The Docker Way, I would suggest explicitly setting the random ports identified in the Configuring Ports for Network Security guide (e.g. SPARK_WORKER_PORT and spark.driver.port).
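For the curious, the host-side forwarding amounts to a NAT chain along these lines. This is a hedged sketch: the chain name, port range, and container IP are placeholders, not the repo’s actual rules.

```bash
# Forward Spark traffic arriving at the host straight into its container
# (chain name, port range, and container IP are placeholders)
iptables -t nat -N SPARK-FWD
iptables -t nat -A PREROUTING -m addrtype --dst-type LOCAL -j SPARK-FWD
iptables -t nat -A SPARK-FWD -p tcp --dport 7000:9000 \
    -j DNAT --to-destination 172.17.0.2
```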
The end result is:

Docker image and networking for ipython-spark-docker deployment

Spark word count example via IPython
Spark MLlib example

Wrap-up

As with most big data platforms, setting up Apache Spark was not a simple “double-click installation” process. It required host and network configurations that sometimes were difficult to find and decipher. Adding my goal of driving analytics with a remote client revealed additional gotchas. I managed to troubleshoot these, but it was an effort that I wouldn’t want others to have to reproduce. The extra desire to leverage IPython’s simpler interface, connect to our HDFS cluster, and ensure library compatibility between all nodes led me to Docker’s doorstep.

While the architecture is complex, Docker made it less complicated and more repeatable to develop, test, document, and iterate. Fast forward to today, we now have a working version of IPython-driven Spark analytics on our HDFS data, which is something others might be looking to use. And rather than say, “Email me for help,” or “Google ‘this’ and StackOverflow ‘that’,” I can point you to ipython-spark-docker for:

  1. A base Docker image that contains all necessary Hadoop, Spark, and Python packages.
  2. Skeleton configuration files for HDFS access.
  3. Separate master, worker, and client images.
  4. Bash scripts to build and run each base/master/worker/client.
  5. A client-container IPython notebook through which end users access and use the entire system.

If you’ve read this far, thanks for your patience while I walked you through this end-to-end journey. I came across so many questions online where people ran into similar problems that I wanted to document the entire process. Hopefully, this post will save others from wondering where things might have gone wrong.

If you decide to give our repo a try, let us know. The Lab is interested in knowing if it helps, and is happy to offer a helping hand if something needs a little more work.

Until our next post, thanks for reading!