Thursday, September 1, 2011

Project: Basic Web Application List with Real-time Updating Part 7 (Ensure Sequence of Messages)

Introduction

In this post, we will try to ensure that messages are processed in the right order. Because of unpredictable network conditions, Channel API messages are not guaranteed to arrive in the order in which they were sent. To compensate for this, the client and server should implement a sequencing system to ensure that messages are processed properly.

Improved main.js

I made some improvements to the main.js script by using JSHint. The resulting code is as follows.

$(function () {
    // Delay, in milliseconds, before a failed connection attempt is retried.
    var INTERVAL_DELAY_OPEN_CONNECTION = 10000;

    // Mustache template for a run of list entries. The initial render and
    // the 'prependitem' event both use exactly this markup.
    var ITEM_TEMPLATE =
        '{{#items}}' +
        '<div class="itementry">' +
        '    <h2>{{title}}</h2>' +
        '    <div>' +
        '        {{bodytext}}' +
        '    </div>' +
        '</div>' +
        '{{/items}}';

    // "Save" button: post the form fields, then close the dialog once the
    // server has answered. `this` is the dialog element.
    var saveItem = function () {
        var fields = {
            itemtitle: $("#itemformmain input[name='itemtitle']").val(),
            itemtext: $("#itemformmain textarea[name='itemtext']").val()
        };
        var dialogNode = this;
        $.post('/save', fields, function () {
            $(dialogNode).dialog("close");
        }, 'json');
    };

    // "Cancel" button: just dismiss the dialog.
    var cancelItem = function () {
        $(this).dialog("close");
    };

    $("#itemform").dialog({
        autoOpen: false,
        modal: true,
        width: 450,
        buttons: {
            "Save": saveItem,
            "Cancel": cancelItem
        }
    });

    $("input[name='additem']").bind('click', function () {
        $("#itemform").dialog("open");
    });

    $("#mainlist").evently({
        // Initial render: fetch the full list and draw every row.
        _init: {
            async: function (done) {
                // Forward only the payload, never the extra jQuery arguments.
                $.getJSON('/list', function (listing) {
                    done(listing);
                });
            },
            data: function (listing) {
                return {items: listing.rows};
            },
            mustache: ITEM_TEMPLATE
        },
        // Custom event: draw the rows of one pushed message above the list.
        prependitem: {
            data: function (e, pushed) {
                return {items: pushed.rows};
            },
            mustache: ITEM_TEMPLATE,
            render: 'prepend'
        }
    });

    // Show the "no connection" status and try again after the retry delay.
    var retryLater = function () {
        $("#statusmessage").text("no connection...");
        setTimeout(setupChannel, INTERVAL_DELAY_OPEN_CONNECTION);
    };

    // Requests a channel token, opens the socket and wires up its handlers.
    var setupChannel = function () {
        console.info("trying to open connection");
        $("#statusmessage").text("opening connection");

        // Message ids already rendered during this connection attempt.
        var messagemap = {};

        var onToken = function (data) {
            var socket = new goog.appengine.Channel(data.token).open();

            socket.onopen = function () {
                console.info("connection opened");
                $("#statusmessage").text("");
            };

            socket.onclose = function () {
                console.info("connection closed");
            };

            socket.onerror = function () {
                console.info("connection error");
                retryLater();
            };

            socket.onmessage = function (message) {
                console.info("message received " + message.data);
                var payload = $.parseJSON(message.data);
                var sender = payload.clientid;
                var id = payload.messageid;

                // Render each message id at most once.
                if (!messagemap[id]) {
                    $("#mainlist").trigger('prependitem', [payload]);
                    messagemap[id] = true;
                }

                // Acknowledge every delivery, duplicates included.
                $.post('/removemessageidfromqueue', {
                    clientid: sender,
                    messageid: id
                });
            };
        };

        $.ajax('/gettoken', {
            type: 'GET',
            dataType: 'json',
            success: onToken,
            error: function () {
                retryLater();
            }
        });
    };

    setupChannel();
});

Message Sequence Counter

To ensure the sequence of messages, we will use a sequence counter for each client connected to the server. This counter is integrated with the client manager.

The value of the counter is to be stored as a property of the ClientId datastore model used by the ClientManager class. The property name is 'sequencecount'.

class ClientId(db.Model):
    """Datastore record kept for each client known to the ClientManager."""
    # Identifier of the client.
    clientid = db.StringProperty()
    # Timestamp of the entity; auto_now_add lets the datastore fill it in.
    createdate = db.DateTimeProperty(auto_now_add=True)
    # List of message-id strings associated with this client; empty by default.
    messagequeue = db.StringListProperty(default=[])
    # Per-client message sequence counter; defaults to 0.
    sequencecount = db.IntegerProperty(default=0)

