nopenpilot/pyextra/overpy/__init__.py

1620 lines
52 KiB
Python

from collections import OrderedDict
from datetime import datetime
from decimal import Decimal
from xml.sax import handler, make_parser
import json
import re
import sys
import time
import requests
from overpy import exception
from overpy.__about__ import (
__author__, __copyright__, __email__, __license__, __summary__, __title__,
__uri__, __version__
)
# Flags for the major version of the running interpreter
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
# Identifiers used to select the XML parser implementation
XML_PARSER_DOM = 1
XML_PARSER_SAX = 2
# Try to convert some common attributes
# http://wiki.openstreetmap.org/wiki/Elements#Common_attributes
# Maps an attribute name to the callable applied to its raw value.
GLOBAL_ATTRIBUTE_MODIFIERS = {
    "changeset": int,
    # Parsed with a fixed format that ends in a literal "Z"; the result is a naive datetime
    "timestamp": lambda ts: datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"),
    "uid": int,
    "version": int,
    # True only if the value, compared case-insensitively, equals the string "true"
    "visible": lambda v: v.lower() == "true"
}
def is_valid_type(element, cls):
    """
    Check that an element is an instance of a class and has an ID assigned.

    :param Element() element: The element instance to test
    :param Element cls: The element class to test
    :return: True if the element is an instance of cls and its id is not None
    :rtype: Boolean
    """
    if not isinstance(element, cls):
        return False
    return element.id is not None
class Overpass(object):
    """
    Class to access the Overpass API

    :cvar default_max_retry_count: Global max number of retries (Default: 0)
    :cvar default_read_chunk_size: Global max size of each chunk read from the server response (Default: 4096)
    :cvar default_retry_timeout: Global time to wait between tries (Default: 1.0s)
    :cvar default_url: URL of the Overpass server used if no other URL is provided
    """
    default_max_retry_count = 0
    default_read_chunk_size = 4096
    default_retry_timeout = 1.0
    default_url = "http://overpass-api.de/api/interpreter"

    def __init__(self, read_chunk_size=None, url=None, xml_parser=XML_PARSER_SAX, max_retry_count=None,
                 retry_timeout=None, timeout=5.0, headers=None):
        """
        :param read_chunk_size: Max size of each chunk read from the server response
        :type read_chunk_size: Integer
        :param url: Optional URL of the Overpass server. Defaults to http://overpass-api.de/api/interpreter
        :type url: str
        :param xml_parser: The xml parser to use
        :type xml_parser: Integer
        :param max_retry_count: Max number of retries (Default: default_max_retry_count)
        :type max_retry_count: Integer
        :param retry_timeout: Time to wait between tries (Default: default_retry_timeout)
        :type retry_timeout: float
        :param timeout: HTTP request timeout
        :type timeout: float
        :param headers: HTTP request headers
        :type headers: dict
        """
        self.url = self.default_url
        if url is not None:
            self.url = url

        # Raw bytes literals: the pattern is unchanged, but the backslashes are no longer
        # interpreted (and warned about) as invalid string escape sequences.
        self._regex_extract_error_msg = re.compile(br"\<p\>(?P<msg>\<strong\s.*?)\</p\>")
        self._regex_remove_tag = re.compile(br"<[^>]*?>")

        if read_chunk_size is None:
            read_chunk_size = self.default_read_chunk_size
        self.read_chunk_size = read_chunk_size

        if max_retry_count is None:
            max_retry_count = self.default_max_retry_count
        self.max_retry_count = max_retry_count

        if retry_timeout is None:
            retry_timeout = self.default_retry_timeout
        self.retry_timeout = retry_timeout

        self.xml_parser = xml_parser
        self.timeout = timeout
        self.headers = headers

    def _handle_remark_msg(self, msg):
        """
        Try to parse the message provided with the remark tag or element.

        This method always raises; which exception depends on the prefix of the message.

        :param str msg: The message
        :raises overpy.exception.OverpassRuntimeError: If message starts with 'runtime error:'
        :raises overpy.exception.OverpassRuntimeRemark: If message starts with 'runtime remark:'
        :raises overpy.exception.OverpassUnknownError: If we are unable to identify the error
        """
        msg = msg.strip()
        if msg.startswith("runtime error:"):
            raise exception.OverpassRuntimeError(msg=msg)
        elif msg.startswith("runtime remark:"):
            raise exception.OverpassRuntimeRemark(msg=msg)
        raise exception.OverpassUnknownError(msg=msg)

    def _extract_error_msgs(self, response):
        """
        Collect the human readable error messages from the body of a 'bad request' response.

        :param Bytes response: Raw response body
        :return: List of messages with all markup removed
        :rtype: List
        """
        msgs = []
        for match in self._regex_extract_error_msg.finditer(response):
            tmp = self._regex_remove_tag.sub(b"", match.group("msg"))
            try:
                tmp = tmp.decode("utf-8")
            except UnicodeDecodeError:
                # Keep the message even if it can't be decoded
                tmp = repr(tmp)
            msgs.append(tmp)
        return msgs

    def query(self, query):
        """
        Query the Overpass API

        The request is repeated up to max_retry_count times, waiting retry_timeout seconds
        between two tries. If retries are disabled (max_retry_count == 0) the first error is
        raised directly.

        :param String|Bytes query: The query string in Overpass QL
        :return: The parsed result
        :rtype: overpy.Result
        :raises overpy.exception.MaxRetriesReached: If retries are enabled and all tries failed
        :raises overpy.exception.OverpassBadRequest: If the server responds with status code 400
        :raises overpy.exception.OverpassTooManyRequests: If the server responds with status code 429
        :raises overpy.exception.OverpassGatewayTimeout: If the server responds with status code 504
        :raises overpy.exception.OverpassUnknownContentType: If the content type of the response is not supported
        :raises overpy.exception.OverpassUnknownHTTPStatusCode: If the status code is not handled
        """
        if not isinstance(query, bytes):
            query = query.encode("utf-8")

        # Only pass the headers argument if headers have been configured
        request_kwargs = {"timeout": self.timeout}
        if self.headers is not None:
            request_kwargs["headers"] = self.headers

        retry_num = 0
        retry_exceptions = []
        do_retry = self.max_retry_count > 0
        while retry_num <= self.max_retry_count:
            if retry_num > 0:
                time.sleep(self.retry_timeout)
            retry_num += 1

            try:
                r = requests.post(self.url, query, **request_kwargs)
                response = r.content
            except (requests.exceptions.BaseHTTPError, requests.exceptions.RequestException) as e:
                if not do_retry:
                    # Bare raise keeps the original traceback
                    raise
                retry_exceptions.append(e)
                continue

            if r.status_code == 200:
                # The header may be missing or may carry parameters like "; charset=utf-8",
                # so only the media type itself is compared.
                content_type = r.headers.get("Content-Type")
                media_type = (content_type or "").split(";", 1)[0].strip().lower()
                if media_type == "application/json":
                    return self.parse_json(response)
                if media_type == "application/osm3s+xml":
                    return self.parse_xml(response)
                e = exception.OverpassUnknownContentType(content_type)
            elif r.status_code == 400:
                e = exception.OverpassBadRequest(
                    query,
                    msgs=self._extract_error_msgs(response)
                )
            elif r.status_code == 429:
                # Instantiate, so the list of collected exceptions holds instances only
                e = exception.OverpassTooManyRequests()
            elif r.status_code == 504:
                e = exception.OverpassGatewayTimeout()
            else:
                # No valid response code
                e = exception.OverpassUnknownHTTPStatusCode(r.status_code)

            if not do_retry:
                raise e
            retry_exceptions.append(e)

        raise exception.MaxRetriesReached(retry_count=retry_num, exceptions=retry_exceptions)

    def parse_json(self, data, encoding="utf-8"):
        """
        Parse raw response from Overpass service.

        :param data: Raw JSON Data
        :type data: String or Bytes
        :param encoding: Encoding to decode byte string
        :type encoding: String
        :return: Result object
        :rtype: overpy.Result
        """
        if isinstance(data, bytes):
            data = data.decode(encoding)
        # Floats are parsed as Decimal to keep the coordinates exact
        data = json.loads(data, parse_float=Decimal)
        if "remark" in data:
            self._handle_remark_msg(msg=data.get("remark"))
        return Result.from_json(data, api=self)

    def parse_xml(self, data, encoding="utf-8", parser=None):
        """
        Parse raw XML response from Overpass service.

        :param data: Raw XML Data
        :type data: String or Bytes
        :param encoding: Encoding to decode byte string
        :type encoding: String
        :param parser: The xml parser to use (Default: None = parser configured for this instance)
        :type parser: Integer | None
        :return: Result object
        :rtype: overpy.Result
        """
        if parser is None:
            parser = self.xml_parser

        if isinstance(data, bytes):
            data = data.decode(encoding)
        if PY2 and not isinstance(data, str):
            # Python 2.x: Convert unicode strings
            data = data.encode(encoding)

        m = re.search("<remark>(?P<msg>[^<>]*)</remark>", data)
        if m:
            self._handle_remark_msg(m.group("msg"))

        return Result.from_xml(data, api=self, parser=parser)
