'''
spamassassin module

(c) 2003-2018 Jan ONDREJ (SAL)

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
'''

from avlib import *

__all__ = ['spamassassind']


class spamassassind(ascanner):
    '''
    SpamAssassin daemon scanner.

    This scanner scans for spam. If spam is detected, the string SPAM is
    returned as the virus name. Sagator's level is counted as:
    spamassassin_score / spamassassin_required_hits,
    for example for header "X-Spam-Status: Yes, score=12.3 required=5.0":
    12.3 / 5.0 = 2.46

    This scanner uses the spamd daemon. If you are not familiar with it,
    you can also use the spamassassin binary. Look at the spamassassin
    scanner.

    Usage: spamassassind(['localhost',783], reqspamlevel=-1,
                         sa_max_file=500000, filter=False,
                         mydestination=None, virtual_users=None,
                         sa_user='vscan')

    Where: ['localhost',783] are the host and port of spamd.

           if reqspamlevel is less than spam hits, spam status is returned,
           if it is -1 (default), spam level from spamassassin config
           is used

           sa_max_file is a number, which defines the maximum file size
           in bytes, which can be tested. If a mail (with header) is larger,
           it is not checked (by default 500000).

           filter is a boolean which can be used to process the whole
           message by spamassassin. The new message (generated by
           spamassassin) will be stored instead of the original.
           Its default is False.

           mydestination is a regular expression, which defines local
           domains. If it is set and only one recipient is specified and
           the recipient's domain matches this regular expression, then
           the username part will be sent to spamd. A compiled pattern or
           a callable taking the domain is accepted as well.
           This parameter is new in 0.7.0.

           virtual_users is a regular expression, which defines virtual
           users. These users can be used to configure virtual user
           settings for spamassassin. It is applied only to emails with
           one recipient.

           sa_user is a string, which defines the username sent to spamd.
           By default it's the vscan user. A list of usernames indexed by
           the fork id may be given for higher performance.
           This parameter is new in sagator-1.2.0.
    '''
    is_spamscan = 1
    name = 'SpamAssassinD()'
    # One (possibly folded) X-Spam-* header: everything up to the line break
    # that is followed by a non-whitespace character.
    reg_x_spam = re.compile(b'^(X-Spam-.*?)\r?\n[^ \t]', re.M | re.S).search
    # Whole header section, up to and including the first empty line.
    reg_eoh = re.compile(b'^(.*?\r?\n\r?\n)', re.M | re.S).search
    reg_status = re.compile(
        b'^X-Spam-Status: (Yes|No), (score|hits)=([-0-9.]*) (required)=([-0-9.]*) ').search
    # NOTE(review): reg_hdr_body is not referenced by any code that survived
    # in the reviewed source; it may have been used in the span that was lost
    # from scanbuffer. It is kept for compatibility.
    reg_hdr_body = re.compile(b'^([^:]*):[ \t]*(.*)', re.M | re.S).search
    socket_timeout = 5*60

    def __init__(self, arg=('localhost', 783), reqspamlevel=-1,
                 sa_max_file=500000, filter=False,
                 mydestination=None, virtual_users=None,
                 sa_user='vscan'):
        '''
        Store the scanner configuration (see the class docstring).

        Raises TypeError if mydestination is neither empty, a string,
        a compiled pattern nor a callable.
        '''
        self.arg = arg
        self.REQSPAMLEVEL = reqspamlevel
        self.SA_MAX_FILE = sa_max_file
        self.FILTER = filter
        if not mydestination:
            self.MYDESTINATION = None
        elif isinstance(mydestination, str):
            self.MYDESTINATION = re.compile(mydestination, re.IGNORECASE).search
        elif hasattr(mydestination, 'search'):
            # Already compiled pattern: previously this left MYDESTINATION
            # unset and every later access raised AttributeError.
            self.MYDESTINATION = mydestination.search
        elif callable(mydestination):
            self.MYDESTINATION = mydestination
        else:
            raise TypeError('mydestination must be a regular expression, got %r'
                            % (mydestination,))
        # Only the truth value of virtual_users is used by this class.
        self.VIRTUAL_USERS = virtual_users
        self.sa_user = sa_user

    def _sa_user_for(self, rcpt, default):
        '''
        Return the spamd username to use for the single recipient rcpt.

        With mydestination set, the local part of rcpt is returned when its
        domain matches, otherwise default. Without mydestination but with
        virtual_users set, the full recipient is returned. In every other
        case default is returned.
        '''
        if self.MYDESTINATION:
            try:
                local_part, domain = rcpt.rsplit('@', 1)
                if self.MYDESTINATION(domain):
                    # can't check real user existence in chroot
                    return local_part
            except (ValueError, TypeError, AttributeError):
                # Best effort: recipient without '@' or of unexpected type.
                pass
            return default
        if self.VIRTUAL_USERS:
            return rcpt
        return default

    def rcpt_signature(self, rcpt):
        '''
        Return the scanner name with the spamd user for rcpt placed inside
        the parentheses, e.g. 'SpamAssassinD(vscan)'.
        '''
        sa_user = self._sa_user_for(rcpt, self.sa_user)
        return '%s%s%s' % (self.name[:-1], sa_user, self.name[-1])

    def _ask_spamd(self, buffer, sa_user):
        '''
        Send buffer to spamd with a PROCESS request on behalf of sa_user and
        return the message body of the reply as bytes.

        Raises ScannerError if the first reply line does not report
        response code 0. The connection is closed on every path.
        '''
        addrinfo = socket.getaddrinfo(self.arg[0], self.arg[1])[0]
        s = socket.socket(addrinfo[0], socket.SOCK_STREAM)
        try:
            socket_settimeout(s, self.socket_timeout)
            s.connect(addrinfo[4])
            f = s.makefile('rwb', BUFSIZE)
            try:
                if isinstance(sa_user, str):
                    sa_user = sa_user.encode()
                s.sendall(
                    b"PROCESS SPAMC/1.2\r\n"
                    + (b"Content-length: %d\r\n" % len(buffer))
                    + (b"User: %s\r\n\r\n" % sa_user)
                )
                s.sendall(buffer)
                # Half-close so that spamd sees the end of the request.
                s.shutdown(socket.SHUT_WR)
                # check reply
                output1 = f.readline().rstrip()
                debug.echo(4, "Spamd output: [%s] %s"
                           % (len(buffer), output1.decode()))
                # spamd returned any error?
                if not re.search(b'^SPAMD/[0-9.]* 0 ', output1):
                    raise ScannerError('SpamAssassind: %s' % tostr(output1))
                # spamd returned EX_OK
                content_length = 0
                while True:
                    output2 = f.readline()
                    if not output2.strip():
                        # end of spamd header (or end of stream)
                        break
                    # At least one digit is required: an empty value would
                    # make int() raise ValueError.
                    reg1 = re.search(b'^Content-length: ([0-9]+)$',
                                     output2.rstrip())
                    if reg1:
                        content_length = int(reg1.group(1))
                    else:
                        debug.echo(0, "ERROR: spamd(): unknown header: %s"
                                   % tostr(output2.rstrip()))
                # read reported message
                return f.read(content_length)
            finally:
                f.close()
        finally:
            try:
                s.shutdown(socket.SHUT_RDWR)
            except socket.error:
                pass
            s.close()

    def _parse_report(self, output):
        '''
        Collect the X-Spam-* headers of the message returned by spamd.

        Returns (ret, spam_score, spam_req): the list of X-Spam-* headers
        as strings and the score / required values taken from the
        X-Spam-Status header (0.0 and 1000.0 when it is absent).

        NOTE(review): the loop this replaces was lost from the reviewed
        source (only "while pos" survived). It is reconstructed from the
        regular expressions defined on the class; confirm against the
        upstream file, in particular the exact strings appended to ret.
        '''
        eoh = self.reg_eoh(output)
        # Without an empty line the whole reply is treated as header.
        header = eoh.group(1) if eoh else output
        ret = []
        spam_score, spam_req = 0.0, 1000.0
        pos = 0
        while pos < len(header):
            found = self.reg_x_spam(header, pos)
            if not found:
                break
            line = found.group(1)
            ret.append(tostr(line))
            status = self.reg_status(line)
            if status:
                try:
                    score = float(status.group(3).decode('ascii'))
                    required = float(status.group(5).decode('ascii'))
                except ValueError:
                    debug.echo(0, "ERROR: spamd(): unparsable status: %s"
                               % tostr(line))
                else:
                    spam_score, spam_req = score, required
            # Continue right after this header; the pattern also consumed
            # the first character of the next line.
            pos = found.end(1)
        return ret, spam_score, spam_req

    def scanbuffer(self, buffer, args={}):
        '''
        Scan the message in buffer (bytes) with spamd.

        Returns a tuple (level, detected, ret): level is
        score / required level, detected is b"SPAM" or b'', and ret is a
        list of report strings. Messages larger than sa_max_file are not
        sent to spamd and yield (0.0, b'', [explanation]).

        Raises ScannerError if spamd reports an error. args is accepted for
        interface compatibility and is never read or modified here.
        '''
        if len(buffer) > self.SA_MAX_FILE:
            return 0.0, b'', ['File too long to test it. Size: '
                              + str(len(buffer))]
        # multiuser for higher performance
        if isinstance(self.sa_user, list):
            sa_user = self.sa_user[globals.fork_id]
            debug.echo(4, "%s: fork_id=%d, sa_user=%s"
                       % (self.name, globals.fork_id, sa_user))
        else:
            sa_user = self.sa_user
        # per-recipient users apply only to single-recipient mails
        if len(mail.recip) == 1:
            sa_user = self._sa_user_for(mail.recip[0], sa_user)
        debug.echo(4, '%s: User: %s' % (self.name, sa_user))
        # make connection to spamd and let it process this message
        output = self._ask_spamd(buffer, sa_user)
        # process returned email's header
        ret, spam_score, spam_req = self._parse_report(output)
        # NOTE(review): the selection of req_level was part of the span lost
        # from the reviewed source; reconstructed from the class docstring
        # ("if it is -1 (default), spam level from spamassassin config is
        # used"). Confirm against the upstream file.
        if self.REQSPAMLEVEL < 0:
            req_level = spam_req
        else:
            req_level = self.REQSPAMLEVEL
        if spam_score >= req_level:
            detected = b"SPAM"
        else:
            detected = b''
        debug.echo(3, "Spamd status: [%s], score=%f/%f"
                   % (tostr(detected), spam_score, spam_req))
        # store spamassassin output if the FILTER flag is set
        if self.FILTER:
            mail.data = output
            mail.findbody()
        if req_level:
            level = float(spam_score / req_level)
        else:
            # A required level of 0 would raise ZeroDivisionError.
            # NOTE(review): the ratio is undefined here; infinity is
            # reported for detected spam and 0.0 otherwise. Confirm that
            # callers accept an infinite level.
            level = float('inf') if detected else 0.0
        return level, detected, ret