Wishful Coding

Didn't you ever wish your computer understood you?

Check multiple twisted.cred checkers for a valid login

This is a snippet of a credential checker I wrote to authenticate a user
first via my DB, and if that fails via Twitter. It helped me to speed up
login, after the initial Twitter access tokens are stored.

In theory, this snippet could be used to authenticate against any number
of checkers, providing different interfaces. Only if all of them fail,
it returns an error.

from twisted.cred.checkers import ICredentialsChecker
from twisted.cred import error, credentials
from zope.interface import implements
from collections import deque

class CascadingChecker:
    """
    Check multiple checkers untill one succeeds.
    Else raise UnauthorizedLogin.
    """

    implements(ICredentialsChecker)
    credentialInterfaces = set()
    
    def __init__(self):
        self.checkers = []
        self.checked = []
    
    def registerChecker(self, checker):
        self.checkers.append(checker)
        self.credentialInterfaces.update(checker.credentialInterfaces)
    
    def _requestAvatarId(self, err, queue, credentials):
        try:
            ch = queue.popleft()
        except IndexError:
            raise error.UnauthorizedLogin()
        
        d = ch.requestAvatarId(credentials)
        return d.addErrback(self._requestAvatarId, queue, credentials)
    
    requestAvatarId = lambda self, credentials: self._requestAvatarId(None, deque(self.checkers), credentials)
from twisted.trial import unittest
from checkers import CascadingChecker
from twisted.cred.credentials import UsernamePassword
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.error import UnauthorizedLogin
from twisted.cred.credentials import IUsernameHashedPassword, IUsernamePassword

class CascadingTest(unittest.TestCase):
    
    def setUp(self):
        ch1 = InMemoryUsernamePasswordDatabaseDontUse()
        ch2 = InMemoryUsernamePasswordDatabaseDontUse()
        self.cach = CascadingChecker()
        
        ch1.addUser('foo', 'bar')
        ch1.addUser('boo', 'far')
        ch2.addUser('for', 'bao')
        
        self.cach.registerChecker(ch1)
        self.cach.registerChecker(ch2)
    
    def testInterfaces(self):
        self.assertEquals(self.cach.credentialInterfaces.difference((IUsernameHashedPassword, IUsernamePassword)), set())
    
    def testLoginFirstChecker(self):
        user = UsernamePassword('foo', 'bar')
        return self.cach.requestAvatarId(user)
    
    def testLoginSecondChecker(self):
        user = UsernamePassword('for', 'bao')
        return self.cach.requestAvatarId(user)
    
    def testLoginFail(self):
        user = UsernamePassword('steve', 'pswd')
        self.assertFailure(self.cach.requestAvatarId(user), UnauthorizedLogin)

Pepijn de Vos