DistOS-2011W BigTable

Introduction

Bigtable is a distributed storage system used at Google. Its main purpose is to scale an enormous amount of data reliably across a large number of computers. The distributed transparencies are of key importance in the overall framework.

Projects and applications that require a large amount of storage space are well suited to this type of system. Google projects that use Bigtable include Google Earth, the Google web-indexing engine, and Google App Engine. These applications store different types of information, ranging from URLs and web pages to satellite images. Some of them require low latency, while others require high-throughput batch processing.

The design and framework of Bigtable are of considerable interest; however, Bigtable is not open source or accessible to the general public. Clearly, this is a problem for an implementation report, so we will use Apache’s open-source Hadoop platform to get an understanding of how to configure, run, and deploy applications across such a system. Throughout the rest of the paper, we will compare the two platforms and comment on their similarities.

Real World Implementations

Bigtable is a proprietary system, so it is currently used and implemented only by Google. There are, however, related implementations in the open source world, notably Hadoop, which is Apache’s implementation of MapReduce running on a distributed file system. It is based on the papers Google released on MapReduce and the Google File System (GFS). The framework allows applications to be run on large clusters of commodity hardware. Organizations that make significant use of Hadoop include Yahoo, eBay, Facebook, Twitter, and IBM.

In the next two sections, we discuss the features and underpinnings of the two frameworks.

Google’s Implementation

Bigtable was designed with the following in mind:

  • Wide applicability
  • Scalability
  • High performance
  • High availability

Bigtable is in some ways similar to a traditional database. Data is accessed using row, column, and timestamp identifiers. The values themselves are treated as strings, but quite often serialized objects are stored in these strings to represent more complicated structures.

Reads and writes under a single row key are atomic, which keeps concurrent access to the same row simple to reason about.

Structure of Data

Since the data is distributed over many computers, related data can end up in different locations. Bigtable keeps rows sorted lexicographically by row key and partitions the table into contiguous row ranges known as tablets. When the rows a client reads fall within a small number of tablets, and therefore a small number of computers, the reads become more efficient. In many situations, such as URLs for web indexing, the hostname components are reversed so that pages from the same domain are grouped together lexicographically, which in turn speeds up searches.


[Figure: Reordering URLs]
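
The hostname reordering described above can be sketched in a few lines of Java. This is only an illustration: the class `RowKeys` and its methods are hypothetical names and are not part of any Bigtable or HBase API. A `TreeSet` stands in for the lexicographically sorted row keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for illustration; not a Bigtable or HBase class.
final class RowKeys {
    private RowKeys() {}

    /** Reverses the dot-separated parts of a hostname, e.g. "maps.google.com" becomes "com.google.maps". */
    static String reverseHost(String host) {
        List<String> parts = new ArrayList<>(Arrays.asList(host.split("\\.")));
        Collections.reverse(parts);
        return String.join(".", parts);
    }

    /** Builds a row key from a hostname and a path that starts with "/". */
    static String rowKey(String host, String path) {
        return reverseHost(host) + path;
    }

    public static void main(String[] args) {
        // A TreeSet iterates in lexicographic order, like rows sorted by key.
        TreeSet<String> keys = new TreeSet<>();
        keys.add(rowKey("maps.google.com", "/index.html"));
        keys.add(rowKey("www.cnn.com", "/index.html"));
        keys.add(rowKey("mail.google.com", "/inbox"));
        for (String key : keys) {
            System.out.println(key);
        }
    }
}
```

With the reversed form, the two google.com keys sort next to each other, so a scan over one domain touches a contiguous range of rows.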


Individual column identifiers, on the other hand, belong to what is known as a column family. Data within a family is usually of the same type and is compressed together, and each family can contain many columns. These groupings are useful for situations such as user authorization, where some users have more privileges and can view more data than others. In this case, a user with limited access would be able to read only a select few column families.
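
A minimal sketch of family-level access control follows, assuming column keys are written as family:qualifier (the naming used in the Bigtable paper). The class `FamilyAcl`, the user names, and the family names are hypothetical and chosen only for illustration; this is not how Bigtable or HBase actually implements authorization.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of per-family read permissions.
final class FamilyAcl {
    // user -> set of column families that user may read
    private final Map<String, Set<String>> readableFamilies = new HashMap<>();

    /** Grants a user read access to one column family. */
    void grant(String user, String family) {
        readableFamilies.computeIfAbsent(user, u -> new HashSet<>()).add(family);
    }

    /** Extracts the family from a column key written as "family:qualifier". */
    static String familyOf(String columnKey) {
        int colon = columnKey.indexOf(':');
        return colon < 0 ? columnKey : columnKey.substring(0, colon);
    }

    /** A user may read a column only if the column's family has been granted. */
    boolean canRead(String user, String columnKey) {
        Set<String> families = readableFamilies.get(user);
        return families != null && families.contains(familyOf(columnKey));
    }

    public static void main(String[] args) {
        FamilyAcl acl = new FamilyAcl();
        acl.grant("guest", "contents");
        System.out.println(acl.canRead("guest", "contents:html"));
        System.out.println(acl.canRead("guest", "anchor:cnnsi.com"));
    }
}
```

Checking permissions per family rather than per column keeps the access list small even when a family holds many columns.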

Along with row and column identifiers, there are timestamps. These allow multiple versions of the same cell to be kept. Timestamps are usually set automatically by the system, but applications can also set them explicitly as 64-bit integers.
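
The addressing scheme above, (row, column, timestamp) mapped to a value, can be modeled with nested sorted maps. The sketch below is an in-memory toy: `VersionedTable` and its methods are hypothetical names, and nothing here reflects how Bigtable or HBase stores data on disk.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of (row, column, timestamp) -> value.
final class VersionedTable {
    // row -> column -> timestamp (newest first) -> value
    private final TreeMap<String, TreeMap<String, TreeMap<Long, String>>> rows = new TreeMap<>();

    /** Stores one version of a cell under an explicit 64-bit timestamp. */
    void put(String row, String column, long timestamp, String value) {
        rows.computeIfAbsent(row, r -> new TreeMap<>())
            .computeIfAbsent(column, c -> new TreeMap<Long, String>(Collections.<Long>reverseOrder()))
            .put(timestamp, value);
    }

    /** Returns the newest value written at or before the given timestamp, or null if there is none. */
    String get(String row, String column, long timestamp) {
        TreeMap<String, TreeMap<Long, String>> columns = rows.get(row);
        if (columns == null) {
            return null;
        }
        TreeMap<Long, String> versions = columns.get(column);
        if (versions == null) {
            return null;
        }
        // The map is ordered newest first, so ceilingEntry yields the largest timestamp <= the argument.
        Map.Entry<Long, String> entry = versions.ceilingEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }

    /** Returns the most recent version of a cell, or null if the cell does not exist. */
    String getLatest(String row, String column) {
        return get(row, column, Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        VersionedTable table = new VersionedTable();
        table.put("com.cnn.www", "contents:html", 3L, "<html>old</html>");
        table.put("com.cnn.www", "contents:html", 5L, "<html>new</html>");
        System.out.println(table.getLatest("com.cnn.www", "contents:html"));
        System.out.println(table.get("com.cnn.www", "contents:html", 4L));
    }
}
```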

Garbage Collection

Bigtable also provides a garbage collection feature. For example, cells with multiple versions can be truncated to store only the last n versions or only the versions that were added within the last three days.
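
The two policies mentioned above (keep the last n versions, or keep only recent versions) can be sketched as follows. `VersionGc` is a hypothetical class, the versions of a single cell are held in a `TreeMap` keyed by timestamp in ascending order, and the use of millisecond timestamps in `main` is an assumption made for the example.

```java
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of per-cell version garbage collection.
final class VersionGc {
    private VersionGc() {}

    /** Keeps only the n newest versions; the map is ordered oldest first. */
    static void keepLastN(TreeMap<Long, String> versions, int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must not be negative");
        }
        while (versions.size() > n) {
            versions.pollFirstEntry(); // drops the oldest remaining version
        }
    }

    /** Drops every version whose timestamp is older than the cutoff. */
    static void keepNewerThan(TreeMap<Long, String> versions, long cutoffTimestamp) {
        versions.keySet().removeIf(ts -> ts < cutoffTimestamp);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(now - TimeUnit.DAYS.toMillis(10), "ten days old");
        versions.put(now - TimeUnit.DAYS.toMillis(2), "two days old");
        versions.put(now, "current");

        // Keep only versions written within the last three days.
        keepNewerThan(versions, now - TimeUnit.DAYS.toMillis(3));
        System.out.println(versions.values());
    }
}
```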

Apache’s Implementation – Hadoop

The Hadoop framework uses Google’s Map/Reduce model to split an application into many small units of work that can be executed across multiple machines. Several extensions exist for Hadoop that replicate Bigtable's functionality. The one that we will look at is HBase. Like Hadoop, it is developed in Java, and it is built on top of the Hadoop Distributed File System (HDFS). HBase is modeled on Bigtable and provides much of the functionality described in the sections above.

Components

A typical Hadoop installation consists of

  • Hadoop Common
    • Access to the filesystems of Hadoop
    • Files to start Hadoop
    • Documentation, source code
  • HDFS
    • Distributed, scalable, portable file system written in Java
    • Uses TCP/IP for communication
    • Clients use Remote Procedure Calls (RPC) to communicate with each other
    • Splits data across multiple machines
      • Usually replicates data on 3 nodes: 2 on the same rack, 1 on another rack
  • Single Master and multiple Slave nodes
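
The replication rule in the list above (three copies, two on one rack and one on another) can be illustrated with a small selection routine. This is not HDFS's placement code: `ReplicaPlacement`, `Node`, the rack labels, and the order in which nodes are picked are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of rack-aware replica selection; not the HDFS implementation.
final class ReplicaPlacement {
    private ReplicaPlacement() {}

    /** A storage node identified by a name and the rack it sits in. */
    static final class Node {
        final String name;
        final String rack;

        Node(String name, String rack) {
            this.name = name;
            this.rack = rack;
        }
    }

    /**
     * Picks up to three nodes for one block: the first node in the list,
     * a second node on the same rack, and a third node on a different rack.
     */
    static List<Node> choose(List<Node> nodes) {
        List<Node> chosen = new ArrayList<>();
        if (nodes.isEmpty()) {
            return chosen;
        }
        Node first = nodes.get(0);
        chosen.add(first);
        for (Node n : nodes) {
            if (n != first && n.rack.equals(first.rack)) {
                chosen.add(n); // second copy shares the first copy's rack
                break;
            }
        }
        for (Node n : nodes) {
            if (!n.rack.equals(first.rack)) {
                chosen.add(n); // third copy survives the loss of the first rack
                break;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("n1", "rackA"), new Node("n2", "rackB"), new Node("n3", "rackA"));
        for (Node n : choose(nodes)) {
            System.out.println(n.name + " on " + n.rack);
        }
    }
}
```

Placing one copy on a separate rack means that losing a whole rack still leaves a readable replica.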

Installation, Running, Debugging on Hadoop

The installation of Hadoop is quite straightforward. After unpacking the compressed Hadoop archive, two files need to be edited:

  1. conf/hadoop-env.sh
  2. bin/hadoop

For the first file, find the following line:

# export JAVA_HOME=/usr/lib/j2sdk1.6-sun

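and change it to the following, removing the leading # so that the line takes effect (the path shown is the Mac OS X location):

export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/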

For the second file, find the following line:

JAVA=$JAVA_HOME/bin/java

and change it to:

JAVA=$JAVA_HOME/Commands/java

The Hadoop installation is now complete, and we can dive into the APIs and create a project.

Creating a Hadoop Project

I looked at two possible deployment options: a local standalone mode and a pseudo-distributed mode.

Standalone mode

The first is very simple and involves a single master and slave running on one machine. An input text file is read and split into tokens, one per word. The map function emits each word as a key with a count attached. The reduce function is executed by the slave (worker) nodes. In this case, one worker applies the reduce function to the map output, summing the counts to give the number of occurrences of each unique word. Once all the reduce functions are complete, the output is written to a file, shown in Figure 4.

Figure 2: Application input used is just a simple text file with text.
Figure 3: Console output from a word counter executed over a single node.
Figure 4: Application output from a word counter executed over a single node. The map is displayed visually to show the map/reduce output.
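
To make the word count flow described above concrete without requiring a Hadoop installation, here is a plain Java sketch of the map, group-by-key, and reduce steps. It does not use the Hadoop API; `WordCountSketch` and its method names are hypothetical, and everything runs in a single process.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical single-process illustration of the map/reduce word count.
final class WordCountSketch {
    private WordCountSketch() {}

    /** Map step: emits a (word, 1) pair for every whitespace-separated token in the line. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    /** Reduce step: sums all counts that were emitted for one word. */
    static int reduce(String word, List<Integer> counts) {
        int sum = 0;
        for (int count : counts) {
            sum += count;
        }
        return sum;
    }

    /** Runs map on every line, groups the pairs by word, then reduces each group. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("the cat", "the hat the")));
    }
}
```

In a real cluster the grouping step is performed by the framework between the map and reduce phases, and each phase can run on a different machine.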


The following code snippet creates a map/reduce job which can be picked up and processed by multiple slave nodes:

// Driver method of the job class: configures and submits the word count job.
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), HadoopTest.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    // the reducer also serves as the combiner, pre-summing counts on the map side
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    // parse the optional -m <maps> and -r <reduces> flags; everything else is a path
    List<String> other_args = new ArrayList<String>();
    for (int i = 0; i < args.length; ++i) {
        try {
            if ("-m".equals(args[i])) {
                conf.setNumMapTasks(Integer.parseInt(args[++i]));
            } else if ("-r".equals(args[i])) {
                conf.setNumReduceTasks(Integer.parseInt(args[++i]));
            } else {
                other_args.add(args[i]);
            }
        } catch (NumberFormatException except) {
            System.out.println("ERROR: Integer expected instead of "
                    + args[i]);
            return printUsage();
        } catch (ArrayIndexOutOfBoundsException except) {
            System.out.println("ERROR: Required parameter missing from "
                    + args[i - 1]);
            return printUsage();
        }
    }
    // Make sure there are exactly 2 parameters left: input path and output path.
    if (other_args.size() != 2) {
        System.out.println("ERROR: Wrong number of parameters: "
                + other_args.size() + " instead of 2.");
        return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    // submit the job and block until it finishes
    JobClient.runJob(conf);
    return 0;
}

Here, a reduce function is shown that groups equal words and outputs the number of occurrences of each.

public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    // add up every count emitted for this word
    int sum = 0;
    while (values.hasNext()) {
        sum += values.next().get();
    }
    // emit the word together with its total
    output.collect(key, new IntWritable(sum));
}

Pseudo-distributed mode

Figure 5: Running a pseudo-distributed setup

For the pseudo-distributed mode, I changed the configuration of the following files:

  • conf/core-site.xml
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
  • conf/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
  • conf/mapred-site.xml
<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
    </property>
</configuration>
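
All three files share the same shape: a list of property elements, each with a name and a value. As a rough illustration of that structure, the sketch below reads such a document with the JDK's built-in XML parser. It is not Hadoop's own configuration loader; `SiteConfigReader` is a hypothetical class, and it assumes every property has both a name and a value element.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical reader for Hadoop-style <configuration> documents.
final class SiteConfigReader {
    private SiteConfigReader() {}

    /** Parses the XML text and returns the properties in document order. */
    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, String> properties = new LinkedHashMap<>();
            NodeList list = doc.getElementsByTagName("property");
            for (int i = 0; i < list.getLength(); i++) {
                Element property = (Element) list.item(i);
                // Assumes each property contains exactly one <name> and one <value>.
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                properties.put(name, value);
            }
            return properties;
        } catch (Exception e) {
            throw new IllegalArgumentException("Malformed configuration XML", e);
        }
    }

    public static void main(String[] args) {
        String coreSite = "<configuration><property>"
                + "<name>fs.default.name</name><value>hdfs://localhost:9000</value>"
                + "</property></configuration>";
        System.out.println(parse(coreSite));
    }
}
```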

To create a cluster of nodes that run in separate Java processes on the same machine (hence pseudo-distributed), I first formatted a new distributed filesystem with the command:

bin/hadoop namenode -format

This initializes HDFS according to the configuration above. To start the nodes, I used this command:

bin/start-all.sh

In order to test the system, there are samples available with the Hadoop package. I ran the one below:

bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'

The result of the distributed computation is unclear here, as the output was not updated in the cluster summary page. I managed to get the cluster up and running, but did not manage to get any meaningful computation running across the separate Java processes.

Figure 6: Viewing the statistics of applications running on the cluster

Evaluation of Installation

Setup

Overall, the configuration and setup of a Hadoop cluster is fairly simple. Difficulties tend to arise when deploying applications across the system, as happened in my case.

Deploying applications

For the simple application that I ran on the single-node cluster, I used Eclipse as the development environment to generate a jar file. This jar file was passed on the command line to the running Hadoop installation, which executed the application over the available nodes.

Deploying the same application in pseudo-distributed mode proved to be much more challenging, and it is something that I plan to tackle in my future work with Hadoop. Creating a truly distributed cluster would also be quite interesting.

Discussion

Surprisingly, the APIs for Hadoop are easy to use and to integrate with existing programs that were designed for single-core processors. However, when developing algorithms for map/reduce, it is essential to analyze the problem from a different perspective. Reduce functions need to be efficient and produce output that makes future reads and writes faster.

What is particularly surprising is the fact that Bigtable and Hadoop are designed to run on commodity hardware. This has ultimately resulted in huge cost savings for organizations using the systems. For instance, Google has only minimal incremental costs associated with expanding its services and adding computing power to the clusters. In particular, because the software was developed in-house, Google does not have to pay royalties or buy licenses for each machine.

Another interesting part of the design is how it achieves scalability and performance over a large number of machines. Tablets are load balanced across multiple machines. When a tablet comes under heavy demand, the load is automatically spread to other nodes to balance the processing. Furthermore, if a node goes down for any reason, other nodes step in and take over the tablets from the downed node. With many nodes in a cluster, the overhead of retrieving and restoring each tablet is shared across the whole network; each available node picks up one tablet and incurs only a minimal delay.

Clearly Bigtable and Hadoop were designed with scalability in mind, but for the system to be useful it must also be reliable. The systems are designed so that support for a larger number of users and resources can be added easily. This functionality adds a great deal of complexity to the system, and hence achieving complete reliability is extremely difficult. In its explanation, Google names the following as key design choices:

  • Fault tolerant
  • Highly Available
  • Recoverable
  • Consistent
  • Scalable
  • Predictable Performance
  • Secure
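
The tablet recovery described before the list, where surviving nodes each pick up tablets from a downed node, can be sketched as a simple reassignment routine. The "give each orphaned tablet to the least-loaded survivor" policy, the class `TabletReassignment`, and the node and tablet names are assumptions for illustration; in Bigtable itself the assignment is coordinated by the master.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of spreading a failed node's tablets over the survivors.
final class TabletReassignment {
    private TabletReassignment() {}

    /**
     * Removes the failed node from the assignment and hands each of its tablets
     * to whichever surviving node currently holds the fewest tablets.
     * Does nothing if the node is unknown or no other node is available.
     */
    static void reassign(Map<String, List<String>> assignment, String failedNode) {
        if (!assignment.containsKey(failedNode) || assignment.size() < 2) {
            return;
        }
        List<String> orphaned = assignment.remove(failedNode);
        for (String tablet : orphaned) {
            String target = null;
            for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
                if (target == null || entry.getValue().size() < assignment.get(target).size()) {
                    target = entry.getKey();
                }
            }
            assignment.get(target).add(tablet);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("nodeA", new ArrayList<>(Arrays.asList("t1", "t2", "t3")));
        assignment.put("nodeB", new ArrayList<>(Arrays.asList("t4")));
        assignment.put("nodeC", new ArrayList<>(Arrays.asList("t5", "t6")));
        reassign(assignment, "nodeA");
        System.out.println(assignment);
    }
}
```

Because the orphaned tablets are spread out rather than moved as a block, no single survivor absorbs the whole recovery cost.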

Not surprisingly, many of these are components of the distributed transparencies. What is interesting is how integral the design-for-failure approach is to the system. Some failure cases have an extremely low probability of occurring, but in a large distributed system with millions of simultaneous messages over the network, these scenarios have to be accounted for. Disregarding them can easily result in a minor problem cascading across entire clusters and hindering availability.
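
A short worked calculation shows why rare failures cannot be ignored at scale. The numbers are illustrative assumptions, not measurements: suppose each message independently triggers a given failure with probability p, and N messages are sent.

```latex
\begin{align*}
P(\text{at least one failure}) &= 1 - (1 - p)^{N} \approx 1 - e^{-pN} \\
\text{with } p = 10^{-6},\ N = 10^{6}: \quad
1 - \left(1 - 10^{-6}\right)^{10^{6}} &\approx 1 - e^{-1} \approx 0.63
\end{align*}
```

Under these assumptions, a one-in-a-million event is more likely than not to occur at least once per million messages.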

As an extension to what I have tried so far, I plan to test and run applications across multiple physical nodes. Ideally I want to make use of the underlying file system and fully understand how the system behaves during reads and writes, as well as how it handles nodes dropping out due to connectivity issues.

Conclusion

Installing Hadoop gave me a great deal of insight into the workings and design behind the proprietary software used in the Bigtable implementation. Using the various references, tutorials, and documentation I found, I was able to set up a working example of a master/slave computation on a single node, as well as a cluster running on a single machine across multiple Java processes. Future work that I can do is to set up a fully distributed mini-cluster with applications and files that use the processing power and disk space of the nodes. Working directly with Hadoop has given me a greater understanding of the possible uses of a system like Bigtable. It is also a very effective way to learn about the reasons behind some of the design decisions that were taken by the development teams. Adhering to the distributed transparencies was key to these systems, especially scalability and fault tolerance. Lastly, Hadoop was easy to use and allowed me to get a system running in a relatively short period of time.

References

Bhandarkar, M. Practical Problem Solving With Apache-Hadoop. Yahoo Inc. (February 11, 2011) [1]

BigTable. Wikipedia. (February 15, 2011) [2]

Chang, F. et al. Bigtable: A Distributed Storage System for Structured Data. Google Inc. (February 11, 2011) [3]

Computing and Information Science. Cornell University. (February 23, 2011) [4]

Distributed Systems. Google Code University. (February 23, 2011) [5]

Hadoop. Apache. (February 23, 2011) [6]

Hadoop. Wikipedia. (February 23, 2011) [7]

Hadoop Wiki. Apache. (February 23, 2011) [8]

HBase. Apache. (February 25, 2011) [9]