In the ClientManager class, we modify the 'add' method to integrate the initialization and setting of the 'sequencecount' property. We renamed the method to 'add_or_resetsequence'.

    def add_or_resetsequence(self, clientid, startsequence=0):
        """Create the record for ``clientid`` or reset its sequence counter.

        Args:
            clientid: Client identifier; also used as the entity's key name.
            startsequence: Value to store in ``sequencecount`` (default 0).
        """
        ct_k = db.Key.from_path('ClientId', clientid)
        # Fetch once and reuse the result. Testing a second db.get() cost an
        # extra datastore read and could disagree with the entity held in obj.
        obj = db.get(ct_k)
        if obj is None:
            obj = ClientId(key_name=clientid,
                           clientid=clientid,
                           sequencecount=startsequence)
        else:
            obj.sequencecount = startsequence
        obj.put()

The method first checks whether an entry for the passed 'clientid' exists. If it does not exist, a new entry is created and the sequence count property is set to the value of the 'startsequence' parameter. If it exists, the 'sequencecount' property of the entry is updated to the value of the 'startsequence' parameter.

Every time a message is to be sent to the client, a sequence number is requested based on the value of the 'sequencecount' property. Once a sequence count has been used by a message, the 'sequencecount' property should be incremented so that the next message gets the next consecutive value. This value is used in the client to process messages in the proper order. We add the 'request_sequencecount' method to the 'ClientManager' class to accomplish this.

    def request_sequencecount(self, clientid):
        """Return the next sequence number for ``clientid``.

        The read-and-increment itself lives in ``request_sequence_count``;
        it is run through ``db.run_in_transaction`` and its result is
        returned unchanged.
        """
        return db.run_in_transaction(request_sequence_count, clientid)

In the 'request_sequencecount' method, not much is done as the actual operations are done in a function, 'request_sequence_count'. This is necessary to perform all the operations as one transaction.

def request_sequence_count(clientid):
    """Hand out the current sequence number of ``clientid`` and advance it.

    Returns the value ``sequencecount`` held before the increment, or None
    when no ClientId entity exists for ``clientid``.
    """
    record = db.get(db.Key.from_path('ClientId', clientid))
    if not record:
        return None

    issued = record.sequencecount
    record.sequencecount = issued + 1
    record.put()
    return issued

As a reference, below is the complete listing for the 'clientmanager.py' file.


from google.appengine.ext import db
from google.appengine.ext import webapp

import logging

class ClientId(db.Model):
    """Datastore record kept for each client known to the ClientManager."""
    # Identifier of the client.
    clientid = db.StringProperty()
    # Timestamp of the entity; auto_now_add lets the datastore fill it in.
    createdate = db.DateTimeProperty(auto_now_add=True)
    # List of message-id strings associated with this client; empty by default.
    messagequeue = db.StringListProperty(default=[])
    # Per-client message sequence counter; defaults to 0.
    sequencecount = db.IntegerProperty(default=0)

def request_sequence_count(clientid):
    """Return the client's current sequence number, then bump the counter.

    Yields None when there is no ClientId entity for ``clientid``; otherwise
    the pre-increment value of ``sequencecount`` is returned and the
    incremented value is written back.
    """
    entity = db.get(db.Key.from_path('ClientId', clientid))
    if not entity:
        return None

    current = entity.sequencecount
    entity.sequencecount = current + 1
    entity.put()
    return current
        
