Source code for bw2io.export.gexf

# -*- coding: utf-8 -*-
from bw2data import Database, projects
from bw2data.query import Filter
from lxml.builder import ElementMaker
from lxml.etree import tostring
import datetime
import itertools
import os
import pyprind


class DatabaseToGEXF(object):
    """Export a Gephi graph for a database.

    Call ``.export()`` to export the file after class instantiation.

    Args:
        * *database* (str): Database name.
        * *include_descendants* (bool): Include databases which are linked
          from ``database``.

    Raises:
        NotImplementedError: If ``include_descendants`` is true.

    .. warning:: ``include_descendants`` is not yet implemented.

    """

    def __init__(self, database, include_descendants=False):
        self.database = database
        self.descendants = include_descendants
        if self.descendants:
            # ``NotImplemented`` is the rich-comparison sentinel, not an
            # exception class; raising it fails with a TypeError instead of
            # signalling the missing feature.
            raise NotImplementedError(
                "``include_descendants`` is not yet implemented"
            )
        filename = database + ("_plus" if include_descendants else "")
        self.filepath = os.path.join(projects.output_dir, filename + ".gexf")
        self.data = Database(self.database).load()
        # GEXF node ids are strings; number the activities in iteration order.
        self.id_mapping = {key: str(i) for i, key in enumerate(self.data)}

    def export(self):
        """Export the Gephi XML file.

        Returns the filepath of the created file.

        """
        E = ElementMaker(
            namespace="http://www.gexf.net/1.2draft",
            nsmap={None: "http://www.gexf.net/1.2draft"},
        )
        meta = E.meta(
            E.creator("Brightway2"),
            E.description(self.database),
            lastmodified=datetime.date.today().strftime("%Y-%m-%d"),
        )
        # ``class`` and ``for`` are Python keywords, so those XML attributes
        # are passed through dict unpacking.
        attributes = E.attributes(
            E.attribute(id="0", title="category", type="string"),
            **{"class": "node"}
        )
        nodes, edges = self.get_data(E)
        graph = E.graph(
            attributes, nodes, edges, mode="static", defaultedgetype="directed"
        )
        with open(self.filepath, "w", encoding="utf-8") as f:
            # Need XML declaration, but then ``tostring`` returns bytes
            # so need to decode.
            # See https://bugs.python.org/issue10942
            # and http://makble.com/python-why-lxml-etree-tostring-method-returns-bytes
            f.write(
                tostring(
                    E.gexf(meta, graph, version="1.2"),
                    xml_declaration=True,
                    encoding="utf-8",
                    pretty_print=True,
                ).decode("utf-8")
            )
        return self.filepath

    def get_data(self, E):
        """Get Gephi nodes and edges.

        Args:
            * *E*: The ``ElementMaker`` used to build the XML elements.

        Returns:
            A ``(nodes, edges)`` tuple of the ``E.nodes`` and ``E.edges``
            container elements.

        """
        count = itertools.count()
        nodes = []
        edges = []
        pbar = pyprind.ProgBar(
            len(self.data), title="Get nodes and edges:", monitor=True
        )
        for key, value in self.data.items():
            # ``or ()`` tolerates a ``categories`` entry that is present but
            # set to ``None``.
            categories = value.get("categories") or ()
            nodes.append(
                E.node(
                    E.attvalues(
                        E.attvalue(value="-".join(categories), **{"for": "0"})
                    ),
                    id=self.id_mapping[key],
                    label=value.get("name", "Unknown"),
                )
            )
            for exc in value.get("exchanges", []):
                source = exc["input"]
                # Skip inputs outside the exported data, and the production
                # exchange (an activity pointing at itself), which is not
                # needed in the graph.
                if source not in self.id_mapping or source == key:
                    continue
                edges.append(
                    E.edge(
                        id=str(next(count)),
                        source=self.id_mapping[source],
                        target=self.id_mapping[key],
                        label="%.3g" % exc["amount"],
                    )
                )
            pbar.update()
        print(pbar)
        return E.nodes(*nodes), E.edges(*edges)
class DatabaseSelectionToGEXF(DatabaseToGEXF):
    """Export a Gephi graph for a selection of activities from a database.

    The parent constructor is not called; this class sets ``database``,
    ``filepath``, ``data`` and ``id_mapping`` itself and writes to
    ``<database>.selection.gexf`` in the project output directory.

    NOTE(review): the previous docstring said that all inputs of the
    selected activities are included as well. The filter below keeps only
    activities whose key is in ``keys``, and ``get_data`` skips exchanges
    whose input is not in ``id_mapping`` -- confirm the intended behaviour.

    Args:
        * *database* (str): Database name.
        * *keys*: Collection of activity keys to export (anything that
          supports ``in``).

    """

    def __init__(self, database, keys):
        self.database = database
        self.filepath = os.path.join(
            projects.output_dir, database + ".selection.gexf"
        )

        # Keep only the requested activities, in the order the database
        # yields them.
        selected = {}
        for activity_key, dataset in Database(self.database).load().items():
            if activity_key in keys:
                selected[activity_key] = dataset
        self.data = selected

        # GEXF node ids are strings; number the selected activities in order.
        self.id_mapping = {
            activity_key: str(position)
            for position, activity_key in enumerate(self.data)
        }
def keyword_to_gephi_graph(database, keyword):
    """Export a Gephi graph of the activities in ``database`` matching ``keyword``.

    Activities are selected by applying the ``ihas`` filter to their
    ``name`` field (presumably a case-insensitive substring match -- confirm
    against ``bw2data.query``). The matching keys are then exported with
    ``DatabaseSelectionToGEXF``.

    Args:
        * *database* (str): Database name.
        * *keyword* (str): Keyword to search for.

    Returns:
        The filepath of the exported file.

    """
    source_db = Database(database)
    name_filter = Filter("name", "ihas", keyword)
    matches = source_db.query(name_filter)
    selected_keys = set(matches.keys())
    exporter = DatabaseSelectionToGEXF(database, selected_keys)
    return exporter.export()