# Source code for xmpp.models.node

# <xmpp - stateless and concurrency-agnostic XMPP library for python>
#
# Copyright (C) <2016-2017> Gabriel Falcao <gabriel@nacaolivre.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import OrderedDict
from xmpp.core import ET
from xmpp.core import node_to_string
from xmpp.core import split_tag_and_namespace
from xmpp.core import fixup_element
from xmpp._registry import _NODE_MAPPING

# ET.register_namespace("stream", "http://etherx.jabber.org/streams")


class MetaNode(type):
    """Metaclass that normalizes and registers node classes.

    For every class other than the one literally named ``Node`` it:

    * resolves ``__tag__``, ``__etag__``, ``__namespaces__`` and
      ``__children_of__`` from the class body, falling back to regular
      attribute lookup on the class;
    * stores the class in ``_NODE_MAPPING`` under its ``__etag__`` (when it
      differs from the tag) and under ``(tag, default-xmlns)`` (when both
      are set);
    * writes the normalized ``__tag__``, ``__etag__`` and
      ``__namespaces__`` back onto the class and gives it a fresh, empty
      ``__children__`` list;
    * appends the class to the ``__children__`` list of the class named by
      ``__children_of__``, if any.
    """

    def __init__(NodeClass, name, bases, members):
        # The class named ``Node`` is left untouched and never registered.
        if name == 'Node':
            return super(MetaNode, NodeClass).__init__(name, bases, members)

        def declared(attribute, fallback=None):
            # A truthy value from the class body wins; otherwise use
            # whatever attribute lookup on the class yields.
            return members.get(attribute) or getattr(NodeClass, attribute, fallback)

        tag = (declared('__tag__') or '').strip()
        namespaces = declared('__namespaces__') or []
        # When no ``__etag__`` is reachable at all, it defaults to the tag.
        etag = (declared('__etag__', tag) or '').strip()
        parent_class = declared('__children_of__')

        # The namespace declared with an empty prefix is the default xmlns.
        default_xmlns = dict(namespaces).get('')

        registry_keys = []
        if etag and etag != tag:
            registry_keys.append(etag)
        if tag and default_xmlns:
            registry_keys.append((tag, default_xmlns))
        for key in registry_keys:
            _NODE_MAPPING[key] = NodeClass

        NodeClass.__children__ = []
        NodeClass.__tag__ = tag
        NodeClass.__etag__ = etag
        NodeClass.__namespaces__ = namespaces

        if parent_class:
            registered = getattr(parent_class, '__children__', [])
            registered.append(NodeClass)
            parent_class.__children__ = registered

        super(MetaNode, NodeClass).__init__(name, bases, members)


