Analyze and optimize cloud cluster performance

Use configurable parameters to monitor and tune the performance of a cloud Hadoop cluster
Hadoop is a popular software framework that enables distributed manipulation of large amounts of data, thus making it a perfect companion to cloud computing. In fact, Hadoop MapReduce, the programming model and software framework used to write applications to rapidly process vast amounts of data in parallel on large clusters of compute nodes, is already in play on cloud systems. This article shows you how to take full advantage of Hadoop by introducing Hadoop configurable parameters and using them to monitor, analyze, and tune the performance of your Hadoop cluster.
Yu Li, Software Engineer, IBM
07 March 2011


Hadoop is an open-source, flexible Java framework for large-scale data processing on networks of commodity hardware. It was inspired by the MapReduce and Google File System (GFS) technologies originally developed by Google and has become increasingly popular for its efficiency, reliability, and scalability. Now a top-level Apache project, Hadoop is supported and used by many companies such as IBM, Google, Yahoo!, and Facebook, and has become the de facto industry framework for large-scale data processing.
What does Hadoop mean for cloud computing? One of the goals of cloud computing is to provide high availability to compute resources at the lowest possible overhead. Hadoop is a perfect tool to achieve this goal with its ability to work with thousands of nodes and petabytes of data and automatically handle job scheduling, partial failure, and load balancing.
To make full use of compute resources, it's important to optimize performance across CPU, memory, and I/O (both disk and network). Hadoop does some of this automatically, while leaving interfaces through which users can tune performance for their specific applications. This article introduces the important configurable parameters of Hadoop and a method for analyzing and tuning performance.

Set up the environment

Steps to deploy the Hadoop environment

It is first necessary to build a Hadoop cluster environment before you can do performance tuning. Simply follow these steps:
  1. Prepare your cluster nodes with Linux OS, JDK 1.6, and ssh installed. Make sure sshd is running on each node.
  2. Access The Apache Software Foundation site and download a stable Hadoop distribution.
  3. Choose your NameNode (NN), JobTracker (JT), and secondary NameNode (SNN); other nodes are DataNode (DN) and TaskTracker (TT). This article assumes you choose host001 as NN, host002 as JT, and host003 as SNN.
  4. Enable NN, JT, and SNN to ssh to all DN and TT nodes without a passphrase.
  5. Unpack the downloaded Hadoop distribution on each node; $HADOOP_HOME is used below to represent the unpack position.
  6. Enter the $HADOOP_HOME directory and modify configuration files on NN.
    1. Add host003 into $HADOOP_HOME/conf/masters.
    2. Add all DN/TT nodes' IP addresses/host names into $HADOOP_HOME/conf/slaves, one host per line. Note: If you use host names, you need to configure the /etc/hosts file to make sure that each host name is known to all nodes in the cluster.
    3. Add the following property to $HADOOP_HOME/conf/core-site.xml to set the NN IP/port:
    4. Add the following property to $HADOOP_HOME/conf/mapred-site.xml to set the JT IP/port:
          <name> mapred.job.tracker </name>
      Note: If you use Hadoop release 0.21.0, this property name should be mapreduce.jobtracker.address.
    5. Add the following property to $HADOOP_HOME/conf/hdfs-site.xml, if you have more than one network interface on your NN:
          <description>The name of the Network Interface from which a data node 
          should report its IP address.
  7. Copy all configuration files mentioned above from NN to all other nodes in the cluster into the $HADOOP_HOME/conf/ directory.
  8. Enter the $HADOOP_HOME/bin directory on NN.
    1. Format the NN using command: $./hadoop namenode -format.
    2. Launch the script to start the Hadoop daemons.
  9. For more detailed information, refer to Hadoop Common. Note: If you choose to use Hadoop release 0.21.0, then you must use the current JDK, which is tracked by JIRA HADOOP-6941.
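Before starting the daemons, it can help to confirm that the master nodes really can reach every worker without a passphrase. The following is a minimal sketch, not part of the original procedure: the helper names list_workers and check_ssh are hypothetical, and it assumes the slaves file lists one host per line, optionally with # comments.

```shell
#!/bin/sh
# Hypothetical helper: print the hosts listed in a slaves file,
# skipping blank lines and anything after a '#'.
list_workers() {
    sed -e 's/#.*$//' -e 's/[[:space:]]*$//' -e '/^$/d' "$1"
}

# Hypothetical helper: try a non-interactive login to every worker.
# BatchMode=yes makes ssh fail instead of prompting for a passphrase.
check_ssh() {
    for host in $(list_workers "$1"); do
        if ssh -o BatchMode=yes -o ConnectTimeout=5 "$host" true 2>/dev/null; then
            echo "$host ok"
        else
            echo "$host FAILED"
        fi
    done
}

# Example (run on NN, JT, and SNN):
# check_ssh "$HADOOP_HOME/conf/slaves"
```

Any host reported as FAILED needs its ssh keys fixed before the start scripts can launch daemons on it.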

Install and configure the nmon performance monitoring tool

The nmon tool is a system administrator, tuner, and benchmark tool that can monitor a huge amount of important performance information in one go. You can use nmon as the monitoring tool throughout the entire performance tuning process. Follow the steps below to install and configure nmon and set up your performance monitoring system:
  1. Download the nmon binary package from the nmon for Linux site. Find the right version for your Linux OS and copy it to all nodes of the Hadoop cluster. $NMON_HOME is used below to represent where you put the binary.
  2. Since NN, JT, and SNN can already ssh to all other nodes without a passphrase, and all map/reduce jobs will be submitted on JT, choose JT as the central node to collect all nmon data. Log on to the JT node and then do the following steps.
  3. Create a directory on JT (host002), for example, /home/hadoop/perf_share, and share it through NFS, using the following commands:
    1. Create the directory: $mkdir /home/hadoop/perf_share
    2. Modify the /etc/exports file to include the following line: /home/hadoop/perf_share *(rw,sync)
    3. Restart the NFS service: $/etc/rc.d/init.d/nfs restart
    4. Create the same directory on all other nodes and mount the perf_share directory from JT onto it:
      $mount host002:/home/hadoop/perf_share /home/hadoop/perf_share
  4. Create the following script to start nmon on all nodes:
    #!/bin/bash
    hosts=( shihc008 shihc009 shihc010 shihc011 shihc012 shihc013 shihc014 shihc015
    shihc016 shihc017 )
    # Remove all data in /home/hadoop/perf_share
    for host in "${hosts[@]}"
    do
      # only delete if the cd succeeded
      ssh "$host" "cd /home/hadoop/perf_share && rm -rf ./*"
    done
    # Start nmon on all nodes
    for host in "${hosts[@]}"
    do
      ssh "$host" "/usr/bin/nmon -f -m /home/hadoop/perf_share -s 30 -c 360"
    done

    In the last nmon command, -f means you want the data saved to a file and not displayed on the screen; -m indicates where to save the data; -s 30 means you want to capture data every 30 seconds; and -c 360 means you want 360 data points or snapshots (the total data collection time would be 30x360 seconds, or 3 hours).
  5. Download nmonanalyser (an Excel spreadsheet that takes an output file from nmon and produces some nice graphs to aid in analysis) from the nmonanalyser wiki to analyze the to-be-collected monitoring data.
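The capture window of an nmon run is simply the interval multiplied by the snapshot count. As a small worked example (the function name nmon_duration_seconds is made up for illustration), the settings used in the start script can be checked like this:

```shell
#!/bin/sh
# Total capture time in seconds for "nmon -s <interval> -c <count>".
nmon_duration_seconds() {
    echo $(( $1 * $2 ))
}

# The article's settings: one snapshot every 30 seconds, 360 snapshots.
total=$(nmon_duration_seconds 30 360)
echo "capture window: $total seconds ($(( total / 3600 )) hours)"
```

The window should be at least as long as the job you intend to measure; otherwise nmon stops before the job finishes and the tail of the run is not recorded.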

Detailing Hadoop configurable parameters

Hadoop provides various configuration options to users and administrators for cluster setting and tuning. There are a large number of variables in core/hdfs/mapred-default.xml that you can override in core/hdfs/mapred-site.xml. Some specify file paths on your system, but others adjust levers and knobs deep inside Hadoop's guts.
For performance tuning, there are four main aspects: CPU, memory, disk I/O, and network. This article describes the parameters most relevant to these four aspects and leaves the others in *-default.xml for you to explore using a method introduced later.
CPU-related parameters: mapred.tasktracker.reduce.tasks.maximum and its map-side counterpart
These two parameters determine the maximum number of map and reduce tasks that a TaskTracker runs simultaneously. They are the parameters most relevant to CPU utilization. The default value of both is 2. Increasing them in line with your cluster's hardware raises CPU utilization and therefore improves performance. For example, assume each node of the cluster has 4 CPUs supporting simultaneous multi-threading, and each CPU has 2 cores; then the total number of daemons should be no more than 4x2x2=16. Since DN and TT take 2 slots, at most 14 slots remain for map/reduce tasks, so the best value is 7 for both parameters.
Set these parameters in mapred-site.xml.
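The slot arithmetic above can be written down as a small calculation. This is only a sketch of the rule of thumb used in the example; the function name slots_per_task_type is hypothetical, and the factor of 2 hardware threads per core is taken from the 4x2x2 figure in the text.

```shell
#!/bin/sh
# Slots per task type on one worker node.
# Arguments: CPUs, cores per CPU, hardware threads per core,
#            slots reserved for daemons (DN and TT in the article).
slots_per_task_type() {
    total=$(( $1 * $2 * $3 ))
    available=$(( total - $4 ))
    # split the remaining slots evenly between map and reduce tasks
    echo $(( available / 2 ))
}

slots_per_task_type 4 2 2 2   # the article's example node
```

For the example node this prints 7, the value suggested for both the map and the reduce maximum.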
Memory-related parameter: the child task JVM options
This is the main parameter for JVM tuning. The default value is -Xmx200m, which gives each child task JVM at most 200 MB of heap. You can increase this value if the job is large, but make sure it won't cause swapping, which significantly reduces performance.
Let's examine how this parameter affects total memory usage. Assume the maximum number of map tasks and of reduce tasks is each set to 7, and the JVM options are left at the default value. Then the memory cost of running tasks will be 2x7x200 MB = 2800 MB. If each worker node runs both the DN and TT daemons, and each daemon costs 1 GB of memory by default, the total memory allocated would be around 4.8 GB.
Set this parameter in mapred-site.xml.
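The memory estimate above is easy to recompute when you change the slot counts or the heap size. A minimal sketch follows; the function name worker_memory_mb is hypothetical, and 1 GB is treated as 1000 MB so that the result matches the rounded figure in the text.

```shell
#!/bin/sh
# Approximate memory (MB) committed on one worker node.
# Arguments: map slots, reduce slots, child heap in MB (-Xmx),
#            number of Hadoop daemons, memory per daemon in MB.
worker_memory_mb() {
    tasks_mb=$(( ($1 + $2) * $3 ))
    daemons_mb=$(( $4 * $5 ))
    echo $(( tasks_mb + daemons_mb ))
}

worker_memory_mb 7 7 200 2 1000   # defaults from the example: 4800 MB
```

Comparing the result with the physical memory of a node shows how much room is left for a larger child heap before the node starts to swap.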
Disk I/O-related parameters: the map output compression switch, mapred.output.compress, and the compression codec setting
These parameters control whether output is compressed: the first enables map output compression, mapred.output.compress enables job output compression, and the last selects the compression codec. Compression is turned off by default.
Turning on output compression can speed up disk (local/Hadoop Distributed File System (HDFS)) writes and reduce the total data transfer time (in both the shuffle and HDFS writing phases), but it adds overhead for compression and decompression.
In the author's experience, turning on compression is not effective for sequence files with random keys/values. One suggestion is to turn on compression only when the data you're dealing with is large and organized (especially natural language data).
Set these parameters in mapred-site.xml.
io.sort.mb parameter
This parameter sets the buffer size for map-side sorting, in units of MB, 100 by default. The greater the value, the fewer spills to the disk, thus reducing I/O times on the map side. Notice that increasing this value increases memory required by each map task.
According to experience, when the map output is large, and the map-side I/O is frequent, you should try increasing this value.
Set this parameter in mapred-site.xml.
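To get a feel for how the buffer size relates to the number of spills, a rough estimate can be computed as below. This is a simplified sketch, not Hadoop's actual accounting: it assumes a spill starts when the buffer is 80 percent full (the default spill threshold in 0.20.x, as far as the author of this sketch is aware), it ignores the part of the buffer reserved for record metadata, and the 400 MB map output is an invented figure.

```shell
#!/bin/sh
# Rough number of spill files for one map task.
# Arguments: map output in MB, io.sort.mb, spill threshold in percent.
# Simplification: ignores the buffer space used for record metadata,
# so real spill counts can be higher.
estimate_spills() {
    threshold_mb=$(( $2 * $3 / 100 ))
    # integer ceiling of output / threshold
    echo $(( ($1 + threshold_mb - 1) / threshold_mb ))
}

estimate_spills 400 100 80   # default buffer
estimate_spills 400 200 80   # doubled buffer
```

Under these assumptions, doubling the buffer cuts the estimate from 5 spills to 3, at the price of 100 MB more heap per map task.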
io.sort.factor parameter
This parameter sets the number of input streams (files) to be merged at once in both map and reduce tasks. The greater this value, the fewer merge passes over spilled data are needed, thus reducing I/O times on both the map and reduce sides. Notice that increasing this value might cause more garbage collection activity if the memory allocated for each task is not large enough.
According to experience, when there is a large number of spills to the disk, and the I/O times of the sort and shuffle phases are high, you should try increasing this value.
Set this parameter in mapred-site.xml.
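The effect of the merge factor can be illustrated by counting how many merge rounds are needed to reduce a set of sorted files to one. The sketch below is an idealized model, not Hadoop's exact merge algorithm: it assumes every round merges full groups of files, and the count of 40 spill files is an invented figure.

```shell
#!/bin/sh
# Rough number of merge rounds needed to reduce N sorted files to one
# when at most F files are merged at a time.
# Arguments: number of files N, merge factor F (must be at least 2).
merge_rounds() {
    [ "$2" -ge 2 ] || return 1
    files=$1
    rounds=0
    while [ "$files" -gt 1 ]; do
        # each round turns every group of F files into one file
        files=$(( (files + $2 - 1) / $2 ))
        rounds=$(( rounds + 1 ))
    done
    echo "$rounds"
}

merge_rounds 40 10    # default io.sort.factor
merge_rounds 40 100   # larger factor
```

In this model 40 files need two rounds with a factor of 10 but only one with a factor of 100; each round that is avoided saves a full read and write of the intermediate data.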
mapred.job.reduce.input.buffer.percent parameter
This parameter sets the percentage of memory (relative to the maximum heap size) used to retain map outputs during the reduce phase; it is 0 by default. When the shuffle is concluded, any remaining map outputs in memory must consume less than this threshold before the reduce phase can begin. The greater this value, the less merging happens on disk, thus reducing I/O times on the local disk during the reduce phase. Notice that increasing this value might cause more garbage collection activity if the memory allocated for each task is not large enough.
According to experience, when map output is large, and local disk I/O is frequent during the reduce through sort phases, you should try increasing this value.
Set this parameter in mapred-site.xml.
mapred.local.dir and the HDFS data directory parameter
These two parameters decide where Hadoop puts its data: mapred.local.dir decides where MapReduce intermediate data (map output data) is stored, and the HDFS data directory parameter decides where HDFS data is stored.
According to experience, spreading these locations across all disks on each node balances disk I/O and therefore greatly improves disk I/O performance.
Set mapred.local.dir in mapred-site.xml and the HDFS data directory parameter in hdfs-site.xml.
Network-related parameter: the rack topology script setting
This parameter points to the user-defined script that determines the rack-host mapping used to configure rack awareness. Set this parameter in the core-site.xml file.
Rack awareness is the most important configuration for improving network performance, and it is strongly recommended that you configure it.
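A rack-mapping script of the kind described above can be very small. The sketch below is an illustration only: the subnets and rack names are invented, and the function name resolve_rack is hypothetical. It relies on the convention that Hadoop passes one or more node addresses as arguments and reads one rack path per argument from standard output.

```shell
#!/bin/sh
# Hypothetical rack mapping: the subnets and rack names are made up.
resolve_rack() {
    case "$1" in
        10.1.1.*) echo "/rack1" ;;
        10.1.2.*) echo "/rack2" ;;
        *)        echo "/default-rack" ;;
    esac
}

# Hadoop passes one or more addresses as arguments and reads one rack
# path per argument from standard output.
for node in "$@"; do
    resolve_rack "$node"
done
```

Make the script executable on the NN and JT, and test it by hand with a few worker addresses before pointing the configuration at it.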
mapred.reduce.parallel.copies parameter
This parameter determines the number of threads used to copy map outputs to the reducer, 5 by default. Increasing this value can increase the network flow rate and speed up the copying of map outputs, at the cost of more CPU usage.
According to experience, the effect of increasing this value is slight, so increase it only if the map output in your job is very large.
Notes: The parameter names listed above are all from the Hadoop 0.20.x release; if you use the 0.21.0 release, there might be some name changes. Besides Hadoop parameters, there are also some system parameters, such as inter-rack bandwidth, which affect overall performance.

How to tune and improve performance

After the long but necessary preparations above, you have finally arrived here to see how to tune and improve performance. The whole process can be separated into steps.

Step 1: Choose testing benchmark

The performance of the whole Hadoop cluster is determined by two aspects: HDFS I/O performance and MapReduce runtime performance. Hadoop itself supplies several benchmarks: TestDFSIO and dfsthroughput for HDFS I/O testing, both contained in hadoop-*-test.jar; Sort for overall hardware testing, contained in hadoop-*-examples.jar; and Gridmix, which mimics a mixed workload in a grid environment and is under the $HADOOP_HOME/src/benchmarks directory. Choose any of these benchmarks according to your testing needs.
Among all these benchmarks, Sort can reflect both MapReduce runtime performance (during the procedure of "performing sort") and HDFS I/O performance (during the procedure of "writing sort results into HDFS") when the input data is large. What's more, Sort is the Apache-recommended hardware benchmark. (Find information from the Hadoop Wiki.) So, Sort is used as the example testing benchmark to show you the performance tuning method.

Step 2: Build up the baseline

  1. Testing environment:
    • Benchmark: Sort
    • Input data scale: 500 GB
    • Hadoop cluster scale: 10 DN/TT nodes
    • All nodes are homogeneous
    • Node information:
      • Linux OS
      • Two 4-core processors, support simultaneous multi-threading
      • 32 GB memory
      • Five 500 GB disks
  2. Testing scripts: Following are the scripts used for testing (refer to the Hadoop Wiki for more information about running the Sort benchmark). All scripts should be run on the JT node. Note: Put the nmon start script created earlier and the following scripts into the same directory, the one you choose for storing the testing results.
      # Driver script: generate the input data, then run the sort three times.
      # since there are 10 nodes, should write 50 GB file on each
      $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar \
        randomwriter -D test.randomwrite.bytes_per_map=$fSize /rand_$fSize 2>&1 | tee
      mkdir -p ./testRes/nmonFiles
      # run three cycles to get a more precise result
      for runtimes in {a,b,c}
      do
          ./ $fSize $runtimes
      done
      # Per-run script: $1 is the file size, $2 is the run label (a, b, or c).
      $HADOOP_HOME/bin/hadoop dfs -rmr /rand_$1-sorted
      $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-0.20.1-examples.jar sort \
        -r 70 /rand_$1 /rand_$1-sorted 2>&1 | tee ./testRes/sort_baseline_$2.out
      cp -r /home/hadoop/perf_share ./testRes/nmonFiles/mb$4_$2
  3. Parameter values for baseline test:
    • Hadoop parameter value:
      • Maximum map tasks per TaskTracker = 2 (Default value)
      • mapred.tasktracker.reduce.tasks.maximum = 2 (Default value)
      • mapred.reduce.parallel.copies = 5 (Default value)
      • Child task JVM options = -Xmx200m (Default value)
      • mapred.job.reduce.input.buffer.percent = 0 (Default value)
      • io.sort.mb = 100 (Default value)
      • io.sort.factor = 10 (Default value)
      • mapred.local.dir = /hadoop/sdb
      • HDFS data directories = /hadoop/sdc, /hadoop/sdd, /hadoop/sde
    • System parameter value:
      Inter-rack bandwidth = 1 Gb
  4. Baseline test result:
    • Execution time: 10051 seconds
    • Resource usage summary:
       Columns: AVG CPU | AVG Mem (active) | AVG Disk: Disk read (KB/s), Disk write (KB/s), IO/sec | AVG Network (KB/s): read, write
    • Detailed chart: After you get all the nmon data, you can use nmonanalyser to generate graphic charts. Since nmonanalyser is an Excel spreadsheet, just open it, click analyse nmon data, and choose the nmon files. Then you can get the analyzed charts.
      Figure 1. Analyze nmon data using nmonanalyser
      The detailed charts generated by nmonanalyser for the baseline test are as follows:
      Figure 2. NameNode charts
      Figure 3. JobTracker charts
      Figure 4. DataNode/TaskTracker charts

Step 3: Find the bottleneck

You need to carefully explore the system bottleneck from the monitoring data and charts. Since the main workload is assigned to the DN/TT nodes, you should first focus on resource usage of the DN/TT nodes (only the nmon charts for DN/TT are shown below to save space).
From the baseline monitoring data and charts, you can see several bottlenecks in the system: during the map phase the CPU was not fully utilized (under 40 percent most of the time), and disk I/O was quite frequent.

Step 4: Break the bottleneck

First, try to increase CPU utilization in the map phase. From the introduction of the Hadoop parameters, you know that to increase CPU utilization you need to increase the values of mapred.tasktracker.reduce.tasks.maximum and its map-side counterpart.
In the testing environment, each node has two 4-core processors that support simultaneous multi-threading, so you have a total of 16 available slots. After reserving 2 for the DN and TT daemons, you can set both parameters to 7.
To make the change, set both properties in mapred-site.xml, restart the cluster, and launch the test again (since the configuration is made in the mapred-site.xml file, there is no need to modify the script here). The modified mapred-site.xml then carries the value 7 for both properties.
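As an illustration of what such a fragment might contain, the sketch below prints the two property elements. It is not the original listing: the helper name emit_property is hypothetical, and the map-side property name mapred.tasktracker.map.tasks.maximum is the Hadoop 0.20.x name as understood by the author of this sketch; it is not spelled out in the text above, so check it against your release's mapred-default.xml.

```shell
#!/bin/sh
# Print one Hadoop <property> element for a name/value pair.
emit_property() {
    printf '  <property>\n    <name>%s</name>\n    <value>%s</value>\n  </property>\n' "$1" "$2"
}

echo '<configuration>'
# map-side name assumed from Hadoop 0.20.x; verify in mapred-default.xml
emit_property mapred.tasktracker.map.tasks.maximum 7
emit_property mapred.tasktracker.reduce.tasks.maximum 7
echo '</configuration>'
```

Merge the two property elements into the existing mapred-site.xml rather than replacing the file, so that the JobTracker address set earlier is kept.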
Following are the testing results after the tuning:
  • Execution time: 8599 seconds
  • Resource usage summary:
     Columns: AVG CPU | AVG Mem (active) | AVG Disk: Disk read (KB/s), Disk write (KB/s), IO/sec | AVG Network (KB/s): read, write
Figure 5. DataNode/TaskTracker charts after tuning

Step 5: New round of tuning, repeat steps 3 and 4

From the data and charts you get after tuning up the maximum number of map/reduce tasks in each TaskTracker, you can see that the CPU is fully utilized during the map phase. But in the meantime, the disk I/O frequency is still high, so a new round of the tune-monitor-analyze process is needed.
You need to repeat these steps until there are no more bottlenecks in the system, and each resource is fully utilized.
Notice that not every tuning step yields a performance improvement. If performance drops, roll back to the previous configuration and try a different change to break the bottleneck. In this testing, the following optimized results were finally achieved:
  • Execution time: 5670 seconds
  • Hadoop parameter values:
  • System parameter values: Inter-rack bandwidth = 1Gb
  • Resource usage summary:
Figure 6. DataNode/TaskTracker charts - 2nd round of tuning
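To put the execution times reported in this article in perspective, the relative improvement over the baseline can be computed directly. This is a small worked example; the function name time_reduction_pct is made up for illustration.

```shell
#!/bin/sh
# Percentage reduction in execution time relative to a baseline.
# Arguments: baseline seconds, tuned seconds.
time_reduction_pct() {
    awk -v base="$1" -v tuned="$2" \
        'BEGIN { printf "%.1f\n", (base - tuned) * 100 / base }'
}

time_reduction_pct 10051 8599   # after the first round of tuning
time_reduction_pct 10051 5670   # after the final round of tuning
```

The first round of tuning cuts the run time by about 14.4 percent, and the final configuration by about 43.6 percent, relative to the 10051-second baseline.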

Step 6: Scalability test and improvement

To further verify the tuning result, increase both the cluster scale and the input data scale while keeping the optimized configuration, to test how well the configuration scales. More precisely, increase the cluster scale to 30 nodes and the input data scale to 1.5 TB, and then repeat the above testing procedure.
As space is limited, the detailed tuning procedure isn't described here. The monitoring and analysis method is exactly the same as the one described above, and the main bottleneck turned out to be the network: the inter-rack bandwidth became insufficient when the input data grew to TB scale. When the inter-rack bandwidth was increased to 4 Gb and all other parameters optimized for 10 nodes were left unchanged, the final execution time was 5916 seconds, which is quite close to the 10-node optimized result (5670 seconds).
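Because both the node count and the input size grow by the same factor of three, the ideal outcome is an unchanged run time. One way to express how close the result comes to that ideal is a scaling efficiency figure, sketched below; the function name scaling_efficiency_pct is hypothetical.

```shell
#!/bin/sh
# Scaling efficiency in percent when nodes and data grow by the same
# factor, so the ideal run time equals the small-cluster run time.
# Arguments: small-cluster seconds, large-cluster seconds.
scaling_efficiency_pct() {
    awk -v small="$1" -v large="$2" \
        'BEGIN { printf "%.1f\n", small * 100 / large }'
}

scaling_efficiency_pct 5670 5916   # 10 nodes/500 GB vs. 30 nodes/1.5 TB
```

An efficiency of about 95.8 percent means the larger run takes only about 4 percent longer than the ideal.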


You now know how to monitor a Hadoop cluster, analyze the system bottleneck using the monitoring data, and tune the performance. Hopefully, this knowledge helps you make full use of your Hadoop cluster and finish your jobs more efficiently. You can use the method described in this article to further explore the configurable parameters of Hadoop and find the connection between parameter configuration and different job characteristics.
What's more, this parameter-based tuning is somewhat "static," because one set of parameter values is optimized for only one kind of job. To gain more flexibility, you might be interested in studying the scheduling algorithm of Hadoop and perhaps sharing your new methods for improving Hadoop performance.