class Result(object):
    """
    Container for the elements of a response, grouped by element type.
    """

    def __init__(self, elements=None, api=None):
        """
        :param List elements: Elements to add to the result (Default: no elements)
        :param api: The instance used to query missing elements
        :type api: overpy.Overpass
        """
        if elements is None:
            elements = []

        def collect(element_cls):
            # Keep the elements of one type, indexed by ID in order of appearance
            return OrderedDict(
                (item.id, item) for item in elements if is_valid_type(item, element_cls)
            )

        self._areas = collect(Area)
        self._nodes = collect(Node)
        self._ways = collect(Way)
        self._relations = collect(Relation)
        self._class_collection_map = {
            Node: self._nodes,
            Way: self._ways,
            Relation: self._relations,
            Area: self._areas,
        }
        self.api = api

    def expand(self, other):
        """
        Add all elements from an other result to the list of elements of this result object.

        Elements with an ID already present in this result are not replaced.
        It is used by the auto resolve feature.

        :param other: Expand the result with the elements from this result.
        :type other: overpy.Result
        :raises ValueError: If provided parameter is not instance of :class:`overpy.Result`
        """
        if not isinstance(other, Result):
            raise ValueError("Provided argument has to be instance of overpy:Result()")

        other_collection_map = {Node: other.nodes, Way: other.ways, Relation: other.relations, Area: other.areas}
        for element_type, own_collection in self._class_collection_map.items():
            for candidate in other_collection_map[element_type]:
                if not is_valid_type(candidate, element_type):
                    continue
                if candidate.id in own_collection:
                    continue
                own_collection[candidate.id] = candidate

    def append(self, element):
        """
        Append a new element to the result.

        Invalid elements are ignored and an element already stored with the same ID is kept.

        :param element: The element to append
        :type element: overpy.Element
        """
        if not is_valid_type(element, Element):
            return
        collection = self._class_collection_map[element.__class__]
        collection.setdefault(element.id, element)

    def get_elements(self, filter_cls, elem_id=None):
        """
        Get a list of elements from the result and filter the element type by a class.

        :param filter_cls: The element class to filter by
        :param elem_id: ID of the object
        :type elem_id: Integer
        :return: List of available elements
        :rtype: List
        """
        if elem_id is None:
            return list(self._class_collection_map[filter_cls].values())
        try:
            return [self._class_collection_map[filter_cls][elem_id]]
        except KeyError:
            return []

    def get_ids(self, filter_cls):
        """
        Get the IDs of all elements of one type.

        :param filter_cls: The element class to filter by
        :return: List of IDs
        """
        collection = self._class_collection_map[filter_cls]
        return list(collection.keys())

    def get_node_ids(self):
        """Get the IDs of all nodes."""
        return self.get_ids(filter_cls=Node)

    def get_way_ids(self):
        """Get the IDs of all ways."""
        return self.get_ids(filter_cls=Way)

    def get_relation_ids(self):
        """Get the IDs of all relations."""
        return self.get_ids(filter_cls=Relation)

    def get_area_ids(self):
        """Get the IDs of all areas."""
        return self.get_ids(filter_cls=Area)

    @classmethod
    def from_json(cls, data, api=None):
        """
        Create a new instance and load data from json object.

        :param data: JSON data returned by the Overpass API
        :type data: Dict
        :param api: The instance used to query missing elements
        :type api: overpy.Overpass
        :return: New instance of Result object
        :rtype: overpy.Result
        """
        result = cls(api=api)
        raw_elements = data.get("elements", [])
        # One pass per element class: nodes first, then ways, relations and areas
        for elem_cls in (Node, Way, Relation, Area):
            for raw in raw_elements:
                e_type = raw.get("type")
                if not hasattr(e_type, "lower"):
                    continue
                if e_type.lower() == elem_cls._type_value:
                    result.append(elem_cls.from_json(raw, result=result))
        return result

    @classmethod
    def from_xml(cls, data, api=None, parser=None):
        """
        Create a new instance and load data from xml data or object.

        .. note::
            If parser is set to None, the functions tries to find the best parse.
            By default the SAX parser is chosen if a string is provided as data.
            The parser is set to DOM if an xml.etree.ElementTree.Element is provided as data value.

        :param data: Root element
        :type data: str | xml.etree.ElementTree.Element
        :param api: The instance to query additional information if required.
        :type api: Overpass
        :param parser: Specify the parser to use(DOM or SAX)(Default: None = autodetect, defaults to SAX)
        :type parser: Integer | None
        :return: New instance of Result object
        :rtype: Result
        """
        if parser is None:
            parser = XML_PARSER_SAX if isinstance(data, str) else XML_PARSER_DOM

        result = cls(api=api)
        if parser == XML_PARSER_DOM:
            import xml.etree.ElementTree as ET
            if isinstance(data, str):
                root = ET.fromstring(data)
            elif isinstance(data, ET.Element):
                root = data
            else:
                raise exception.OverPyException("Unable to detect data type.")

            # One pass per element class: nodes first, then ways, relations and areas
            for elem_cls in (Node, Way, Relation, Area):
                for child in root:
                    if child.tag.lower() == elem_cls._type_value:
                        result.append(elem_cls.from_xml(child, result=result))
        elif parser == XML_PARSER_SAX:
            if PY2:
                from StringIO import StringIO
            else:
                from io import StringIO
            sax_parser = make_parser()
            # The handler adds the parsed elements to the result
            sax_parser.setContentHandler(OSMSAXHandler(result))
            sax_parser.parse(StringIO(data))
        else:
            # ToDo: better exception
            raise Exception("Unknown XML parser")
        return result

    def get_area(self, area_id, resolve_missing=False):
        """
        Get an area by its ID.

        :param area_id: The area ID
        :type area_id: Integer
        :param resolve_missing: Query the Overpass API if the area is missing in the result set.
        :return: The area
        :rtype: overpy.Area
        :raises overpy.exception.DataIncomplete: The requested area is not available in the result cache.
        :raises overpy.exception.DataIncomplete: If resolve_missing is True and the area can't be resolved.
        """
        areas = self.get_areas(area_id=area_id)
        if areas:
            return areas[0]

        if resolve_missing is False:
            raise exception.DataIncomplete("Resolve missing area is disabled")

        query = ("\n"
                 "[out:json];\n"
                 "area({area_id});\n"
                 "out body;\n"
                 ).format(area_id=area_id)
        self.expand(self.api.query(query))

        areas = self.get_areas(area_id=area_id)
        if not areas:
            raise exception.DataIncomplete("Unable to resolve requested areas")
        return areas[0]

    def get_areas(self, area_id=None, **kwargs):
        """
        Alias for get_elements() but filter the result by Area

        :param area_id: The Id of the area
        :type area_id: Integer
        :return: List of elements
        """
        return self.get_elements(Area, elem_id=area_id, **kwargs)

    def get_node(self, node_id, resolve_missing=False):
        """
        Get a node by its ID.

        :param node_id: The node ID
        :type node_id: Integer
        :param resolve_missing: Query the Overpass API if the node is missing in the result set.
        :return: The node
        :rtype: overpy.Node
        :raises overpy.exception.DataIncomplete: The requested node is not available in the result cache.
        :raises overpy.exception.DataIncomplete: If resolve_missing is True and the node can't be resolved.
        """
        nodes = self.get_nodes(node_id=node_id)
        if nodes:
            return nodes[0]

        if not resolve_missing:
            raise exception.DataIncomplete("Resolve missing nodes is disabled")

        query = ("\n"
                 "[out:json];\n"
                 "node({node_id});\n"
                 "out body;\n"
                 ).format(node_id=node_id)
        self.expand(self.api.query(query))

        nodes = self.get_nodes(node_id=node_id)
        if not nodes:
            raise exception.DataIncomplete("Unable to resolve all nodes")
        return nodes[0]

    def get_nodes(self, node_id=None, **kwargs):
        """
        Alias for get_elements() but filter the result by Node()

        :param node_id: The Id of the node
        :type node_id: Integer
        :return: List of elements
        """
        return self.get_elements(Node, elem_id=node_id, **kwargs)

    def get_relation(self, rel_id, resolve_missing=False):
        """
        Get a relation by its ID.

        :param rel_id: The relation ID
        :type rel_id: Integer
        :param resolve_missing: Query the Overpass API if the relation is missing in the result set.
        :return: The relation
        :rtype: overpy.Relation
        :raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache.
        :raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved.
        """
        relations = self.get_relations(rel_id=rel_id)
        if relations:
            return relations[0]

        if resolve_missing is False:
            raise exception.DataIncomplete("Resolve missing relations is disabled")

        query = ("\n"
                 "[out:json];\n"
                 "relation({relation_id});\n"
                 "out body;\n"
                 ).format(relation_id=rel_id)
        self.expand(self.api.query(query))

        relations = self.get_relations(rel_id=rel_id)
        if not relations:
            raise exception.DataIncomplete("Unable to resolve requested reference")
        return relations[0]

    def get_relations(self, rel_id=None, **kwargs):
        """
        Alias for get_elements() but filter the result by Relation

        :param rel_id: Id of the relation
        :type rel_id: Integer
        :return: List of elements
        """
        return self.get_elements(Relation, elem_id=rel_id, **kwargs)

    def get_way(self, way_id, resolve_missing=False):
        """
        Get a way by its ID.

        :param way_id: The way ID
        :type way_id: Integer
        :param resolve_missing: Query the Overpass API if the way is missing in the result set.
        :return: The way
        :rtype: overpy.Way
        :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache.
        :raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved.
        """
        ways = self.get_ways(way_id=way_id)
        if ways:
            return ways[0]

        if resolve_missing is False:
            raise exception.DataIncomplete("Resolve missing way is disabled")

        query = ("\n"
                 "[out:json];\n"
                 "way({way_id});\n"
                 "out body;\n"
                 ).format(way_id=way_id)
        self.expand(self.api.query(query))

        ways = self.get_ways(way_id=way_id)
        if not ways:
            raise exception.DataIncomplete("Unable to resolve requested way")
        return ways[0]

    def get_ways(self, way_id=None, **kwargs):
        """
        Alias for get_elements() but filter the result by Way

        :param way_id: The Id of the way
        :type way_id: Integer
        :return: List of elements
        """
        return self.get_elements(Way, elem_id=way_id, **kwargs)

    # Read-only shortcuts for the getters called without arguments
    area_ids = property(get_area_ids)
    areas = property(get_areas)
    node_ids = property(get_node_ids)
    nodes = property(get_nodes)
    relation_ids = property(get_relation_ids)
    relations = property(get_relations)
    way_ids = property(get_way_ids)
    ways = property(get_ways)