class Node(object):
    """Base class for all XML node definitions.

    The xmpp library only supports XML tags that are explicitly defined
    as python classes that inherit from this one.
    """

    # Tag name and "etag" used by :py:meth:`create`; when ``__etag__`` is
    # set it is used as the element tag instead of ``__tag__``.
    __tag__ = None
    __etag__ = None
    # Iterable of ``(suffix, uri)`` pairs turned into ``xmlns[:suffix]``
    # attributes by :py:meth:`create`.
    __namespaces__ = []
    __prefixes__ = None
    # When true the node is always considered closed and ``to_dict`` omits
    # the ``nodes`` key.
    __single__ = False
    # Class whose instances are reported as parents by ``is_parent_of``.
    __children_of__ = None
    __metaclass__ = MetaNode

    def __init__(self, element, closed=False):
        """Wrap an element-tree ``element``.

        :param element: the element to wrap; it is passed through
            ``fixup_element`` and the result is kept as ``self._element``
        :param closed: whether the node starts out closed; nodes whose
            class sets ``__single__`` are always closed
        """
        self._element = fixup_element(element)
        self._closed = closed or self.__single__

        self._tag, self._namespaces = self.extract_namespace(element.tag)
        self._attributes = OrderedDict()
        for attr in element.attrib:
            clean, namespace = self.extract_namespace(attr)
            self._namespaces.update(namespace)
            self._attributes[clean] = element.attrib[attr]

        self.initialize()

    def initialize(self):
        """Hook called at the end of ``__init__``; does nothing by default."""
        pass

    def __eq__(self, other):
        """Nodes are equal when ``other`` is an instance of this node's
        class and both produce the same ``to_dict()``."""
        if not isinstance(other, self.__class__):
            return False

        return self.to_dict() == other.to_dict()

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``, so without this
        two equal nodes would also compare as different.
        """
        return not self.__eq__(other)

    def close(self):
        """Mark the node as closed so no more children can be appended."""
        self._closed = True

    @property
    def is_closed(self):
        """True if the class is ``__single__`` or the node was closed."""
        return self.__single__ or self._closed

    def is_parent_of(self, other_node):
        """Tell whether ``other_node`` declares this node's class as its
        ``__children_of__``."""
        return other_node.__children_of__ == self.__class__

    def extract_namespace(self, attribute):
        """Split a tag or attribute name into its name and namespace.

        :param attribute: the raw tag or attribute name
        :returns: a ``(name, namespaces)`` tuple; ``namespaces`` is an empty
            ``OrderedDict`` when ``split_tag_and_namespace`` reports no
            namespace (``attribute`` is then returned unchanged), otherwise
            it maps the extracted name to its namespace
        """
        name, ns = split_tag_and_namespace(attribute.strip())
        if not ns:
            return attribute, OrderedDict()

        return name, OrderedDict([(name, ns)])

    @classmethod
    def create(cls, _stringcontent=None, **kw):
        """creates a node instance

        :param _stringcontent: the content text of the tag, if any
        :param **kw: keyword arguments that will become tag attributes
        :returns: the result of ``Node.from_element`` for the new element
        """
        params = OrderedDict()
        if not cls.__etag__:
            tag = cls.__tag__
            # namespace declarations are only emitted when the plain tag
            # is used
            for suffix, ns in cls.__namespaces__:
                attr = ":".join(filter(bool, ['xmlns', suffix]))
                params[attr] = ns
        else:
            tag = cls.__etag__

        if not tag:
            tag = 'xmpp-unknown'

        # explicit keyword arguments override generated xmlns attributes
        params.update(kw)
        element = ET.Element(tag, **params)
        if isinstance(_stringcontent, basestring):
            element.text = _stringcontent

        return Node.from_element(element, allow_fixedup=True)

    @classmethod
    def with_child_and_attributes(cls, child, **params):
        """Create a node with attributes ``params`` and append ``child``."""
        node = cls.create(**params)
        node.append(child)
        return node

    def set_attribute(self, attr, value):
        """Set ``attr`` on both the wrapped element and the attribute map."""
        self._element.attrib[attr] = value
        self._attributes[attr] = value

    @property
    def tag(self):
        """The tag name extracted in ``__init__``."""
        return self._tag

    @property
    def attr(self):
        """A copy of the node attributes."""
        return self._attributes.copy()

    @property
    def namespaces(self):
        """A copy of the namespaces collected in ``__init__``."""
        return self._namespaces.copy()

    def query(self, xpath):
        """Return a list of nodes for every element matched by
        ``findall(xpath)`` on the wrapped element."""
        return [
            Node.from_element(element, allow_fixedup=True)
            for element in self._element.findall(xpath)
        ]

    def get(self, xpath):
        """Return the node for the first element matched by ``find(xpath)``
        on the wrapped element, or ``None`` when nothing matches."""
        element = self._element.find(xpath)
        if element is None:
            return None

        return Node.from_element(element, allow_fixedup=True)

    def get_children(self):
        """Return the direct children of the wrapped element as nodes."""
        # ``list(element)`` is the documented replacement for the
        # deprecated ``Element.getchildren()``.
        return [
            Node.from_element(child, allow_fixedup=True)
            for child in list(self._element)
        ]

    def get_value(self):
        """Return the element text, or ``b''`` when it has none."""
        return self._element.text or b''

    def set_value(self, value):
        """Replace the element text with ``value``."""
        self._element.text = value

    value = property(fget=get_value, fset=set_value)

    @classmethod
    def from_xml(cls, xml):
        """Parse ``xml`` with ``ET.fromstring`` and wrap the result via
        ``Node.from_element``."""
        node = ET.fromstring(xml)
        return Node.from_element(node)

    def add_text(self, text):
        """Append ``bytes(text)`` to the element text."""
        if not self._element.text:
            self._element.text = b''

        self._element.text += bytes(text)

    def append(self, node):
        """Append ``node`` as a child of this node.

        :raises TypeError: if this node is closed
        """
        if self.is_closed:
            msg = 'Refused to append a child to the closed node: {0}'
            raise TypeError(msg.format(self.to_xml()))

        self._element.append(node._element)

    def to_dict(self):
        """Return a dict describing the node.

        Always contains ``tag``; contains ``nodes`` (the children as dicts)
        unless the class is ``__single__``, ``value`` when the text is not
        empty and ``attributes`` when there are any.
        """
        data = {
            'tag': self.tag,
        }
        if not self.__single__:
            data['nodes'] = [Node.from_element(c).to_dict() for c in self._element]

        if self.value:
            data['value'] = self.value

        if self.attr:
            data['attributes'] = dict(self.attr)

        return data

    def to_xml(self):
        """Serialize the node with ``node_to_string``."""
        return node_to_string(self)

    def __str__(self):
        return self.to_xml()

    def __repr__(self):
        return '{3}(tag={0}, attributes={1}, namespaces={2})'.format(
            self._element.tag,
            self.attr,
            self.namespaces,
            self.__class__.__name__
        )

    @staticmethod
    def from_element(element, allow_fixedup=False):
        """Wrap ``element`` in the node class registered for it.

        :param element: the element to wrap
        :param allow_fixedup: when true and no class is registered under
            ``element.tag``, also look up ``(tag, xmlns attribute)``
        :returns: an instance of the registered class, or of ``Node`` when
            no registered class is found
        """
        NodeClass = _NODE_MAPPING.get(element.tag, None)
        if allow_fixedup and NodeClass is None:
            # element might be fixed up already, let's
            # try to fetch its Node
            key = element.tag, element.attrib.get('xmlns')
            NodeClass = _NODE_MAPPING.get(key, None)

        if NodeClass is None:
            # could not find any specialized Node subclasses to
            # represent this `element` let's fallback to Node
            NodeClass = Node

        return NodeClass(element)