Viewing file: find.py (4.12 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# A parallelized "find(1)" using the thread module.
# This demonstrates the use of a work queue and worker threads. # It really does do more stats/sec when using multiple threads, # although the improvement is only about 20-30 percent. # (That was 8 years ago. In 2002, on Linux, I can't measure # a speedup. :-( )
# I'm too lazy to write a command line parser for the full find(1) # command line syntax, so the predicate it searches for is wired-in, # see function selector() below. (It currently searches for files with # world write permission.)
# Usage: parfind.py [-w nworkers] [directory] ... # Default nworkers is 4
import sys import getopt import string import time import os from stat import * import thread
# Work queue class. Usage: # wq = WorkQ() # wq.addwork(func, (arg1, arg2, ...)) # one or more calls # wq.run(nworkers) # The work is done when wq.run() completes. # The function calls executed by the workers may add more work. # Don't use keyboard interrupts!
class WorkQ:
# Invariants:
# - busy and work are only modified when mutex is locked # - len(work) is the number of jobs ready to be taken # - busy is the number of jobs being done # - todo is locked iff there is no work and somebody is busy
def __init__(self): self.mutex = thread.allocate() self.todo = thread.allocate() self.todo.acquire() self.work = [] self.busy = 0
def addwork(self, func, args): job = (func, args) self.mutex.acquire() self.work.append(job) self.mutex.release() if len(self.work) == 1: self.todo.release()
def _getwork(self): self.todo.acquire() self.mutex.acquire() if self.busy == 0 and len(self.work) == 0: self.mutex.release() self.todo.release() return None job = self.work[0] del self.work[0] self.busy = self.busy + 1 self.mutex.release() if len(self.work) > 0: self.todo.release() return job
def _donework(self): self.mutex.acquire() self.busy = self.busy - 1 if self.busy == 0 and len(self.work) == 0: self.todo.release() self.mutex.release()
def _worker(self): time.sleep(0.00001) # Let other threads run while 1: job = self._getwork() if not job: break func, args = job apply(func, args) self._donework()
def run(self, nworkers): if not self.work: return # Nothing to do for i in range(nworkers-1): thread.start_new(self._worker, ()) self._worker() self.todo.acquire()
# Main program
def main(): nworkers = 4 opts, args = getopt.getopt(sys.argv[1:], '-w:') for opt, arg in opts: if opt == '-w': nworkers = string.atoi(arg) if not args: args = [os.curdir]
wq = WorkQ() for dir in args: wq.addwork(find, (dir, selector, wq))
t1 = time.time() wq.run(nworkers) t2 = time.time()
sys.stderr.write('Total time %r sec.\n' % (t2-t1))
# The predicate -- defines what files we look for. # Feel free to change this to suit your purpose
def selector(dir, name, fullname, stat): # Look for world writable files that are not symlinks return (stat[ST_MODE] & 0002) != 0 and not S_ISLNK(stat[ST_MODE])
# The find procedure -- calls wq.addwork() for subdirectories
def find(dir, pred, wq): try: names = os.listdir(dir) except os.error, msg: print repr(dir), ':', msg return for name in names: if name not in (os.curdir, os.pardir): fullname = os.path.join(dir, name) try: stat = os.lstat(fullname) except os.error, msg: print repr(fullname), ':', msg continue if pred(dir, name, fullname, stat): print fullname if S_ISDIR(stat[ST_MODE]): if not os.path.ismount(fullname): wq.addwork(find, (fullname, pred, wq))
# Call the main program
main()
|