class Element(object):
    """
    Base element
    """

    def __init__(self, attributes=None, result=None, tags=None):
        """
        :param attributes: Additional attributes (Default: no attributes)
        :type attributes: Dict
        :param result: The result object this element belongs to
        :param tags: List of tags
        :type tags: Dict
        """
        self._result = result
        # The parameter is optional, without the fallback the lookups below fail on None
        if attributes is None:
            attributes = {}
        self.attributes = attributes
        # ToDo: Add option to modify attribute modifiers
        # Convert the known attributes in place, all other attributes are left untouched
        for name, modifier in GLOBAL_ATTRIBUTE_MODIFIERS.items():
            if name in self.attributes:
                self.attributes[name] = modifier(self.attributes[name])
        self.id = None
        self.tags = tags

    @classmethod
    def get_center_from_json(cls, data):
        """
        Get center information from json data

        :param data: json data
        :return: tuple with two elements: lat and lon, both are None if no center is provided
        :rtype: tuple
        :raises ValueError: If a center is provided but lat or lon is missing
        """
        center_lat = None
        center_lon = None
        center = data.get("center")
        # The center is optional, only validate and convert it if it is available
        if isinstance(center, dict):
            center_lat = center.get("lat")
            center_lon = center.get("lon")
            if center_lat is None or center_lon is None:
                raise ValueError("Unable to get lat or lon of way center.")
            center_lat = Decimal(center_lat)
            center_lon = Decimal(center_lon)
        return (center_lat, center_lon)

    @classmethod
    def get_center_from_xml_dom(cls, sub_child):
        """
        Get center information from the attributes of an xml node

        :param sub_child: The xml node to read the lat and lon attributes from
        :return: tuple with two elements: lat and lon
        :rtype: tuple
        :raises ValueError: If the lat or lon attribute is missing
        """
        center_lat = sub_child.attrib.get("lat")
        center_lon = sub_child.attrib.get("lon")
        if center_lat is None or center_lon is None:
            raise ValueError("Unable to get lat or lon of way center.")
        center_lat = Decimal(center_lat)
        center_lon = Decimal(center_lon)
        return center_lat, center_lon