class ClientManager(object):
    """
    manager for client channel ids

    Every method looks up the ClientId entity whose key name is the given
    client id and reads or updates it.
    """

    def _fetch(self, clientid):
        """Return the ClientId entity keyed by ``clientid``, or None."""
        return db.get(db.Key.from_path('ClientId', clientid))

    def add_or_resetsequence(self, clientid, startsequence=0):
        """Create the record for ``clientid`` or reset its sequence counter.

        Args:
            clientid: Client identifier; also used as the entity's key name.
            startsequence: Value to store in ``sequencecount`` (default 0).
        """
        # One fetch only: the result is both the existence test and the
        # entity that gets updated.
        obj = self._fetch(clientid)
        if obj is None:
            obj = ClientId(key_name=clientid,
                           clientid=clientid,
                           sequencecount=startsequence)
        else:
            obj.sequencecount = startsequence
        obj.put()

    def remove(self, clientid):
        """Delete the record for ``clientid``; a missing record is ignored."""
        ct = self._fetch(clientid)
        if ct:
            ct.delete()

    def clientids(self):
        """Return the ``clientid`` value of every stored ClientId entity."""
        # all() is called on the model class; no throwaway instance needed.
        return [o.clientid for o in ClientId.all()]

    def check_clientid(self, clientid):
        """Return True when a record exists for ``clientid``, else False."""
        return bool(self._fetch(clientid))

    def add_messageid(self, clientid, messageid):
        """Append ``messageid`` to the client's queue unless already there.

        Does nothing when the client has no record.
        """
        ci = self._fetch(clientid)
        if ci and messageid not in ci.messagequeue:
            ci.messagequeue = ci.messagequeue + [messageid]
            ci.put()

    def remove_messageid(self, clientid, messageid):
        """Remove ``messageid`` from the client's queue when present.

        Does nothing when the client has no record.
        """
        ci = self._fetch(clientid)
        if ci and messageid in ci.messagequeue:
            remaining = ci.messagequeue
            remaining.remove(messageid)
            ci.messagequeue = remaining
            ci.put()

    def check_messageid(self, clientid, messageid):
        """Return True when ``messageid`` is in the client's queue.

        Returns False, rather than raising AttributeError, when the client
        has no record.
        """
        ci = self._fetch(clientid)
        if ci is None:
            return False
        return messageid in ci.messagequeue

    def request_sequencecount(self, clientid):
        """Return the next sequence number for ``clientid``.

        Runs ``request_sequence_count`` through ``db.run_in_transaction`` and
        returns its result unchanged (None when the client has no record).
        """
        return db.run_in_transaction(request_sequence_count, clientid)
        
class ClientDisconnectHandler(webapp.RequestHandler):
    """Drops a client's record when its disconnect is reported by POST."""

    def post(self):
        """Log the disconnect and remove the client named in 'from'."""
        departed = self.request.get('from')
        logging.info("Client %s disconnected" % departed)
        ClientManager().remove(departed)

Using the Sequence Counter

In the 'main.py' file, we integrate the use of the sequence counter in sending messages. We modify two request handler classes to do this.

class GetTokenHandler(webapp.RequestHandler):
    """Issues a channel token and (re)starts the caller's sequence counter."""

    def get(self):
        """Answer with JSON holding ``token`` and ``startsequence``.

        The client id is read from the 'uid' cookie. When the cookie is
        missing the request is rejected with HTTP 400, because there is no
        id to create a channel or a ClientId record for.
        """
        uniqueid = self.request.cookies.get('uid')
        if not uniqueid:
            self.error(400)
            return

        token = channel.create_channel(uniqueid)

        # calculate a suitable start sequence value based on time:
        # whole milliseconds elapsed since 2011-01-01.
        td = datetime.datetime.now() - datetime.datetime(year=2011, month=1, day=1)
        elapsed_us = td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6
        # Floor division keeps the value an integer on every Python version;
        # it is stored in an IntegerProperty.
        startsequence = elapsed_us // 10**3

        cm = ClientManager()
        cm.add_or_resetsequence(uniqueid, startsequence=startsequence)

        self.response.headers['Content-Type'] = 'text/json'
        self.response.out.write(json.dumps(dict(token=token,
                                                startsequence=startsequence)))

In the 'GetTokenHandler', we simply calculate a suitable start value for the sequence and use that to initialize or reset the sequence counter using the method 'add_or_resetsequence', which we discussed in the previous section. We also added the value of 'startsequence' to the JSON returned to the client so that it can update the sequence counter on the client side.

class SendMessagesWorkerHandler(webapp.RequestHandler):
    """Fans one message out to every known client as a '/workers/senditem' task."""

    def post(self):
        """Queue one send task per client, each stamped with a sequence number.

        Reads the JSON-encoded 'message' and the 'messageid' request
        parameters. For every client id a sequence number is requested and
        merged, together with the client id, into the message that is queued.
        """
        message = json.loads(self.request.get('message'))
        messageid = self.request.get('messageid')

        cm = ClientManager()

        for clientid in cm.clientids():
            sequencecount = cm.request_sequencecount(clientid)
            # None means the client has no record. 0 is a valid sequence
            # number, so test for None explicitly rather than truthiness.
            if sequencecount is None:
                continue

            jsonstr = json.dumps(dict(message,
                                      clientid=clientid,
                                      sequence=sequencecount))
            try:
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': jsonstr,
                                      'count': 0,
                                      'messageid': messageid,
                                      'sequence': sequencecount},
                              name="SendItem-%s-%s"%(messageid, clientid))
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                # A task with this name was already queued for this
                # message/client pair, so there is nothing more to send.
                # NOTE(review): the sequence number requested above is still
                # consumed in this case - confirm clients cannot stall
                # waiting for it.
                pass

