Kankoku: A Distributed Framework for Implementing Statistical Models

As future-facing as Knewton’s adaptive learning platform may be, the concept of a personalized classroom has a surprisingly rich history. The idea has intrigued educators and philosophers for decades. In 1954, behavioral psychologist B.F. Skinner invented the concept of programmed instruction, along with a working mechanical prototype to boot. His “teaching machine” consisted of a wooden box on which questions were displayed to students on strips of paper controlled by turning knobs. One would only progress upon answering a question correctly. A crucial feature of the teaching machine was that “the items were arranged in a special sequence, so that, after completing the material in frame 1, the students were better able to tackle frame 2, and their behavior became steadily more effective as they passed from frame to frame.” The argument upon which Skinner’s teaching machine was founded still holds water today: that “what is taught to a large group cannot be precisely what each student is ready just at that moment to learn.”1


Sixty years later, examining Skinner’s prototype still provides an insightful frame of reference. Knewton’s platform is responsible for tracking the individual learning states of each student at the granularity of individual concepts and questions. Like the teaching machine, we must deliver relevant recommendations in real-time and classroom analytics in near real-time. Those recommendations and analytics serve as a tool for both students and teachers to improve student outcomes. Considerations like these influence the engineering decisions we make on a daily basis, including the decision to use a stream-processing framework to power several of our statistical models. In this blog post, we will open the hood of our own teaching machine to explore the tradeoffs behind the design of Knewton’s scientific computing platform.

Why Stream Processing?

Knewton’s recommendation engine faces the task of providing recommendations to millions of students in real-time. As one of the pioneers of behaviorism, Skinner certainly understood the importance of delivering the right feedback at the right time.2 Respond to a student event (e.g., finishing an article) just two minutes late, and the impact of a recommendation diminishes rapidly. But what goes into each recommendation under the hood? A recommendation is essentially a ranked selection of instructional content that is most relevant to the subject matter that a student is studying at any particular time. Every student’s learning history (the data representing their interactions with content and their activity on the system) is taken into account. Knewton’s recommendation engine also considers other factors, such as each student’s learning goals and deadlines. All of this data is processed through a variety of psychometric and statistical models that estimate various characteristics of students (e.g., their proficiency or engagement level) and content (e.g., its difficulty or effectiveness). While some of these computations can be performed ahead of time, there are still numerous models that must be computed on the spot in response to a student interaction.3 Combining and processing all of this data results in a very large sequence of actions that must be performed in a small period of time.

Knewton is much more than just a differentiated learning app. Imagine if Skinner’s teaching machine knew every student’s individual learning history, knowledge state, habits, strengths, and upcoming goals, and could take into account goals set by teachers or administrators.

To handle all this data, Knewton has built Kankoku4, a stream processing framework that can respond to individual events in real-time.5 Stream processing systems operate under the requirement that inputs must be processed “straight-through” — that is, real-time feeds must trigger a set of downstream outputs without necessarily having to resort to polling or any intermediate storage. Stream processing systems are also characterized by their support of real-time querying, fault-tolerance, and ability to scale horizontally.6 The primary complement to stream processing is batch processing, consisting of programming models such as MapReduce that execute groups of events scheduled as jobs. Batch computing is fantastic for efficiently performing heavy computations that don’t require immediate response times.

However, these advantages of batch processing are also what make it less suitable for responsive, high availability systems like Knewton’s.7

Kankoku

Kankoku is a scientific computing Java framework developed in-house that provides a programming model for developing decoupled scientific models that can be composed to create any kind of computable result. The framework aims to abstract away the details of retrieving data from and storing data in databases, as well as reliability, scalability, and data durability, letting model writers concentrate on creating accurate and efficient models. In the example workflow below, the nodes (or Kankokulators, as we call them) represent individual (or sets of) calculations. Streams are fed into Kankoku from a queue, which serves as a message broker by publishing received student events into various topics to which Kankoku subscribes.

[Figure: Example Kankoku workflow]

With this framework, complex multi-stage computations can be expressed as networks of smaller, self-contained calculations. This style of programming is especially well-suited for data analysis where the outputs of an arbitrary statistical model could be used as inputs to another. One example of this could be aggregating student psychometrics as inputs for modeling student ability using Item Response Theory (IRT).
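
Kankoku’s actual interfaces are internal, so the following is only a rough sketch with hypothetical names (Kankokulator, KankokuContext, StudentResponse, ProficiencyEstimate). The point is that each node consumes one kind of event, updates per-student state, and emits a value that a downstream node, such as an IRT ability estimator, can consume.

// Hypothetical names throughout; the real Kankoku interfaces are internal to Knewton.
public class ProficiencyEstimator implements Kankokulator<StudentResponse, ProficiencyEstimate> {
  @Override
  public ProficiencyEstimate calculate(StudentResponse response, KankokuContext context) {
    // Fold the new response into the running estimate for this student.
    // Illustrative update rule only; not Knewton's actual model.
    double prior = context.getState(response.getStudentId(), 0.5);
    double updated = response.isCorrect()
        ? prior + 0.1 * (1.0 - prior)
        : prior - 0.1 * prior;
    context.putState(response.getStudentId(), updated);
    return new ProficiencyEstimate(response.getStudentId(), updated);
  }
}

// Composition: the estimator's output stream feeds a downstream Kankokulator,
// e.g. an IRT-based ability model subscribed to the same topology.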

Speed and horizontal scalability are also important in developing a stream processing framework for real-time events. One of the many ways Knewton achieves horizontal scalability is by partitioning the input data stream using a partitioning key in the queue.8
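
As a rough illustration of that idea (the actual queue and key scheme are not shown here), hashing the partitioning key, for example the student ID, spreads the stream across a fixed number of partitions while keeping all events for a given student on the same partition:

// Illustrative only: route each student event to one of numPartitions partitions.
int partitionFor(String studentId, int numPartitions) {
  // Mask the sign bit rather than using Math.abs, which overflows for Integer.MIN_VALUE.
  return (studentId.hashCode() & Integer.MAX_VALUE) % numPartitions;
}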

“Kankoku” Means “Recommendation”

Similar to how Skinner’s teaching machine immediately responds to individual inputs, Kankoku streamlines responsive event processing for arbitrary, unbounded data streams. Both serve a complex need — providing personalized learning recommendations — yet have internal mechanisms that are easily decomposable, and execution that is reproducible.

But Kankoku is very different from the teaching machine. The software it powers is capable of understanding and analyzing the learning mechanisms of millions of students. Ensuring that Knewton doesn’t sacrifice quality to meet the demands of quantity or speed is a top priority. To meet these ends, we are continually revising and extending our models to run more efficiently while delivering better results. Kankoku’s design is a strength here. Not only does it help Knewton break down a complex task into smaller pieces, it also makes it simpler to understand and tweak each component. Monitoring these models requires complex visibility tools that allow Knewton to examine intermediate computation in real-time. Kankoku is less like one teaching machine than it is hundreds of small machines working together in concert.

So What?

In his exposition “Programming Instruction Revisited,” Skinner spoke of his dream of creating technology that would help classrooms evolve beyond the “phalanx formation” by helping teachers become even more attuned to every student’s individual needs. As history has shown us, implementing such technology at scale is an extremely difficult problem. Truly understanding student needs and providing feedback in real-time is a non-trivial challenge for any person, much less a computer program. Practical machine learning and “artificial intelligence” is in many ways a systems engineering challenge — building models that can handle real-time workloads at scale is crucial to creating a service that will actually be useful to students and teachers. Well-designed systems will never replace teaching, but they can provide an automated, responsive, and unified platform to expose insights about student learning to teachers and parents around the world, who do understand how to best act on those insights.

[Figure: Skinner’s teaching machine]

Acknowledgements

I’d like to thank the creators of Kankoku — Nikos Michalakis, Ferdi Adeputra, Jordan Lewis, Erion Hasanbelliu, Rafi Shamim, Renee Revis, Paul Kernfeld, Brandon Reiss, George Davis, and Kevin Wilson — for their tireless work as well as letting me play with such an awesome piece of technology. Stay tuned for part 2 of this blog post for more details on my internship project (extending the Kankoku framework with Apache Storm).


  1. B.F. Skinner. Programming Instruction Revisited

  2. Knewton is not preaching or practicing behaviorism. This is only meant to be an analogy. 

  3. http://en.wikipedia.org/wiki/Online_algorithm 

  4. Kankoku means “advice” or “recommendation” in Japanese. It also means “Korea.” 

  5. In addition to powering Knewton’s recommendation engine, stream processing suits a variety of applications, ranging from powering Google Trends to supporting fraud detection and “ubiquitous computing” systems built on cheap micro-sensor technology that demand high-volume and low-latency requirements. Other applications include powering bank transactions (which require exactly-once delivery), image processing for Google Street View, and command-and-control in military environments. See: Akidau, et al. MillWheel: Fault-Tolerant Stream Processing at Internet Scale

  6. Stonebraker, et al. The 8 Requirements of Real-Time Stream Processing

  7. Architectures such as the Lambda Architecture unite both programming models. There is also technically a gray zone between batch and stream processing frameworks – for instance, Spark Streaming processes events in microbatches. Some of our models can’t be implemented with microbatching, but it is an interesting idea worth exploring.

  8. Alternative terminology for “grouping”: sharding, shuffling. 

Kankoku: A Distributed Framework For Implementing Statistical Models (Part 2)

The focus of my internship project this summer was to extend Kankoku (Knewton’s scientific computing framework) to operate in a more distributed fashion. There are a few reasons that drove this change — namely, increased functionality in model expression and greater operational control in model execution. In this blog post I will analyze these objectives in detail and explore why they necessitated tradeoffs from both the systems engineering and business perspectives.

Kankoku is a scientific computing Java framework developed in-house at Knewton that provides a stream-processing programming model for developing decoupled scientific models that can be composed to create any kind of computable result. For a more detailed discussion on stream processing and Kankoku, see part one of this blog post.

Weathering the Storm: Introducing Distributed Kankoku

Partitioning, or dividing a set of inputs into a collection of subsets, is a key problem in any distributed system. Mod hashing and consistent hashing are two common ways to implement a shuffle grouping, in which keys are distributed across partitions uniformly and pseudorandomly. Kankoku currently performs a shuffle grouping before model execution, which allows workloads to be balanced across separate machine stacks that each run independent instances of the same topology.1 However, the calculation of certain psychometrics may require additional partitioning (i.e., multi-partitioning).

Recall that Knewton performs online analysis of both the student and the classroom. Consider the scenario in which the output of Kankokulator node A (calculating a student metric) serves as the input to Kankokulator node B (calculating a classroom metric). Since A processes events per student, the initial grouping must happen by student ID. However, B must process events per classroom. This presents a problem, since there is no guarantee that two students in the same class are grouped to the same partition. A simple solution might be to route the output of A through a queue serving as an intermediate message broker. This queue can then regroup the data stream based on class ID:

[Figure: Regrouping Kankokulator output by class ID through an intermediate queue]

However, this approach scales poorly for several reasons. Creating and maintaining a new queue shard for each additional multi-partition adds development overhead. Rerouting the data stream through an intermediate broker at every grouping also introduces extra overhead and network latency. Nor is there any guarantee that the models execute deterministically: previously, each instantiation of a Kankoku topology ran on its own machine, processing inputs in topological order, but with intermediate queues, keys may be processed out of order due to varying latency. A more general-purpose solution is preferable.

This is where the Apache Storm framework (originally developed by Twitter) comes in as a possible candidate. Like Kankoku, Storm is a general stream-processing framework, but with one crucial design difference: it is strongly distributed, in that nodes in the same topology need not run sequentially on the same machine. As a result, Storm supports the ability to perform arbitrary groupings between each node, and multiple groupings within the same topology.2 Nodes in a Storm topology are referred to as bolts, and data sources are referred to as spouts.

Using Storm’s Trident API, declaring a new grouping within the topology is as simple as calling the function partitionBy. The example below shows how our hypothetical scenario above might be implemented using Storm instead of rerouting through a queue:

[Figure: The same regrouping expressed as a Trident topology]
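
A rough sketch of that topology in code might look like the following. Only newStream, each, and partitionBy are actual Trident calls; the spout and the two Function classes are hypothetical stand-ins for Kankokulators A and B:

TridentTopology topology = new TridentTopology();
topology.newStream("studentEvents", studentEventSpout)
    // Kankokulator A: per-student metric, so partition by student ID first.
    .partitionBy(new Fields("studentId"))
    .each(new Fields("studentId", "event"),
          new StudentMetricFunction(),               // wraps Kankokulator A
          new Fields("classId", "studentMetric"))
    // Regroup so that all students in the same class land on the same partition.
    .partitionBy(new Fields("classId"))
    // Kankokulator B: per-classroom metric computed from A's outputs.
    .each(new Fields("classId", "studentMetric"),
          new ClassMetricFunction(),                 // wraps Kankokulator B
          new Fields("classMetric"));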

Kankoku can therefore be extended by “wrapping” subtopologies (individual Kankokulators or groups of Kankokulators) within Storm bolts. Bolts will encompass contiguous Kankokulators expecting data streams partitioned by a common key type, and a new bolt will be created whenever an additional partitioning operation is required. This interaction introduces the functionality of multi-partitioning while still preserving our original model execution; bolts do not define how data is managed and arbitrary Kankokulator code can still run within a bolt. Hence, in this architecture Kankoku provides a higher-level programming model built upon Storm.
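
As a minimal sketch of what such a wrapper might look like using Storm’s core bolt API (the KankokuSubtopology handle and its result type are hypothetical), the bolt simply hands each incoming event to its group of Kankokulators and re-emits their keyed outputs:

public class KankokuSubtopologyBolt extends BaseBasicBolt {
  // Hypothetical handle to a contiguous group of Kankokulators sharing a key type.
  private final KankokuSubtopology subtopology;

  public KankokuSubtopologyBolt(KankokuSubtopology subtopology) {
    this.subtopology = subtopology;
  }

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // Run the wrapped Kankokulators on the incoming event and re-emit their outputs.
    for (KeyedResult result : subtopology.process(input.getValueByField("event"))) {
      collector.emit(new Values(result.getKey(), result.getValue()));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("key", "value"));
  }
}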

Another use case for this particular design arises from Storm’s convenient parallelism hint feature. Parallelism hints are the initial number of executor threads allocated to a particular bolt, which can be rebalanced during runtime. Tuning the parallelism hint of bolts gives us additional operational control over executing topologies by weighting CPU resources differently for separate subtopologies. Therefore, subtopologies that we expect to be more computationally expensive can be allocated more processing power, which in turn helps increase throughput.
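
For example, with Storm’s core TopologyBuilder API (component names and executor counts below are illustrative), a heavier classroom-level subtopology can be allocated more executors than a lighter student-level one, and the counts can later be adjusted with Storm’s rebalance command:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("studentEvents", new StudentEventSpout(), 2);
// Lighter per-student subtopology: 4 executors.
builder.setBolt("studentSubtopology", new KankokuSubtopologyBolt(studentModels), 4)
    .fieldsGrouping("studentEvents", new Fields("studentId"));
// Heavier per-classroom subtopology: 8 executors.
builder.setBolt("classSubtopology", new KankokuSubtopologyBolt(classModels), 8)
    // the student subtopology emits (key = classId, value = studentMetric)
    .fieldsGrouping("studentSubtopology", new Fields("key"));
// Executors can be rebalanced at runtime, e.g.:
//   storm rebalance kankoku-topology -e classSubtopology=12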

[Figure: An example Storm-Kankoku topology]

The topology above shows how a Storm-Kankoku topology might be represented. Within each bolt, the Kankoku subtopology will run deterministically so as to take advantage of data locality. Hence, it is advantageous to wrap as many Kankokulators as possible within each given bolt while still fitting the constraints imposed by weighted parallelism and multi-partitioning.

Tradeoffs of Operating In A Distributed Environment

My internship project this summer consisted of implementing a prototype of the integrated Storm-Kankoku framework, similar to the sample topology displayed above, and examining the tradeoffs behind extending the Kankoku framework using Storm. Introducing added parallelism at the platform level can have sweeping effects on the behavior of our statistical models, affecting both integrity and performance. A few considerations we explored:

A) Bolt-level deterministic execution. Although Storm may not be able to recover the state of execution within an individual bolt if it fails, Storm’s “Transactional Topologies” guarantee that “multiple batches can be processed in parallel, but commits are guaranteed to be ordered.” Hence, topological ordering still applies and we expect reproducible execution.

B) Fault-tolerance. Storm provides fault tolerance with clear guarantees across bolt execution and state-saving operations (either exactly-once or at-least-once delivery). By assigning a monotonically increasing transaction ID to each commit of events, Storm provides the semantics needed to detect and filter out duplicate events replayed by Storm in the event of a failure. Fault tolerance is especially important when the outputs of Kankokulator nodes are saved or transmitted during execution — without Storm’s guarantees, events might be lost or duplicated.

C) Horizontal Scalability. Any implementation must take care to increase throughput without increasing latency. One possible performance pitfall in a distributed environment is the added latency introduced by redundant computations that each bolt must perform (such as loading the Knewton knowledge graph). This could potentially be solved by an off-node cache such as ElastiCache, at the cost of introducing additional complexity. In general, careful load testing must be performed to determine the ideal method of data processing — whether to pass values across the wire or to utilize intermediate checkpointing and storage structures.
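
Returning to point (B), the duplicate filtering can be sketched roughly as follows: persist the transaction ID alongside each saved value, and ignore any replayed commit whose ID is not newer than the one already applied. The KeyValueStore interface here is a hypothetical stand-in for whatever state store a bolt writes to.

public class IdempotentSink {
  // Hypothetical persistent store that records the last applied transaction ID per key.
  private final KeyValueStore store;

  public IdempotentSink(KeyValueStore store) {
    this.store = store;
  }

  public void save(String key, long txId, double value) {
    Long lastTxId = store.getLastTxId(key);
    if (lastTxId != null && txId <= lastTxId) {
      return;  // replayed batch: this commit was already applied, so skip it
    }
    store.put(key, value, txId);  // write the value and its transaction ID atomically
  }
}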

As expected, many of these tradeoffs don’t point to a single right answer. For instance, depending on the scenario Knewton might leverage Storm’s exactly-once functionality at the expense of introducing more latency. In situations like these, it becomes less a question of which approach to take and more so a question of how important each requirement is. How important is it to filter out duplicate events? What is the cost of producing a recommendation that is stale, possibly by just a few seconds? How important is it for Knewton to keep its latency as low as possible? These questions strike at the heart of both Knewton’s internal systems design and its core business value-add, and encapsulate much of what made my internship intellectually engaging and rewarding.

Sources


  1. By topology, we mean a directed acyclic graph (DAG) that defines the workflow of calculations. 

  2. Storm implements the ability to partition by using an abstraction called an Active Distributed Hash Table (Active DHT). Active DHTs extend distributed hash tables to allow an arbitrary user defined function (UDF) to be executed on a key-value pair. Source: A. Goel, Algorithms for Distributed Stream Processing

Cassandra and Hadoop – Introducing the KassandraMRHelper

Here at Knewton we use Cassandra for storing a variety of data. Since we follow a service-oriented architecture, many of our internal services are backed by their own data store. Some of the types of data we store include student events, recommendations, course graphs, and parameters for models. Many of these services and clusters are often deployed in more than two environments, increasing the total number of Cassandra clusters deployed.

On the Analytics team at Knewton we are constantly working on improving a lot of the inferential models that go into our platform, while at the same time building new ones. This often involves munging a lot of data in short periods of time. For a lot of our ad hoc analysis we use a data warehouse where analysts can query and extract data relatively quickly. One of the challenges we’ve faced at Knewton — and specifically in Analytics — involved how to go about populating our data warehouse with data from Cassandra clusters that predated our data warehouse. To solve this problem, we implemented an internal library for bulk extracting data out of Cassandra into Hadoop with zero hits to the Cassandra cluster. A few months later we open sourced it here and called it the KassandraMRHelper.

KassandraMRHelper takes a slightly different approach than the constructs contained in the Hadoop package in the Cassandra source code (e.g. AbstractColumnFamilyInputFormat), in that it doesn’t require a live Cassandra cluster to extract the data from. This allows us to re-run map-reduce jobs multiple times without worrying about any performance degradation of our production services. This means that we don’t have to accommodate more traffic for these offline analyses, which keeps costs down.

How does it work?
The KassandraMRHelper includes specialized Input Formats and Record Readers for SSTables. First, here’s a little bit about SSTables:

  • SSTables are immutable; once they’re written they never change.
  • SSTables can exist independently of each other, but collectively they form the complete data set.
  • SSTables consist of four to five components, depending on which version you’re using:

  • Data.db files store the data, compressed or uncompressed depending on the configuration.
  • Index.db is an index into the Data.db file.
  • Statistics.db stores various statistics about that particular SSTable (times accessed, etc.).
  • Filter.db is a bloom filter that’s loaded into memory by the Cassandra node and lets it quickly determine whether a key is in the table or not.
  • CompressionInfo.db may or may not be there depending on whether compression is enabled for a ColumnFamily. It contains information about the compressed chunks in the Data.db file.

Data in columns and rows are essentially key value pairs, with rows as the keys and columns as values to the rows. The columns are also key value pairs consisting of a name and a value.

Given how data are stored, Cassandra is in fact a really good fit for MapReduce. The same partitioning schemes that Cassandra uses can also be used in MapReduce. Columns and rows can be the keys and values that get passed in the Mappers or Reducers in a MapReduce job.

Some key components of KassandraMRHelper are:

  • The SSTableInputFormat: The SSTableInputFormat describes how to read the SSTables and filters through all the components and ColumnFamilies, keeping only what’s necessary for processing using a custom PathFilter. There are two types of SSTableInputFormats depending on how you want to represent key/value pairs in MapReduce. The SSTableColumnInputFormat constructs an SSTableColumnRecordReader in which keys in the Mapper represent the row keys in Cassandra and values represent a single column under that row key. Similarly, the SSTableRowInputFormat constructs an SSTableRowRecordReader in which keys in the Mappers are the Cassandra row keys and values are column iterators over all the columns under a single row key.
  • The SSTableRecordReader: It internally uses an SSTableScanner and a Descriptor similar to the ones contained in recent versions of Cassandra, but with backwards compatibility, to identify all the parts of an SSTable. As described previously, subclasses of the SSTableRecordReader can represent values as single columns under a row or as entire column iterators.
  • The SSTableColumnMapper: This abstract mapper inherits from the default Hadoop mapper but adds a bit more structure to deal with the ByteBuffers and columns contained in the SSTables. It can also skip tombstoned columns.
  • The SSTableRowMapper: This is similar to the mapper above, except that it deals with column iterators over all the columns under a row key.

Example
Setting up a MapReduce job for reading a Cassandra cluster becomes very simple. The only missing piece is finding an easy way to get all the SSTables into a Hadoop cluster. At Knewton we found Netflix’s Priam to be a good match. Priam backs up our Cassandra cluster multiple times a day into S3 making it really easy to transfer the data to Elastic MapReduce (EMR).

This simple MapReduce job shows a complete example job that consumes student event data from backed up Cassandra SSTables. The example can also be found here.

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  // Describe how the source cluster partitioned and compared its data
  // so the input format can read the SSTables directly.
  SSTableInputFormat.setPartitionerClass(
      RandomPartitioner.class.getName(), job);
  SSTableInputFormat.setComparatorClass(LongType.class.getName(), job);
  SSTableInputFormat.setColumnFamilyName("StudentEvents", job);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(StudentEventWritable.class);
  job.setMapperClass(StudentEventMapper.class);
  job.setReducerClass(StudentEventReducer.class);
  // Each map input is a single column under a row key.
  job.setInputFormatClass(SSTableColumnInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  // args[0] points at the backed-up SSTables; args[1] is the output path.
  SSTableInputFormat.addInputPaths(job, args[0]);
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
public class StudentEventMapper extends SSTableColumnMapper
      <Long, StudentEvent, LongWritable, StudentEventWritable> {
  @Override
  public void performMapTask(Long key, StudentEvent value, Context context) {
    context.write(new LongWritable(key),
                  new StudentEventWritable(value));
  }
  @Override
  protected Long getMapperKey(ByteBuffer key, Context context) {
    ByteBuffer dup = key.slice();
    Long studentId = dup.getLong();
    return studentId;
  }
  @Override
  protected StudentEvent getMapperValue(IColumn iColumn, Context context) {
    return getStudentEvent(iColumn, context);
  }
}

Notice that the mapper extends from a specialized SSTableColumnMapper which can be used in conjunction with the SSTableColumnRecordReader.

The example above uses the identity reducer to simply write the data as comma separated values by calling the toString() method on the StudentEventWritable objects. The only additional task you have to worry about in the Reducer is deduping the data, since you will probably have a replication factor of > 1.
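
Here is a minimal sketch of such a deduplicating reducer, assuming StudentEventWritable exposes a hypothetical getEventId() that uniquely identifies an event; the actual example in the repository may differ:

public class StudentEventReducer
    extends Reducer<LongWritable, StudentEventWritable, LongWritable, StudentEventWritable> {
  @Override
  protected void reduce(LongWritable studentId, Iterable<StudentEventWritable> events,
      Context context) throws IOException, InterruptedException {
    // With a replication factor > 1 the same event appears in several SSTables,
    // so keep only one copy per event ID.
    Set<Long> seen = new HashSet<Long>();
    for (StudentEventWritable event : events) {
      if (seen.add(event.getEventId())) {  // hypothetical accessor
        context.write(studentId, event);   // TextOutputFormat calls toString() on the value
      }
    }
  }
}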

Automating this job becomes an easy task given that SSTables are immutable and older tables don’t have to be read if they were already read once. Enabling incremental snapshots can also help here.

Conclusion

If you want to get started on using the KassandraMRHelper you can check out the code here: https://github.com/Knewton/KassandraMRHelper. Contributions are more than welcome and encouraged.

If you’re interested in additional topics in Cassandra and Hadoop you should check out the presentation on bulk reading and writing Cassandra using Hadoop here with the slides shared here.