class Area(Element):
    """
    Class to represent an element of type area
    """

    _type_value = "area"

    def __init__(self, area_id=None, **kwargs):
        """
        :param area_id: Id of the area element
        :type area_id: Integer
        :param kwargs: Additional arguments are passed directly to the parent class
        """
        Element.__init__(self, **kwargs)
        #: The id of the area
        self.id = area_id

    def __repr__(self):
        template = "<overpy.Area id={}>"
        return template.format(self.id)

    @classmethod
    def from_json(cls, data, result=None):
        """
        Create new Area element from JSON data

        :param data: Element data from JSON
        :type data: Dict
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New instance of Area
        :rtype: overpy.Area
        :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
        """
        if data.get("type") != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=data.get("type")
            )

        # Everything that is not handled separately is kept as attribute
        handled = ("id", "tags", "type")
        attributes = {key: value for key, value in data.items() if key not in handled}

        return cls(
            area_id=data.get("id"),
            attributes=attributes,
            tags=data.get("tags", {}),
            result=result
        )

    @classmethod
    def from_xml(cls, child, result=None):
        """
        Create new area element from XML data

        :param child: XML node to be parsed
        :type child: xml.etree.ElementTree.Element
        :param result: The result this node belongs to
        :type result: overpy.Result
        :return: New Area object
        :rtype: overpy.Area
        :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
        :raises ValueError: If a tag doesn't have a name
        """
        node_name = child.tag.lower()
        if node_name != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=node_name
            )

        tags = {}
        for sub_child in child:
            if sub_child.tag.lower() != "tag":
                continue
            key = sub_child.attrib.get("k")
            if key is None:
                raise ValueError("Tag without name/key.")
            tags[key] = sub_child.attrib.get("v")

        area_id = child.attrib.get("id")
        if area_id is not None:
            area_id = int(area_id)

        # Everything except the ID is kept as attribute
        attributes = {key: value for key, value in child.attrib.items() if key != "id"}

        return cls(area_id=area_id, attributes=attributes, tags=tags, result=result)
