Wishful Coding

Didn't you ever wish your
computer understood you?

Check multiple twisted.cred checkers for a valid login

This is a snippet of a credential checker I wrote to authenticate a user
first via my DB, and if that fails via Twitter. It helped me to speed up
login, after the initial Twitter access tokens are stored.

In theory, this snippet could be used to authenticate against any number
of checkers, providing different interfaces. Only if all of them fail,
it returns an error.

from twisted.cred.checkers import ICredentialsChecker
from twisted.cred import error, credentials
from zope.interface import implements
from collections import deque

class CascadingChecker:
    """
    Check multiple checkers untill one succeeds.
    Else raise UnauthorizedLogin.
    """

    implements(ICredentialsChecker)
    credentialInterfaces = set()
    
    def __init__(self):
        self.checkers = []
        self.checked = []
    
    def registerChecker(self, checker):
        self.checkers.append(checker)
        self.credentialInterfaces.update(checker.credentialInterfaces)
    
    def _requestAvatarId(self, err, queue, credentials):
        try:
            ch = queue.popleft()
        except IndexError:
            raise error.UnauthorizedLogin()
        
        d = ch.requestAvatarId(credentials)
        return d.addErrback(self._requestAvatarId, queue, credentials)
    
    requestAvatarId = lambda self, credentials: self._requestAvatarId(None, deque(self.checkers), credentials)
from twisted.trial import unittest
from checkers import CascadingChecker
from twisted.cred.credentials import UsernamePassword
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword

class CascadingTest(unittest.TestCase):
    
    def setUp(self):
        ch1 = InMemoryUsernamePasswordDatabaseDontUse()
        ch2 = InMemoryUsernamePasswordDatabaseDontUse()
        self.cach = CascadingChecker()
        
        ch1.addUser('foo', 'bar')
        ch1.addUser('boo', 'far')
        ch2.addUser('for', 'bao')
        
        self.cach.registerChecker(ch1)
        self.cach.registerChecker(ch2)
    
    def testInterfaces(self):
        self.assertEquals(self.cach.credentialInterfaces.difference((IUsernameHashedPassword, IUsernamePassword)), set())
    
    def testLoginFirstChecker(self):
        user = UsernamePassword('foo', 'bar')
        return self.cach.requestAvatarId(user)
    
    def testLoginSecondChecker(self):
        user = UsernamePassword('for', 'bao')
        return self.cach.requestAvatarId(user)
    
    def testLoginFail(self):
        user = UsernamePassword('steve', 'pswd')
        self.assertFailure(self.cach.requestAvatarId(user), UnauthorizedLogin)

XAuth(CamelCase), xAuth(iCase), XOAUTH(ALLCAPS); 3 distinct technologies, confusing?

While thinking up names, Unix/open source people tend to come up with names containing ‘X’. I can almost hear them scream “‘X’, the new and free(as in beer, pizza and speech) ‘i’(as in iPhone, iPod, etc.)!”

My (possibly fictive) story goes like this: Some time ago, 3 separate development teams came together to find a name for their product.

The first team was making a secure, social, easy way of authenticating browsers. Since they where developing an open authentication protocol, and read loads of developer guidelines, they came up with XAuth, using proper CamelCasing, as one was supposed to.

The second team had a nice OAuth implementation running, but some people required using good ol' passwords, so they came up with the idea of requesting tokens using regular credentials, instead of complicated token exchanges. Since they wanted to be hip, and had a nice marketing department, they came up with xAuth, in line with the iDevice casing.

The third team thought it would be nice to apply the security of OAuth to email. Since this was way to cool to be called OAuth-for-email, they decided to add the cool ‘X’ in front of it. And as we all know(don’t we?), mail servers talk to each other in ALL CAPS, so it was a logical thing to call their system XOAUTH(although they’re not all that consistent about it).

Published on

Twisted SMTP server with authentication

This is the next post about Twisted, this time about Twisted.mail. After my Twisted project is done I'm going to contribute all my findings in some examples, docstrings and maybe even a howto. This post mainly serves as my notebook for things to remember about SMTP in Twisted.

This is a twistd app taken from the Twisted examples section, and modified to support authentication. Below you'll find all my notes.
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

# You can run this module directly with:
#    twistd -ny esmtpserver.tac


"""
A toy email server with authentication.
"""

from zope.interface import implements

from twisted.internet import defer
from twisted.mail import smtp
from twisted.mail.mail import MailService
# these challengers are located in imap4
from twisted.mail.imap4 import LOGINCredentials, PLAINCredentials

from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse #except in examples and testing
from twisted.cred.portal import IRealm
from twisted.cred.portal import Portal

class ConsoleMessageDelivery:
    implements(smtp.IMessageDelivery)
    
    def receivedHeader(self, helo, origin, recipients):
        return "Received: ConsoleMessageDelivery"
    
    def validateFrom(self, helo, origin):
        # All addresses are accepted
        return origin
    
    def validateTo(self, user):
        # Only messages directed to the "console" user are accepted.
        if user.dest.local == "console":
            return lambda: ConsoleMessage()
        raise smtp.SMTPBadRcpt(user)

class ConsoleMessage:
    implements(smtp.IMessage)
    
    def __init__(self):
        self.lines = []
    
    def lineReceived(self, line):
        self.lines.append(line)
    
    def eomReceived(self):
        print "New message received:"
        print "\n".join(self.lines)
        self.lines = None
        return defer.succeed(None)
    
    def connectionLost(self):
        # There was an error, throw away the stored lines
        self.lines = None

class ConsoleSMTPFactory(smtp.SMTPFactory):
    def __init__(self, *a, **kw):
        smtp.SMTPFactory.__init__(self, *a, **kw)
        # make this factory make ESMTP servers
        self.protocol = smtp.ESMTP
    
    def buildProtocol(self, addr):
        p = smtp.SMTPFactory.buildProtocol(self, addr)
        # add the challengers from imap4, more secure and complicated challengers are available
        p.challengers = {"LOGIN": LOGINCredentials, "PLAIN": PLAINCredentials}
        return p

class SimpleRealm:
    implements(IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        # if we are authenticating a IMessageDelivery
        if smtp.IMessageDelivery in interfaces:
            # a tuple of the implemented interface, an instance implementing it and a logout callable
            return smtp.IMessageDelivery, ConsoleMessageDelivery(), lambda: None
        raise NotImplementedError()


def main():
    from twisted.application import internet
    from twisted.application import service    
    
    portal = Portal(SimpleRealm())
    # initiate a simple checker
    checker = InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser("guest", "password")
    portal.registerChecker(checker)
    
    a = service.Application("Console SMTP Server")
    internet.TCPServer(2500, ConsoleSMTPFactory(portal)).setServiceParent(a)
    
    return a

application = main()
  • If you want anything beyond a toy, you'll need to use ESMTP instead of SMTP
  • You need to have a portal with a realm and a checker set up to use the AUTH command
  • challengers are what provide the protocol for logging in, note that they are located in twisted.imap4
  • A portal takes one checker instance per authentication method, so you can have one for handling strange tokens, and one for regular username/password authentication, but not two for the same type.
  • Take a look at the source for InMemoryUsernamePasswordDatabaseDontUse, it is really simple to write your own checker.