# The MIT License (MIT)
# Copyright (c) 2014 abarnert
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# fatomic from abarnert
# https://github.com/abarnert/fatomic
import contextlib
import errno
import os
import shutil
import sys
import tempfile
import types
__all__ = [
"replace",
"open",
"writeall",
"writechunks",
"writelines",
"transformall",
"transformchunks",
"transformlines",
"transform",
]
# If we're on 3.3+, just use os.replace; if we're on POSIX, rename
# and replace do the same thing.
try:
except AttributeError:
if sys.platform != "win32":
replace = os.rename
else:
# This requires PyWin32 if you're on Windows. If that's not
# accepted, you can write a ctypes solution, but then you'll
# have to handle unicode-vs.-bytes strings and creating an
# OSError from GetLastError and so on yourself, which I don't
# feel like doing. (I'll accept a pull request from anyone
# else who does...)
import win32api
import win32con
def replace(src, dst):
win32api.MoveFileEx(src, dst, win32con.MOVEFILE_REPLACE_EXISTING)
def _guessmode(contents, binary):
if binary is not None:
return binary
if isinstance(contents, str):
return False
elif isinstance(contents, (bytes, bytearray)):
return True
else:
return False
def _mode(binary, contents=None):
if binary is not None:
return "b" if binary else ""
if contents is None:
raise TypeError("binary cannot be None")
if isinstance(contents, str):
return ""
elif isinstance(contents, (bytes, bytearray)):
return "b"
else:
return ""
def _tempfile(filename, mode):
return tempfile.NamedTemporaryFile(
mode=mode, prefix=os.path.basename(filename), suffix=".tmp", delete=False
)
_open = open
@contextlib.contextmanager
[docs]
def open(filename, mode, *args, **kwargs):
if mode[0] not in "wxa" or len(mode) > 1 and mode[1] == "+":
raise ValueError("invalid mode: '{}'".format(mode))
f = _tempfile(filename, mode, *args, **kwargs)
_discard = [False]
try:
if mode[0] == "a":
try:
with _open(filename, "r" + mode[1:], *args, **kwargs) as fin:
shutil.copyfileobj(fin, f)
except (OSError, IOError) as e:
if e.errno == errno.ENOENT:
pass
def discard(self, _discard=_discard):
_discard[0] = True
f.discard = types.MethodType(discard, f)
yield f
finally:
f.close()
if not _discard[0]:
try:
replace(f.name, filename)
except OSError:
shutil.move(f.name, filename)
def write(filename, lines, binary=False):
mode = _mode(binary)
with open(filename, "w" + mode) as fout:
fout.writelines(lines)
[docs]
def writeall(filename, contents, binary=None):
mode = _mode(binary, contents)
with open(filename, "w" + mode) as fout:
f.write(contents)
def _chunkfile(f, chunksize=None):
if chunksize is None:
chunksize = 8192
while True:
buf = f.read(chunksize)
if not buf:
return
yield buf
def append(filename, lines, binary=False):
mode = _mode(binary)
with open(filename, "a" + mode) as fout:
fout.writelines(lines)
def appendall(filename, contents, binary=False):
mode = _mode(binary)
with open(filename, "a" + mode) as fout:
fout.write(contents)
appendchunks = append
def test():
with open("foo", "a") as f:
f.write("Hello, ")
with open("foo", "a") as f:
f.write("world\n")
transform("foo", lambda line: line.replace("Hello", "Hi"))
if __name__ == "__main__":
test()