class Node(Element):
    """
    Class to represent an element of type node
    """

    _type_value = "node"

    def __init__(self, node_id=None, lat=None, lon=None, **kwargs):
        """
        :param lat: Latitude
        :type lat: Decimal or Float
        :param lon: Longitude
        :type lon: Decimal or Float
        :param node_id: Id of the node element
        :type node_id: Integer
        :param kwargs: Additional arguments are passed directly to the parent class
        """
        Element.__init__(self, **kwargs)
        #: The id of the node
        self.id = node_id
        self.lat = lat
        self.lon = lon

    def __repr__(self):
        template = "<overpy.Node id={} lat={} lon={}>"
        return template.format(self.id, self.lat, self.lon)

    @classmethod
    def from_json(cls, data, result=None):
        """
        Create new Node element from JSON data

        :param data: Element data from JSON
        :type data: Dict
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New instance of Node
        :rtype: overpy.Node
        :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
        """
        if data.get("type") != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=data.get("type")
            )

        # Everything that is not handled separately is kept as attribute
        handled = ("type", "id", "lat", "lon", "tags")
        attributes = {key: value for key, value in data.items() if key not in handled}

        return cls(
            node_id=data.get("id"),
            lat=data.get("lat"),
            lon=data.get("lon"),
            tags=data.get("tags", {}),
            attributes=attributes,
            result=result
        )

    @classmethod
    def from_xml(cls, child, result=None):
        """
        Create new node element from XML data

        :param child: XML node to be parsed
        :type child: xml.etree.ElementTree.Element
        :param result: The result this node belongs to
        :type result: overpy.Result
        :return: New Node object
        :rtype: overpy.Node
        :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
        :raises ValueError: If a tag doesn't have a name
        """
        node_name = child.tag.lower()
        if node_name != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=node_name
            )

        tags = {}
        for sub_child in child:
            if sub_child.tag.lower() != "tag":
                continue
            key = sub_child.attrib.get("k")
            if key is None:
                raise ValueError("Tag without name/key.")
            tags[key] = sub_child.attrib.get("v")

        node_id = child.attrib.get("id")
        if node_id is not None:
            node_id = int(node_id)

        # Coordinates are converted to Decimal to keep them exact
        lat = child.attrib.get("lat")
        if lat is not None:
            lat = Decimal(lat)
        lon = child.attrib.get("lon")
        if lon is not None:
            lon = Decimal(lon)

        # Everything that is not handled separately is kept as attribute
        handled = ("id", "lat", "lon")
        attributes = {key: value for key, value in child.attrib.items() if key not in handled}

        return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result)
class Way(Element):
    """
    Class to represent an element of type way
    """

    _type_value = "way"

    def __init__(self, way_id=None, center_lat=None, center_lon=None, node_ids=None, **kwargs):
        """
        :param node_ids: List of node IDs
        :type node_ids: List or Tuple
        :param way_id: Id of the way element
        :type way_id: Integer
        :param center_lat: Latitude of the center of the way
        :param center_lon: Longitude of the center of the way
        :param kwargs: Additional arguments are passed directly to the parent class
        """
        Element.__init__(self, **kwargs)
        #: The id of the way
        self.id = way_id
        #: List of Ids of the associated nodes
        self._node_ids = node_ids
        #: The lat/lon of the center of the way (optional depending on query)
        self.center_lat = center_lat
        self.center_lon = center_lon

    def __repr__(self):
        template = "<overpy.Way id={} nodes={}>"
        return template.format(self.id, self._node_ids)

    @property
    def nodes(self):
        """
        List of nodes associated with the way.
        """
        return self.get_nodes()

    def get_nodes(self, resolve_missing=False):
        """
        Get the nodes defining the geometry of the way

        At most one query is sent to the API to resolve the missing nodes of the way.

        :param resolve_missing: Try to resolve missing nodes.
        :type resolve_missing: Boolean
        :return: List of nodes
        :rtype: List of overpy.Node
        :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache.
        :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved.
        """
        def lookup(wanted_id):
            # Node from the result cache or None if it is not available
            try:
                return self._result.get_node(wanted_id)
            except exception.DataIncomplete:
                return None

        nodes = []
        already_queried = False
        for node_id in self._node_ids:
            node = lookup(node_id)
            if node is None:
                if not resolve_missing:
                    raise exception.DataIncomplete("Resolve missing nodes is disabled")

                # We tried to resolve the data but some nodes are still missing
                if already_queried:
                    raise exception.DataIncomplete("Unable to resolve all nodes")

                query = ("\n"
                         "[out:json];\n"
                         "way({way_id});\n"
                         "node(w);\n"
                         "out body;\n"
                         ).format(way_id=self.id)
                self._result.expand(self._result.api.query(query))
                already_queried = True

                node = lookup(node_id)
                if node is None:
                    raise exception.DataIncomplete("Unable to resolve all nodes")
            nodes.append(node)
        return nodes

    @classmethod
    def from_json(cls, data, result=None):
        """
        Create new Way element from JSON data

        :param data: Element data from JSON
        :type data: Dict
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New instance of Way
        :rtype: overpy.Way
        :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
        """
        if data.get("type") != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=data.get("type")
            )

        center_lat, center_lon = cls.get_center_from_json(data=data)

        # Everything that is not handled separately is kept as attribute
        handled = ("center", "id", "nodes", "tags", "type")
        attributes = {key: value for key, value in data.items() if key not in handled}

        return cls(
            attributes=attributes,
            center_lat=center_lat,
            center_lon=center_lon,
            node_ids=data.get("nodes"),
            tags=data.get("tags", {}),
            result=result,
            way_id=data.get("id")
        )

    @classmethod
    def from_xml(cls, child, result=None):
        """
        Create new way element from XML data

        :param child: XML node to be parsed
        :type child: xml.etree.ElementTree.Element
        :param result: The result this node belongs to
        :type result: overpy.Result
        :return: New Way object
        :rtype: overpy.Way
        :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
        :raises ValueError: If the ref attribute of the xml node is not provided
        :raises ValueError: If a tag doesn't have a name
        """
        node_name = child.tag.lower()
        if node_name != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=node_name
            )

        tags = {}
        node_ids = []
        center_lat = None
        center_lon = None

        for sub_child in child:
            sub_name = sub_child.tag.lower()
            if sub_name == "tag":
                key = sub_child.attrib.get("k")
                if key is None:
                    raise ValueError("Tag without name/key.")
                tags[key] = sub_child.attrib.get("v")
            elif sub_name == "nd":
                ref_id = sub_child.attrib.get("ref")
                if ref_id is None:
                    raise ValueError("Unable to find required ref value.")
                node_ids.append(int(ref_id))
            elif sub_name == "center":
                center_lat, center_lon = cls.get_center_from_xml_dom(sub_child=sub_child)

        way_id = child.attrib.get("id")
        if way_id is not None:
            way_id = int(way_id)

        # Everything except the ID is kept as attribute
        attributes = {key: value for key, value in child.attrib.items() if key != "id"}

        return cls(way_id=way_id, center_lat=center_lat, center_lon=center_lon,
                   attributes=attributes, node_ids=node_ids, tags=tags, result=result)
