# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
from eight import *
from . import config
from .errors import ReadOnlyProject
from .filesystem import safe_filename, create_dir
from .sqlite import PickleField, SubstitutableDatabase
from .utils import python_2_unicode_compatible
from fasteners import InterProcessLock
from peewee import Model, TextField
from threading import ThreadError
import appdirs
import eight
import os
import shutil
import sys
import tempfile
import warnings
import wrapt
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
[docs]
READ_ONLY_PROJECT = """
***Read only project***
This project is being used by another process and no writes can be made until:
1. You close the other program, or switch to a different project, *and*
2. You call `projects.enable_writes` *and* get the response `True`.
If you are **sure** that this warning is incorrect, call
`projects.enable_writes(force=True)` to enable writes.
"""
[docs]
def lockable():
return hasattr(config, "p") and config.p.get('lockable')
@python_2_unicode_compatible
[docs]
class ProjectDataset(Model):
[docs]
name = TextField(index=True, unique=True)
def __str__(self):
return "Project: {}".format(self.name)
__repr__ = lambda x: str(x)
def __lt__(self, other):
if not isinstance(other, ProjectDataset):
raise TypeError
else:
return self.name.lower() < other.name.lower()
[docs]
class ProjectManager(Iterable):
[docs]
_basic_directories = (
"backups",
"intermediate",
"lci",
"processed",
)
def __init__(self):
self._base_data_dir, self._base_logs_dir = self._get_base_directories()
self._create_base_directories()
[docs]
self.db = SubstitutableDatabase(
os.path.join(self._base_data_dir, "projects.db"),
[ProjectDataset]
)
self.set_current("default", update=False)
def __iter__(self):
for project_ds in ProjectDataset.select():
yield project_ds
def __contains__(self, name):
return ProjectDataset.select().where(ProjectDataset.name == name).count() > 0
def __len__(self):
return ProjectDataset.select().count()
def __repr__(self):
if len(self) > 20:
return ("Brightway2 projects manager with {} objects, including:"
"{}\nUse `sorted(projects)` to get full list, "
"`projects.report()` to get\n\ta report on all projects.").format(
len(self),
"".join(["\n\t{}".format(x) for x in sorted([x.name for x in self])[:10]])
)
else:
return ("Brightway2 projects manager with {} objects:{}"
"\nUse `projects.report()` to get a report on all projects.").format(
len(self),
"".join(["\n\t{}".format(x) for x in sorted([x.name for x in self])])
)
### Internal functions for managing projects
[docs]
def _get_base_directories(self):
eight.wrap_os_environ_io()
envvar = os.getenv("BRIGHTWAY2_DIR")
if envvar:
if not os.path.isdir(envvar):
raise OSError(("BRIGHTWAY2_DIR variable is {}, but this is not"
" a valid directory").format(envvar))
else:
print("Using environment variable BRIGHTWAY2_DIR for data "
"directory:\n{}".format(envvar))
envvar = os.path.abspath(envvar)
logs_dir = os.path.join(envvar, "logs")
create_dir(logs_dir)
return envvar, logs_dir
LABEL = "Brightway2" if sys.version_info < (3, 0) else "Brightway3"
data_dir = appdirs.user_data_dir(LABEL, "pylca")
logs_dir = appdirs.user_log_dir(LABEL, "pylca")
return data_dir, logs_dir
[docs]
def _create_base_directories(self):
create_dir(self._base_data_dir)
create_dir(self._base_logs_dir)
@property
[docs]
def current(self):
return self._project_name
[docs]
def set_current(self, name, writable=True, update=True):
if not self.read_only and lockable() and hasattr(self, "_lock"):
try:
self._lock.release()
except (RuntimeError, ThreadError):
pass
self._project_name = str(name)
# Need to allow writes when creating a new project
# for new metadata stores
self.read_only = False
self.create_project(name)
self._reset_meta()
self._reset_sqlite3_databases()
if not lockable():
pass
elif writable:
self._lock = InterProcessLock(os.path.join(self.dir, "write-lock"))
self.read_only = not self._lock.acquire(timeout=0.05)
if self.read_only:
warnings.warn(READ_ONLY_PROJECT)
else:
self.read_only = True
if not self.read_only and update:
self._do_automatic_updates()
[docs]
def _do_automatic_updates(self):
"""Run any available automatic updates"""
from .updates import Updates
for update_name in Updates.check_automatic_updates():
print("Applying automatic update: {}".format(update_name))
Updates.do_update(update_name)
[docs]
def _reset_sqlite3_databases(self):
for relative_path, substitutable_db in config.sqlite3_databases:
substitutable_db.change_path(os.path.join(self.dir, relative_path))
### Public API
@property
[docs]
def dir(self):
return os.path.join(
self._base_data_dir,
safe_filename(self.current)
)
@property
[docs]
def logs_dir(self):
return os.path.join(
self._base_logs_dir,
safe_filename(self.current)
)
@property
[docs]
def output_dir(self):
"""Get directory for output files.
Uses environment variable ``BRIGHTWAY2_OUTPUT_DIR``; ``preferences['output_dir']``; or directory ``output`` in current project.
Returns output directory path.
"""
eight.wrap_os_environ_io()
ep, pp = os.getenv('BRIGHTWAY2_OUTPUT_DIR'), config.p.get('output_dir')
if ep and os.path.isdir(ep):
return ep
elif pp and os.path.isdir(pp):
return pp
else:
return self.request_directory('output')
[docs]
def create_project(self, name=None, **kwargs):
name = name or self.current
if not ProjectDataset.select().where(
ProjectDataset.name == name).count():
ProjectDataset.create(
data=kwargs,
name=name
)
create_dir(self.dir)
for dir_name in self._basic_directories:
create_dir(os.path.join(self.dir, dir_name))
create_dir(self.logs_dir)
[docs]
def copy_project(self, new_name, switch=True):
"""Copy current project to a new project named ``new_name``. If ``switch``, switch to new project."""
if new_name in self:
raise ValueError("Project {} already exists".format(new_name))
fp = os.path.join(self._base_data_dir, safe_filename(new_name))
if os.path.exists(fp):
raise ValueError("Project directory already exists")
project_data = ProjectDataset.select(ProjectDataset.name == self.current).get().data
ProjectDataset.create(data=project_data, name=new_name)
shutil.copytree(self.dir, fp, ignore=lambda x, y: ["write-lock"])
create_dir(os.path.join(
self._base_logs_dir,
safe_filename(new_name)
))
if switch:
self.set_current(new_name)
[docs]
def request_directory(self, name):
"""Return the absolute path to the subdirectory ``dirname``, creating it if necessary.
Returns ``False`` if directory can't be created."""
fp = os.path.join(self.dir, str(name))
create_dir(fp)
if not os.path.isdir(fp):
return False
return fp
[docs]
def _use_temp_directory(self):
"""Point the ProjectManager towards a temporary directory instead of `user_data_dir`.
Used exclusively for tests."""
if not self._is_temp_dir:
self._orig_base_data_dir = self._base_data_dir
self._orig_base_logs_dir = self._base_logs_dir
temp_dir = tempfile.mkdtemp()
self._base_data_dir = os.path.join(temp_dir, "data")
self._base_logs_dir = os.path.join(temp_dir, "logs")
self.db.change_path(':memory:')
self.set_current("default", update=False)
self._is_temp_dir = True
return temp_dir
[docs]
def _restore_orig_directory(self):
"""Point the ProjectManager back to original directories.
Used exclusively in tests."""
if not self._is_temp_dir:
return
self._base_data_dir = self._orig_base_data_dir
del self._orig_base_data_dir
self._base_logs_dir = self._orig_base_logs_dir
del self._orig_base_logs_dir
self.db.change_path(os.path.join(self._base_data_dir, "projects.db"))
self.set_current("default", update=False)
self._is_temp_dir = False
[docs]
def delete_project(self, name=None, delete_dir=False):
"""Delete project ``name``, or the current project.
``name`` is the project to delete. If ``name`` is not provided, delete the current project.
By default, the underlying project directory is not deleted; only the project name is removed from the list of active projects. If ``delete_dir`` is ``True``, then also delete the project directory.
If deleting the current project, this function sets the current directory to ``default`` if it exists, or to a random project.
Returns the current project."""
victim = name or self.current
if victim not in self:
raise ValueError("{} is not a project".format(victim))
if len(self) == 1:
raise ValueError("Can't delete only remaining project")
ProjectDataset.delete().where(ProjectDataset.name == victim).execute()
if delete_dir:
dir_path = os.path.join(
self._base_data_dir,
safe_filename(victim)
)
assert os.path.isdir(dir_path), "Can't find project directory"
shutil.rmtree(dir_path)
if name is None or name == self.current:
if "default" in self:
self.set_current("default")
else:
self.set_current(next(iter(self)).name)
return self.current
[docs]
def purge_deleted_directories(self):
"""Delete project directories for projects which are no longer registered.
Returns number of directories deleted."""
registered = {safe_filename(obj.name) for obj in self}
bad_directories = [os.path.join(self._base_data_dir, dirname)
for dirname in os.listdir(self._base_data_dir)
if os.path.isdir(os.path.join(self._base_data_dir, dirname))
and dirname not in registered]
for fp in bad_directories:
shutil.rmtree(fp)
return len(bad_directories)
[docs]
def report(self):
"""Give a report on current projects, including installed databases and file sizes.
Returns tuples of ``(project name, number of databases, size of all databases (GB))``."""
from . import databases
_current = self.current
data = []
def get_dir_size(dirpath):
"""Modified from http://stackoverflow.com/questions/12480367/how-to-generate-directory-size-recursively-in-python-like-du-does.
Does not follow symbolic links"""
return sum(
sum(os.path.getsize(os.path.join(root, name))
for name in files
) for root, dirs, files in os.walk(dirpath))
names = sorted([x.name for x in self])
for obj in names:
self.set_current(obj, update=False, writable=False)
data.append((obj, len(databases), get_dir_size(projects.dir) / 1e9))
self.set_current(_current)
return data
[docs]
projects = ProjectManager()
@wrapt.decorator
[docs]
def writable_project(wrapped, instance, args, kwargs):
if projects.read_only:
raise ReadOnlyProject(READ_ONLY_PROJECT)
return wrapped(*args, **kwargs)