In the 'SendMessagesWorkerHandler', we request a value from the sequence counter. The sequence count value requested is then integrated into the JSON message to be sent to the client.

Client-side Sequencing

Most of the work is done on the client side. There, received messages are checked against a sequence counter that is synchronized with the sequence counter on the server when a connection to the server is opened. Each received message has a sequence parameter with an integer value. This value is compared to the value of the sequence counter. If the value is equal to the current value of the sequence counter, then the message is processed immediately, added to the list right away, and the sequence counter is incremented. If the value is less than the current value of the sequence counter, then the message is thrown away as it is a stale message. If the value is greater than the current value of the sequence counter, then the message is added to a list to be processed later, once the value of the sequence counter becomes equal to its sequence parameter. This seems quite complicated at first but becomes simple eventually.

  // Requests a channel token, opens the socket, and renders incoming items
  // strictly in sequence order. Messages that arrive early are parked until
  // their turn, messages behind the counter are dropped as stale, and a
  // message id is rendered at most once per connection.
  var setupChannel = function () {
    console.info("trying to open connection");
    $("#statusmessage").text("opening connection");
    
    $.ajax('/gettoken', {
      type: 'GET',
      dataType: 'json',
      success: function (data) {
        var channel = new goog.appengine.Channel(data.token);
        var socket = channel.open();
        
        // Ids of the messages already handled on this connection.
        var messagemap = {};
        
        socket.onopen = function () {
          console.info("connection opened");
          $("#statusmessage").text("");
        };
        
        socket.onclose = function () {
          console.info("connection closed");
        };
        
        socket.onerror = function () {
          console.info("connection error");
          $("#statusmessage").text("no connection...");
          setTimeout(setupChannel,
               INTERVAL_DELAY_OPEN_CONNECTION);
        };
        
        // Renders a message unless its id has been seen before.
        var processmessage = function (messageobj) {
          var messageid = messageobj.messageid;
          
          // Look the id up by value. Testing the literal property
          // "messageid" never matched, so duplicates were always rendered.
          if (messagemap[messageid] !== true) {
            $("#mainlist").trigger('prependitem', [messageobj]);
          }
          
          messagemap[messageid] = true;
        };
        
        // Tracks the next expected sequence number and the messages that
        // arrived ahead of it, keyed by their sequence number.
        var SequenceManager = {
          sequencecount: 0,
          deferredmessages: {},
          // Advance past the message just handled, then drain any parked
          // messages that have become due.
          increment: function () {
            this.sequencecount += 1;
            this.process_defferedmessages();
          },
          // Start over at the sequence number announced by the server.
          reset: function (startsequence) {
            this.sequencecount = startsequence;
            this.deferredmessages = {};
          },
          // Park a message whose sequence number is still in the future.
          push_defferedmessage: function (m) {
            this.deferredmessages[m.sequence] = m;
          },
          // Handle parked messages for as long as the next expected
          // sequence number is available.
          process_defferedmessages: function () {
            while (this.deferredmessages[this.sequencecount]) {
              var due = this.deferredmessages[this.sequencecount];
              delete this.deferredmessages[this.sequencecount];
              processmessage(due);
              this.sequencecount += 1;
            }
          }
        };
        
        SequenceManager.reset(data.startsequence);
        console.info("Set sequence: " + data.startsequence);
        
        socket.onmessage = function (message) {
          console.info("message received " + message.data);
          var d = $.parseJSON(message.data);
          
          var clientid = d.clientid;
          var messageid = d.messageid;
          var message_seq = d.sequence;
          console.info("Sequence count: " + SequenceManager.sequencecount);
          
          if (message_seq === SequenceManager.sequencecount) {
            processmessage(d);
            SequenceManager.increment();
          }
          else if (message_seq > SequenceManager.sequencecount) {
            SequenceManager.push_defferedmessage(d);
          }
          // A sequence number below the counter is stale and is ignored.
          
          // Acknowledge every delivery, whether or not it was rendered.
          $.post('/removemessageidfromqueue',
            {clientid: clientid, messageid: messageid});
          
        };
      },
      error: function () {
        $("#statusmessage").text("no connection...");
        setTimeout(setupChannel,
             INTERVAL_DELAY_OPEN_CONNECTION);
      }
    });
  };