Thursday, September 1, 2011

Project: Basic Web Application List with Real-time Updating Part 7 (Ensure Sequence of Messages)

Introduction

In this post, we will try to ensure that messages are processed in the right order. Because of unpredictable network conditions, Channel API messages are not guaranteed to arrive in the order they were sent. To compensate, the client and server should implement a sequencing system so that messages are processed in the proper order.

Improved main.js

I made some improvements to the main.js script by running it through JSHint. The resulting code is as follows.

$(function () {
    var INTERVAL_DELAY_OPEN_CONNECTION = 10000;
    
    $("#itemform").dialog({
        autoOpen: false,
        modal: true,
        width: 450,
        buttons: {
            "Save": function() {
                var itemtitle = $("#itemformmain input[name='itemtitle']").val();
                var itemtext = $("#itemformmain textarea[name='itemtext']").val();
                var that = this;
                $.post('/save',
                       {itemtitle: itemtitle, itemtext: itemtext},
                       function () {
                    $(that).dialog("close");
                       }, 'json');
            }, 
            "Cancel": function() { 
                $(this).dialog("close"); 
            } 
        }
    });
    
    $("input[name='additem']").bind('click',
        function () {
            $("#itemform").dialog("open");
        }
    );

    $("#mainlist").evently({
        _init: {
            async: function (cb) {
                $.getJSON('/list',
                    function (data) {
                        cb(data);
                    });
            },
            data: function (data) {
                return {
                    items: data.rows    
                };
            },
            mustache: 
                '{{#items}}' +
                '<div class="itementry">' +
                '    <h2>{{title}}</h2>' +
                '    <div>' +
                '        {{bodytext}}' +
                '    </div>' +
                '</div>' +
                '{{/items}}'
        },
        prependitem: {
            data: function (e, data) {
                return {
                    items: data.rows    
                };
            },
            mustache:
                '{{#items}}' +
                '<div class="itementry">' +
                '    <h2>{{title}}</h2>' +
                '    <div>' +
                '        {{bodytext}}' +
                '    </div>' +
                '</div>' +
                '{{/items}}',
            render: 'prepend'
        }
    });

    var setupChannel = function () {
        console.info("trying to open connection");
        $("#statusmessage").text("opening connection");
        
        var messagemap = {};
        $.ajax('/gettoken', {
            type: 'GET',
            dataType: 'json',
            success: function (data) {
                var channel = new goog.appengine.Channel(data.token);
                var socket = channel.open();
                
                socket.onopen = function () {
                    console.info("connection opened");
                    $("#statusmessage").text("");
                };
                
                socket.onclose = function () {
                    console.info("connection closed");
                };
                
                socket.onerror = function () {
                    console.info("connection error");
                    $("#statusmessage").text("no connection...");
                    setTimeout(setupChannel,
                           INTERVAL_DELAY_OPEN_CONNECTION);
                };
                
                socket.onmessage = function (message) {
                    console.info("message received " + message.data);
                    var d = $.parseJSON(message.data);
                    
                    var clientid = d.clientid;
                    var messageid = d.messageid;
                    
                    if (! messagemap[messageid]) {
                        $("#mainlist").trigger('prependitem',
                                       [d]);
                        messagemap[messageid] = true;                
                    }
                    
                    $.post('/removemessageidfromqueue',
                        {clientid: clientid,
                        messageid: messageid});
                    
                };
            },
            error: function () {
                $("#statusmessage").text("no connection...");
                setTimeout(setupChannel,
                       INTERVAL_DELAY_OPEN_CONNECTION);
            }
        });
    };

    setupChannel();    
});

Message Sequence Counter

To ensure the sequence of messages, we will use a sequence counter for each client connected to the server. This counter is integrated with the client manager.

The value of the counter is to be stored as a property of the ClientId datastore model used by the ClientManager class. The property name is 'sequencecount'.

class ClientId(db.Model):
    clientid = db.StringProperty()
    createdate = db.DateTimeProperty(auto_now_add=True)
    messagequeue = db.StringListProperty(default=[])
    sequencecount = db.IntegerProperty(default=0)

In the ClientManager class, we modify the 'add' method so that it also initializes and sets the 'sequencecount' property. We renamed the method to 'add_or_resetsequence'.

    def add_or_resetsequence(self, clientid, startsequence=0):
        ct_k = db.Key.from_path('ClientId', clientid)
        obj = db.get(ct_k)
        if not obj:
            ct = ClientId(key_name=clientid,
                          clientid=clientid,
                          sequencecount=startsequence)
            ct.put()
        else:
            obj.sequencecount = startsequence
            obj.put()

The method first checks whether an entry for the passed 'clientid' exists. If it does not exist, a new entry is created and its 'sequencecount' property is set to the value of the 'startsequence' parameter. If it exists, the 'sequencecount' property of the entry is updated to the value of the 'startsequence' parameter.

Every time a message is to be sent to a client, a sequence number is requested based on the value of the 'sequencecount' property. Once a sequence number has been handed out to a message, the 'sequencecount' property is incremented so that the next message gets the next consecutive value. The client uses this value to process messages in the proper order. To accomplish this, we add the 'request_sequencecount' method to the 'ClientManager' class.

    def request_sequencecount(self, clientid):
        sequencecount = db.run_in_transaction(request_sequence_count,
                                              clientid)
        
        return sequencecount

In the 'request_sequencecount' method, not much is done as the actual operations are done in a function, 'request_sequence_count'. This is necessary to perform all the operations as one transaction.

def request_sequence_count(clientid):
    ci_k = db.Key.from_path('ClientId', clientid)
    ci = db.get(ci_k)
    
    if ci:
        count = ci.sequencecount
        ci.sequencecount += 1
        ci.put()
    
        return count
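
To see the read-then-increment behaviour without the datastore, here is a minimal in-memory sketch in plain Python. The class name 'InMemorySequenceCounter' and its methods are hypothetical stand-ins for the datastore-backed code above, and a lock plays the role that the transaction plays in the real application.

```python
import threading


class InMemorySequenceCounter(object):
    """Hypothetical in-memory stand-in for the datastore-backed counter."""

    def __init__(self):
        self._counts = {}
        self._lock = threading.Lock()

    def add_or_reset(self, clientid, startsequence=0):
        # create the entry, or overwrite the counter of an existing one
        with self._lock:
            self._counts[clientid] = startsequence

    def request(self, clientid):
        # read and increment under one lock, mirroring the transaction
        with self._lock:
            if clientid not in self._counts:
                return None
            count = self._counts[clientid]
            self._counts[clientid] = count + 1
            return count


counter = InMemorySequenceCounter()
counter.add_or_reset("client-a", startsequence=100)
first = counter.request("client-a")
second = counter.request("client-a")
missing = counter.request("unknown-client")
```

Each request returns the current value and leaves the counter one higher, so consecutive messages for the same client get consecutive numbers, while an unknown client gets None.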

As a reference, below is the complete listing for the 'clientmanager.py' file.


from google.appengine.ext import db
from google.appengine.ext import webapp

import logging

class ClientId(db.Model):
    clientid = db.StringProperty()
    createdate = db.DateTimeProperty(auto_now_add=True)
    messagequeue = db.StringListProperty(default=[])
    sequencecount = db.IntegerProperty(default=0)

def request_sequence_count(clientid):
    ci_k = db.Key.from_path('ClientId', clientid)
    ci = db.get(ci_k)
    
    if ci:
        count = ci.sequencecount
        ci.sequencecount += 1
        ci.put()
    
        return count
        
class ClientManager(object):
    """
    manager for client channel ids
    """
    
    def add_or_resetsequence(self, clientid, startsequence=0):
        ct_k = db.Key.from_path('ClientId', clientid)
        obj = db.get(ct_k)
        if not obj:
            ct = ClientId(key_name=clientid,
                          clientid=clientid,
                          sequencecount=startsequence)
            ct.put()
        else:
            obj.sequencecount = startsequence
            obj.put()
            
    def remove(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            ct.delete()
            
    def clientids(self):
        ct = ClientId().all()
        return [o.clientid for o in ct]
        
    def check_clientid(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            return True
        else:
            return False
        
    def add_messageid(self, clientid, messageid):
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid not in ci.messagequeue:
            ci.messagequeue = ci.messagequeue + [messageid]
            ci.put()
            
    def remove_messageid(self, clientid, messageid):
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid in ci.messagequeue:
            l = ci.messagequeue
            l.remove(messageid)
            ci.messagequeue = l
            ci.put()
            
    def check_messageid(self, clientid, messageid):
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid in ci.messagequeue:
            return True
        else:
            return False
            
    def request_sequencecount(self, clientid):
        sequencecount = db.run_in_transaction(request_sequence_count,
                                              clientid)
        
        return sequencecount
        
class ClientDisconnectHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('from')
        
        logging.info("Client %s disconnected"%clientid)
        
        cm = ClientManager()
        cm.remove(clientid)

Using the Sequence Counter

In the 'main.py' file, we integrate the use of the sequence counter in sending messages. We modify two request handler classes to do this.

class GetTokenHandler(webapp.RequestHandler):
    def get(self):
        uniqueid = self.request.cookies.get('uid')
        token = channel.create_channel(uniqueid)
        
        # calculate a suitable start sequence value based on time
        td = datetime.datetime.now() - datetime.datetime(year=2011, month=1, day=1)
        startsequence = (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**3
        cm = ClientManager()
        
        cm.add_or_resetsequence(uniqueid, startsequence=startsequence)
        
        self.response.headers['Content-Type'] = 'text/json'
        self.response.out.write(json.dumps(dict(token=token,
                                                startsequence=startsequence)))

In the 'GetTokenHandler', we simply calculate a suitable start value for the sequence and use it to initialize or reset the sequence counter through the 'add_or_resetsequence' method, which we discussed in the previous section. We also added the value of 'startsequence' to the JSON returned to the client so that the client can update its own sequence counter.
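
The start value is just the number of whole milliseconds since January 1, 2011. As a rough sketch, the same arithmetic can be pulled out into a standalone function; the names 'EPOCH' and 'start_sequence' are made up for this illustration. Note that the sketch uses '//' so that it yields an integer under both Python 2 and Python 3, whereas the handler above relies on Python 2's integer '/'.

```python
import datetime

# reference point used by the handler above
EPOCH = datetime.datetime(year=2011, month=1, day=1)


def start_sequence(now):
    """Return the whole milliseconds elapsed between EPOCH and `now`."""
    td = now - EPOCH
    total_microseconds = td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6
    # floor division keeps the result an integer
    return total_microseconds // 10**3


one_second_in = start_sequence(datetime.datetime(2011, 1, 1, 0, 0, 1))
one_day_in = start_sequence(datetime.datetime(2011, 1, 2))
```

Because the value grows with time, a counter that is reset on reconnection starts above the numbers handed out earlier, as long as the client has received fewer than one message per millisecond on average.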

class SendMessagesWorkerHandler(webapp.RequestHandler):
    def post(self):
        message = json.loads(self.request.get('message'))
        messageid = self.request.get('messageid')
        
        cm = ClientManager()
        
        for clientid in cm.clientids():
            try:
                sequencecount = cm.request_sequencecount(clientid)
                if sequencecount is not None:
                    jsonstr = json.dumps(dict(message,
                                              clientid=clientid,
                                              sequence=sequencecount))
                    taskqueue.add(url='/workers/senditem',
                                  params={'clientid': clientid,
                                          'message': jsonstr,
                                          'count': 0,
                                          'messageid': messageid,
                                          'sequence': sequencecount},
                                  name="SendItem-%s-%s"%(messageid, clientid))
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass

In the 'SendMessagesWorkerHandler', we request a value from the sequence counter. The sequence count value requested is then integrated into the JSON message to be sent to the client.
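
The merging of the sequence value into the message relies on 'dict(mapping, **kwargs)', which copies the mapping and adds the extra keys. Here is a small sketch of just that step, with a made-up helper name, 'build_client_message', and made-up sample values.

```python
import json


def build_client_message(message, clientid, sequence):
    # dict(mapping, **kwargs) copies `message` and adds the extra keys,
    # leaving the original dictionary untouched
    merged = dict(message, clientid=clientid, sequence=sequence)
    return json.dumps(merged, sort_keys=True)


original = {"rows": [{"title": "t", "bodytext": "b"}]}
payload = build_client_message(original, "client-a", 21000000)
decoded = json.loads(payload)
```

Since the copy is made per client, each client receives the same rows but its own 'clientid' and 'sequence' values.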

Client-side Sequencing

Most of the work is done on the client side. There, received messages are checked against a sequence counter that is synchronized with the server's sequence counter when a connection is opened. Each received message carries a sequence parameter with an integer value, which is compared to the current value of the sequence counter. If the two are equal, the message is processed immediately, added to the list right away, and the sequence counter is incremented. If the message's value is less than the counter, the message is thrown away as stale. If it is greater, the message is added to a list of deferred messages, to be processed once the sequence counter reaches its sequence parameter. This seems complicated at first, but the code turns out to be fairly simple.

  var setupChannel = function () {
    console.info("trying to open connection");
    $("#statusmessage").text("opening connection");
    
    $.ajax('/gettoken', {
      type: 'GET',
      dataType: 'json',
      success: function (data) {
        var channel = new goog.appengine.Channel(data.token);
        var socket = channel.open();
        
        var messagemap = {};
        
        socket.onopen = function () {
          console.info("connection opened");
          $("#statusmessage").text("");
        };
        
        socket.onclose = function () {
          console.info("connection closed");
        };
        
        socket.onerror = function () {
          console.info("connection error");
          $("#statusmessage").text("no connection...");
          setTimeout(setupChannel,
               INTERVAL_DELAY_OPEN_CONNECTION);
        };
        
        var processmessage = function (messageobj) {
          var messageid = messageobj.messageid;
          
          if (typeof messagemap[messageid] === 'undefined') {
            $("#mainlist").trigger('prependitem', [messageobj]);
          }
          
          messagemap[messageid] = true;
          
        };
        
        var SequenceManager = {
          sequencecount: 0,
          deferredmessages: {},
          increment: function () {
            this.sequencecount += 1;
            this.process_defferedmessages();
          },
          reset: function (startsequence) {
            this.sequencecount = startsequence;
            this.deferredmessages = {};
          },
          push_defferedmessage: function (m) {
            this.deferredmessages[m.sequence] = m;
          },
          process_defferedmessages: function () {
            if (this.deferredmessages[this.sequencecount]) {
              processmessage(this.deferredmessages[this.sequencecount]);
              delete this.deferredmessages[this.sequencecount];
              this.sequencecount += 1;
              this.process_defferedmessages();
            }
          }
        };
        
        SequenceManager.reset(data.startsequence);
        console.info("Set sequence: " + data.startsequence);
        
        socket.onmessage = function (message) {
          console.info("message received " + message.data);
          var d = $.parseJSON(message.data);
          
          var clientid = d.clientid;
          var messageid = d.messageid;
          var message_seq = d.sequence;
          console.info("Sequence count: " + SequenceManager.sequencecount);
          
          if (message_seq == SequenceManager.sequencecount) {
            processmessage(d);
            SequenceManager.increment();
          }
          else if (message_seq > SequenceManager.sequencecount){
            SequenceManager.push_defferedmessage(d);
          }
          $.post('/removemessageidfromqueue',
            {clientid: clientid, messageid: messageid});
          
        };
      },
      error: function () {
        $("#statusmessage").text("no connection...");
        setTimeout(setupChannel,
             INTERVAL_DELAY_OPEN_CONNECTION);
      }
    });
  };
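
The three sequencing rules (process when equal, drop when lower, defer when higher) can also be tried outside the browser. Below is a minimal sketch of the same logic in plain Python; the class 'SequenceBuffer' is a hypothetical illustration and not part of the application.

```python
class SequenceBuffer(object):
    """Hypothetical sketch of the client-side sequencing rules."""

    def __init__(self, startsequence):
        self.expected = startsequence
        self.deferred = {}
        self.delivered = []

    def receive(self, sequence, payload):
        if sequence < self.expected:
            # stale message: its slot has already been processed
            return
        if sequence > self.expected:
            # arrived early: keep it until its turn comes
            self.deferred[sequence] = payload
            return
        self._deliver(payload)
        # drain any deferred messages that are now in order
        while self.expected in self.deferred:
            self._deliver(self.deferred.pop(self.expected))

    def _deliver(self, payload):
        self.delivered.append(payload)
        self.expected += 1


buf = SequenceBuffer(startsequence=10)
buf.receive(12, "c")      # early, deferred
buf.receive(10, "a")      # in order, delivered
buf.receive(9, "stale")   # stale, dropped
buf.receive(11, "b")      # delivered, then "c" is drained
```

Messages that arrive as 12, 10, 9, 11 end up delivered as "a", "b", "c", which is the behaviour the SequenceManager object aims for.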

Thursday, August 18, 2011

Project: Basic Web Application List with Real-time Updating Part 6 (More Robust Connection Between Client and Server)

Introduction

In the last post, we made the sending of messages robust by setting up a simple client acknowledgement system and message resending system. To further improve the robustness of the application, in this blog post, we will add to the client-side a way for it to reconnect to the server when it is disconnected. The client can be disconnected due to network issues or even the expiration of the client token.

Preliminaries: Add html Container for Displaying Connection Status

It is easier to appreciate the connection and reconnection of the client to the server if the user can see the status of the connection. This also improves the usability of the application, as it indicates to the user when the client is disconnected from the server.

This will involve simple changes to the main.html template and the main.css files.

{% extends "base.html" %}

{% block header %}
<div id="statusmessage">
</div>
{% endblock %}

{% block content %}
<div id="itemform" title="Item Form">
  <form id="itemformmain" action="/save" method="post">
    <ul>
      <li>
        <div class="formlabel">
          <label for="itemtitle">Title</label>
        </div>
        <div class="formwidget">
          <input name="itemtitle"
                 type="text"
                 placeholder="Place Title Here"
                 size="50"
                 required>
        </div>
      </li>
      <li>
        <div class="formlabel">
          <label for="itemtext">Main Text</label>
        </div>
        <div class="formwidget">
          <textarea name="itemtext"
                    cols="50"
                    rows="5"
                    required></textarea>
        </div>
      </li>
    </ul>
  </form>
</div>
<input type="button" name="additem" value="Add New Item"/>
<div id="mainlist">
</div>
{% endblock %}

{% block addscript %}
<script src="/js/main.js"></script>
{% endblock %}

In the 'main.html' file, we inserted a div element inside the header block.

{% block header %}
<div id="statusmessage">
</div>
{% endblock %}

In the 'main.css' file, we added some style directives for the statusmessage div container we added in 'main.html'.

body {
    font-size: 10px;
}

#itemform ul {
    padding-left: 0;
}

#itemform li {
    display: block;
}

#itemform .formlabel {
    float: left;
    text-align: right;
    width: 60px;
}

#itemform .formwidget {
    margin-left: 70px;
}

#statusmessage {
    text-align: center;
    font-size: 12px;
}

Adding the Reconnection Logic

The client reconnection logic is implemented in the main JavaScript file, 'main.js'.

$(function () {
    var INTERVAL_DELAY_OPEN_CONNECTION = 10000;
    
    // some code omitted

    var setupChannel = function () {
        console.info("trying to open connection");
        $("#statusmessage").text("opening connection");
        
        var messagemap = {};
        $.ajax('/gettoken', {
            type: 'GET',
            dataType: 'json',
            success: function (data) {
                var channel = new goog.appengine.Channel(data['token']);
                var socket = channel.open();
                
                socket.onopen = function () {
                    console.info("connection opened");
                    $("#statusmessage").text("");
                };
                
                socket.onclose = function () {
                    console.info("connection closed");
                };
                
                socket.onerror = function () {
                    console.info("connection error");
                    $("#statusmessage").text("no connection...");
                    setTimeout(setupChannel,
                           INTERVAL_DELAY_OPEN_CONNECTION);
                };
                
                socket.onmessage = function (message) {
                    console.info("message received " + message.data);
                    var d = $.parseJSON(message.data);
                    
                    var clientid = d['clientid'];
                    var messageid = d['messageid'];
                    
                    if (! messagemap[messageid]) {
                        $("#mainlist").trigger('prependitem',
                                       [d]);
                        messagemap[messageid] = true;                
                    }
                    
                    $.post('/removemessageidfromqueue',
                        {clientid: clientid,
                        messageid: messageid});
                    
                };
            },
            error: function () {
                $("#statusmessage").text("no connection...");
                setTimeout(setupChannel,
                       INTERVAL_DELAY_OPEN_CONNECTION);
            }
        });
    };

    setupChannel();    
});

The changes in the above code are the addition of the INTERVAL_DELAY_OPEN_CONNECTION variable at the top and a minor rewrite of the setupChannel function.

Notice that in the setupChannel function we replaced the '$.getJSON' function call with '$.ajax'. The reason is that '$.ajax' allows more options. This let us add an error handler for the case where the AJAX request fails; this is also possible with '$.getJSON' but is a lot harder. Another reason is that, according to the yayquery podcast, using '$.ajax' is a better practice than using helper functions such as '$.getJSON'.

When the client is disconnected from the server, the socket.onerror callback function is called. Thus, the code for reconnecting to the server must be added to this callback function.

$("#statusmessage").text("no connection...");
setTimeout(setupChannel,
           INTERVAL_DELAY_OPEN_CONNECTION);

In the callback function, the statusmessage div container is first set to contain the text "no connection..." to indicate to the user that the connection to the server is not available. Then a delayed call to the setupChannel function is set up. The delay is given by the value of the INTERVAL_DELAY_OPEN_CONNECTION variable (10000 milliseconds). We use a short delay to allow the network to stabilize first, in case the disconnection was caused by a temporary network problem.

An attempt by the client to reconnect to the server can itself fail. This may happen when the network connection has been totally lost or when a temporary network problem takes longer to resolve. An indication of this is that the client receives no response to a request to the path '/gettoken'. In that case our AJAX request to '/gettoken' fails and the error callback function of $.ajax is called.

error: function () {
    $("#statusmessage").text("no connection...");
    setTimeout(setupChannel,
               INTERVAL_DELAY_OPEN_CONNECTION);
}

The error callback function has code similar to the socket.onerror callback function, meaning that it tries to reconnect to the server again after a short delay. If the server remains inaccessible, the client keeps trying to reconnect at that interval. Once the server becomes accessible again, a new connection to the server is established.
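
The retry-after-a-fixed-delay pattern is not specific to the browser. Here is a minimal sketch of it in plain Python, using a hypothetical 'connect_with_retry' function; the 'open_connection' callable, the use of IOError to signal a failed attempt, and the injectable 'sleep' are all assumptions made for illustration.

```python
import time


def connect_with_retry(open_connection, delay_seconds=10.0,
                       max_attempts=None, sleep=time.sleep):
    """Call open_connection until it succeeds, waiting between attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return open_connection(), attempts
        except IOError:
            # give up only if a limit was set and has been reached
            if max_attempts is not None and attempts >= max_attempts:
                raise
            sleep(delay_seconds)


# fake connection that fails twice before succeeding
outcomes = [IOError("down"), IOError("down"), "socket"]


def fake_open():
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


waits = []
connection, attempts = connect_with_retry(fake_open, delay_seconds=10.0,
                                          sleep=waits.append)
```

Passing 'waits.append' as the sleep function records the delays instead of actually waiting, which makes the behaviour easy to inspect. With no 'max_attempts', the loop keeps trying indefinitely, like the client code above.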

Testing the Reconnect Functionality

To test the above logic, I ran the App Engine application using the SDK and bound it to the address 0.0.0.0 to make it accessible from another machine.

$ python2.5 ./dev_appserver.py -a 0.0.0.0 basicrtlist/

I then accessed the application from a browser running inside an Ubuntu VM. To simulate the loss of connection, I manually disconnected and reconnected the Ubuntu VM from the virtual network.

Next Post

In the next post, we will cover the case where the client has been disconnected from the server for so long that it has failed to receive some messages. Simply reconnecting to the server does not mean that the client's list of items is automatically updated to the latest state. Thus a mechanism is needed to let the client bring its list up to date.

Friday, August 12, 2011

Project: Basic Web Application List with Real-time Updating (Part 5: More Robust Message Sending)

Use Task Queues for Message Sending

The first thing we'll do in this blog post to make the sending of messages robust is to use Task Queues. So why use Task Queues? Task Queues is a mechanism which allows us to relegate the sending of messages to a background process. Instead of 'SaveHandler' sending the messages to the clients, which might take a long time depending on the number of clients connected, we will relegate the sending of messages to a task allowing the request handler to return immediately.


#!/usr/bin/env python


from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.api import channel
from google.appengine.api import taskqueue


import os
from google.appengine.ext.webapp import template
from django.utils import simplejson as json


import uuid
import datetime
import Cookie
import logging


from model import *
from clientmanager import *


# some code omitted


class SaveHandler(webapp.RequestHandler):
    def post(self):
        itemtitle = self.request.get('itemtitle')
        itemtext = self.request.get('itemtext')
        
        self.response.headers['Content-Type'] = 'text/json'
        
        if itemtext and itemtitle:
            item = Item(title=itemtitle, bodytext=itemtext)
            item.put()


            jsonstr = json.dumps({'rows': [{'title': itemtitle, 'bodytext': itemtext},]})
            taskqueue.add(url='/workers/sendmessages', params={'message': jsonstr})


            self.response.out.write(json.dumps({'result': 'success'}))
        else:
            self.response.out.write(json.dumps({'result': 'failure'}))
            
# some code omitted
        
class SendMessagesWorkerHandler(webapp.RequestHandler):
    def post(self):
        message = self.request.get('message')
        
        cm = ClientManager()
        
        for clientid in cm.clientids():
            taskqueue.add(url='/workers/senditem', params={'clientid': clientid, 'message': message})


class SendItemWorkerHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        message = json.loads(self.request.get('message'))
        
        jsonstr = json.dumps(message)
        channel.send_message(clientid, jsonstr)


def main():
    application = webapp.WSGIApplication([('/', MainHandler),
                                         ('/save', SaveHandler),
                                         ('/gettoken', GetTokenHandler),
                                         ('/_ah/channel/disconnected/', ClientDisconnectHandler),
                                         ('/workers/senditem', SendItemWorkerHandler),
                                         ('/workers/sendmessages', SendMessagesWorkerHandler),
                                         ('/list', ListHandler),],
                                         debug=True)
    util.run_wsgi_app(application)




if __name__ == '__main__':
    main()

The listing above shows the changes to main.py that relegate the sending of messages to a task.

In the 'SaveHandler', the code for sending messages to all connected clients is simplified to just one statement.

taskqueue.add(url='/workers/sendmessages', params={'message': jsonstr})

This statement adds a task to a queue which is responsible for sending the messages. It will be executed by a background process and thus allowing the 'SaveHandler' to return immediately. This fixes the possibility of the 'SaveHandler' taking a long time to return because it has to send messages to many clients. What is added to the taskqueue is a url and some parameters. The Task Queue system would later make a POST request to the url along with the parameters.

The handler for the path, '/workers/sendmessages', is the 'SendMessagesWorkerHandler'. The 'SendMessagesWorkerHandler' is just like a regular request handler, but it is designed to be called by the Task Queue system rather than by a web client. 'SendMessagesWorkerHandler' simply loops through all the connected client IDs and once again relegates the task of the actual sending of messages to another task.

So why use so many tasks instead of executing everything at once? It would certainly be easier to execute everything at once. However, that makes our application inefficient in its use of server resources and may cause problems later when we scale the application. The advantage of using task queues is that the server gets a chance to schedule tasks so that resources are used more efficiently. The disadvantage is that we need to adjust how we think about the execution of our code.
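
To make the two-level fan-out easier to picture, here is a toy simulation in plain Python that uses a deque in place of the Task Queue service. The function 'run_tasks' and the task names are made up for this sketch; the real service runs tasks in the background and may run them in a different order.

```python
from collections import deque


def run_tasks(clientids, message):
    """Simulate one 'sendmessages' task fanning out into 'senditem' tasks."""
    queue = deque([("sendmessages", {"message": message})])
    sent = []
    while queue:
        name, params = queue.popleft()
        if name == "sendmessages":
            # first level: one new task per connected client
            for clientid in clientids:
                queue.append(("senditem", {"clientid": clientid,
                                           "message": params["message"]}))
        elif name == "senditem":
            # second level: the actual send to a single client
            sent.append((params["clientid"], params["message"]))
    return sent


sent = run_tasks(["client-a", "client-b"], '{"rows": []}')
```

The request handler only ever adds the first task, so its running time does not depend on the number of clients.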

Making Sure Messages are Sent

The Channel API does not guarantee that a message sent will reach the client; many things, such as network problems, can get in the way. In addition, the Channel API provides no notification when sending a message fails. Thus, additional work is necessary to make the sending of messages more robust.

To make the sending of messages more robust, we will be keeping track of all the messages sent to each client and we will also implement on the client-side a simple acknowledgement that a message is received. As long as the server does not yet receive an acknowledgement from the client, the server would periodically retry sending the same message to the client.


from google.appengine.ext import db
from google.appengine.ext import webapp


import logging


class ClientId(db.Model):
    clientid = db.StringProperty()
    createdate = db.DateTimeProperty(auto_now_add=True)


    messagequeue = db.StringListProperty(default=[])
    #stores the messageid of all pending messages sent to the client
    
class ClientManager(object):
    """
    manager for client channel ids
    """
    
    def add(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        if not db.get(ct_k):
            ct = ClientId(key_name=clientid, clientid=clientid)
            ct.put()
            
    def remove(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            ct.delete()
            
    def clientids(self):
        ct = ClientId().all()
        return [o.clientid for o in ct]
        
    def check_clientid(self, clientid):
        """
        helper method to check whether a clientid is connected
        """
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            return True
        else:
            return False
        
    def add_messageid(self, clientid, messageid):
        """
        add a messageid to the messagequeue
        """
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid not in ci.messagequeue:
            ci.messagequeue = ci.messagequeue + [messageid]
            ci.put()
            
    def remove_messageid(self, clientid, messageid):

        """
        remove a messageid from the messagequeue
        """

        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid in ci.messagequeue:
            l = ci.messagequeue
            l.remove(messageid)
            ci.messagequeue = l
            ci.put()
            
    def check_messageid(self, clientid, messageid):

        """
        check if a messageid is in messagequeue
        """

        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        # ci is None when the client has already been removed
        if ci and messageid in ci.messagequeue:
            return True
        else:
            return False
        
class ClientDisconnectHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('from')
        
        logging.info("Client %s disconnected"%clientid)
        
        cm = ClientManager()
        cm.remove(clientid)

Highlighted above are the additions to the 'clientmanager.py' file. These additions implement a simple message tracking system. For each message sent to a client, the id of that message is added to a message queue, or pending message list, for that client. The message id is kept in the client's pending message list until an acknowledgement is received from the client.


#!/usr/bin/env python


from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.api import channel
from google.appengine.api import taskqueue


import os
from google.appengine.ext.webapp import template
from django.utils import simplejson as json


import uuid
import datetime
import Cookie
import logging


from model import *
from clientmanager import *


#code omitted


class SaveHandler(webapp.RequestHandler):
    def post(self):
        itemtitle = self.request.get('itemtitle')
        itemtext = self.request.get('itemtext')
        
        self.response.headers['Content-Type'] = 'text/json'
        
        if itemtext and itemtitle:
            item = Item(title=itemtitle, bodytext=itemtext)
            item.put()
            
            messageid = str(uuid.uuid4())
            jsonstr = json.dumps({'rows': [{'title': itemtitle, 'bodytext': itemtext},],
                'messageid': messageid})
            taskqueue.add(url='/workers/sendmessages',
                          params={'message': jsonstr, 'messageid': messageid},
                          name="SendMessages-%s"%messageid)
            
            self.response.out.write(json.dumps({'result': 'success'}))
        else:
            self.response.out.write(json.dumps({'result': 'failure'}))
            
#code omitted
        
class SendMessagesWorkerHandler(webapp.RequestHandler):
    def post(self):
        message = json.loads(self.request.get('message'))
        messageid = self.request.get('messageid')
        
        cm = ClientManager()
        
        for clientid in cm.clientids():
            try:
                jsonstr = json.dumps(dict(message,
                                          clientid=clientid))
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': jsonstr,
                                      'count': 0,
                                      'messageid': messageid},
                              name="SendItem-%s-%s"%(messageid, clientid))
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass




class SendItemWorkerHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        message = self.request.get('message')
        messageid = self.request.get('messageid')
        
        countdown = 30 # number of seconds before resending message
        
        cm = ClientManager()
        count = int(self.request.get('count'))
        if count == 0: # first time the message is sent to the client
            try:
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': message,
                                      'messageid': messageid,
                                      'count': 1},
                              countdown=countdown,
                              name="SendItem-%s-%s-%s"%(clientid, messageid, 1))
                
                cm.add_messageid(clientid, messageid)
                channel.send_message(clientid, message)
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass
        else:
            if cm.check_clientid(clientid) and cm.check_messageid(clientid, messageid):
                count = count + 1
                if count <= 5:
                    try:
                        taskqueue.add(url='/workers/senditem',
                                      params={'clientid': clientid,
                                              'message': message,
                                              'messageid': messageid,
                                              'count': count},
                                      countdown=countdown,
                                      name="SendItem-%s-%s-%s"%(clientid, messageid, count))
                        logging.info("Resending message %s"%message)
                        channel.send_message(clientid, message)
                    except (taskqueue.TaskAlreadyExistsError,
                            taskqueue.TombstonedTaskError):
                        pass
                else:
                    # after trying 5 times, assume that the client has disconnected
                    cm.remove(clientid)




class RemoveMessageIdFromQueueHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        messageid = self.request.get('messageid')
        
        logging.info("messageid: %s, clientid: %s"%(messageid, clientid))
        
        if clientid and messageid:
            cm = ClientManager()
            cm.remove_messageid(clientid, messageid)


def main():
    application = webapp.WSGIApplication([('/', MainHandler),
                                         ('/save', SaveHandler),
                                         ('/gettoken', GetTokenHandler),
                                         ('/_ah/channel/disconnected/', ClientDisconnectHandler),
                                         ('/workers/senditem', SendItemWorkerHandler),
                                         ('/workers/sendmessages', SendMessagesWorkerHandler),
                                         ('/removemessageidfromqueue', RemoveMessageIdFromQueueHandler),
                                         ('/list', ListHandler),],
                                         debug=True)
    util.run_wsgi_app(application)




if __name__ == '__main__':
    main()


Highlighted above are the changes to 'main.py'.

In the 'SaveHandler', a unique identifier is generated for the message. This unique identifier is used for tracking the message. It is inserted as part of the message and also passed as a parameter of the task. Another change is that a name is assigned to the task. This is a good practice and will be explained later.

In the 'SendMessagesWorkerHandler', the message identifier is passed on to each send item task, together with a new count parameter. The count parameter indicates how many times a message has been sent to the client. Since the message has not yet been sent to the client, it is initialized to zero. A name is also assigned to each task. This is a good practice, and it prevents a duplicate task from being queued later.

A task in the App Engine Task Queue system is not assured to execute only once; it may execute multiple times. The App Engine system is designed to be robust and scalable, and this is one of the tradeoffs. Thus, a task must be designed so that executing it multiple times has the same effect as executing it just once. In other words, our tasks should be idempotent.

The App Engine Task Queue system implements tombstoning of task names. This means that once a task with a particular name has been executed, the name is tombstoned and a task with the same name cannot be added again for some time. The name remains tombstoned for a number of days and eventually becomes available again. Thus, naming tasks prevents a duplicate task from being executed.

So in the case of the 'SendMessagesWorkerHandler', even if it executes more than once, it will only try to add tasks with the same names to the queue, and the duplicates are rejected. Thus it does not matter if the 'SendMessagesWorkerHandler' is executed multiple times. This is why it is a good practice to name tasks properly.

The heavy lifting is accomplished in the 'SendItemWorkerHandler'. When a message is sent for the first time, the count parameter passed to the 'SendItemWorkerHandler' has a value of zero, and execution goes to the first branch of the main if statement.
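To make the role of task names more concrete, here is a small in-memory sketch in plain Python. It does not use the App Engine API: 'FakeQueue', 'DuplicateTaskError' and 'fan_out' are hypothetical names made up for this illustration, and the single error class stands in for both 'TaskAlreadyExistsError' and 'TombstonedTaskError'. It only shows why running the fan-out twice queues each named task once.

```python
class DuplicateTaskError(Exception):
    """Hypothetical stand-in for TaskAlreadyExistsError / TombstonedTaskError."""


class FakeQueue(object):
    """In-memory imitation of a task queue that remembers used task names."""

    def __init__(self):
        self.pending = []        # tasks waiting to run, as (name, params)
        self.used_names = set()  # names that are queued or tombstoned

    def add(self, name, params):
        if name in self.used_names:
            raise DuplicateTaskError(name)
        self.used_names.add(name)
        self.pending.append((name, params))


def fan_out(queue, messageid, clientids):
    """Queue one send task per client; safe to call more than once."""
    for clientid in clientids:
        try:
            queue.add("SendItem-%s-%s" % (messageid, clientid),
                      {'clientid': clientid, 'messageid': messageid})
        except DuplicateTaskError:
            pass  # already queued by an earlier run of this task


queue = FakeQueue()
fan_out(queue, 'm1', ['alice', 'bob'])
fan_out(queue, 'm1', ['alice', 'bob'])  # simulated duplicate execution
print(len(queue.pending))  # prints 2, not 4
```

The second call to 'fan_out' changes nothing, which is the behaviour we want from an idempotent task.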

        if count == 0: # first time the message is sent to the client
            try:
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': message,
                                      'messageid': messageid,
                                      'count': 1},
                              countdown=countdown,
                              name="SendItem-%s-%s-%s"%(clientid, messageid, 1))
                
                cm.add_messageid(clientid, messageid)
                channel.send_message(clientid, message)
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass


In the above code, a new send item task is queued first, before anything else in the branch. If a task with the same name has already been queued, which can happen when this task has already been executed once, the add call raises an exception and the remaining statements in the branch are skipped. The add call thus serves as a guard and makes the task idempotent. The new task has basically the same parameters as the send item task queued in 'SendMessagesWorkerHandler'. One difference is that the count has been changed to one. There is also an additional argument to the add method, the countdown argument, which indicates that the task is to be executed only after the given number of seconds. This is why the count has the value one: by the time the new task is executed, the message has already been sent once.

So why the need to add another task? This will become clearer when we discuss the else branch of the main if statement.


        else:
            if cm.check_clientid(clientid) and cm.check_messageid(clientid, messageid):
                count = count + 1
                if count <= 5:
                    try:
                        taskqueue.add(url='/workers/senditem',
                                      params={'clientid': clientid,
                                              'message': message,
                                              'messageid': messageid,
                                              'count': count},
                                      countdown=countdown,
                                      name="SendItem-%s-%s-%s"%(clientid, messageid, count))
                        logging.info("Resending message %s"%message)
                        channel.send_message(clientid, message)
                    except (taskqueue.TaskAlreadyExistsError,
                            taskqueue.TombstonedTaskError):
                        pass
                else:
                    # after trying 5 times, assume that the client has disconnected
                    cm.remove(clientid)

Once the message has been sent for the first time, the succeeding send item tasks execute this branch of the code. The first statement checks whether the client is still connected and whether the message is still in the pending message list of the client. This is done using the client manager methods we defined earlier. The check is necessary because this code is executed 30 seconds after the task was queued. By then, the client could already have disconnected, or the message could no longer be in the pending message list, in which case there is no need to resend it. A message is removed from the pending message list by the handler for the client acknowledgement, which will be discussed later. If the message has not yet been removed from the pending list, the message is resent and a new send item task is queued with an incremented count parameter. The queued task is once again delayed to give the client enough time to acknowledge receipt of the message. After the message has been sent 5 times without an acknowledgement, it is assumed that the client has disconnected from the server, and the client is removed from the client manager.

The client acknowledges a message by making a POST request to the '/removemessageidfromqueue' path.
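The retry schedule can be easier to follow as a small simulation. The sketch below is plain Python and does not touch the Task Queue or Channel API; 'simulate' and 'ack_after_sends' are hypothetical names used only here. It replays the count logic of the handler for one message and one client, assuming the acknowledgement either arrives after a given number of sends or never arrives.

```python
MAX_COUNT = 5  # same limit as the 'count <= 5' check in the handler


def simulate(ack_after_sends):
    """Replay the send/retry logic for one message and one client.

    ack_after_sends is the number of sends after which the simulated
    client acknowledges; None means it never does.
    Returns (number_of_sends, client_removed).
    """
    pending = True   # message id is in the client's pending list
    sends = 1        # the count == 0 task sends the message once
    count = 1        # count carried by the first delayed task
    while True:
        if ack_after_sends is not None and sends >= ack_after_sends:
            pending = False          # acknowledgement arrived in time
        if not pending:
            return sends, False      # nothing left to resend
        count += 1
        if count > MAX_COUNT:
            return sends, True       # give up and drop the client
        sends += 1                   # resend and queue the next task


print(simulate(None))  # prints (5, True)
print(simulate(2))     # prints (2, False)
```

With a countdown of 30 seconds, a client that never acknowledges is therefore dropped roughly 150 seconds after the first send, since countdown is only a minimum delay.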


class RemoveMessageIdFromQueueHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        messageid = self.request.get('messageid')
        
        if clientid and messageid:
            cm = ClientManager()
            cm.remove_messageid(clientid, messageid)

The handler for this path is shown above. It receives two parameters, clientid and messageid. These parameters identify a particular message sent to a particular client, and are used to remove the message from the client's pending message list using the 'remove_messageid' method of the client manager. This is why the 'SendItemWorkerHandler' checks whether the messageid is still in the pending message list: it is expected to have been removed once the client has acknowledged receipt of the message.

The message receipt acknowledgement is accomplished in the 'main.js' JavaScript code.


$(function () {

    // code omitted

    var setupChannel = function () {
        var messagemap = {};
        $.getJSON('/gettoken', function (data) {
            var channel = new goog.appengine.Channel(data['token']);
            var socket = channel.open();

            socket.onopen = function () {
                console.info("connection opened");
            };

            socket.onclose = function () {
                console.info("connection closed");
            };

            socket.onerror = function () {
                console.info("connection error");
            };

            socket.onmessage = function (message) {
                console.info("message received " + message.data);
                var d = $.parseJSON(message.data);

                var clientid = d['clientid'];
                var messageid = d['messageid'];

                if (! messagemap[messageid]) {
                    $("#mainlist").trigger('prependitem', [d]);
                    messagemap[messageid] = true;
                }

                $.post('/removemessageidfromqueue',
                       {clientid: clientid,
                        messageid: messageid});
            };

        });
    };

    setupChannel();
});

In the code above, the client receives the clientid and messageid parameters along with the message. These parameters are used to make a POST request that serves as an acknowledgement that the message has been received. In addition, some code has been added to prevent the same message from being inserted into the list more than once. Because the server may send a message multiple times, the client may also receive it multiple times. To handle this, we use a simple mapping object named 'messagemap' to take note of received messages. If a message has already been received, it is not added to the list again, although it is still acknowledged.
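The same idea can be modelled in a few lines of Python, the language of the server code. This is only a sketch of the logic, not a replacement for 'main.js'; the 'Receiver' class and its attributes are hypothetical names, with a set playing the role of 'messagemap' and a list standing in for the POST requests to the server.

```python
class Receiver(object):
    """Hypothetical client-side model: show each message once, always ack."""

    def __init__(self):
        self.seen = set()   # plays the role of 'messagemap'
        self.shown = []     # messages actually added to the list
        self.acks = []      # message ids acknowledged to the server

    def on_message(self, message):
        messageid = message['messageid']
        if messageid not in self.seen:
            self.seen.add(messageid)
            self.shown.append(message)
        # acknowledge even duplicates, so the server stops resending
        self.acks.append(messageid)


r = Receiver()
msg = {'messageid': 'm1', 'rows': [{'title': 't', 'bodytext': 'b'}]}
r.on_message(msg)
r.on_message(msg)  # simulated resend of the same message
print("%d shown, %d acks" % (len(r.shown), len(r.acks)))  # 1 shown, 2 acks
```

Acknowledging duplicates matters because the first acknowledgement may itself have been lost, in which case the server would otherwise keep resending.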

Next Post

In the next post, I will try to make the client-server connection more robust.