multicpu bzip2 using os.system or queue using python script


harijay

I want to quickly bzip2-compress several hundred gigabytes of data
using my 8-core, 16 GB RAM workstation.
Currently I am using a simple Python script to compress a whole
directory tree with bzip2, using a system call coupled to an os.walk
call.

I see that bzip2 only uses a single CPU while the other CPUs
remain relatively idle.

I am a newbie to queues and threaded processes, but I am wondering
how I can implement this so that I can have four bzip2 threads
running (actually, I guess, os.system threads), each probably using
its own CPU, that deplete files from a queue as they bzip them.


Thanks for your suggestions in advance

hari


My single-threaded script is pasted here.

import os
import sys


for roots, dirlist, filelist in os.walk(os.curdir):
    for file in [os.path.join(roots, filegot) for filegot in filelist]:
        if "bz2" not in file:
            print "Compressing %s" % (file)
            os.system("bzip2 %s" % file)
            print ":DONE"
 

MRAB

harijay said:
I want to quickly bzip2-compress several hundred gigabytes of data
using my 8-core, 16 GB RAM workstation.
Currently I am using a simple Python script to compress a whole
directory tree with bzip2, using a system call coupled to an os.walk
call.

I see that bzip2 only uses a single CPU while the other CPUs
remain relatively idle.

I am a newbie to queues and threaded processes, but I am wondering
how I can implement this so that I can have four bzip2 threads
running (actually, I guess, os.system threads), each probably using
its own CPU, that deplete files from a queue as they bzip them.


Thanks for your suggestions in advance
[snip]
Try this:

import os
import sys
from threading import Thread, Lock
from Queue import Queue

def report(message):
    mutex.acquire()
    print message
    sys.stdout.flush()
    mutex.release()

class Compressor(Thread):
    def __init__(self, in_queue, out_queue):
        Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue
    def run(self):
        while True:
            path = self.in_queue.get()
            sys.stdout.flush()
            if path is None:
                break
            report("Compressing %s" % path)
            os.system("bzip2 %s" % path)
            report("Done %s" % path)
            self.out_queue.put(path)

in_queue = Queue()
out_queue = Queue()
mutex = Lock()

THREAD_COUNT = 4

worker_list = []
for i in range(THREAD_COUNT):
    worker = Compressor(in_queue, out_queue)
    worker.start()
    worker_list.append(worker)

for roots, dirlist, filelist in os.walk(os.curdir):
    for file in [os.path.join(roots, filegot) for filegot in filelist]:
        if "bz2" not in file:
            in_queue.put(file)

for i in range(THREAD_COUNT):
    in_queue.put(None)

for worker in worker_list:
    worker.join()
 

harijay

Thanks a tonne! That code works perfectly and also shows me how to
think about using queues and threads in my Python programs.

Hari

