''' Action interscanners for sagator (c) 2003-2021 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ''' from __future__ import absolute_import from avlib import * import re,os,time from .match import match_any __all__ = ['quarantine', 'drop', 'deliver', 'deliver_to', 'custom_action', 'rename', 'time_limit'] ##################################################################### ### QUARANTINE class class quarantine(match_any): ''' Quarantine into a directory. This scanner can be used to quarantine an virus/spam. qdir is a directory, in which viruses are stored. Following regular expression can be used to disable quarantining of some viruses or spams. Usage: quarantine('qdir', 'dont quarantine regular expression', scanners) Where: 'qdir' is a string, which defines a qatantine directory, directory, in which viruses/spams are stored. This string can contain %X substrings. These strings are represented as defined in strftime. For more information see "man strftime". If this directory not exitst, it's created. 'dont quarantine regular expression' is a regular expression, which defines virus names, which can't be quarantined Example: quarantine('/var/spool/sagator/quarantine/%Y%m', '', a_scanner()) - every month will be stored in separate directory ''' name='quarantine()' def __init__(self, qdir, dontq, *scanners): self.QDIR = qdir self.DONTQ = tobytes(dontq) match_any.__init__(self,scanners) def scanbuffer(self, buffer, args={}): level, detected, virlist = match_any.scanbuffer(self, buffer, args) if globals.scan_only: return level, detected, virlist if is_infected(level, detected): if (self.QDIR!='') & (self.DONTQ!=b'') \ & (re.search(self.DONTQ, detected, re.IGNORECASE)!=None): qfname='Not quarantined yet.' else: fext = time.strftime("-%Y%m%d-%H%M%S-", time.localtime(time.time())) fpath = time.strftime(self.QDIR) # try to make this directory try: os.makedirs(safe.fn(fpath)) except OSError as err: (ec,es) = err.args if ec!=17: # file exists raise num=0 while 1: qfname = fpath+'/qm'+fext+str(num) num += 1 try: fd = safe.osopen(qfname, os.O_WRONLY|os.O_CREAT|os.O_EXCL, 0o660) f = os.fdopen(fd, 'wb', BUFSIZE) f.write(mail.noop_connect_from()) f.write(mail.comm) f.write(mail.xheader) # added headers f.write(mail.data) # original data, not modified f.write(b".\r\nQUIT\r\n") f.close() debug.echo(3, "Quarantined as: "+qfname) break except OSError as err: (ec,es) = err.args if ec==17: # already exists continue else: debug.echo(2, "ERROR: quarantine(): ", es) #return level, detected, virlist raise ScannerError("qarantine(): OSError: "+es) globals.QFNAME = qfname return level, detected, virlist ##################################################################### ### drop and deliver classes class drop(match_any): ''' Interscanner to drop viruses/spams. By default they are rejected. This scanner can be used to drop some viruses, like Klez. Klez sends fake sender address and this virus can't be sent back to sender, because sender is faked. Usage: drop(drop_pattern, scanners) This scanner does nothing else, like sets the DROP flag. Email is passed to parent scanners and is dropped in sagator. Two constants drop.DEFAULT and drop.DEFAULT_EXT (extensible version) can be used to drop most of worms, trojans and phishings, which fakes sender email addresses. Examples: drop(drop.DEFAULT, scanners) drop(drop.DEFAULT_EXT % 'SPAM|OtherVirus', scanners) drop('.', scanners) # drop every virus/spam ''' name='drop()' DEFAULT='('+'|'.join([ 'worm', # generic worms 'email\\.', # generic email sent viruses 'trojan', # generic trojan 'phishing', # generic phishing 'junk', # generic junk '@mm', # mass-mailing worm 'Sanesecurity\\.', # all sanesecurity 'Suspect\\.', # some suspected viruses # some more 'Sobig|Klez|Bugbear|Gibe|Swen|Mimail|Sober|Bagle', 'Mydoom|Novarg|Lirva|W32.Sality|Bredozip' ])+')' DEFAULT_EXT=DEFAULT.replace(')', '|%s)') def __init__(self, drop_pattern, *scanners): self.ONLY_IF = tobytes(drop_pattern) self.DROP_LEVEL = 9999.9 match_any.__init__(self, scanners) def scanbuffer(self, buffer, args={}): level, detected, virlist = match_any.scanbuffer(self, buffer, args) if re.search(self.ONLY_IF, detected, re.IGNORECASE): globals.ACTION = S_DROP if level>=self.DROP_LEVEL: globals.ACTION = S_DROP return level, detected, virlist class deliver(match_any): ''' A scanner to force sending some viruses/spams to original recipients. This scanner can be used to send some viruses/spams. You can define by regular expression, what virus name or spam can be sent. By default all non empty virus names will be matched. Usage: deliver(deliver_pattern, scanners) Where: deliver_pattern is an regular expression, which defines virus name to really deliver, other emails will not be delivered. For compatibility issues deliver_pattern is used as fist scanner, if it's type is not string. Also suboption .onlyif(pattern) can be used to define deliver_pattern. ''' name='deliver()' def __init__(self, deliver_pattern, *scanners): if type(deliver_pattern)==type(""): self.ONLY_IF = tobytes(deliver_pattern) elif type(deliver_pattern)==type(b""): self.ONLY_IF = deliver_pattern else: self.ONLY_IF = b'.' scanners=[deliver_pattern]+list(scanners) match_any.__init__(self,scanners) def onlyif(self, pattern): self.ONLY_IF = pattern return self def scanbuffer(self, buffer, args={}): level, detected, virlist = match_any.scanbuffer(self, buffer, args) if re.search(self.ONLY_IF, detected, re.IGNORECASE): globals.ACTION = S_FORCE_SEND return level, detected, virlist class deliver_to(match_any): ''' Interscanner to send emails to admins. This scanner can be used to send viruses/spams to an administrator. Usage: deliver_to(recipients,scanners) or: deliver_to(...).onlyif('string') Where: recipients is an array of recipients 'string' is an string, which defines regular expression. If this expression is found in virus name, mail is delivered, otherwise don't. ''' name='deliver_to()' def __init__(self, recipients, *scanners): self.RECIPIENTS = recipients self.ONLY_IF = b'.' match_any.__init__(self,scanners) def onlyif(self, pattern=b'.'): self.ONLY_IF = pattern return self def scanbuffer(self, buffer, args={}): level, detected, virlist = match_any.scanbuffer(self, buffer, args) if globals.scan_only: return level, detected, virlist if re.search(self.ONLY_IF, detected, re.IGNORECASE): if self.RECIPIENTS: try: debug.echo(4, "deliver_to(): Trying to deliver to: ", str(self.RECIPIENTS)) smtpc().sendmail(mail.sender,self.RECIPIENTS, mail.xheader+mail.data[:mail.bodypos]+mail.data[mail.bodypos:]) except: debug.echo(1, "deliver_to(): An error occured when sending reports") debug.traceback(4, "deliver_to():") raise ScannerError("deliver_to()") return level, detected, virlist class custom_action(match_any): ''' Interscanner to set custom action. This scanner can be used to set custom action (reject, drop, deliver) by manually specifing this as parameters. You can use these variables in format string: %(VIRNAME)s name of detected virus, empty if CLEAN %(QNAME)s quarantine file name (quarantine must be inside custom_action scanner to use this) %(SCANNER_NAME)s scanner which reported this virus %(VERSION)s sagator's version Usage: custom_action(pattern, reply_code, reply_message, scanners) Where: pattern is an regular expression, which defines virus/spam to set this action. It is checked over virus name. reply_code is an integer or string, which defines message reply code (250 for success, 451 for temporary failure, 550 for reject). reply_message is an string, which defines reply message string. You can use variables defined above here. Bugs: Does not working with milter() service. Example: custom_action(".", 550, "Content rejected - %(VIRNAME)s", scanners... ) New in version 0.9.0. ''' name='custom_action()' def __init__(self, pattern, reply_code, reply_message, *scanners): self.ONLY_IF = tobytes(pattern) self.REPLY_CODE = reply_code self.REPLY_MESSAGE = reply_message match_any.__init__(self, scanners) def scanbuffer(self, buffer, args={}): level, detected, virlist = match_any.scanbuffer(self, buffer, args) if re.search(self.ONLY_IF, detected, re.IGNORECASE): globals.ACTION = S_CUSTOM globals.REPLY = ( self.REPLY_CODE, replace_tmpl(self.REPLY_MESSAGE, { 'VIRNAME': tostr(detected), 'QNAME': globals.QFNAME, 'SCANNER_NAME': globals.found_by.name, 'VERSION': SG_VER_REL }) ) return level, detected, virlist class rename(match_any): ''' Interscanner to rewrite virus name returned by an scanner. Usage: rename(newname, scanners) or: rename(newname, scanners).multiplier(MP) Where: newname is a string, which defines new virus name. These string will be replaced: %(VIRNAME)s old name %(LEVEL)s detected level as float %(STARS)s detected level as stars MP is an multiplier for LEVEL and STARS. Returned level is multiplied by this constant. Renaming is skipped, if level<1.0. ''' name='rename()'; def __init__(self, newname, *scanners): self.NEWNAME = newname self.MULTIPLIER = 1.0 match_any.__init__(self, scanners) def scanbuffer(self, buffer, args={}): level, detected, virlist=match_any.scanbuffer(self, buffer, args) if not is_infected(level, detected): return level, detected, virlist if type(self.NEWNAME)==type(''): repl_vars={ 'VNAME': tostr(detected), 'VIRNAME': tostr(detected), 'LEVEL': str(level*self.MULTIPLIER), 'STARS': '*'*int(level*self.MULTIPLIER) } return level, tobytes(replace_tmpl(self.NEWNAME, repl_vars)), virlist else: if len(self.NEWNAME)>2: s = re.compile( tobytes(self.NEWNAME[0]), self.NEWNAME[2] ).sub(tobytes(self.NEWNAME[1]), detected) else: s = re.compile( tobytes(self.NEWNAME[0]), re.I ).sub(tobytes(self.NEWNAME[1]), detected) return level, s, virlist def multiplier(self, MP=1.0): self.MULTIPLIER = MP return self class time_limit(match_any): ''' Interscanner to limit scanner execution time. Usage: time_limit(seconds, scanners) Where: seconds is an float, which defines maximum number of seconds. After this limit scanner returns a virus and will be dropped or rejected (maybe also quarantined) according to configuration. Example: time_limit(300, parsemail(b2f(libclam())) ) New in version 0.7.0. ''' name='time_limit()'; def __init__(self, seconds, *scanners): self.SECONDS = seconds match_any.__init__(self, scanners) def scanbuffer(self, buffer, args={}): time0 = time.time() level, detected, virlist = match_any.scanbuffer(self, buffer, args) time1 = time.time() if time1-time0>self.SECONDS: globals.found_by = self ret=[b'Time limit exceeded: %5.3f/%5.3f seconds.' \ % (time1-time0, self.SECONDS)] debug.echo(2, "time_limit(): %5.3f/%5.3f" % (time1-time0, self.SECONDS)) return 1.0, b'TIME_LIMIT_EXCEEDED', ret return level, detected, virlist