Thursday, November 12, 2009

Synchronization of files with SSH and Python

Specification of the problem

I have to maintain a system that needs to be updated from time to time. Update files are uploaded for me to a directory on a
host where I have an ssh account. Let us call this host repomaster.

Rsync, ftp, http etc. are not available.


Repomaster can be accessed from one host only, let us call it repoproxy.

The files are needed on several hosts, on which the application is installed. For the sake of simplicity let us assume that
we have two such hosts: appnode1 and appnode2.

All the involved hosts run some flavor of Unix/Linux, but not the same one.

The files are big and can be updated at any moment, usually on Friday evening, when everybody has already gone home. When the new
files are uploaded, they overwrite the old ones. There is nothing besides the files themselves (their modification
time, size and contents) indicating that the files on repomaster have changed.


When I come to work on Monday morning, I find a notification email that the application needs to be updated. All the team
members also get the very same notification email and are extremely anxious to have all the application nodes updated
as soon as possible.

Doing this manually is very unpleasant work - one has to first copy the files from repomaster to repoproxy, and then from
repoproxy to all the application nodes. The network performance is poor, sessions often break, files get corrupted. One has to
switch between many screen sessions on several hosts, verify the checksums of files etc. It is an unpleasant and error-prone process
that takes several hours to complete manually.

What I wanted was for all of this to happen automatically during the night, even on weekends or holidays, so that on Monday morning I only
have to check the logs to verify that everything completed successfully. And, if something did not work, I only have to run one
single command after I correct the error condition to have the job proceed from where it failed.


Solution design

Since neither rsync nor wget (with an http or ftp server) is available, I decided to use Python with standard SSH and SCP.

SSH is used by calling the command line directly, not through the Paramiko or Twisted libraries. The Unix path separator is hardcoded, because
this script is a quick hack to save me manual work until a better production script is produced. ("Better" means something more
sophisticated: transferring files in smaller chunks and more robust - recovering quicker from network failures and re-transferring
only the broken chunks, not whole files.)


In this article I share with you only the script that copies the files from the parent repository to the child repository. The code that
deletes the files that were deleted on the parent is not published here, to keep this article shorter.

The general idea is that when we run this command:

sshsync --sync repoproxy --sync appnode1,appnode2

our script will fetch the configuration of repoproxy and of its parent, repomaster, list all the files
in repomaster, copy the ones that were updated in the master repo, and then move to the next step.

The next step in our example would be to synchronize appnode1 and appnode2 in the same way as we
did with repoproxy in the previous step. Since these nodes were specified as one step, they are done
in parallel, to save time.


Again, to simplify this article, I removed the code for running the synchronization in parallel. The
code that I am sharing with you does not support this; it only supports sequential updates:

sshsync --sync repoproxy --sync appnode1 --sync appnode2

All the needed information - usernames, IPs, remote paths etc. - is kept in the configuration file.

The script is simplistic: it executes commands directly on the remote machines with ssh. For this
to be possible, passwordless ssh logins have to be enabled, and no /etc/motd or /etc/issue banners
can be displayed on login.
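
For completeness: enabling such a passwordless login for one hop means generating a key pair and authorizing the public key on the target host. A minimal sketch, assuming OpenSSH on both ends (user and host are placeholders, not from my setup):

ssh-keygen -t dsa
ssh-copy-id -i ~/.ssh/id_dsa.pub user@targethost
ssh user@targethost true

The last command should return silently - any banner or password prompt it produces will break the script.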

This script can be run on one of the hosts or on a separate machine - it does not matter.

Since the target systems run different operating systems and have different tools installed,
the way we get a list of files, calculate a file digest or transfer a file is specified
separately for each host in the configuration file.

Files are transferred if the digests in the current repository and its parent repository do
not match.

The script writes to a log file. The log file is rotated with standard OS tooling (logrotate); the
script itself does not implement log rotation.


The script is run from cron. Only one instance can run at a given time.

Solution implementation

Configuration file

The configuration file has a separate section for each repository. Each section contains
two blocks: the information needed to find the repository (host, user, repository path) and
its parent (a reference to the parent's configuration block), and the templates for the three
commands that will be used to process information in the repository:

  1. Command to list repository contents
  2. Command to calculate the digest of the file in the repository
  3. Command to transfer a file from the parent repository

#===================================================================
#         FILE:  /etc/sshsync/sshsync.conf
#  DESCRIPTION:  SSH synchronization tooling config
#===================================================================

[repomaster]

user = repoowner
password = ******
host = 192.168.0.1
path = /home/repoowner/apprepo

listcmd = find ${REPOPATH} -type f
digestcmd = if [ -f ${FILEPATH} ]; then sha1sum ${FILEPATH} | cut -d' ' -f1; else echo \'NONE\'; fi


[repoproxy]

user = proxyrunner
password = ******
host = 192.168.1.1
path = /dat/repoproxy
parent = repomaster

listcmd = find ${REPOPATH} -type f
digestcmd = if [ -f ${FILEPATH} ]; then sha1sum ${FILEPATH} | cut -d' ' -f1; else echo \'NONE\'; fi
synccmd = scp -p -i /home/proxyrunner/.ssh/id_dsa ${REMOTEUSER}@${REMOTEHOST}:${REMOTEDIR}/${FILE} ${LOCALDIR}/${FILE}

[appnode1]

user = john
password = ******
host = 192.168.2.1
path = /apprepo/latest
parent = repoproxy

listcmd = find ${REPOPATH} -type f
digestcmd = if [ -f ${FILEPATH} ]; then digest -a sha1 ${FILEPATH}; else echo \'NONE\'; fi
synccmd = scp -p -i /home/john/.ssh/id_dsa ${REMOTEUSER}@${REMOTEHOST}:${REMOTEDIR}/${FILE} ${LOCALDIR}/${FILE}

[appnode2]

user = sheila
password = ******
host = 192.168.2.2
path = /home/sheila/apprepo
parent = repoproxy

listcmd = find ${REPOPATH} -type f
digestcmd = if [ -f ${FILEPATH} ]; then digest -a sha1 ${FILEPATH}; else echo \'NONE\'; fi
synccmd = scp -p -i /home/sheila/.ssh/id_dsa ${REMOTEUSER}@${REMOTEHOST}:${REMOTEDIR}/${FILE} ${LOCALDIR}/${FILE}

As you can see, I did not use the tools that Python's ConfigParser module gives me to resolve variables defined
in other parts of the file. I use the variables in a shell-like notation and replace them in the script. In my
opinion this is easier to maintain, as I can update the parent-child relationships between repositories much more easily.
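
For the record, ConfigParser's built-in interpolation resolves %(name)s references only against keys from the same section and from [DEFAULT], so it could not follow the per-repository parent references anyway. It would look like this:

[repoproxy]
path = /dat/repoproxy
listcmd = find %(path)s -type f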

The patterns for listcmd, digestcmd and synccmd are bash one-liners, but nothing prevents them from being more
sophisticated, separate scripts. They can be different for each repository - as you can see, I sometimes
use the digest command to calculate the SHA1 of a file, and sometimes I use sha1sum for the same purpose. I do it because
some hosts do not have sha1sum installed. The important thing is that these commands return exactly the same strings.
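
For example, both of these one-liner cores:

sha1sum ${FILEPATH} | cut -d' ' -f1
digest -a sha1 ${FILEPATH}

must print just the bare hex digest on a single line - for an empty file, da39a3ee5e6b4b0d3255bfef95601890afd80709 - and nothing else.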


Control script

Let me present the control script in chunks.

#!/usr/bin/env python
"""
File repository synchronization with SSH.
Compares digests of files, and transfers them if necessary.
Requires passwordless ssh logins, no output on ssh login, availability of
list, digest and transfer commands on all respective repo servers.

"""

import os
import sys
import getopt
import ConfigParser

import logging, logging.config
import commands

This is a standard shebang that allows calling the script as a command, plus the docstring and the necessary imports.
The modules os and sys are needed for operating-system-related work, getopt for option parsing,
ConfigParser for handling our configuration file, logging for logging support, and commands for
calling command line programs and collecting their return code and output.
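
As a reminder of what that last module does: commands.getstatusoutput runs a command through the shell and returns a tuple with the exit status and the combined stdout/stderr output.

>>> import commands
>>> commands.getstatusoutput('echo hello')
(0, 'hello')

A failing command yields a non-zero status, which is what our code checks for later.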

class UsageException(Exception):
  """
  Exception thrown when parameters are wrong.
  """
  def __init__(self, msg):
      self.msg = msg

class InternalErrorException(Exception):
  """
  Exception thrown when an internal application error occurs.
  """
  def __init__(self, msg):
      self.msg = msg

class ProcessingErrorException(Exception):
  """
  Exception thrown when the application encounters an external error.
  """
  def __init__(self, msg):
      self.msg = msg

Well, here I deviate a bit from my personal standards. Usually I define one parent exception class and
make all the exceptions that a given application can throw extend it. That way it is easy for any code
importing our module to catch all our exceptions.

In this case I am being lazy and just define three separate exceptions: one for bad usage, one for
internal bugs, and one for outside conditions that may make the application fail.

config = None
log = None
cmd = commands.getstatusoutput

scriptconfig = '/etc/sshsync/sshsync.conf'
logconfig = '/etc/sshsync/logging.conf'
pidfile = '/var/run/sshsync.pid'

Globals. config and log are instantiated later in the code, cmd is a mere shortcut, and the rest
are the locations of the files used by the script. I do not explain logconfig here, as it is a plain, standard
configuration file for Python logging.
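
For reference, a minimal /etc/sshsync/logging.conf that would satisfy this script could look like the sketch below. The log file location is my arbitrary choice, not a requirement; the only hard requirement is the 'mbrepo' logger, which the script requests in main().

[loggers]
keys=root,mbrepo

[handlers]
keys=logfile

[formatters]
keys=simple

[logger_root]
level=NOTSET
handlers=logfile

[logger_mbrepo]
level=DEBUG
handlers=logfile
qualname=mbrepo
propagate=0

[handler_logfile]
class=FileHandler
level=DEBUG
formatter=simple
args=('/var/log/sshsync.log', 'a')

[formatter_simple]
format=%(asctime)s %(levelname)s %(message)s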


def print_usage():
  """
  Print usage information.
  """
  print >>sys.stdout, """\
  Usage:
      sshsync --option=<value> ...
  Options:
      -h / --help
          Print this help message and exit
      -s / --sync <repository>
          Synchronize <repository> with its parent
  """
  return 0

This is just a function called to print a standard help message. Notice that it returns rather than exits.

def lock(lockfile):
  """ """
  rc = True
  if os.path.exists(lockfile):
      rc = False
      log.debug('Can not create lock file. Lock file already exists: %s' % lockfile)
  else:
      log.debug('Creating lock file: %s' % lockfile)
      try:
          f = open(lockfile, 'w')
          f.write(str(os.getpid()))
          f.close()
          log.debug('Lock file created: %s' % lockfile)
      except IOError, error:
          rc = False
          log.error('Can not create lock file. I/O error: %s' % error)
  return rc

def unlock(lockfile):
  """ """
  rc = True
  if not os.path.exists(lockfile):
      rc = False
      log.error('Unlock called, but lock file does not exist: %s' % lockfile)
  else:
      log.debug('Removing lock file: %s' % lockfile)
      try:
          os.unlink(lockfile)
          log.debug('Lock file deleted: %s' % lockfile)
      except OSError, error:  # os.unlink raises OSError, not IOError
          rc = False
          log.error('Can not delete lock file. OS error: %s' % error)
  return rc

Our script should have only a single instance running at any given time. I use a PID file to
find out whether another instance is already running.

def runrmtcmd(host, user, password, rmtcmd):
  """ """
  status, out = cmd('ssh -l %s %s "%s"' % (user, host, rmtcmd))
  if status != 0:
      raise ProcessingErrorException('Can not run command "%s" on host "%s": %s' % (rmtcmd, host, out))
  return out

A tool to run a command on the remote system. The password is ignored; I keep it just in case I
later want to be more trendy and use the Twisted libraries to properly log in with SSH.

This function relies on passwordless logins being enabled for SSH. Notice that a non-zero return code
triggers an exception.

As you can see, I am pretty lazy with writing docstrings.
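
To make it concrete: with the example configuration above, fetching the file list from repomaster boils down to this command line:

ssh -l repoowner 192.168.0.1 "find /home/repoowner/apprepo -type f"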

def getrmtfiles(repository):
  """ """
  files = []
  log.debug('Getting list of files in the repository: %s' % repository)

  repo_host = config.get(repository, 'host')
  repo_user = config.get(repository, 'user')
  repo_password = config.get(repository, 'password')
  repo_path = os.path.normpath(config.get(repository, 'path'))

  listcmd = config.get(repository, 'listcmd')
  listcmd = listcmd.replace('${REPOPATH}', repo_path)

  out = runrmtcmd(repo_host, repo_user, repo_password, listcmd)

  prefix = len(repo_path) + 1
  for line in out.splitlines():
      f = line[prefix:]
      files.append(f)
      log.debug('Repository %s has file: %s' % (repository, f))

  return files

This is the function that we call to get a list of files in the repository.

The config object is a global that must be initialized before we call this function. Again, this is
laziness on my part; I would not write such a thing in a library class. Yes, I agree, it would be
cleaner to have a Repository object instead of a bunch of functions and global variables...


The config object is wise enough to fetch strings from our configuration file. Read the ConfigParser
documentation to learn more about it.

I am cutting off the repository path from the file names, because the repositories can be placed in different
locations on different machines. I use a plain substring instead of the os.path functions, because I do not want
to deal with leading dots in file paths.
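
A quick illustration of that slicing, with a hypothetical file in the repoproxy repository:

repo_path = '/dat/repoproxy'            # normalized repository root
line = '/dat/repoproxy/pkg/app.tar.gz'  # one line of find output
prefix = len(repo_path) + 1             # +1 also cuts the separating '/'
print line[prefix:]                     # prints: pkg/app.tar.gz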

def getrmtdigest(repository, fpath):
  """ """
  log.debug('Getting digest from repository %s for file: %s' % (repository, fpath))

  repo_host = config.get(repository, 'host')
  repo_user = config.get(repository, 'user')
  repo_password = config.get(repository, 'password')
  repo_path = os.path.normpath(config.get(repository, 'path'))

  digestcmd = config.get(repository, 'digestcmd')
  digestcmd = digestcmd.replace('${FILEPATH}', '%s/%s' % (repo_path, fpath))

  digest = runrmtcmd(repo_host, repo_user, repo_password, digestcmd)
  log.debug('Digest from repository %s for file %s: %s' % (repository, fpath, digest))

  return digest

This is the function that gets the digest of a file in a repository. As you can see, I am
repeating myself to fetch the user, password and host from the configuration. If this code were to
be used longer, I would make a Repository object and fetch all the configuration only once, in its
__init__ method.

I leave this improvement as an exercise for the visitors of my blog.
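
For the impatient, here is a minimal sketch of what such an object could look like (the names are mine and this is not the production code, just the digest part moved into a class):

class Repository(object):
  """Caches one repository's configuration, read once in __init__."""
  def __init__(self, config, name):
      self.name = name
      self.host = config.get(name, 'host')
      self.user = config.get(name, 'user')
      self.password = config.get(name, 'password')
      self.path = os.path.normpath(config.get(name, 'path'))
      self.digestcmd = config.get(name, 'digestcmd')

  def digest(self, fpath):
      # Same logic as getrmtdigest, without re-reading the configuration.
      digestcmd = self.digestcmd.replace('${FILEPATH}', '%s/%s' % (self.path, fpath))
      return runrmtcmd(self.host, self.user, self.password, digestcmd)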

def syncrmtfile(repository, fpath):
  """ """
  repo_host = config.get(repository, 'host')
  repo_user = config.get(repository, 'user')
  repo_password = config.get(repository, 'password')
  repo_path = os.path.normpath(config.get(repository, 'path'))

  parent = config.get(repository, 'parent')
  parent_host = config.get(parent, 'host')
  parent_user = config.get(parent, 'user')
  parent_password = config.get(parent, 'password')
  parent_path = os.path.normpath(config.get(parent, 'path'))

  rmtcmd = config.get(repository, 'synccmd')
  rmtcmd = rmtcmd.replace('${FILE}', fpath)
  rmtcmd = rmtcmd.replace('${REMOTEHOST}', parent_host)
  rmtcmd = rmtcmd.replace('${REMOTEUSER}', parent_user)
  rmtcmd = rmtcmd.replace('${REMOTEPASSWORD}', parent_password)
  rmtcmd = rmtcmd.replace('${REMOTEDIR}', parent_path)
  rmtcmd = rmtcmd.replace('${LOCALHOST}', repo_host)
  rmtcmd = rmtcmd.replace('${LOCALUSER}', repo_user)
  rmtcmd = rmtcmd.replace('${LOCALPASSWORD}', repo_password)
  rmtcmd = rmtcmd.replace('${LOCALDIR}', repo_path)

  log.info('Syncing file in repository %s: %s' % (repository, fpath))
  cmdout = runrmtcmd(repo_host, repo_user, repo_password, rmtcmd)

  for line in cmdout.splitlines():
      log.debug("Host %s: %s" % (repo_host, line))

The code used to copy a single file from the parent repository to the current one.

Again, the DRY principle is violated. Sorry about that.
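
To make the substitutions concrete, take appnode1 and a hypothetical file pkg/app.tar.gz. After all the replacements, runrmtcmd executes on appnode1 (via ssh) a command like:

scp -p -i /home/john/.ssh/id_dsa proxyrunner@192.168.1.1:/dat/repoproxy/pkg/app.tar.gz /apprepo/latest/pkg/app.tar.gz

so each node pulls the file from its parent repository.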

def syncrepo(repository):
  """ """
  log.info('Syncing repository: %s' % repository)
  log.debug('This script assumes passwordless logins with ssh!')

  parent = config.get(repository, 'parent')
  parent_files = getrmtfiles(parent)

  for f in parent_files:
      pdigest = getrmtdigest(parent, f)
      rdigest = getrmtdigest(repository, f)

      if pdigest == rdigest:
          log.debug('File %s has the same digest (%s) in %s and %s. Synchronization is not needed.' % (f, rdigest, parent, repository))
      else:
          syncrmtfile(repository, f)
          rdigest = getrmtdigest(repository, f)
          if pdigest == rdigest:
              log.info('File %s has been successfully copied from %s to %s. File digest is: %s' % (f, parent, repository, rdigest))
          else:
              log.error('Transfer of file %s from %s to %s failed. Digests mismatch (%s != %s)' % (f, parent, repository, pdigest, rdigest))

A function capable of syncing all the files in a single repository. It gets a list of files and, for each file,
compares its digest in the current and parent repositories. If the digests do not match, the file is copied from the parent.

That wastes bandwidth, but is at least easy to write.

def checkconfig(repository):
  """ """

  def checkoptions(section, options):
      # Log an error for every required option that is missing or empty.
      ok = True
      for option in options:
          if not config.has_option(section, option) or not config.get(section, option):
              ok = False
              log.error('Missing %s for repository %s' % (option, section))
      return ok

  rc = True
  if config.has_section(repository):
      if not checkoptions(repository, ('host', 'user', 'password', 'path', 'digestcmd', 'synccmd')):
          rc = False

      if config.has_option(repository, 'parent'):
          parent = config.get(repository, 'parent')
          if not parent:
              rc = False
              log.error('Parent repository name missing for %s' % repository)
          elif not checkoptions(parent, ('host', 'user', 'password', 'path', 'listcmd', 'digestcmd')):
              rc = False
      else:
          rc = False
          log.error('Repository %s does not have parent repository.' % repository)
  else:
      rc = False
      log.error('Missing configuration for repository: %s' % repository)

  return rc

Before we start doing the job, we want to know whether our configuration file can be parsed and contains
all the information we will need. This function takes care of that: it checks that all the required keys
are present and not empty, both for the repository being synced and for its parent.

def main(argv=None):
  """ """
  if argv is None:
      argv = sys.argv[1:]

  global config
  global log

  config = ConfigParser.ConfigParser()
  config.read(scriptconfig)

  logging.config.fileConfig(logconfig)
  log = logging.getLogger('mbrepo')

  opts, args = getopt.getopt(argv, 'hs:', ["help", "sync="])

  syncqueue = []
  for opt, arg in opts:
      if opt in ('-h', '--help'):
          return print_usage()
      elif opt in ('-s', '--sync'):
          if checkconfig(arg):
              syncqueue.append(arg)
              log.debug('Scheduled repo for syncing: %s' % arg)
          else:
              raise ProcessingErrorException("Missing configuration to sync repository: %s" % arg)
      else:
          raise UsageException('Unknown option: ' + opt)

  if lock(pidfile):
      try:
          log.info('Begin repository synchronization. Syncqueue: %s' % ', '.join(syncqueue))
          for repo in syncqueue:
              syncrepo(repo)
      finally:
          unlock(pidfile)
  else:
      log.warning('Terminating - could not create run lock.')

Well, a pretty standard main function, doing all the work the script is supposed to do.

Get the arguments, cutting off the script name; initialize the configuration and logging machinery; parse
the options; create the lock; synchronize everything; remove the lock. All exceptions are allowed to
percolate (the callers decide what to do with them); we only make sure that the lock
file is not left behind - otherwise cron would not be able to start the script again until the
operator removed the lock manually.

if __name__ == "__main__":
  rc = 0
  try:
      rc = main(sys.argv[1:])
  except getopt.error, error:
      logging.error('Bad call: %s' % error.msg)
      print >>sys.stderr, error.msg
      print >>sys.stderr, "For help use --help"
      rc = 1
  except UsageException, err:
      logging.error('Bad call: %s' % err.msg)
      print >>sys.stderr, err.msg
      print >>sys.stderr, "For help use --help"
      rc = 2
  except ConfigParser.ParsingError, err:
      print >>sys.stderr, err
      print >>sys.stderr, "Please correct your configuration file!"
      rc = 3
  except ConfigParser.NoSectionError, error:
      print >>sys.stderr, error
      print >>sys.stderr, "Please extend your configuration file!"
      rc = 4
  except ConfigParser.NoOptionError, error:
      print >>sys.stderr, error
      print >>sys.stderr, "Please correct your configuration file!"
      rc = 3
  except InternalErrorException, err:
      logging.critical('Internal error: %s' % err.msg)
      print >>sys.stderr, 'Internal application error occurred: %s' % err.msg
      rc = 5
  except ProcessingErrorException, err:
      logging.critical('Processing error: %s' % err.msg)
      print >>sys.stderr, 'Terminating on processing error: %s' % err.msg
      rc = 6
  except:
      raise

  sys.exit(rc)

This is the fragment that calls the main function. We call it only if the script is run
directly from the command line, catch the exceptions, and convert them to error
messages and return codes.

Finishing steps

In my case I package the script as an RPM, add a logrotate configuration, and add the script to cron.
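
For illustration, here are sketches of that glue (the paths and the schedule are examples, not the exact files from my package). The crontab entry, running the job nightly at 2 am:

0 2 * * * /usr/local/bin/sshsync --sync repoproxy --sync appnode1 --sync appnode2

And the logrotate stanza for the log file (the path must match the one in logging.conf):

/var/log/sshsync.log {
    weekly
    rotate 8
    compress
    missingok
    notifempty
}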