Source code for bw2regional.pandarus_remote

import os
import time
import uuid

import requests
import wrapt
from bw2data import config, projects

from . import geocollections, intersections, topocollections
from .errors import WindowsPathCharacterLimit
from .pandarus import import_from_pandarus, import_xt_from_rasterstats
from .utils import hash_collection


class RemoteError(Exception):
    """Raised when the pandarus-remote web service cannot be reached or rejects a request."""
class NotYetCalculated(Exception):
    """Raised when the requested remote result does not exist yet and must be calculated first."""
class AlreadyExists(Exception):
    """Raised when the remote reports that the requested resource was already calculated."""
class PendingJob(object):
    """Handle to a calculation job queued on a pandarus-remote server.

    ``url`` is the absolute URL queried to obtain the job's status.
    """

    def __init__(self, url):
        self.url = url

    @property
    def status(self):
        """Status text reported by the server, or ``"forgotten"`` on HTTP 404."""
        response = requests.get(self.url)
        return "forgotten" if response.status_code == 404 else response.text

    def poll(self, interval=10):
        """Block until the job reaches a terminal status.

        The status is re-checked every ``interval`` seconds. Ctrl-C stops
        waiting early; in every case the last known status is printed.
        """
        terminal_states = {"failed", "finished", "forgotten"}
        try:
            while self.status not in terminal_states:
                time.sleep(interval)
        except KeyboardInterrupt:
            pass
        finally:
            print("\nJob ended with status '{}'".format(self.status))
def run_job(job=None):
    """Wait for a remote calculation job and fail unless it finished successfully.

    ``job`` is ``None`` when no job was submitted because the result already
    existed on the server; in that case there is nothing to wait for.

    Raises ``ValueError`` if the job ends with any status other than
    ``"finished"``.
    """
    if job is not None:
        job.poll(interval=2)
        if job.status != "finished":
            raise ValueError(
                "Calculation job finished with status '{}'".format(job.status)
            )
@wrapt.decorator
def check_alive(wrapped, instance, args, kwargs):
    """Method decorator: call the wrapped method only if the remote server is alive.

    ``instance`` is the bound object (e.g. a ``PandarusRemote``). Its ``alive``
    flag is checked first, and ``RemoteError`` is raised if it is false.
    """
    if instance.alive:
        return wrapped(*args, **kwargs)
    raise RemoteError("Can't reach {}".format(instance.url))
