Make stuff to do things

Subscribe (?) Subscribe to RSS

Archive for April, 2009

Writing an AMQP connector with Python, Twisted, Trial and txAMQP

Published on April 24th, 2009 in Comments

In this article we’ll be writing an AMQP connector that we can re-use in future AMQP based projects. We’ll be using the Twisted testing component Trial to test our code as we develop it. To start let’s create our project directory, setup a test case, and run our first test.

1
2
3
4
mkdir connector
cd connector
mkdir test
vi test/test.py

The resulting directory structure will be:
connector/
connector/test/
connector/test/test.py

In test.py:

1
2
3
4
5
6
7
8
9
10
11
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()

Now we can run our first test and enjoy the sweet, sweet failure. Note that tests are run from within the connector directory.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
trial test/test.py
test
  AmqpConnectorTest
    test_connector_init ...                                             [ERROR]

===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connector_init

Traceback (most recent call last):
  File "test/test.py", line 11, in test_connector_init
    connector = AmqpConnector()
exceptions.NameError: global name 'AmqpConnector' is not defined
-------------------------------------------------------------------------------
Ran 1 tests in 0.018s

FAILED (errors=1)

Ok, let’s fix this by implementing connector.py in the connector/ directory.

1
2
class AmqpConnector():
    pass

Our directory structure is now:
connector/
connector/connector.py
connector/test/
connector/test/test.py

We update our test to import the new connector and run trial.

1
2
3
4
5
6
7
8
9
10
11
12
from connector import AmqpConnector
from twisted.trial import unittest

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_connector_init(self):
        connector = AmqpConnector()
1
2
3
4
5
6
7
8
9
trial test/*.py
test
  AmqpConnectorTest
    test_connector_init ...                                                [OK]

-------------------------------------------------------------------------------
Ran 1 tests in 0.003s

PASSED (successes=1)

Thinking about what our connector must do and referencing the examples provided with txAMQP we’ll implement tests for the following:

  • connect
  • authenticate
  • channel_open

And here are the tests:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from twisted.trial import unittest
from twisted.internet.defer import inlineCallbacks
from connector import AmqpConnector

class AmqpConnectorTest(unittest.TestCase):
    def setUp(self):
        self.connector = AmqpConnector()
   
    def tearDown(self):
        pass
       
    @inlineCallbacks
    def test_connect(self):
        connector = self.connector
        yield connector.connect()
       
    @inlineCallbacks
    def test_authenticate(self):
        connector = self.connector
        yield connector.connect()  
        yield connector.authenticate()
       
    @inlineCallbacks
    def test_channel_open(self):
        connector = self.connector
        yield connector.connect()
        yield connector.authenticate()
        yield connector.channel_open()

You’ll notice that I’ve added an additional import to provide the inlineCallbacks decorator. This allows us to write asynchronous code as if it were synchronous code. I’ve also moved the connector initialization to setUp() and removed the init test. The setUp() method is run before each test and the tearDown() method is run after each test. This means that each test method will be passed a new connector object to work with (self.connector).

To get an idea of how to start writing our connector lets review the test code that comes with txAMQP. In a previous article I went over fetching txAQMP from launchpad. If you have not done that yet you should do it now. Review the file “src/txamqp/testlib.py”. We’re going to “borrow” the init() and connect() methods but re-arrange it a bit to move authentication into a separate method.

I’ve also changed the reference to the spec file. We’ll need to copy “amqp0-8.xml” into the connector directory.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from twisted.internet.defer import inlineCallbacks, Deferred
from txamqp.protocol import AMQClient, TwistedDelegate
from twisted.internet import protocol, reactor
import txamqp.spec

class AmqpConnector():
    def __init__(self):
        self.host = "localhost"
        self.port = 5672
        self.spec = "amqp0-8.xml"
        self.user = "guest"
        self.password = "guest"
        self.vhost = "/"
       
    @inlineCallbacks   
    def connect(self,host=None,port=None,spec=None,vhost=None):
        host = host or self.host
        port = port or self.port
        spec = spec or self.spec
        vhost = vhost or self.vhost

        delegate = TwistedDelegate()
        onConn = Deferred()
        f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)
       
        self.cnx = reactor.connectTCP(host, port, f)
       
        self.connection = yield onConn
       
    @inlineCallbacks
    def authenticate(self, user=None, password=None):
        user = user or self.user
        password = password or self.password

        yield self.connection.start({"LOGIN": user, "PASSWORD": password})

    @inlineCallbacks
    def channel_open(self):
        self.channel = yield self.connection.channel(1)
        yield self.channel.channel_open()

When I first ran these tests they all failed:

1
exceptions.IOError: [Errno 2] No such file or directory: 'amqp0-8.xml'

This stumped me for a while until I realized local files were being searched for in the “_trial_temp” directory that Trial uses. A simple fix to setUp() resolved the issue:

1
2
3
def setUp(self):
    self.connector = AmqpConnector()
    self.connector.spec = "../amqp0-8.xml"

Please remember that these tests assume you have a RabbitMQ instance running locally on port 5672 that responds to the user “guest” with the password “guest”. I only mention that because my next test run failed because RabbitMQ was not running.

With RabbitMQ running the next test run with slightly more successful:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
test
  AmqpConnectorTest
    test_authenticate ...                                                  [OK]
                                              [ERROR]
    test_channel_open ...                                                  [OK]
                                              [ERROR]
    test_connect ...                                                       [OK]
                                                   [ERROR]

===============================================================================
[ERROR]: test.AmqpConnectorTest.test_authenticate

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 12cfdd0>
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_channel_open

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 149de70>
===============================================================================
[ERROR]: test.AmqpConnectorTest.test_connect

Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
Selectables:
<<class 'twisted.internet.tcp.Client'> to ('localhost', 5672) at 16a0b70>
-------------------------------------------------------------------------------
Ran 3 tests in 1.199s

FAILED (errors=3, successes=3)

The message “Reactor was unclean.” indicates our tests are leaving an open connection. We can fix this by closing the connection in tearDown():

1
2
3
@inlineCallbacks
def tearDown(self):
    yield self.connector.cnx.disconnect()

One final test run should give us all green lights:

1
2
3
4
5
6
7
8
9
10
test
  AmqpConnectorTest
    test_authenticate ...                                                  [OK]
    test_channel_open ...                                                  [OK]
    test_connect ...                                                       [OK]

-------------------------------------------------------------------------------
Ran 3 tests in 1.195s

PASSED (successes=3)
Switch to our mobile site