In this article we’ll be writing an AMQP connector that we can reuse in future AMQP-based projects. We’ll be using Trial, the Twisted testing component, to test our code as we develop it. To start, let’s create our project directory, set up a test case, and run our first test.
```sh
mkdir connector
cd connector
mkdir test
vi test/test.py
```
The resulting directory structure will be:
connector/
connector/test/
connector/test/test.py
In test.py:
```python
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()
```
Now we can run our first test and enjoy the sweet, sweet failure. Note that tests are run from within the connector directory.
```
trial test/test.py
test
  AmqpConnectorTest
    test_connector_init ...                                             [ERROR]

===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connector_init

Traceback (most recent call last):
  File "test/test.py", line 11, in test_connector_init
    connector = AmqpConnector()
exceptions.NameError: global name 'AmqpConnector' is not defined
-------------------------------------------------------------------------------
Ran 1 tests in 0.018s

FAILED (errors=1)
```
Ok, let’s fix this by implementing connector.py in the connector/ directory.
```python
class AmqpConnector():
    pass
```
Our directory structure is now:
connector/
connector/connector.py
connector/test/
connector/test/test.py
We update our test to import the new connector and run trial.
```python
from connector import AmqpConnector
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()
```
```
trial test/*.py
test
  AmqpConnectorTest
    test_connector_init ...                                                [OK]

-------------------------------------------------------------------------------
Ran 1 tests in 0.003s

PASSED (successes=1)
```
Thinking about what our connector must do, and referencing the examples provided with txAMQP, we’ll implement tests for the following: connecting to the broker, authenticating, and opening a channel.
And here are the tests:
```python
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks
from connector import AmqpConnector

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        self.connector = AmqpConnector()

    def tearDown(self):
        pass

    @inlineCallbacks
    def test_connect(self):
        connector = self.connector
        yield connector.connect()

    @inlineCallbacks
    def test_authenticate(self):
        connector = self.connector
        yield connector.connect()
        yield connector.authenticate()

    @inlineCallbacks
    def test_channel_open(self):
        connector = self.connector
        yield connector.connect()
        yield connector.authenticate()
        yield connector.channel_open()
```
You’ll notice that I’ve added an additional import to provide the inlineCallbacks decorator. This allows us to write asynchronous code as if it were synchronous code: each yield waits for the result of the call before the next line runs. I’ve also moved the connector initialization to setUp() and removed the init test. The setUp() method is run before each test and the tearDown() method is run after each test. This means that each test method gets a fresh connector object to work with (self.connector).
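Trial’s TestCase follows the same setUp/test/tearDown lifecycle as the standard library’s unittest.TestCase, so the ordering can be observed without Twisted or a broker. The following is a minimal sketch using only the standard library; FakeConnector and run_lifecycle_demo are hypothetical names made up for this illustration and are not part of our connector.

```python
import unittest

events = []  # records the order in which hooks and tests run


class FakeConnector(object):
    """Hypothetical stand-in for AmqpConnector; opens no real connection."""
    instances = 0

    def __init__(self):
        FakeConnector.instances += 1
        self.number = FakeConnector.instances


class LifecycleTest(unittest.TestCase):
    def setUp(self):
        # Runs before every test method, so each test sees a new object.
        self.connector = FakeConnector()
        events.append("setUp #%d" % self.connector.number)

    def tearDown(self):
        # Runs after every test method.
        events.append("tearDown #%d" % self.connector.number)

    def test_first(self):
        events.append("test_first")

    def test_second(self):
        events.append("test_second")


def run_lifecycle_demo():
    """Run both tests and return the recorded order of events."""
    del events[:]
    FakeConnector.instances = 0
    suite = unittest.TestLoader().loadTestsFromTestCase(LifecycleTest)
    suite.run(unittest.TestResult())
    return list(events)
```

Calling run_lifecycle_demo() shows a setUp entry and a tearDown entry wrapped around each test, with a different connector number for each one.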
To get an idea of how to start writing our connector, let’s review the test code that comes with txAMQP. In a previous article I went over fetching txAMQP from Launchpad. If you have not done that yet, you should do it now. Review the file “src/txamqp/testlib.py”. We’re going to “borrow” the __init__() and connect() methods, but rearrange them a bit to move authentication into a separate method.
I’ve also changed the reference to the spec file. We’ll need to copy “amqp0-8.xml” into the connector directory.
```python
from twisted.internet.defer import inlineCallbacks, Deferred
from txamqp.protocol import AMQClient, TwistedDelegate
from twisted.internet import protocol, reactor
import txamqp.spec


class AmqpConnector():
    def __init__(self):
        # Defaults for a local broker; each can be overridden per call.
        self.host = "localhost"
        self.port = 5672
        self.spec = "amqp0-8.xml"
        self.user = "guest"
        self.password = "guest"
        self.vhost = "/"

    @inlineCallbacks
    def connect(self, host=None, port=None, spec=None, vhost=None):
        host = host or self.host
        port = port or self.port
        spec = spec or self.spec
        vhost = vhost or self.vhost

        delegate = TwistedDelegate()
        onConn = Deferred()
        # _InstanceFactory is a private Twisted helper; it fires onConn
        # with the protocol instance once the connection is made.
        f = protocol._InstanceFactory(
            reactor,
            AMQClient(delegate, vhost, txamqp.spec.load(spec)),
            onConn)
        self.cnx = reactor.connectTCP(host, port, f)
        self.connection = yield onConn

    @inlineCallbacks
    def authenticate(self, user=None, password=None):
        user = user or self.user
        password = password or self.password
        yield self.connection.start({"LOGIN": user, "PASSWORD": password})

    @inlineCallbacks
    def channel_open(self):
        self.channel = yield self.connection.channel(1)
        yield self.channel.channel_open()
```
When I first ran these tests they all failed:
```
exceptions.IOError: [Errno 2] No such file or directory: 'amqp0-8.xml'
```
This stumped me for a while until I realized that relative paths were being resolved against the “_trial_temp” directory, which Trial uses as its working directory while running tests. A simple fix to setUp() resolved the issue:
```python
def setUp(self):
    self.connector = AmqpConnector()
    self.connector.spec = "../amqp0-8.xml"
```
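Another way to sidestep the working-directory problem is to resolve the spec file relative to the module that needs it. The sketch below assumes the spec file sits next to connector.py, as described above; the helper name spec_path and the constant SPEC_PATH are hypothetical and not part of txAMQP or Twisted.

```python
import os


def spec_path(module_file, filename="amqp0-8.xml"):
    """Return an absolute path to `filename` beside `module_file`.

    Hypothetical helper: because the result is absolute, later changes
    of the working directory no longer affect where the file is found.
    """
    module_dir = os.path.dirname(os.path.abspath(module_file))
    return os.path.join(module_dir, filename)


# In connector.py this would be computed once at import time, before the
# test runner has had a chance to change directory, for example:
#     SPEC_PATH = spec_path(__file__)
# and __init__() could then use self.spec = SPEC_PATH.
```

With a default like this, the override in setUp() would no longer be needed, at the cost of tying the connector to the location of its own source file.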
Please remember that these tests assume you have a RabbitMQ instance running locally on port 5672 that responds to the user “guest” with the password “guest”. I only mention that because my next test run failed because RabbitMQ was not running.
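Since a missing broker produces confusing failures, a quick reachability check can make the cause obvious. This is a minimal sketch using only the standard library; broker_reachable is a hypothetical helper name, and it only confirms that something accepts TCP connections on the port, not that it speaks AMQP or accepts the “guest” credentials.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except socket.error:
        # Covers refused connections, timeouts, and name lookup failures.
        return False
    sock.close()
    return True
```

A test’s setUp() could call this first and stop early with a clear message when it returns False.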
With RabbitMQ running, the next test run was slightly more successful:
```
test
  AmqpConnectorTest
    test_authenticate ...                                                  [OK]
                                             [ERROR]
    test_channel_open ...                                                  [OK]
                                             [ERROR]
    test_connect ...                                                       [OK]
                                             [ERROR]

===============================================================================
[ERROR]: test.AmqpConnectorTest.test_authenticate

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 12cfdd0>
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_channel_open

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 149de70>
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connect

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 16a0b70>
-------------------------------------------------------------------------------
Ran 3 tests in 1.199s

FAILED (errors=3, successes=3)
```
The message “Reactor was unclean.” indicates our tests are leaving an open connection. We can fix this by closing the connection in tearDown():
```python
@inlineCallbacks
def tearDown(self):
    yield self.connector.cnx.disconnect()
```
One final test run should give us all green lights:
```
test
  AmqpConnectorTest
    test_authenticate ...                                                  [OK]
    test_channel_open ...                                                  [OK]
    test_connect ...                                                       [OK]

-------------------------------------------------------------------------------
Ran 3 tests in 1.195s

PASSED (successes=3)
```