Friday, August 7, 2009

Incoming Revisited-- You're Never Too Old To Learn

While writing so much about receiving messages and discussing performance, I came up with a few ideas about how the Python extension could become a bit faster. I've discussed most of the ideas already in past posts, so there shouldn't really be any surprises here if you've been reading along. This post is going to build on the ideas from those, so you might need to go back to a few of the older “Incoming” posts to fully understand what's being discussed here.

The potential speedups I had in mind were:

  • Get rid of the sequential search for the proper handling method in the “routing” receiver objects, and start using the message type as a direct index into the handler list.

  • Replace the routing receiver's dispatch table with a C array. Currently it's a Python list, but that was largely for convenience; the list is a lot slower to access than an array.

  • Loan a Python thread to LBM to use for message handling. Early on I showed how much faster Python handled upcalls from extension modules if the thread was known to it, and I wanted to see if I could accrue some benefit doing the same in the LBM extension.

I won't keep you in suspense; all changes provided some benefit, although some much more than others.

First off was eliminating the sequential search for the proper receiver method for a particular message type. I wasn't expecting much improvement from this change, since all that was being eliminated was a loop setup and a C comparison, and my expectations were largely met. You may recall from a previous post that the method dispatch table was ordered so that the message types with the highest probability of occurring were at the front of the table. Since in the majority of cases the most frequently occurring message was a data message, and that handler lived at index 0 of the table, the first comparison in the sequential search was usually all that was needed to find the proper handler.
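
To make the difference concrete, here's a rough sketch of the two dispatch strategies in plain Python. The names and the (type, method) pair layout are purely illustrative, not the extension's actual internals:

def route_sequential(dispatch_table, msg):
    # dispatch_table is a list of (msg_type, handler) pairs, ordered by
    # how frequently each message type is expected to arrive
    for msg_type, handler in dispatch_table:
        if msg_type == msg.type:
            handler(msg)
            return

def route_indexed(dispatch_table, msg):
    # dispatch_table is indexed directly by the message type; no search needed
    dispatch_table[msg.type](msg)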

It's hard to even say how much improvement was gained; I certainly observed a few thousand more messages per second much of the time, but due to measurement noise it wasn't a consistent improvement. However, it gave enough benefit that I decided to leave the change in.

A much bigger improvement came from changing the routing receiver's dispatch table from a Python list to a C array. Frankly, I don't know why this didn't occur to me before-- both searching and direct indexing into a Python list were certainly going to entail a lot more processing than doing the same with a C array. The StaticRoutingReceiver extension type changed to support this like so:

cdef class StaticRoutingReceiver(AbstractRoutingReceiver):
    cdef void *dispatchList[lbmh.LBM_MSG_UME_REGISTRATION_COMPLETE_EX]

I used the highest-valued message type as the size of the array. I still don't like this; I wish 29West would add a symbolic constant LBM_MAX_MSG_TYPE that they update from release to release of LBM, so that I wouldn't have to make sure that any particular version of the extension has the proper symbolic name in there.

Setting this thing up involved a little more manual management of Python object reference counts. This is because I was going to put references to Python bound methods into C pointers, and I needed to make sure that the method objects didn't disappear. So __init__() changed to look like the following:

def __init__(self, myFactory, msgCallback=None):
    cdef object meth
    cdef int i
    super(StaticRoutingReceiver, self).__init__(myFactory, msgCallback)
    for i from 0 <= i < lbmh.LBM_MSG_UME_REGISTRATION_COMPLETE_EX:
        meth = getattr(self.msgCallback, _msgTypeToMethodMap[i])
        Py_INCREF(meth)
        self.dispatchList[i] = <void *>meth

It's all pretty straightforward; find the desired method, increment the reference count, and then store it into the dispatchList. There's a similar piece of work to manage decrementing reference counts when the receiver is “shut down”, but covering that is a topic for another post. It's interesting that Pyrex won't actually let you create a C array of “object”; you have to make it an array of void * and do a cast.
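
For the curious, the cleanup is roughly the mirror image of the setup. The sketch below is my own guess at what it might look like (the method name is invented, and it assumes Py_DECREF is available the same way Py_INCREF is); the real shutdown code is the subject of that future post:

cdef _releaseDispatchList(self):
    # hypothetical cleanup: cast each stored pointer back to an object
    # and drop the reference taken in __init__()
    cdef object meth
    cdef int i
    for i from 0 <= i < lbmh.LBM_MSG_UME_REGISTRATION_COMPLETE_EX:
        if self.dispatchList[i] != NULL:
            meth = <object> self.dispatchList[i]
            Py_DECREF(meth)
            self.dispatchList[i] = NULL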

But now, routing an upcall is nothing; here's what _routeMsgCallback() evolved into:

cdef _routeMsgCallback(self, message.Message msg):
    #use the message type as an index into the dispatchList
    cdef object meth
    meth = <object> self.dispatchList[msg.type]
    meth(msg)

Unlike going from a sequential search to direct indexing into a Python list, moving from a list to an array provided a significant improvement in the rate at which messages could be received. Roughly 60K more messages/sec could be handled in my test program, and that was a steady improvement.

That takes us to the third speedup, loaning a Python thread to LBM. Back in THIS POST I showed how Python performed much better when it was familiar with the thread that was acquiring the GIL when calling up into Python from a C extension. In that post I wished for an interface in LBM that would allow an app to provide a “thread factory”, a function that LBM could call that would provide it any threads it needed.

In reality, LBM already partially meets this need with what's referred to as the “sequential” mode of operation of an LBM context. By default, an LBM context object operates in what's known as “embedded” mode; it internally creates the thread that's used for calling back into applications. In the case of Python, it's this foreign thread that creates a slowdown. However, contexts can also be created in what's known as “sequential” mode. In this mode, no internal thread is created; instead, an application invokes a function with the context that activates message processing, and the thread that invokes this function is also used for performing callbacks.

The LBM extension supports setting sequential mode and also invoking the processing function via a method on Context objects, processEvents(). This method blocks the calling thread while messages are processed, and returns periodically to allow the thread to do other tasks (like figuring out if it's time to exit). Enabling sequential mode is handled by setting an option on the context attribute object like so:

self.contextAttrs.setOption("operational_mode", "sequential")

This has to be done on the context attribute object because once the context has been created the operational mode can't be changed, so the context has to be created in the proper mode.
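
In other words, the setup has to happen in this order. The Context constructor shown below is just a guess at the extension's API (only setOption() and the mode name come from this post):

# hedged sketch of the required ordering
if self.options.sequential:
    # the mode must be set on the attributes before the context exists...
    self.contextAttrs.setOption("operational_mode", "sequential")
# ...because it can't be changed on the context afterward
ctx = Context(self.contextAttrs)    # hypothetical constructor
self.contexts.append(ctx)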

Once you have a sequential mode context, you must invoke processEvents() on it in order for receivers to have any messages delivered to them. I wanted my test program to support both embedded and sequential operation, so I not only made setting sequential mode dependent on command line parameters, I changed the test run method to optionally spawn threads for calling processEvents():

def doProcessEvents(app, ctx):
    #ctx is a Context object
    while not app.quit:
        #process for 1000 millis, then loop again
        ctx.processEvents(1000)

if self.options.sequential: #run sequentially
    for ctx in self.contexts:
        cthread = threading.Thread(target=doProcessEvents,
                                   args=(self, ctx))
        cthread.start()

while not self.quit:
    #pretty much just print stats until we're done
    time.sleep(1)
    self.printStats()
    if (self.options.exitFlagFile and
            os.path.exists(self.options.exitFlagFile)):
        self.quit = True

So doProcessEvents() provides a starting point where new threads can begin execution; it's within this function that the processEvents() method is invoked on the context. The 'for' loop over self.contexts is there because this program supports using multiple contexts, and each context must have processEvents() invoked on it in a separate thread. So if the program is to run with sequential contexts, a thread is created for each, and they call processEvents() on their respective contexts until the application is told to exit. However, if the program is to run in embedded mode, nothing extra needs to be done; the processing threads have already been created internally by the context objects, and all the program has to do is wait for updates to flow in.

Of the three speedups, this had the most profound impact. Depending on the message size (and test run), upwards of 100K more messages/sec could be handled by the receiver when the underlying context was running in sequential mode.

It's time to look at a bit of data to see how we're doing. The following graph shows the message handling rates at different message sizes for a Python receiver program running in different modes compared with an equivalent C program (the C program is 29West's stock lbmmrcv program). The message source for all of these programs is 29West's lbmmsrc C program, which is able to produce messages faster than Python can consume them. The idea here is to get some notion as to how the extension is doing compared to C.

The Python receiver program has a number of different ways it can be run, allowing us to see the impact of various speedups. In all cases it uses a routing receiver since those will be able to handle data messages faster (since determining that a message is a data message happens in C). It actually has two equivalent receivers it can use, one in pure Python, and the other a Pyrex extension receiver. It can also run in either embedded or sequential mode. This provides four execution combinations of receiver and operational mode, each of which is run with a list of different message sizes. Additionally, the C receiver is run for each message size as well to provide a comparison. Each test run involved the transmission of 7M messages. The message rates reported here are in the K's of msgs/sec.

[Graph: messages received (in thousands of msgs/sec) at each message size, for the C lbmmrcv receiver and the four Python receiver/operational-mode combinations]

One thing I find maddening when running these tests is that the results are only approximately consistent. I know that's to be expected to some degree, but the cause of the variability is too opaque for my liking. For instance, I've seen the C test program running much closer to 900K msgs/sec, and in fact the sequential Python/Pyrex combination routinely runs at 490K msgs/sec. In either case, it's impossible to tell what else might be going on that impacts performance. For some reason in this test Python/Pyrex underperformed a bit, so that's what I charted.

Nonetheless, I'm pretty pleased. The first operating version of the extension could only do 40K msgs/sec, so it's pretty hard to complain about an order of magnitude improvement.