Thursday, June 4, 2009

Incoming-- wrapping up message receivers

Given the last post on upcall performance, I thought that instead of talking about the extension beginning with the most fundamental classes (that is, wrapping the context object), it might make more sense to follow up directly with some discussion of how I'm approaching the problem of making the receipt of messages fast and efficient while still flexible. This post assumes a bit of knowledge about 29West; specifically, it refers to the concepts of topics, event queues, and contexts, although some high-level descriptions are provided.

I wanted to provide a number of different ways to receive messages, in essence allowing any Python callable to be used as the target of message upcalls, while at the same time providing some basic classes that could be subclassed to make the process simpler if desired. Across all of these approaches I needed to make the handling of messages as fast as possible.

In LBM, you need to provide a few raw ingredients to create a message receiver:

  • A context object, which establishes the overall logical messaging environment for sending and receiving messages.

  • A topic object, which identifies a named channel to which messages are posted or from which they are received.

  • An optional event queue which decouples the arrival of messages from their processing.

  • Some internal bookkeeping items to tell LBM how to route messages back to your application.
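
To illustrate what the optional event queue buys you, here is a minimal pure-Python sketch that uses the standard library's queue.Queue as a stand-in. None of these names (on_message_arrived, drain) come from LBM or from the extension; they are invented here only to model the idea that arrival and processing happen at different times.

```python
import queue

# Stand-in for an LBM event queue: arrival only enqueues, processing happens later.
event_queue = queue.Queue()
processed = []

def on_message_arrived(callback, message):
    """Called at arrival time; defers the real work by queuing it."""
    event_queue.put((callback, message))

def drain():
    """Called by whichever thread owns processing; runs the queued callbacks."""
    count = 0
    while True:
        try:
            callback, message = event_queue.get_nowait()
        except queue.Empty:
            return count
        callback(message)
        count += 1

on_message_arrived(processed.append, "first")
on_message_arrived(processed.append, "second")
# Nothing has been processed yet; the messages are only queued.
pending_before = len(processed)
handled = drain()
```

Without a queue, the callback would run at arrival time; with one, the application decides when drain-style processing happens.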

I took the view that in a significant number of applications, the majority of the receivers created would probably be associated with a single context object, a single event queue, and further would be handled in a single fashion, say with a bound method on different instances of a class. With this motivation, I decided to create a receiver factory that would keep these items together for the user to make creation of the receiver simple. The factory's definition from the pxd file is pretty simple:

cdef class ReceiverFactory:
    cdef core.Context ctx
    cdef core.EventQueue defaultEventQueue
    cdef object defaultReceiverClass

The only required piece of data needed to create a factory instance is the context; the event queue and receiver class (more on this below) are optional.

And here are the key pieces of the implementation from the pyx file:

cdef class ReceiverFactory:
    """
    This class takes 1 required and 2 optional arguments. The first arg
    is the core.Context object that the factory will be creating
    receivers for. The second, defaultReceiverClass, is a callable that
    yields instances of subclasses of receiver.AbstractReceiver. It
    defaults to Receiver. The third is an optional event queue you'd
    like to be shared between all receivers created with this factory.

    If a defaultReceiverClass is specified, it must be a callable that
    yields a subclass of receiver.AbstractReceiver (thus it may be a
    class object or other callable). The callable must take 2 arguments:
    the first argument is the calling ReceiverFactory object (self),
    and the second is whatever the current value for eventCallback is.
    If the class you wish to use takes more arguments, consider passing
    a closure that provides the additional args.

    NOTICE: Since the expected class to be produced by
    createReceiverForTopic is one of the extension classes, duck typed
    classes won't work properly here; the class of the object yielded
    by the defaultReceiverClass must be a subclass of AbstractReceiver.
    """
    def __init__(self, core.Context ctx, defaultReceiverClass=Receiver,
                 defaultEventQueue=None):
        #you may override this __init__ as long as you call it from the
        #derived class's __init__
        self.ctx = ctx
        self.defaultEventQueue = defaultEventQueue
        self.defaultReceiverClass = defaultReceiverClass

    def createReceiverForTopic(self, Topic pyTopic, eventCallback=None,
                               core.EventQueue eventq=None,
                               receiverClass=None):
        """
        Create a receiver for the specified topic. If an eventCallback
        is provided, pass it into the receiver class's constructor
        (see below). If an eventq is provided, attach the queue to the
        receiver so it is used for event delivery.

        If a receiverClass is specified, it must be a callable that
        yields a subclass of receiver.AbstractReceiver (thus it may be
        a class object or other callable). The callable must take two
        arguments: the first argument is the calling factory object
        (self), and the second is whatever the current value for
        eventCallback is. If the class you wish to use takes more
        arguments, consider passing a closure that provides the
        additional args. If the receiverClass is specified here, it
        overrides any that was specified at the factory's creation
        with defaultReceiverClass. If none is specified here, then
        the one specified at the factory's creation is used.

        NOTICE: Since the expected class is one of the extension
        classes, duck typed classes won't work properly here; the
        class of the object yielded by the callable must be a
        subclass of AbstractReceiver.
        """
        cdef int result
        cdef object rcvrClass
        cdef lbmh.lbm_context_t *context
        cdef lbmh.lbm_rcv_t **rcv
        cdef lbmh.lbm_topic_t *cTopic
        cdef AbstractReceiver pyReceiver
        cdef lbmh.lbm_event_queue_t *eq
        #a receiverClass given here overrides the factory's default
        if receiverClass is None:
            rcvrClass = self.defaultReceiverClass
        else:
            rcvrClass = receiverClass
        pyReceiver = rcvrClass(self, eventCallback)
        if not typecheck(pyReceiver, AbstractReceiver):
            raise LBMWException("The provided callable must yield "
                                "a subclass of AbstractReceiver")
        context = self.ctx._getContext()
        #fall back to the factory's event queue, if it has one
        if eventq is None:
            eventq = self.defaultEventQueue
        if eventq is None:
            eq = NULL
        else:
            eq = eventq._getEQ()
        cTopic = pyTopic._getTopic()
        rcv = pyReceiver._getReceiverPtrPtr()
        Py_INCREF(pyReceiver) #key to preventing dangling pointers!
        with nogil:
            result = lbmh.lbm_rcv_create(rcv, context, cTopic,
                                         <lbmh.lbm_rcv_cb_proc> _rcvEventCallback,
                                         pyReceiver, eq)
        if result == lbmh.LBM_FAILURE:
            raise LBMFailure(fn="lbm_rcv_create")
        return pyReceiver

So a bit of discussion is in order here.

The general approach is to create a factory that works cooperatively with a user-supplied factory to produce the appropriate Receiver instances. By default, the “user-supplied” factory is a class object, and it yields an instance of itself anytime a new receiver is created. However, the user factory can be any callable, as long as it produces an instance that is a derived class of the extension's AbstractReceiver class. This opens up a lot of possibilities as to what you might provide for the user factory.
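
The cooperation between the two factories can be modelled in plain Python. This is only a sketch under stated assumptions: SketchFactory, create, and DuckReceiver are invented names, the AbstractReceiver and Receiver classes below are stand-ins for the extension types, and no LBM calls are made. It shows that any callable is accepted as long as what it yields passes the subclass check.

```python
class AbstractReceiver:
    """Hypothetical stand-in for the extension's AbstractReceiver."""
    def __init__(self, factory, eventCallback=None):
        self.factory = factory
        self.eventCallback = eventCallback

class Receiver(AbstractReceiver):
    """Hypothetical stand-in for the default receiver class."""

class SketchFactory:
    """Pure-Python model of the factory; it only builds and checks receivers."""
    def __init__(self, defaultReceiverClass=Receiver):
        self.defaultReceiverClass = defaultReceiverClass

    def create(self, eventCallback=None, receiverClass=None):
        # A per-call receiverClass overrides the factory-wide default.
        maker = receiverClass if receiverClass is not None else self.defaultReceiverClass
        candidate = maker(self, eventCallback)
        # Duck-typed look-alikes are rejected, mirroring the NOTICE in the docstring.
        if not isinstance(candidate, AbstractReceiver):
            raise TypeError("callable must yield a subclass of AbstractReceiver")
        return candidate

class DuckReceiver:
    """Has the right constructor signature but the wrong base class."""
    def __init__(self, factory, eventCallback=None):
        pass

fac = SketchFactory()
ok = fac.create()
try:
    fac.create(receiverClass=DuckReceiver)
    rejected = False
except TypeError:
    rejected = True
```

The real factory raises LBMWException rather than TypeError; TypeError is used here only to keep the sketch free of extension imports.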

When the user creates the ReceiverFactory instance, they provide the context that new receivers are to be associated with, and they have the option of supplying the default receiver factory object that will be used to create receiver objects, as well as an event queue for queuing arrived messages for processing.

When the user calls createReceiverForTopic, the only required argument is the topic that identifies the channel through which the receiver will acquire messages. The other arguments allow overriding the default receiver instance factory and event queue that were provided at ReceiverFactory instantiation time, and also specifying an alternate event callback that will be the target of upcalls when messages arrive. This last bit is key; the default behavior of the ReceiverFactory is to generate a receiver object whose methods get invoked whenever a message for the receiver arrives. The eventCallback keyword argument allows the user to specify an alternative callback target; in this case, the receiver object simply acts as a control construct to manage the routing of messages to the desired upcall target.
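
The routing choice described above can be pictured with a small pure-Python model. This is a sketch, not the extension's dispatch code: SketchReceiver and its deliver method are invented for illustration, and the only details taken from this post are the handleMsg(self, theMessage) and callback(aReceiver, theMessage) signatures.

```python
class SketchReceiver:
    """Hypothetical model of a receiver; the real one is an extension type."""
    def __init__(self, eventCallback=None):
        self.eventCallback = eventCallback
        self.seen = []

    def handleMsg(self, theMessage):
        # Default target: the receiver handles the message itself.
        self.seen.append(theMessage)

    def deliver(self, theMessage):
        # 'deliver' is an assumed name for whatever performs the upcall.
        if self.eventCallback is not None:
            # The receiver only routes; the callback does the work.
            self.eventCallback(self, theMessage)
        else:
            self.handleMsg(theMessage)

external = []

def simpleCallback(aReceiver, theMessage):
    external.append(theMessage)

default_rcv = SketchReceiver()
routed_rcv = SketchReceiver(eventCallback=simpleCallback)
default_rcv.deliver("a")
routed_rcv.deliver("b")
```

After this runs, the default receiver has recorded "a" itself, while "b" went only to the external callback.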

A couple of examples of how to use this would be helpful. In each, assume that aContext is an LBM context object, aTopic is an LBM topic object, and anEventQueue is an LBM event queue object.

#Example 1: create a receiver that uses
#an independent function for callbacks
def simpleCallback(aReceiver, theMessage):
    #do stuff with theMessage
    return

receiverFac = ReceiverFactory(aContext)
newReceiver = receiverFac.createReceiverForTopic(aTopic,
                                                 eventCallback=simpleCallback)

#Example 2: create a receiver subclass whose instances
#are the target of callbacks
class MyReceiver(receiver.Receiver):
    def handleMsg(self, theMessage):
        #do stuff with theMessage
        return

receiverFac = ReceiverFactory(aContext, defaultReceiverClass=MyReceiver)
newReceiver = receiverFac.createReceiverForTopic(aTopic)

#Example 3: like Example 2, but specifying MyReceiver at creation time
receiverFac = ReceiverFactory(aContext)
newReceiver = receiverFac.createReceiverForTopic(aTopic,
                                                 receiverClass=MyReceiver)

#Example 4: Using a function for the receiverClass callable
class MyReceiver(receiver.Receiver):
    def handleMsg(self, theMessage):
        #do stuff with theMessage
        return

class MyOtherReceiver(receiver.Receiver):
    def handleMsg(self, theMessage):
        #do other stuff
        return

#the factory calls this with (self, eventCallback), so the second
#parameter is the event callback, not an event queue
def vendReceiver(rFac, eventCallback):
    if someTestOnRecvFac(rFac):
        theReceiver = MyReceiver(rFac, eventCallback)
    else:
        theReceiver = MyOtherReceiver(rFac, eventCallback)
    return theReceiver

receiverFac = ReceiverFactory(aContext,
                              defaultReceiverClass=vendReceiver)
newReceiver = receiverFac.createReceiverForTopic(aTopic)

So there are lots of ways to get the factory to generate different sorts of receivers.
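
The docstrings suggest passing a closure when a receiver class needs more than the two arguments the factory supplies. Here is a minimal sketch of that idea, with functools.partial shown as an alternative. TaggedReceiver, its tag argument, and makeTagged are hypothetical, and AbstractReceiver is again a plain-Python stand-in for the extension type.

```python
from functools import partial

class AbstractReceiver:
    """Hypothetical stand-in for the extension's AbstractReceiver."""
    def __init__(self, factory, eventCallback=None):
        self.factory = factory
        self.eventCallback = eventCallback

class TaggedReceiver(AbstractReceiver):
    """Hypothetical receiver whose constructor needs a third argument."""
    def __init__(self, factory, eventCallback, tag):
        super().__init__(factory, eventCallback)
        self.tag = tag

def makeTagged(tag):
    # The closure captures 'tag', so the returned callable has the
    # two-argument signature the factory expects.
    def vend(rFac, eventCallback):
        return TaggedReceiver(rFac, eventCallback, tag)
    return vend

vendOrders = makeTagged("orders")
# functools.partial binds the extra argument by keyword instead.
vendQuotes = partial(TaggedReceiver, tag="quotes")

# Call them the way the factory would: (factory, eventCallback).
r1 = vendOrders(None, None)
r2 = vendQuotes(None, None)
```

Either callable could then be handed to the factory as defaultReceiverClass or receiverClass, assuming the real receiver class accepts its extra argument the same way.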

There are a number of calls such as “_getTopic()” and “_getEQ()” that are used to acquire the underlying pointer to the managed C object that the Python extension type is wrapping. These methods are only available in the extension itself, and by convention I start them with an underscore to identify that.

The only other magic worth noting here is the use of Py_INCREF() on the newly created receiver instance. This is because we're about to give a reference to this instance to LBM and we want to make sure that this reference is counted properly by Python. This reference is dropped when the user indicates that they no longer want the receiver; I'll show how that works later.
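
The effect of that extra reference can be observed from plain Python on CPython. The sketch below is an illustration only: it uses ctypes.pythonapi to call Py_IncRef and Py_DecRef (the function forms of the Py_INCREF and Py_DECREF macros), and FakeReceiver is a hypothetical stand-in for a receiver handed to a C library.

```python
import ctypes
import weakref

# Function forms of the Py_INCREF / Py_DECREF macros, reached via ctypes.
inc = ctypes.pythonapi.Py_IncRef
dec = ctypes.pythonapi.Py_DecRef
inc.argtypes = [ctypes.py_object]
dec.argtypes = [ctypes.py_object]
inc.restype = None
dec.restype = None

class FakeReceiver:
    """Hypothetical stand-in; any weak-referenceable object would do."""

rcv = FakeReceiver()
watcher = weakref.ref(rcv)   # lets us observe whether the object still exists

inc(rcv)                     # the reference "handed to the C library"
del rcv                      # the application drops its own reference
alive_after_del = watcher() is not None

held = watcher()             # temporarily take a strong reference back
dec(held)                    # the C library "lets go" of its reference
del held                     # last reference gone; the object is reclaimed
alive_after_release = watcher() is not None
```

With the extra reference in place the object survives the application dropping its own name for it; once that reference is released, the object is reclaimed. Without the increment, the C library would be left holding a pointer to freed memory.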

But the above is only part of the story. This takes care of the creation aspect, but I still need to concern myself with the actual delivery of messages. That means we have to focus on the receivers themselves in the next post.
