'''
libclamav.so.9 functions, for clamav-0.101+

(c) 2018 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
'''

import sys
from ctypes import *
from ctypes.util import find_library

from . import const
from .shared import ClamAVError, DBStat, cl_engine

if sys.version_info[0] > 2:
    string_types = (str,)
else:
    string_types = (str, unicode)

# cl_init can be called only once, we have to check, if it was already done
cl_init_done = False


def decode(s):
    ''' Return the byte string s decoded to text, or "" if s is empty/None. '''
    if s:
        return s.decode()
    return ""


def _encode(s):
    '''
    Return s as a byte string suitable for a C "char *" argument.

    None and byte strings are returned unchanged, text is encoded as UTF-8.
    Without this, ctypes would hand a Python 3 str to the library as a
    wchar_t pointer, which libclamav cannot interpret.
    '''
    if s is None or isinstance(s, bytes):
        return s
    return s.encode('utf8')


class clamav:
    ''' Thin object wrapper around an already loaded libclamav library. '''

    c = const

    def __init__(self, so, db_options=c.CL_DB_STDOPT, datadir=None):
        '''
        Initialise libclamav, load the virus database and build an engine.

        so         - loaded libclamav library object (e.g. a ctypes CDLL)
        db_options - database loading options passed to cl_load()
        datadir    - database directory, the library default if empty

        Raises ClamAVError if any of the library calls reports an error.
        '''
        global cl_init_done
        self.so = so
        # Create the instance state first, so that __del__ finds a
        # consistent object even if one of the calls below raises.
        self.signo = c_uint(0)
        self.engine = None
        self.virnum = 0
        self.dbstat = DBStat(None, None, None, 0)
        # set types
        self.so.cl_retdbdir.restype = c_char_p
        self.so.cl_retver.restype = c_char_p
        self.so.cl_strerror.restype = c_char_p
        self.so.cl_engine_new.restype = POINTER(cl_engine)
        self.so.cl_engine_free.argtypes = [POINTER(cl_engine)]
        # The numeric value is a C "long long"; without argtypes ctypes
        # would pass it as a masked C int and truncate large limits.
        self.so.cl_engine_set_num.argtypes = [
            POINTER(cl_engine), c_int, c_longlong
        ]
        self.so.cl_engine_set_str.argtypes = [
            POINTER(cl_engine), c_int, c_char_p
        ]
        self.so.cl_debug.restype = None  # void
        # init libclamav
        if not cl_init_done:
            self.check_status(self.so.cl_init(const.INIT_DEFAULT))
            cl_init_done = True
        datadir = _encode(datadir)
        self.statinidir(datadir)
        self.load(db_options, datadir)
        self.new_engine()

    def __del__(self):
        ''' Release database statistics and the engine. '''
        # skip delete, if clamav dynamic library has not been loaded
        if hasattr(self, 'so'):
            self.statfree()
            self.free()

    def retver(self):
        ''' Return clamav version string. '''
        return decode(self.so.cl_retver())

    def free(self):
        '''
        Free allocated structures.

        Raises ClamAVError if cl_engine_free() returns a non-zero status.
        '''
        if self.engine is not None:
            self.status = self.so.cl_engine_free(self.engine)
            # Do not call self.check_status(), CLEAN is undefined on exit.
            if self.status != 0:
                # ClamAVError can be undefined when called on exit,
                # consider changing to another error.
                raise ClamAVError(self.strerror())
            self.signo.value = 0
            self.engine = None
            self.virnum = 0

    def new_engine(self):
        '''
        Create a new engine unless one already exists.

        Raises ClamAVError if cl_engine_new() returns a NULL pointer.
        '''
        if self.engine is None:
            engine = self.so.cl_engine_new()
            # With restype POINTER(cl_engine) a NULL result is a falsy
            # pointer object, not None, so test truthiness instead.
            if not engine:
                raise ClamAVError("cl_engine_new() failed!")
            self.engine = engine

    def load(self, db_options=c.CL_DB_STDOPT, datadir=None):
        '''
        Load virus database.

        db_options - database loading options passed to cl_load()
        datadir    - database directory, the library default if empty

        Raises ClamAVError on a library error or if no signature was loaded.
        '''
        self.db_options = db_options
        datadir = _encode(datadir)
        if not datadir:
            datadir = self.so.cl_retdbdir()
        self.new_engine()
        self.status = self.so.cl_load(
            datadir, self.engine, byref(self.signo), db_options
        )
        self.virnum = self.signo.value
        self.check_status(self.status)
        if self.signo.value == 0:
            raise ClamAVError('No signatures loaded!')
        self.check_status(self.so.cl_engine_compile(self.engine))

    def check_status(self, status, ret=None):
        '''
        Remember status and turn error codes into exceptions.

        Returns ret if given, otherwise the status itself, when status is
        CLEAN or VIRUS. Raises ClamAVError for any other status.
        '''
        self.status = status
        if status == self.c.CLEAN or status == self.c.VIRUS:
            if ret is None:
                return self.status
            return ret
        raise ClamAVError(self.strerror())

    def scandesc(self, desc, filename=None, options=None):
        '''
        Scan file descriptor.

        desc     - open file descriptor to scan
        filename - optional file name handed to the library (text or bytes)
        options  - scan options structure, default_options() if None

        Returns the name of the found virus, or the status code if the
        virus name is empty. Raises ValueError for numeric options and
        ClamAVError on a library error.
        '''
        scanned = c_ulong(0)
        virname = c_char_p()
        if isinstance(options, int):
            raise ValueError(
                "Unable to use numeric options for clamav 0.101+!"
            )
        if options is None:
            options = self.default_options()
        self.status = self.so.cl_scandesc(
            desc,
            _encode(filename),
            byref(virname),
            byref(scanned),
            self.engine,
            byref(options)
        )
        self.virname = decode(virname.value)
        return self.check_status(self.status, self.virname)

    def scanfile(self, filename, options=None):
        '''
        Scan file defined by filename.

        filename - path of the file to scan (text or bytes)
        options  - scan options structure, default_options() if None

        Returns the name of the found virus, or the status code if the
        virus name is empty. Raises ValueError for numeric options and
        ClamAVError on a library error.
        '''
        scanned = c_ulong(0)
        virname = c_char_p()
        if isinstance(options, int):
            raise ValueError(
                "Unable to use numeric options for clamav 0.101+!"
            )
        if options is None:
            options = self.default_options()
        # Byte strings are passed through, anything else has to provide
        # encode(), so a missing file name still fails before the C call.
        if not isinstance(filename, bytes):
            filename = filename.encode('utf8')
        self.status = self.so.cl_scanfile(
            filename,
            byref(virname),
            byref(scanned),
            self.engine,
            byref(options)
        )
        self.virname = decode(virname.value)
        return self.check_status(self.status, self.virname)

    def statinidir(self, datadir=None):
        '''
        Initialise database directory statistics.

        datadir - database directory, the library default if empty
        '''
        datadir = _encode(datadir)
        self.datadir = datadir
        if not datadir:
            datadir = self.so.cl_retdbdir()
        self.check_status(self.so.cl_statinidir(datadir, byref(self.dbstat)))

    def statchkdir(self):
        ''' Return the checked result of cl_statchkdir() for the database. '''
        return self.check_status(self.so.cl_statchkdir(byref(self.dbstat)))

    def statfree(self):
        ''' Free database directory statistics. '''
        try:
            self.check_status(self.so.cl_statfree(byref(self.dbstat)))
        except TypeError:
            pass  # ignore 'NoneType' object is not callable

    def strerror(self, clerror=None):
        '''
        Return error string for last error, or for error defined
        by parameter.
        '''
        if clerror is None:
            clerror = self.status
        return decode(self.so.cl_strerror(clerror))

    def reload(self, datadir=None):
        '''
        Try to reload virus database.
        Do nothing, if currently loaded database is up to date.

        Returns 1 if the database has been reloaded, 0 otherwise.
        '''
        if datadir:
            self.datadir = _encode(datadir)
        if self.statchkdir():
            self.statfree()
            self.free()
            self.statinidir(self.datadir)
            self.load(self.db_options, self.datadir)
            return 1
        return 0

    def setlimits(self, args=None):
        '''
        Change limits for CL_ENGINE parameters.

        args - mapping of limit name to value; the name is upper-cased and
               prefixed with CL_ENGINE_ to find the constant. Text and byte
               values are set with cl_engine_set_str(), all other values
               with cl_engine_set_num().

        Raises AttributeError for an unknown limit name and ClamAVError
        if the library rejects a value.
        '''
        if args is None:
            args = {}
        for key, value in args.items():
            # Plain attribute lookup; evaluating a string built from
            # the key would execute arbitrary code.
            field = getattr(self.c, "CL_ENGINE_%s" % key.upper())
            if isinstance(value, string_types) or isinstance(value, bytes):
                status = self.so.cl_engine_set_str(
                    self.engine, field, _encode(value)
                )
            else:
                status = self.so.cl_engine_set_num(self.engine, field, value)
            self.check_status(status)

    def default_options(self):
        ''' Return scan options: parse everything, general heuristics. '''
        opts = self.c.OPTIONS()
        opts.parse = c_uint32(~0)  # parse all
        opts.general = self.c.CL_SCAN_GENERAL_HEURISTICS
        return opts