Use Task Queues for Message Sending
The first step in making message sending robust is to use Task Queues. Why Task Queues? They are a mechanism that lets us hand off the sending of messages to a background process. Instead of 'SaveHandler' sending the messages to the clients itself, which might take a long time depending on the number of connected clients, we hand the sending off to a task, allowing the request handler to return immediately.
#!/usr/bin/env python
from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.api import channel
from google.appengine.api import taskqueue
import os
from google.appengine.ext.webapp import template
from django.utils import simplejson as json
import uuid
import datetime
import Cookie
import logging
from model import *
from clientmanager import *

# some code omitted

class SaveHandler(webapp.RequestHandler):
    def post(self):
        itemtitle = self.request.get('itemtitle')
        itemtext = self.request.get('itemtext')
        self.response.headers['Content-Type'] = 'text/json'
        if itemtext and itemtitle:
            item = Item(title=itemtitle, bodytext=itemtext)
            item.put()
            jsonstr = json.dumps({'rows': [{'title': itemtitle, 'bodytext': itemtext},]})
            # hand the sending off to a background task and return right away
            taskqueue.add(url='/workers/sendmessages', params={'message': jsonstr})
            self.response.out.write(json.dumps({'result': 'success'}))
        else:
            self.response.out.write(json.dumps({'result': 'failure'}))

# some code omitted

class SendMessagesWorkerHandler(webapp.RequestHandler):
    def post(self):
        message = self.request.get('message')
        cm = ClientManager()
        # queue one send item task per connected client
        for clientid in cm.clientids():
            taskqueue.add(url='/workers/senditem', params={'clientid': clientid, 'message': message})

class SendItemWorkerHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        message = json.loads(self.request.get('message'))
        jsonstr = json.dumps(message)
        channel.send_message(clientid, jsonstr)

def main():
    application = webapp.WSGIApplication([('/', MainHandler),
                                          ('/save', SaveHandler),
                                          ('/gettoken', GetTokenHandler),
                                          ('/_ah/channel/disconnected/', ClientDisconnectHandler),
                                          ('/workers/senditem', SendItemWorkerHandler),
                                          ('/workers/sendmessages', SendMessagesWorkerHandler),
                                          ('/list', ListHandler),],
                                         debug=True)
    util.run_wsgi_app(application)

if __name__ == '__main__':
    main()
The listing above shows the changes to main.py that hand the sending of messages off to a task.
In 'SaveHandler', the code for sending messages to all connected clients is reduced to a single statement:
taskqueue.add(url='/workers/sendmessages', params={'message': jsonstr})
This statement adds a task to a queue; the task is responsible for sending the messages. It is executed by a background process, which allows 'SaveHandler' to return immediately. This removes the risk of 'SaveHandler' taking a long time to return because it has to send messages to many clients. What is added to the task queue is a URL and some parameters. The Task Queue system later makes a POST request to that URL with those parameters.
The handler for the path '/workers/sendmessages' is 'SendMessagesWorkerHandler'. It is just like a regular request handler, but it is designed to be called by the Task Queue system rather than by a web client. 'SendMessagesWorkerHandler' simply loops through all the connected client IDs and, once again, hands the actual sending of each message off to another task.
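The flow described so far can be modelled without App Engine at all. The sketch below is only a toy: 'ToyTaskQueue', its 'route' and 'run_all' methods, and the client ids are made up for illustration and are not part of the App Engine API. It shows that adding a task only records the work, and that the fan-out to one task per client happens later, when the queue is drained.

```python
from collections import deque


class ToyTaskQueue(object):
    """In-memory stand-in for a task queue (hypothetical, not the App Engine API)."""

    def __init__(self):
        self.pending = deque()
        self.handlers = {}

    def route(self, url, handler):
        # Map a worker URL to the function that handles it.
        self.handlers[url] = handler

    def add(self, url, params):
        # Enqueueing only records the work; nothing is executed here.
        self.pending.append((url, params))

    def run_all(self):
        # Plays the role of the background process that later calls each worker.
        executed = 0
        while self.pending:
            url, params = self.pending.popleft()
            self.handlers[url](params)
            executed += 1
        return executed


queue = ToyTaskQueue()
connected_clients = ['client-a', 'client-b', 'client-c']  # made-up client ids
delivered = []


def send_messages(params):
    # Fan out: queue one small task per connected client.
    for clientid in connected_clients:
        queue.add('/workers/senditem',
                  {'clientid': clientid, 'message': params['message']})


def send_item(params):
    # Stand-in for the actual channel send.
    delivered.append((params['clientid'], params['message']))


queue.route('/workers/sendmessages', send_messages)
queue.route('/workers/senditem', send_item)

# What the save handler does: enqueue one task and return at once.
queue.add('/workers/sendmessages', {'message': 'hello'})
delivered_before_run = len(delivered)

# Later, the background process drains the queue.
tasks_run = queue.run_all()
```

Nothing has been delivered at the moment the save handler would return; the one fan-out task and the three per-client tasks all run afterwards.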
So why so many tasks instead of executing everything at once? Executing everything at once would certainly be easier to write. However, it would use server resources inefficiently and could cause problems later when we scale the application. The advantage of task queues is that they give the server a chance to schedule the work so that resources are used more efficiently. The disadvantage is that we need to adjust how we think about the execution of our code.
Making Sure Messages are Sent
The Channel API does not guarantee that a message it sends will reach the client; network problems, among other issues, can get in the way. In addition, the Channel API provides no notification when sending a message fails. Thus, additional work is necessary to make the sending of messages more robust.
To do this, we will keep track of all the messages sent to each client, and we will implement a simple client-side acknowledgement that a message has been received. Until the server receives an acknowledgement from the client, it periodically retries sending the same message to that client.
from google.appengine.ext import db
from google.appengine.ext import webapp
import logging

class ClientId(db.Model):
    clientid = db.StringProperty()
    createdate = db.DateTimeProperty(auto_now_add=True)
    # stores the messageid of all pending messages sent to the client
    messagequeue = db.StringListProperty(default=[])

class ClientManager(object):
    """
    manager for client channel ids
    """
    def add(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        if not db.get(ct_k):
            ct = ClientId(key_name=clientid, clientid=clientid)
            ct.put()

    def remove(self, clientid):
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            ct.delete()

    def clientids(self):
        ct = ClientId.all()
        return [o.clientid for o in ct]

    def check_clientid(self, clientid):
        """
        helper method to check whether a clientid is connected
        """
        ct_k = db.Key.from_path('ClientId', clientid)
        ct = db.get(ct_k)
        if ct:
            return True
        else:
            return False

    def add_messageid(self, clientid, messageid):
        """
        add a messageid to the messagequeue
        """
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid not in ci.messagequeue:
            ci.messagequeue = ci.messagequeue + [messageid]
            ci.put()

    def remove_messageid(self, clientid, messageid):
        """
        remove a messageid from the messagequeue
        """
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        if ci and messageid in ci.messagequeue:
            l = ci.messagequeue
            l.remove(messageid)
            ci.messagequeue = l
            ci.put()

    def check_messageid(self, clientid, messageid):
        """
        check if a messageid is in messagequeue
        """
        ci_k = db.Key.from_path('ClientId', clientid)
        ci = db.get(ci_k)
        # ci is None when the client has already been removed
        if ci and messageid in ci.messagequeue:
            return True
        else:
            return False

class ClientDisconnectHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('from')
        logging.info("Client %s disconnected"%clientid)
        cm = ClientManager()
        cm.remove(clientid)
The listing above shows the additions to the 'clientmanager.py' file. They implement a simple message tracking system. For each message sent to a client, the id of that message is added to a message queue, or pending message list, for that client. The message id stays in the client's pending message list until an acknowledgement is received from the client.
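The same bookkeeping can be tried out without the datastore. The sketch below is an in-memory approximation only: the 'PendingMessages' class and the ids used are hypothetical, and a plain dict stands in for the 'ClientId' entities. It mirrors the add, remove and check operations described above.

```python
class PendingMessages(object):
    """In-memory approximation of the pending message lists (hypothetical)."""

    def __init__(self):
        self._pending = {}  # clientid -> list of pending messageids

    def add_client(self, clientid):
        self._pending.setdefault(clientid, [])

    def add_messageid(self, clientid, messageid):
        pending = self._pending.get(clientid)
        # Ignore unknown clients and ids that are already being tracked.
        if pending is not None and messageid not in pending:
            pending.append(messageid)

    def remove_messageid(self, clientid, messageid):
        pending = self._pending.get(clientid)
        if pending is not None and messageid in pending:
            pending.remove(messageid)

    def check_messageid(self, clientid, messageid):
        # False for unknown clients as well as for acknowledged messages.
        return messageid in self._pending.get(clientid, [])


tracker = PendingMessages()
tracker.add_client('client-a')

tracker.add_messageid('client-a', 'msg-1')
tracker.add_messageid('client-a', 'msg-1')  # a repeat does not add a second entry
waiting = tracker.check_messageid('client-a', 'msg-1')

tracker.remove_messageid('client-a', 'msg-1')  # what an acknowledgement triggers
still_waiting = tracker.check_messageid('client-a', 'msg-1')

unknown_client = tracker.check_messageid('client-x', 'msg-1')
```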
#!/usr/bin/env python
from google.appengine.ext import webapp
from google.appengine.ext.webapp import util
from google.appengine.api import channel
from google.appengine.api import taskqueue
import os
from google.appengine.ext.webapp import template
from django.utils import simplejson as json
import uuid
import datetime
import Cookie
import logging
from model import *
from clientmanager import *

#code omitted

class SaveHandler(webapp.RequestHandler):
    def post(self):
        itemtitle = self.request.get('itemtitle')
        itemtext = self.request.get('itemtext')
        self.response.headers['Content-Type'] = 'text/json'
        if itemtext and itemtitle:
            item = Item(title=itemtitle, bodytext=itemtext)
            item.put()
            messageid = str(uuid.uuid4())
            jsonstr = json.dumps({'rows': [{'title': itemtitle, 'bodytext': itemtext},],
                                  'messageid': messageid})
            taskqueue.add(url='/workers/sendmessages',
                          params={'message': jsonstr, 'messageid': messageid},
                          name="SendMessages-%s"%messageid)
            self.response.out.write(json.dumps({'result': 'success'}))
        else:
            self.response.out.write(json.dumps({'result': 'failure'}))

#code omitted

class SendMessagesWorkerHandler(webapp.RequestHandler):
    def post(self):
        message = json.loads(self.request.get('message'))
        messageid = self.request.get('messageid')
        cm = ClientManager()
        for clientid in cm.clientids():
            try:
                jsonstr = json.dumps(dict(message,
                                          clientid=clientid))
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': jsonstr,
                                      'count': 0,
                                      'messageid': messageid},
                              name="SendItem-%s-%s"%(messageid, clientid))
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass

class SendItemWorkerHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        message = self.request.get('message')
        messageid = self.request.get('messageid')
        countdown = 30 # number of seconds before resending message
        cm = ClientManager()
        count = int(self.request.get('count'))
        if count == 0: # first time the message is sent to the client
            try:
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': message,
                                      'messageid': messageid,
                                      'count': 1},
                              countdown=countdown,
                              name="SendItem-%s-%s-%s"%(clientid, messageid, 1))
                cm.add_messageid(clientid, messageid)
                channel.send_message(clientid, message)
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass
        else:
            if cm.check_clientid(clientid) and cm.check_messageid(clientid, messageid):
                count = count + 1
                if count <= 5:
                    try:
                        taskqueue.add(url='/workers/senditem',
                                      params={'clientid': clientid,
                                              'message': message,
                                              'messageid': messageid,
                                              'count': count},
                                      countdown=countdown,
                                      name="SendItem-%s-%s-%s"%(clientid, messageid, count))
                        logging.info("Resending message %s"%message)
                        channel.send_message(clientid, message)
                    except (taskqueue.TaskAlreadyExistsError,
                            taskqueue.TombstonedTaskError):
                        pass
                else:
                    # after trying 5 times, assume that the client has disconnected
                    cm.remove(clientid)

class RemoveMessageIdFromQueueHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        messageid = self.request.get('messageid')
        logging.info("messageid: %s, clientid: %s"%(messageid, clientid))
        if clientid and messageid:
            cm = ClientManager()
            cm.remove_messageid(clientid, messageid)

def main():
    application = webapp.WSGIApplication([('/', MainHandler),
                                          ('/save', SaveHandler),
                                          ('/gettoken', GetTokenHandler),
                                          ('/_ah/channel/disconnected/', ClientDisconnectHandler),
                                          ('/workers/senditem', SendItemWorkerHandler),
                                          ('/workers/sendmessages', SendMessagesWorkerHandler),
                                          ('/removemessageidfromqueue', RemoveMessageIdFromQueueHandler),
                                          ('/list', ListHandler),],
                                         debug=True)
    util.run_wsgi_app(application)

if __name__ == '__main__':
    main()
The listing above shows the changes to 'main.py'.
In 'SaveHandler', a unique identifier is generated for the message. This identifier is used for tracking the message. It is inserted into the message itself and also passed as a parameter of the task. Another change is that the task is given a name. This is good practice, for reasons explained below.
In 'SendMessagesWorkerHandler', the message identifier is passed to each send item task, along with a new count parameter. The client id is also added to the message so that the client can include it in its acknowledgement. The count parameter indicates how many times the message has been sent to the client. Since the message has not yet been sent at this point, it is initialized to zero. Each task is also given a name, which is good practice because it prevents an identical task from being queued later.
A task in the App Engine Task Queue system is not guaranteed to execute only once; it may execute multiple times. This is one of the tradeoffs of a system designed to be robust and scalable. Thus, a task must be designed so that executing it multiple times has the same effect as executing it just once. In other words, our tasks should be idempotent.
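As a small illustration of what idempotent means here, the sketch below runs the same unit of work three times in two different ways. The function names and ids are hypothetical and have nothing to do with the application code; the point is only the difference in the end state.

```python
def append_delivery(log, clientid, messageid):
    # Not idempotent: every repeated run leaves another record behind.
    log.append((clientid, messageid))


def record_delivery(delivered, clientid, messageid):
    # Idempotent: adding the same pair to a set again changes nothing.
    delivered.add((clientid, messageid))


log = []
delivered = set()

# Simulate the queue running the very same task three times.
for _ in range(3):
    append_delivery(log, 'client-a', 'msg-1')
    record_delivery(delivered, 'client-a', 'msg-1')

log_size = len(log)
delivered_size = len(delivered)
```

After three runs the list holds three records, while the set holds exactly one, the same as after a single run.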
The App Engine Task Queue system implements tombstoning of task names. This means that once a task with a particular name has been executed, that name is tombstoned, and a task with the same name cannot be added to the queue later on. The name remains tombstoned for several days and eventually becomes available again. Thus, naming tasks prevents the same task from being executed again.
So in the case of 'SendMessagesWorkerHandler', even if it executes more than once, it will try to add tasks with the same names to the queue, and the duplicates will be rejected. Thus it does not matter if 'SendMessagesWorkerHandler' is executed multiple times. This is why it is good practice to name tasks properly.
The heavy lifting is done in 'SendItemWorkerHandler'. When a message is sent for the first time, the count parameter passed to 'SendItemWorkerHandler' has a value of zero, and execution goes to the first branch of the main if statement.
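To see why named tasks make the fan-out safe to repeat, here is a toy model of name tombstoning. Everything in it is an assumption made for illustration: 'NamedTaskQueue' and the two exception classes are hypothetical stand-ins, not the App Engine classes, and the tombstones in this sketch never expire.

```python
class TaskAlreadyExists(Exception):
    """Hypothetical stand-in: a task with this name is still queued."""


class TombstonedTask(Exception):
    """Hypothetical stand-in: a task with this name has already run."""


class NamedTaskQueue(object):
    def __init__(self):
        self.queued = {}         # name -> params of tasks waiting to run
        self.tombstones = set()  # names of tasks that have already run

    def add(self, name, params):
        if name in self.queued:
            raise TaskAlreadyExists(name)
        if name in self.tombstones:
            raise TombstonedTask(name)
        self.queued[name] = params

    def run(self, name):
        # Running a task retires its name.
        params = self.queued.pop(name)
        self.tombstones.add(name)
        return params


def fan_out(queue, messageid, clientids):
    """Queue one named task per client and return how many were accepted."""
    added = 0
    for clientid in clientids:
        try:
            queue.add("SendItem-%s-%s" % (messageid, clientid),
                      {'clientid': clientid})
            added += 1
        except (TaskAlreadyExists, TombstonedTask):
            pass  # a duplicate of work that is queued or already done
    return added


queue = NamedTaskQueue()
first_run = fan_out(queue, 'm1', ['a', 'b'])

queue.run('SendItem-m1-a')  # one of the two tasks gets executed

# The fan-out handler is executed a second time for the same message.
second_run = fan_out(queue, 'm1', ['a', 'b'])
```

The second fan-out adds nothing: one name is tombstoned and the other is still queued, so both additions are rejected.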
if count == 0: # first time the message is sent to the client
    try:
        taskqueue.add(url='/workers/senditem',
                      params={'clientid': clientid,
                              'message': message,
                              'messageid': messageid,
                              'count': 1},
                      countdown=countdown,
                      name="SendItem-%s-%s-%s"%(clientid, messageid, 1))
        cm.add_messageid(clientid, messageid)
        channel.send_message(clientid, message)
    except (taskqueue.TaskAlreadyExistsError,
            taskqueue.TombstonedTaskError):
        pass
In the above code, a new send item task is queued first. It is queued before anything else so that, if a task with the same name has already been queued, the resulting exception skips the remaining statements in this branch. This can happen when this task has already been executed once, so the named task serves as a guard and makes the task idempotent. The new task has basically the same parameters as the send item task queued in 'SendMessagesWorkerHandler'. One difference is that the count is now one. The add method also takes an additional argument, countdown, which indicates that the task is to be executed only after the given number of seconds. This is why the count has the value one: by the time the new task is executed, the message has already been sent once.
So why add another task? This will become clearer when we discuss the else branch of the main if statement.
else:
    if cm.check_clientid(clientid) and cm.check_messageid(clientid, messageid):
        count = count + 1
        if count <= 5:
            try:
                taskqueue.add(url='/workers/senditem',
                              params={'clientid': clientid,
                                      'message': message,
                                      'messageid': messageid,
                                      'count': count},
                              countdown=countdown,
                              name="SendItem-%s-%s-%s"%(clientid, messageid, count))
                logging.info("Resending message %s"%message)
                channel.send_message(clientid, message)
            except (taskqueue.TaskAlreadyExistsError,
                    taskqueue.TombstonedTaskError):
                pass
        else:
            # after trying 5 times, assume that the client has disconnected
            cm.remove(clientid)
When the message has already been sent once, succeeding send item tasks execute this branch of the code. The first statement checks whether the client is still connected and whether the message is still in the client's pending message list, using the client manager methods we defined earlier. This check is necessary because this code runs 30 seconds after the previous send. By then, the client could have disconnected, or the message could have been removed from the pending message list, in which case there is no need to resend it. A message is removed from the pending message list by the handler for the client acknowledgement, which is discussed below. If the message is still in the pending list, it is resent and a new send item task is queued with an incremented count parameter. That task is again delayed to give the client enough time to acknowledge receipt of the message. Once the count exceeds 5, it is assumed that the client has disconnected from the server, and the client is removed from the client manager.
The client acknowledges a message by making a POST request to the '/removemessageidfromqueue' path.
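The timing of this retry chain is easier to follow with a small simulation. The sketch below is a simplified model, not the handler itself: the function 'simulate_delivery' and its parameter 'ack_after_send' are hypothetical, the acknowledgement is assumed to arrive before the next delayed task runs, and time is just a counter advanced by the countdown.

```python
def simulate_delivery(ack_after_send, countdown=30, max_count=5):
    """Model one message to one client.

    ack_after_send is the number of the send that the client finally
    acknowledges, or None if it never acknowledges.
    Returns (sends, elapsed_seconds, outcome).
    """
    # First run of the task (count == 0): send once, queue a follow-up.
    sends = 1
    pending = ack_after_send != 1
    count = 1
    clock = 0
    while True:
        clock += countdown  # the follow-up task runs after the delay
        if not pending:
            # The message id left the pending list, so the chain stops.
            return sends, clock, 'acknowledged'
        count += 1
        if count > max_count:
            # Too many tries: assume the client has gone away.
            return sends, clock, 'client removed'
        sends += 1  # resend and queue the next follow-up
        if ack_after_send == sends:
            pending = False


quick = simulate_delivery(ack_after_send=1)
slow = simulate_delivery(ack_after_send=3)
silent = simulate_delivery(ack_after_send=None)
```

Under these assumptions, a client that never answers receives the message five times in total and is dropped 150 seconds after the first send.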
class RemoveMessageIdFromQueueHandler(webapp.RequestHandler):
    def post(self):
        clientid = self.request.get('clientid')
        messageid = self.request.get('messageid')
        if clientid and messageid:
            cm = ClientManager()
            cm.remove_messageid(clientid, messageid)
The handler for this path is shown above. It receives two parameters, clientid and messageid. These parameters identify a particular message sent to a particular client and are used to remove the message from the client's pending message list using the 'remove_messageid' method of the client manager. This is why 'SendItemWorkerHandler' checks whether the messageid is still in the pending message list: it is expected to be removed once the client has acknowledged receipt of the message.
The message receipt acknowledgement is implemented in the 'main.js' JavaScript code.
$(function () {
    // code omitted
    var setupChannel = function () {
        var messagemap = {};
        $.getJSON('/gettoken', function (data) {
            var channel = new goog.appengine.Channel(data['token']);
            var socket = channel.open();
            socket.onopen = function () {
                console.info("connection opened");
            };
            socket.onclose = function () {
                console.info("connection closed");
            };
            socket.onerror = function () {
                console.info("connection error");
            };
            socket.onmessage = function (message) {
                console.info("message received " + message.data);
                var d = $.parseJSON(message.data);
                var clientid = d['clientid'];
                var messageid = d['messageid'];
                if (! messagemap[messageid]) {
                    $("#mainlist").trigger('prependitem', [d]);
                    messagemap[messageid] = true;
                }
                $.post('/removemessageidfromqueue',
                       {clientid: clientid,
                        messageid: messageid});
            };
        });
    };
    setupChannel();
});
In the code above, the client receives the clientid and messageid parameters along with the message. These parameters are used to make a POST request that serves as an acknowledgement that the message has been received. In addition, some code has been added to prevent the same message from being inserted into the list more than once. Because the server may send a message multiple times, the client may also receive it multiple times. To handle this, we use a simple mapping object named 'messagemap' to keep track of received messages. If a message has already been received, the client does not insert it again; it only repeats the acknowledgement.
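The receive logic can also be expressed in Python, which makes it easy to try out away from the browser. This is only a sketch of the same idea: the 'Receiver' class, its 'send_ack' callback and the sample payload are hypothetical, with the callback standing in for the POST to '/removemessageidfromqueue'.

```python
import json


class Receiver(object):
    """Python sketch of the client-side receive logic (hypothetical)."""

    def __init__(self, send_ack):
        self.seen = set()   # plays the role of 'messagemap'
        self.items = []     # plays the role of the list on the page
        self.send_ack = send_ack

    def on_message(self, data):
        d = json.loads(data)
        messageid = d['messageid']
        if messageid not in self.seen:
            # First copy of this message: show it, newest first.
            self.items.insert(0, d['rows'][0])
            self.seen.add(messageid)
        # Acknowledge every copy, so a lost acknowledgement gets repeated.
        self.send_ack(d['clientid'], messageid)


acks = []
receiver = Receiver(lambda clientid, messageid: acks.append((clientid, messageid)))

payload = json.dumps({'rows': [{'title': 'T', 'bodytext': 'B'}],
                      'messageid': 'msg-1',
                      'clientid': 'client-a'})

receiver.on_message(payload)
receiver.on_message(payload)  # the server resent the same message
```

The item is shown once, but both copies are acknowledged, which matters when the first acknowledgement was the thing that got lost.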
Next Post
In the next post, I will try to make the client-server connection more robust.