class Relation(Element):
    """
    Class to represent an element of type relation
    """

    _type_value = "relation"

    def __init__(self, rel_id=None, center_lat=None, center_lon=None, members=None, **kwargs):
        """
        :param members: The members of the relation
        :param rel_id: Id of the relation element
        :type rel_id: Integer
        :param center_lat: Latitude of the center of the relation
        :param center_lon: Longitude of the center of the relation
        :param kwargs: Additional arguments are passed directly to the parent class
        """
        Element.__init__(self, **kwargs)
        #: The id of the relation
        self.id = rel_id
        self.members = members
        #: The lat/lon of the center of the relation (optional depending on query)
        self.center_lat = center_lat
        self.center_lon = center_lon

    def __repr__(self):
        template = "<overpy.Relation id={}>"
        return template.format(self.id)

    @classmethod
    def from_json(cls, data, result=None):
        """
        Create new Relation element from JSON data

        :param data: Element data from JSON
        :type data: Dict
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New instance of Relation
        :rtype: overpy.Relation
        :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
        """
        if data.get("type") != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=data.get("type")
            )

        center_lat, center_lon = cls.get_center_from_json(data=data)

        # Members with a type that is not supported are skipped
        supported_members = [RelationNode, RelationWay, RelationRelation]
        members = [
            member_cls.from_json(member, result=result)
            for member in data.get("members", [])
            for member_cls in supported_members
            if member_cls._type_value == member.get("type")
        ]

        # Everything that is not handled separately is kept as attribute
        handled = ("id", "members", "tags", "type")
        attributes = {key: value for key, value in data.items() if key not in handled}

        return cls(
            rel_id=data.get("id"),
            attributes=attributes,
            center_lat=center_lat,
            center_lon=center_lon,
            members=members,
            tags=data.get("tags", {}),
            result=result
        )

    @classmethod
    def from_xml(cls, child, result=None):
        """
        Create new relation element from XML data

        :param child: XML node to be parsed
        :type child: xml.etree.ElementTree.Element
        :param result: The result this node belongs to
        :type result: overpy.Result
        :return: New Relation object
        :rtype: overpy.Relation
        :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match
        :raises ValueError: If a tag doesn't have a name
        """
        node_name = child.tag.lower()
        if node_name != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=node_name
            )

        tags = {}
        members = []
        center_lat = None
        center_lon = None
        supported_members = [RelationNode, RelationWay, RelationRelation, RelationArea]

        for sub_child in child:
            sub_name = sub_child.tag.lower()
            if sub_name == "tag":
                key = sub_child.attrib.get("k")
                if key is None:
                    raise ValueError("Tag without name/key.")
                tags[key] = sub_child.attrib.get("v")
            elif sub_name == "member":
                # Members with a type that is not supported are skipped
                type_value = sub_child.attrib.get("type")
                members.extend(
                    member_cls.from_xml(sub_child, result=result)
                    for member_cls in supported_members
                    if member_cls._type_value == type_value
                )
            elif sub_name == "center":
                center_lat, center_lon = cls.get_center_from_xml_dom(sub_child=sub_child)

        rel_id = child.attrib.get("id")
        if rel_id is not None:
            rel_id = int(rel_id)

        # Everything except the ID is kept as attribute
        attributes = {key: value for key, value in child.attrib.items() if key != "id"}

        return cls(
            rel_id=rel_id,
            attributes=attributes,
            center_lat=center_lat,
            center_lon=center_lon,
            members=members,
            tags=tags,
            result=result
        )
class RelationMember(object):
    """
    Base class to represent a member of a relation.

    Subclasses have to provide the class attribute _type_value with the member type they represent.
    """

    def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=None):
        """
        :param attributes: Additional attributes
        :type attributes: Dict
        :param geometry: List of geometry values of the member
        :type geometry: List
        :param ref: Reference Id
        :type ref: Integer
        :param role: The role of the relation member
        :type role: String
        :param result: The result this element belongs to
        """
        self.ref = ref
        self._result = result
        self.role = role
        self.attributes = attributes
        self.geometry = geometry

    @classmethod
    def from_json(cls, data, result=None):
        """
        Create new RelationMember element from JSON data

        :param data: Element data from JSON
        :type data: Dict
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New instance of RelationMember
        :rtype: overpy.RelationMember
        :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match.
        """
        if data.get("type") != cls._type_value:
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=data.get("type")
            )

        ref = data.get("ref")
        role = data.get("role")

        # Everything that is not handled separately is kept as attribute
        ignore = ("geometry", "type", "ref", "role")
        attributes = {n: v for n, v in data.items() if n not in ignore}

        # The geometry is optional, anything but a list is treated as not available
        geometry = data.get("geometry")
        if isinstance(geometry, list):
            geometry = [
                RelationWayGeometryValue(lat=v.get("lat"), lon=v.get("lon"))
                for v in geometry
            ]
        else:
            geometry = None

        return cls(
            attributes=attributes,
            geometry=geometry,
            ref=ref,
            role=role,
            result=result
        )

    @classmethod
    def from_xml(cls, child, result=None):
        """
        Create new RelationMember from XML data

        :param child: XML node to be parsed
        :type child: xml.etree.ElementTree.Element
        :param result: The result this element belongs to
        :type result: overpy.Result
        :return: New relation member object
        :rtype: overpy.RelationMember
        :raises overpy.exception.ElementDataWrongType: If the type attribute of the xml node doesn't match
        """
        type_value = child.attrib.get("type")
        if type_value != cls._type_value:
            # Report the value that has been compared, not the name of the xml node
            raise exception.ElementDataWrongType(
                type_expected=cls._type_value,
                type_provided=type_value
            )

        ref = child.attrib.get("ref")
        if ref is not None:
            ref = int(ref)
        role = child.attrib.get("role")

        # Everything that is not handled separately is kept as attribute
        ignore = ("geometry", "ref", "role", "type")
        attributes = {n: v for n, v in child.attrib.items() if n not in ignore}

        # The geometry stays None as long as no nd node has been found
        geometry = None
        for sub_child in child:
            if sub_child.tag.lower() != "nd":
                continue
            if geometry is None:
                geometry = []
            geometry.append(
                RelationWayGeometryValue(
                    lat=Decimal(sub_child.attrib["lat"]),
                    lon=Decimal(sub_child.attrib["lon"])
                )
            )

        return cls(
            attributes=attributes,
            geometry=geometry,
            ref=ref,
            role=role,
            result=result
        )
