From 7b0977dcfd0e39768ac6ad096b3a32b9ec366fa5 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Fri, 24 Apr 2020 14:27:05 -0700 Subject: [PATCH] remove overpy from pyextra --- pyextra/overpy/__about__.py | 22 - pyextra/overpy/__init__.py | 1619 ----------------------------------- pyextra/overpy/exception.py | 166 ---- pyextra/overpy/helper.py | 64 -- 4 files changed, 1871 deletions(-) delete mode 100644 pyextra/overpy/__about__.py delete mode 100644 pyextra/overpy/__init__.py delete mode 100644 pyextra/overpy/exception.py delete mode 100644 pyextra/overpy/helper.py diff --git a/pyextra/overpy/__about__.py b/pyextra/overpy/__about__.py deleted file mode 100644 index 33c6c493..00000000 --- a/pyextra/overpy/__about__.py +++ /dev/null @@ -1,22 +0,0 @@ -__all__ = [ - "__author__", - "__copyright__", - "__email__", - "__license__", - "__summary__", - "__title__", - "__uri__", - "__version__", -] - -__title__ = "overpy" -__summary__ = "Python Wrapper to access the OpenStreepMap Overpass API" -__uri__ = "https://github.com/DinoTools/python-overpy" - -__version__ = "0.4" - -__author__ = "PhiBo (DinoTools)" -__email__ = "" - -__license__ = "MIT" -__copyright__ = "Copyright 2014-2016 %s" % __author__ diff --git a/pyextra/overpy/__init__.py b/pyextra/overpy/__init__.py deleted file mode 100644 index 2836080a..00000000 --- a/pyextra/overpy/__init__.py +++ /dev/null @@ -1,1619 +0,0 @@ -from collections import OrderedDict -from datetime import datetime -from decimal import Decimal -from xml.sax import handler, make_parser -import json -import re -import sys -import time -import requests - -from overpy import exception -from overpy.__about__ import ( - __author__, __copyright__, __email__, __license__, __summary__, __title__, - __uri__, __version__ -) - -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 - -XML_PARSER_DOM = 1 -XML_PARSER_SAX = 2 - -# Try to convert some common attributes -# http://wiki.openstreetmap.org/wiki/Elements#Common_attributes -GLOBAL_ATTRIBUTE_MODIFIERS = { - "changeset": int, - "timestamp": lambda ts: datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"), - "uid": int, - "version": int, - "visible": lambda v: v.lower() == "true" -} - - -def is_valid_type(element, cls): - """ - Test if an element is of a given type. - - :param Element() element: The element instance to test - :param Element cls: The element class to test - :return: False or True - :rtype: Boolean - """ - return isinstance(element, cls) and element.id is not None - - -class Overpass(object): - """ - Class to access the Overpass API - - :cvar default_max_retry_count: Global max number of retries (Default: 0) - :cvar default_retry_timeout: Global time to wait between tries (Default: 1.0s) - """ - default_max_retry_count = 0 - default_read_chunk_size = 4096 - default_retry_timeout = 1.0 - default_url = "http://overpass-api.de/api/interpreter" - - def __init__(self, read_chunk_size=None, url=None, xml_parser=XML_PARSER_SAX, max_retry_count=None, retry_timeout=None, timeout=5.0, headers=None): - """ - :param read_chunk_size: Max size of each chunk read from the server response - :type read_chunk_size: Integer - :param url: Optional URL of the Overpass server. Defaults to http://overpass-api.de/api/interpreter - :type url: str - :param xml_parser: The xml parser to use - :type xml_parser: Integer - :param max_retry_count: Max number of retries (Default: default_max_retry_count) - :type max_retry_count: Integer - :param retry_timeout: Time to wait between tries (Default: default_retry_timeout) - :type retry_timeout: float - :param timeout: HTTP request timeout - :type timeout: float - :param headers: HTTP request headers - :type headers: dict - """ - self.url = self.default_url - if url is not None: - self.url = url - - self._regex_extract_error_msg = re.compile(b"\(?P\") - self._regex_remove_tag = re.compile(b"<[^>]*?>") - if read_chunk_size is None: - read_chunk_size = self.default_read_chunk_size - self.read_chunk_size = read_chunk_size - - if max_retry_count is None: - max_retry_count = self.default_max_retry_count - self.max_retry_count = max_retry_count - - if retry_timeout is None: - retry_timeout = self.default_retry_timeout - self.retry_timeout = retry_timeout - - self.xml_parser = xml_parser - self.timeout = timeout - self.headers = headers - - def _handle_remark_msg(self, msg): - """ - Try to parse the message provided with the remark tag or element. - - :param str msg: The message - :raises overpy.exception.OverpassRuntimeError: If message starts with 'runtime error:' - :raises overpy.exception.OverpassRuntimeRemark: If message starts with 'runtime remark:' - :raises overpy.exception.OverpassUnknownError: If we are unable to identify the error - """ - msg = msg.strip() - if msg.startswith("runtime error:"): - raise exception.OverpassRuntimeError(msg=msg) - elif msg.startswith("runtime remark:"): - raise exception.OverpassRuntimeRemark(msg=msg) - raise exception.OverpassUnknownError(msg=msg) - - def query(self, query): - """ - Query the Overpass API - - :param String|Bytes query: The query string in Overpass QL - :return: The parsed result - :rtype: overpy.Result - """ - if not isinstance(query, bytes): - query = query.encode("utf-8") - - retry_num = 0 - retry_exceptions = [] - do_retry = True if self.max_retry_count > 0 else False - while retry_num <= self.max_retry_count: - if retry_num > 0: - time.sleep(self.retry_timeout) - retry_num += 1 - - try: - if self.headers is not None: - r = requests.post(self.url, query, timeout=self.timeout, headers=self.headers) - else: - r = requests.post(self.url, query, timeout=self.timeout) - response = r.content - except (requests.exceptions.BaseHTTPError, requests.exceptions.RequestException) as e: - if not do_retry: - raise e - retry_exceptions.append(e) - continue - - if r.status_code == 200: - content_type = r.headers["Content-Type"] - - if content_type == "application/json": - return self.parse_json(response) - - if content_type == "application/osm3s+xml": - return self.parse_xml(response) - - e = exception.OverpassUnknownContentType(content_type) - if not do_retry: - raise e - retry_exceptions.append(e) - continue - elif r.status_code == 400: - msgs = [] - for msg in self._regex_extract_error_msg.finditer(response): - tmp = self._regex_remove_tag.sub(b"", msg.group("msg")) - try: - tmp = tmp.decode("utf-8") - except UnicodeDecodeError: - tmp = repr(tmp) - msgs.append(tmp) - - e = exception.OverpassBadRequest( - query, - msgs=msgs - ) - if not do_retry: - raise e - retry_exceptions.append(e) - continue - elif r.status_code == 429: - e = exception.OverpassTooManyRequests - if not do_retry: - raise e - retry_exceptions.append(e) - continue - elif r.status_code == 504: - e = exception.OverpassGatewayTimeout - if not do_retry: - raise e - retry_exceptions.append(e) - continue - - # No valid response code - e = exception.OverpassUnknownHTTPStatusCode(r.status_code) - if not do_retry: - raise e - retry_exceptions.append(e) - continue - - raise exception.MaxRetriesReached(retry_count=retry_num, exceptions=retry_exceptions) - - def parse_json(self, data, encoding="utf-8"): - """ - Parse raw response from Overpass service. - - :param data: Raw JSON Data - :type data: String or Bytes - :param encoding: Encoding to decode byte string - :type encoding: String - :return: Result object - :rtype: overpy.Result - """ - if isinstance(data, bytes): - data = data.decode(encoding) - - data = json.loads(data, parse_float=Decimal) - if "remark" in data: - self._handle_remark_msg(msg=data.get("remark")) - return Result.from_json(data, api=self) - - def parse_xml(self, data, encoding="utf-8", parser=None): - """ - - :param data: Raw XML Data - :type data: String or Bytes - :param encoding: Encoding to decode byte string - :type encoding: String - :return: Result object - :rtype: overpy.Result - """ - if parser is None: - parser = self.xml_parser - if isinstance(data, bytes): - data = data.decode(encoding) - if PY2 and not isinstance(data, str): - # Python 2.x: Convert unicode strings - data = data.encode(encoding) - - m = re.compile("(?P[^<>]*)").search(data) - if m: - self._handle_remark_msg(m.group("msg")) - - return Result.from_xml(data, api=self, parser=parser) - - -class Result(object): - """ - Class to handle the result. - """ - - def __init__(self, elements=None, api=None): - """ - - :param List elements: - :param api: - :type api: overpy.Overpass - """ - if elements is None: - elements = [] - self._areas = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Area)) - self._nodes = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Node)) - self._ways = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Way)) - self._relations = OrderedDict((element.id, element) - for element in elements if is_valid_type(element, Relation)) - self._class_collection_map = {Node: self._nodes, Way: self._ways, Relation: self._relations, Area: self._areas} - self.api = api - - def expand(self, other): - """ - Add all elements from an other result to the list of elements of this result object. - - It is used by the auto resolve feature. - - :param other: Expand the result with the elements from this result. - :type other: overpy.Result - :raises ValueError: If provided parameter is not instance of :class:`overpy.Result` - """ - if not isinstance(other, Result): - raise ValueError("Provided argument has to be instance of overpy:Result()") - - other_collection_map = {Node: other.nodes, Way: other.ways, Relation: other.relations, Area: other.areas} - for element_type, own_collection in self._class_collection_map.items(): - for element in other_collection_map[element_type]: - if is_valid_type(element, element_type) and element.id not in own_collection: - own_collection[element.id] = element - - def append(self, element): - """ - Append a new element to the result. - - :param element: The element to append - :type element: overpy.Element - """ - if is_valid_type(element, Element): - self._class_collection_map[element.__class__].setdefault(element.id, element) - - def get_elements(self, filter_cls, elem_id=None): - """ - Get a list of elements from the result and filter the element type by a class. - - :param filter_cls: - :param elem_id: ID of the object - :type elem_id: Integer - :return: List of available elements - :rtype: List - """ - result = [] - if elem_id is not None: - try: - result = [self._class_collection_map[filter_cls][elem_id]] - except KeyError: - result = [] - else: - for e in self._class_collection_map[filter_cls].values(): - result.append(e) - return result - - def get_ids(self, filter_cls): - """ - - :param filter_cls: - :return: - """ - return list(self._class_collection_map[filter_cls].keys()) - - def get_node_ids(self): - return self.get_ids(filter_cls=Node) - - def get_way_ids(self): - return self.get_ids(filter_cls=Way) - - def get_relation_ids(self): - return self.get_ids(filter_cls=Relation) - - def get_area_ids(self): - return self.get_ids(filter_cls=Area) - - @classmethod - def from_json(cls, data, api=None): - """ - Create a new instance and load data from json object. - - :param data: JSON data returned by the Overpass API - :type data: Dict - :param api: - :type api: overpy.Overpass - :return: New instance of Result object - :rtype: overpy.Result - """ - result = cls(api=api) - for elem_cls in [Node, Way, Relation, Area]: - for element in data.get("elements", []): - e_type = element.get("type") - if hasattr(e_type, "lower") and e_type.lower() == elem_cls._type_value: - result.append(elem_cls.from_json(element, result=result)) - - return result - - @classmethod - def from_xml(cls, data, api=None, parser=None): - """ - Create a new instance and load data from xml data or object. - - .. note:: - If parser is set to None, the functions tries to find the best parse. - By default the SAX parser is chosen if a string is provided as data. - The parser is set to DOM if an xml.etree.ElementTree.Element is provided as data value. - - :param data: Root element - :type data: str | xml.etree.ElementTree.Element - :param api: The instance to query additional information if required. - :type api: Overpass - :param parser: Specify the parser to use(DOM or SAX)(Default: None = autodetect, defaults to SAX) - :type parser: Integer | None - :return: New instance of Result object - :rtype: Result - """ - if parser is None: - if isinstance(data, str): - parser = XML_PARSER_SAX - else: - parser = XML_PARSER_DOM - - result = cls(api=api) - if parser == XML_PARSER_DOM: - import xml.etree.ElementTree as ET - if isinstance(data, str): - root = ET.fromstring(data) - elif isinstance(data, ET.Element): - root = data - else: - raise exception.OverPyException("Unable to detect data type.") - - for elem_cls in [Node, Way, Relation, Area]: - for child in root: - if child.tag.lower() == elem_cls._type_value: - result.append(elem_cls.from_xml(child, result=result)) - - elif parser == XML_PARSER_SAX: - if PY2: - from StringIO import StringIO - else: - from io import StringIO - source = StringIO(data) - sax_handler = OSMSAXHandler(result) - parser = make_parser() - parser.setContentHandler(sax_handler) - parser.parse(source) - else: - # ToDo: better exception - raise Exception("Unknown XML parser") - return result - - def get_area(self, area_id, resolve_missing=False): - """ - Get an area by its ID. - - :param area_id: The area ID - :type area_id: Integer - :param resolve_missing: Query the Overpass API if the area is missing in the result set. - :return: The area - :rtype: overpy.Area - :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache. - :raises overpy.exception.DataIncomplete: If resolve_missing is True and the area can't be resolved. - """ - areas = self.get_areas(area_id=area_id) - if len(areas) == 0: - if resolve_missing is False: - raise exception.DataIncomplete("Resolve missing area is disabled") - - query = ("\n" - "[out:json];\n" - "area({area_id});\n" - "out body;\n" - ) - query = query.format( - area_id=area_id - ) - tmp_result = self.api.query(query) - self.expand(tmp_result) - - areas = self.get_areas(area_id=area_id) - - if len(areas) == 0: - raise exception.DataIncomplete("Unable to resolve requested areas") - - return areas[0] - - def get_areas(self, area_id=None, **kwargs): - """ - Alias for get_elements() but filter the result by Area - - :param area_id: The Id of the area - :type area_id: Integer - :return: List of elements - """ - return self.get_elements(Area, elem_id=area_id, **kwargs) - - def get_node(self, node_id, resolve_missing=False): - """ - Get a node by its ID. - - :param node_id: The node ID - :type node_id: Integer - :param resolve_missing: Query the Overpass API if the node is missing in the result set. - :return: The node - :rtype: overpy.Node - :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache. - :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved. - """ - nodes = self.get_nodes(node_id=node_id) - if len(nodes) == 0: - if not resolve_missing: - raise exception.DataIncomplete("Resolve missing nodes is disabled") - - query = ("\n" - "[out:json];\n" - "node({node_id});\n" - "out body;\n" - ) - query = query.format( - node_id=node_id - ) - tmp_result = self.api.query(query) - self.expand(tmp_result) - - nodes = self.get_nodes(node_id=node_id) - - if len(nodes) == 0: - raise exception.DataIncomplete("Unable to resolve all nodes") - - return nodes[0] - - def get_nodes(self, node_id=None, **kwargs): - """ - Alias for get_elements() but filter the result by Node() - - :param node_id: The Id of the node - :type node_id: Integer - :return: List of elements - """ - return self.get_elements(Node, elem_id=node_id, **kwargs) - - def get_relation(self, rel_id, resolve_missing=False): - """ - Get a relation by its ID. - - :param rel_id: The relation ID - :type rel_id: Integer - :param resolve_missing: Query the Overpass API if the relation is missing in the result set. - :return: The relation - :rtype: overpy.Relation - :raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache. - :raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved. - """ - relations = self.get_relations(rel_id=rel_id) - if len(relations) == 0: - if resolve_missing is False: - raise exception.DataIncomplete("Resolve missing relations is disabled") - - query = ("\n" - "[out:json];\n" - "relation({relation_id});\n" - "out body;\n" - ) - query = query.format( - relation_id=rel_id - ) - tmp_result = self.api.query(query) - self.expand(tmp_result) - - relations = self.get_relations(rel_id=rel_id) - - if len(relations) == 0: - raise exception.DataIncomplete("Unable to resolve requested reference") - - return relations[0] - - def get_relations(self, rel_id=None, **kwargs): - """ - Alias for get_elements() but filter the result by Relation - - :param rel_id: Id of the relation - :type rel_id: Integer - :return: List of elements - """ - return self.get_elements(Relation, elem_id=rel_id, **kwargs) - - def get_way(self, way_id, resolve_missing=False): - """ - Get a way by its ID. - - :param way_id: The way ID - :type way_id: Integer - :param resolve_missing: Query the Overpass API if the way is missing in the result set. - :return: The way - :rtype: overpy.Way - :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache. - :raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved. - """ - ways = self.get_ways(way_id=way_id) - if len(ways) == 0: - if resolve_missing is False: - raise exception.DataIncomplete("Resolve missing way is disabled") - - query = ("\n" - "[out:json];\n" - "way({way_id});\n" - "out body;\n" - ) - query = query.format( - way_id=way_id - ) - tmp_result = self.api.query(query) - self.expand(tmp_result) - - ways = self.get_ways(way_id=way_id) - - if len(ways) == 0: - raise exception.DataIncomplete("Unable to resolve requested way") - - return ways[0] - - def get_ways(self, way_id=None, **kwargs): - """ - Alias for get_elements() but filter the result by Way - - :param way_id: The Id of the way - :type way_id: Integer - :return: List of elements - """ - return self.get_elements(Way, elem_id=way_id, **kwargs) - - area_ids = property(get_area_ids) - areas = property(get_areas) - node_ids = property(get_node_ids) - nodes = property(get_nodes) - relation_ids = property(get_relation_ids) - relations = property(get_relations) - way_ids = property(get_way_ids) - ways = property(get_ways) - - -class Element(object): - """ - Base element - """ - - def __init__(self, attributes=None, result=None, tags=None): - """ - :param attributes: Additional attributes - :type attributes: Dict - :param result: The result object this element belongs to - :param tags: List of tags - :type tags: Dict - """ - - self._result = result - self.attributes = attributes - # ToDo: Add option to modify attribute modifiers - attribute_modifiers = dict(GLOBAL_ATTRIBUTE_MODIFIERS.items()) - for n, m in attribute_modifiers.items(): - if n in self.attributes: - self.attributes[n] = m(self.attributes[n]) - self.id = None - self.tags = tags - - @classmethod - def get_center_from_json(cls, data): - """ - Get center information from json data - - :param data: json data - :return: tuple with two elements: lat and lon - :rtype: tuple - """ - center_lat = None - center_lon = None - center = data.get("center") - if isinstance(center, dict): - center_lat = center.get("lat") - center_lon = center.get("lon") - if center_lat is None or center_lon is None: - raise ValueError("Unable to get lat or lon of way center.") - center_lat = Decimal(center_lat) - center_lon = Decimal(center_lon) - return (center_lat, center_lon) - - @classmethod - def get_center_from_xml_dom(cls, sub_child): - center_lat = sub_child.attrib.get("lat") - center_lon = sub_child.attrib.get("lon") - if center_lat is None or center_lon is None: - raise ValueError("Unable to get lat or lon of way center.") - center_lat = Decimal(center_lat) - center_lon = Decimal(center_lon) - return center_lat, center_lon - - -class Area(Element): - """ - Class to represent an element of type area - """ - - _type_value = "area" - - def __init__(self, area_id=None, **kwargs): - """ - :param area_id: Id of the area element - :type area_id: Integer - :param kwargs: Additional arguments are passed directly to the parent class - - """ - - Element.__init__(self, **kwargs) - #: The id of the way - self.id = area_id - - def __repr__(self): - return "".format(self.id) - - @classmethod - def from_json(cls, data, result=None): - """ - Create new Area element from JSON data - - :param data: Element data from JSON - :type data: Dict - :param result: The result this element belongs to - :type result: overpy.Result - :return: New instance of Way - :rtype: overpy.Area - :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. - """ - if data.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=data.get("type") - ) - - tags = data.get("tags", {}) - - area_id = data.get("id") - - attributes = {} - ignore = ["id", "tags", "type"] - for n, v in data.items(): - if n in ignore: - continue - attributes[n] = v - - return cls(area_id=area_id, attributes=attributes, tags=tags, result=result) - - @classmethod - def from_xml(cls, child, result=None): - """ - Create new way element from XML data - - :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element - :param result: The result this node belongs to - :type result: overpy.Result - :return: New Way oject - :rtype: overpy.Way - :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match - :raises ValueError: If the ref attribute of the xml node is not provided - :raises ValueError: If a tag doesn't have a name - """ - if child.tag.lower() != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=child.tag.lower() - ) - - tags = {} - - for sub_child in child: - if sub_child.tag.lower() == "tag": - name = sub_child.attrib.get("k") - if name is None: - raise ValueError("Tag without name/key.") - value = sub_child.attrib.get("v") - tags[name] = value - - area_id = child.attrib.get("id") - if area_id is not None: - area_id = int(area_id) - - attributes = {} - ignore = ["id"] - for n, v in child.attrib.items(): - if n in ignore: - continue - attributes[n] = v - - return cls(area_id=area_id, attributes=attributes, tags=tags, result=result) - - -class Node(Element): - """ - Class to represent an element of type node - """ - - _type_value = "node" - - def __init__(self, node_id=None, lat=None, lon=None, **kwargs): - """ - :param lat: Latitude - :type lat: Decimal or Float - :param lon: Longitude - :type long: Decimal or Float - :param node_id: Id of the node element - :type node_id: Integer - :param kwargs: Additional arguments are passed directly to the parent class - """ - - Element.__init__(self, **kwargs) - self.id = node_id - self.lat = lat - self.lon = lon - - def __repr__(self): - return "".format(self.id, self.lat, self.lon) - - @classmethod - def from_json(cls, data, result=None): - """ - Create new Node element from JSON data - - :param data: Element data from JSON - :type data: Dict - :param result: The result this element belongs to - :type result: overpy.Result - :return: New instance of Node - :rtype: overpy.Node - :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. - """ - if data.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=data.get("type") - ) - - tags = data.get("tags", {}) - - node_id = data.get("id") - lat = data.get("lat") - lon = data.get("lon") - - attributes = {} - ignore = ["type", "id", "lat", "lon", "tags"] - for n, v in data.items(): - if n in ignore: - continue - attributes[n] = v - - return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result) - - @classmethod - def from_xml(cls, child, result=None): - """ - Create new way element from XML data - - :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element - :param result: The result this node belongs to - :type result: overpy.Result - :return: New Way oject - :rtype: overpy.Node - :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match - :raises ValueError: If a tag doesn't have a name - """ - if child.tag.lower() != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=child.tag.lower() - ) - - tags = {} - - for sub_child in child: - if sub_child.tag.lower() == "tag": - name = sub_child.attrib.get("k") - if name is None: - raise ValueError("Tag without name/key.") - value = sub_child.attrib.get("v") - tags[name] = value - - node_id = child.attrib.get("id") - if node_id is not None: - node_id = int(node_id) - lat = child.attrib.get("lat") - if lat is not None: - lat = Decimal(lat) - lon = child.attrib.get("lon") - if lon is not None: - lon = Decimal(lon) - - attributes = {} - ignore = ["id", "lat", "lon"] - for n, v in child.attrib.items(): - if n in ignore: - continue - attributes[n] = v - - return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result) - - -class Way(Element): - """ - Class to represent an element of type way - """ - - _type_value = "way" - - def __init__(self, way_id=None, center_lat=None, center_lon=None, node_ids=None, **kwargs): - """ - :param node_ids: List of node IDs - :type node_ids: List or Tuple - :param way_id: Id of the way element - :type way_id: Integer - :param kwargs: Additional arguments are passed directly to the parent class - - """ - - Element.__init__(self, **kwargs) - #: The id of the way - self.id = way_id - - #: List of Ids of the associated nodes - self._node_ids = node_ids - - #: The lat/lon of the center of the way (optional depending on query) - self.center_lat = center_lat - self.center_lon = center_lon - - def __repr__(self): - return "".format(self.id, self._node_ids) - - @property - def nodes(self): - """ - List of nodes associated with the way. - """ - return self.get_nodes() - - def get_nodes(self, resolve_missing=False): - """ - Get the nodes defining the geometry of the way - - :param resolve_missing: Try to resolve missing nodes. - :type resolve_missing: Boolean - :return: List of nodes - :rtype: List of overpy.Node - :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache. - :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved. - """ - result = [] - resolved = False - - for node_id in self._node_ids: - try: - node = self._result.get_node(node_id) - except exception.DataIncomplete: - node = None - - if node is not None: - result.append(node) - continue - - if not resolve_missing: - raise exception.DataIncomplete("Resolve missing nodes is disabled") - - # We tried to resolve the data but some nodes are still missing - if resolved: - raise exception.DataIncomplete("Unable to resolve all nodes") - - query = ("\n" - "[out:json];\n" - "way({way_id});\n" - "node(w);\n" - "out body;\n" - ) - query = query.format( - way_id=self.id - ) - tmp_result = self._result.api.query(query) - self._result.expand(tmp_result) - resolved = True - - try: - node = self._result.get_node(node_id) - except exception.DataIncomplete: - node = None - - if node is None: - raise exception.DataIncomplete("Unable to resolve all nodes") - - result.append(node) - - return result - - @classmethod - def from_json(cls, data, result=None): - """ - Create new Way element from JSON data - - :param data: Element data from JSON - :type data: Dict - :param result: The result this element belongs to - :type result: overpy.Result - :return: New instance of Way - :rtype: overpy.Way - :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. - """ - if data.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=data.get("type") - ) - - tags = data.get("tags", {}) - - way_id = data.get("id") - node_ids = data.get("nodes") - (center_lat, center_lon) = cls.get_center_from_json(data=data) - - attributes = {} - ignore = ["center", "id", "nodes", "tags", "type"] - for n, v in data.items(): - if n in ignore: - continue - attributes[n] = v - - return cls( - attributes=attributes, - center_lat=center_lat, - center_lon=center_lon, - node_ids=node_ids, - tags=tags, - result=result, - way_id=way_id - ) - - @classmethod - def from_xml(cls, child, result=None): - """ - Create new way element from XML data - - :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element - :param result: The result this node belongs to - :type result: overpy.Result - :return: New Way oject - :rtype: overpy.Way - :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match - :raises ValueError: If the ref attribute of the xml node is not provided - :raises ValueError: If a tag doesn't have a name - """ - if child.tag.lower() != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=child.tag.lower() - ) - - tags = {} - node_ids = [] - center_lat = None - center_lon = None - - for sub_child in child: - if sub_child.tag.lower() == "tag": - name = sub_child.attrib.get("k") - if name is None: - raise ValueError("Tag without name/key.") - value = sub_child.attrib.get("v") - tags[name] = value - if sub_child.tag.lower() == "nd": - ref_id = sub_child.attrib.get("ref") - if ref_id is None: - raise ValueError("Unable to find required ref value.") - ref_id = int(ref_id) - node_ids.append(ref_id) - if sub_child.tag.lower() == "center": - (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child) - - way_id = child.attrib.get("id") - if way_id is not None: - way_id = int(way_id) - - attributes = {} - ignore = ["id"] - for n, v in child.attrib.items(): - if n in ignore: - continue - attributes[n] = v - - return cls(way_id=way_id, center_lat=center_lat, center_lon=center_lon, - attributes=attributes, node_ids=node_ids, tags=tags, result=result) - - -class Relation(Element): - """ - Class to represent an element of type relation - """ - - _type_value = "relation" - - def __init__(self, rel_id=None, center_lat=None, center_lon=None, members=None, **kwargs): - """ - :param members: - :param rel_id: Id of the relation element - :type rel_id: Integer - :param kwargs: - :return: - """ - - Element.__init__(self, **kwargs) - self.id = rel_id - self.members = members - - #: The lat/lon of the center of the way (optional depending on query) - self.center_lat = center_lat - self.center_lon = center_lon - - def __repr__(self): - return "".format(self.id) - - @classmethod - def from_json(cls, data, result=None): - """ - Create new Relation element from JSON data - - :param data: Element data from JSON - :type data: Dict - :param result: The result this element belongs to - :type result: overpy.Result - :return: New instance of Relation - :rtype: overpy.Relation - :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. - """ - if data.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=data.get("type") - ) - - tags = data.get("tags", {}) - - rel_id = data.get("id") - (center_lat, center_lon) = cls.get_center_from_json(data=data) - - members = [] - - supported_members = [RelationNode, RelationWay, RelationRelation] - for member in data.get("members", []): - type_value = member.get("type") - for member_cls in supported_members: - if member_cls._type_value == type_value: - members.append( - member_cls.from_json( - member, - result=result - ) - ) - - attributes = {} - ignore = ["id", "members", "tags", "type"] - for n, v in data.items(): - if n in ignore: - continue - attributes[n] = v - - return cls( - rel_id=rel_id, - attributes=attributes, - center_lat=center_lat, - center_lon=center_lon, - members=members, - tags=tags, - result=result - ) - - @classmethod - def from_xml(cls, child, result=None): - """ - Create new way element from XML data - - :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element - :param result: The result this node belongs to - :type result: overpy.Result - :return: New Way oject - :rtype: overpy.Relation - :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match - :raises ValueError: If a tag doesn't have a name - """ - if child.tag.lower() != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=child.tag.lower() - ) - - tags = {} - members = [] - center_lat = None - center_lon = None - - supported_members = [RelationNode, RelationWay, RelationRelation, RelationArea] - for sub_child in child: - if sub_child.tag.lower() == "tag": - name = sub_child.attrib.get("k") - if name is None: - raise ValueError("Tag without name/key.") - value = sub_child.attrib.get("v") - tags[name] = value - if sub_child.tag.lower() == "member": - type_value = sub_child.attrib.get("type") - for member_cls in supported_members: - if member_cls._type_value == type_value: - members.append( - member_cls.from_xml( - sub_child, - result=result - ) - ) - if sub_child.tag.lower() == "center": - (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child) - - rel_id = child.attrib.get("id") - if rel_id is not None: - rel_id = int(rel_id) - - attributes = {} - ignore = ["id"] - for n, v in child.attrib.items(): - if n in ignore: - continue - attributes[n] = v - - return cls( - rel_id=rel_id, - attributes=attributes, - center_lat=center_lat, - center_lon=center_lon, - members=members, - tags=tags, - result=result - ) - - -class RelationMember(object): - """ - Base class to represent a member of a relation. - """ - - def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=None): - """ - :param ref: Reference Id - :type ref: Integer - :param role: The role of the relation member - :type role: String - :param result: - """ - self.ref = ref - self._result = result - self.role = role - self.attributes = attributes - self.geometry = geometry - - @classmethod - def from_json(cls, data, result=None): - """ - Create new RelationMember element from JSON data - - :param child: Element data from JSON - :type child: Dict - :param result: The result this element belongs to - :type result: overpy.Result - :return: New instance of RelationMember - :rtype: overpy.RelationMember - :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. - """ - if data.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=data.get("type") - ) - - ref = data.get("ref") - role = data.get("role") - - attributes = {} - ignore = ["geometry", "type", "ref", "role"] - for n, v in data.items(): - if n in ignore: - continue - attributes[n] = v - - geometry = data.get("geometry") - if isinstance(geometry, list): - geometry_orig = geometry - geometry = [] - for v in geometry_orig: - geometry.append( - RelationWayGeometryValue( - lat=v.get("lat"), - lon=v.get("lon") - ) - ) - else: - geometry = None - - return cls( - attributes=attributes, - geometry=geometry, - ref=ref, - role=role, - result=result - ) - - @classmethod - def from_xml(cls, child, result=None): - """ - Create new RelationMember from XML data - - :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element - :param result: The result this element belongs to - :type result: overpy.Result - :return: New relation member oject - :rtype: overpy.RelationMember - :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match - """ - if child.attrib.get("type") != cls._type_value: - raise exception.ElementDataWrongType( - type_expected=cls._type_value, - type_provided=child.tag.lower() - ) - - ref = child.attrib.get("ref") - if ref is not None: - ref = int(ref) - role = child.attrib.get("role") - - attributes = {} - ignore = ["geometry", "ref", "role", "type"] - for n, v in child.attrib.items(): - if n in ignore: - continue - attributes[n] = v - - geometry = None - for sub_child in child: - if sub_child.tag.lower() == "nd": - if geometry is None: - geometry = [] - geometry.append( - RelationWayGeometryValue( - lat=Decimal(sub_child.attrib["lat"]), - lon=Decimal(sub_child.attrib["lon"]) - ) - ) - - return cls( - attributes=attributes, - geometry=geometry, - ref=ref, - role=role, - result=result - ) - - -class RelationNode(RelationMember): - _type_value = "node" - - def resolve(self, resolve_missing=False): - return self._result.get_node(self.ref, resolve_missing=resolve_missing) - - def __repr__(self): - return "".format(self.ref, self.role) - - -class RelationWay(RelationMember): - _type_value = "way" - - def resolve(self, resolve_missing=False): - return self._result.get_way(self.ref, resolve_missing=resolve_missing) - - def __repr__(self): - return "".format(self.ref, self.role) - - -class RelationWayGeometryValue(object): - def __init__(self, lat, lon): - self.lat = lat - self.lon = lon - - def __repr__(self): - return "".format(self.lat, self.lon) - - -class RelationRelation(RelationMember): - _type_value = "relation" - - def resolve(self, resolve_missing=False): - return self._result.get_relation(self.ref, resolve_missing=resolve_missing) - - def __repr__(self): - return "".format(self.ref, self.role) - - -class RelationArea(RelationMember): - _type_value = "area" - - def resolve(self, resolve_missing=False): - return self._result.get_area(self.ref, resolve_missing=resolve_missing) - - def __repr__(self): - return "".format(self.ref, self.role) - - -class OSMSAXHandler(handler.ContentHandler): - """ - SAX parser for Overpass XML response. - """ - #: Tuple of opening elements to ignore - ignore_start = ('osm', 'meta', 'note', 'bounds', 'remark') - #: Tuple of closing elements to ignore - ignore_end = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center') - - def __init__(self, result): - """ - :param result: Append results to this result set. - :type result: overpy.Result - """ - handler.ContentHandler.__init__(self) - self._result = result - self._curr = {} - #: Current relation member object - self.cur_relation_member = None - - def startElement(self, name, attrs): - """ - Handle opening elements. - - :param name: Name of the element - :type name: String - :param attrs: Attributes of the element - :type attrs: Dict - """ - if name in self.ignore_start: - return - try: - handler = getattr(self, '_handle_start_%s' % name) - except AttributeError: - raise KeyError("Unknown element start '%s'" % name) - handler(attrs) - - def endElement(self, name): - """ - Handle closing elements - - :param name: Name of the element - :type name: String - """ - if name in self.ignore_end: - return - try: - handler = getattr(self, '_handle_end_%s' % name) - except AttributeError: - raise KeyError("Unknown element end '%s'" % name) - handler() - - def _handle_start_center(self, attrs): - """ - Handle opening center element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - center_lat = attrs.get("lat") - center_lon = attrs.get("lon") - if center_lat is None or center_lon is None: - raise ValueError("Unable to get lat or lon of way center.") - self._curr["center_lat"] = Decimal(center_lat) - self._curr["center_lon"] = Decimal(center_lon) - - def _handle_start_tag(self, attrs): - """ - Handle opening tag element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - try: - tag_key = attrs['k'] - except KeyError: - raise ValueError("Tag without name/key.") - self._curr['tags'][tag_key] = attrs.get('v') - - def _handle_start_node(self, attrs): - """ - Handle opening node element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - self._curr = { - 'attributes': dict(attrs), - 'lat': None, - 'lon': None, - 'node_id': None, - 'tags': {} - } - if attrs.get('id', None) is not None: - self._curr['node_id'] = int(attrs['id']) - del self._curr['attributes']['id'] - if attrs.get('lat', None) is not None: - self._curr['lat'] = Decimal(attrs['lat']) - del self._curr['attributes']['lat'] - if attrs.get('lon', None) is not None: - self._curr['lon'] = Decimal(attrs['lon']) - del self._curr['attributes']['lon'] - - def _handle_end_node(self): - """ - Handle closing node element - """ - self._result.append(Node(result=self._result, **self._curr)) - self._curr = {} - - def _handle_start_way(self, attrs): - """ - Handle opening way element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - self._curr = { - 'center_lat': None, - 'center_lon': None, - 'attributes': dict(attrs), - 'node_ids': [], - 'tags': {}, - 'way_id': None - } - if attrs.get('id', None) is not None: - self._curr['way_id'] = int(attrs['id']) - del self._curr['attributes']['id'] - - def _handle_end_way(self): - """ - Handle closing way element - """ - self._result.append(Way(result=self._result, **self._curr)) - self._curr = {} - - def _handle_start_area(self, attrs): - """ - Handle opening area element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - self._curr = { - 'attributes': dict(attrs), - 'tags': {}, - 'area_id': None - } - if attrs.get('id', None) is not None: - self._curr['area_id'] = int(attrs['id']) - del self._curr['attributes']['id'] - - def _handle_end_area(self): - """ - Handle closing area element - """ - self._result.append(Area(result=self._result, **self._curr)) - self._curr = {} - - def _handle_start_nd(self, attrs): - """ - Handle opening nd element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - if isinstance(self.cur_relation_member, RelationWay): - if self.cur_relation_member.geometry is None: - self.cur_relation_member.geometry = [] - self.cur_relation_member.geometry.append( - RelationWayGeometryValue( - lat=Decimal(attrs["lat"]), - lon=Decimal(attrs["lon"]) - ) - ) - else: - try: - node_ref = attrs['ref'] - except KeyError: - raise ValueError("Unable to find required ref value.") - self._curr['node_ids'].append(int(node_ref)) - - def _handle_start_relation(self, attrs): - """ - Handle opening relation element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - self._curr = { - 'attributes': dict(attrs), - 'members': [], - 'rel_id': None, - 'tags': {} - } - if attrs.get('id', None) is not None: - self._curr['rel_id'] = int(attrs['id']) - del self._curr['attributes']['id'] - - def _handle_end_relation(self): - """ - Handle closing relation element - """ - self._result.append(Relation(result=self._result, **self._curr)) - self._curr = {} - - def _handle_start_member(self, attrs): - """ - Handle opening member element - - :param attrs: Attributes of the element - :type attrs: Dict - """ - - params = { - # ToDo: Parse attributes - 'attributes': {}, - 'ref': None, - 'result': self._result, - 'role': None - } - if attrs.get('ref', None): - params['ref'] = int(attrs['ref']) - if attrs.get('role', None): - params['role'] = attrs['role'] - - cls_map = { - "area": RelationArea, - "node": RelationNode, - "relation": RelationRelation, - "way": RelationWay - } - cls = cls_map.get(attrs["type"]) - if cls is None: - raise ValueError("Undefined type for member: '%s'" % attrs['type']) - - self.cur_relation_member = cls(**params) - self._curr['members'].append(self.cur_relation_member) - - def _handle_end_member(self): - self.cur_relation_member = None diff --git a/pyextra/overpy/exception.py b/pyextra/overpy/exception.py deleted file mode 100644 index 3d8416a1..00000000 --- a/pyextra/overpy/exception.py +++ /dev/null @@ -1,166 +0,0 @@ -class OverPyException(BaseException): - """OverPy base exception""" - pass - - -class DataIncomplete(OverPyException): - """ - Raised if the requested data isn't available in the result. - Try to improve the query or to resolve the missing data. - """ - def __init__(self, *args, **kwargs): - OverPyException.__init__( - self, - "Data incomplete try to improve the query to resolve the missing data", - *args, - **kwargs - ) - - -class ElementDataWrongType(OverPyException): - """ - Raised if the provided element does not match the expected type. - - :param type_expected: The expected element type - :type type_expected: String - :param type_provided: The provided element type - :type type_provided: String|None - """ - def __init__(self, type_expected, type_provided=None): - self.type_expected = type_expected - self.type_provided = type_provided - - def __str__(self): - return "Type expected '%s' but '%s' provided" % ( - self.type_expected, - str(self.type_provided) - ) - - -class MaxRetriesReached(OverPyException): - """ - Raised if max retries reached and the Overpass server didn't respond with a result. - """ - def __init__(self, retry_count, exceptions): - self.exceptions = exceptions - self.retry_count = retry_count - - def __str__(self): - return "Unable get any result from the Overpass API server after %d retries." % self.retry_count - - -class OverpassBadRequest(OverPyException): - """ - Raised if the Overpass API service returns a syntax error. - - :param query: The encoded query how it was send to the server - :type query: Bytes - :param msgs: List of error messages - :type msgs: List - """ - def __init__(self, query, msgs=None): - self.query = query - if msgs is None: - msgs = [] - self.msgs = msgs - - def __str__(self): - tmp_msgs = [] - for tmp_msg in self.msgs: - if not isinstance(tmp_msg, str): - tmp_msg = str(tmp_msg) - tmp_msgs.append(tmp_msg) - - return "\n".join(tmp_msgs) - - -class OverpassError(OverPyException): - """ - Base exception to report errors if the response returns a remark tag or element. - - .. note:: - If you are not sure which of the subexceptions you should use, use this one and try to parse the message. - - For more information have a look at https://github.com/DinoTools/python-overpy/issues/62 - - :param str msg: The message from the remark tag or element - """ - def __init__(self, msg=None): - #: The message from the remark tag or element - self.msg = msg - - def __str__(self): - if self.msg is None: - return "No error message provided" - if not isinstance(self.msg, str): - return str(self.msg) - return self.msg - - -class OverpassGatewayTimeout(OverPyException): - """ - Raised if load of the Overpass API service is too high and it can't handle the request. - """ - def __init__(self): - OverPyException.__init__(self, "Server load too high") - - -class OverpassRuntimeError(OverpassError): - """ - Raised if the server returns a remark-tag(xml) or remark element(json) with a message starting with - 'runtime error:'. - """ - pass - - -class OverpassRuntimeRemark(OverpassError): - """ - Raised if the server returns a remark-tag(xml) or remark element(json) with a message starting with - 'runtime remark:'. - """ - pass - - -class OverpassTooManyRequests(OverPyException): - """ - Raised if the Overpass API service returns a 429 status code. - """ - def __init__(self): - OverPyException.__init__(self, "Too many requests") - - -class OverpassUnknownContentType(OverPyException): - """ - Raised if the reported content type isn't handled by OverPy. - - :param content_type: The reported content type - :type content_type: None or String - """ - def __init__(self, content_type): - self.content_type = content_type - - def __str__(self): - if self.content_type is None: - return "No content type returned" - return "Unknown content type: %s" % self.content_type - - -class OverpassUnknownError(OverpassError): - """ - Raised if the server returns a remark-tag(xml) or remark element(json) and we are unable to find any reason. - """ - pass - - -class OverpassUnknownHTTPStatusCode(OverPyException): - """ - Raised if the returned HTTP status code isn't handled by OverPy. - - :param code: The HTTP status code - :type code: Integer - """ - def __init__(self, code): - self.code = code - - def __str__(self): - return "Unknown/Unhandled status code: %d" % self.code \ No newline at end of file diff --git a/pyextra/overpy/helper.py b/pyextra/overpy/helper.py deleted file mode 100644 index e3ac0170..00000000 --- a/pyextra/overpy/helper.py +++ /dev/null @@ -1,64 +0,0 @@ -__author__ = 'mjob' - -import overpy - - -def get_street(street, areacode, api=None): - """ - Retrieve streets in a given bounding area - - :param overpy.Overpass api: First street of intersection - :param String street: Name of street - :param String areacode: The OSM id of the bounding area - :return: Parsed result - :raises overpy.exception.OverPyException: If something bad happens. - """ - if api is None: - api = overpy.Overpass() - - query = """ - area(%s)->.location; - ( - way[highway][name="%s"](area.location); - - ( - way[highway=service](area.location); - way[highway=track](area.location); - ); - ); - out body; - >; - out skel qt; - """ - - data = api.query(query % (areacode, street)) - - return data - - -def get_intersection(street1, street2, areacode, api=None): - """ - Retrieve intersection of two streets in a given bounding area - - :param overpy.Overpass api: First street of intersection - :param String street1: Name of first street of intersection - :param String street2: Name of second street of intersection - :param String areacode: The OSM id of the bounding area - :return: List of intersections - :raises overpy.exception.OverPyException: If something bad happens. - """ - if api is None: - api = overpy.Overpass() - - query = """ - area(%s)->.location; - ( - way[highway][name="%s"](area.location); node(w)->.n1; - way[highway][name="%s"](area.location); node(w)->.n2; - ); - node.n1.n2; - out meta; - """ - - data = api.query(query % (areacode, street1, street2)) - - return data.get_nodes()