Cassandra and Hadoop – Introducing the KassandraMRHelper

Here at Knewton we use Cassandra for storing a variety of data. Since we follow a service-oriented architecture, many of our internal services are backed by their own data store. Some of the types of data we store include student events, recommendations, course graphs, and parameters for models. Many of these services and clusters are deployed in more than two environments, increasing the total number of Cassandra clusters we run.

On the Analytics team at Knewton we are constantly working on improving a lot of the inferential models that go into our platform, while at the same time building new ones. This often involves munging a lot of data in short periods of time. For a lot of our ad-hoc analysis we use a data warehouse from which analysts can query and extract data relatively quickly. One of the challenges we’ve faced at Knewton — and specifically in Analytics — involved how to go about populating our data warehouse with data from Cassandra clusters that predated our data warehouse. To solve this problem, we implemented an internal library for bulk extracting data out of Cassandra into Hadoop with zero hits to the Cassandra cluster. A few months later we open sourced it here and called it the KassandraMRHelper.

KassandraMRHelper takes a slightly different approach than the constructs contained in the Hadoop package in the Cassandra source code (e.g. AbstractColumnFamilyInputFormat), in that it doesn’t require a live Cassandra cluster to extract the data from. This allows us to re-run map-reduce jobs multiple times without worrying about any performance degradation of our production services. This means that we don’t have to accommodate more traffic for these offline analyses, which keeps costs down.

How does it work?
The KassandraMRHelper includes specialized Input Formats and Record Readers for SSTables. First, here’s a little bit about SSTables:

  • SSTables are immutable; once they’re written, they never change.
  • SSTables can exist independently of each other, but collectively they form the complete data set.

SSTables consist of four to five components, depending on which version of Cassandra you’re using:

  • Data.db files store the data, compressed or uncompressed depending on the configuration.
  • Index.db is an index into the Data.db file.
  • Statistics.db stores various statistics about that particular SSTable (times accessed, etc.).
  • Filter.db is a Bloom filter that’s loaded into memory by the Cassandra node and can quickly tell it whether a key is in an SSTable or not.
  • CompressionInfo.db may or may not be there, depending on whether compression is enabled for a ColumnFamily. It contains information about the compressed chunks in the Data.db file.

Data in columns and rows are essentially key-value pairs: rows are the keys, and columns are the values under those rows. The columns are themselves key-value pairs consisting of a name and a value.

Given how data are stored, Cassandra is in fact a really good fit for MapReduce. The same partitioning schemes that Cassandra uses can also be used in MapReduce. Columns and rows can be the keys and values that get passed to the Mappers or Reducers in a MapReduce job.

Some key components of KassandraMRHelper are:

  • The SSTableInputFormat: The SSTableInputFormat describes how to read the SSTables and filters through all the components and ColumnFamilies, keeping only what’s necessary for processing using a custom PathFilter. There are two types of SSTableInputFormats depending on how you want to represent key/value pairs in MapReduce. The SSTableColumnInputFormat constructs an SSTableColumnRecordReader in which keys in the Mapper represent the row keys in Cassandra and values represent a single column under that row key. Similarly the SSTableRowInputFormat constructs an SSTableRowRecordReader in which keys in the Mappers are the Cassandra row keys and values are column iterators over all the columns under a single row key.
  • The SSTableRecordReader: It internally uses an SSTableScanner and a Descriptor similar to the one contained in recent versions of Cassandra, but with backwards compatibility, to identify all the parts of an SSTable. As described previously, subclasses of the SSTableRecordReader can represent values as single columns under a row or as entire column iterators.
  • The SSTableColumnMapper: This abstract mapper inherits from the default Hadoop mapper but adds a bit more structure to deal with the ByteBuffers and columns contained in the SSTables. It can also skip tombstoned columns.
  • The SSTableRowMapper: This is similar to the mapper above, but deals with column iterators instead of single columns.

Example
Setting up a MapReduce job that reads from a Cassandra cluster becomes very simple. The only missing piece is finding an easy way to get all the SSTables into a Hadoop cluster. At Knewton we found Netflix’s Priam to be a good match. Priam backs up our Cassandra cluster multiple times a day into S3, making it really easy to transfer the data to Elastic MapReduce (EMR).

The following is a complete example job that consumes student event data from backed-up Cassandra SSTables. The example can also be found here.

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
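  // Tell the input format about the source cluster: its partitioner, the
  // comparator of the column family we're reading, and the column family name.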
  SSTableInputFormat.setPartitionerClass(
      RandomPartitioner.class.getName(), job);
  SSTableInputFormat.setComparatorClass(LongType.class.getName(), job);
  SSTableInputFormat.setColumnFamilyName("StudentEvents", job);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(StudentEventWritable.class);
  job.setMapperClass(StudentEventMapper.class);
  job.setReducerClass(StudentEventReducer.class);
  job.setInputFormatClass(SSTableColumnInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  SSTableInputFormat.addInputPaths(job, args[0]);
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
public class StudentEventMapper extends SSTableColumnMapper
      <Long, StudentEvent, LongWritable, StudentEventWritable> {
  @Override
  public void performMapTask(Long key, StudentEvent value, Context context) {
    context.write(new LongWritable(key), new StudentEventWritable(value));
  }
  @Override
  protected Long getMapperKey(ByteBuffer key, Context context) {
    ByteBuffer dup = key.slice();
    Long studentId = dup.getLong();
    return studentId;
  }
  @Override
  protected StudentEvent getMapperValue(IColumn iColumn, Context context) {
    return getStudentEvent(iColumn, context);
  }
}

Notice that the mapper extends a specialized SSTableColumnMapper, which can be used in conjunction with the SSTableColumnRecordReader.

The example above uses the identity reducer to simply write the data as comma-separated values by calling the toString() method on the StudentEventWritable objects. The only additional task you have to worry about in the Reducer is deduping the data, since you will probably have a replication factor greater than one.
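
A minimal sketch of such a deduping reducer follows. It assumes nothing about StudentEventWritable beyond the toString() the example already relies on; if the writable exposes a real identity (an event id or timestamp, say), that would make a better dedupe key.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupingStudentEventReducer
    extends Reducer<LongWritable, StudentEventWritable, LongWritable, StudentEventWritable> {

  @Override
  protected void reduce(LongWritable key, Iterable<StudentEventWritable> values, Context context)
      throws IOException, InterruptedException {
    // Each replica contributes its own copy of the same event; keep only one.
    Set<String> seen = new HashSet<String>();
    for (StudentEventWritable value : values) {
      if (seen.add(value.toString())) {
        context.write(key, value);
      }
    }
  }
}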

Automating this job becomes an easy task given that SSTables are immutable and older tables don’t have to be read if they were already read once. Enabling incremental snapshots can also help here.
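
Because SSTables never change, one way to make re-runs incremental is a path filter that skips files recorded by a previous run. The sketch below is not part of KassandraMRHelper; it simply uses Hadoop's standard PathFilter interface, and it leaves open how the set of already-processed file names is persisted between runs and wired into the job.

import java.util.Set;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class ProcessedSSTableFilter implements PathFilter {
  private final Set<String> alreadyProcessed;

  public ProcessedSSTableFilter(Set<String> alreadyProcessed) {
    this.alreadyProcessed = alreadyProcessed;
  }

  @Override
  public boolean accept(Path path) {
    // Skip any file consumed in an earlier run; new SSTables pass through.
    return !alreadyProcessed.contains(path.getName());
  }
}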

Conclusion

If you want to get started on using the KassandraMRHelper you can check out the code here: https://github.com/Knewton/KassandraMRHelper. Contributions are more than welcome and encouraged.

If you’re interested in additional topics in Cassandra and Hadoop, you should check out the presentation on bulk reading and writing Cassandra using Hadoop here, with the slides shared here.

 

On Comparable Noise and Satisfiable Sound

The Motivation

Working for a startup can be intense. Many in this great city succumb to the pressures of work life and relinquish their purely intellectual pursuits. Here at Knewton, however, our love affairs with art, music, literature, formal systems — love affairs that, for the most part, began within the very walls of the institutions that we hope to disrupt — these passions inspire our work and our camaraderie with each other. And practically speaking, interview questions at New York’s hottest startups can lean heavily on what was learned in the undergraduate CS lecture hall. The need to keep these skills sharp is clearly still relevant.

The aim here is to kick off a series of blog posts that pair substantial results in CS or formal systems with significant works of classical music. The series is limited to classical works explicitly — there is plenty of headphones-on time at work to listen to The Postal Service.

The Works

Listen to this: Gondwana by Tristan Murail

Read this: On Computable Numbers by Alan Turing

Alan Turing’s On Computable Numbers, with an Application to the Entscheidungsproblem is a seminal paper. Anyone taking a CS undergraduate core will encounter the notion of computability and the halting problem. Further from the ivory tower, it is not difficult to find people who are near-giddy about Turing Completeness:

Breaking news: HTML + CSS3 is Turing Complete,

People looking for help developing a Turing machine in Microsoft Excel,

A Turing machine built using Legos,

Brainfuck, a language described on its website as “an eight instruction Turing-Complete Programming Language”

Turing’s name has become quite literally a “formal” stamp of approval in the computing world. Turing’s seminal paper defined a set of abstractions that formed the basis for what we now call “Turing Completeness,” a requirement of all first-class computing languages. What exactly did Turing say in that paper? And what does it have to do with French spectral music?

Turing’s Paper

Beyond a few feeble mechanical attempts, computers did not exist in 1936, and Mr. Turing didn’t appear that excited about making one — he was interested in making Gödel’s long, German work on incompleteness comprehensible. That is, Turing wanted to intuitively construct a class of decision problems that could be proven undecidable. Turing could not tackle this without defining a notion of “computable.” His definition leaned heavily on a few notions that we would recognize today as common, but required a little more formal attention-to-detail back then.

Turing describes memory:

We may compare a man in the process of computing a real number to a machine which is only capable of a finite number of conditions q1, q2, …. qI; which will be called “m-configurations”

and the idea of output:

… the subsequence of the symbols printed by it which are of the first kind will be called the sequence computed by the machine. The real number whose expression as a binary decimal is obtained by prefacing this sequence by a decimal point is called the number computed by the machine.

Academic language aside, the machine Turing is defining is simple. Turing calls this machine an “‘automatic-machine’ (or alpha-machine).” The machine has the following properties:

  • a tape divided into squares
  • symbols that are on the squares
  • the scanned symbol (the rth symbol). Modern texts will likely refer to this as the symbol the ‘head’ is pointing to.
  • configuration: the current m-configuration and the current scanned symbol (qn, G(r))

But this paper is not without its difficulties, especially if it is approached today.

One initial difficulty: having completed standard undergraduate classes in formal computation theory, I found Turing’s choice of terminology — “circle-free” and “circular” — extremely confusing to my modern eyes. Most undergraduates approach Turing’s ideas in their coursework from the “halting problem” angle. What is initially confusing here is the following:

CIRCLE-FREE != HALTS

How can this be? Turing quickly glosses over the definition of a circle-free machine early in the paper, describing it as follows:

If a computing machine never writes down more than a finite number of symbols of the first kind, it will be called circular. Otherwise it is said to be circle-free. A machine will be circular if it reaches a configuration from which there is no possible move, or if it goes on moving, and possibly printing symbols of the second kind, but cannot print any more symbols of the first kind. (p. 233)

Finite. This should be the same as a machine that “halts,” right?

No. Turing’s machines are built to write down numbers. These numbers are (somewhat sneakily, in my opinion) implied to be repeating binary decimals on page 232:

The real number whose expression as a binary decimal is obtained by prefacing this sequence by a decimal point is called the number computed by the machine.

So we’re essentially talking about endlessly repeating floats — the kind of numbers that you get when you type in [1] [/] [3] on a calculator, except in binary. Numbers with bars over the top of the last digit: 0.33333333… for example (0.010101010… in binary). In Turing’s paper, the machine that prints 0.010101010… is “circle-free.” This means it encounters no terminal difficulty in computation — it can keep printing out the endless decimal representation without pausing indefinitely to calculate. By this definition, “finite” numbers such as 4, 5, and 6 would be circle-free numbers, because the machine could be constructed to just print out a sequence of zeroes after the decimal point: 6.000000000….

“Halting” really has no place in his discussion: if we took a machine that “did not halt” by the “halting problem definition”, it would print out, say, 6.000 and then wait for an indefinite amount of time before printing out the next zero.

Understanding the important concept of a circle-free machine is critical, even if your only goal in reading the Turing is to see Gödel’s diagonalization “done simply” (see the argument on page 247 of the article, where he argues that the machine H is circular by suggesting an infinite loop).

Why the Murail?

The Murail is a significant representative of the spectral music genre, which has striking parallels with the work of Turing.

The Murail, like the Turing, uses an intuitive language, completely divorced from the method-as-music ideal of serialism. Gondwana is based entirely on the first sounds you hear in the piece — it is the subsequent deconstruction that leads the listener down a path that is nearly always defined momentarily, in that each subsequent moment of sound in the piece is very often based on the prior moment. There are few moments of discontinuity.

The work starts off very simply. The piece begins with seven to eight moments of repeated sound (see 0:12, 0:30, 0:46, 1:01, 1:15, 1:28, 1:38, 1:48, and then MAYBE [1:55, 2:01, 2:07, 2:13-2:40] in this recording), as easily understood derivations from a simple sonic idea.

Indeed, these events are Murail’s own “appeal to intuition,” to borrow Turing’s phrase. There is no recognition of pattern possible without repetition. With the seven chime-strike chords, Murail is handing us a ball of yarn — we will need it to find our way home from the complicated, thorny walk in the woods that is the rest of the piece. There will be no resounding return, no recapitulation to save the day here.

Beyond the opening, the Murail begins to dig deep into the essence of sound. At roughly eight minutes into the work, a hissing begins to become a main sonic feature. The connection to the beginning is simple — there is a brief, percussive sound at the beginning of the piece with a few strings playing, perhaps a maraca — there are lots of high, hiss-like frequencies that comprise the sound of a maraca. But it brings to mind an important question — a question that is at the heart of spectral music: what kind of music is happening in the parts of sound waves that we DON’T hear? What about all the extremely high frequencies that dogs can hear? Above those, even? They exist. They have to. A sound wave can be encoded by a number. Large numbers exist. What if we slowed the noise, say, of water molecules at room temperature bouncing around inside a mason jar? Anyone who has been distracted by the sound of a tea kettle on the verge of boiling already knows that interesting sonic things can happen with water molecules on the surface of a boiling hot-plate. But what about beyond the ear? What types of large-number-based frequencies exist at these sonic extremes of our world?

Computable vs. Uncomputable Sounds?

There is a strong argument that much of “Western Harmony” can be boiled down into small, rational-number relationships between different frequencies; numerical relationships, in fact, that are easily computable by a Turing machine (with a reasonable “skeleton table,” even!). A major triad, for example, is little more than the three ratios:

1, 5/4, and 3/2 (pitches ‘do’, ‘mi’ and ‘so’, if you’ve ever watched “The Sound of Music”)

By the standards of Murail (and Turing, for that matter), these are small numbers, and the human ear is quite adept at performing the computation of recognizing them and placing them in reasonable cultural and musical contexts.
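
As a toy illustration — the 440 Hz reference pitch is an arbitrary choice of mine, not anything from Murail or Turing — those ratios turn into concrete frequencies with nothing more than a couple of multiplications:

public class MajorTriad {
    public static void main(String[] args) {
        double doPitch = 440.0;            // arbitrary reference frequency, in Hz
        double miPitch = doPitch * 5 / 4;  // 550 Hz
        double soPitch = doPitch * 3 / 2;  // 660 Hz
        System.out.printf("do: %.1f Hz, mi: %.1f Hz, so: %.1f Hz%n",
                doPitch, miPitch, soPitch);
    }
}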

The human ear, from a perspective of what it can understand or “compute” in this way, is quite good. We like minor chords, diminished chords — heck, we’ll even listen to Coltrane. Serial music is tougher. In general, music that doesn’t “sound good” to the western ear because of dissonance, etc., likely involves sound frequency relationships that make use of larger numbers. A minor chord, for example, requires use of the 6th and 8th partials. Its ratios, depending on how you define the third of the chord, require much larger numbers than those of a major chord. The numbers involved with frequencies heard in the Murail, then, in all their roaring, bending, and hissing glory, beg a final question:

What is the limit? What is uncomputable for the human ear?

Note:

Parallels with music aside, readers not familiar with Turing’s life and the circumstances of his death should read the Turing Wikipedia article. Oppression and intolerance are real, and Turing suffered greatly at the hands of the British government.

from: www.nateburke.com

Java-Scala Interoperability

To many people, Java is programming. It’s the only programming language most of my non-technical friends could name off-hand, and the first one that many programmers learn. Since its initial appearance in 1995, Java’s usage has steadily grown.

There are many reasons for this. The design of the Java platform, in which Java code compiles to bytecode that runs on the Java Virtual Machine (JVM), allows Java to be a “write once, run anywhere” language. That is, Java code that works on my computer will work exactly the same on your computer, or a server in the cloud (in theory, at least). Its language features, especially static typing and object-orientation, lend it to robust, modular, rock-solid code. Due to years of JVM and Java development, the platform is blazingly fast and stable. In addition, the many libraries available for Java allow anybody to leverage years of development and experience to perform complicated tasks simply and quickly. The Java community is also strong: on top of the official documentation, there are years of blog posts (an unscientific Google search for “java blog” yields 268,000,000 results) and close to 280,000 questions tagged “Java” on StackExchange. That’s a lot of collective knowledge and expertise.

At Knewton our systems are Java-based, for a few of the reasons mentioned above. But Java is far from perfect. Its syntax includes much boilerplate, and support for concepts like concurrency, parallelism, and functional programming is unidiomatic. There are a lot of small tweaks or features that could make Java a lot more pleasant to work with. These imperfections aren’t significant enough to outweigh the benefits of using Java, but they do make daily work a little more challenging.

Enter Scala. Scala is what would happen if there were a Genie that eavesdropped on every Java developer as they cursed when their build broke due to a missing semicolon, then inferred a wish from their complaints. It offers what Java lacks: a much cleaner syntax (semicolons optional, types inferred), pattern matching and case classes, mix-ins (a more powerful inheritance/interface system), higher-order/first-class/anonymous functions, built-in immutable data structures, a built-in read-eval-print loop (REPL), and actor-based concurrency. Even more important than all of this, though, is one huge advantage: it runs on the JVM. This means that, unlike other languages, Scala doesn’t require the reimplementation of large bodies of code; programs can use existing Java libraries with minimal effort. For instance, the Java standard libraries are available as soon as your Scala REPL starts. To translate

ArrayList<String> fruits = new ArrayList<String>();
fruits.add("apple");
fruits.add("pear");
fruits.add("banana");
for (String fruit : fruits) {
    System.out.println(fruit);
}

into Scala, we simply write the following (JavaConversions is a Scala library that makes using Java collections like ArrayList much more idiomatic):

import java.util.ArrayList
import scala.collection.JavaConversions._
val fruits = new ArrayList[String]
fruits.add("apple")
fruits.add("pear")
fruits.add("banana")
for (fruit <- fruits) {
    println(fruit)
}
// More idiomatically
fruits.foreach(println)

If you’re using libraries that aren’t packaged with Java or your own classes, just ensure that they lie in the classpath. In addition, the syntactic similarities between Java and Scala mean that the information from nearly any Java resource — a blog post, StackExchange question, Javadoc, or whiteboard code snippet — will be equally valuable in Scala, allowing Scala to tap into over 15 years of experience. While Scala may be a relatively new language, it has access to all of the network benefits of a behemoth like Java.

Portability works the other way, too, but with a few caveats. Consider the following Scala class, in the file “Hamster.scala”:

class Hamster()

The Java file “HamsterTest.java” might contain the following:

public class HamsterTest {
    public static void main(String[] args) {
        Hamster hamster = new Hamster();
    }
}

To compile and run, execute

$ scalac Hamster.scala
$ javac -cp path/to/scala-library.jar:. HamsterTest.java
$ java -cp path/to/scala-library.jar:. HamsterTest

in the shell, and hopefully find that everything works as expected. However, if we want to add a weight field to our Hamster class (“class Hamster(var weight: Int)”), things start to break.

Hamster hamster = new Hamster(42);
// This works (note: "weight()", not "weight"):
System.out.println("The hamster weighs " + hamster.weight());
// But if our hamster then goes on a diet, we're out of luck:
hamster.setWeight(10); // doesn't work
hamster.weight(10); // doesn't work

We can solve this problem with the handy scala.reflect.BeanProperty annotation, which adds getWeight and setWeight methods (we could also add them explicitly):

class Hamster(@BeanProperty var weight: Int)

Then, from Java:

Hamster hamster = new Hamster(42);
hamster.setWeight(hamster.getWeight() - 32);

What about using Scala traits within Java? Since traits aren’t quite interfaces, but they aren’t quite abstract classes, they’re a little tricky. A basic Scala trait, such as

trait Mammal {
    def beHairy
}

can be used just as an interface. But if we add another trait, Rodent,

trait Rodent extends Mammal {
    def beHairy {
        println("I have fur!")
    }
    def gnaw
}

and attempt to extend it

public class Rat extends Rodent {
    public void gnaw() {
        System.out.println("gnawsome");
    }
}

we are met with the compile-time error “no interface expected here”, even though Rodent implements beHairy(). To see why we get this message, whip out javap, the Java disassembler bundled with every JDK (javap is an essential tool for deciphering strange Scala behavior).

$ javap Rodent
Compiled from "Rodent.scala"
public interface Rodent extends Mammal,scala.ScalaObject{
    public abstract void beHairy();
    public abstract void gnaw();
}

Thus, even though Rodent contains a method implementation, it still compiles to an interface. Some compile-time magic allows Scala classes to use these traits directly. The easiest fix is to add something like

abstract class RodentJava extends Rodent

for each trait that implements methods, and have Java classes extend the abstract class rather than the trait. This has the advantage of being dead simple, but the disadvantage of requiring the maintenance of a separate Java API and sacrificing the ability to mix in multiple traits as in Scala (which is impossible in Java anyway, since it lacks multiple inheritance); a single abstract Scala class for each potentially used set of mix-ins is the way to go here, but this method risks a combinatorial explosion.
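
With that workaround in place, the Java side might look something like the following sketch, which reuses the RodentJava bridge defined above:

// Rat picks up the beHairy() implementation mixed in from the Rodent trait
// via RodentJava, and only has to supply the abstract gnaw().
public class Rat extends RodentJava {
    public void gnaw() {
        System.out.println("gnawsome");
    }
}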

Traits are just one example of a Scala feature that lacks a direct Java equivalent. First-class functions, anonymous functions, Scala’s greater number of access level modifiers, pattern matching, tuples, implicits, and operator overloading are all either non-trivial or impossible to use in Java. In general, to be useful to Java programmers, a library that uses any Scala-specific features must offer a Java-oriented API (of greater complexity than the examples seen here, but involving similar workarounds). Both Apache Kafka [3] and Twitter’s Finagle [4], which I’ve worked with in my summer at Knewton, take this approach; while it’s possible to write Scala code that can be used seamlessly from Java, to do so is to give up the expressiveness of idiomatic Scala.
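
For instance, Scala tuples are perfectly reachable from Java, just clumsily. Here is a sketch using the Tuple2 class that Scala tuples compile to (scala-library.jar must be on the classpath):

import scala.Tuple2;

public class TupleFromJava {
    public static void main(String[] args) {
        // What Scala writes as ("banana", 3) becomes an explicit constructor call,
        // and the _1/_2 accessors become method calls.
        Tuple2<String, Integer> pair = new Tuple2<String, Integer>("banana", 3);
        System.out.println(pair._1() + " x " + pair._2());
    }
}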

Whether, as some suggest, Scala will be a successor to Java, or merely a fad, its compatibility with Java is one of its greatest strengths. Combining Java and Scala is nowhere near the undertaking that combining two unrelated languages would be, so the cost of experimenting with Scala as a part of an existing Java architecture is minimal. Given the potential benefits and low risk, it makes sense to consider a total or partial move to Scala as part of a JVM-based ecosystem.