Building the Data Warehouse

Here at SoftLayer, we have a lot of things that we need to keep track of. It's not just payments, servers, rack slots, network ports, processors, hard drives, RAM sticks, and operating systems; it's also bandwidth, monitoring, network intrusions, firewall logs, VPN access logs, API access, user history, and a whole host more. Last year, I was tapped to completely overhaul the existing bandwidth system. The old system was starting to show its age, and with our phenomenal growth it just wasn't able to keep up.

SoftLayer has 20,000+ servers. Each of those servers is on 2 networks: the public network, which is open to the Internet, and the private network that only our customers can use. Each of those networks exists on a 3- to 4-level network hierarchy. This gives us more than 50,000 switch ports, but we'll use 50,000 to make the math easier. Each switch port has bandwidth in and bandwidth out, as well as packets in and packets out. That gives us 200,000 data points to poll. Bandwidth is polled every 5 minutes, giving us 57,600,000 data points per day, or 1,728,000,000 per 30-day month. Given that bandwidth data points are all 64-bit numbers, and we also have to track the server ID (32-bit), the network (32-bit), and the datetime (32-bit), each data point takes 20 bytes, which makes a month's worth of raw data (excluding any overhead from storage engines) 34.56GB. Now, the data is to be stored redundantly, so double that. Also, we have to track bandwidth for racks, virtual racks, and private racks, so add another 50% onto the data. That gives us around 90GB per month of data.
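The arithmetic above can be checked with a short script. This is only a sketch: the variable names are mine, a 30-day month is assumed, and the last step assumes that the rack-level tracking adds 50% of the raw volume on top of the doubled figure, which is one reading that lands near the "around 90GB" estimate.

```python
# Figures come from the paragraph above; names are illustrative only.
SWITCH_PORTS = 50_000
COUNTERS_PER_PORT = 4            # bandwidth in/out, packets in/out
POLLS_PER_DAY = 24 * 60 // 5     # one poll every 5 minutes -> 288
DAYS_PER_MONTH = 30              # assumption: a 30-day month

VALUE_BYTES = 8                  # one 64-bit counter value
KEY_BYTES = 4 + 4 + 4            # server ID, network, datetime (32 bits each)

points_per_poll = SWITCH_PORTS * COUNTERS_PER_PORT
points_per_day = points_per_poll * POLLS_PER_DAY
points_per_month = points_per_day * DAYS_PER_MONTH

# Decimal gigabytes, ignoring storage engine overhead.
raw_gb = points_per_month * (KEY_BYTES + VALUE_BYTES) / 1e9
redundant_gb = raw_gb * 2
# Assumption: rack tracking adds half of the raw volume.
with_racks_gb = redundant_gb + raw_gb * 0.5

print(points_per_poll, points_per_day, points_per_month)
print(round(raw_gb, 2), round(redundant_gb, 2), round(with_racks_gb, 2))
```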

This doesn't seem like a lot of data at first, but we need to generate custom bandwidth graphs for use on the web. Since it's on the web, loading times above 2 seconds are unacceptable. Also, these are not enormous files with small keys (we'll spend more time on that later) so 45GB of bandwidth data is a whole lot different than 45GB of movie files or MP3s.

To accomplish this, we decided that we needed a data warehouse. After numerous false starts and blind alleys, we decided to make our own system from scratch using MySQL. We considered commercial products, and pre-built open source solutions, but they just didn't seem to fit our needs properly. The data warehouse project commenced with phase 1.

Phase 1: MySQL Cluster with Read Shards, Large Tables
Our first implementation was relatively simple. We planned to have a MySQL Cluster for writing all the data, with the data split into 100 tables. The ID of the hardware mod 100 would determine the name of the table that we would write to. Then we'd have between 5 and 20 read databases, each replicating a different table, for load reasons. All we would have to do then was index the data properly and add code to our applications to pull data from the correct read database node, and we'd be fine.
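To make the "ID mod 100" idea concrete, here is a minimal sketch of that kind of routing. The table prefix and function name are hypothetical; only the modulus of 100 comes from the plan described above.

```python
TABLE_COUNT = 100  # from the plan above: data split into 100 tables


def table_for_hardware(hardware_id: int, prefix: str = "bandwidth") -> str:
    """Return the table a piece of hardware writes to (hypothetical naming)."""
    if hardware_id < 0:
        raise ValueError("hardware_id must be non-negative")
    # Zero-pad the remainder so table names sort and align nicely.
    return f"{prefix}_{hardware_id % TABLE_COUNT:02d}"


print(table_for_hardware(12345))
print(table_for_hardware(100))
```

Every row for a given piece of hardware lands in the same table, so a reader only needs the hardware ID to know where to look.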

Bad news: MySQL Cluster isn't designed for data with huge numbers of large keys. MySQL Cluster stores all indexes and keys in memory on the cluster controller multiple times. As mentioned before, our data had 12 bytes of key and 8 bytes of data. This means that we could only get about 2 million rows into the data warehouse before MySQL would lock up and quit working with the ever-helpful error message "Unknown error: 157." Even more disturbing, deleting items from MySQL Cluster didn't free up memory, as the indexes had to be rebuilt before that would happen. We upgraded everything to MySQL 5.1.6, but per the MySQL manual, "Beginning with MySQL 5.1.6, it is possible to store the non-indexed columns of NDB tables on disk, rather than in RAM as with previous versions of MySQL Cluster." Unfortunately, it was the indexes that were causing us problems, so the upgrade didn't help.

Phase 2: MySQL Cluster with Read Shards, Small Tables
Since MySQL Cluster couldn't handle a small number of really big tables, how about a really big number of small tables? We tried making hundreds of tables per day, and removing the indexes on tables older than a certain time, but that quickly ran up against operating system limitations on the number of files in a folder. When we switched operating systems to alleviate this problem, we ran into MySQL's limitation of roughly 21,000 tables per database. Nothing could get past that, so we had to move away from the cluster entirely.

Phase 3: Large MySQL box with Read Shards, Large Tables
We then moved on to one single MySQL box with enormous hard drives and multiple read shards. This looked promising at first, but the box simply couldn't handle the number of inserts and updates, and the slave servers were locking up too often. We thought that if we partitioned the tables, MySQL could handle the inserts better. This, naturally, broke replication in a different way. Each partition was now getting its own file, so we effectively had too many tables: MySQL would be constantly opening all these table files to perform the updates, and would quickly run out of memory and begin to swap. We just had to decrease the number of tables per server. We decided to abandon the centralized master server idea, and built out 5 pairs of master/slave servers.

Phase 4: 5 Pairs of Master/Slave Servers, Large Grouped Tables
This plan really seemed like it was going to work. We actually had a working data warehouse for almost 2 weeks without any errors. We were close to breaking out the champagne, but there was one feature we still hadn't implemented.

The time came to add the tracking of bandwidth for virtual dedicated racks as well. To accomplish this, we changed the MySQL INSERT statement to INSERT ... ON DUPLICATE KEY UPDATE. For those who don't immediately recognize the terminology, the ON DUPLICATE KEY UPDATE syntax is designed so that if a particular key already exists in the table, the INSERT statement is simply turned into an UPDATE statement. The UPDATE portion is defined at the end of the clause. Since the key to our data is the large "object ID, date time, data type" combination, adding the duplicate syntax allowed us to issue multiple INSERT statements for the same data, and have it continually update with new values. This was especially handy for virtual racks with thousands of servers.
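For readers who have not used this syntax, the sketch below models its semantics with a plain Python dictionary keyed on the "object ID, date time, data type" combination. The SQL string is only there for reference and is never executed. The table name, column names, sample values, and the choice to add the new value to the old one (rather than overwrite it) are all assumptions for illustration, not our production statement.

```python
from typing import Dict, Tuple

# Hypothetical statement of the kind described above (reference only).
UPSERT_SQL = (
    "INSERT INTO bandwidth (object_id, polled_at, data_type, value) "
    "VALUES (%s, %s, %s, %s) "
    "ON DUPLICATE KEY UPDATE value = value + VALUES(value)"
)

Key = Tuple[int, str, str]  # (object ID, date time, data type)


def upsert(table: Dict[Key, int], object_id: int, polled_at: str,
           data_type: str, value: int) -> None:
    """Mimic INSERT ... ON DUPLICATE KEY UPDATE on an in-memory table."""
    key = (object_id, polled_at, data_type)
    if key in table:
        table[key] += value   # key exists: behaves like the UPDATE branch
    else:
        table[key] = value    # key is new: behaves like a plain INSERT


# Three servers in one virtual rack (ID 42) report the same poll interval.
rack_table: Dict[Key, int] = {}
for server_bytes in (1200, 800, 500):
    upsert(rack_table, 42, "2009-03-01 00:05:00", "public_in", server_bytes)

print(rack_table)
```

The rack ends up with a single row per interval and data type, no matter how many servers contribute to it.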

MySQL threw us another curve ball when we tried this. There was a known issue with the ON DUPLICATE KEY UPDATE syntax and replication using binary logs. Namely, the UPDATE would somehow remain an INSERT in the log, so we'd get duplicate key errors thrown on the slave while the master was still working fine. Each time this happened, it would require stopping both servers, re-syncing the tables, clearing the logs, restarting replication, and restarting the data transfer processes. This was unacceptable, so we had to move once more.

Phase 5: 10 Individual Machines
Since we already had 10 identical database machines, we decided to make them all independent and ignorant of each other. We stayed with the item group theory, leaving multiple bandwidth items per table. With 25 different items per table, we were only going to have 1,000,000 rows per table per month, which still maintains our speed. However, the index size problem bit us again. These tables were simply too large to have 3/4 of their columns indexed.

Finally we thought we had the answer. We would split up each of these tables by month. That way, each table would max out at 1,000,000 rows, so the indexes wouldn't be unmanageable. Remember that we've already split the connection points from one, to five, to ten. We've also broken the tables up from one large table, to one per object, and now one per group of objects. In order to cut down on the complexity of the application layer code, we decided to use merge tables to keep the table name consistent. That way we could simply select from "the table" rather than "the table from March." When we did this, we almost immediately began getting mysterious deadlocks on the servers. The INSERT statements would conflict with any SELECT statements using the same table, and the two threads would just hang indefinitely. Oddly, the table was never actually locked; it seemed that there was some sort of "offsetting lock" happening, where both threads were deadlocked in a race condition. The table could still be used, but the application we had inserting the data would hang, causing unacceptable delays in data inserts.

Phase 6: 10 Individual Machines, Small Individual Tables
Finally we decided that enough was enough: we were going to roll our own scaled storage solution. Clustering didn't work, throwing hardware at the problem unfortunately didn't work, and even the fancy new features for merge tables and partitioned tables didn't work. We simply decided that we were going to keep the old "grouped tables" model, and create a new table for every month without merge tables. This way, we keep both the number of tables and the number of rows per table relatively low. Plus, as a bonus, by controlling the table names ourselves we could ensure that MySQL wouldn't open too many files. All inserts went into this month's table, and all reads would come out of whatever specific months they needed. A cron job was set up to periodically issue FLUSH TABLES on all 10 data warehouse nodes, and we had our completed product! It's been months now since we had a major database failure, we can generate any bandwidth graphs you want in less than a second, and other developers are starting to create their own table groups.
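Controlling the table names ourselves boils down to two small calculations: which table gets this month's inserts, and which tables a read over a date range has to touch. Here is a rough sketch of both; the group_YYYY_MM naming pattern and the function names are made up for illustration.

```python
from datetime import date
from typing import List


def monthly_table(group: str, day: date) -> str:
    """Table that holds a group's data for the month containing `day`."""
    return f"{group}_{day.year:04d}_{day.month:02d}"


def tables_for_range(group: str, start: date, end: date) -> List[str]:
    """Every monthly table a read from `start` to `end` must query."""
    if start > end:
        raise ValueError("start must not be after end")
    names = []
    year, month = start.year, start.month
    # Walk month by month, rolling over into the next year after December.
    while (year, month) <= (end.year, end.month):
        names.append(f"{group}_{year:04d}_{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return names


print(monthly_table("bw_group_7", date(2009, 3, 15)))
print(tables_for_range("bw_group_7", date(2008, 11, 20), date(2009, 1, 5)))
```

A graph covering one week usually touches one table, and never more than two, which is what keeps the reads cheap without a merge table in front of them.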

Each object's data resides completely on two of our data warehouse nodes, so the data is redundantly stored in the event of a node going down. The application code has been written with extensive factory patterns and ORM so that all a developer has to do is create a new group type, a new data tracking object, and add data to it. The code automatically selects the applicable nodes to write the data to, creates the tables if they don't exist, and writes the data to the tables. Similarly, a "get data" command will randomly select one location for the data, and retrieve the proper data, using only the tables that are necessary.
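The write-to-two, read-from-one behavior can be sketched in a few lines. This is a toy in-memory model, not our application code: the node "databases" are dictionaries, the function names are invented, and picking the two nodes at random is a placeholder assumption.

```python
import random
from typing import Dict, List, Tuple

NODE_COUNT = 10   # ten data warehouse nodes
REPLICAS = 2      # each object lives on two of them

# Hypothetical placement map: object ID -> the nodes holding its data.
placement: Dict[int, Tuple[int, ...]] = {}
# One in-memory stand-in per node: object ID -> list of values.
nodes: List[Dict[int, List[int]]] = [dict() for _ in range(NODE_COUNT)]


def nodes_for(object_id: int) -> Tuple[int, ...]:
    """Return the object's nodes, assigning two distinct ones on first use."""
    if object_id not in placement:
        placement[object_id] = tuple(
            random.sample(range(NODE_COUNT), REPLICAS))
    return placement[object_id]


def add_data(object_id: int, value: int) -> None:
    """Write the value to every node that stores this object."""
    for node_id in nodes_for(object_id):
        # setdefault plays the role of "create the table if it doesn't exist".
        nodes[node_id].setdefault(object_id, []).append(value)


def get_data(object_id: int) -> List[int]:
    """Read from one randomly chosen copy of the object's data."""
    node_id = random.choice(nodes_for(object_id))
    return list(nodes[node_id].get(object_id, []))


add_data(7, 100)
add_data(7, 200)
print(nodes_for(7), get_data(7))
```

Because both copies receive every write, it does not matter which one a read happens to pick.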

But Wait, There's More!
All 6 phases above were happening simultaneously with other systems. The raw data itself needed to be stored, buffered, transferred, and translated into a format suitable for the data warehouse. Since almost a quarter of our servers are in other parts of the country, we had to have redundant data storage and transfer solutions to make sure the raw data got to where it needed to be. For this data, the rules were different.

First of all, there are two layers of raw data. Each city has a local bandwidth polling database. We use rtgpoll to poll the bandwidth data. Since rtgpoll is designed to write to a different table for each data type, we kept the data like that. For ease of data management, we created a script that would keep a rotating two-day cycle of tables, one for today and one for yesterday, with a merge table that encompassed them both. We could get away with the merge table on this layer because there are far fewer tables, far fewer rows, and different indexes. Since the interface isn't important at this layer, we could index only the date time and get the performance we wanted by making the scripts that transfer data to the global buffer date-based.

The global buffer server is the same box we attempted to use in phases 3 and 4 above. It has the exact same table structure as the datacenter buffers, with the rotating tables living under a merge table. This data is replicated out to a slave server to prevent read/write contention. At this layer, we have no ON DUPLICATE KEY statements being executed and no partitions, plus our merge tables are much smaller, so everything works out. These tables act as a permanent raw data archive in the event of a system failure or a bandwidth dispute with a customer.

The scripts that pull data out of the datacenter buffers also insert data into a queue for each of the data warehouse nodes. We store a lookup table in memcache that translates a raw interface ID into the data warehouse nodes that the interface's data needs to be inserted into. Each raw row is then inserted into the queues for the nodes it belongs to.
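The fan-out step looks roughly like the sketch below. A plain dictionary stands in for the memcache lookup table and deques stand in for the per-node queues; the interface IDs, row layout, and the decision to skip unknown interfaces are all assumptions made for the example.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

# Hypothetical raw row layout: (interface ID, unix timestamp, octets).
RawRow = Tuple[int, int, int]

# Stand-in for the memcache lookup: interface ID -> warehouse node IDs.
interface_to_nodes: Dict[int, List[int]] = {101: [0, 3], 102: [3, 7]}
# Stand-in for the per-node queues on the global buffer.
queues: Dict[int, Deque[RawRow]] = {n: deque() for n in range(10)}


def enqueue(row: RawRow) -> int:
    """Copy a raw row into the queue of every node that should store it.

    Returns how many queues received the row.
    """
    targets = interface_to_nodes.get(row[0])
    if targets is None:
        return 0  # assumption: rows for unknown interfaces are skipped
    for node_id in targets:
        queues[node_id].append(row)
    return len(targets)


sent_first = enqueue((101, 1236000000, 5000))
sent_second = enqueue((102, 1236000000, 7500))
sent_unknown = enqueue((999, 1236000000, 1))
print(sent_first, sent_second, sent_unknown, len(queues[3]))
```

Each node only ever has to look at its own queue, so the nodes stay ignorant of each other.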

Finally, a set of scripts runs on the data warehouse nodes, constantly pulling any new data out of their queues on the global buffer slave. The data is translated from raw interface data to match up to our customer accounts, then inserted into the local data warehouse database, ready to be selected out to make graphs, reports, or billing.

All Powered by Tracking Object
The entire system is interfaced from our application code using the tracking object system. The tracking objects are a series of PHP classes that link a particular object in our existing production database to that object's various data points in the data warehouse. Using ORM and factory patterns, we were able to abstract tracking objects to the point where any object in our database could have an associated "trackingObject" member variable. Servers, Virtual Dedicated Racks, Cloud Computing Instances, and other systems can simply call the getBandwidthData() method on their tracking object, and the tracking object system will automatically select the correct database, select the correct table, and pull the correct fields, formatting them as a collection of generic "bandwidth data" objects. Other metrics, like CPU and memory usage for servers and Cloud Computing Instances, can just as easily be retrieved.

Similarly, most of our back-end systems use the tracking objects to add data to the data warehouse. The developers don't touch the warehouse directly; they simply load whatever object they have new data for, and pass an array of raw data objects to our internal addData() function, which automatically determines the database node, write table, and data structure. The tracking object system is completely transparent to the other developers, and it means new tracking objects or data warehouse nodes can be created seamlessly without changing existing code.
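Our tracking objects are PHP classes, but the shape of the idea is easy to show in a few lines of Python. Everything in this sketch is a simplified assumption: the class layout, the snake_case method names standing in for getBandwidthData() and addData(), the table naming, and the nested dictionaries used in place of real database nodes.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class BandwidthData:
    object_id: int
    timestamp: int
    data_type: str
    value: int


# Stand-in for the warehouse: node ID -> table name -> rows.
Storage = Dict[int, Dict[str, List[BandwidthData]]]


@dataclass
class TrackingObject:
    object_id: int
    node_ids: Tuple[int, ...]   # nodes that hold this object's data
    storage: Storage

    def _table(self, month: str) -> str:
        return f"bandwidth_{month}"   # hypothetical table naming

    def add_data(self, month: str, rows: List[BandwidthData]) -> None:
        """Write rows to every node, creating tables on demand."""
        for node_id in self.node_ids:
            tables = self.storage.setdefault(node_id, {})
            tables.setdefault(self._table(month), []).extend(rows)

    def get_bandwidth_data(self, month: str) -> List[BandwidthData]:
        """Read this object's rows for one month from its first node."""
        tables = self.storage.get(self.node_ids[0], {})
        rows = tables.get(self._table(month), [])
        return [r for r in rows if r.object_id == self.object_id]


@dataclass
class Server:
    server_id: int
    tracking_object: TrackingObject


storage: Storage = {}
server = Server(5, TrackingObject(5, (1, 4), storage))
server.tracking_object.add_data(
    "2009_03", [BandwidthData(5, 1236000000, "public_in", 10)])
result = server.tracking_object.get_bandwidth_data("2009_03")
print(result)
```

The calling code never names a node or a table, which is the whole point of the abstraction.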

By centralizing the reading and writing into these classes, the data warehouse can be extended infinitely. A new type of data can be added as easily as adding a new data warehouse data type class to the file system, as well as a row to the database. As long as that class has the data structure properly defined, new tracking objects can be created for that data type, and data can begin being recorded immediately. Creating a new tracking object will automatically choose two or more database nodes to store the data on. Creating new nodes takes the current tracking object count into consideration, so the nodes stay balanced.
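One simple way to keep nodes balanced when a new tracking object is created is to pick the nodes that currently hold the fewest tracking objects. The sketch below shows that approach; it is an assumption about how such balancing could work, with invented names, rather than a description of our actual selection code.

```python
from typing import Dict, List


def choose_nodes(object_counts: Dict[int, int], replicas: int = 2) -> List[int]:
    """Pick the `replicas` least-loaded nodes and record the new object.

    `object_counts` maps node ID -> number of tracking objects stored there.
    Ties are broken by the lower node ID so the result is deterministic.
    """
    if replicas > len(object_counts):
        raise ValueError("not enough nodes for the requested replica count")
    ranked = sorted(object_counts, key=lambda n: (object_counts[n], n))
    chosen = ranked[:replicas]
    for node_id in chosen:
        object_counts[node_id] += 1
    return chosen


counts = {0: 5, 1: 2, 2: 2, 3: 9}
first = choose_nodes(counts)
print(first, counts)
```

A freshly added node starts with a count of zero, so it naturally receives new tracking objects until it catches up with the rest.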

So far the system has 33,115,715,147 rows in 683,460 tables spread over 10 databases. We have hundreds of customers who view their bandwidth graphs every day, and a handful that systematically pull the graphs every hour. Load tests suggest that performance doesn't degrade until we hit 500 simultaneous graph requests, and even then we still come in under 2 seconds per graph. With the scaling potential and the single point of access for developers, we should be able to use this system indefinitely.