Command Query Responsibility Segregation with S3 and JSON

Stephen Haberman - 18 Apr 2011

We recently tackled a problem at Bizo where we wanted to decouple our high-volume servers from our MySQL database.

While considering different options (NoSQL vs. MySQL, etc.), we ended up implementing what is, in retrospect, an SOA version of the Command Query Separation pattern (or Command Query Responsibility Segregation, its services/messaging-specific variant).

Briefly, in our new approach, queries (reads) use an in-memory cache that is bulk loaded and periodically reloaded from a snapshot of the data stored as JSON in S3. Commands (writes) are HTTP calls to a remote JSON API service. MySQL is still the authoritative database, we just added a layer of decoupling for both reads and writes.

This meant our high-volume servers now have:

Prior Approach: Cached JPA Calls

For context, our high-volume servers rely on configuration data that is stored in a MySQL database. Of course, the configuration data doesn't have to be absolutely fresh, so we'd already been using caching to avoid constantly pounding the database for data that rarely changes.

There were several things we liked about this approach:

That being said, we wanted to tweak a few things:

While not initially a big deal, as Bizo has grown, we're now running in multiple AWS regions, and cache misses require a cross-region JDBC call to fetch their data from the MySQL server running in us-east.

Illustrated in code, our approach had, very simplified, been:

    class TheServlet {
      public void doGet() {
        int configId = Integer.parseInt(request.getParameter("configId"));
        Config config = configService.getConfig(configId);
        // continue processing with config settings
      }
    }

    class ConfigService {
      // actually thread-safe/ehcache-managed, flushed every 30m
      Map<Integer, Config> cached = new HashMap<Integer, Config>();

      public Config getConfig(int configId) {
        Config config = cached.get(configId);
        if (config == null) {
          // hit mysql for the data, blocks the request thread
          config = configJpaRepository.find(configId);
          // cache it
          cached.put(configId, config);
        }
        return config;
      }
    }

Potential “Big Data” Approaches

Given our primary concern was MySQL being a single point of failure, we considered moving to a new database platform, e.g. SimpleDB, Cassandra, or the like, all of which can scale out across machines.

Of course, RDS's master/slave MySQL setup already reduces the risk of a single machine being a point of failure, but the RDS master/slave cluster as a whole is still, using the term loosely, a “single point”. Granted, with this very loose definition, there will always be some “point” you rely on--we just wanted one that we felt more comfortable with than MySQL.

Anyway, for NoSQL options, we couldn't get over the cons of:

Realization: MySQL is Fine, Fix the Cache

Of course, we really didn't have a Big Data problem (well, we do have a lot of those, but not for this problem).

We just had a cache seeding problem. Specifically:

The S3 file then basically becomes our alternative “query” database in the CQRS pattern.

When these pieces are put together, a solution emerges where we can have an in-memory, always-populated cache of the configuration data that is refreshed by a background thread, meaning request threads never block.

In code, this looks like:

    class TheServlet {
      public void doGet() {
        // note: no changes from before, which made migrating easy
        int configId = Integer.parseInt(request.getParameter("configId"));
        Config config = configService.getConfig(configId);
        // continue processing with config settings
      }
    }

    class ConfigService {
      // the current cache of all of the config data
      AtomicReference<Map<Integer, Config>> cached =
        new AtomicReference<Map<Integer, Config>>();

      public void init() {
        // use java.util.Timer to refresh the cache
        // on a background thread
        new Timer(true).schedule(new TimerTask() {
          public void run() {
            Map<Integer, Config> newCache = reloadFromS3("bucket/config.json.gz");
            cached.set(newCache);
          }
        }, 0, TimeUnit.MINUTES.toMillis(30));
      }

      public Config getConfig(int configId) {
        // now always return whatever is in the cache--if a
        // configId isn't present, that means it was not in
        // the last S3 file and is treated the same as it
        // not being in the MySQL database previously
        Map<Integer, Config> currentCache = cached.get();
        if (currentCache == null) {
          return null; // data hasn't been loaded yet
        } else {
          return currentCache.get(configId);
        }
      }

      private Map<Integer, Config> reloadFromS3(String path) {
        // uses AWS SDK to load the data from S3
        // and Jackson to deserialize it to a map
      }
    } 
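The body of `reloadFromS3` is elided above. As a rough sketch of the mechanics it implies, the snippet below shows just the gzip round trip for a `config.json.gz`-style snapshot using only the JDK; the actual S3 transfer (AWS SDK `getObject`/`putObject`) and the Jackson deserialization are left out, with an in-memory byte array standing in for the S3 object. The `SnapshotCodec` name is mine, not from the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of the snapshot mechanics behind reloadFromS3: a cron-style
// job gzips a JSON snapshot, and the servers gunzip it on reload.
class SnapshotCodec {
  // what the writer side would upload as bucket/config.json.gz
  static byte[] gzip(String json) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    GZIPOutputStream gz = new GZIPOutputStream(bytes);
    gz.write(json.getBytes("UTF-8"));
    gz.close(); // finishes the gzip stream, flushing the trailer
    return bytes.toByteArray();
  }

  // what reloadFromS3 would do with the downloaded bytes before
  // handing the JSON string to Jackson
  static String gunzip(byte[] gzipped) throws IOException {
    GZIPInputStream gz =
      new GZIPInputStream(new ByteArrayInputStream(gzipped));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    int n;
    while ((n = gz.read(buffer)) != -1) {
      out.write(buffer, 0, n);
    }
    return out.toString("UTF-8");
  }
}
```

In the real service, the gunzipped string would then go through Jackson, e.g. `new ObjectMapper().readValue(json, new TypeReference<Map<Integer, Config>>() {})`, to produce the map that the background thread swaps into the `AtomicReference`.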

A Few Wrinkles: Real-Time Reads and Writes

So far I've only talked about the cached query/reads side of the new approach. We also had two more requirements:

While we could have continued using a MySQL/JDBC connection for these few requests, this also provided the opportunity to build a JSON API in front of the MySQL database. This was desirable for two main reasons:

Scalatra Servlet Example

With Jackson and Scalatra, the JSON API server was trivial to build, especially since it could reuse the same JSON DTO objects that are also serialized in the config.json.gz file in S3.

As an example of how simple Jackson and Scalatra made writing the JSON API, here is the code for serving real-time read requests:

    class JsonApiService extends ScalatraServlet {
      get("/getConfig") {
        // config is the domain object fresh from MySQL
        val config = configRepo.find(params("configId").toLong)

        // configDto is just the data we want to serialize
        val configDto = ConfigMapper.toDto(config)

        // jackson magic to make json
        val json = jackson.writeValueAsString(configDto)

        json
      }
    } 

Background Writes

The final optimization was realizing that, when the high-volume servers have requests that trigger stats to be written to MySQL, for our requirements, these writes aren't critical.

This means there is no need to perform them on the request-serving thread. Instead, we can push the writes onto a queue and have them fulfilled by a background thread.

This generally looks like:

    class ConfigWriteService {
      // create a background thread pool of (for now) size 1
      private ExecutorService executor = new ThreadPoolExecutor(...);

      // called by the request thread, won't block
      public void writeUsage(int configId, int usage) {
        offer("https://json-api-service/writeUsage?configId=" +
          configId +
          "&usage=" +
          usage);
      }

      private void offer(String url) {
        try {
          executor.submit(new BackgroundWrite(url));
        } catch (RejectedExecutionException ree) {
          // queue full, writes aren't critical, so ignore
        }
      }

      private static class BackgroundWrite implements Runnable {
        private String url;

        private BackgroundWrite(String url) {
          this.url = url;
        }

        public void run() {
          // make call using commons-http to url
        }
      }
    } 
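The `ThreadPoolExecutor` constructor arguments are elided above. For the `RejectedExecutionException` path in `offer` to fire, the executor needs a bounded work queue and a rejecting saturation policy. Below is one plausible configuration under those assumptions; the queue capacity of 1000 and the factory name are illustrative, not from the original.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// One plausible way to build the elided executor: a single background
// worker draining a bounded queue, so submit() throws
// RejectedExecutionException (caught and ignored in offer()) instead
// of blocking the request thread or growing the queue without bound.
class BoundedExecutorFactory {
  static ThreadPoolExecutor create() {
    return new ThreadPoolExecutor(
      1, 1,                       // exactly one background worker
      0L, TimeUnit.MILLISECONDS,  // core threads never time out
      new ArrayBlockingQueue<Runnable>(1000), // bounded work queue
      new ThreadPoolExecutor.AbortPolicy());  // reject when full
  }
}
```

`AbortPolicy` (the default) makes a full queue throw on `submit`, rather than run the write on the caller's thread (`CallerRunsPolicy`) or drop it silently (`DiscardPolicy`); the `catch` in `offer` then makes dropping non-critical writes an explicit decision.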

tl;dr We Implemented Command Query Responsibility Segregation

By changing only a minimal amount of code in our high-volume servers, we were able to:

For more reading on CQS/CQRS, I suggest: