David and Andrew gave me an in-depth overview of the current state of
the analytics cluster, so I figured it might be good for me to clean
up the notes from that and send them here, both to check my
understanding of these things, and in hopes that this may be useful.
The official docs for this are here:
One prominent item on that page is the System Architecture Overview,
which is out of date:
In particular, here are the things that are different:
"Pixel service" - no longer will have bundlers.
"ReqLog service" - no longer will have bundlers. Currently udp2log.
More on this below.
"Bundle queue [Cassandra]" - Currently a combination of udp2log and Kafka.
"Datawarehouse [Cassandra]" - this is Hadoop now
The long-term plan is to use Kafka here. Short term, udp2log is in
use (piped from udp2log into Kafka in a couple of cases).
Here's what the current data pipeline looks like for pageviews going
to Hadoop and out to useful information to analysts, using the
terminology from the Kraken flow diagram (even though it's probably
not appropriate anymore in many cases). These are each separate hosts
or clusters unless otherwise noted:
1. ReqLog service: Squid/Varnish udp logging emitters
2. Bundle queue: udp2log
3. Bundle queue: Kafka producers (on udp2log host)
4. Bundle queue: Kafka broker
5. Canonicalization topology: Kafka consumer
6. Data Warehouse (ETL phase): Log storage on Hadoop cluster
7. MapReduce Jobs: Pig scripts (running on Hadoop cluster)
8. Data Warehouse (Processing): Processed data, usually as CSVs
(stored on Hadoop cluster)
Routing of Kafka producers to Kafka brokers is done via Zookeeper.
The glorious future looks something like:
1. ReqLog service: Squid/Varnish Kafka producers
2. Bundle queue: Kafka broker
3. Canonicalization topology: Multiple Kafka consumers/Storm Spouts
(on Storm cluster)
4. Canonicalization topology: Storm bolts for
anonymization/geotagging, etc (on Storm cluster)
5. Data Warehouse (ETL phase): Log storage on Hadoop cluster
6. MapReduce Jobs: Pig scripts (running on Hadoop cluster)
7. Data Warehouse (Processing): Processed data, usually as CSVs
(stored on Hadoop cluster)
More detail on the pipeline:
Currently udp2log - 1:100 sampling for most things, plus finer
granularity for some things. We eventually need to decide whether to
put Kafka producers directly on the Varnish hosts in production, or
to continue emitting the logs via UDP to a udp2log collector.
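As a rough illustration of what 1:100 sampling means, here is a
minimal Python sketch that keeps one line out of every N. The function
name sample_every_nth is made up for this example, and udp2log's real
sampling may differ in detail.

```python
from itertools import islice


def sample_every_nth(lines, n=100):
    """Return an iterator over a 1:n sample of `lines`.

    Hypothetical helper for illustration; not udp2log's actual code.
    """
    # A stepped islice keeps the items at positions 0, n, 2n, ...
    return islice(lines, 0, None, n)


if __name__ == "__main__":
    fake_log = (f"request {i}" for i in range(1000))
    sampled = list(sample_every_nth(fake_log))
    print(len(sampled), "of 1000 lines kept")
```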
We didn't get into talking about this part of the architecture.
Rather than restating my jumbled recollection of what's going on here
(randomly throwing around terms like "ClickTracking extension",
"EventLogging extension", "varnishncsa"), I'll ask that someone
venture a correct assessment of current/future state of this.
Kafka producers produce from udp2log for now (one per stream). We
currently have 2 Kafka brokers, managed via Zookeeper. Raw logs are
persisted straight to disk in a one-week sliding window. They are not
sanitized, so this host will be locked down for the foreseeable future.
If we weren't sampling, we would typically get about 16.8 terabytes in
one week's sliding window of uncompressed data. Snappy compression
(not implemented yet) should reduce size by roughly 2/3.
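The numbers above can be sanity-checked with a little arithmetic. The
sketch below assumes decimal units (1 TB = 1000 GB) and treats the
"roughly 2/3" Snappy saving as exact. It also shows that 1:100
sampling of 16.8 TB per week works out to about 1 GB per hour, which
lines up with the roughly 1 gig hourly chunks mentioned elsewhere in
these notes.

```python
UNSAMPLED_TB_PER_WEEK = 16.8  # figure quoted in the notes
SAMPLING_RATE = 100           # 1:100 sampling
HOURS_PER_WEEK = 7 * 24       # 168
GB_PER_TB = 1000              # assumption: decimal units


def weekly_window_tb(compression_saving=0.0):
    """Size of the one-week window after removing a fraction via compression."""
    return UNSAMPLED_TB_PER_WEEK * (1 - compression_saving)


def sampled_gb_per_hour():
    """Hourly volume of the 1:100 sampled stream, in GB."""
    sampled_tb_per_week = UNSAMPLED_TB_PER_WEEK / SAMPLING_RATE
    return sampled_tb_per_week * GB_PER_TB / HOURS_PER_WEEK


if __name__ == "__main__":
    print(f"one week with Snappy: {weekly_window_tb(2 / 3):.1f} TB")
    print(f"sampled stream: {sampled_gb_per_hour():.2f} GB/hour")
```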
Apache Zookeeper (not in the diagram) - Zookeeper provides
application-level routing of data, keeping track of unavailable nodes,
and signaling to senders to use different hosts when necessary.
We have 3 Zookeeper nodes. Thread about zookeeper kafka failover
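To make the routing idea concrete, here is a toy sketch in plain
Python. It does not talk to Zookeeper at all: the live set stands in
for the list of available brokers that Zookeeper would maintain, and
the broker names and the pick_broker function are hypothetical.

```python
import zlib


def pick_broker(brokers, live, key):
    """Pick a broker for `key`, skipping any broker not in the `live` set."""
    candidates = [b for b in brokers if b in live]
    if not candidates:
        raise RuntimeError("no Kafka brokers available")
    # crc32 gives a hash that is stable across runs, so the same key
    # keeps going to the same broker while the live set is unchanged.
    return candidates[zlib.crc32(key.encode("utf-8")) % len(candidates)]


if __name__ == "__main__":
    brokers = ["broker1", "broker2"]  # hypothetical host names
    print(pick_broker(brokers, {"broker1", "broker2"}, "pageviews"))
    # With broker1 marked unavailable, everything goes to broker2.
    print(pick_broker(brokers, {"broker2"}, "pageviews"))
```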
We currently have a single Kafka consumer: an hourly cron job that
pulls from the brokers and pushes the raw logs straight into Hadoop
(in approximately 1 gig chunks). We'd like to do much more processing
at this stage. Most importantly, we'd like to anonymize the data here
so that we don't have to be stingy about who gets access to the
Hadoop cluster.
The plan is to use Storm <http://storm-project.net/> as an ETL layer.
There is some custom dev work that needs to happen at this stage,
since we're not aware of any off-the-shelf log anonymization
solutions that fit into this architecture (in particular, ones that
can do the anonymization in real time). We plan to do a formal
privacy and security review before relying on whatever solution we
develop.
Glossary for Storm:
"Worker nodes": run Bolts.
"Bolt": a small unit of processing
"Topology": arrangement of bolts, flow of data. Bolts operate on
data. Bolts can be written in any language.
"Tuple": Unit of data in Storm.
"Spout": source of tuples.
"Nimbus": job manager/scheduler, doesn't actually do any processing.
If Nimbus goes down, topology still works.
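The glossary terms fit together roughly as in the toy sketch below.
This is plain Python generators, not the Storm API, and every name in
it is made up. In particular, zeroing the last IP octet is only a
placeholder for "some processing step" and is not a proposed
anonymization scheme.

```python
def log_spout(lines):
    """Spout: a source of tuples. Each tuple here is (ip, url)."""
    for line in lines:
        ip, url = line.split()
        yield (ip, url)


def mask_ip_bolt(tuples):
    """Bolt: one small unit of processing. Zeroes the last IPv4 octet."""
    for ip, url in tuples:
        octets = ip.split(".")
        octets[-1] = "0"
        yield (".".join(octets), url)


def strip_query_bolt(tuples):
    """Bolt: drops the query string from the URL."""
    for ip, url in tuples:
        yield (ip, url.split("?", 1)[0])


def run_topology(lines):
    """Topology: the arrangement of spout and bolts the data flows through."""
    return list(strip_query_bolt(mask_ip_bolt(log_spout(lines))))


if __name__ == "__main__":
    for tup in run_topology(["192.0.2.17 /wiki/Main_Page?action=raw"]):
        print(tup)
```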
We plan to have 8 hosts running as Storm workers. This is probably
more than we need; 2-3 may be enough. If so, it's easy enough to put
the spare hardware to other uses.
We're using Hadoop/HDFS to store data. The logs currently come in
hourly blocks of 1:100 sampled data, which translates into lots of
1 gig files stored hierarchically by date/time.
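For a feel of what "hierarchically by date/time" could look like, here
is a small sketch that builds an hourly directory path. The root
directory, the stream name, and the exact year/month/day/hour layout
are all assumptions for illustration; the actual layout on the cluster
may differ.

```python
from datetime import datetime


def hourly_log_path(root, stream, ts):
    """Build a hypothetical <root>/<stream>/YYYY/MM/DD/HH path for one hour."""
    return f"{root}/{stream}/" + ts.strftime("%Y/%m/%d/%H")


if __name__ == "__main__":
    print(hourly_log_path("/logs", "pageviews", datetime(2013, 1, 15, 9)))
```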
HDFS is a block filesystem with 256 MB blocks. Blocks are replicated 3
times over, each stored on different physical hardware. A central
NameNode coordinates what gets stored where.
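A quick worked example of what the block size and replication factor
mean for a single hourly file. It assumes "1 gig" means 1024 MB and
ignores any per-block overhead.

```python
import math

BLOCK_MB = 256    # HDFS block size from the notes
REPLICATION = 3   # each block is stored 3 times


def hdfs_footprint(file_mb):
    """Return block count, replica count, and raw disk usage for one file."""
    blocks = math.ceil(file_mb / BLOCK_MB)
    return {
        "blocks": blocks,
        "block_replicas": blocks * REPLICATION,
        "raw_mb": file_mb * REPLICATION,
    }


if __name__ == "__main__":
    # A 1024 MB file: 4 blocks, 12 block replicas, 3072 MB of raw disk.
    print(hdfs_footprint(1024))
```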
Our Batch Processing System is Hadoop/MapReduce under the hood. "Hue"
provides a web management user interface. "Oozie" manages XML-based
workflow definitions and does things like schedule regular updates to
data. "Pig" and "Hive" are two different layers for building
MapReduce jobs.
Pig is a simple domain-specific language for defining batch processing
jobs. For those who have access to the analytics cluster, here's the
blog stats Pig script:
Hive is a different technology, which turns SQL queries into Map/Reduce jobs.
Pig is currently the preferred tool, but we'll support Hive if it catches on.
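Since both Pig and Hive end up generating MapReduce jobs, it may help
to see the shape of such a job. The sketch below is plain Python, not
Pig, Hive, or the Hadoop API, and the (project, url) record format is
invented for the example. It counts requests per project in a map
phase followed by a reduce phase.

```python
from collections import defaultdict


def map_phase(records):
    """Map: emit a (key, 1) pair for every record."""
    for project, _url in records:
        yield (project, 1)


def reduce_phase(pairs):
    """Reduce: sum the values for each key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


if __name__ == "__main__":
    records = [("en", "/wiki/A"), ("de", "/wiki/B"), ("en", "/wiki/C")]
    print(reduce_phase(map_phase(records)))
```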
A few other notes:
* Infrastructure and Hardware
** Current allocation plans reflect a priori estimates -- we'll be
revising them as we get a feel for the workloads (e.g., 8 machines on
ETL is a ridiculous overestimate; we'll probably only need 2-3.)
* Alternatives & Comparisons
** We benchmarked and tested Cassandra as a datastore, but ultimately
went with HDFS
** Request Logging
I hope this is helpful to everyone, and please let me know if there's
anything I missed or got wrong here.