# -*- coding: utf-8 -*-
#
# <xmpp - stateless and concurrency-agnostic XMPP library for python>
#
# Copyright (C) <2016-2017> Gabriel Falcao <gabriel@nacaolivre.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import uuid
import logging
from xmpp.core import ET
from speakers import Speaker as Events
from xmpp.core import generate_id
from xmpp.util import stderr
from xmpp.extensions import get_known_extensions
from xmpp.models import (
Node,
Error,
PresencePriority,
Stream,
StreamError,
ProceedTLS,
SASLAuth,
SASLChallenge,
SASLResponse,
SASLSuccess,
SASLFailure,
Message,
Presence,
IQ,
ResourceBind,
SASLMechanismSet,
BoundJid,
JID,
StartTLS,
IQRegister,
PresenceDelay,
RosterQuery,
RosterItem,
RosterGroup,
MissingJID,
)
logger = logging.getLogger('xmpp.stream')
class STREAM_STATES(object):
IDLE = 'IDLE'
AUTHENTICATED = 'AUTHENTICATED'
READY = 'READY'
OPEN = 'OPEN'
CLOSED = 'CLOSED'
@classmethod
def keys(cls):
return filter(lambda k: k.upper() == k, dir(cls))
@classmethod
def is_valid(cls, state):
return state in cls.keys()
xml_cleanup_regex1 = re.compile(r'^[<][?]xml[^?]+[^>]+[>]')
def sanitize_feed(data):
data = xml_cleanup_regex1.sub('', data)
return data.lstrip()
def create_spool(data):
treated = data.replace('><', '>\n<')
return treated.splitlines()
def create_stream_events():
return Events('stream', [
'feed', # the XMLStream has just been fed with xml
'open', # the XMLStream is open
'closed', # the XMLStream has been closed
'error', # received a <stream:error></stream:error> from the server
'unhandled_xml', # the XMLStream failed to feed the incremental XML parser with the given value
'node', # a new xmpp.Node was just parsed by the stream and is available to use
'iq', # a new xmpp.IQ was node was received
'message', # a new xmpp.Message node was received
'presence', # a new xmpp.Presence node was received
'start_stream', # a new stream is being negotiated
'start_tls', # server sent <starttls />
'tls_proceed', # the peer allowed the TCP connection to upgrade to TLS
'sasl_challenge', # the peer sent a SASL challenge
'sasl_success', # the peer sent a SASL success
'sasl_failure', # the peer sent a SASL failure
'sasl_response', # the peer sent a SASL response
'sasl_support', # the peer says it supports SASL
'bind_support', # the peer says it supports binding resource
'iq_result', # the peer returned a <iq type="result"></iq>
'iq_set', # the peer returned a <iq type="set"></iq>
'iq_get', # the peer returned a <iq type="get"></iq>
'iq_error', # the peer returned a <iq type="error"></iq>
'user_registration', # the peer supports user registration
'bound_jid', # the peer returned a <jid>username@domain/resource</jid> that should be used in the from= of stanzas
])
[docs]class XMLStream(object):
"""`XML Stream <https://xmpp.org/rfcs/rfc3920.html#streams>`_ behavior class.
:param connection: a :py:class:`~xmpp.networking.core.XMPPConnection` instance
:param debug: whether to print errors to the stderr
"""
def __init__(self, connection, debug=False):
self._state = STREAM_STATES.IDLE
self._connection = connection
self._tls_connection = None
self._connection.on.ready_to_write(self.ready_to_write)
self._connection.on.ready_to_read(self.ready_to_read)
self.extension = {}
self.on = create_stream_events()
self.on.node(self.route_nodes)
if debug:
self.on.error(lambda _, data: stderr.bold_red("error: {0}".format(data)))
self.on.closed(lambda _, data: stderr.bold_yellow("XMLStream closed by the server"))
self.on.iq_error(lambda _, data: stderr.bold_red("error: {0}".format(data)))
self.on.unhandled_xml(lambda _, data: stderr.bold_red("unhandled xml: {0}".format(data)))
else:
self.on.unhandled_xml(lambda event, xml: logger.warning("unhandled xml: %s", xml))
self.reset()
@property
def bound_jid(self):
"""a :py:class:`~xmpp.models.core.JID` or ``None``
Automatically captured from the XML traffic.
"""
return self.__bound_jid
[docs] def reset(self):
"""resets the minimal state of the XML Stream, that is:
* attributes of the <stream> sent by the server during negotiation, used by :py:meth:`~xmpp.stream.XMLStream.id`
* a bound JID sent by the server
* a successful sasl result node to leverage :py:meth:`~xmpp.stream.XMLStream.has_gone_through_sasl`
"""
self._buffer = []
# minimal state:
self.__sasl_result = None
self.__bound_jid = None
self.resource_name = 'xmpp-{0}'.format(uuid.uuid4())
self.nodes = []
self.stream_node = None
self.parser = self.make_parser()
self.load_extensions()
[docs] def load_extensions(self):
"""reloads all the available extensions bound to this stream"""
self.extension = {}
for number, Extension in get_known_extensions():
self.extension[number] = Extension(self)
def handle_bound_jid(self, node):
jid = JID(node.value.strip())
self.__bound_jid = jid
self.on.bound_jid.shout(jid)
def handle_message(self, node):
self.on.message.shout(node)
def handle_presence(self, node):
self.on.presence.shout(node)
def handle_iq(self, node):
iq_type = node.attr.get('type')
ROUTES = {
'get': self.on.iq_get,
'set': self.on.iq_set,
'error': self.on.iq_error,
'result': self.on.iq_result,
}
event = ROUTES.get(iq_type)
if event:
event.shout(node)
def route_nodes(self, _, node):
ROUTES = {
# direct shout throught the event handlers
ProceedTLS: self.on.tls_proceed,
SASLChallenge: self.on.sasl_challenge,
SASLResponse: self.on.sasl_response,
StartTLS: self.on.start_tls,
ResourceBind: self.on.bind_support,
SASLMechanismSet: self.on.sasl_support,
IQRegister: self.on.user_registration,
StreamError: self.on.error,
# internal affairs when handling node before shouting it
# through event handlers
IQ: self.handle_iq,
Message: self.handle_message,
Presence: self.handle_presence,
BoundJid: self.handle_bound_jid,
SASLFailure: self.handle_sasl_failure,
SASLSuccess: self.handle_sasl_success,
}
NodeClass = type(node)
event = ROUTES.get(NodeClass)
if type(node) == Node:
logger.warning("no model defined for %s: %r", node.to_xml(), node)
return
if isinstance(node, Error):
self.on.error.shout(node)
if event:
if hasattr(event, 'shout'):
event.shout(node)
else:
event(node)
@property
def id(self):
"""returns the stream id provided by the server. ``<stream:stream id="SOMETHING">``
Mainly used by the
:py:meth:`~xmpp.extensions.xep0114.Component.authenticate`
when crafting the secret.
"""
if self.stream_node is None:
return
return self.stream_node.attr.get('id')
[docs] def parse(self):
"""attempts to parse whatever is in the buffer of the incremental XML
parser and creates a new parser."""
try:
if self._buffer and self._buffer[-1].endswith('>'):
element = self.parser.close()
self.parser = self.make_parser()
else:
element = None
except ET.ParseError:
return
if element is None and len(self.nodes) > 0:
return self.nodes[-1]
elif element is None:
return
return Node.from_element(element, allow_fixedup=True)
[docs] def ready_to_read(self, _, connection):
"""event handler for the ``on.ready_to_read`` event of a XMPP Connection.
You should probably never have to call this by hand, use
:py:meth:`~xmpp.stream.XMLStream.bind` instead
"""
self.feed(connection.receive())
node = self.parse()
if node:
self.on.node.shout(node)
[docs] def ready_to_write(self, _, connection):
"""even handler for the ``on.ready_to_write`` event of a XMPP
Connection.
You should probably never have to call this by hand, use
:py:meth:`~xmpp.stream.XMLStream.bind` instead
"""
self.set_state(STREAM_STATES.READY)
return
def append_node(self, node):
self.nodes.append(node)
def node_did_open(self, node):
if isinstance(node, Stream):
self.set_state(STREAM_STATES.OPEN)
self.stream_node = node
self.on.open.shout(node)
self.append_node(node)
[docs] def send(self, node):
"""sends a XML serialized Node through the bound XMPP connection
:param node: the :py:class:`~xmpp.models.node.Node`
"""
self._connection.send(node.to_xml())
[docs] def close(self, disconnect=True):
"""sends a final ``</stream:stream>`` to the server then immediately
closes the bound TCP connection,disposes it and resets the
minimum state kept by the stream, so it can be reutilized right away.
"""
self._connection.send(b'</stream:stream>')
self._state = STREAM_STATES.IDLE
if disconnect:
self._connection.disconnect()
self.reset()
self._connection = None
[docs] def open_client(self, domain):
"""Sends a <stream:stream xmlns="jabber:client"> to the given domain
:param domain: the FQDN of the XMPP server
"""
initial = Stream.create_client(
to=domain,
)
self.send(initial)
def node_did_close(self, node):
if node.__tag__ == 'stream:stream':
self.set_state(STREAM_STATES.CLOSED)
else:
self.on.node.shout(node)
@property
def state(self):
return self._state
def set_state(self, state):
if not STREAM_STATES.is_valid(state):
msg = 'invalid state was given: {0}'.format(state)
raise TypeError(msg)
self._state = state
def make_parser(self):
self.target = NodeHandler(self)
return ET.XMLTreeBuilder(target=self.target)
def matches_closed_stream(self, string):
return string.rstrip().endswith('</stream:stream>')
[docs] def feed(self, data, attempt=1):
"""feeds the stream with incoming data from the XMPP server.
This is the basic entrypoint for usage with the XML received
from the :py:class:`~xmpp.networking.core.XMPPConnection`
:param data: the XML string
"""
self.on.feed.shout(data)
data = sanitize_feed(data)
spool = create_spool(data)
if not self.matches_closed_stream(data):
if len(spool) > 1:
for item in spool:
self.feed_from_spool(item)
else:
self.feed_from_spool(data)
else:
self.set_state(STREAM_STATES.CLOSED)
if self.stream_node:
self.on.closed.shout(self.stream_node)
def feed_from_spool(self, item, attempt=0):
try:
self.parser.feed(item)
self._buffer.append(item)
except ET.ParseError:
if attempt > 1:
self.on.unhandled_xml.shout(item)
return
self.parser = self.make_parser()
return self.feed_from_spool(item, attempt + 1)
def start_tls_handshake(self, domain):
self.send(StartTLS.create())
# def finish_tls_upgrade(self, connection):
# if self._connection != connection:
# logger.critical("finish_tls_upgrade: weird!!")
# self._tls_connection = connection
# def is_under_tls(self):
# return self._tls_connection is not None
def handle_sasl_success(self, node):
self.on.sasl_success.shout(node)
def handle_sasl_failure(self, node):
self.__sasl_result = None
self.on.sasl_failure.shout(node)
def finish_sasl(self, result):
self.__sasl_result = result
self.set_state(STREAM_STATES.AUTHENTICATED)
def has_gone_through_sasl(self):
return self.__sasl_result is not None
[docs] def send_sasl_auth(self, mechanism, message):
"""sends a SASL response to the server in order to proceed with authentication handshakes
:param mechanism: the name of SASL mechanism (i.e. SCRAM-SHA-1, PLAIN, EXTERNAL)
"""
node = SASLAuth.prepare(mechanism, message.encode())
self.send(node)
return node
[docs] def send_sasl_response(self, mechanism, message):
"""sends a SASL response to the server in order to proceed with authentication handshakes
:param mechanism: the name of SASL mechanism (i.e. SCRAM-SHA-1, PLAIN, EXTERNAL)
"""
node = SASLResponse.prepare(mechanism, message)
self.send(node)
return node
[docs] def bind_to_resource(self, name):
"""sends an ``<iq type="set"><resource>name</resource></iq>`` in order to bind the resource
:param name: the name of the resource
"""
iq = IQ.create(type='set', id=generate_id())
iq.append(ResourceBind.with_resource(name))
self.send(iq)
[docs] def send_presence(self, to=None, delay=None, priority=10, **params):
"""sends presence
:param to: jid to receive presence.
:param delay: if set, it must be a ISO compatible date string
:param priority: the priority of this resource
"""
from_jid = params.get('from')
if from_jid:
from_jid = JID(from_jid)
elif self.bound_jid:
from_jid = self.bound_jid
else:
msg = 'Presence cannot be sent when missing the "from" jid'
raise MissingJID(msg)
params['from'] = from_jid.full
if to:
params['to'] = JID(to).full
presence = Presence.create(**params)
if delay:
delay_params = {
'stamp': delay,
'from': from_jid.full
}
presence.append(PresenceDelay.create(**delay_params))
if priority:
node = PresencePriority.create(bytes(priority))
presence.append(node)
self.send(presence)
[docs] def send_message(self, message, to, **params):
"""
:param message: the string with the message
:param to: the jid to send the message to
:param **params: keyword args for designating attributes of the message
"""
self.send(Message.create(message, to=to, **params))
class NodeHandler(object):
def __init__(self, parent):
self.nodes = []
self._data = []
self.parent = parent
def start(self, tag, attrib):
parent_node = self.nodes and self.nodes[-1] or None
has_parent = parent_node and not parent_node.is_closed
element = ET.Element(tag, attrib)
node = self.notify_and_store_node(element)
if has_parent:
parent_node.append(node)
self.parent.node_did_open(node)
def end(self, tag):
current = self.nodes.pop()
if not self.nodes:
# always have only one node
self.nodes.append(current)
return
possible_parent = self.nodes[-1]
if possible_parent.is_parent_of(current):
self.parent.node_did_close(current)
def close(self):
if not self.nodes:
return
for node in reversed(self.nodes):
if node.is_closed:
return node._element
return node._element
def data(self, data):
target = self.nodes[-1]._element
if not target.text:
target.text = data
else:
target.text += data
return False
def notify_and_store_node(self, element):
node = Node.from_element(element)
self.nodes.append(node)
return node