Monday, April 22, 2013

Scala Command-Line Hacks

Do you like command-line scripting and one-liners with Perl, Ruby and the like?   

For instance, here's a Ruby one-liner that uppercases the input:

% echo matz | ruby -p -e '$_.tr! "a-z", "A-Z"'
MATZ

You like that kind of stuff?  Yes?  Excellent!  Then I offer you a hacking idea for Scala.

As you may know, Scala offers similar capability with the -e command-line option but it's fairly limited in its basic form because of the necessary boilerplate code to set up iteration over the standard input... it just begs for a simple HACK!

Using a simple bash wrapper,


#!/bin/bash
#
# Usage: scala-map MAP_CODE
#
code=$(cat <<END
scala.io.Source.stdin.getLines map { $@ } foreach println
END
)
scala -e "$code"

then we can express similar one-liners using Scala code and the standard library:

% ls | scala-map _.toUpperCase
FOO
BAR
BAZ
...

% echo "foo bar baz" | scala-map '_.split(" ").mkString("-")'
foo-bar-baz
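
To make it clearer what the wrapper hands to scala -e, here is a rough sketch of the same pipeline written as an ordinary function. The names ScalaMapSketch and mapLines are made up for illustration, and an in-memory iterator stands in for scala.io.Source.stdin.getLines so the snippet can be tried without piping anything in:

```scala
object ScalaMapSketch {
  // Mirrors the template `getLines map { MAP_CODE } foreach println`,
  // but returns the mapped lines instead of printing them.
  def mapLines(lines: Iterator[String])(f: String => String): List[String] =
    (lines map f).toList
}
```

Calling ScalaMapSketch.mapLines(Iterator("foo", "bar", "baz"))(_.toUpperCase) corresponds to the first one-liner above; the MAP_CODE argument is simply the body of the function passed to map.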

Nifty, right?  Here's another script template to fold over the standard input,



#!/bin/bash
#
# Usage: scala-fold INITIAL_VALUE FOLD_CODE
#
# where the following val's can be used in FOLD_CODE:
#
#        `acc`  is bound to the accumulator value
#        `line` is bound to the current line
#
code=$(cat <<END
println(scala.io.Source.stdin.getLines.foldLeft($1) { case (acc, line) => $2 })
END
)
scala -e "$code"


Now if you wanted to calculate the sum of the second column of space-separated input, you'd write:

% cat | scala-fold 0 'acc + (line.split(" ")(1).toInt)'
foo 1
bar 2
baz 3 
(CTRL-D)
6
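
For comparison, the fold template can be sketched as a plain function as well. ScalaFoldSketch, foldLines and sumSecondColumn are hypothetical names, and the iterator again stands in for standard input:

```scala
object ScalaFoldSketch {
  // Mirrors `getLines.foldLeft(INITIAL_VALUE) { case (acc, line) => FOLD_CODE }`.
  def foldLines[A](lines: Iterator[String], initial: A)(f: (A, String) => A): A =
    lines.foldLeft(initial)(f)

  // The column-sum example: INITIAL_VALUE is 0 and FOLD_CODE adds the second field.
  def sumSecondColumn(lines: Iterator[String]): Int =
    foldLines(lines, 0) { (acc, line) => acc + line.split(" ")(1).toInt }
}
```

Note that the type of the accumulator is fixed by INITIAL_VALUE, so passing 0 means FOLD_CODE has to produce an Int.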

You get the idea ...  hopefully this inspires you to try a few things with Scala scripting templates!

Disclaimer:  I am not advocating these hacks as a replacement for learning other Unix power tools like grep, sed, awk, ... I am simply illustrating that Scala can be turned into an effective command-line scripting tool as part of the vast array of Unix tools.   Use what works best for you.

Friday, April 19, 2013

Efficiency & Scalability

Software engineers know that distributed systems are often hard to scale and many can intuitively point to reasons why this is the case by bringing up points of contention, bottlenecks and latency-inducing operations.  Indeed, there exists a plethora of reasons and explanations as to why most distributed systems are inherently hard to scale, from the CAP theorem to scarcity of certain resources, e.g., RAM, network bandwidth, ...


It's said that good engineers know how to identify resources that may not appear to be relevant to scaling initially but will become more significant as particular kinds of demand grow. If that’s the case, then great engineers know that system architecture is often the determining factor in system scalability, and that a system’s own architecture may be its worst enemy, so they define and structure systems in order to avoid fundamental flaws.


In this post, I want to explore the relationship between system efficiency and scalability in distributed systems; they are to some extent two sides of the same coin.  We’ll consider specifically two common system architecture traits:  replication and routing.  Some of this may seem obvious to some of you but it’s always good to back intuition with some additional reasoning.


Before we go any further, it’s helpful to formulate a definition of efficiency applicable to our context:


efficiency is the extent to which useful work is performed relative to the total work and/or cost incurred.


We’ll also use the following definition of scalability,


scalability is the ability of a system to accommodate an increased workload by repeatedly applying a cost-effective strategy for extending a system’s capacity.


So, scalability and efficiency are both determined by cost-effectiveness with the distinction that scalability is a measure of marginal gain.  Stated differently, if efficiency decreases significantly as a system grows, then a system is said to be non-scalable.


Enough rambling, let’s get our thinking caps on!  Since we’re talking about distributed systems, it’s practically inevitable to compare against traditional single-computer systems, so we’ll start with a narrow definition of system efficiency:

Efficiency = (average work for processing a request on a single computer) / (average work for processing a request in a distributed system)


This definition is a useful starting point for our exploration because it abstracts out the nature of the processing that’s happening within the system; it’s overly simple but it allows us to focus our attention on the big picture.


More succinctly, we’ll write:


(1)  Efficiency = Wsingle / Wcluster

Replication Cost


Many distributed systems replicate some or all of the data they process across different processing nodes (to increase reliability, availability or read performance) so we can model:


(2) Wcluster = Wsingle + (r x Wreplication)


where r is the number of replicas in the system and Wreplication is the work required to replicate the data to other nodes. Wreplication is typically lower than Wsingle, though realistically they have different cost models (e.g., Wsingle may be CPU-intensive whereas Wreplication may be I/O-intensive).   If n is the number of nodes in the system, then r may be as large as (n-1), meaning replicating to all other nodes, though most systems will only replicate to 2 or 3 other nodes, for good reason, as we’ll discover later.


We’ll now define the replication coefficient, which expresses the relative cost of replication compared to the cost of processing the request on a single node:


(3) Qreplication = Wreplication / Wsingle


Solving for Wreplication, we get:


(4) Wreplication = Qreplication x Wsingle


If we substitute Wreplication in (2) by the equation formulated in (4), we obtain:


(5) Wcluster = Wsingle + ( r x Qreplication x Wsingle )


We now factor out Wsingle on the right side:


(6) Wcluster = Wsingle x [ 1 + r * Qreplication ]


Taking the efficiency equation (1) and substituting Wcluster from (6), the equation becomes:


(7) Efficiency = Wsingle / [ Wsingle x ( 1 + r * Qreplication ) ]


We then cancel Wsingle to obtain the final efficiency for a replicating distributed system:


(8) Efficiency (replication) = 1 / [ 1 + (r x Qreplication) ]


As expected, both r and Qreplication are critical factors determining efficiency.


Interpreting this last equation and assuming Qreplication is a constant inherent to the system’s processing, our two takeaways are:


  1. If the system replicates to all other nodes (i.e., r = n - 1) it becomes clear that the efficiency of the system will degrade as more nodes are added and will approach zero as n becomes sufficiently large.

    To illustrate this, let's assume Qreplication is 10%,

    Efficiency (r = 1, n = 2) = 91%
    Efficiency (r = 2, n = 3) = 83%
    Efficiency (r = 3, n = 4) = 77%
    Efficiency (r = 4, n = 5) = 71%
    Efficiency (r = 5, n = 6) = 67%
    ...

    In other words, fully-replicated distributed systems don't scale.


  2. For a system to scale, the replication factor should be a (small) constant.

    Let's illustrate this with Qreplication fixed at 10% and using a replication factor of 3,

    Efficiency (r = 3, n = 4) = 77%
    Efficiency (r = 3, n = 5) = 77%
    Efficiency (r = 3, n = 6) = 77%
    Efficiency (r = 3, n = 7) = 77%
    Efficiency (r = 3, n = 8) = 77%
    ...

    As we can see, fixed-replication-factor distributed systems scale, although, as you might expect, they do not exhibit the same efficiency as a single-node system. At worst, when replication costs as much as processing (Qreplication = 1), the efficiency will be 1 / (1 + r), or roughly 1/r, as you would intuitively expect.
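
If you want to play with other values of r and Qreplication, equation (8) is easy to turn into a few lines of Scala. This is only a sketch: the object and method names are invented here, and results are rounded to the nearest percent.

```scala
object ReplicationEfficiency {
  // Equation (8): Efficiency = 1 / [ 1 + (r x Qreplication) ]
  def efficiency(r: Int, qReplication: Double): Double =
    1.0 / (1.0 + r * qReplication)

  // Rounds a ratio such as 0.909 to a whole percentage (91).
  def percent(e: Double): Long = math.round(e * 100)

  // Full replication: every node replicates to all others, so r = n - 1.
  def fullReplication(maxNodes: Int, q: Double): Seq[String] =
    (2 to maxNodes).map { n =>
      s"Efficiency (r = ${n - 1}, n = $n) = ${percent(efficiency(n - 1, q))}%"
    }
}
```

ReplicationEfficiency.fullReplication(6, 0.10) produces one line per cluster size for the fully-replicated case, while calling efficiency(3, 0.10) for any n shows that a fixed replication factor gives the same value regardless of cluster size.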

Routing Cost



When a distributed system routes requests to nodes holding the relevant information (e.g., a partially replicated system, r < n) its working model may be defined as,


(9) Wcluster = (r / n) * Wsingle  +  (n-r)/n * (Wrouting + Wsingle)


The above equation represents the fact that r out of n requests are processed locally whereas the remainder of the requests are routed and processed on a different node.


Let’s define the routing coefficient to be,


(10) Qrouting  =  Wrouting / Wsingle


Solving (10) for Wrouting,


(11) Wrouting = Qrouting * Wsingle


and substituting Wrouting in (9) by (11), we obtain,


(12) Wcluster = (r/n) * Wsingle  +  (n-r)/n * [ (Qrouting * Wsingle) + Wsingle ]


and taking the efficiency equation (1), substituting Wcluster from (12), the simplified equation becomes:


(13) Efficiency (routing) = n / [ n + (n - r) * Qrouting ]


Looking at this last equation, we can infer that:


  1. As the system grows and n goes towards infinity, the efficiency of the system can be expressed as 1 / (1 + Qrouting).   The efficiency is then no longer dependent on the actual number of nodes within the system; therefore, routing-based systems generally scale. (But you knew that already.)
  2. If the number of nodes is large compared to the replication factor (n >> r) and Qrouting is significant (1.0, same cost as Wsingle), then the efficiency is ½, or 50%.   This matches the intuition that the system is routing practically all requests and therefore spending half of its efforts on routing.   The system is scaling linearly but it’s costing twice as much to operate (for every node) compared to a single-node system.
  3. If the cost of routing is insignificant (Qrouting = 0), the efficiency is 100%.  That’s right, if it doesn’t cost anything to route the request to a node that can process it, the efficiency is the same as a single-node system.


Let’s consider a practical distributed system with 10 nodes (n = 10), a replication factor of 3 (r = 3), and a relative routing cost of 10% (Qrouting = 0.10).  This system would have an efficiency of 10 / [ 10 + (7 * 0.10) ] = 93.46%.   As you can see, routing-based distributed systems can be pretty efficient if Qrouting is relatively small.
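
The same kind of quick check works for equation (13). The sketch below uses invented names (RoutingEfficiency, efficiency) and simply evaluates the formula so you can plug in your own n, r and Qrouting:

```scala
object RoutingEfficiency {
  // Equation (13): Efficiency = n / [ n + (n - r) * Qrouting ]
  def efficiency(n: Int, r: Int, qRouting: Double): Double =
    n / (n + (n - r) * qRouting)
}
```

For instance, RoutingEfficiency.efficiency(10, 3, 0.10) evaluates the 10-node example, and a very large n with Qrouting = 1.0 approaches the 50% limit discussed in the second takeaway.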

Where To Now?



Well, this was a fun exploration of system scalability in the abstract.  We came up with interesting equations to describe the scalability of both data-replicating and request-routing architectures.  With some tinkering, these can serve as a good basis for reasoning about some of your distributed systems.


In real life, however, there are many other aspects to consider when scaling systems.  In fact, it often feels like a whack-a-mole hunt; you never know where the next performance non-linearity is going to rear its ugly head.  But if you use either (or both) the data-replicating and request-routing style architecture with reasonable replication factors and you manage to keep your replication/routing costs well below your single-node processing costs, you may find some comfort in knowing that at least you haven’t introduced a fundamental scaling limitation into your system.

PS: With apologies for the formatting of the formulas ... Blogger wasn't exactly friendly with my equations imported from Google Docs so I had to go down the ASCII route. Thanks for reading and making it through!


Monday, April 15, 2013


Sensible Defaults for Apache HttpClient

Defaults for HttpClient

Before coming to Bizo, I wrote a web service client that retrieved daily XML reports over HTTP using the Apache DefaultHttpClient. Everything went fine until one day the connection simply hung forever. We found this odd because we had set the connection timeout. It turned out we also needed to set the socket timeout (HttpConnectionParams.SO_TIMEOUT). The default for both connection timeout (max time to wait for a connection) and socket timeout (max time to wait between consecutive data packets) is infinity. The server was accepting the connection but then not sending any data so our client hung forever without even reporting any errors. Rookie mistake, but everyone is a rookie at least once. Even if you are an expert with HttpClient, chances are there will be someone maintaining your code in the future who is not.

Another problem with defaults using HttpClient is with PoolingClientConnectionManager. PoolingClientConnectionManager has two attributes: MaxTotal and MaxPerRoute. MaxTotal is the maximum total number of connections in the pool. MaxPerRoute is the maximum number of connections to a particular host. If the client attempts to make a request and either of these maximums has been reached, then by default the client will block until a connection is free. Unfortunately the default for MaxTotal is 20 and the default MaxPerRoute is only 2. In a SOA, it is common to have many connections from a client to a particular host. The limit of 2 (or even 1) connections per host makes sense for a polite web crawler, but in a SOA, you are likely going to need a lot more. Even the 20 maximum total connections in the pool is likely much lower than desired.

If the client does reach the MaxPerRoute or the MaxTotal connections, it will block until the connection manager timeout (ClientPNames.CONN_MANAGER_TIMEOUT) is reached. This timeout controls how long the client will wait for a connection from the connection manager. Fortunately, if this timeout is not set directly, it will default to the connection timeout if that is set, which will prevent the client from queuing up requests indefinitely.

What would a better set of defaults be?

A good default is something that is "safe". A safe default for a connection timeout is long enough to not give up waiting when things are working normally, but short enough to not cause system instability when the server is down. Unfortunately safe is context dependent. Safe for a daily data sync process and safe for an in-thread service request handler are very different. Safe for a request that is critical to the correct functioning of the program is different than safe for some ancillary logging that is ok to miss 1% of the time. A default for timeouts that is safe in all cases is not really possible.

Safe defaults for PoolingClientConnectionManager's MaxTotal and MaxPerRoute should be big enough that they won’t be hit unless there is a bug. New to version 4.2 is the fluent-hc API for making HTTP requests. This uses a PoolingClientConnectionManager with defaults of 200 MaxTotal and 100 MaxPerRoute. We are using these same defaults for all our configurations.

Note that the fluent-hc API is very nice, but requires setting the connection timeouts on each request. This is perfect if you need to tune the settings for each request but does not provide a safety check against accidentally leaving the timeout infinite.

How can you help out a new dev implementing a new HTTP client?

If you can't have a safe default and the existing defaults are decidedly not safe, then it is best to require a configuration. We created a wrapper for PoolingClientConnectionManager that requires the developer to choose a configuration instead of letting the defaults silently take effect. One way to require a configuration is to force passing in the timeout values. However, it can be hard to know the right values, especially when stepping into a new environment. To help a developer implementing a new client at Bizo, we created some canonical configurations in the wrapper based on our experience working in our production environment on AWS. The configurations are:

Configuration                   Connection timeout   Socket timeout   MaxTotal   MaxPerRoute
SameRegion                      125 ms               125 ms           200        100
SameRegionWithUSEastFailover    1 second             1 second         200        100
CrossRegion                     10 seconds           10 seconds       200        100
MaxTimeout                      1 minute             5 minutes        200        100

Clients with critical latency requirements can use the SameRegion configuration and need to make sure they are connecting to a service in the same AWS region. Back end processes that can tolerate latency can use the MaxTimeout configuration. Now when a developer is implementing a new client, the timeouts used by other services are readily available without having to hunt through other code bases. The developer can compare these with the current use case and choose an appropriate configuration. Additionally, if we learn that some of these configurations need to be tweaked, then we can easily modify all affected code.
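
To give a feel for the "require a configuration" idea, here is a minimal sketch of how such canonical configurations could be modelled. It is not Bizo's actual wrapper: HttpClientConfigs, Config and ConfiguredClient are hypothetical names, and the code only carries the values from the table rather than talking to HttpClient itself.

```scala
object HttpClientConfigs {
  // One canonical configuration; timeouts are in milliseconds.
  // The pool sizes default to the 200 / 100 values used for every configuration.
  sealed abstract class Config(
      val connectionTimeoutMs: Int,
      val socketTimeoutMs: Int,
      val maxTotal: Int = 200,
      val maxPerRoute: Int = 100)

  case object SameRegion extends Config(125, 125)
  case object SameRegionWithUSEastFailover extends Config(1000, 1000)
  case object CrossRegion extends Config(10000, 10000)
  case object MaxTimeout extends Config(60000, 300000)

  // Hypothetical wrapper: there is no constructor without a Config, so the
  // library defaults can never silently take effect. A real wrapper would
  // apply these values to the connection manager and client parameters here.
  final class ConfiguredClient(val config: Config) {
    def describe: String =
      s"connect=${config.connectionTimeoutMs}ms socket=${config.socketTimeoutMs}ms " +
        s"maxTotal=${config.maxTotal} maxPerRoute=${config.maxPerRoute}"
  }
}
```

Because Config is sealed, the set of choices is closed: a developer has to pick one of the named configurations, and tweaking a value in one place changes it for every client that uses that configuration.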

Commonly the socket timeout will need to be adjusted for a specific service. After a connection is established, a service will not typically start sending its response until it has finished whatever calculation was requested. This can vary greatly even for different parameters on the same service endpoint. The socket timeout will need to be set based on the expected response times of the service.

It is easy to miss a particular setting even if you know it is there. At Bizo, we are always looking for ways to solve a problem in one place. We are hopeful that this will eliminate any issues we have had with bad defaults in our HttpClients.

Monday, February 18, 2013

Map-side aggregations in Apache Hive

When running large scale Hive reports, one error we occasionally run into is the following:

Possible error:
 Out of memory due to hash maps used in map-side aggregation.

Solution:
 Currently hive.map.aggr.hash.percentmemory is set to 0.5. Try setting it to a lower value. i.e 'set hive.map.aggr.hash.percentmemory = 0.25;'

What’s going on is that Hive is trying to optimize the query by performing a map-side aggregation.  This is a map-side optimization that does a partial aggregation inside of the mapper, which results in the mapper outputting fewer rows.  In turn, this reduces the amount of information that Hadoop needs to sort and distribute to the reducers.

Let’s think about what the Hadoop job looks like with the canonical word count example.

In the word count example, the naive approach is for the mapper to tokenize each row of input and output the key-value pair (#{token}, 1).  The Hadoop framework will sort these pairs by the tokens, and the reducer sums the values to produce the total counts for each token.

Using a map-side aggregation, the mappers would instead tokenize each row and store partial counts in an in-memory hash map.  (More precisely, the mappers are storing each key with the corresponding partial aggregation, which is just a count in this case.)  Periodically, the mappers will output the pairs (#{token}, #{token_count}).  The Hadoop framework again sorts these pairs and the reducers sum the values to produce the total counts for each token.  In this case, the mappers will each output one row for each token every time the map is flushed instead of one row for each occurrence of each token.  The tradeoff is that they need to keep a map of all tokens in memory.
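
The difference between the two mapper strategies can be sketched in plain Scala, with no Hadoop or Hive involved. Everything here is illustrative: the names are invented, and the flush rule (flush once the map holds more than maxEntries keys) is a simplification standing in for Hive's memory-based threshold.

```scala
import scala.collection.mutable

object MapSideAggregation {
  private def tokens(row: String): Seq[String] =
    row.split("\\s+").filter(_.nonEmpty).toSeq

  // Naive mapper: emits one (token, 1) pair per occurrence.
  def naiveMap(rows: Seq[String]): Seq[(String, Int)] =
    rows.flatMap(tokens).map(t => (t, 1))

  // Map-side aggregation: keeps partial counts in a hash map and emits
  // (token, partialCount) pairs whenever the map is flushed.
  def aggregatingMap(rows: Seq[String], maxEntries: Int): Seq[(String, Int)] = {
    val out = mutable.ArrayBuffer.empty[(String, Int)]
    val partial = mutable.HashMap.empty[String, Int]
    def flush(): Unit = { out ++= partial; partial.clear() }
    for (row <- rows; token <- tokens(row)) {
      partial(token) = partial.getOrElse(token, 0) + 1
      if (partial.size > maxEntries) flush()
    }
    flush() // emit whatever is left at the end of the input
    out.toList
  }

  // Reducer: sums the values for each token, whichever mapper produced them.
  def reduce(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (token, ps) => (token, ps.map(_._2).sum) }
}
```

Both mappers lead to the same totals after the reduce step; the aggregating one just hands fewer pairs to the shuffle, at the cost of holding the hash map in memory.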

By default, Hive will try to use the map-side aggregation optimization, but it falls back to the standard approach if the hash map is not producing enough of a memory savings.  After processing 100,000 rows (modifiable via hive.groupby.mapaggr.checkinterval), Hive will check the number of items in the hash map.  If it exceeds 50% (modifiable via hive.map.aggr.hash.min.reduction) of the number of rows read, the map-side aggregation will be aborted.

Hive will also estimate the amount of memory needed for each entry in the hash map and flush the map to the reducers whenever the size of the map exceeds 50% of the available mapper memory (modifiable via hive.map.aggr.hash.percentmemory).  This, however, is an estimate based on the number of rows and the expected size of each row, so if the memory usage per row is unexpectedly high, the mappers may run out of memory before the hash map is flushed to the reducers.

In particular, if a query uses a count distinct aggregation, the partial aggregations actually contain a list of all values seen.  As more distinct values are seen, the amount of memory used by the map will increase without necessarily increasing the number of rows of the map, which is what Hive uses to determine when to flush the partial aggregations to the reducers.
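
A toy model makes the problem easier to see. In the sketch below (hypothetical names, not Hive's implementation), each entry of the partial aggregation holds the set of distinct values seen for its key, so the number of entries can stay flat while the memory behind them keeps growing:

```scala
object CountDistinctPartial {
  // Partial aggregation for count(distinct v) grouped by key:
  // each key maps to the set of distinct values seen so far.
  def partials(rows: Seq[(String, String)]): Map[String, Set[String]] =
    rows.foldLeft(Map.empty[String, Set[String]]) { case (acc, (key, value)) =>
      acc.updated(key, acc.getOrElse(key, Set.empty[String]) + value)
    }

  // What a row-count-based flush heuristic would look at.
  def entryCount(p: Map[String, Set[String]]): Int = p.size

  // A rough proxy for what actually consumes memory.
  def storedValues(p: Map[String, Set[String]]): Int = p.values.map(_.size).sum
}
```

With a single group key and many distinct values, entryCount stays at 1 while storedValues grows with the input, which is the situation where lowering the flush threshold may not help.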

Whenever a mapper runs out of memory, a group by clause is present, and map-side aggregation is turned on, Hive will helpfully suggest that you reduce the flush threshold to avoid running out of memory.  This will lower the threshold (in rows) of when Hive will automatically flush the map, but it may not help if the map size (in bytes) is growing independently of the number of rows.

Some alternate solutions include simply turning off map-side aggregations (set hive.map.aggr = false), allocating more memory to your mappers via the Hadoop configuration, or restructuring the query so that Hive will pick a different query plan.

For example, a simple
 select count(distinct v) from tbl
can be rewritten as
 select count(1) from (select v from tbl group by v) t.

This latter query will avoid using the count distinct aggregation and may be more efficient for some queries.

Friday, February 15, 2013

Reader Driven Development

In this talk on Effective ML, Yaron Minsky talks about Reader Driven Development. That is, writing your code with the reader in mind. Making decisions that will make the code more easily read and understood by other developers down the line.
The interest of the reader always pushes in the direction of clarity, simplicity, and the ability to change the code later. In most real projects, code is read and changed many more times than it is written. The reader's interests are paramount in that regard.
When writing code the interests of the reader and writer may be at odds, and when faced with a decision, always err in the direction of the reader. The reader is always right. Regardless of team size, it's helpful to program this way. Even code you've written yourself may not be as clear 6 months or a year later otherwise. Great perspective, and I think it fits in nicely with previous posts here on programming style and code reviews (tend to agree with your reviewers, they are the audience!).

Wednesday, January 23, 2013

Asanban: Lean Development with Asana and Kanban


On Bizo's External Apps team (aka. 'xapps'), we've been using a Kanban system to manage our work.  All of Bizo Engineering uses Asana to track tasks, which isn't specifically designed for Kanban.  We've settled on a set of conventions that we use in Asana which enable our Kanban system.  These conventions also help us to track metrics like the average lead time from month to month.

Background

Kanban is a second-generation Agile software development methodology.  The focus is on finding and fixing bottlenecks, as well as removing waste by limiting work-in-progress.  (The "WIP" limits referenced in this post are the number of work items that are allowed to be in a particular stage of the system at one time.)  Adopting a Kanban system has made things easier for engineers, increased efficiency, and is very popular with our Product Management folks as well.  We are now focused on delivering value incrementally rather than specifying and implementing larger chunks of work.  If you're interested in adopting Kanban, I recommend reading David Anderson's seminal book on the topic: Kanban: Successful Evolutionary Change for Your Technology Business

Conventions

Each stage of work in our value chain is a priority heading in Asana.  The name of the priority header follows the convention: "{STEP NAME} ({WIP}):", e.g., "Dev Ready (10):".  The steps that are earliest in the value chain are at the bottom of our Asana project, with tasks moving upwards through each stage until they reach "Production (15):" at the top when the functionality described by the task has been delivered to production.  Once Product Management has verified that the functionality described by a task is functioning correctly in production, they mark the task as complete.  We use tags to represent work item types, although this is fairly limited at present.
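
As an illustration of how mechanical that header convention is, here is a small sketch that parses a priority header into its step name and WIP limit. It is written in Scala for the sake of example and is not part of the asanban gem; WipHeader and parse are invented names.

```scala
object WipHeader {
  // Matches "{STEP NAME} ({WIP}):", e.g. "Dev Ready (10):".
  private val Header = """(.+) \((\d+)\):""".r

  // Returns the step name and WIP limit, or None if the text is not a header.
  def parse(text: String): Option[(String, Int)] = text match {
    case Header(step, wip) => Some((step, wip.toInt))
    case _                 => None
  }
}
```

Anything that does not match the pattern, such as an ordinary task name, comes back as None.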


Metrics

One of the most basic metrics to track in a Kanban system is the average amount of lead time (the time it takes from when a task gets added to the input queue until value is delivered).  I have created some tooling that allows us to accomplish this systematically.  I'll first describe what it does, and then how you can use it.

The first piece of the tooling bulk loads task data from the Asana API into MongoDB.  The API returns JSON and I just store JSON as-is in MongoDB, which works out well since MongoDB speaks JSON natively.  One hiccup is that once tasks are archived in Asana, you can no longer obtain information about them through the API.  Accordingly, the bulk load needs to be scheduled to run on a regular basis (in our case, every night) so that we don't lose information about archived tasks.  Furthermore, we have a policy that tasks should not be archived until they have been completed for at least 24 hours, so that the bulk loader will always run at least once after a task has been completed before it gets archived.  After loading the task data, the bulk loader will create data describing how much time each task spent in each state, as well as how long each task took (in days) to complete from start to finish (lead time).

The other piece is a Sinatra web service that runs a map-reduce against the lead time data created by the bulk loader and serves lead times by month as JSON.  It can also aggregate by year or day (but I don't think aggregating by day is useful).
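
The aggregation itself is simple enough to sketch in a few lines. The real tooling does this in Ruby with a MongoDB map-reduce; the Scala below is only an illustration of the calculation, with a made-up Task record and the assumption that tasks are bucketed by the month in which they were completed.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object LeadTimes {
  // Hypothetical, simplified task record: when it entered the input queue
  // and when its value was delivered.
  case class Task(name: String, created: LocalDate, completed: LocalDate)

  // Lead time in whole days from start to finish.
  def leadTimeDays(task: Task): Long =
    ChronoUnit.DAYS.between(task.created, task.completed)

  // Average lead time keyed by completion month, formatted as "YYYY-MM".
  def averageByMonth(tasks: Seq[Task]): Map[String, Double] =
    tasks
      .groupBy(t => f"${t.completed.getYear}%04d-${t.completed.getMonthValue}%02d")
      .map { case (month, ts) => (month, ts.map(leadTimeDays).sum.toDouble / ts.size) }
}
```

Aggregating by year or by day would only change the grouping key.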

I have packaged up both of those pieces into a gem called "asanban", which you can use.   The source code and instructions for installation and usage are here: https://github.com/patgannon/asanban

Pain Points

There are a couple of problems I've run into using Asana with a Kanban system.  The first is that there's no way to enforce WIP limits.  Users just have to be mindful of the limits shown in the priority headers.  I have been thinking about writing a nightly report that uses the data created by the bulk loader to find violated WIP limits and send out emails, but I haven't gotten to it yet.  (This tooling is essentially a hack day project at this point.)  There is also no functionality to facilitate different classes of service (SLAs and WIP break-downs), but maybe those could be supported using the same kind of nightly report.

Another problem I've run into is that task sizes can be all over the place, which reduces the meaningfulness of the metrics.  Some Kanban practitioners use hierarchical work items to address this kind of variability in size.  Stories can be grouped into epics and/or broken down into "grains".  Asana does support sub-tasks, so I may recommend that we use those to break down large work items in the future, at which point the bulk loader would be modified to track metrics by sub-task (for tasks that have them, which would be assumed to be epics).

Next Steps

As we fine-tune our Kanban process, we'll use these lead time metrics to verify that our performance improves when we've made an adjustment (changing a WIP limit or adding a buffer, for example).  I'd like to have more metrics so that we can have even better insight into our system in the future.  For example, I'd like to see the average time tasks spend in particular steps, the average amount of total WIP, as well as WIP in each step (shown over time) and failure load.

The first order of business moving forward on this will probably be an improved charting interface on top of the existing metrics.  Also, it would be nice if the bulk loader used a scheduling library so that folks don't have to manually schedule it in cron.  It also could use some automated tests!!!  As I mentioned previously, I've just been working on this on hack days, so if there's something you'd like to see done soon, well... pull requests will be gladly accepted! :)

Monday, January 21, 2013

What Makes Spark Exciting

At Bizo, we’re currently evaluating/prototyping Spark as a replacement for Hive for our batch reports.

As a brief intro, Spark is an alternative to Hadoop. It provides a cluster computing framework for running distributed jobs. Similar to Hadoop, you provide Spark with jobs to run, and it handles splitting up the job into small tasks, assigning those tasks to machines (optionally with Hadoop-style data locality), issuing retries if tasks fail transiently, etc.

In our case, these jobs are processing a non-trivial amount of data (log files) on a regular basis, for which we currently use Hive.

Why Replace Hive?

Admittedly, Hive has served us well for quite a while now. (One of our engineers even built a custom “Hadoop on demand” framework for running periodic on-demand Hadoop/Hive jobs in EC2 several months before Amazon Elastic Map Reduce came out.)

Without Hive, it would have been hard for us to provide the same functionality, probably at all, let alone in the same time frame.

That said, it has gotten to the point where Hive is more frequently invoked in negative contexts (“damn it, Hive”) than positive.

Personally, I admittedly even try to avoid tasks that involve working with Hive. I find it to be frustrating and, well, just not a lot of fun. Why? Two primary reasons:

1. Hive jobs are hard to test

Bizo has a culture of excellence, and for engineering one of the things this means is testing. We really like tests. Especially unit tests, which are quick to run and enable a fast TDD cycle.

Unfortunately, Hive makes unit testing basically impossible. For several reasons:

  • Hive scripts must be run in a local Hadoop/Hive installation.

    Ironically, very few developers at Bizo have local Hadoop installations. We are admittedly spoiled by Elastic Map Reduce, such that most of us (myself anyway) wouldn’t even know how to set up Hadoop off the top of our heads. We just fire up an EMR cluster.

  • Hive scripts have production locations embedded in them.

    Both our log files and report output are stored in S3, so our Hive scripts end up with lots of “s3://” paths scattered throughout them.

    While we do run dev versions of reports with “-dev” S3 buckets, still relying on S3 and raw log files (that are usually in a compressed/binary-ish format) is not conducive to setting up lots of really small, simplified scenarios to unit test each boundary case.

  • Hive scripts do not provide any abstraction–they are just one big HiveQL file. This means it's hard to break up a large report into small, individually testable steps.

Despite these limitations, about a year ago we had a developer dedicate some effort to prototyping an approach that would run Hive scripts within our CI workflow. In the end, while his prototype worked, the workflow was wonky enough that we never adopted it for production projects.

The result? Our Hive reports are basically untested. This sucks.

2. Hive is hard to extend

Extending Hive via custom functions (UDFs and UDAFs) is possible, and we do it all the time–but it’s a pain in the ass.

Perhaps this is not Hive’s fault, and it’s some Hadoop internals leaking into Hive, but the various ObjectInspector hoops, to me, always seemed annoying to deal with.

Given these shortcomings, Bizo has been looking for a Hive-successor for a while, even going so far as to prototype revolute, a Scala DSL on top of Cascading, but had not yet found something we were really excited about.

Enter Spark!

We had heard about Spark, but did not start trying it until the Spark presentation at AWS re:Invent (the talk received the highest rating of all non-keynote sessions) impressed us enough that we wanted to learn more.

One of Spark’s touted strengths is being able to load and keep data in memory, so your queries aren’t always I/O bound.

That is great, but the exciting aspect for us at Bizo is how Spark, either intentionally or serendipitously, addresses both of Hive’s primary shortcomings, and turns them into huge strengths. Specifically:

1. Spark jobs are amazingly easy to test

Writing a test in Spark is as easy as:

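// Note: SparkContext lives in the `spark` package in Spark 0.7 and earlier,
// and in `org.apache.spark` from 0.8 onward; adjust the import to your version.
import org.junit.Test
import spark.SparkContext

class SparkTest {
  @Test
  def test() {
    // this is real code...
    val sc = new SparkContext("local", "MyUnitTest")
    // and now some pseudo code...
    val output = runYourCodeThatUsesSpark(sc)
    assertAgainst(output)
  }
}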

(I will go into more detail about runYourCodeThatUsesSpark in a future post.)

This one liner starts up a new SparkContext, which is all your program needs to execute Spark jobs. There is no local installation required (just have the Spark jar on your classpath, e.g. via Maven or Ivy), no local server to start/stop. It just works.

As a technical aside, this “local” mode starts up an in-process Spark instance, backed by a thread-pool, and actually opens up a few ports and temp directories, because it’s a real, live Spark instance.

Granted, this is usually more work than you want done in a unit test (which ideally would not hit any file or network I/O), but the redeeming quality is that it’s fast. Tests run in ~2 seconds.

Okay, yes, this is slow compared to pure, traditional unit tests, but it is such a huge improvement over Hive that we’ll gladly take it.
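As a rough idea of what such a test can look like end to end, here is a minimal sketch. It assumes a reasonably recent Spark release on the classpath (with the org.apache.spark package names), and LineJobs.nonEmptyUpper is a made-up stand-in for runYourCodeThatUsesSpark, not code from our projects. The input comes from sc.parallelize rather than S3 so that nothing outside the JVM is needed.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical job logic, kept in a plain function over RDDs
// so that a test can call it directly.
object LineJobs {
  // Drop blank lines and uppercase the rest.
  def nonEmptyUpper(lines: RDD[String]): RDD[String] =
    lines.filter(_.trim.nonEmpty).map(_.toUpperCase)
}

object LineJobsCheck {
  def main(args: Array[String]): Unit = {
    // In-process, single-threaded Spark instance
    val sc = new SparkContext("local", "MyUnitTest")
    try {
      // Build a tiny RDD from an in-memory Seq instead of reading files
      val input = sc.parallelize(Seq("foo", "", "bar"))
      // collect() pulls the (small) result back into the test JVM
      val output = LineJobs.nonEmptyUpper(input).collect().toList
      assert(output == List("FOO", "BAR"), "unexpected output: " + output)
    } finally {
      sc.stop()
    }
  }
}
```

Keeping the job logic in a function that takes and returns RDDs is only one possible design; it simply makes the test a matter of building a small RDD, calling the function, and comparing the collected result.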

2. Spark is easy to extend

Spark’s primary API is a Scala DSL oriented around what they call an RDD, or Resilient Distributed Dataset. An RDD is basically a collection that only supports bulk/aggregate transforms: it has methods like map, filter, and groupBy, which can be seen as transforming the entire collection, but no methods like get that assume in-memory/random access.

Some really short, made up example code is:

// RDD[String] is like a collection of lines
val in: RDD[String] = sc.textFile("s3://bucket/path/")
// perform some operation on each line
val suffixed = in.map { line => line + "some suffix" }
// now save the new lines back out
suffixed.saveAsTextFile("s3://bucket/path2")

Spark’s job is to package up your map closure and run it against that extra-large text file across your cluster. It does so by moving the code and data around the cluster and then actually calling your closure (i.e. there is no LINQ-like introspection of the closure’s AST).

This may seem minor, but it’s huge, because it means there is no framework code or APIs standing between your running closure and any custom functions you’d want to run. Let’s say you want to use SomeUtilityClass (or the venerable StringUtils). Just do:

import com.company.SomeUtilityClass
val in: RDD[String] = sc.textFile("s3://bucket/path/")
val processed = in.map { line =>
  // just call it, it's a normal method call
  SomeUtilityClass.process(line) 
}
processed.saveAsTextFile("s3://bucket/path2")

Notice how SomeUtilityClass doesn’t have to know it’s running within a Spark RDD in the cluster. It just takes a String. Done.
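One handy consequence is that such a utility can be exercised with no Spark on the classpath at all. Here is a small sketch of that idea, where LineUtils.process is a hypothetical stand-in for SomeUtilityClass.process (the whitespace-collapsing behavior is invented purely for illustration):

```scala
// Hypothetical stand-in for SomeUtilityClass: a plain String => String method.
object LineUtils {
  // Trim the line and collapse runs of whitespace into single spaces.
  def process(line: String): String =
    line.trim.split("\\s+").mkString(" ")
}

object LineUtilsCheck {
  def main(args: Array[String]): Unit = {
    // Call it directly, as any ordinary unit test would.
    assert(LineUtils.process("  foo   bar ") == "foo bar")

    // The same method dropped into a map over an ordinary List;
    // in.map { line => LineUtils.process(line) } on an RDD[String] has the same shape.
    val processed = List("a  b", " c ").map(LineUtils.process)
    assert(processed == List("a b", "c"))
  }
}
```

The fast, Spark-free checks cover the string logic, and the slower local-mode tests only need to cover the wiring.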

Similarly, Spark doesn’t need to know anything about the code you use within the closure. It just needs to be available on the classpath of each machine in the cluster (which is easy to do as part of your cluster/job setup: you just copy some jars around).

This seamless hop between the RDD and custom Java/Scala code is very nice, and means your Spark jobs end up reading just like regular, normal Scala code (which to us is a good thing!).

Is Spark Perfect?

As full disclosure, we’re still in the early stages of testing Spark, so we can’t yet say whether Spark will be a wholesale replacement for Hive within Bizo. We haven’t gotten to any serious performance comparisons or written large, complex reports to see if Spark can take whatever we throw at it.

Personally, I am also admittedly somewhat infatuated with Spark at this point, so that could be clouding my judgement about the pros/cons and the tradeoffs with Hive.

One Spark con so far is that Spark is pre-1.0, and it can show. I’ve seen some stack traces that shouldn’t happen, and some usability warts, that hopefully will be cleared up by 1.0. (That said, even as a newbie I find the codebase small and very easy to read, such that I’ve had several small pull requests accepted already–which is a nice consolation compared to the daunting codebases of Hadoop and Hive.)

We have also seen that, for our first Spark job, moving from “Spark job written” to “Spark job running in production” is taking longer than expected. But given that Spark is a new tool to us, we expect this to be a one-time cost.

More to Come

I have a few more posts coming up which explain our approach to Spark in more detail, for example:

  • Testing best practices
  • Running Spark in EMR
  • Accessing partitioned S3 logs

To see those when they come out, make sure to subscribe to the blog, or, better yet, come work at Bizo and help us out!