# -*- coding: utf-8 -*-
#
# Copyright (C) <2016-2017> Gabriel Falcao <gabriel@nacaolivre.org>
# (C) Copyright 2003-2011 Jacek Konieczny <jajcus@jajcus.net>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License Version
# 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
"""Base classes for PyXMPP SASL implementation.
Normative reference:
- `RFC 4422 <http://www.ietf.org/rfc/rfc4422.txt>`__
Authentication properties
-------------------------
Most authentication mechanisms needs some data to identify the
authenticating entity and/or to provide characteristics of the communication
channel. These are passed as a `properties` mapping to the ``.start()``
method to a server or client authenticator.
Similar mechanism is used to return data obtained via the authentication
process: the `Success` object has a `Success.properties` attribute with
the data obtained.
The mapping contains name->value pairs. Meaning of those is generally
mechanism-dependant, but these are the usually expected properties:
* For input to the ``start()`` method:
* ``"username"`` - the user name. Required by all password based mechanisms.
* ``"password"`` - the user's password. Required by all password based
mechanisms.
* ``"authzid"`` - authorization id. Optional for most mechanisms.
* ``"security-layer"`` - security layer if any. ``"TLS"`` when TLS is in
use.
* ``"channel-binding"`` - mapping of 'channel binding type' to 'channel
binding date' if available on the channel
* ``"service-type"`` - service type as required by the DIGEST-MD5 protocol
* ``"service-domain"`` - service domain (the 'serv-name' or 'host' part of
diges-uri of DIGEST-MD5)
* ``"service-hostname"`` - service host name (the 'host' par of diges-uri
of DIGEST-MD5)
* ``"remote-ip"`` - remote IP address
* ``"realm"`` - the realm to use if needed
* ``"realms"`` - list of acceptable realms
* ``"available_mechanisms"`` - mechanism list provided by peer
* ``"enabled_mechanisms"`` - mechanisms enabled on our side
* For output, via the `Success.properties` attribute:
* ``"username"`` - the authenticated user name
* ``"authzid"`` - the authorization id
* ``"realm"`` - the realm
"""
from __future__ import absolute_import, division
import uuid
import hashlib
import logging
from base64 import standard_b64encode
from abc import ABCMeta, abstractmethod
try:
from abc import abstractclassmethod
except ImportError:
abstractclassmethod = classmethod
logger = logging.getLogger("xmpp.sasl.core")
CLIENT_MECHANISMS_D = {}
CLIENT_MECHANISMS = []
SECURE_CLIENT_MECHANISMS = []
SERVER_MECHANISMS_D = {}
SERVER_MECHANISMS = []
SECURE_SERVER_MECHANISMS = []
[docs]class PasswordDatabase:
"""Password database interface.
PasswordDatabase object is responsible for providing or verification of
user authentication credentials on a server.
All the methods of the `PasswordDatabase` may be overridden in derived
classes for specific authentication and authorization policy.
"""
__metaclass__ = ABCMeta
[docs] def get_password(self, username, acceptable_formats, properties):
"""Get the password for user authentication.
By default returns (None, None) providing no password. Should be
overridden in derived classes unless only `check_password` functionality
is available.
:Parameters:
- `username`: the username for which the password is requested.
- `acceptable_formats`: a sequence of acceptable formats of the
password data. Could be "plain" (plain text password),
"md5:user:realm:password" (MD5 hex digest of user:realm:password)
or any other mechanism-specific encoding. This allows
non-plain-text storage of passwords. But only "plain" format will
work with all password authentication mechanisms.
- `properties`: mapping with authentication properties (those
provided to the authenticator's ``start()`` method plus some
already obtained via the mechanism).
:Types:
- `username`: `unicode`
- `acceptable_formats`: sequence of `unicode`
- `properties`: mapping
:return: the password and its encoding (format).
:returntype: `unicode`,`unicode` tuple.
"""
return None, None
[docs] def check_password(self, username, password, properties):
"""Check the password validity.
Used by plain-text authentication mechanisms.
Default implementation: retrieve a "plain" password for the `username`
and `realm` using `self.get_password` and compare it with the password
provided.
May be overridden e.g. to check the password against some external
authentication mechanism (PAM, LDAP, etc.).
:Parameters:
- `username`: the username for which the password verification is
requested.
- `password`: the password to verify.
- `properties`: mapping with authentication properties (those
provided to the authenticator's ``start()`` method plus some
already obtained via the mechanism).
:Types:
- `username`: `unicode`
- `password`: `unicode`
- `properties`: mapping
:return: `True` if the password is valid.
:returntype: `bool`
"""
logger.debug("check_password{0!r}".format(
(username, password, properties)))
pwd, pwd_format = self.get_password(username,
(u"plain", u"md5:user:realm:password"), properties)
if pwd_format == u"plain":
logger.debug("got plain password: {0!r}".format(pwd))
return pwd is not None and password == pwd
elif pwd_format in (u"md5:user:realm:password"):
logger.debug("got md5:user:realm:password password: {0!r}"
.format(pwd))
realm = properties.get("realm")
if realm is None:
realm = ""
else:
realm = realm.encode("utf-8")
username = username.encode("utf-8")
password = password.encode("utf-8")
urp_hash = hashlib.md5(b"%s:%s:%s").hexdigest()
return urp_hash == pwd
logger.debug(
"got password in unknown format: {0!r}".format(pwd_format))
return False
def default_nonce_factory():
"""Generate a random string for digest authentication challenges.
The string should be cryptographicaly secure random pattern.
:return: the string generated.
:returntype: `bytes`
"""
return uuid.uuid4().hex.encode("us-ascii")
[docs]class Reply(object):
"""Base class for SASL authentication reply objects.
:Ivariables:
- `data`: optional reply data.
:Types:
- `data`: `bytes`
"""
def __init__(self, data=None):
"""Initialize the `Reply` object.
:Parameters:
- `data`: optional reply data.
:Types:
- `data`: `bytes`
"""
self.data = data
[docs] def encode(self):
"""Base64-encode the data contained in the reply when appropriate.
:return: encoded data.
:returntype: `unicode`
"""
if self.data is None:
return ""
elif not self.data:
return "="
else:
ret = standard_b64encode(self.data)
return ret.decode("us-ascii")
[docs]class Challenge(Reply):
"""The challenge SASL message (server's challenge for the client)."""
def __init__(self, data):
"""Initialize the `Challenge` object."""
Reply.__init__(self, data)
def __repr__(self):
return "<sasl.Challenge: {0!r}>".format(self.data)
[docs]class Response(Reply):
"""The response SASL message (clients's reply the server's
challenge)."""
def __init__(self, data):
"""Initialize the `Response` object."""
Reply.__init__(self, data)
def __repr__(self):
return "<sasl.Response: {0!r}>".format(self.data)
[docs]class Failure(Reply):
"""The failure SASL message.
:Ivariables:
- `reason`: the failure reason.
:Types:
- `reason`: `unicode`.
"""
def __init__(self, reason):
"""Initialize the `Failure` object.
:Parameters:
- `reason`: the failure reason.
:Types:
- `reason`: `unicode`.
"""
Reply.__init__(self, None)
self.reason = reason
def __repr__(self):
return "<sasl.Failure: {0!r}>".format(self.reason)
[docs]class Success(Reply):
"""The success SASL message (sent by the server on authentication
success).
"""
def __init__(self, properties=None, data=None):
"""Initialize the `Success` object.
:Parameters:
- `properties`: the `authentication properties`_ obtained
- `data`: the success data to be sent to the client
:Types:
- `properties`: mapping
- `data`: `bytes`
"""
Reply.__init__(self, data)
if properties:
self.properties = properties
else:
self.properties = {}
def __repr__(self):
return "<sasl.Success: {0!r} data: {1!r}>".format(
self.properties, self.data)
class ClientAuthenticator:
"""Base class for client authenticators.
A client authenticator class is a client-side implementation of a SASL
mechanism. One `ClientAuthenticator` object may be used for one
client authentication process.
"""
__metaclass__ = ABCMeta
def __init__(self):
"""Initialize a `ClientAuthenticator` object."""
pass
@abstractclassmethod
def are_properties_sufficient(cls, properties):
"""Check if the provided properties are sufficient for
this authentication mechanism.
If `are_properties_sufficient` returns False for given `properties`
mapping, the `start` method of `cls` instance will also fail with
such argument.
:Parameters:
- `properties`: the `authentication properties`_
:Types:
- `properties`: mapping
:Return: if the mechanism can be used with those properties
"""
return False
@abstractmethod
def start(self, properties):
"""Start the authentication process.
:Parameters:
- `properties`: the `authentication properties`_
:Types:
- `properties`: mapping
:return: the initial response to send to the server or a failuer
indicator.
:returntype: `Response` or `Failure`
"""
raise NotImplementedError
@abstractmethod
def challenge(self, challenge):
"""Process the server's challenge.
:Parameters:
- `challenge`: the challenge.
:Types:
- `challenge`: `bytes`
:return: the response or a failure indicator.
:returntype: `Response` or `Failure`"""
raise NotImplementedError
@abstractmethod
def finish(self, data):
"""Handle authentication succes information from the server.
:Parameters:
- `data`: the optional additional data returned with the success.
:Types:
- `data`: `bytes`
:return: success or failure indicator.
:returntype: `Success` or `Failure`"""
raise NotImplementedError
class ServerAuthenticator:
"""Base class for server authenticators.
A server authenticator class is a server-side implementation of a SASL
mechanism. One `ServerAuthenticator` object may be used for one
client authentication process.
"""
__metaclass__ = ABCMeta
def __init__(self, password_database):
"""Initialize a `ServerAuthenticator` object.
:Parameters:
- `password_database`: a password database
:Types:
- `password_database`: `PasswordDataBase`
"""
self.password_database = password_database
@classmethod
def are_properties_sufficient(cls, properties):
"""Check if the provided properties are sufficient for
this authentication mechanism.
If `are_properties_sufficient` returns False for given `properties`
mapping, the `start` method of `cls` instance will also fail with
such argument.
:Parameters:
- `properties`: the `authentication properties`_
:Types:
- `properties`: mapping
:Return: if the mechanism can be used with those properties
"""
return True
@abstractmethod
def start(self, properties, initial_response):
"""Start the authentication process.
:Parameters:
- `properties`: the `authentication properties`_
- `initial_response`: the initial response send by the client with
the authentication request.
:Types:
- `properties`: mapping
- `initial_response`: `bytes`
:return: a challenge, a success or a failure indicator.
:returntype: `Challenge` or `Failure` or `Success`"""
raise NotImplementedError
@abstractmethod
def response(self, response):
"""Process a response from a client.
:Parameters:
- `response`: the response from the client to our challenge.
:Types:
- `response`: `bytes`
:return: a challenge, a success or a failure indicator.
:returntype: `Challenge` or `Success` or `Failure`"""
raise NotImplementedError
def _key_func(item):
"""Key function used for sorting SASL authenticator classes
"""
klass = item[1]
return (klass._pyxmpp_sasl_secure, klass._pyxmpp_sasl_preference)
def _register_client_authenticator(klass, name):
"""Add a client authenticator class to `CLIENT_MECHANISMS_D`,
`CLIENT_MECHANISMS` and, optionally, to `SECURE_CLIENT_MECHANISMS`
"""
CLIENT_MECHANISMS_D[name] = klass
items = sorted(CLIENT_MECHANISMS_D.items(), key=_key_func, reverse=True)
CLIENT_MECHANISMS[:] = [k for (k, v) in items]
SECURE_CLIENT_MECHANISMS[:] = [k for (k, v) in items
if v._pyxmpp_sasl_secure]
def _register_server_authenticator(klass, name):
"""Add a client authenticator class to `SERVER_MECHANISMS_D`,
`SERVER_MECHANISMS` and, optionally, to `SECURE_SERVER_MECHANISMS`
"""
SERVER_MECHANISMS_D[name] = klass
items = sorted(SERVER_MECHANISMS_D.items(), key=_key_func, reverse=True)
SERVER_MECHANISMS[:] = [k for (k, v) in items]
SECURE_SERVER_MECHANISMS[:] = [k for (k, v) in items
if v._pyxmpp_sasl_secure]
def sasl_mechanism(name, secure, preference=50):
"""Class decorator generator for `ClientAuthenticator` or
`ServerAuthenticator` subclasses. Adds the class to the pyxmpp.sasl
mechanism registry.
:Parameters:
- `name`: SASL mechanism name
- `secure`: if the mechanims can be considered secure - `True`
if it can be used over plain-text channel
- `preference`: mechanism preference level (the higher the better)
:Types:
- `name`: `unicode`
- `secure`: `bool`
- `preference`: `int`
"""
def decorator(klass):
"""The decorator."""
klass._pyxmpp_sasl_secure = secure
klass._pyxmpp_sasl_preference = preference
if issubclass(klass, ClientAuthenticator):
_register_client_authenticator(klass, name)
elif issubclass(klass, ServerAuthenticator):
_register_server_authenticator(klass, name)
else:
raise TypeError("Not a ClientAuthenticator"
" or ServerAuthenticator class")
return klass
return decorator
# vi: sts=4 et sw=4