Russell Bateman
March 2015
last update:
Here are some notes I'm putting down on message queueing.
~ $ python
Python 2.7.5 (default, Nov 3 2014, 14:26:24)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-7)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import stomp
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: No module named stomp
Here's a STOMP client, the one Python seems to recognize, written by a Jason R. Briggs: stomp.py-4.0.13.tar.gz. This version supports both Python 2.x and 3.x.
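Installing it is the usual routine; a sketch assuming the tarball above (if pip is on the box, pip install stomp.py does the same thing):

~ $ tar xzf stomp.py-4.0.13.tar.gz
~ $ cd stomp.py-4.0.13
~/stomp.py-4.0.13 $ sudo python setup.py install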
Once you have it, however, you can write a stupid example:
import time
import sys
import stomp

class Listener( object ):
    # This stuff happens on a different thread from the one that called send()
    def on_error( self, headers, message ):
        # What to do when an error receiving a message occurs
        print( 'received an error %s' % message )

    def on_message( self, headers, message ):
        # What to do when a message is received
        print( 'received a message %s' % message )

# Set up connection to (whatever queueing service supporting STOMP)...
conn = stomp.Connection()

# Set up a listener: what receives a message...
conn.set_listener( '', Listener() )

# Start a session with the queueing service...
conn.start()
conn.connect()

# Establish a subscription with the queueing service...
conn.subscribe( destination='/queue/test', id=1, ack='auto' )

# Now, put a message into the queue and sleep a bit...
conn.send( body=' '.join( sys.argv[ 1: ] ), destination='/queue/test' )
time.sleep( 2 )

# We're done!
conn.disconnect()
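Saved as, say, stomp_test.py (a name I'm making up) and with ActiveMQ's STOMP connector on its default port 61613, which is also what stomp.Connection() assumes, a run looks roughly like this:

~ $ python stomp_test.py Hello, queueing world
received a message Hello, queueing world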
Topic: implements publish and subscribe semantics. For example, the scorecard batch runner publishes compound queries and the search server subscribes to them. Myriad batch-runner instances can push to this queue; the search server just services the requests as quickly as it can.
There is no guarantee messages are delivered in the order sent. Consequently, relying on two-part messages and other aberrations puts you in a world of hurt.
Queue: implies "point-to-point," one publisher and one receiver. This isn't what interests us.
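A quick sketch of how the two look in stomp.py code (destination names are mine); only the prefix of the destination changes:

import stomp

conn = stomp.Connection()   # localhost:61613 by default
conn.start()
conn.connect()

# Publish/subscribe: every subscriber to the topic gets its own copy of each message.
conn.subscribe( destination='/topic/test', id=1, ack='auto' )

# Point-to-point: consumers of the queue compete; each message goes to exactly one of them.
conn.subscribe( destination='/queue/test', id=2, ack='auto' )

conn.disconnect()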
I made a slide presentation of this: Message Queueing
Here's some more, and more reasonable, ActiveMQ code. First, sending a message:
import stomp

conn = stomp.Connection10()
conn.start()
conn.connect()
conn.send( '/topic/SampleTopic', 'Simples Assim' )
conn.disconnect()
Next, receiving a message (not so durable, then durable):
import stomp
import time

class SampleListener( object ):
    def on_message( self, headers, message ):
        print message

durable = True   # flip to False to try the non-durable variant

conn = stomp.Connection10()
conn.set_listener( 'SampleListener', SampleListener() )
conn.start()

if not durable:
    # - This is not so durable... -------------------------
    conn.connect()
    conn.subscribe( '/topic/SampleTopic' )
else:
    # - This is better: a durable subscription ------------
    # The client-id plus subscription name let ActiveMQ hold
    # messages published while this subscriber is offline.
    conn.connect( headers={ 'client-id' : 'SampleClient' } )
    conn.subscribe( destination='/topic/SampleTopic',
                    headers={ 'activemq.subscriptionName' : 'SampleSubscription' } )
# -----------------------------------------------------

time.sleep( 1 )
conn.disconnect()
A fuller quick-start page can be found here.
As noted earlier, I ended up doing a quick iFriday presentation on this. It actually garnered a couple of votes, though not my own: I voted for Dakota's test efforts.
Here's something relevant to our case: we'll want messages to persist across reboots of the server. By default, STOMP sends messages non-persistent, so we must explicitly tell the STOMP library to add "persistent:true" to every send request we want to survive an ActiveMQ restart. (This is the opposite of the default for messages submitted via JMS.) See ActiveMQ: The STOMP Guide.
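A minimal sketch of what that looks like with stomp.py (destination and message body are my own placeholders); the extra header is the whole trick:

import stomp

conn = stomp.Connection()
conn.start()
conn.connect()

# The 'persistent' header asks ActiveMQ to write the message to its store,
# so it survives a broker restart.
conn.send( body='must outlive a broker restart',
           destination='/queue/test',
           headers={ 'persistent' : 'true' } )

conn.disconnect()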
All my batch-runner code was committed and pushed with the message, "Added throttle to slow down compound queries to search server. Added more stats to batch_runner main()."
The two classes of destinations, topics and queues, can't really be mixed semantically.
Note that Kafka can be used for both models.