Wishful Coding

Didn't you ever wish your
computer understood you?

Twisted pop3 example server

Remember last time when I showed you the Twisted SMTP server? This time it's a POP3 server. I think it is more common to write SMTP servers as a part of a project(to send notifications and stuff), but I needed a POP3 server, so I think there might be others who need a POP3 server as well.

The process is rather similar to SMTP, only you need to implement the IMailbox interface. This is usually done with some sort of file or db based solution, but I used a simple list in this case. You might want to have a look at the mailbox module for a serious implementation.

Below you'll find a bare bones POP3 server, which can be run with `twistd -ny pop3.tac` and below that a sample telnet session, by running `telnet localhost 1230` in another terminal.

Connected to localhost.
Escape character is '^]'.
+OK <20100818095058.3878.894125537.0@pepijn-de-voss-imac.local>
user guest
+OK USER accepted, send PASS
pass password
+OK Authentication succeeded
stat
+OK 20 1020
retr 1
+OK 51
From: me
To: you
Subject: A test mail

Hello world!
.
retr 40
-ERR Bad message number argument
quit
+OK 
Connection closed by foreign host.
"""
An example pop3 server
"""

from twisted.application import internet, service
from twisted.cred.portal import Portal, IRealm
from twisted.internet.protocol import ServerFactory
from twisted.mail import pop3
from twisted.mail.pop3 import IMailbox
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from zope.interface import implements
from itertools import repeat
from hashlib import md5
from StringIO import StringIO

class SimpleMailbox:
    implements(IMailbox)

    def __init__(self):
        message = """From: me
To: you
Subject: A test mail

Hello world!"""
        self.messages = [m for m in repeat(message, 20)]


    def listMessages(self, index=None):
        if index != None:
            return len(self.messages[index])
        else:
            return [len(m) for m in self.messages]

    def getMessage(self, index):
        return StringIO(self.messages[index])

    def getUidl(self, index):
        return md5(self.messages[index]).hexdigest()

    def deleteMessage(self, index):
        pass

    def undeleteMessages(self):
        pass

    def sync(self):
        pass


class SimpleRealm:
    implements(IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        if IMailbox in interfaces:
            return IMailbox, SimpleMailbox(), lambda: None
        else:
            raise NotImplementedError()

portal = Portal(SimpleRealm())

checker = InMemoryUsernamePasswordDatabaseDontUse()
checker.addUser("guest", "password")
portal.registerChecker(checker)

application = service.Application("example pop3 server")

f = ServerFactory()
f.protocol = pop3.POP3
f.protocol.portal = portal
internet.TCPServer(1230, f).setServiceParent(application)
Published on

Check multiple twisted.cred checkers for a valid login

This is a snippet of a credential checker I wrote to authenticate a user
first via my DB, and if that fails via Twitter. It helped me to speed up
login, after the initial Twitter access tokens are stored.

In theory, this snippet could be used to authenticate against any number
of checkers, providing different interfaces. Only if all of them fail,
it returns an error.

from twisted.cred.checkers import ICredentialsChecker
from twisted.cred import error, credentials
from zope.interface import implements
from collections import deque

class CascadingChecker:
    """
    Check multiple checkers untill one succeeds.
    Else raise UnauthorizedLogin.
    """

    implements(ICredentialsChecker)
    credentialInterfaces = set()
    
    def __init__(self):
        self.checkers = []
        self.checked = []
    
    def registerChecker(self, checker):
        self.checkers.append(checker)
        self.credentialInterfaces.update(checker.credentialInterfaces)
    
    def _requestAvatarId(self, err, queue, credentials):
        try:
            ch = queue.popleft()
        except IndexError:
            raise error.UnauthorizedLogin()
        
        d = ch.requestAvatarId(credentials)
        return d.addErrback(self._requestAvatarId, queue, credentials)
    
    requestAvatarId = lambda self, credentials: self._requestAvatarId(None, deque(self.checkers), credentials)
from twisted.trial import unittest
from checkers import CascadingChecker
from twisted.cred.credentials import UsernamePassword
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword

class CascadingTest(unittest.TestCase):
    
    def setUp(self):
        ch1 = InMemoryUsernamePasswordDatabaseDontUse()
        ch2 = InMemoryUsernamePasswordDatabaseDontUse()
        self.cach = CascadingChecker()
        
        ch1.addUser('foo', 'bar')
        ch1.addUser('boo', 'far')
        ch2.addUser('for', 'bao')
        
        self.cach.registerChecker(ch1)
        self.cach.registerChecker(ch2)
    
    def testInterfaces(self):
        self.assertEquals(self.cach.credentialInterfaces.difference((IUsernameHashedPassword, IUsernamePassword)), set())
    
    def testLoginFirstChecker(self):
        user = UsernamePassword('foo', 'bar')
        return self.cach.requestAvatarId(user)
    
    def testLoginSecondChecker(self):
        user = UsernamePassword('for', 'bao')
        return self.cach.requestAvatarId(user)
    
    def testLoginFail(self):
        user = UsernamePassword('steve', 'pswd')
        self.assertFailure(self.cach.requestAvatarId(user), UnauthorizedLogin)

XAuth(CamelCase), xAuth(iCase), XOAUTH(ALLCAPS); 3 distinct technologies, confusing?

While thinking up names, Unix/open source people tend to come up with names containing ‘X’. I can almost hear them scream “‘X’, the new and free(as in beer, pizza and speech) ‘i’(as in iPhone, iPod, etc.)!”

My (possibly fictive) story goes like this: Some time ago, 3 separate development teams came together to find a name for their product.

The first team was making a secure, social, easy way of authenticating browsers. Since they where developing an open authentication protocol, and read loads of developer guidelines, they came up with XAuth, using proper CamelCasing, as one was supposed to.

The second team had a nice OAuth implementation running, but some people required using good ol' passwords, so they came up with the idea of requesting tokens using regular credentials, instead of complicated token exchanges. Since they wanted to be hip, and had a nice marketing department, they came up with xAuth, in line with the iDevice casing.

The third team thought it would be nice to apply the security of OAuth to email. Since this was way to cool to be called OAuth-for-email, they decided to add the cool ‘X’ in front of it. And as we all know(don’t we?), mail servers talk to each other in ALL CAPS, so it was a logical thing to call their system XOAUTH(although they’re not all that consistent about it).

Published on