class RelationNode(RelationMember):
    """
    Relation member with the type value "node".
    """
    _type_value = "node"

    def resolve(self, resolve_missing=False):
        """
        Look up the referenced node in the result this member belongs to.

        :param resolve_missing: Passed on unchanged to the result's get_node()
        :return: The value returned by get_node() for the reference id
        """
        lookup = self._result.get_node
        return lookup(self.ref, resolve_missing=resolve_missing)

    def __repr__(self):
        template = "<overpy.RelationNode ref={} role={}>"
        return template.format(self.ref, self.role)
class RelationWay(RelationMember):
    """
    Relation member with the type value "way".
    """
    _type_value = "way"

    def resolve(self, resolve_missing=False):
        """
        Look up the referenced way in the result this member belongs to.

        :param resolve_missing: Passed on unchanged to the result's get_way()
        :return: The value returned by get_way() for the reference id
        """
        lookup = self._result.get_way
        return lookup(self.ref, resolve_missing=resolve_missing)

    def __repr__(self):
        template = "<overpy.RelationWay ref={} role={}>"
        return template.format(self.ref, self.role)
class RelationWayGeometryValue(object):
    """
    Simple container holding one lat/lon pair of a geometry.
    """

    def __init__(self, lat, lon):
        """
        :param lat: Latitude value, stored as passed
        :param lon: Longitude value, stored as passed
        """
        self.lat, self.lon = lat, lon

    def __repr__(self):
        template = "<overpy.RelationWayGeometryValue lat={} lon={}>"
        return template.format(self.lat, self.lon)
class RelationRelation(RelationMember):
    """
    Relation member with the type value "relation".
    """
    _type_value = "relation"

    def resolve(self, resolve_missing=False):
        """
        Look up the referenced relation in the result this member belongs to.

        :param resolve_missing: Passed on unchanged to the result's get_relation()
        :return: The value returned by get_relation() for the reference id
        """
        lookup = self._result.get_relation
        return lookup(self.ref, resolve_missing=resolve_missing)

    def __repr__(self):
        template = "<overpy.RelationRelation ref={} role={}>"
        return template.format(self.ref, self.role)
class RelationArea(RelationMember):
    """
    Relation member with the type value "area".
    """
    _type_value = "area"

    def resolve(self, resolve_missing=False):
        """
        Look up the referenced area in the result this member belongs to.

        :param resolve_missing: Passed on unchanged to the result's get_area()
        :return: The value returned by get_area() for the reference id
        """
        lookup = self._result.get_area
        return lookup(self.ref, resolve_missing=resolve_missing)

    def __repr__(self):
        template = "<overpy.RelationArea ref={} role={}>"
        return template.format(self.ref, self.role)
