310 lines
10 KiB
Python
Executable file
310 lines
10 KiB
Python
Executable file
#!/usr/bin/python
|
|
|
|
import optparse, sys, os, tempfile, re
|
|
try: import readline
|
|
except ImportError: pass
|
|
from stat import *
|
|
|
|
def show_license(*eat):
    """Write the program's license notice to standard output and exit.

    Registered as an optparse callback, so it accepts and ignores whatever
    positional arguments the option parser passes in.

    Raises:
        SystemExit: always, with status 0, once the notice has been written.
    """
    notice = """rpl - replace strings in files
Copyright (C) 2004-2005 Goran Weinholt <weinholt@debian.org>
Copyright (C) 2004 Christian Haggstrom <chm@c00.info>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
    # The extra newline reproduces the line terminator a print statement adds.
    sys.stdout.write(notice + "\n")
    sys.exit(0)
|
|
|
|
def get_files(filenames, recurse, suffixen, verbose, hidden_files):
    """Expand the given paths into the list of regular files to process.

    Args:
        filenames: paths (files or directories) to examine.
        recurse: when true, directories are scanned recursively; otherwise
            they are skipped.
        suffixen: list of filename suffixes; when non-empty, only regular
            files ending with one of them are kept.
        verbose: when true, report scanned/skipped directories and hidden
            entries on stderr.
        hidden_files: when false, directory entries whose name starts with
            '.' are ignored while scanning. Paths named directly in
            ``filenames`` are not subject to this filter.

    Returns:
        A list of ``(path, lstat_result)`` tuples, one per accepted regular
        file. Paths that cannot be stat'ed or listed are reported on stderr
        and skipped; anything that is neither a directory nor a regular file
        (per ``os.lstat``) is reported and skipped as well.
    """
    warn = sys.stderr.write
    new_files = []
    for filename in filenames:
        try:
            perms = os.lstat(filename)
        except OSError as e:
            warn("\nrpl: Unable to read permissions of %s." % filename)
            warn("\nrpl: Error: %s" % e)
            warn("\nrpl: SKIPPING %s\n\n" % filename)
            continue

        if S_ISDIR(perms.st_mode):
            if not recurse:
                if verbose:
                    warn("Directory: %s skipped.\n" % filename)
                continue

            if verbose:
                warn("Scanning Directory: %s\n" % filename)
            # A directory we may stat but not list (e.g. missing read
            # permission) must not abort the whole scan: report and move on,
            # the same way an lstat failure is handled above.
            try:
                entries = os.listdir(filename)
            except OSError as e:
                warn("\nrpl: Unable to read directory %s." % filename)
                warn("\nrpl: Error: %s" % e)
                warn("\nrpl: SKIPPING %s\n\n" % filename)
                continue

            for entry in entries:
                path = os.path.join(filename, entry)
                if not hidden_files and entry.startswith('.'):
                    if verbose:
                        warn("Skipping: %s (hidden)\n" % path)
                    continue
                new_files += get_files([path], recurse, suffixen, verbose,
                                       hidden_files)
        elif S_ISREG(perms.st_mode):
            if suffixen and not any(filename.endswith(s) for s in suffixen):
                warn("Skipping: %s (suffix not in list)\n" % filename)
                continue
            new_files.append((filename, perms))
        else:
            warn("Skipping: %s (not a regular file)\n" % filename)
    return new_files
|
|
|
|
def unescape(s):
    """Return *s* with backslash escape sequences expanded.

    Recognised sequences are a backslash followed by one to three octal
    digits, by ``x`` and exactly two hex digits, or by one of the characters
    ``n r t v a f b`` or another backslash. Any other text, including
    backslashes that do not start one of these sequences, is left untouched.
    """
    escape_pattern = re.compile(r'\\([0-7]{1,3}|x[0-9a-fA-F]{2}|[nrtvafb\\])')

    def expand(found):
        # NOTE(review): eval() is only ever handed text that escape_pattern
        # matched, wrapped in double quotes, so it evaluates a string literal
        # holding exactly one escape sequence -- never arbitrary input.
        quoted = '"%s"' % found.group()
        return eval(quoted)

    return escape_pattern.sub(expand, s)
|
|
|
|
def blockrepl(instream, outstream, regex, before, after, blocksize=None):
    """Copy instream to outstream, replacing each match of regex with after.

    The input is consumed in chunks of ``blocksize`` so the whole stream is
    never held in memory at once.

    Matching is made independent of where chunk boundaries fall:

    * The last character already written is kept as left context and the
      search starts after it. A one-character lookbehind therefore sees the
      real preceding character, and ``^`` matches only at the true start of
      the stream.
    * A match that touches the end of the current buffer is not trusted
      until more input (or end of stream) is seen, because a lookahead or
      ``$`` there may only have matched the chunk boundary.
    * Up to ``len(before)`` trailing characters are carried over, so a match
      split across two chunks is still found.

    Only one character of left context is retained, so patterns with longer
    lookbehinds are not guaranteed to match across chunk boundaries.

    Args:
        instream: object with ``read(size)`` returning str or bytes.
        outstream: object with ``write(data)`` for the same type.
        regex: compiled pattern to search for.
        before: the literal being replaced; only its length is used, to size
            the carry-over between chunks.
        after: replacement text, same type as the stream data.
        blocksize: chunk size for reads; defaults to ``2 * len(before)``.

    Returns:
        The number of replacements made.
    """
    patlen = len(before)
    if not blocksize:
        blocksize = 2 * patlen

    total = 0
    block = instream.read(blocksize)
    empty = block[:0]       # '' or b'', matching whatever the stream yields
    context = empty         # last input character already written out
    pending = empty         # input read but not yet written out

    while True:
        at_eof = not block
        buf = context + pending + block
        start = len(context)
        cursor = start
        deferred = None
        pieces = []

        for match in regex.finditer(buf, start):
            if not at_eof and match.end() == len(buf):
                # The match runs up to the chunk boundary; decide once the
                # following input is available.
                deferred = match.start()
                break
            pieces.append(buf[cursor:match.start()])
            pieces.append(after)
            cursor = match.end()
            total += 1

        if at_eof:
            pieces.append(buf[cursor:])
            outstream.write(empty.join(pieces))
            return total

        # Hold back the tail that could still be (part of) a match.
        keep_from = max(cursor, len(buf) - patlen)
        if deferred is not None:
            keep_from = min(keep_from, deferred)
        pieces.append(buf[cursor:keep_from])
        outstream.write(empty.join(pieces))

        # Context is taken from the *input* text, so lookbehinds see what
        # was originally in the stream rather than the replacement.
        context = buf[keep_from - 1:keep_from] if keep_from else empty
        pending = buf[keep_from:]
        block = instream.read(blocksize)
|
|
|
|
def _build_option_parser():
    """Create the optparse parser describing every rpl command-line option."""
    usage = "usage: %prog [options] old_string new_string target_file(s)"
    parser = optparse.OptionParser(usage, version="%prog 1.5.2")
    parser.add_option("-L", "--license", action="callback",
                      callback=show_license, help="show the software license")
    parser.add_option("-x", metavar="SUFFIX",
                      action="append", dest="suffixen", default=[],
                      help="specify file suffix to match")

    # Every remaining option is a plain on/off switch defaulting to off.
    switches = (
        ("-i", "--ignore-case", "ignore_case",
         "do a case insensitive match"),
        ("-w", "--whole-words", "whole_words",
         "whole words (old_string matches on word boundaries only)"),
        ("-b", "--backup", "do_backup",
         "make a backup before overwriting files"),
        ("-q", "--quiet", "quiet", "quiet mode"),
        ("-v", "--verbose", "verbose", "verbose mode"),
        ("-s", "--dry-run", "dry_run", "simulation mode"),
        ("-R", "--recursive", "recurse", "recurse into subdirectories"),
        ("-e", "--escape", "escapes",
         "expand escapes in old_string and new_string"),
        ("-p", "--prompt", "prompt", "prompt before modifying each file"),
        ("-f", "--force", "force",
         "ignore errors when trying to preserve permissions"),
        ("-d", "--keep-times", "keep_times",
         "keep the modification times on modified files"),
        ("-t", "--use-tmpdir", "use_tmpdir",
         "use $TMPDIR for storing temporary files"),
        ("-a", "--all", "hidden_files",
         "do not ignore files and directories starting with ."),
    )
    for short_flag, long_flag, dest, help_text in switches:
        parser.add_option(short_flag, long_flag, action="store_true",
                          dest=dest, default=False, help=help_text)
    return parser


def _replace_in_file(filename, perms, regex, old_str, new_str, opts):
    """Run the replacement on one file via a temporary copy.

    The rewritten content goes to a temporary file which is renamed over
    ``filename`` only if there were matches, this is not a dry run, and the
    user (with --prompt) agreed. On every other path the temporary file is
    removed and both streams are closed.

    Args:
        filename: path of the file to process.
        perms: the ``os.lstat`` result for ``filename``; supplies the owner,
            mode and timestamps copied onto the replacement.
        regex: compiled search pattern.
        old_str: text being replaced (passed through to ``blockrepl``).
        new_str: replacement text.
        opts: parsed command-line options.

    Returns:
        The number of matches to add to the run total: the match count when
        the file was replaced (or would be, in a dry run), otherwise 0.
    """
    err = sys.stderr.write

    try:
        source = open(filename, "rb")
    except (IOError, OSError) as e:
        err("\nrpl: Unable to open %s for reading." % filename)
        err("\nrpl: Error: %s" % e)
        err("\nrpl: SKIPPING %s\n\n" % filename)
        return 0

    tmp_path = None     # set while a temp file exists that we must remove
    sink = None
    try:
        try:
            # Default to a temp file next to the target so the final rename
            # stays on one filesystem; -t asks for the system temp dir.
            if opts.use_tmpdir:
                tmp_dir = None
            else:
                tmp_dir = os.path.dirname(filename) or os.curdir

            try:
                fd, tmp_path = tempfile.mkstemp("", ".tmp.", dir=tmp_dir)
                sink = os.fdopen(fd, "wb")
            except (IOError, OSError) as e:
                err("\nrpl: Unable to create temp file.")
                err("\nrpl: Error: %s" % e)
                err("\nrpl: (Type \"rpl -h\" and consider \"-t\" to specify temp file location.)")
                err("\nrpl: SKIPPING %s\n\n" % filename)
                return 0

            # Give the replacement the original's owner and permissions.
            try:
                os.chown(tmp_path, perms.st_uid, perms.st_gid)
                os.chmod(tmp_path, perms.st_mode)
            except OSError as e:
                err("\nrpl: Unable to set owner/group/perms of %s" % filename)
                err("\nrpl: Error: %s" % e)
                if not opts.force:
                    err("\nrpl: SKIPPING %s!\n\n" % filename)
                    return 0
                err("\nrpl: WARNING: New owner/group/perms may not match!\n\n")

            if opts.verbose and not opts.dry_run:
                err("Processing: %s\n" % filename)
            elif not opts.quiet and not opts.dry_run:
                err(".")
                sys.stderr.flush()

            matches = blockrepl(source, sink, regex, old_str, new_str, 1024)
        finally:
            source.close()
            if sink is not None:
                sink.close()

        if matches == 0:
            return 0

        if opts.dry_run:
            try:
                shown = os.path.realpath(filename)
            except OSError:
                shown = filename
            if not opts.quiet:
                err(" %s\n" % shown)
            return matches

        if opts.prompt:
            err("\nSave '%s' ? ([Y]/N) " % filename)
            # raw_input() strips the newline, so an empty answer is how the
            # advertised default ([Y]) arrives; re-ask only on other input.
            answer = raw_input()
            while answer and answer[0] not in "YyNn":
                answer = raw_input()
            if answer and answer[0] in "nN":
                err("Not Saved.\n")
                return 0
            err("Saved.\n")

        backup_path = None
        if opts.do_backup:
            backup_path = filename + "~"
            try:
                os.rename(filename, backup_path)
            except OSError as e:
                err("rpl: An error occured renaming %s to %s."
                    % (filename, backup_path))
                err("\nrpl: Error: %s" % e)
                return 0

        # Move the rewritten copy into place.
        try:
            os.rename(tmp_path, filename)
        except OSError as e:
            err("rpl: An error occured replacing %s with %s."
                % (tmp_path, filename))
            err("\nrpl: Error: %s" % e)
            if backup_path is not None:
                # The original now only exists under its backup name; put it
                # back so the target path does not simply disappear.
                try:
                    os.rename(backup_path, filename)
                except OSError as restore_error:
                    err("\nrpl: Unable to restore %s from %s: %s"
                        % (filename, backup_path, restore_error))
            return 0
        tmp_path = None     # it now lives at filename; nothing to clean up

        if opts.keep_times:
            try:
                os.utime(filename, (perms.st_atime, perms.st_mtime))
            except OSError as e:
                err("\nrpl: An error occured setting the access time and mod time of the file %s." % filename)
                err("\nrpl: Error: %s" % e)
        return matches
    finally:
        if tmp_path is not None:
            os.unlink(tmp_path)


def main():
    """Command-line entry point: replace old_string with new_string in files.

    Parses the options, validates the arguments, asks for confirmation when
    the replacement is empty (unless --quiet), then processes every file
    selected by ``get_files`` and prints a summary on stderr.

    Exits with ``os.EX_DATAERR`` when a named target does not exist and with
    ``os.EX_TEMPFAIL`` when the user declines the delete confirmation.
    """
    err = sys.stderr.write

    parser = _build_option_parser()
    (opts, args) = parser.parse_args()

    # args should now contain old_str, new_str and a list of files/dirs
    if len(args) < 3:
        parser.error("must have at least three arguments")
    if args[0] == "":
        parser.error("must have something to replace")

    old_str = args[0]
    new_str = args[1]
    targets = args[2:]

    # See if all the files actually exist
    for target in targets:
        if not os.path.exists(target):
            err("\nrpl: File \"%s\" not found.\n" % target)
            sys.exit(os.EX_DATAERR)

    if new_str == "" and not opts.quiet:
        err("Really DELETE all occurences of %s " % old_str)
        if opts.ignore_case:
            err("(ignoring case)? (Y/[N]) ")
        else:
            err("(case sensitive)? (Y/[N]) ")
        # The prompt advertises N as the default, so only an explicit "y"
        # goes ahead with deleting text.
        answer = raw_input()
        if not answer or answer[0] not in "yY":
            err("\nrpl: User cancelled operation.\n")
            sys.exit(os.EX_TEMPFAIL)

    # Tell the user what is going to happen
    if opts.dry_run:
        err("Simulating replacement of \"%s\" with \"%s\" "
            % (old_str, new_str))
    else:
        err("Replacing \"%s\" with \"%s\" " % (old_str, new_str))
    if opts.ignore_case:
        err("(ignoring case) ")
    else:
        err("(case sensitive) ")
    if opts.whole_words:
        err("(whole words only)\n")
    else:
        err("(partial words matched)\n")
    if opts.dry_run and not opts.quiet:
        err("The files listed below would be modified in a replace operation.\n")

    if opts.escapes:
        old_str = unescape(old_str)
        new_str = unescape(new_str)

    flags = re.I if opts.ignore_case else 0
    if opts.whole_words:
        # A "word" is delimited by whitespace or the start/end of the data.
        regex = re.compile(r"(?:(?<=\s)|^)" + re.escape(old_str) + r"(?=\s|$)",
                           flags)
    else:
        regex = re.compile(re.escape(old_str), flags)

    total_matches = 0
    files = get_files(targets, opts.recurse, opts.suffixen, opts.verbose,
                      opts.hidden_files)
    for filename, perms in files:
        total_matches += _replace_in_file(filename, perms, regex,
                                          old_str, new_str, opts)

    # We're about to exit, give a summary
    if not opts.quiet:
        plural = "s" if len(files) != 1 else ""
        if opts.dry_run:
            err("\nA Total of %lu matches found in %lu file%s searched."
                % (total_matches, len(files), plural))
            err("\nNone replaced (simulation mode).\n")
        else:
            err("\nA Total of %lu matches replaced in %lu file%s searched.\n"
                % (total_matches, len(files), plural))
|
|
|
|
# Script entry point: run the tool only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|