Eureka! Why You Shouldn’t Use ZooKeeper for Service Discovery

Many companies use ZooKeeper for service discovery. At Knewton, we believe this is a fundamentally flawed approach. In this article, I will walk through our failures with ZooKeeper, tell you why you shouldn’t be using it for service discovery, and explain why Eureka is a better solution.

Remember What You’re Building On

Let’s back up. It’s important to first discuss what your target environment is before deciding what software to use or how to build your own. In the cloud, resiliency to equipment and network failure has to be a primary concern. When you’re running your software across a large number of replaceable pieces of hardware, it’s inevitable that one of them will fail at some point. At Knewton, we run on AWS, and we’ve seen many different types of failure. You have to design your systems expecting failure. Other companies on AWS agree (there are whole books written on the topic). You have to anticipate box failure, high latency, and network partitions — and build resiliency against them into your system.

Don’t assume your environment is the same as others. Sure, if you’re managing your own datacenters, you quite possibly could be putting in the time and money to minimize hardware failures and network partitions. But cloud environments like AWS make a different trade-off. You are going to have these issues, and you had better prepare for them.

Failures with ZooKeeper

ZooKeeper is a great software project. It is mature, has a large community supporting it, and is used by many teams in production. It’s just the wrong solution to the problem of service discovery.

In CAP terms, ZooKeeper is CP, meaning that it’s consistent in the face of partitions, not available. For many things that ZooKeeper does, this is a necessary trade-off. Since ZooKeeper is first and foremost a coordination service, having an eventually consistent design (being AP) would be a horrible design decision. Its core consensus algorithm, Zab, is therefore all about consistency.

For coordination, that’s great. But for service discovery it’s better to have information that may contain falsehoods than to have no information at all. It is much better to know what servers were available for a given service five minutes ago than to have no idea what things looked like due to a transient network partition. The guarantees that ZooKeeper makes for coordination are the wrong ones for service discovery, and it hurts you to have them.

ZooKeeper simply doesn’t handle network partitions the right way for service discovery. Like other types of failure in the cloud, partitions actually happen, and it is best to be as prepared as possible. But — as outlined in a Jepsen post on ZooKeeper and on the ZooKeeper website — clients connected to the side of a partition that cannot reach quorum lose communication with ZooKeeper, and with it their service discovery mechanism, altogether.

It’s possible to supplement ZooKeeper with client-side caching or other techniques to alleviate certain failure conditions. Companies like Pinterest and Airbnb have done this. On the surface, this appears to fix things. In particular, client-side caching helps ensure that if any or all clients lose contact with the ZooKeeper cluster, they can fall back to their cache. But even here there are situations where the client won’t get all the discovery information that could be available. If quorum is lost altogether, or if the cluster partitions and a client happens to be connected to nodes that are healthy but not part of the quorum, that client’s status will be lost even to the other clients communicating with those same healthy ZooKeeper nodes.

More fundamentally, supplementing ZooKeeper, a consistent system, with optimistic caching is an attempt to make ZooKeeper more available than it is designed to be. This gives you neither guarantee: you have bolted a system that wants to be AP on top of a system that is CP. This is fundamentally the wrong approach. A service discovery system should be designed for availability from the start.

Even ignoring CAP tradeoffs, setting up and maintaining ZooKeeper correctly is hard. Mistakes are so common that projects have been developed just to mitigate them, both for the clients and for the ZooKeeper servers themselves. Because ZooKeeper is so hard to use correctly, many of our failures at Knewton were a direct result of our misuse of it. Some things appear simple but are actually easy to get wrong: for example, reestablishing watchers correctly, handling sessions and exceptions in clients, and managing memory on the ZK boxes. Then there are actual ZooKeeper issues we hit, like ZOOKEEPER-1159 and ZOOKEEPER-1576. We even saw leadership election fail in production. These types of issues happen because of the guarantees ZooKeeper needs to make. It has to manage things like sessions and connections, but because they aren’t needed for service discovery, they hurt more than they help.

Making the Right Guarantees: Success with Eureka

We switched to Eureka, an open-source service discovery solution developed by Netflix. Eureka is built for availability and resiliency, two primary pillars of development at Netflix. They just can’t stop talking about it — and for good reason. Since the switch, we haven’t had a single service-discovery-related production outage. We acknowledged that in a cloud environment you are guaranteed failure, and it is absolutely critical to have a service discovery system that can survive it.

First, if a single server dies, Eureka doesn’t have to hold any kind of election; clients automatically switch to contacting another Eureka server. When the missing server comes back, the other Eureka servers accept it again and merge in whatever registrations it holds; there is no risk of a revived server blowing out the entire service registry. Eureka is even designed to handle broader partitions with zero downtime. In the case of a partition, each Eureka server will continue to accept new registrations and publish them to any clients that can reach it. This ensures that new services coming online can still make themselves available to clients on the same side of the partition.

But Eureka goes further. In normal operation, Eureka has a built-in concept of service heartbeats to prevent stale data: if a service doesn’t phone home often enough, Eureka removes its entry from the service registry. (This is similar to what people typically build with ZooKeeper and ephemeral nodes.) This is a great feature, but it could be dangerous during partitions: clients might lose services that were actually still up, just partitioned away from the Eureka server. Thankfully, Netflix thought of this: if a Eureka server loses connections with too many clients too quickly, it enters “self-preservation mode” and stops expiring leases. New services can register, but “dead” ones are kept, just in case a client might still be able to contact them. When the partition mends, Eureka exits self-preservation mode. Holding on to good data mixed with some stale data is better than losing the good data outright, so this scheme works beautifully in practice.

Lastly, Eureka caches on the client side. So even if every last Eureka server goes down, or a partition cuts a client off from all of them, the service registry still isn’t lost. Even in this worst-case scenario, your service will still likely be able to look up and talk to other services. Note that client-side caching is appropriate here: a client only falls back to its cache when it cannot reach any healthy Eureka server, so there is no fresher information it could possibly have retrieved.

Eureka makes the right set of guarantees for service discovery. There are no equivalents of leadership election or transaction logs. There is less for you to get wrong and less that Eureka has to get right. Because Eureka is built explicitly for service discovery, it ships with a client library that handles service heartbeats, service health checks, automatic publishing, and cache refreshing. With ZooKeeper, you would have to implement all of these things yourself. And Eureka’s client library is open-source code that everyone sees and uses, which beats a client library that only you and two other people have ever read.

The Eureka servers are also far easier to manage. To replace a node, you just remove one and add another under the same EIP. Eureka’s clear, concise web UI provides a visual representation of all your services and their health; I can’t tell you how great it is to glance at a web page and see exactly which services are running or suffering issues. Eureka also provides a REST API, allowing for easy integration with other tools and querying mechanisms.
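To give a sense of that API, here is a minimal sketch (not Knewton’s tooling) that asks a Eureka server for its registry over HTTP. The /eureka/v2/apps path and JSON shape follow Netflix’s public documentation, but the hostname is hypothetical and the details can vary with your Eureka version and deployment.

```python
import requests

EUREKA_URL = "http://eureka.example.com:8080/eureka/v2"  # hypothetical host

def list_services():
    """Print each registered application and how many of its instances are UP."""
    resp = requests.get(f"{EUREKA_URL}/apps",
                        headers={"Accept": "application/json"},
                        timeout=5)
    resp.raise_for_status()
    for app in resp.json()["applications"]["application"]:
        instances = app.get("instance", [])
        if isinstance(instances, dict):  # a lone instance may not be wrapped in a list
            instances = [instances]
        up = [i for i in instances if i.get("status") == "UP"]
        print(app["name"], f"{len(up)} instance(s) UP")

if __name__ == "__main__":
    list_services()
```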

Conclusion

The biggest takeaways here are to remember what hardware you are building on and to solve only the problems you have to solve. Eureka delivers on both counts for us at Knewton. Cloud platforms are unreliable, and Eureka is designed to handle their unique challenges. Service discovery needs to be as available and resilient as possible, and Eureka is designed to be just that.

Additional Resources

Netflix Shares Cloud Load Balancing And Failover Tool: Eureka!

Thread on ZooKeeper Vs Eureka

Why Not Use Curator/Zookeeper as a Service Registry?

A Gotcha When Using ZooKeeper Ephemeral Nodes

Introducing Curator – The Netflix ZooKeeper Library

Offline Parameter Estimation

As part of my summer internship at Knewton, I helped design and implement an offline parameter estimation batch. This blog post discusses some of the design considerations and challenges associated with the implementation of offline parameter learning. But first, I’ll provide a little context around the model and an overview of the parameter estimation process.

Vocabulary and Context

  • A student event is any interaction a student has with educational content that is recorded in Knewton’s data store (e.g., watching an instructional video or answering a quiz question). When an event consists of a student responding to an assessment question, it is referred to as an item interaction; the item is the question itself.
  • Items are responsible for assessing one or more concepts (e.g., long division). The content for a particular book consists of a collection of concepts.
  • A graph structure identifies the relationships between concepts (e.g., a concept might be a prerequisite for another concept). The graph structure also identifies which concept(s) a particular item assesses. Therefore, a single graph identifies the relationships for all items and concepts contained in the book used for a course.
  • By understanding the characteristics of each item (e.g., its level of difficulty) as well as a student’s current level of proficiency in a particular concept, Knewton can recommend which piece of educational content the student should interact with next in order to meet the learning objectives of the course. This explanation glosses over a number of other factors taken into consideration by Knewton’s recommendation engine, but suffice it to say, the recommendation relies on an understanding of item characteristics and student proficiency.
  • Item characteristics and student proficiency are modeled using Item Response Theory (IRT), as discussed in a previous blog post.

Parameter Estimation Overview

Once the initial model components are identified based on experimentation and evaluation by data science teams, Knewton has to decide how the parameters for this model will be estimated over time to ensure robustness and accuracy as more data are collected. The team can choose to train parameters in an ad hoc mode, a real-time online mode, a recurring offline mode, or some hybrid.

Ad hoc

In this mode, the data scientist(s) derive parameters outside of the operational functions of the product, and configure them into the product in an ad hoc manner as their experiments and analysis dictate.

The key operational drawback of this approach is the lack of a systematic and real-time response to new student events. Parameter updates have to wait until the next time the data scientist(s) choose to gather and incorporate new student events into a parameter estimation run. In addition, without a standardized and repeatable process in place, different data scientists may approach the parameter updates in a slightly different way based on their familiarity with the problem. This can pose a challenge in maintaining quality consistency and reproducibility.

The benefit of this approach, however, is that it has few constraints imposed upon it, and so is incredibly flexible. Data scientists are able to tweak the training process as they see fit.

Offline

This mode refers to a recurring operational process by which parameters are regularly re-estimated as part of some (semi-)automated process. At Knewton, this is achieved through scheduled batch jobs that automate the gathering of training data and execute the parameter learning.

While this limits the flexibility afforded to the parameter estimation method (since the logic has to be implemented in an automated batch), it improves robustness by simplifying the process by which model parameters can be updated in response to changes in the underlying data. This process still does not provide real-time parameter updates, but, at the time of execution, it can incorporate all available data into its parameter estimates.

Online

This mode refers to the integration of the parameter update process into the online behavior of the product. At Knewton, this means that each incoming student event can trigger a parameter update. This is necessary because each additional student interaction provides insight into the student’s current level of proficiency. An application of this approach outside of Knewton is clustering news articles into related topic areas: since new articles are constantly coming in over a newswire, the clusters must be updated to reflect the growing corpus of articles [1].

The key operational challenge of online estimation is that data generally have to be processed incrementally. Multiple scans of the full data set are not always feasible. In addition, real-time response rate requirements limit the amount of computation that can take place for each incoming data point (particularly if the number of incoming events is high) [2]. As an example in the context of news articles and topic models, Banerjee et al. employ algorithms which hold the “number of clusters” constant and only update the cluster parameters for the topic area which appears to be most closely aligned with an incoming news article [1].

While these techniques are effective in providing real-time response rates, the accuracy of model predictions may suffer over time if there are underlying changes to the model parameters that were held constant. In the case of news topic clusters, the study finds that, in general, “online algorithms give worse nMI (a measure of cluster quality) results than corresponding batch algorithms, which is expected since the online algorithms can only update the cluster statistics incrementally” [1]. In Knewton’s case, new data might suggest that item parameters need to be tweaked from the default settings with which they were originally configured.

Hybrid

The authors of the news topic clustering study — like Knewton — propose the use of a hybrid model where offline parameter estimation is used to regularly refresh model parameters that are otherwise being updated incrementally through the online parameter update [1]. This allows model parameters to react in real time to incoming data points, while systematically refreshing parameters that might otherwise become stale as a result of only making incremental online adjustments.

The hybrid approach does pose the challenge of having to identify an optimal frequency at which offline parameter estimation should be carried out to augment the performance of the online parameter updates. The frequency is likely to be dependent on how quickly errors accumulate due to online incremental updates and the tolerance range for such prediction errors.

Design Considerations for Offline Parameter Estimation

The remainder of this post discusses the offline batch process that was to be introduced in support of the hybrid parameter estimation approach. As such, the objectives of this batch job were to:

  • Extract item interactions from the student events recorded by Knewton;
  • Parse them into training data for a parameter learning algorithm for the IRT model; and
  • Estimate new item parameters grouped by book. By grouping items by the book, the estimation process can use relationships between different concepts (e.g., if mastery of concept 1 is a prerequisite for mastery of concept 2) to inform parameter selection for items assessing each of those concepts.

There were four major design considerations which went into achieving these objectives: robustness, flexibility, scale, and transparency.

Robustness

One of the main considerations was to design an offline parameter estimation job which would be able to accommodate future changes in the estimation process as well as the underlying data. As a result, there was heavy use of modularized design to decouple various components of the batch process:

    • Rather than read student events directly from the database, the batch relies on an intermediary batch job that retrieves the events and parses them into an easily consumable input format.
    • Finally, a separate library is defined that accepts a training data set as input and uses it to perform the actual parameter estimation.

This allows the batch process to gather a training dataset and pass it to a machine learning algorithm for parameter estimation while being agnostic of the underlying database structure, graph structure, and parameter estimation algorithm details.

Flexibility

Though the batch job is itself written in Java, the library responsible for performing parameter estimation is written in Python in order to provide the flexibility of leveraging mathematical packages such as pandas and numpy. To facilitate this, the batch job accommodates Python code execution as a post-Reduce calculation step using a custom MapReduce framework (which uses Thrift to communicate between the Java and Python components).

The library used to perform the actual parameter estimation is the same library which the data science teams utilize for their experimental analysis. This eliminates the need to generate duplicate code going from exploratory analysis to offline parameter estimation — particularly as the parameter estimation techniques are tweaked. In addition, the batch is able to support input parameters corresponding to the full range of optimization settings used by the data science teams to tweak the parameter learning process.
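As a rough illustration of what such an estimation library does (this is not Knewton’s code, and the setup here is an assumption), the sketch below fits a two-parameter logistic (2PL) IRT model to a student-by-item matrix of 0/1 responses by maximizing the joint log-likelihood with scipy. Production systems typically add priors or use marginal maximum likelihood, but the shape of the problem is the same.

```python
import numpy as np
from scipy.optimize import minimize
from scipy.special import expit  # logistic sigmoid

def twopl_negative_log_likelihood(params, responses, n_students, n_items):
    """Joint negative log-likelihood of a 2PL IRT model.

    params holds student abilities (theta), then item discriminations (a),
    then item difficulties (b). responses is an (n_students x n_items)
    array of 0/1 values, with NaN for unanswered items.
    """
    theta = params[:n_students]
    a = params[n_students:n_students + n_items]
    b = params[n_students + n_items:]
    p = expit(a[None, :] * (theta[:, None] - b[None, :]))
    p = np.clip(p, 1e-9, 1 - 1e-9)
    answered = ~np.isnan(responses)
    r = np.nan_to_num(responses)
    log_lik = np.where(answered, r * np.log(p) + (1 - r) * np.log(1 - p), 0.0)
    return -log_lik.sum()

def estimate_item_parameters(responses):
    n_students, n_items = responses.shape
    x0 = np.concatenate([np.zeros(n_students),   # abilities start at 0
                         np.ones(n_items),       # discriminations start at 1
                         np.zeros(n_items)])     # difficulties start at 0
    result = minimize(twopl_negative_log_likelihood, x0,
                      args=(responses, n_students, n_items),
                      method="L-BFGS-B")
    theta = result.x[:n_students]
    a = result.x[n_students:n_students + n_items]
    b = result.x[n_students + n_items:]
    return theta, a, b
```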

Scale

The biggest challenge was dealing with the amount of data involved in the parameter estimation step. Knewton records over 500 million student interactions each month. Using a MapReduce design allowed the extraction and parsing of item interactions from student events to be achieved by distributing the student event records across multiple Hadoop partitions. Using a 15-node cluster of R3 AWS instances, the batch was able to parse student events for a given month within an hour. The R3 instances are optimized for in-memory analytics, providing 61 GiB of memory and 160 GB of SSD storage.
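Knewton’s batch runs on a custom Thrift-based MapReduce framework written in Java, so the Hadoop Streaming-style Python sketch below is only illustrative, and the event field names are hypothetical. It shows the shape of the extraction step: the mapper keeps only item interactions and keys them by book, and the reducer collects each book’s interactions into one training record.

```python
import json
import sys

def mapper(stdin=sys.stdin, stdout=sys.stdout):
    """Emit one (book_id, interaction) pair per item interaction."""
    for line in stdin:
        event = json.loads(line)
        if event.get("type") != "item_interaction":
            continue  # skip videos, page views, and other student events
        value = json.dumps({
            "student_id": event["student_id"],
            "item_id": event["item_id"],
            "is_correct": event["is_correct"],
        })
        stdout.write(f'{event["book_id"]}\t{value}\n')

def reducer(stdin=sys.stdin, stdout=sys.stdout):
    """Group interactions by book; Hadoop delivers mapper output sorted by key."""
    current_book, interactions = None, []
    for line in stdin:
        book_id, value = line.rstrip("\n").split("\t", 1)
        if current_book is not None and book_id != current_book:
            stdout.write(json.dumps({"book_id": current_book,
                                     "interactions": interactions}) + "\n")
            interactions = []
        current_book = book_id
        interactions.append(json.loads(value))
    if current_book is not None:
        stdout.write(json.dumps({"book_id": current_book,
                                 "interactions": interactions}) + "\n")
```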

Even after the batch had extracted the item interactions from the student event records, this still left between one and five million training data instances per book for the estimation step. Such large data sets place a heavy load on memory during the iterative parameter estimation process. While this issue was addressed by using AWS instances optimized for memory, another option would be to sub-sample down to manageable levels as the amount of data grows over time. A further alternative would be to modify the learning algorithm so that it does not need to hold all of the training instances in memory during the update calculations. However, devising a special algorithm for this may limit the long-term flexibility of the parameter estimation process.

Transparency

The batch job uses Counters and logging to provide transparency into the parameter estimation steps. The logging is facilitated by the parameter estimation library which enables the user to see parameter estimation statistics (like log-likelihood values) at the end of each iteration in the algorithm. This was designed so that one can look inside the parameter estimation process and evaluate discrepancies or optimization opportunities.

Parameter Validations

Finally, a systematic QA step is an important feature for any automated offline parameter estimation. Knewton exercises a number of manual validation steps ranging from quantitative analysis (e.g., running historical events through the model to evaluate predictive power) to qualitative analysis (e.g., reviewing recommendations produced with the parameter estimates). In the future, some of these QA steps could be incorporated into the batch process to provide fully automated offline parameter estimation.

References

[1] A. Banerjee and S. Basu, “Topic Models over Text Streams: A Study of Batch and Online Unsupervised Learning,” Society for Industrial and Applied Mathematics, pp. 431-436, 2007.
[2] S. Zhong, “Efficient streaming text clustering,” Neural Networks, vol. 18, no. 5-6, pp. 790-798, 2005.

Data Visualizations for Model Validation

In order to provide students with sensible, targeted recommendations, the Knewton platform uses a variety of statistical models to track and interpret student progress. These models regularly undergo improvements designed to progressively increase our understanding of student behavior. One tool for assessing the accuracy of these improvements is a model dashboard: a visual display of model fitness metrics that allows a reviewer to interpret a model’s ability to capture and explain diverse student behaviors within the Knewton platform.

Model dashboards are primarily useful for analyzing performance on student behavior observed after the initial model release. Models are rigorously validated during the development phase, wherein data scientists clearly state assumptions made by the model and test how well it explains held-out data collected from student interactions. However, as Knewton launches new adaptive integrations, the population of students, and therefore student behaviors, diversifies and drives the need for further monitoring of model performance. A dashboard that displays quantitative metrics on the model’s function can help pinpoint when and how the model deviates from expectations, providing empirical motivation and direction for adjustments. By monitoring the models during the cycle of modification and validation, data scientists discover ways to make improvements responsibly and scientifically.

Since proficiency estimation is a fundamental pillar of Knewton recommendations, it was the first model that I instrumented with a dashboard. Via the proficiency model, Knewton computes a quantitative measure of a student’s proficiencies on concepts for every recommendation, which can then be used to predict the probability of a student getting an item (e.g., a question) correct. The system then fuses this proficiency data with several other types of inferences to develop a comprehensive understanding of a student’s academic abilities and thereby make personalized recommendations.

Choosing Metrics

One of the most fundamental requirements for the proficiency model is the ability to successfully predict student responses. There are several ways to measure accuracy. Two examples of relevant questions are, “How far off were the predictions from the observations?” and “How well do our predictions discriminate correct from incorrect responses?” The metrics I chose to represent for the dashboard’s initial presentation attempt to address a few of these different nuances.

For each item that Knewton recommends, we compute a response probability (RP) representing our prediction of whether the student will respond correctly to the item given her estimated proficiency in each of its related concepts. For example, a response probability of 0.7 would predict a 70% chance that a student would answer the question correctly. To explore prediction accuracy from a variety of perspectives, I implemented three separate accuracy metrics for the first iteration of the proficiency dashboard.

For the first metric, I used a very rough measure of how close the predictions were to the observed responses by computing a mean squared error (MSE) between the correctness values and the prediction probabilities. While a low MSE is a strong indicator of accurate predictions, a large MSE is not as easily interpreted. Another metric is needed to characterize this case.

It may be that although the RPs are not near 0 or 1, by defining a threshold of, say, 0.5, the RPs do a reasonable job of separating correct responses from incorrect responses, in that most RPs greater than 0.5 pair with a correct response, and vice versa. It would not be very informative, though, to choose a single threshold, since different thresholds trade off correct and incorrect categorizations differently. In order to easily visualize this tradeoff, I computed a receiver operating characteristic (ROC) curve for the second metric. A ROC curve plots the true positive rate (TPR) against the false positive rate (FPR) for a sliding threshold from 0 to 1. More specifically, using the following confusion matrix:

                 Incorrect response   Correct response
RP > threshold   FP                   TP
RP < threshold   TN                   FN

the TPR (TP/(TP+FN)) is plotted against the FPR (FP/(FP+TN)). As the probability threshold decreases from 1, a highly predictive model will increase the TPR more rapidly than the FPR, while a completely random prediction would increase them both roughly equally. So the ROC curve provides an easy interpretation at a glance: a strong deviation from the 45-degree line shows that the RPs are good at separating correct from incorrect responses.

For the last metric, I computed the empirical distribution of probabilities for the correct and incorrect responses, displayed as a histogram of RPs. Data scientists examine the asymmetry of RPs over correct versus incorrect responses to validate prior assumptions made about student proficiencies in the model. In addition, outliers in this distribution (such as a peak around high RPs for incorrect responses) may not be apparent in the previous two metrics, yet may reflect subsets of students whose behaviors do not adhere to model assumptions and ought to be investigated.
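For concreteness, here is a minimal sketch (not the dashboard’s actual code) of how these three metrics can be computed from arrays of observed correctness values and predicted RPs, using numpy and scikit-learn.

```python
import numpy as np
from sklearn.metrics import mean_squared_error, roc_curve

def dashboard_metrics(correctness, response_probabilities):
    """correctness: observed 0/1 responses; response_probabilities: predicted RPs."""
    y = np.asarray(correctness)
    rp = np.asarray(response_probabilities)

    # 1. Mean squared error between observations and predictions.
    mse = mean_squared_error(y, rp)

    # 2. ROC: sweep the threshold and collect (FPR, TPR) pairs.
    fpr, tpr, _thresholds = roc_curve(y, rp)

    # 3. Histograms of RPs, split by observed correctness.
    bins = np.linspace(0.0, 1.0, 11)
    correct_hist, _ = np.histogram(rp[y == 1], bins=bins)
    incorrect_hist, _ = np.histogram(rp[y == 0], bins=bins)

    return {"mse": mse,
            "roc": (fpr, tpr),
            "rp_histograms": {"correct": correct_hist, "incorrect": incorrect_hist}}
```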

Technical Implementation

These metrics provide a graded set of interpretations of the model’s accuracy. In order to implement the dashboard, data required by each metric must be transformed over a series of phases from the live service to their final presentable form. The proficiency model runs as a component of the Recommendation Service, which runs on an AWS EC2 instance. The model running within the service takes in student responses and updates student proficiencies online. For each batch of responses, metadata are logged in a format containing RPs and item correctness. These logs are swept on a periodic basis to an S3 bucket, providing a persistent, canonical record of model state at the time that we actually served a student.

A workflow executes periodically to refresh the metrics on display. Steps include ETL scripts for loading the logs into a Redshift cluster, parsing the logs, transforming them into the metrics, and publishing the results to the dashboard. For displaying these metrics, Knewton currently uses Looker, which provides sophisticated graphical visualizations of the metrics stored in the database.
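As an illustration of just the load step (the table, bucket, and IAM role below are hypothetical, and Knewton’s actual ETL scripts are not shown in this post), a Redshift COPY from S3 issued through psycopg2 might look like this:

```python
import psycopg2

COPY_SQL = """
    COPY proficiency_logs
    FROM 's3://example-model-logs/proficiency/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
    FORMAT AS JSON 'auto'
    GZIP;
"""

def load_logs(dsn):
    """Load swept model logs from S3 into Redshift for metric computation."""
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(COPY_SQL)
```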

Looking Forward

The dashboard currently implements the three metrics outlined above, yet there are many other important validation criteria for the proficiency model that will benefit from similar automation and reporting. Iterating on and augmenting the metrics will provide a fresher and more holistic snapshot of model behavior that will continue to expedite how Knewton data scientists track model performance day-to-day. The dashboard that I delivered provides our data scientists with actionable feedback to ensure that our models serve diverse student needs with a powerful adaptive education experience.

Making Lives Easier with Knewton Crab Stacker

In a previous post, we discussed Knewton’s in-house deployment tool, Knewton Crab Stacker (KCS). To summarize, KCS is a command line tool used to make deployment of CloudFormation (one of Amazon’s services) easier. It saves our engineers from banging their heads against their desks when trying to deploy their services.

So what exactly makes KCS such a valuable, can’t-live-without-it tool? In this post, we’ll take a look at some of the many KCS commands that make the lives of a Knewton engineer easier.

SSH

Normally, when you want to ssh into an EC2 instance, you have to go through a long and arduous process to find the instance’s public DNS name, then locate your own ssh key for that instance, and then finally type out the command that lets you ssh into the instance. You have to do this every single time you want to ssh. As you may imagine, this gets annoying fast.

To make this whole process simpler, we have a KCS command that does everything for you. All you have to do is specify which stack and target environment you’re trying to ssh into, and then KCS will take care of the rest. It will find the public DNS of the instance, find your ssh key, and finally ssh-es into the box for you. Besides being a huge time saver, my favorite part about this command is that it adds colors to the instance’s terminal. Colors make everything better.
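KCS itself isn’t shown in this post, so the sketch below is only an approximation of what such a command does, written with boto3 (which postdates the original tool): look up a running instance by the tag CloudFormation puts on everything it launches, grab its public DNS name, and exec ssh. The key path and default user are assumptions.

```python
import os
import boto3

def ssh_into_stack(stack_name, region, key_path, user="ec2-user"):
    """Find an instance launched by a CloudFormation stack and ssh into it."""
    ec2 = boto3.client("ec2", region_name=region)
    reservations = ec2.describe_instances(
        Filters=[
            # CloudFormation tags the instances it launches with the stack name.
            {"Name": "tag:aws:cloudformation:stack-name", "Values": [stack_name]},
            {"Name": "instance-state-name", "Values": ["running"]},
        ]
    )["Reservations"]
    instance = reservations[0]["Instances"][0]
    host = instance["PublicDnsName"]
    # Replace the current process with ssh, just as a CLI wrapper would.
    os.execvp("ssh", ["ssh", "-i", key_path, f"{user}@{host}"])

# Example: ssh_into_stack("recommendation-service", "us-east-1", "~/.ssh/mykey.pem")
```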


Kick

Often while working on a service, we will make modifications to the instance (which we get into by using the awesome KCS ssh command). But when you make modifications, inevitably something gets messed up. No one wants their instance to be messed up, so you have to restart it. This usually involves relaunching the stack the instance is a part of, and twiddling your thumbs while you wait.

Here at Knewton, we like to save time, so we created a command that allows us to essentially restart our instance. We call this command kick.

Underneath the hood, kick gets the address of the instance we want to kick, ssh-es into the instance, and re-runs cfn-init (the command that is first run when the instance is created). This re-downloads the needed resources and configures everything you need using Chef. After kicking an instance, the instance is essentially brand new and ready for more tinkering.

Roundhouse Kick

A very common scenario that an engineer comes across is when he’s made a change to a service, has finished testing it locally, and then wants to test it on a real stack. To do this using just CloudFormation, we would have to first upload our new build to S3, then update our stack to use the new build. Updating a stack takes quite a bit of time, anywhere from a couple to ten-plus minutes. That’s a lot of waiting around.

That’s why we invented roundhouse kick. Roundhouse kick does everything you need to update the version of your service without having to relaunch your stack.

Here’s how it works: first, it will upload your build to S3. Next, it will do what we call an in-place update of the stack. Instead of launching new instances as a regular update would do, an in-place update just updates the existing instances. The time saved with the in-place update makes up the majority of the time saved. After updating the stack, KCS will then kick all the instances of the stack, which, in effect, restarts the stack and grabs the new version of the service you uploaded earlier. We like to think we made Chuck Norris proud with roundhouse kick.

“Chuck Norris can upload his new build to S3, update his stack, and kick his stack all at once.”

Grab logs and Failure logs

Sometimes you roundhouse kick your stack too hard and it stops working (there’s no such thing as a soft roundhouse kick). To find out what’s wrong, you have to ssh into the instance and check the logs. But there are many logs. And you’ll probably have forgotten where all of these logs are located.

Don’t worry — KCS has got you covered.

With a simple command, you can get all of the logs from your instance in a nicely bundled tarball. To do this, KCS knows the location of your logs thanks to some coordination with the Chef recipes that set up the logging system. After determining these locations, KCS will then perform an scp command with all the needed arguments to retrieve all the files. Now you can find out why your stack couldn’t handle the roundhouse kick.

What’s Next for KCS?

Even with all the cool commands that KCS has, there’s always room for improvement. People want KCS to run faster, have more features, and be invincible to bugs. When there’s a bug in a new release of KCS (bugs are unfortunately inevitable), the deployment team gets bombarded with complaints from disgruntled KCS users. We then work to fix everything and try to get a new release of KCS out. But even when we do release a new KCS, not everyone remembers to upgrade, and the complaints continue. We ask them to check their version, find out they aren’t on the latest one, and an upgrade fixes the issue. This is annoying and unnecessary for both KCS users and the deployment team.

To solve this problem, we created KCSServer — the website version of KCS, which has been my baby during my summer internship. Since KCSServer is a website, we don’t have to worry about people having different versions of KCS. We can very easily make changes to KCSServer without having to worry about getting people to install the latest version.

Migrating KCS to a website also provides many other benefits. One of the main issues we wanted to address was the speed of KCS. As a command line tool, KCS is pretty slow. For a command (such as describing a stack), KCS has to make a call to Amazon after determining all the proper credentials, and then once it retrieves the information, it has to output everything in a readable format for the user. With KCSServer, we can make this much faster by utilizing a cache: a command only has to be run once, and on subsequent runs KCSServer can simply retrieve the output from the cache (which we update as needed). This reduces the latency of a command from a couple of seconds to milliseconds. Considering that our rapidly growing team of engineers uses KCS a lot, those seconds will quickly add up to hours, then days, of developer time saved. Another added benefit? With some CSS, we can make KCSServer a whole lot more pleasant to look at than a dull terminal.

What’s the Take-Away?

Hopefully after reading about how we at Knewton use KCS to maximize our efficiency, you’ll start thinking more about how to eliminate inefficiencies in your own deployment process, or any process for that matter. Hopefully you’ll start asking yourself, “What’s slowing me down at doing my job?” and “What can I do about it?” Then you can go out there and create your own version of KCS. Don’t forget to give it an awesome name.

Online Cross-Validation to Predict Future Student Behavior

At Knewton we use various mathematical models to understand how students learn. When building such models we want to make sure they generalize, or perform well for a large population of students. Cross-validation, a technique in machine learning, is a way to assess the predictive performance of a mathematical model. At Knewton, we use cross-validation extensively to test our models.

Cross-validation is based on the fact that we don’t have access to unlimited data. If we had all the possible data on student learning patterns, the solution would be straightforward. We would test all our models with the data and pick the one with the lowest error rate. In reality, we only have a finite set of student data to work with. Given a limited amount of data, how do we decide which model performs the best?

One approach is to use all of the available data to test our model. A major problem with this approach is overfitting, which is demonstrated in Figure 1.


Figure 1: Left: the model (blue) underfits the data (orange). This is an over-simplistic explanation of the data where the model would be a better fit if it had more parameters. Middle: the model fits the data just right, where the model captures the overall pattern in the data well. Right: the model overfits the data, where the model fits the noise in the dataset. (Source)

If our model overfits the data, the error rate will be low, but when new data are added to the dataset the model may perform poorly, because the fit doesn’t explain the new data well. This is why models that overfit do not generalize well and should be avoided.

This is where cross-validation comes into play. In this approach, rather than fitting the model to the full dataset we split it into training and test sets. This is also referred to as holdout cross-validation, as we are leaving a portion of the data out for testing. The model is fitted using only the training portion of the dataset. Then we assess the predictive performance of the model on the left-out data, which is the test set.

As an example, one model we use to assess student learning is Item Response Theory (IRT). We want to cross-validate our IRT model for a set of student responses to test the performance of our model. To do this, we can split the student response data into training and test sets, fit the model to the training data, and validate it on the test data. If the fitted model predicts the student responses in the test set accurately we can accept this IRT model.

When measuring how students learn, we assume they learn over time. Therefore, it is useful to understand how students behave as time progresses. A shortcoming of the holdout cross-validation technique is that it makes comparisons between random bits of past student data so it can’t make predictions about how students will behave in the future. It would be very useful if we were able to make predictions about students’ future behavior given their past learning patterns.

Online cross-validation is a version of cross-validation which can validate over time series data. Going back to our student response data example, online cross-validation uses a student’s past data to predict how that student will behave in the future. The dataset for online cross-validation is a time-ordered set of responses the student gave in the past. We take the first k responses of a student and use them for the training set, then we try to predict that student’s k+1st, k+2nd, …, k+nth response. If our prediction accuracy is high, we can say that our model is a good fit for our dataset.

Let’s look at how online cross-validation works in more detail. A student answers a series of questions over time; some of these responses are correct (green) and some are incorrect (red). Online cross-validation starts by training on the student’s first response only (k=1), then uses this to predict whether the student will get the next item (k+1=2) correct or incorrect.


Figure 2: The first iteration of online cross-validation. The dots represent whether a student got a question correct (green) or incorrect (red). The model is fitted using the first response (k=1) and then used to predict the second, k+1st item (k+1=2). If our prediction matches the student response, our model accuracy increases. 0/1 refers to incorrect/correct.

In the next iteration of online cross-validation, we can use the first two responses (k=2) as our training set, fit the model using these two data points, and predict the third response (k+1=3).


Figure 3: The second iteration of online cross-validation. The dots represent whether a student got a question correct (green) or incorrect (red). The model is fitted using the first two responses (k=2) and then used to predict the third, k+1st item (k+1=3). 0/1 refers to incorrect/correct.

Online cross-validation continues until we run through all the iterations by increasing the training set one student response at a time. We expect to make better predictions as we add more data to our training set.
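A minimal sketch of this walk-forward procedure (with a deliberately simple stand-in predictor, a smoothed proportion of correct answers, rather than a full IRT model) might look like the following.

```python
def online_cross_validate(responses, predict_next):
    """Walk forward through a time-ordered list of 0/1 responses.

    predict_next maps the first k responses to a predicted probability
    that response k+1 will be correct. Returns prediction accuracy.
    """
    hits, trials = 0, 0
    for k in range(1, len(responses)):
        p = predict_next(responses[:k])       # train on the first k responses
        predicted = 1 if p >= 0.5 else 0      # predict response k+1
        hits += int(predicted == responses[k])
        trials += 1
    return hits / trials if trials else float("nan")

def running_proportion(history, prior_correct=1, prior_total=2):
    """Toy stand-in model: smoothed fraction of correct answers so far."""
    return (sum(history) + prior_correct) / (len(history) + prior_total)

student_responses = [0, 0, 1, 0, 1, 1, 1, 1]  # 0 = incorrect, 1 = correct
print(online_cross_validate(student_responses, running_proportion))
```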

With online cross-validation, we are not limited to predicting only the next response in the future. We can predict a student’s next 2, 3, …, n responses. This makes online cross-validation a very useful technique if we want to make predictions far in the future.

Both holdout cross-validation and online cross-validation are very useful methods for assessing the performance of models. Holdout cross-validation is useful for assessing performance on a static dataset, whereas online cross-validation is helpful when we want to test a model on time series data.

 

Kankoku: A Distributed Framework for Implementing Statistical Models

As future-facing as Knewton’s adaptive learning platform may be, the concept of a personalized classroom has a surprisingly rich history. The idea has intrigued educators and philosophers for decades. In 1954, behavioral psychologist B.F. Skinner invented the concept of programmed instruction, along with a working mechanical prototype to boot. His “teaching machine” consisted of a wooden box on which questions were displayed to students on strips of paper controlled by turning knobs. One would only progress upon answering a question correctly. A crucial feature of the teaching machine was that “the items were arranged in a special sequence, so that, after completing the material in frame 1, the students were better able to tackle frame 2, and their behavior became steadily more effective as they passed from frame to frame.” The argument upon which Skinner’s teaching machine was founded still holds water today: that “what is taught to a large group cannot be precisely what each student is ready just at that moment to learn.”1


Sixty years later, examining Skinner’s prototype still provides an insightful frame of reference. Knewton’s platform is responsible for tracking the individual learning states of each student at the granularity of individual concepts and questions. Like the teaching machine, we must deliver relevant recommendations in real-time and classroom analytics in near real-time. Those recommendations and analytics serve as a tool for both students and teachers to improve student outcomes. Considerations like these influence the engineering decisions we make on a daily basis, including the decision to use a stream-processing framework to power several of our statistical models. In this blog post, we will open the hood of our own teaching machine to explore the tradeoffs behind the design of Knewton’s scientific computing platform.

Why Stream Processing?

Knewton’s recommendation engine faces the task of providing recommendations to millions of students in real-time. As one of the pioneers of behaviorism, Skinner certainly understood the importance of delivering the right feedback at the right time.2 Respond to a student event (e.g., finishing an article) just two minutes late, and the impact of a recommendation diminishes rapidly. But what goes into each recommendation under the hood? A recommendation is essentially a ranked selection of instructional content that is most relevant to the subject matter that a student is studying at any particular time. Every student’s learning history (the data representing their interactions with content and their activity on the system) is taken into account. Knewton’s recommendation engine also considers other factors, such as each student’s learning goals and deadlines. All of this data is processed through a variety of psychometric and statistical models that estimate various characteristics of students (e.g., their proficiency or engagement level) and content (e.g., its difficulty or effectiveness). While some of these computations can be performed ahead of time, there are still numerous models that must be computed on the spot in response to a student interaction.3 Combining and processing all of this data results in a very large sequence of actions that must be performed in a small period of time.

Knewton is much more than just a differentiated learning app. Imagine if Skinner’s teaching machine knew every student’s individual learning history, knowledge state, habits, strengths, and upcoming goals, and could take into account goals set by teachers or administrators.

To handle all this data, Knewton has built Kankoku4, a stream processing framework that can respond to individual events in real-time.5 Stream processing systems operate under the requirement that inputs must be processed “straight-through” — that is, real-time feeds must trigger a set of downstream outputs without necessarily having to resort to polling or any intermediate storage. Stream processing systems are also characterized by their support of real-time querying, fault-tolerance, and ability to scale horizontally.6 The primary complement to stream processing is batch processing, consisting of programming models such as MapReduce that execute groups of events scheduled as jobs. Batch computing is fantastic for efficiently performing heavy computations that don’t require immediate response times.

However, these advantages of batch processing are also what make it less suitable for responsive, high availability systems like Knewton’s.7

Kankoku

Kankoku is a scientific computing Java framework developed in-house that provides a programming model for developing decoupled scientific models that can be composed to create any kind of computable result. The framework aims to abstract away the details of retrieving and storing data from databases, reliability, scalability, and data durability, letting model writers concentrate on creating accurate and efficient models. In the example workflow below, the nodes (or Kankokulators, as we call them) represent individual (or sets of) calculations. Streams are fed into Kankoku from a queue, which serves as a message broker by publishing received student events into various topics to which Kankoku subscribes.

[Figure: example Kankoku workflow, with Kankokulator nodes consuming a student event stream from the queue]

With this framework, complex multi-stage computations can be expressed as networks of smaller, self-contained calculations. This style of programming is especially well-suited for data analysis where the outputs of an arbitrary statistical model could be used as inputs to another. One example of this could be aggregating student psychometrics as inputs for modeling student ability using Item Response Theory (IRT).
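As a loose illustration of that composition style (this is not Kankoku’s actual API, and the calculations are toy stand-ins), the sketch below chains two small calculators so that the output of one feeds the input of the other, much as an item-difficulty estimate might feed a proficiency estimate.

```python
from collections import defaultdict

class ItemDifficulty:
    """Tracks a smoothed failure rate for each item."""
    def __init__(self):
        self.attempts = defaultdict(int)
        self.misses = defaultdict(int)

    def update(self, event):
        item = event["item_id"]
        self.attempts[item] += 1
        self.misses[item] += 1 - event["is_correct"]
        return (self.misses[item] + 1) / (self.attempts[item] + 2)

class StudentProficiency:
    """Credits a student more for answering harder items correctly."""
    def __init__(self):
        self.score = defaultdict(float)

    def update(self, event, item_difficulty):
        delta = item_difficulty if event["is_correct"] else -(1 - item_difficulty)
        self.score[event["student_id"]] += delta
        return self.score[event["student_id"]]

def process(event_stream):
    difficulty, proficiency = ItemDifficulty(), StudentProficiency()
    for event in event_stream:
        d = difficulty.update(event)       # upstream calculator
        p = proficiency.update(event, d)   # consumes the upstream output
        yield event["student_id"], d, p
```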

Speed and horizontal scalability are also important in developing a stream processing framework for real-time events. One of the many ways Knewton achieves horizontal scalability is by partitioning the input data stream using a partitioning key in the queue.8

“Kankoku” Means “Recommendation”

Similar to how Skinner’s teaching machine immediately responds to individual inputs, Kankoku streamlines responsive event processing for arbitrary, unbounded data streams. Both serve a complex need — providing personalized learning recommendations — yet have internal mechanisms that are easily decomposable, and execution that is reproducible.

But Kankoku is very different from the teaching machine. The software it powers is capable of understanding and analyzing the learning mechanisms of millions of students. Ensuring that Knewton doesn’t sacrifice quality to meet the demands of quantity or speed is a top priority. To meet these ends, we are continually revising and extending our models to run more efficiently while delivering better results. Kankoku’s design is a strength here. Not only does it help Knewton break down a complex task into smaller pieces, it also makes it simpler to understand and tweak each component. Monitoring these models requires complex visibility tools that allow Knewton to examine intermediate computation in real-time. Kankoku is less like one teaching machine than it is hundreds of small machines working together in concert.

So What?

In his exposition “Programming Instruction Revisited,” Skinner spoke of his dream of creating technology that would help classrooms evolve beyond the “phalanx formation” by helping teachers become even more attuned to every student’s individual needs. As history has shown us, implementing such technology at scale is an extremely difficult problem. Truly understanding student needs and providing feedback in real-time is a non-trivial challenge for any person, much less a computer program. Practical machine learning and “artificial intelligence” is in many ways a systems engineering challenge — building models that can handle real-time workloads at scale is crucial to creating a service that will actually be useful to students and teachers. Well-designed systems will never replace teaching, but they can provide an automated, responsive, and unified platform to expose insights about student learning to teachers and parents around the world, who do understand how to best act on those insights.


Acknowledgements

I’d like to thank the creators of Kankoku — Nikos Michalakis, Ferdi Adeputra, Jordan Lewis, Erion Hasanbelliu, Rafi Shamim, Renee Revis, Paul Kernfeld, Brandon Reiss, George Davis, and Kevin Wilson — for their tireless work as well as letting me play with such an awesome piece of technology. Stay tuned for part 2 of this blog post for more details on my internship project (extending the Kankoku framework with Apache Storm).


  1. B.F. Skinner. Programming Instruction Revisited

  2. Knewton is not preaching or practicing behaviorism. This is only meant to be an analogy. 

  3. http://en.wikipedia.org/wiki/Online_algorithm 

  4. Kankoku means “advice” or “recommendation” in Japanese. It also means “Korea.” 

  5. In addition to powering Knewton’s recommendation engine, stream processing suits a variety of applications, ranging from powering Google Trends to supporting fraud detection and “ubiquitous computing” systems built on cheap micro-sensor technology that demand high-volume and low-latency requirements. Other applications include powering bank transactions (which require exactly-once delivery), image processing for Google Street View, and command-and-control in military environments. See: Akidau, et al. MillWheel: Fault-Tolerant Stream Processing at Internet Scale

  6. Stonebraker, et al. The 8 Requirements of Real-Time Stream Processing

  7. Frameworks such as the Lambda Architecture exist that unite both programming models. There is also technically a gray zone between batch and streaming processing frameworks – for instance, Spark Streaming processes events in microbatches. Some of our models can’t be implemented with microbatching, but it is an interesting idea worth exploring. 

  8. Alternative terminology for “grouping”: sharding, shuffling. 

Kankoku: A Distributed Framework For Implementing Statistical Models (Part 2)

The focus of my internship project this summer was to extend Kankoku (Knewton’s scientific computing framework) to operate in a more distributed fashion. There are a few reasons that drove this change — namely, increased functionality in model expression and greater operational control in model execution. In this blog post I will analyze these objectives in detail and explore why they necessitated tradeoffs from both the systems engineering and business perspectives.

Kankoku is a scientific computing Java framework developed in-house at Knewton that provides a stream-processing programming model for developing decoupled scientific models that can be composed to create any kind of computable result. For a more detailed discussion on stream processing and Kankoku, see part one of this blog post.

Weathering the Storm: Introducing Distributed Kankoku

Partitioning, or dividing a set of inputs into a collection of subsets, is a key problem in any distributed system. Mod hashing and consistent hashing are examples of how shuffle groupings would be implemented, in which keys are uniformly distributed across partitions in a pseudorandom process. Kankoku currently performs a shuffle grouping before model execution, which allows workloads to be balanced across separate machine stacks that each run independent instances of the same topology.1 However, the calculation of certain psychometrics may require additional partitioning (i.e., multi-partitioning).
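A minimal sketch of mod hashing over a stable hash illustrates the idea; the partition count and key formats here are arbitrary. Note that two students in the same class will generally land on different partitions, which is exactly the problem described next.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Mod hashing: map a key to a partition using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Shuffle grouping by student ID spreads events evenly across partitions...
print(partition_for("student-42", 8))
# ...but says nothing about where classmates end up, so a classroom-level
# metric needs a second grouping by class ID.
print(partition_for("class-7", 8))
```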

Recall that Knewton performs online analysis of both the student and the classroom. Consider the scenario in which the output of Kankokulator node A (calculating a student metric) serves as the input to Kankokulator node B (calculating a classroom metric). Since A processes events per student, the initial grouping must happen by student ID. However, B must process events per classroom. This presents a problem, since there is no guarantee that two students in the same class are grouped to the same partition. A simple solution might be to route the output of A through a queue serving as an intermediate message broker. This queue can then regroup the data stream based on class ID:

[Figure: regrouping the output of Kankokulator node A by class ID through an intermediate queue]

However, this approach scales poorly for several reasons. Creating new queue shards to track each new multi-partition can be difficult to maintain from a development standpoint. Rerouting the data stream to an intermediate broker with every grouping also introduces extra overhead and network latency. There is also no guarantee that the models execute deterministically. Previously, each instantiation of a Kankoku topology ran on its own machine, processing each input in a topologically-ordered fashion. With intermediate queues, keys may be processed out of order due to varying latency. A more general-purpose solution is preferable.

This is where the Apache Storm framework (originally developed by Twitter) comes in as a possible candidate. Like Kankoku, Storm is a general stream-processing framework, but with one crucial design difference: it is strongly distributed, in that nodes in the same topology need not run sequentially on the same machine. As a result, Storm supports the ability to perform arbitrary groupings between each node, and multiple groupings within the same topology.2 Nodes in a Storm topology are referred to as bolts, and data sources are referred to as spouts.

Using Storm’s Trident API, declaring a new grouping within the topology is as simple as calling the function partitionBy. The example below shows how our hypothetical scenario above might be implemented using Storm instead of rerouting through a queue:

[Figure: the same regrouping expressed as a Storm Trident topology using partitionBy]

Kankoku can therefore be extended by “wrapping” subtopologies (individual Kankokulators or groups of Kankokulators) within Storm bolts. Bolts will encompass contiguous Kankokulators expecting data streams partitioned by a common key type, and a new bolt will be created whenever an additional partitioning operation is required. This interaction introduces the functionality of multi-partitioning while still preserving our original model execution; bolts do not define how data is managed and arbitrary Kankokulator code can still run within a bolt. Hence, in this architecture Kankoku provides a higher-level programming model built upon Storm.

Another use case for this particular design arises from Storm’s convenient parallelism hint feature. Parallelism hints are the initial number of executor threads allocated to a particular bolt, which can be rebalanced during runtime. Tuning the parallelism hint of bolts gives us additional operational control over executing topologies by weighting CPU resources differently for separate subtopologies. Therefore, subtopologies that we expect to be more computationally expensive can be allocated more processing power, which in turn helps increase throughput.

[Figure: a combined Storm-Kankoku topology, with Kankokulator subtopologies wrapped in Storm bolts]

The topology above shows how a Storm-Kankoku topology might be represented. Within each bolt, the Kankoku subtopology will run deterministically so as to take advantage of data locality. Hence, it is advantageous to wrap as many Kankokulators as possible within each given bolt while still fitting the constraints imposed by weighted parallelism and multi-partitioning.

Tradeoffs of Operating In A Distributed Environment

My internship project this summer consisted of implementing a prototype of the integrated Storm-Kankoku framework, similar to the sample topology displayed above, and examining the tradeoffs involved in extending the Kankoku framework with Storm. Introducing added parallelism at the platform level can have sweeping effects on the behavior of our statistical models, affecting both integrity and performance. A few considerations we explored:

A) Bolt-level deterministic execution. Although Storm may not be able to recover the state of execution within an individual bolt if it fails, Storm’s “Transactional Topologies” guarantee that “multiple batches can be processed in parallel, but commits are guaranteed to be ordered.” Hence, topological ordering still applies and we expect reproducible execution.

B) Fault-tolerance. Storm provides fault tolerance with clear guarantees across bolt execution and state-saving operations (either exactly-once or at-least-once delivery). By assigning a monotonically increasing transaction ID to each commit of events, Storm provides the semantics needed to detect and filter out duplicate events that are replayed after a failure (see the sketch after this list). Fault tolerance is especially important when the outputs of Kankokulator nodes are saved or transmitted during execution — without these guarantees, events might be lost or duplicated.

C) Horizontal scalability. Any implementation must take care to increase throughput without also increasing latency. One possible performance pitfall in a distributed environment is the added latency introduced by redundant computations that each bolt must perform (such as loading the Knewton knowledge graph). This could potentially be addressed with an off-node cache such as ElastiCache, at the cost of introducing additional complexity. In general, careful load testing must be performed to determine the ideal method of data processing — whether to pass values across the wire or to utilize intermediate checkpointing and storage structures.
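
As a toy illustration of the transaction-ID pattern mentioned in (B) above (a generic sketch, not Storm's or Knewton's actual state code), a state store can simply refuse to apply any commit whose transaction ID is not newer than the last one it persisted:

public class TransactionalValueStore {
  // Idempotent commits keyed by a monotonically increasing transaction ID:
  // replayed batches carry an old txId and are ignored.
  private long lastCommittedTxId = -1L;
  private double value;

  public synchronized void commit(long txId, double newValue) {
    if (txId <= lastCommittedTxId) {
      return;  // this batch was already applied; a replay changes nothing
    }
    value = newValue;
    lastCommittedTxId = txId;
  }

  public synchronized double currentValue() {
    return value;
  }
}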

As expected, many of these tradeoffs don’t point to a single right answer. For instance, depending on the scenario, Knewton might leverage Storm’s exactly-once functionality at the expense of introducing more latency. In situations like these, it becomes less a question of which approach to take and more a question of how important each requirement is. How important is it to filter out duplicate events? What is the cost of producing a recommendation that is stale, possibly by just a few seconds? How important is it for Knewton to keep its latency as low as possible? These questions strike at the heart of both Knewton’s internal systems design and its core business value-add, and encapsulate much of what made my internship intellectually engaging and rewarding.

Sources


  1. By topology, we mean a directed acyclic graph (DAG) that defines the workflow of calculations. 

  2. Storm implements the ability to partition by using an abstraction called an Active Distributed Hash Table (Active DHT). Active DHTs extend distributed hash tables to allow an arbitrary user defined function (UDF) to be executed on a key-value pair. Source: A. Goel, Algorithms for Distributed Stream Processing

Make Your Test Suite Accessible

What does it mean for something as complex and dynamic as a platform to be “well-tested”? The answer goes beyond simple functional coverage.

Testing has been my specialty through much of my 14 years of experience in software. If there is one thing I’ve learned about testing, it is that tests can, and should, do more than just test. Tests can be used to communicate and collaborate. Tests can also be used to discover what your product is, as well as what it should be. At their best, tests can be the frame of reference that anchors a team and solidifies team goals into verifiable milestones.

Testing the platform

The Knewton platform is composed of many component services. Each of those services is developed by a dedicated team, and each service is tested on its own with the standard unit, integration, and performance tests. This article is not really about those tests but about how we test the platform as a whole.

The Knewton platform uses data to continuously personalize the delivery of online learning content for individual students. The platform determines student proficiencies at extremely detailed levels, provides activity recommendations, and generates analytics. To do all this, our platform must be fast, scalable, and reliable. Our team must be skilled at grappling with intricate technical problems, while maintaining high-level perspective and focus on the greater system. Testing is part of how we maintain this dual perspective.

Accessibility

Accessibility is the most important criterion we build into our tests to help us achieve the above goals.

In the context of a full-stack test suite, accessibility to me means at least the following:

– Anyone can run the tests
– Anyone can read the test report and analyze test failures
– Anyone can read, change, extend, or otherwise interact with the test definitions

Making tests accessible and promoting those accessible tests can be a tough cultural challenge as well as a tough technical challenge. But the cost of failing at this is high. The more isolated your test suite (and the engineers who create and execute it), the less value you will derive from it. Your tests will not reflect involvement from the greater organization, and more importantly, the information your tests generate will not be as widely disseminated throughout the organization as it could be.

So how is a test suite made “accessible”?

Anyone can run the tests

The best thing you can do with a test suite is get it running in your continuous integration server. At Knewton we use Jenkins as our CI server. Anyone in our organization can use Jenkins to invoke the tests against any testing environment, at any time, without any special setup on their computer whatsoever.

Additionally, the test code is in our Git repository, and everyone is encouraged to check it out and invoke the tests in very flexible ways. Developers have the option of running a single test, a set of related tests, tests that correlate with a given JIRA ticket, or other options. Developers can run the tests against a local development environment, or a deployed environment. A test suite that can be run in flexible ways is an important part of accessibility.

Anyone can read the test report

Our test suite produces several kinds of test reports. The report I enjoy showing off the most is the HTML report, which lists every test that runs and details every test that fails (this capability is built into the wonderful RSpec testing framework we use). This HTML report is archived in Jenkins with every test run, so anyone can read it for any test run right within their browser. And because the report uses plain English, it is comprehensible by anyone who is familiar with our platform’s features, developers or not.

Here is what a small portion of our HTML test report looks like, showing both passing and failing tests:

[Screenshot: HTML test report with passing and failing tests]

What may or may not be obvious here is that tests are really about information. When I test a piece of software, my product is actionable information. When I make an automated test suite, my product is an information generator. Building a generator of information is one of the more valuable and interesting bits of work a QA engineer can do; here at Knewton, we encourage this mentality.

Anyone can change the tests

First and foremost, my job at Knewton is to enable tests to take place easily. Secondly, my job is to assist and initiate the creation of actual tests. Here at Knewton, it’s great for me to see the testing framework I created be picked up by developers, changed, extended and generally used. While we do formal code reviews on the tests, we try to make that process very efficient in order to ensure that there are very low barriers for anyone who creates a platform test.

What does accessibility get you?

Here are just a few of the ways that an accessible test suite brings value to an organization:

– Raising awareness of the behaviors of the system and the interactions between various components in the system throughout the entire organization.

– Eliminating bottlenecks when releasing: got the code deployed and need to run the tests? Just go press the button.

– Enabling continuous deployment: when your tests are in your continuous integration system, it becomes easy to chain together build, deploy, and test plans into a continuous deployment scheme (we are still working on this one).

– Encouraging better tests: when non-testers are encouraged to get involved in testing, unexpected questions get asked.

More to come

Testing is a massively important part of the puzzle for Knewton as we scale our technology and our organization. We are learning more every day about how to make the best, most accessible and valuable tests we can. In a future post, I intend to share some of the technical details and tools we have been using to make our tests. In the meantime, I welcome your feedback on the ideas presented here and around testing in general.

Student Latent State Estimation with the Kalman Filter

The Kalman filter is an algorithm that estimates the values of unknown, or latent, variables from a series of noisy measurements taken over time. The Kalman filter has numerous applications in finance and different areas of engineering, and in this blog post, we will show how the Kalman filter can apply to student learning. We will use a Kalman filter to infer an approximation to a student’s ability given their response times to a set of questions.

To fully understand the concepts below, you’ll need a background in basic probability and statistics.

Model

To begin, we are going to describe our inference problem and a model of the system – a set of assumptions of how student learning works.

Given a series of scalar measurements:

Z=\{z_1,z_2,...,z_n\}

where z_i denotes the natural logarithm of the time it took our student to answer question i,

We want to infer the scalar latent variables:

X=\{x_1,x_2,...,x_n\}

where the student has ability x_i at the time question i is answered.

We model the change in student ability over time as a Gaussian random walk, meaning that the current ability value is based on the previous ability value, plus some Gaussian noise:

x_k=x_{k-1}+w_k, where w_k is drawn from N(0, \tau \sigma^2_x), where \tau is the time the student spent between questions, and \sigma^2_x is a hyperparameter that corresponds to the variance for this latent process (1).

Having the variance of the noise increase linearly with the time difference makes our equation consistent with a continuous Gaussian random walk, and is consistent with our intuition that a student’s ability needs time to change. In other words, a student is unlikely to experience significant change in ability if they’re between questions in a quiz, but if it’s been a day since they’ve answered a question, they’re more likely to have taken the time to learn more about the concept, or, conversely, they might have forgotten the material. Because the latent state variance is time-dependent, our filter is technically called a hybrid Kalman filter, since it assumes a continuous-time model for a discrete set of observations.

We don’t assume that the student ability x_k accounts for all the variability in the log of the student response time, z_k. For example, it’s possible that the student is familiar with this particular problem or is distracted by her environment. Therefore, we say that the observed times are likewise corrupted by some Gaussian noise, v_k:

z_k=x_k+v_k, where v_k is drawn from N(0, \sigma^2_z), where \sigma^2_z is a hyperparameter that corresponds to the variance of the Gaussian noise (2).
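
To make the generative model concrete, here is a small simulation of equations (1) and (2); the hyperparameter values and the time between questions are made up purely for illustration:

import java.util.Random;

public class StudentResponseSimulation {
  public static void main(String[] args) {
    Random rng = new Random(42);
    double sigma2X = 0.01;   // assumed latent process variance per unit time
    double sigma2Z = 0.25;   // assumed observation noise variance
    double ability = 0.0;    // x_0
    for (int k = 1; k <= 10; k++) {
      double tau = 1.0;      // time since the previous question (arbitrary units)
      // Equation (1): ability follows a Gaussian random walk with variance tau * sigma2X.
      ability += Math.sqrt(tau * sigma2X) * rng.nextGaussian();
      // Equation (2): the observed log response time is the ability plus Gaussian noise.
      double logResponseTime = ability + Math.sqrt(sigma2Z) * rng.nextGaussian();
      System.out.printf("question %d: ability=%.3f, log response time=%.3f%n",
                        k, ability, logResponseTime);
    }
  }
}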


[Figure: graphical model of the hybrid Kalman filter, relating abilities x_{k-1} and x_k to log response times z_{k-1} and z_k]

The resulting model is pictured in the diagram above: the ability of the student at the previous question, x_{k-1}, determines the log response time z_{k-1}. The ability of the student at the current question, x_k, is determined by the ability at the previous question, x_{k-1}, and determines the current log response time, z_k.


Inference

The Kalman filter is an algorithm for estimating each x_k a posteriori — that is, it computes an estimate for each x_k given observations Z = \{z_1, z_2,...,z_k\}. It does this recursively, which means that it only needs an estimate of x_{k-1}, along with observation z_k, to output an estimate for x_k.

To make a new estimate, we first need to compute two intermediate pieces of information: a prior distribution and a likelihood distribution.

A note on syntax:

z_{1:k-1} denotes our observations z_1 through z_{k-1}, and x_{k-1|k-1} represents our estimate of ability at the k-1th question given observations z_{1:k-1}; likewise \sigma^2_{k-1|k-1} represents our estimate of the variance given observations z_{1:k-1}.

f denotes the Gaussian probability density function (pdf) with mean \mu  and variance \sigma^2:

f(x;\mu,\sigma^2) = \frac{1}{\sigma \sqrt{2 \pi}}e^{-\frac{(x-\mu)^2}{2 \sigma^2}}

Calculating our prior distribution

Our prior term represents the knowledge we have of the current latent state x_k, having seen everything but our current observation z_{k}.

To calculate our prior, we start off with an estimate of x_{k-1}, represented as a Gaussian distribution with mean x_{k-1|k-1} and variance \sigma^2_{k-1|k-1}:

p(x_{k-1} |z_{1:k-1}) = f(x_{k-1};x_{k-1|k-1}, \sigma^2_{k-1|k-1}) (3)

From equation 1, we see that x_k is simply the sum of two independent Gaussian random variables, x_{k-1} and w_k.

From probability theory, we know that the probability density of the sum of two independent random variables can be expressed as a convolution of the two composite probability densities. It happens that the convolution of two Gaussians is also a Gaussian:

p(x_k|z_{1:k-1}) = \displaystyle\int p(x_{k-1}|z_{1:k-1}) p(x_k|x_{k-1}) dx_{k-1} = f(x_k; x_{k|k-1}, \sigma^2_{k|k-1}), where

x_{k|k-1}=x_{k-1|k-1} and

\sigma^2_{k|k-1}=\sigma^2_{k-1|k-1}+ \tau \sigma^2_x  (4)

We call p(x_k|z_{1:k-1}) the prior knowledge we have about x_k. The next step is to look at the current observation, z_k, and see what information it adds.

Calculating our likelihood distribution

Our likelihood term represents the information our current observation z_k gives us about our latent state x_k.

From equation 2, we see that the likelihood of our observation z_k given our hidden variable x_k is simply a Gaussian centered at x_k. This becomes our likelihood term:

p(z_k|x_k) = f(z_k; x_k, \sigma^2_z) (5)

Combining prior and likelihood

The Kalman filter combines the prior knowledge we have about x_k and our likelihood term in accordance with Bayes’ rule, by multiplying the prior term with the likelihood term. We call this resulting distribution the posterior, or our estimate of x_k given all the information we have.

Luckily, the multiplication of two Gaussians is still a Gaussian, although unnormalized:

p(x_k|z_{1:k}) = \frac{1}{Z} p(x_k|z_{1:k-1})*p(z_k|x_k) = f(x_k;x_{k|k}, \sigma^2_{k|k}), where Z is a normalizing constant, and where:

x_{k|k}=x_{k|k-1}+C_k(z_k-x_{k|k-1})

C_k=\frac{\sigma^2_{k|k-1}}{\sigma^2_{k|k-1} + \sigma^2_z}

\sigma^2_{k|k} = (\frac{1}{\sigma^2_{k|k-1}} + \frac{1}{\sigma^2_z})^{-1} (6)
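
One way to see where the gain form in equation 6 comes from (an intermediate step not spelled out above) is to write the posterior mean as a precision-weighted average of the prior mean and the observation, and then rearrange:

x_{k|k} = \frac{x_{k|k-1}/\sigma^2_{k|k-1} + z_k/\sigma^2_z}{1/\sigma^2_{k|k-1} + 1/\sigma^2_z} = x_{k|k-1} + \frac{\sigma^2_{k|k-1}}{\sigma^2_{k|k-1}+\sigma^2_z}(z_k - x_{k|k-1}) = x_{k|k-1} + C_k(z_k - x_{k|k-1})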

To summarize, given a Gaussian posterior distribution for x_{k-1} (Equation 3) and a new observation z_k, the Kalman filter estimates a new Gaussian posterior for x_k (Equation 6). By updating the Kalman filter as we receive new observations, we can obtain fast, real-time estimates of our latent state.
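
To make the recursion concrete, here is a minimal scalar sketch of the prediction and correction steps in Java. The variable names are assumptions made for illustration; this is a toy sketch of equations (4) and (6), not the open-source implementation linked below.

public class ScalarKalmanFilter {
  private double mean;           // x_{k-1|k-1}
  private double variance;       // sigma^2_{k-1|k-1}
  private final double sigma2X;  // latent process variance hyperparameter
  private final double sigma2Z;  // observation noise variance hyperparameter

  public ScalarKalmanFilter(double initialMean, double initialVariance,
                            double sigma2X, double sigma2Z) {
    this.mean = initialMean;
    this.variance = initialVariance;
    this.sigma2X = sigma2X;
    this.sigma2Z = sigma2Z;
  }

  // Incorporate one observation zK (a log response time) taken tau time units
  // after the previous question.
  public void update(double zK, double tau) {
    // Prediction step, equation (4): the mean carries over; the variance grows with tau.
    double priorMean = mean;
    double priorVariance = variance + tau * sigma2X;

    // Correction step, equation (6): blend the prior and the observation via the gain C_k.
    double gain = priorVariance / (priorVariance + sigma2Z);
    mean = priorMean + gain * (zK - priorMean);
    variance = 1.0 / (1.0 / priorVariance + 1.0 / sigma2Z);
  }

  public double abilityEstimate() { return mean; }
  public double abilityVariance() { return variance; }
}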

Open Source Code

An open source implementation of this hybrid Kalman filter algorithm is on Knewton’s GitHub:

https://github.com/Knewton/Kalman

Authors

Sophie Chou is a senior at Columbia University majoring in Computer Science. She’s interested in Machine Learning, Natural Language Processing, and becoming a better teacher. You can follow her on twitter @mpetitchou.

Andersen Chen is a senior at Brown University, majoring in Mathematics and Computer Science. He’s interested in data science, startups, and machine learning.


Cassandra and Hadoop – Introducing the KassandraMRHelper

Here at Knewton we use Cassandra for storing a variety of data. Since we follow a service-oriented architecture, many of our internal services are backed by their own data store. Some of the types of data we store include student events, recommendations, course graphs, and parameters for models. Many of these services and clusters are often deployed in more than two environments, increasing the total number of Cassandra clusters deployed.

On the Analytics team at Knewton we are constantly working on improving a lot of the inferential models that go into our platform, while at the same time building new ones. This often involves munging a lot of data in short periods of time. For a lot of our ad-hoc analysis we use a data warehouse in which analysts can query and extract data relatively quickly. One of the challenges we’ve faced at Knewton — and specifically in Analytics — involved how to go about populating our data warehouse with data from Cassandra clusters that predated our data warehouse. To solve this problem, we implemented an internal library for bulk extracting data out of Cassandra into Hadoop with zero hits to the Cassandra cluster. A few months later we open sourced it here and called it the KassandraMRHelper.

KassandraMRHelper takes a slightly different approach than the constructs contained in the Hadoop package in the Cassandra source code (e.g. AbstractColumnFamilyInputFormat), in that it doesn’t require a live Cassandra cluster to extract the data from. This allows us to re-run map-reduce jobs multiple times without worrying about any performance degradation of our production services. This means that we don’t have to accommodate more traffic for these offline analyses, which keeps costs down.

How does it work?
The KassandraMRHelper includes specialized Input Formats and Record Readers for SSTables. First, here’s a little bit about SSTables:

SSTables are immutable; once they’re written, they never change.
SSTables can exist independently of each other, but collectively they form the complete data set.
SSTables consist of four to five components, depending on which version of Cassandra you’re using:

  • Data.db files store the data, compressed or uncompressed depending on the configuration.
  • Index.db is an index into the Data.db file.
  • Statistics.db stores various statistics about that particular SSTable (times accessed, etc.).
  • Filter.db is a Bloom filter that’s loaded in memory by the Cassandra node and can quickly tell it whether a key is in a table or not.
  • CompressionInfo.db may or may not be present, depending on whether compression is enabled for a ColumnFamily. It contains information about the compressed chunks in the Data.db file.

Data in columns and rows are essentially key-value pairs, with rows as the keys and columns as values to the rows. The columns are themselves key-value pairs consisting of a name and a value.

Given how data are stored, Cassandra is in fact a really good fit for MapReduce. The same partitioning schemes that Cassandra uses can also be used in MapReduce, and columns and rows can serve as the keys and values that get passed to the Mappers or Reducers in a MapReduce job.

Some key components of KassandraMRHelper are:

  • The SSTableInputFormat: The SSTableInputFormat describes how to read the SSTables and filters through all the components and ColumnFamilies, keeping only what’s necessary for processing, using a custom PathFilter. There are two types of SSTableInputFormats, depending on how you want to represent key/value pairs in MapReduce. The SSTableColumnInputFormat constructs an SSTableColumnRecordReader, in which keys in the Mapper represent the row keys in Cassandra and values represent a single column under that row key. Similarly, the SSTableRowInputFormat constructs an SSTableRowRecordReader, in which keys in the Mappers are the Cassandra row keys and values are column iterators over all the columns under a single row key.
  • The SSTableRecordReader: It internally uses an SSTableScanner and a Descriptor similar to the ones contained in recent versions of Cassandra, but with backwards compatibility, to identify all the parts of an SSTable. As described previously, subclasses of the SSTableRecordReader can represent values as single columns under a row or as entire column iterators.
  • The SSTableColumnMapper: This abstract mapper inherits from the default Hadoop mapper but adds a bit more structure to deal with the ByteBuffers and columns contained in the SSTables. It can also skip tombstoned columns.
  • The SSTableRowMapper: This is similar to the mapper above, except that its values are column iterators over all the columns under a row key.

Example
Setting up a MapReduce job for reading a Cassandra cluster becomes very simple. The only missing piece is finding an easy way to get all the SSTables into a Hadoop cluster. At Knewton we found Netflix’s Priam to be a good match. Priam backs up our Cassandra cluster multiple times a day into S3 making it really easy to transfer the data to Elastic MapReduce (EMR).

The simple MapReduce job below is a complete example that consumes student event data from backed-up Cassandra SSTables. The example can also be found here.

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = new Job(conf);
  SSTableInputFormat.setPartitionerClass(
      RandomPartitioner.class.getName(), job);
  SSTableInputFormat.setComparatorClass(LongType.class.getName(), job);
  SSTableInputFormat.setColumnFamilyName("StudentEvents", job);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(StudentEventWritable.class);
  job.setMapperClass(StudentEventMapper.class);
  job.setReducerClass(StudentEventReducer.class);
  job.setInputFormatClass(SSTableColumnInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  SSTableInputFormat.addInputPaths(job, args[0]);
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true);
}
public class StudentEventMapper extends SSTableColumnMapper
      <Long, StudentEvent, LongWritable, StudentEventWritable> {
  @Override
  public void performMapTask(Long key, StudentEvent value, Context context) {
    // Emit the row key (the student ID) along with the deserialized student event.
    context.write(new LongWritable(key), new StudentEventWritable(value));
  }
  @Override
  protected Long getMapperKey(ByteBuffer key, Context context) {
    ByteBuffer dup = key.slice();
    Long studentId = dup.getLong();
    return studentId;
  }
  @Override
  protected StudentEvent getMapperValue(IColumn iColumn, Context context) {
    return getStudentEvent(iColumn, context);
  }
}

Notice that the mapper extends from a specialized SSTableColumnMapper which can be used in conjunction with the SSTableColumnRecordReader.

The example above uses the identity reducer to simply write the data as comma separated values by calling the toString() method on the StudentEventWritable objects. The only additional task you have to worry about in the Reducer is deduping the data, since you will probably have a replication factor of > 1.
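
If it helps to visualize that dedup step, here is a minimal sketch of what such a reducer could look like. It is not the reducer from the example above; the class name and the use of toString() as a fingerprint are assumptions made purely for illustration.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupingStudentEventReducer extends
    Reducer<LongWritable, StudentEventWritable, LongWritable, StudentEventWritable> {
  @Override
  protected void reduce(LongWritable key, Iterable<StudentEventWritable> values,
                        Context context) throws IOException, InterruptedException {
    // With a replication factor greater than one, the same event can appear in
    // several SSTables; emit only one copy per distinct fingerprint.
    Set<String> seen = new HashSet<String>();
    for (StudentEventWritable event : values) {
      if (seen.add(event.toString())) {
        context.write(key, event);
      }
    }
  }
}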

Automating this job becomes an easy task given that SSTables are immutable and older tables don’t have to be read if they were already read once. Enabling incremental snapshots can also help here.

Conclusion

If you want to get started on using the KassandraMRHelper you can check out the code here: https://github.com/Knewton/KassandraMRHelper. Contributions are more than welcome and encouraged.

If you’re interested in additional topics in Cassandra and Hadoop you should check out the presentation on bulk reading and writing Cassandra using Hadoop here with the slides shared here.