''' dspam module, version 0.1 (c) 2003-2016 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ''' from __future__ import absolute_import from avlib import * import socket,re import pwd,grp,os dspam_doc=\ ''' This realscanner uses dspam library. For detailed description see DSPAM documentation. Usage: dspam(home='/var/dspam', user=None, group=None, classify=None, source=None, train=None, flags=pydspam.DSF_SIGNATURE) Where: home is dspam home dir ('/var/dspam') user is dspam user (user running sagator's process) group is dspam group (group running sagator's process) classify is how to classify this message Available options: DSR_ISSPAM, DSR_ISINNOCENT Default: Don't classify. source is message source: Available options: DSS_ERROR, DSS_CORPUS, DSS_INOCULATION Default: No source defined. train is training mode Available options: DST_TEFT, DST_TOE, DST_TUM, DST_NOTRAIN Default: DST_TEFT flags are dspam flags Available options: DSF_SIGNATURE, DSF_BIAS, DSF_NOISE, DSF_WHITELIST, DSF_MERGED Default: DSF_SIGNATURE Examples: basic usage: dspam() training emails badly assigned as spams: drop('',dspam(classify=DSR_ISSPAM,source=DSS_ERROR,train=DST_TOE)) training emails badly assinged as inoocent: drop('',dspam(classify=DSR_ISINNOCENT,source=DSS_ERROR,train=DST_TOE)) ''' dspam_classify_doc=\ ''' Realscanner to classify emails for DSPAM. This scanner can be used to classify messages as spams/hams. For parameters see dspam() scanner documentation. Example: configure SCANNERS=[ dspam_classify(source=DSS_CORPUS,flags=0) ] and you can run sgscan to learn messages from a box: Training ham (clean messages): sgscan --dspam-classify=innocent MAILBOX Training spam: sgscan --dspam-classify=spam MAILBOX ''' try: from . import pydspam from .pydspam import const as pydspam_const class dspam(ascanner): name = 'dspam()' OP = pydspam.DSM_PROCESS CLASSIFICATION = pydspam.DSR_NONE SOURCE = pydspam.DSS_NONE TRAINING_MODE = pydspam.DST_TEFT def __init__(self, home='/var/dspam', user=None, group=None, classify=None, source=None, train=None, flags=pydspam.DSF_NOISE|pydspam.DSF_SIGNATURE): self.FLAGS = flags if not user: user=pwd.getpwuid(os.getuid())[0] if not group: group=grp.getgrgid(os.getgid())[0] self.DSPAM=pydspam.dspam(home,op=self.OP,flags=self.FLAGS,user=user,group=group) if classify!=None: self.CLASSIFICATION=classify if self.CLASSIFICATION!=pydspam.DSR_NONE: self.DSPAM.classification(self.CLASSIFICATION) if source!=None: self.SOURCE=source if self.SOURCE!=pydspam.DSS_NONE: self.DSPAM.source(self.SOURCE) if train!=None: self.TRAINING_MODE=train self.DSPAM.training_mode(train) def param(self,key,value=None): if key=='--dspam-source': try: self.SOURCE={ 'corpus': pydspam.DSS_CORPUS, 'error': pydspam.DSS_ERROR, 'innoculation': pydspam.DSS_INOCULATION }[value] self.DSPAM.source(self.SOURCE) except KeyError: return 'Unknown source: %s' % value elif key=='--dspam-classify': try: self.CLASSIFICATION={ 'spam': pydspam.DSR_ISSPAM, 'innocent': pydspam.DSR_ISINNOCENT }[value] self.DSPAM.classification(self.CLASSIFICATION) except KeyError: return 'Unknown classification: %s' % value elif key=='--dspam-training': try: self.TRAINING_MODE={ 'teft': pydspam.DST_TEFT, 'toe': pydspam.DST_TOE, 'tum': pydspam.DST_TUM, 'notrain': pydspam.DST_NOTRAIN }[value] self.DSPAM.training_mode(self.TRAINING_MODE) except KeyError: return 'Unknown training mode: %s' % value else: return 'Unknown parameter: %s' % key return '' def help(self): return {'dspam-source=':'[corpus|error|innoculation]', 'dspam-classify=':'[spam|innocent]', 'dspam-training=':'[teft|toe|tum|notrain]'} def scanbuffer(self, buffer, args={}): debug.echo(4,self.name+": settings: DST=%s, DSS=%s, DSR=%s" \ % (self.TRAINING_MODE, self.SOURCE, self.CLASSIFICATION)) sig='' if self.SOURCE in [pydspam.DSS_ERROR,pydspam.DSS_INOCULATION]: # try to get signature from meesage header reg_sig=re.compile('^X-DSPAM-Signature: *([a-z0-9]+)\r?$',re.I|re.M).search(buffer) if reg_sig: sig=reg_sig.group(1) self.DSPAM.get_signature(sig) else: debug.echo(1,self.name+": ERROR: Can't get signature ID!") # process through dspam r,p,c=self.DSPAM.process(buffer) # save signature, if it is made if self.CLASSIFICATION==pydspam.DSR_NONE: sig=self.DSPAM.save_signature() # simulate dspam output ret=['result='+str(r),'probability='+str(p),'confidence='+str(c), "sig='%s'" % str(sig)] debug.echo(4,self.name+": "+', '.join(ret)) if self.CLASSIFICATION==pydspam.DSR_ISINNOCENT: return 0.0, b'', ['innocence classified'] if self.CLASSIFICATION==pydspam.DSR_ISSPAM: return 1.0, b'SPAM', ['spam classified'] mail.addheader('X-DSPAM-Confidence', str(c)) mail.addheader('X-DSPAM-Probability' ,str(p)) mail.addheader('X-DSPAM-Signature', sig) if r==pydspam.DSR_ISSPAM: mail.addheader('X-DSPAM-Result','Spam') return 1.0+p, b'SPAM', ret elif r==pydspam.DSR_ISINNOCENT: mail.addheader('X-DSPAM-Result','Innocent') return p, b'', ret debug.echo(3,self.name+": WARNING: Unknown return from dspam!") return 0.0, b'', [] for ds_const in dir(pydspam_const): ds_value = getattr(pydspam_const, ds_const) if type(ds_value)==type(1): setattr(dspam, ds_const, ds_value) class dspam_classify(dspam): name='dspam_classify()' CLASSIFICATION=pydspam.DSR_ISSPAM SOURCE=pydspam.DSS_CORPUS except ImportError as import_es: class dspam(ascanner): name='dspam()' errorstring="Importing of pydspammodule failed!" def __init__(self,*arg1,**arg2): pass def scanbuffer(self, buffer, args={}): debug.echo(1,self.name+": scanbuffer(): "+self.errorstring," [",import_es,"]") raise ScannerError(self.errorstring) def scanfile(self,files,dirname='',args={}): debug.echo(1,self.name+": scanbuffer(): "+self.errorstring," [",import_es,"]") raise ScannerError(self.errorstring) class dspam_classify(dspam): name='dspam_classify()' dspam.__doc__ = dspam_doc del dspam_doc dspam_classify.__doc__ = dspam_classify_doc del dspam_classify_doc