
At PubNub, we generate terabytes of raw logs per day waiting to be consumed for analytics, both for internal use and for customer service. Analytics are extremely critical to us: they serve as the building blocks for our pricing model and give us insight into almost everything about the PubNub Data Stream Network. We were originally using an embarrassing, undisclosed system for analytics; it barely operated and was extremely expensive because of its always-on hot instances. Our goal was to completely replace this system, and we wanted to use spot instances to spin up and shut down clusters on demand to save on costs.

PubNub has progressively enhanced its metrics-tracking strategy, which in turn has added more granularity. Wanting to keep our analytics interface the same, we began looking for an alternative solution, and Hadoop seemed promising.

Because the PubNub analytics service runs on Amazon Web Services, we started with EMR (Elastic MapReduce), a dedicated service offered by AWS; Amazon advertises special optimization and support for the interaction between S3 and EMR clusters. The first option we tried was Hadoop streaming. We provided our mapper script for parsing and formatted the parsed output for the streaming default aggregate reducer. We soon found that the logs specified in the input field on the EMR console caused problems, effectively exploding the Hadoop master node's RAM no matter how large a machine we chose.
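For context, here is a minimal sketch of what such a streaming mapper can look like, assuming one JSON record per log line; the `channel` and `day` field names are placeholders, and the key prefix targets Hadoop streaming's built-in aggregate reducer:

```python
#!/usr/bin/env python
# Minimal Hadoop streaming mapper sketch: reads raw log lines from stdin and
# emits "LongValueSum:<key>\t1" so the built-in aggregate reducer can sum them.
import sys
import json

for line in sys.stdin:
    try:
        record = json.loads(line)          # assume one JSON record per line
    except ValueError:
        continue                           # skip malformed lines
    # "channel" and "day" are illustrative field names, not our real schema
    key = "%s|%s" % (record.get("channel", "unknown"), record.get("day", ""))
    sys.stdout.write("LongValueSum:%s\t1\n" % key)
```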

Basically, we hit Java heap errors even after tuning the cluster via bootstrapping. We worked around it by generating a name file, containing a huge list of log names, as the input. Slave nodes got log names assigned by the master and retrieved the logs through boto before doing the parsing work. We also wrote bootstrap scripts, passed in via the console, to install dependencies like boto. These hooks worked: we launched 200 small instances as slave nodes and one 4xlarge node as the master. It took five days to generate a month of data, and the job crashed regularly. This was the first time we got close to Hadoop, and at first we didn't appreciate that Hadoop is designed for commodity hardware. Additional, non-trivial work was needed to transform the reduced results back into the original format our analytics system expected. The whole approach looked cumbersome, and we wanted something more flexible.
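A sketch of that workaround mapper, under the assumption that each input line is an S3 object name from the generated name file rather than log content (the bucket name, credentials handling, and `parse_line` helper are illustrative, and boto here means the original boto 2 library):

```python
#!/usr/bin/env python
# Workaround mapper sketch: input lines are S3 log-object names, not log data.
# Each slave fetches the object via boto, parses it, and emits aggregate keys.
import sys
import boto

conn = boto.connect_s3()                    # credentials from env / IAM role
bucket = conn.get_bucket("example-raw-logs")  # hypothetical bucket name

def parse_line(raw):
    # placeholder for the real parsing logic
    return raw.split(" ", 1)[0], 1

for name in sys.stdin:
    key = bucket.get_key(name.strip())
    if key is None:
        continue
    body = key.get_contents_as_string()
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    for raw in body.splitlines():
        k, v = parse_line(raw)
        sys.stdout.write("LongValueSum:%s\t%d\n" % (k, v))
```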

Then we shifted our attention to Hive, a MapReduce job scheduler with an SQL-like query interface, still running on EMR. One of the best parts of Hive on EMR is that you can store your table schemas on S3, so a table is available whenever you want it; Hive also supports partitions, which help reduce load time. Hive is schema-on-read rather than schema-on-write, meaning it doesn't check data during loading but does check it when a query is issued, which is a good fit for ad-hoc partitioning. We tested a small dataset in the interactive shell and it worked well: first we created a table with only one field, then we loaded a raw log into that table, transformed it with our parsing script, and imported the result into another table with all the fields we needed. We noticed that Hive uses Ctrl-A (`\001`) as its default column delimiter. Rather than using TRANSFORM, we could have prepared the data before loading. Still, the provisioning and execution times were not satisfying, the Hive version in the Amazon distribution was limited to 0.7, and we wanted full control of the cluster rather than being bound to EMR.
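For example, a small preprocessing script along these lines (the field names are illustrative) could write rows in Hive's default text layout so that no TRANSFORM step is needed at query time:

```python
#!/usr/bin/env python
# Sketch: pre-format raw JSON logs into Hive's default text layout before
# loading, joining columns with Ctrl-A (\001), Hive's default delimiter.
import sys
import json

HIVE_DELIM = "\001"   # Hive's default column delimiter

for line in sys.stdin:
    try:
        r = json.loads(line)
    except ValueError:
        continue
    # hypothetical column order for the target table
    row = [str(r.get(f, "")) for f in ("timestamp", "channel", "action", "bytes")]
    sys.stdout.write(HIVE_DELIM.join(row) + "\n")
```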

We moved on to setting up our own custom-built Hive cluster. We used CDH4 with MapReduce v1 because it is stable and easier to configure. We then found that operating the cluster is even more important than programming in Hive. We tried a couple of orchestration tools, including Ubuntu Juju, Apache Whirr, and StarCluster. Provisioning through Juju was always left broken in a non-working state due to a package install error. Provisioning with Whirr was relatively slow, Hive was not installed by default, and the whole process felt cumbersome. In StarCluster, Hadoop and Hive setup are handled by a plugin: instead of provisioning Hadoop after booting, it uses a customized AMI in which everything has already been installed, and NFS for sharing configuration files. The official Hadoop AMI used CDH3u3, which was old, so AMI customization was needed to reduce provisioning time.

It is also worth mentioning that our data is compressed. The ideal compression format for Hadoop is LZO, because it strikes a balance between speed and compression ratio and is splittable (gzip and bzip2 are not). Individual files should not be too small, because the NameNode keeps metadata for every file in memory, and memory can easily blow up when there are lots of small files; that explains why we got so many OutOfMemory exceptions when we used EMR. The block size is 256 MB by default, so input files should be as close to a multiple of 256 MB as possible. Another consideration is HDFS capacity estimation: we roughly calculate capacity by dividing the total disk space across nodes by the replication factor, as sketched below.
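As a toy illustration of that capacity estimate (all numbers here are made up, and 3 is simply HDFS's default replication factor):

```python
# Rough HDFS capacity estimate: total disk space across data nodes
# divided by the replication factor.
nodes = 200             # hypothetical data node count
disk_per_node_tb = 0.8  # hypothetical usable disk per node, in TB
replication = 3         # default HDFS replication factor

usable_tb = nodes * disk_per_node_tb / replication
print("Approximate HDFS capacity: %.1f TB" % usable_tb)
```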

At the end of the day, we ended up building our own internal continuous MapReduce streaming solution to meet our Data Stream Network analysis needs. We learned a lot in our days with Hadoop and found that it works well for complex questions over large timespans; however, it is important to note that it was not the solution we needed. Our solution, which is still actively running today, is a much more cost-effective monitoring service that meters and monitors PubNub Data Stream Network usage over time. Hope this helps!