class PandarusRemote(object):
    """Interaction with `pandarus_remote <https://github.com/cmutel/pandarus_remote>`__ web service.

    Default URL is `https://pandarus.brightway.dev`.
    """

    def __init__(self, url=None):
        self.url = url or "https://pandarus.brightway.dev"
        # Drop one trailing slash so endpoint paths like "/catalog" can be appended.
        if self.url[-1] == "/":
            self.url = self.url[:-1]

    @property
    def alive(self):
        """``True`` if a GET on the base URL returns HTTP 200.

        Network-level failures (DNS errors, refused connections, timeouts)
        also count as "not alive", so ``check_alive`` reports them as
        ``RemoteError`` instead of leaking a ``requests`` exception.
        """
        try:
            return requests.get(self.url).status_code == 200
        except requests.RequestException:
            return False

    def _download_file(self, resp):
        """Write the body of ``resp`` into the project's ``regional`` directory.

        The filename is taken from the ``Content-Disposition`` header. On
        Windows, if the resulting absolute path exceeds 250 characters, a
        random hex name keeping the original extension is used instead.

        Returns the absolute path of the written file.

        Raises ``RemoteError`` if the response carries no usable filename, and
        ``WindowsPathCharacterLimit`` if, on Windows, even the download
        directory path is longer than 200 characters.
        """
        if "Content-Disposition" not in resp.headers:
            raise RemoteError(
                "Response from {} has no Content-Disposition header".format(self.url)
            )
        download_dirpath = projects.request_directory("regional")

        filename = (
            resp.headers["Content-Disposition"]
            .replace("attachment; filename=", "")
            .strip()
            .strip('"')
        )
        # The name comes from the server: keep only the final path component so
        # it can never point outside the download directory.
        filename = os.path.basename(filename)
        if not filename:
            raise RemoteError(
                "Response from {} has no filename in its Content-Disposition header".format(
                    self.url
                )
            )
        filepath = os.path.abspath(os.path.join(download_dirpath, filename))

        if config._windows and len(filepath) > 250:
            # Windows has an absolute limit of 255 characters in a filepath
            dirpath_str = str(os.path.abspath(download_dirpath))
            if len(dirpath_str) > 200:
                ERROR = """Cannot safely save files in this project directory.
The project name is too long: {} characters for complete directory path, should be fewer than 200.
The directory used for downloads is:
{}
Please start a new project with a shorter project name."""
                raise WindowsPathCharacterLimit(
                    ERROR.format(len(dirpath_str), dirpath_str)
                )
            # splitext keeps the leading dot, or gives "" when there is no extension
            extension = os.path.splitext(filename)[1]
            filepath = os.path.abspath(
                os.path.join(download_dirpath, uuid.uuid4().hex + extension)
            )

        # iter_content (unlike resp.raw) undoes any transfer Content-Encoding
        with open(filepath, "wb") as f:
            for segment in resp.iter_content(chunk_size=128 * 1024):
                f.write(segment)
        return filepath

    @check_alive
    def catalog(self):
        """Return the decoded JSON of the remote ``/catalog`` endpoint."""
        return requests.get(self.url + "/catalog").json()

    @check_alive
    def status(self, url):
        """Return the response text of a GET on ``url`` appended to the base URL."""
        return requests.get(self.url + url).text

    @check_alive
    def upload(self, collection):
        """Upload the data file of a registered collection to the remote.

        ``collection`` is looked up in ``topocollections`` first, then in
        ``geocollections``. If a file with the same hash is already listed in
        the remote catalog, nothing is uploaded and ``None`` is returned;
        otherwise the server's JSON response is returned.

        Raises ``ValueError`` for an unknown collection or metadata without a
        ``filepath``, and ``RemoteError`` if the upload does not return HTTP 200.
        """
        if collection in topocollections:
            metadata = topocollections[collection]
        elif collection in geocollections:
            metadata = geocollections[collection]
        else:
            raise ValueError("Unknown geocollection {}".format(collection))
        if "filepath" not in metadata:
            raise ValueError("Can't find file for this collection")

        try:
            collection_hash = metadata["sha256"]
        except KeyError:
            collection_hash = hash_collection(collection)

        # Index 1 of each catalog "files" entry is compared against collection
        # hashes (same convention as hash_and_upload).
        if collection_hash in {obj[1] for obj in self.catalog()["files"]}:
            print(f"Geocollection {collection} is already uploaded")
            return

        url = self.url + "/upload"
        data = {
            "layer": metadata.get("layer") or "",
            "field": metadata.get("field") or "",
            "band": metadata.get("band") or "",
            "sha256": collection_hash,
            "name": os.path.basename(metadata["filepath"]),
        }
        # Context manager closes the file handle once the request is sent.
        with open(metadata["filepath"], "rb") as handle:
            resp = requests.post(url, data=data, files={"file": handle})
        if resp.status_code == 200:
            return resp.json()
        raise RemoteError("{}: {}".format(resp.status_code, resp.text))

    @check_alive
    def intersection(self, collection_one, collection_two):
        """Download and import the remote intersection of two collections.

        Skipped (returns ``None``) if the pair is already in ``intersections``.
        Either collection is uploaded first if the remote doesn't have it.
        Returns the result of ``import_from_pandarus`` on the downloaded file.

        Raises ``NotYetCalculated`` if the remote answers HTTP 404; other
        non-200 answers go through ``handle_errors``.
        """
        if (collection_one, collection_two) in intersections:
            print(
                "Skipping existing intersection: ({}, {})".format(
                    collection_one, collection_two
                )
            )
            return
        catalog = self.catalog()
        first = self.hash_and_upload(collection_one, catalog)
        second = self.hash_and_upload(collection_two, catalog)
        resp = requests.post(
            self.url + "/intersection",
            data={"first": first, "second": second},
            stream=True,
        )
        if resp.status_code == 404:
            raise NotYetCalculated(
                "Not yet calculated; Run `.calculate_intersection` first."
            )
        self.handle_errors(resp)
        filepath = self._download_file(resp)
        return import_from_pandarus(filepath)

    @check_alive
    def intersection_as_new_geocollection(
        self, collection_one, collection_two, new_name
    ):
        """Register the intersection geometry of two collections as a new geocollection.

        The intersection file is downloaded from ``/intersection-file`` and
        registered under ``new_name`` in ``geocollections``. Then the
        intersections between the new geocollection and each parent are
        imported, asking the remote to calculate them first if needed.

        Does nothing if ``new_name`` already exists. Raises
        ``NotYetCalculated`` if the remote has not calculated the
        intersection of the two parents yet.
        """
        if new_name in geocollections:
            print("Skipping creation of existing geocollection")
            return
        catalog = self.catalog()
        first = self.hash_and_upload(collection_one, catalog)
        second = self.hash_and_upload(collection_two, catalog)
        resp = requests.post(
            self.url + "/intersection-file",
            data={"first": first, "second": second},
            stream=True,
        )
        if resp.status_code == 404:
            raise NotYetCalculated(
                "Not yet calculated; Run `.calculate_intersection` first."
            )
        self.handle_errors(resp)
        filepath = self._download_file(resp)
        geocollections[new_name] = {
            "filepath": filepath,
            "field": "id",
            "url": self.url + "/intersection-file",
            "is intersection": True,
            "first": collection_one,
            "second": collection_two,
        }
        self._intersect_or_calculate(new_name, collection_one)
        self._intersect_or_calculate(new_name, collection_two)

    def _intersect_or_calculate(self, collection_one, collection_two):
        """Import an intersection, first waiting for a remote calculation if it doesn't exist yet."""
        try:
            self.intersection(collection_one, collection_two)
        except NotYetCalculated:
            print("Remote is calculating intersection")
            run_job(self.calculate_intersection(collection_one, collection_two))
            # The result is available now; fetch and import it.
            self.intersection(collection_one, collection_two)

    @check_alive
    def rasterstats_as_xt(self, vector, raster, name):
        """Download remote raster statistics and import them as an extension table.

        ``vector`` and ``raster`` are collection names, uploaded first if the
        remote doesn't have them. ``name`` is passed to
        ``import_xt_from_rasterstats``, whose result is returned. Non-200
        answers go through ``handle_errors``.
        """
        catalog = self.catalog()
        first = self.hash_and_upload(vector, catalog)
        second = self.hash_and_upload(raster, catalog)
        resp = requests.post(
            self.url + "/rasterstats",
            data={"vector": first, "raster": second},
            stream=True,
        )
        self.handle_errors(resp)
        filepath = self._download_file(resp)
        return import_xt_from_rasterstats(filepath, name, vector)

    @check_alive
    def calculate_rasterstats(self, vector, raster):
        """Ask the remote to calculate raster statistics.

        Returns a ``PendingJob``, or ``None`` if the remote reports the result
        already exists (HTTP 409).
        """
        catalog = self.catalog()
        first = self.hash_and_upload(vector, catalog)
        second = self.hash_and_upload(raster, catalog)
        resp = requests.post(
            self.url + "/calculate-rasterstats",
            data={"vector": first, "raster": second},
        )
        try:
            self.handle_errors(resp)
        except AlreadyExists:
            print(f"Rasterstats for {vector} and {raster} already calculated")
            return
        print("Calculation job submitted.")
        # The response body is the job's status path on the server
        return PendingJob(self.url + resp.text)

    @check_alive
    def calculate_intersection(self, collection_one, collection_two):
        """Ask the remote to calculate the intersection of two collections.

        Returns a ``PendingJob``, or ``None`` if the remote reports the result
        already exists (HTTP 409). Any other error status raises
        ``ValueError`` via ``handle_errors``.
        """
        catalog = self.catalog()
        first = self.hash_and_upload(collection_one, catalog)
        second = self.hash_and_upload(collection_two, catalog)
        resp = requests.post(
            self.url + "/calculate-intersection",
            data={"first": first, "second": second},
        )
        try:
            self.handle_errors(resp)
        except AlreadyExists:
            # Nothing to wait for; run_job(None) treats this as done.
            print(
                f"Intersection for {collection_one} and {collection_two} already calculated"
            )
            return
        print("Calculation job submitted.")
        # The response body is the job's status path on the server
        return PendingJob(self.url + resp.text)

    def hash_and_upload(self, collection, catalog=None):
        """Return the hash of ``collection``, uploading it first if the remote lacks it.

        Pass an already-fetched ``catalog()`` result to avoid another request.
        Raises ``ValueError`` if ``hash_collection`` returns nothing.
        """
        known_hashes = {obj[1] for obj in (catalog or self.catalog())["files"]}
        hashed = hash_collection(collection)
        if not hashed:
            raise ValueError("Can't find collection {}".format(collection))
        if hashed not in known_hashes:
            self.upload(collection)
        return hashed

    def handle_errors(self, response):
        """Raise ``AlreadyExists`` on HTTP 409 and ``ValueError`` on any other non-200 status."""
        if response.status_code == 409:
            raise AlreadyExists
        elif response.status_code != 200:
            raise ValueError(
                "Server returned an error code: {}: {}".format(
                    response.status_code, response.text
                )
            )
# Shared default client, pointing at PandarusRemote's default URL
# (https://pandarus.brightway.dev).
remote = PandarusRemote(url=None)