''' decompress module, version 0.3.3 (c) 2003-2018 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ''' from avlib import * import filetype,resource,shutil __all__=['decompress','decompressors','internal_decompressors'] decompressors={ 'zip': [[ 'unzip','-P','s','-o' ]], 'rar': [[ 'unrar','x','-p-','-y'], ['rar','x','-p-','-y' ]], 'ace': [[ 'unace','x','-y' ]], 'arj': [[ 'unarj','x','-y'], [ 'arj','x','-y' ]], 'zoo': [[ 'unzoo','-x','-j'], [ 'zoo','-x','-j' ]], 'lha': [[ 'lha','xf' ]], 'tar': [[ 'tar','xvf' ]], #'tar.gz': [[ 'tar','xvzf' ], [ 'gzip','-d' ]], #'tgz': [[ 'tar','xvzf' ]], #'tar.bz2': [[ 'tar','xvjf' ], [ 'bzip2','-d' ]], 'bz2': [[ 'bzip2','-d','-k' ]], 'gz': [[ 'gzip','-d' ]], } internal_decompressors={} # ZIP support try: import zipfile class i_unzip: '''UnZip file''' def __init__(self,maxfilesize=100*1024*1024): ''' if decompressed file is larger than maxfilesize, it is skipped ''' self.maxfilesize=maxfilesize def unpack(self,zipfilename,dir): debug.echo(4,"Internal unzip: ",zipfilename) filenames,fsize=[],0 zip=zipfile.ZipFile(safe.fn(zipfilename),'r') for i in zip.namelist(): info=zip.getinfo(i) if (info.file_size=0): debug.echo(5,"Decompressing ... ",i) tf=mktemp('','-'+normalize_filename(i),'w',0o600) filenames.append(os.path.join(dir,tf.name)) tf.f.write(zip.read(i)) fsize=fsize+tf.f.tell() tf.f.close() else: debug.echo(0,"Internal UnZip: ERROR: FileSize too large: ",info.file_size) zip.close() return filenames,fsize internal_decompressors['zip']=i_unzip() except ImportError: class zipfile: BadZipfile='BadZipFile' # GZIP support try: import gzip class i_ungzip: '''unpack gzip file''' def unpack(self,filename,dir): debug.echo(4,"Internal ungzip: ",filename) gzf=gzip.GzipFile(safe.fn(filename),'r') fn_noext=re.sub(r'\.gz$','',re.sub(r'\.tgz$','.tar',filename)) tf=mktemp('',normalize_filename(fn_noext),'w',0o600).autorm() fsize=0 while 1: buf=gzf.read(BUFSIZE) if buf: tf.f.write(buf) fsize=fsize+len(buf) else: break tf.f.close() gzf.close() return [os.path.join(dir,fn_noext)],fsize internal_decompressors['gz']=i_ungzip() except ImportError: pass class decompress(interscanner): ''' Scanner used to decompress archives (zip,rar,arj,zoo,tar,...). It is a scanner, which can decompress some of files. You can define your own decompressors and assing it to any extensions. This scanner is a filescanner only, because external decompressors can unpack only files. A used scanner must be also a filescanner. If not, use file2buffer() interscanner. Filetype is determined on file content (not on file extension). It can detect some exe SFX archives too. Usage: decompress(filescanner1(),[a,b,c,d]=[3,500,50MB,20MB]) Where: filescanner1() is an filescanner, which can scan all files [a,b,c,d] is an array of 4 numbers: a = max number of recursions b = max number of decompressed files c = max disk usage with all of decompressed files d = max address space usage ''' name='decompress()' def __init__(self,scanner,max_limit=[3,500,50*1024*1024,20*1024*1024]): self.name=self.name[:-1]+scanner.name+')' self.scanner=scanner self.max_recursion=max_limit[0] self.max_file_count=max_limit[1] self.max_file_size=max_limit[2] self.max_as_limit=max_limit[3] self.initvars() def initvars(self): self.vir,self.ret='',[] self.defiles=[] self.rmfiles=[] self.fsize=0 self.fcount=0 self.showed_error=0 def destroy(self): self.scanner.destroy() if debug.debug_level<10: for f in self.rmfiles: try: shutil.rmtree(f,ignore_errors=True) except IOError: pass else: debug.echo(4,"decompress(): unfinished destroy:",self.rmfiles) self.initvars() def scanfile(self,files,dir='',args={}): return self.iscanfile(files,dir,args) def iscanfile(self,files,dir='',args={},level=0): # check for max_recursion if level>=self.max_recursion: if self.showed_error<1: debug.echo(0,"Decompress: ERROR: Max recursion reached, ", level) self.showed_error=1 return 0.0, b'', [] # check for max_filesize if self.fsize>=self.max_file_size: if self.showed_error<2: debug.echo(0,"Decompress: ERROR: Max file size reached, ", self.fsize) self.showed_error=2 return 0.0, b'', [] # check for max_file_count if self.fcount>=self.max_file_count: if self.showed_error<2: debug.echo(0,"Decompress: ERROR: Max file count reached, ", self.fcount) self.showed_error=2 return 0.0, b'', [] # decompress files ... for fname in files: try: nfname=normalize_filename(fname) ft=filetype.file(fname)[0] except IndexError: ft="" debug.echo(6,"decompress(): file name/type: ",nfname," / ",ft) already_scanned=0 if ft in internal_decompressors: try: zipdir=mktemp(fname, '.dir', 'd', 0o700) self.rmfiles.append(zipdir.name) os.chdir(safe.fn(zipdir.name)) newfiles,size=internal_decompressors[ft].unpack(fname,zipdir.name) self.fsize=self.fsize+size self.fcount=self.fcount+len(newfiles) self.iscanfile(newfiles,dir,args,level+1) already_scanned=1 except zipfile.BadZipfile: debug.echo(2,"Internal decompressor: ERROR: file: ",nfname, " -> ",debug.traceback_value_str()) except: debug.echo(0,"Internal decompressor: ERROR: file: ",nfname, " -> ",debug.traceback_value_str()) debug.traceback(4,"internal_decompressor: ") if (already_scanned<1) and (ft in list(decompressors.keys())): progs=decompressors[ft] zipdir=mktemp(fname, '.dir', 'd', 0o700) self.rmfiles.append(zipdir.name) for arg in progs: try: pf=popen(arg+[safe.fn(fname)],safe.fn(zipdir.name), {resource.RLIMIT_FSIZE:self.max_file_size, resource.RLIMIT_AS:self.max_as_limit}) decerr=pf.readlines() if pf.wait()>0: # add fname to decompressed files if unpack returned error self.defiles.append(fname) if pf.execerror=='': break else: debug.echo(0,"Decompressor: ERROR: code: ",pf.exitstatus) except: debug.echo(3,'Decompressor failed: ',str(arg+[fname])) debug.traceback(3,"Decompressor: ") newfiles,size=rlistdir(zipdir.name,zipdir.name) self.fsize=self.fsize+size self.fcount=self.fcount+len(newfiles) self.iscanfile(newfiles,dir,args,level+1) else: # no decompressor found for this filetype, add it as is self.defiles.append(fname) if level==0: debug.echo(5,"decompressed files: ",[self.defiles]) self.scanner.prescan() level,vir,ret=self.scanner.scanfile(self.defiles,dir,args) self.scanner.postscan(level,vir,ret) else: level,vir,ret=0.0, b'', [] return level, vir, ret