Big Data Query — Done Easy!

Some of the most common [SQL] queries I hear about from Big Data application developers are “How do I retrieve the top n of x?”, “How do I find the last n of x?”, and “How many times did user u do blah?” This is especially true for large-scale applications built around ad campaigns, user preferences and personalization. For example, in online advertising (AdTech) and online retail, companies make every effort to serve content that they think would be most interesting to you, and financially beneficial to them, based on a ton of data gathered from your previous visits.

Let’s pause for a sec… when talking about querying Big Data, I am most certainly presuming that the data is denormalized and distributed. Right? Ok good, because I for one cannot imagine Big Data queries running against normalized data stored in traditional Relational Database Management Systems. Moving on…

In this blog post, I will explore one such query and walk you through one way of data modeling and coding techniques using Aerospike’s open source NoSQL database—keeping in mind both storage and performance—to achieve the desired result.

Prerequisites

  • Node.js – To install the latest stable version, click here
  • Aerospike DB – To install the latest version, click here
  • Aerospike Node.js Client – To install the latest version, click here

Technical know-how

A good understanding and working knowledge of the following technologies is presumed.

  • Node.js
  • Aerospike DB
  • Aerospike Node.js Client
  • Lua

Query

In the last n days, how many times has user u performed action a on campaign c?

So, for a given user, we’d like to know how many times he/she has taken some action on a given campaign within a time period.

Assumptions

  • Total users in the database: ~100 million
  • Total campaigns: 10
  • Average number of actions per campaign: 10 (per month / 30 days)

Data Model

  • Set: users
  • Key: uid — This is the primary key value defined/generated by the application
  • Bin: A Map where every entry contains timestamp as key and concatenated string of campaignId:actionId as value.

In Aerospike DB, a Set (similar to a table in a traditional RDBMS) is a collection of records, and each record is a collection of Bins (similar to columns in a traditional RDBMS). Each record is accessible via a unique key.

Sample User Record:

{ uid: 'user1234', 
  campaignAction: { 
       '1427163913119':'c2:click', 
       '1425490351275':'c1:read', 
       '1423522713450':'c1:open', 
       '1427353432343':'c1:link', 
       '1424123753705':'c1:read', 
       '1426056581725':'c1:copy', 
       '1422562280941':'c2:click', 
       '1427548545929':'c1:copy', 
       '1422843025707':'c2:click', 
       '1423683680445':'c1:share' 
   } 
}
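
To make the layout concrete, here is a minimal sketch of how an application might build entries for this Map before writing the record. The helper name recordAction is hypothetical (it is not part of the Aerospike client), and the sketch only builds a plain JavaScript object; writing it to the database is left out.

```javascript
// Hypothetical helper (not from the Aerospike client): adds one entry to the
// campaignAction map using the timestamp-as-key layout described above.
function recordAction(campaignAction, campaignId, actionId, timestampMs) {
  // Key: millisecond timestamp stored as a string.
  // Value: concatenated string 'campaignId:actionId'.
  campaignAction[String(timestampMs)] = campaignId + ':' + actionId;
  return campaignAction;
}

var campaignAction = {};
recordAction(campaignAction, 'c2', 'click', 1427163913119);
recordAction(campaignAction, 'c1', 'read', 1425490351275);
console.log(campaignAction);
```

One thing to keep in mind with this layout: because the timestamp is the Map key, two actions recorded in the same millisecond for the same user would overwrite each other.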

Based on the assumptions outlined above:

  • The campaignAction Map for a given user will contain up to (10 actions * 10 campaigns) * 12 months = 1200 entries
  • Note: A background process may be run to keep the entries in this Map relevant. For example, it could delete entries older than n days
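
As a rough illustration of what such a background process could do for a single user, here is a sketch that drops entries older than a given number of days. The helper name pruneOldEntries and the fixed "now" value are assumptions for the example; reading and writing the record is left out.

```javascript
// Hypothetical helper (not part of the original post): returns a copy of the
// campaignAction map without entries older than maxAgeDays.
var MS_PER_DAY = 24 * 60 * 60 * 1000;

function pruneOldEntries(campaignAction, maxAgeDays, nowMs) {
  var cutoff = nowMs - maxAgeDays * MS_PER_DAY;
  var pruned = {};
  Object.keys(campaignAction).forEach(function (timestamp) {
    // Map keys are millisecond timestamps stored as strings.
    if (Number(timestamp) >= cutoff) {
      pruned[timestamp] = campaignAction[timestamp];
    }
  });
  return pruned;
}

// Example with a fixed "now" so the outcome is reproducible.
var now = 1427548545929;
var sample = {
  '1427163913119': 'c2:click', // about 4 days before "now"
  '1422562280941': 'c2:click'  // about 58 days before "now"
};
console.log(pruneOldEntries(sample, 30, now)); // keeps only the first entry
```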

The Fun Part—Code!!

First we will look at the Record User Defined Function (UDF) that does all the heavy lifting followed by client code that executes this UDF and processes the result.

Note: UDFs are a powerful feature of Aerospike DB and can be used to extend the capabilities of the DB engine. They reside and execute on the server, closer to the data. For more information on UDFs, click here.

Record UDF (campaignActionCounter.lua):

function count(userRecord,qCampaign,qAction)
    -- initialize return value
    local returnValue = 0

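    -- extract map entries from the campaignAction bin (see the sample user record) and store them in a local variable
    local campaignActionMap = userRecord['campaignAction']

    -- nothing to count if the bin is missing
    if campaignActionMap == nil then
        return returnValue
    end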

    -- custom helper to split a string on a delimiter (declared local so it does not leak into the global scope)
    local function split(s, delimiter)
        local result = {}
        for match in (s..delimiter):gmatch("(.-)"..delimiter) do
            table.insert(result, match)
        end
        return result
    end

    -- iterate over map entries and count how many times this user has taken action qAction on campaign qCampaign
    for key, value in map.pairs(campaignActionMap) do
        -- split value to get campaign and action
        local campaignAndAction = split(value,":")
        local userCampaign = campaignAndAction[1]
        local userAction = campaignAndAction[2]

        -- TODO: add a check for timestamp
        if userCampaign == qCampaign and userAction == qAction then
            returnValue = returnValue + 1
        end
    end

    return returnValue 
end

In the code above, here’s what’s happening:

  • Record UDF is passed in three parameters: 1) User record (read internally based on the key passed in from the client), 2) Campaign Id, and 3) Action Id
  • A Map that contains timestamp as key and concatenated string of campaignId:actionId as value is read and iterated over to match campaign Id (qCampaign) and action Id (qAction) to the values passed in from the client
  • Notice TODO in the UDF — this is where you’d also add a check for timestamp, in addition to checking for campaign Id and action Id, to filter out records older than n days
  • If the criteria are satisfied, the counter (returnValue) is incremented
  • Finally, counter (returnValue) is returned to the client
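
To show what the timestamp check from the TODO could look like, here is a plain JavaScript mirror of the counting logic. This is a sketch and not the UDF itself: the function name countActions and the lastNDays and nowMs parameters are assumptions, and in the Lua UDF the number of days would have to be passed in as an extra argument.

```javascript
// Illustrative JavaScript mirror of the UDF's counting logic, with the
// timestamp check added. Names and parameters are hypothetical.
function countActions(campaignAction, qCampaign, qAction, lastNDays, nowMs) {
  var cutoff = nowMs - lastNDays * 24 * 60 * 60 * 1000;
  var count = 0;
  Object.keys(campaignAction).forEach(function (timestamp) {
    // Split 'campaignId:actionId' into its two parts.
    var parts = campaignAction[timestamp].split(':');
    var isRecent = Number(timestamp) >= cutoff;
    if (isRecent && parts[0] === qCampaign && parts[1] === qAction) {
      count += 1;
    }
  });
  return count;
}

var userActions = {
  '1425490351275': 'c1:read',
  '1424123753705': 'c1:read',
  '1427163913119': 'c2:click'
};
var nowMs = 1427548545929;
console.log(countActions(userActions, 'c1', 'read', 30, nowMs)); // 1
console.log(countActions(userActions, 'c1', 'read', 90, nowMs)); // 2
```

With a 30-day window only the more recent 'c1:read' entry counts; widening the window to 90 days picks up both.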

Client Code

I have coded this in Node.js, but it can be easily ported to other supported languages.

// Register UDF - NOTE: In prod env, this should be done via aql
client.udfRegister('campaignActionCounter.lua', function(response) {
  // Check status
  if ( response.code === 0 ) {    
    // Campaign Id to query (hard-coded for simplicity)
    var campaignId = 'c1';

    // Action Id to query (hard-coded for simplicity)
    var actionId = 'click';

    // Setup UDF statement passing in UDF module name (= .lua file), function name and params campaignId & actionId 
    var UDF = {module:'campaignActionCounter', funcname: 'count', args: [campaignId,actionId]};

    // Construct the key for the user record (user is hard-coded for simplicity)
    var key = aerospike.key('test','campaigns','user1234');

    // Execute UDF        
    client.execute(key, UDF, function(response, result) {
      // Check status
      if ( response.code === 0 ) {
        console.log('Query Campaign: ', campaignId);
        console.log('Query Action: ', actionId);
        console.log('Query Counter: ', result);
      }
      else {
        console.log("campaignActionCounter ERROR:\n", response);
      }
    });
  } else {
    // Error occurred
    console.log("campaignActionCounter UDF registration ERROR:\n", response);
  }
});

In the code above, here’s what’s happening:

  • Record UDF (campaignActionCounter.lua) is registered—as noted, in a production environment this should be done outside of application code using Aerospike admin and CLI tools
  • Campaign Id and Action Id are set. These are the two primary filters/predicates. (For simplicity, they are hard-coded.)
  • A ‘statement’ object is constructed to specify the UDF module name, the function to execute within it, and the params to be passed to that function
  • Aerospike key object is constructed in order to retrieve the user record—using namespace (‘test’), set (‘campaigns’) and uid (‘user1234’)
  • Method execute() is invoked passing in key and statement objects constructed above. (At this point the user record is retrieved and then the UDF gets executed on the server.)
  • Returned value (result) is displayed in the console. In this example, result is the total number of times user ‘user1234’ has performed action ‘click’ on campaign ‘c1’
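
If the nested callbacks get unwieldy, one option is to wrap the execute() call in a Promise. The sketch below assumes the callback shape used in the client code above, that is, function(response, result) with response.code === 0 on success. The wrapper name executeUdf and the stub client are hypothetical, and newer client releases may report status differently, so check the documentation for your version.

```javascript
// Hypothetical wrapper, assuming the callback shape used in this post:
// client.execute(key, udf, function (response, result) { ... })
// where response.code === 0 means success.
function executeUdf(client, key, udf) {
  return new Promise(function (resolve, reject) {
    client.execute(key, udf, function (response, result) {
      if (response && response.code === 0) {
        resolve(result);
      } else {
        reject(new Error('UDF execution failed: ' + JSON.stringify(response)));
      }
    });
  });
}

// Stub standing in for a connected client, so the sketch runs without a server.
var stubClient = {
  execute: function (key, udf, callback) {
    callback({ code: 0 }, 3);
  }
};

var stubKey = { ns: 'test', set: 'campaigns', key: 'user1234' };
var stubUdf = { module: 'campaignActionCounter', funcname: 'count', args: ['c1', 'click'] };

executeUdf(stubClient, stubKey, stubUdf)
  .then(function (count) {
    console.log('Query Counter: ', count);
  })
  .catch(function (err) {
    console.log('campaignActionCounter ERROR:\n', err.message);
  });
```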

Pinch Of Salt

Please note that this is one way of approaching and solving similar problems and it may not be suitable or viable for all use cases.

That’s All Folks!
