'''
File/buffer interscanners for sagator, version 0.6.2
(c) 2003-2019 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
'''

from avlib import *
import shutil

__all__ = ['file2buffer', 'buffer2file', 'f2b', 'b2f', 'buffer2mbox']


def _error_text(err):
    '''
    Return a printable description of an OSError/IOError.

    The strerror attribute is preferred; when it is missing or empty
    (the exception was not built from an (errno, strerror) pair) the
    string form of the exception is used, so that logging can never
    raise by itself and hide the original error.
    '''
    return getattr(err, 'strerror', None) or str(err)


#####################################################################
### convertors

class file2buffer(interscanner):
    '''
    File to buffer converter.

    This scanner converts a filescanner input into bufferscanner.

    Usage: file2buffer(scanner1())

    Where: scanner1() is an interscanner or realscanner.
           It must be a bufferscanner.
    '''
    name = 'file2buffer()'

    def __init__(self, scanner):
        self.rename([scanner])
        self.scanner = scanner

    def scanfile(self, files, dir='', args={}):
        '''
        Read every file in `files` and pass its content to the wrapped
        scanner's scanbuffer().

        `dir` is accepted for interface compatibility and is not used
        here. `args` is forwarded unchanged and never modified by this
        method.

        Returns the (level, vir, ret) triple of the first file for which
        the wrapped scanner reported a non-empty `vir`; otherwise
        (level of the last scanned file or 0.0, b'', []).
        '''
        level = 0.0
        for fn in files:
            f = safe.open(fn, "rb")
            try:
                self.scanner.prescan()
                self.scanner.filename = fn
                level, vir, ret = self.scanner.scanbuffer(f.read(), args)
                self.scanner.postscan(level, vir, ret)
            finally:
                # Release the handle even when the wrapped scanner raises.
                f.close()
            if vir:
                return level, vir, ret
        return level, b'', []


class f2b(file2buffer):
    '''
    An alias for file2buffer() interscanner.
    '''
    name = 'f2b()'


class buffer2file(interscanner):
    '''
    Buffer to file converter.

    You can use it to save buffers into files.

    Usage: buffer2file(scanner1())

    Where: scanner1() is an interscanner or realscanner.
           It must be a filescanner.
    '''
    name = 'buffer2file()'
    # NOTE(review): presumably advertises scanmultibuffer() support to the
    # scanning framework - confirm against avlib.
    is_multibuffer = 1

    def __init__(self, scanner, addheader=0):
        self.rename([scanner])
        self.scanner = scanner
        self.addheader = addheader
        self.filename = ''
        self.mboxdir = ''
        self.root_mboxdir = ''
        self.filenames = []
        self.counter = 1

    def destroy(self):
        '''
        Remove the temporary directory tree (if one was created), destroy
        the wrapped scanner and forget all collected file names.

        With debug.debug_level >= 10 the temporary tree is left on disk
        and only a debug message is emitted.
        A failing removal is logged and otherwise ignored (best effort).
        '''
        if self.root_mboxdir:
            if debug.debug_level < 10:
                debug.echo(5, "b2f(): destroy():", self.root_mboxdir)
                try:
                    shutil.rmtree(self.root_mboxdir)
                except OSError:
                    debug.echo(1, "b2f(): WARNING: temp rmovation failed in",
                               self.root_mboxdir)
            else:
                debug.echo(4, "b2f(): unfinished destroy:", self.mboxdir)
        self.scanner.destroy()
        self.filenames = []
        self.mboxdir = ''
        self.root_mboxdir = ''

    def creatembox(self, f, data, addheader=0):
        '''
        Write `data` into the already opened file object `f` and close it.

        If `addheader` is true, the value returned by fromhdr() is
        written before the data.

        The file is closed even when a write fails. OSError/IOError are
        logged and re-raised.
        '''
        try:
            try:
                if addheader:
                    f.write(fromhdr())
                f.write(data)
            finally:
                # Always release the handle, also after a failed write.
                f.close()
        except OSError as err:
            debug.echo(0, 'creatembox: ERROR: Open: ', _error_text(err))
            raise
        except IOError as err:
            # Only reachable where IOError is not an alias of OSError
            # (Python 2).
            debug.echo(0, 'creatembox: ERROR: IO: ', _error_text(err))
            raise

    def addbuffer(self, buffer):
        '''
        Store a non-empty `buffer` as a new file inside the temporary
        directory and remember its name in self.filenames.

        The temporary directory is created on first use. Empty buffers
        are ignored.
        '''
        if not buffer:
            return
        if self.mboxdir == '':
            self.mbox = mktemp('/tmp/b2f-', '.mbd', 'd', 0o700)
            self.mboxdir = self.mbox.name
            self.root_mboxdir = self.mbox.root_name
        # NOTE(review): self.filename is kept once set, so the generated
        # 'part-NNNNNN' name (and the counter) is used for the first
        # unnamed buffer only - confirm this is intended.
        if self.filename == '':
            self.filename = 'part-%06d' % (self.counter)
            self.counter = self.counter + 1
        mboxfile = mktemp(self.mboxdir + '/', '-' + self.filename,
                          'wb', 0o600)
        self.creatembox(mboxfile.f, buffer, self.addheader)
        self.filenames.append(mboxfile.name)

    def _scan_files(self, args):
        '''
        Run the wrapped file scanner over all files collected so far and
        return its (level, vir, ret) result.
        '''
        self.scanner.prescan()
        level, vir, ret = self.scanner.scanfile(self.filenames,
                                                self.mboxdir, args)
        self.scanner.postscan(level, vir, ret)
        return level, vir, ret

    def scanbuffer(self, buffer, args={}):
        '''
        Save `buffer` into a temporary file and scan all collected files
        with the wrapped scanner.

        `args` is forwarded unchanged. Returns (level, vir, ret).
        '''
        self.addbuffer(buffer)
        return self._scan_files(args)

    def scanmultibuffer(self, buffers, args={}):
        '''
        Save several buffers into temporary files and scan them at once.

        `buffers` is either a mapping of filename -> buffer (the filename
        is used as the suffix of the temporary file name) or any iterable
        of buffers. `args` is forwarded unchanged.
        Returns (level, vir, ret).
        '''
        if isinstance(buffers, dict):
            for filename, buf in buffers.items():
                self.filename = filename
                self.addbuffer(buf)
        else:
            for buf in buffers:
                self.addbuffer(buf)
        return self._scan_files(args)


class b2f(buffer2file):
    '''
    An alias for buffer2file() interscanner.
    '''
    name = 'b2f()'


class buffer2mbox(buffer2file):
    '''
    An alias for buffer2file(...,1) interscanner.
    '''
    name = 'buffer2mbox()'

    def __init__(self, scanner):
        buffer2file.__init__(self, scanner, 1)