class OSMSAXHandler(handler.ContentHandler):
    """
    SAX parser for Overpass XML response.

    Opening and closing elements are dispatched by name to the methods
    ``_handle_start_<name>`` and ``_handle_end_<name>``. The data of the
    element currently being parsed is collected in ``self._curr`` and turned
    into an object that is appended to the result when the element is closed.
    """
    #: Tuple of opening elements to ignore
    ignore_start = ('osm', 'meta', 'note', 'bounds', 'remark')
    #: Tuple of closing elements to ignore
    ignore_end = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center')

    def __init__(self, result):
        """
        :param result: Append results to this result set.
        :type result: overpy.Result
        """
        handler.ContentHandler.__init__(self)
        self._result = result
        #: Keyword arguments collected for the element currently being parsed
        self._curr = {}
        #: Current relation member object
        self.cur_relation_member = None

    def startElement(self, name, attrs):
        """
        Handle opening elements.

        :param name: Name of the element
        :type name: String
        :param attrs: Attributes of the element
        :type attrs: Dict
        :raises KeyError: If no start handler exists for the element name
        """
        if name in self.ignore_start:
            return
        try:
            callback = getattr(self, '_handle_start_%s' % name)
        except AttributeError:
            raise KeyError("Unknown element start '%s'" % name)
        callback(attrs)

    def endElement(self, name):
        """
        Handle closing elements

        :param name: Name of the element
        :type name: String
        :raises KeyError: If no end handler exists for the element name
        """
        if name in self.ignore_end:
            return
        try:
            callback = getattr(self, '_handle_end_%s' % name)
        except AttributeError:
            raise KeyError("Unknown element end '%s'" % name)
        callback()

    def _handle_start_center(self, attrs):
        """
        Handle opening center element

        Stores the center coordinates as Decimal values in the data of the
        current element.

        :param attrs: Attributes of the element
        :type attrs: Dict
        :raises ValueError: If the lat or lon attribute is missing
        """
        center_lat = attrs.get("lat")
        center_lon = attrs.get("lon")
        if center_lat is None or center_lon is None:
            raise ValueError("Unable to get lat or lon of way center.")
        self._curr["center_lat"] = Decimal(center_lat)
        self._curr["center_lon"] = Decimal(center_lon)

    def _handle_start_tag(self, attrs):
        """
        Handle opening tag element

        Adds the key/value pair to the tags of the current element.

        :param attrs: Attributes of the element
        :type attrs: Dict
        :raises ValueError: If the tag has no 'k' attribute
        """
        try:
            tag_key = attrs['k']
        except KeyError:
            raise ValueError("Tag without name/key.")
        self._curr['tags'][tag_key] = attrs.get('v')

    def _handle_start_node(self, attrs):
        """
        Handle opening node element

        :param attrs: Attributes of the element
        :type attrs: Dict
        """
        self._curr = {
            'attributes': dict(attrs),
            'lat': None,
            'lon': None,
            'node_id': None,
            'tags': {}
        }
        # id, lat and lon become dedicated values and are removed from the
        # generic attributes.
        if attrs.get('id', None) is not None:
            self._curr['node_id'] = int(attrs['id'])
            del self._curr['attributes']['id']
        if attrs.get('lat', None) is not None:
            self._curr['lat'] = Decimal(attrs['lat'])
            del self._curr['attributes']['lat']
        if attrs.get('lon', None) is not None:
            self._curr['lon'] = Decimal(attrs['lon'])
            del self._curr['attributes']['lon']

    def _handle_end_node(self):
        """
        Handle closing node element

        Appends a Node built from the collected data to the result.
        """
        self._result.append(Node(result=self._result, **self._curr))
        self._curr = {}

    def _handle_start_way(self, attrs):
        """
        Handle opening way element

        :param attrs: Attributes of the element
        :type attrs: Dict
        """
        self._curr = {
            'center_lat': None,
            'center_lon': None,
            'attributes': dict(attrs),
            'node_ids': [],
            'tags': {},
            'way_id': None
        }
        # The id becomes a dedicated value and is removed from the generic
        # attributes.
        if attrs.get('id', None) is not None:
            self._curr['way_id'] = int(attrs['id'])
            del self._curr['attributes']['id']

    def _handle_end_way(self):
        """
        Handle closing way element

        Appends a Way built from the collected data to the result.
        """
        self._result.append(Way(result=self._result, **self._curr))
        self._curr = {}

    def _handle_start_area(self, attrs):
        """
        Handle opening area element

        :param attrs: Attributes of the element
        :type attrs: Dict
        """
        self._curr = {
            'attributes': dict(attrs),
            'tags': {},
            'area_id': None
        }
        # The id becomes a dedicated value and is removed from the generic
        # attributes.
        if attrs.get('id', None) is not None:
            self._curr['area_id'] = int(attrs['id'])
            del self._curr['attributes']['id']

    def _handle_end_area(self):
        """
        Handle closing area element

        Appends an Area built from the collected data to the result.
        """
        self._result.append(Area(result=self._result, **self._curr))
        self._curr = {}

    def _handle_start_nd(self, attrs):
        """
        Handle opening nd element

        While a way member of a relation is open, the element is stored as a
        geometry value of that member. Otherwise its ref is added to the node
        ids of the current element.

        :param attrs: Attributes of the element
        :type attrs: Dict
        :raises ValueError: If the ref attribute is required but missing
        """
        if isinstance(self.cur_relation_member, RelationWay):
            if self.cur_relation_member.geometry is None:
                self.cur_relation_member.geometry = []
            self.cur_relation_member.geometry.append(
                RelationWayGeometryValue(
                    lat=Decimal(attrs["lat"]),
                    lon=Decimal(attrs["lon"])
                )
            )
        else:
            try:
                node_ref = attrs['ref']
            except KeyError:
                raise ValueError("Unable to find required ref value.")
            self._curr['node_ids'].append(int(node_ref))

    def _handle_start_relation(self, attrs):
        """
        Handle opening relation element

        :param attrs: Attributes of the element
        :type attrs: Dict
        """
        self._curr = {
            'attributes': dict(attrs),
            'members': [],
            'rel_id': None,
            'tags': {}
        }
        # The id becomes a dedicated value and is removed from the generic
        # attributes.
        if attrs.get('id', None) is not None:
            self._curr['rel_id'] = int(attrs['id'])
            del self._curr['attributes']['id']

    def _handle_end_relation(self):
        """
        Handle closing relation element

        Appends a Relation built from the collected data to the result.
        """
        self._result.append(Relation(result=self._result, **self._curr))
        self._curr = {}

    def _handle_start_member(self, attrs):
        """
        Handle opening member element

        Creates the member object matching the type attribute, remembers it
        as the current relation member and adds it to the members of the
        current relation.

        :param attrs: Attributes of the element
        :type attrs: Dict
        :raises ValueError: If the type of the member is not known
        """
        # Same ignore list as RelationMember.from_xml, so both XML parsers
        # keep the same additional attributes on a member.
        ignore = ("geometry", "ref", "role", "type")
        params = {
            'attributes': {
                n: v for n, v in dict(attrs).items() if n not in ignore
            },
            'ref': None,
            'result': self._result,
            'role': None
        }
        # NOTE: empty values are skipped here, so a member with role="" keeps
        # role None.
        if attrs.get('ref', None):
            params['ref'] = int(attrs['ref'])
        if attrs.get('role', None):
            params['role'] = attrs['role']

        cls_map = {
            "area": RelationArea,
            "node": RelationNode,
            "relation": RelationRelation,
            "way": RelationWay
        }
        cls = cls_map.get(attrs["type"])
        if cls is None:
            raise ValueError("Undefined type for member: '%s'" % attrs['type'])

        self.cur_relation_member = cls(**params)
        self._curr['members'].append(self.cur_relation_member)

    def _handle_end_member(self):
        """
        Handle closing member element

        Resets the current relation member.
        """
        self.cur_relation_member = None