'''
email parser module, version 0.7.0

(c) 2003-2024 Jan ONDREJ (SAL), A'rpi

Some parts are based on python's email package written by:
  Ben Gertzfield
  Barry Warsaw

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
'''

from avlib import *
from .match import match_any

import sys
import os
import re
import mimetypes
import binascii
import base64
import quopri

__all__ = ['parsemail', 'attach_name']


def hdr_parse(s, pmode=1):
    '''
    Parse the value of a structured header (bytes).

    With pmode true, the value is split on unquoted ';' into parameters and
    a list of (key, value, comment) tuples is returned; the key is
    lower-cased, the value is cut after its last non-whitespace (or quoted)
    character and the comment is the text found inside parentheses.

    With pmode false, no parameter splitting is done and a single
    (key, comment) tuple is returned.

    Backslash escapes the next character, double quotes protect their
    content from any interpretation.
    '''
    params = []
    qflag = iflag = cflag = 0  # backslash seen / inside quotes / comment depth
    comment = key = value = b""
    keymode = 1    # collecting the key (1) or the value (0)
    lastnonws = 0  # length of value up to its last significant character
    wsflag = 1     # still skipping leading whitespace
    # "for c in s" does not work in python3, returns integers
    for i in range(len(s)):
        c = s[i:i+1]
        if not qflag:
            # process control chars, by priority:
            if c == b'\\':
                qflag = 1
                continue
            if c == b'"':
                iflag = not iflag
                wsflag = 0
                continue
            if not iflag:
                if wsflag:
                    # skip initial whitespaces, unless they're quoted
                    if c in (b'\t', b' '):
                        continue
                    wsflag = 0
                if c == b'(':
                    cflag += 1
                    continue
                if c == b')':
                    cflag -= 1
                    continue
                if cflag == 0 and pmode:
                    if c == b'=' and keymode:
                        keymode = 0
                        lastnonws = 0
                        wsflag = 1
                        continue
                    if c == b';':
                        params.append((key.strip().lower(),
                                       value[:lastnonws], comment.strip()))
                        comment = b""
                        key = b""
                        value = b""
                        keymode = 1
                        continue
        # ordinary (or escaped) character: store it
        if cflag > 0:
            comment += c
        elif keymode:
            key += c
        else:
            value += c
            if iflag or qflag or c not in (b'\t', b' '):
                lastnonws = len(value)
        qflag = 0
    if not pmode:
        return (key.strip(), comment.strip())
    params.append((key.strip().lower(), value[:lastnonws], comment.strip()))
    return params


# inspired by Header.py::__unicode__()
ecre = re.compile(br'''
  =\?                   # literal =?
  (?P<charset>[^?]*?)   # non-greedy up to the next ? is the charset
  \?                    # literal ?
  (?P<encoding>[qb])    # either a "q" or a "b", case insensitive
  \?                    # literal ?
  (?P<encoded>.*?)      # non-greedy up to the next ?= is the encoded string
  \?=                   # literal ?=
  ''', re.VERBOSE | re.IGNORECASE)


def decode_header(header):
    """
    Decode a message header value without converting charset.

    Returns a list of (decoded_string, charset) pairs containing each of the
    decoded parts of the header.  Charset is None for non-encoded parts of
    the header, otherwise the lower-cased name of the character set
    specified in the encoded string (bytes, exactly as matched).

    An email.Errors.HeaderParseError may be raised when certain decoding
    error occurs (e.g. a base64 decoding exception).
    """
    # NOTE(review): SPACE, b64decode and HeaderParseError are not defined in
    # this file; presumably they are provided by "from avlib import *" --
    # confirm, otherwise the paths using them raise NameError.
    # If no encoding, just return the header
    header = bytes(header)
    if not ecre.search(header):
        return [(header, None)]
    decoded = []
    dec = b''
    for line in header.splitlines():
        # This line might not have an encoding in it
        if not ecre.search(line):
            decoded.append((line, None))
            continue
        # split() yields: text, charset, encoding, encoded, text, ...
        parts = ecre.split(line)
        while parts:
            unenc = parts.pop(0).strip()
            if unenc:
                # Should we continue a long line?
                if decoded and decoded[-1][1] is None:
                    decoded[-1] = (decoded[-1][0] + SPACE + unenc, None)
                else:
                    decoded.append((unenc, None))
            if parts:
                charset, encoding = [p.lower() for p in parts[0:2]]
                encoded = parts[2]
                dec = None
                if encoding == b'q':
                    dec = quopri.decodestring(encoded)
                elif encoding == b'b':
                    try:
                        dec = b64decode(encoded)
                    except (binascii.Error, ValueError):
                        raise HeaderParseError
                if dec is None:
                    dec = encoded
                if decoded and decoded[-1][1] == charset:
                    # same charset as the previous chunk: merge them
                    decoded[-1] = (decoded[-1][0] + dec, decoded[-1][1])
                else:
                    decoded.append((dec, charset))
            del parts[0:3]
    return decoded


def str_recode(s, encoding="latin-1"):
    ''' Convert bytes to a text string using the given encoding. '''
    if sys.version_info[0] <= 2:
        return unicode(s, encoding)
    return str(s, encoding)


def hdr_decode(h):
    '''
    Decode a (possibly RFC 2047 encoded) header value into a text string.

    Every chunk returned by decode_header() is converted with its own
    charset; chunks without a charset, with an unknown charset or with
    undecodable data are converted as latin-1.  An empty value gives "".
    '''
    if not h:
        return ""
    uchunks = []
    last = None
    for s, enc in decode_header(h):
        # decode_header() gives the charset as bytes, but the comparisons
        # below and the codec lookup need a text name.  Without this
        # conversion every chunk silently fell back to latin-1.
        if isinstance(enc, bytes):
            enc = enc.decode('ascii', 'replace')
        if last not in (None, 'us-ascii'):
            if enc in (None, 'us-ascii'):
                uchunks.append(' ')
        else:
            if enc not in (None, 'us-ascii'):
                uchunks.append(' ')
        last = enc
        try:
            uchunks.append(str_recode(s, enc or "latin-1"))
        except Exception:
            # unknown charset or undecodable data: latin-1 never fails
            uchunks.append(str_recode(s, "latin-1"))
    return ''.join(uchunks)


def plain(s):
    ''' Pass-through decoder used for parts without transfer encoding. '''
    return s


# binascii.a2b_hqx has been removed in python 3.11.  Fall back to the
# pass-through decoder there, so a binhex marker inside an email can not
# raise an AttributeError in the middle of parsing.
_a2b_hqx = getattr(binascii, 'a2b_hqx', plain)


class decode_email:
    '''
    Line based MIME decoder.

    The constructor walks through the whole email, decodes every part it
    finds and hands the decoded data over to the scanners.  scan() then
    returns the collected (level, virus_name, report) result.
    '''

    def debug(self, level, str):
        ''' Rate limited debug.echo(); used for errors found in an email. '''
        # the error budget is consumed only at low debug levels
        if debug.debug_level < 5:
            self.max_errors -= 1
        if self.max_errors == 0:
            debug.echo(level, "parsemail(): TOO MANY ERRORS !!!")
        elif (self.max_errors > 0) or (self.max_errors % 100 == 0):
            debug.echo(level, str)

    def scan_part(self, filename=None, args=None):
        '''
        Send data collected in self.part to all scanners and start a new,
        empty part.  Nothing is done if no data has been collected.

        filename - name of attachment, 'unknown.bin' is used if empty
        args - unused, scanners always get self.args
        '''
        buffer = self.part.getvalue()
        if not buffer:
            return
        if filename:
            self.filename = normalize_filename(filename)
            self.files.append(self.filename)
        else:
            self.filename = 'unknown.bin'
        for scanner in self.scanners:
            scanner.filename = self.filename
        debug.echo(5, "parsemail(): buffer len=", len(buffer),
                   ", filename=", self.filename)
        debug.echo(9, "decoded data part: "
                   + str(tostr_list([buffer[0:40]])), "...")
        for scanner in self.scanners:
            if scanner.is_multibuffer:
                # multibuffer scanners only collect data here,
                # they are run later from scan()
                scanner.addbuffer(buffer)
            else:
                scanner.prescan()
                l, v, r = scanner.scanbuffer(buffer, self.args)
                scanner.postscan(l, v, r)
                if not self.vir:
                    self.vir = v
                self.ret += r
                self.level += l
        # delete old part and create a new BytesIO
        self.part = BytesIO()

    def __init__(self, data, scanners, args):
        '''
        Decode whole email and run a scanners on decoded parts.

        data - raw email (bytes)
        scanners - sequence of scanners, which will get decoded parts
        args - passed to scanners unchanged
        '''
        self.level, self.vir, self.ret, self.files = 0.0, b'', [], []
        self.scanners = scanners
        self.args = args
        in_header = 1
        bounds = {}      # known MIME boundaries
        header = []      # unfolded header lines of current part
        self.lineno = 0
        preline = b''    # data, which could not be decoded yet
        self.part = BytesIO()
        file_name = ''
        dio = BytesIO(data)
        self.max_errors = 20
        while 1:
            line = dio.readline()
            if not line:
                break
            line = line.rstrip(b"\r\n")
            self.lineno += 1
            # remove quoting of replied/forwarded emails
            while line[0:2] == b"> ":
                line = line[2:]
            if in_header:
                # if line is (really) empty => EOH
                if not line:
                    self.scan_part(file_name)
                    file_type, file_name, coding = None, None, None
                    try:
                        p_ctype = p_cdisp = p_ctenc = None
                        for hs in header:
                            fpos = hs.find(b':')
                            if fpos <= 0:
                                # not a field, check if it's a boundary!
                                if any(hs.startswith(b"--"+b) for b in bounds):
                                    debug.echo(8, "HDR: found a boundary"
                                                  " inside a header!")
                                else:
                                    # hs is bytes, it must be converted
                                    # before joining with a text message
                                    self.debug(1, "HDR: Invalid header"
                                               " field: '"+tostr(hs)+"'")
                                continue
                            if len(hs) > 998:
                                self.debug(1, "HDR: Header line too long!"
                                           " (%d chars)" % (len(hs)))
                            field = hs[0:fpos].strip().lower()
                            fval = hs[fpos+1:]
                            # parse content-type: header
                            if field == b"content-type":
                                if p_ctype:
                                    self.debug(1, "HDR: MIME error:"
                                               " content-type redefined!")
                                p_ctype = hdr_parse(fval)
                                file_type = p_ctype[0][0]
                                for param in p_ctype:
                                    if param[0] == b"name":
                                        if not file_name:
                                            file_name = hdr_decode(param[1])
                                    elif param[0] == b"boundary":
                                        # not sure if we have to strip() it:
                                        bounds[param[1].strip()] = 1
                                # NOTE(review): file_type is bytes here;
                                # mimetypes tables are keyed by text, so
                                # this lookup presumably never matches on
                                # python3 -- confirm before relying on it.
                                file_ext = mimetypes.guess_extension(file_type)
                                if file_ext:
                                    file_name = "unknown"+file_ext
                                continue
                            # parse content-disposition: header
                            if field == b"content-disposition":
                                if p_cdisp:
                                    self.debug(1, "HDR: MIME error:"
                                               " content-disposition"
                                               " redefined!")
                                p_cdisp = hdr_parse(fval)
                                for param in p_cdisp:
                                    if param[0] in [b"filename"]:
                                        file_name = hdr_decode(param[1])
                                continue
                            if field == b"content-transfer-encoding":
                                # NOTE(review): p_ctenc is never assigned,
                                # so the last transfer-encoding wins and
                                # this error is never reported -- confirm
                                # the intended behaviour.
                                if p_ctenc:
                                    self.debug(1, "HDR: MIME error:"
                                               " transfer-encoding"
                                               " redefined!")
                                else:
                                    coding = hdr_parse(fval, 0)[0]
                                continue
                    except Exception:
                        # a broken header must not stop the scanning;
                        # values parsed until now are used
                        self.debug(1, "HDR: Exception while MIME header"
                                      " parsing")
                        if self.max_errors > 0:
                            debug.traceback(4, "HDR: ")
                    # select decoder for body of this part
                    decoder = plain
                    if coding:
                        coding = coding.strip().lower()
                        if coding == b"quoted-printable":
                            decoder = binascii.a2b_qp
                        elif coding == b"base64":
                            decoder = binascii.a2b_base64
                        elif coding in (b'x-uuencode', b'uuencode',
                                        b'uue', b'x-uue'):
                            decoder = binascii.a2b_uu
                    elif file_type == b"application/mac-binhex40":
                        decoder = _a2b_hqx
                    checkend = 0
                    in_header = 0
                    header = []
                    self.part = BytesIO()  # create new part
                    continue
            else:
                # not in_header
                # check for boundaries
                for b in list(bounds):
                    if line.find(b"--"+b) >= 0:
                        in_header = 1
                        debug.echo(8, "HDR: boundary found,"
                                      " switching to in_header")
                # handle forwarded/bounced MIME emails
                lline = line.strip().lower()
                fpos = lline.find(b':')
                if fpos > 0:
                    field = lline[0:fpos]
                    if field in (b"content-type",
                                 b"content-transfer-encoding",
                                 b"content-disposition", b"content-id"):
                        in_header = 1
                        debug.echo(7, "HDR: looks like header,"
                                      " switching to in_header")
                if lline[0:40] == b"(this file must be converted with binhex":
                    decoder = _a2b_hqx
                    continue
                if lline[0:6] == b"begin " or lline[0:13] == b"begin-base64 ":
                    # inline uuencoded or base64 encoded file
                    try:
                        begin, mode, name = lline.split(b' ', 2)
                        if int(mode, 8) > 0 and name:
                            file_name = name.strip()
                            if begin.lower() == b"begin":
                                decoder = binascii.a2b_uu
                            else:
                                decoder = binascii.a2b_base64
                            self.scan_part(file_name)
                            self.part = BytesIO()  # create new part
                            checkend = 1
                            continue
                    except Exception:
                        # best effort: not a valid "begin" line,
                        # handle it as an ordinary line
                        pass
            if in_header:
                # do the header un-folding:
                if line[0:1] in [b' ', b'\t']:
                    if header:
                        header[-1] += line
                    else:
                        self.debug(1, "syntax error: bad unfolded header"
                                      " line: '" + tostr(line) + "'")
                else:
                    header.append(line)
            else:
                # decode
                try:
                    preline += line
                    if preline:
                        try:
                            self.part.write(decoder(preline))
                            preline = b''
                        except binascii.Error:
                            # try to decode parts without last 1,2,3
                            # characters, the rest is kept for next line
                            stripped = preline.rstrip()
                            for cut in (1, 2):
                                try:
                                    self.part.write(decoder(stripped[:-cut]))
                                except binascii.Error:
                                    continue
                                preline = stripped[-cut:]
                                break
                            else:
                                self.part.write(decoder(stripped[:-3]))
                                preline = stripped[-3:]
                except Exception:
                    if checkend and line.lower()[0:3] == b"end":
                        # end of inline uuencoded/base64 file
                        self.scan_part(file_name)
                        decoder = None
                        self.part = BytesIO()
                        checkend = 0
                        preline = b''
                        continue
                    if line[-2:] == b'=9':
                        # correct end
                        preline = b''
                        continue
                    self.debug(3, "parsemail(): Exception while attachment"
                               " decoding, file may be truncated!"
                               " length=%d, line[%d]=...'%s'"
                               % (len(preline), self.lineno,
                                  str(tostr_list(preline[-80:]))))
                    debug.traceback(7)
        # scan the last part
        self.scan_part(file_name)

    def scan(self):
        '''
        Run multibuffer scanners on collected buffers and return the
        (level, virus_name, report) result of the whole email.
        '''
        for scanner in self.scanners:
            if scanner.is_multibuffer:
                debug.echo(5, "Scanning as multibuffer")
                self.level, self.vir, self.ret \
                    = scanner.scanmultibuffer([], self.args)
                if is_infected(self.level, self.vir):
                    return self.level, self.vir, self.ret
            del scanner
        return self.level, self.vir, self.ret


class parsemail(match_any):
    '''
    Email parser interscanner.

    This scanner parses emails and sends multiple buffers into scanner1().
    If a multiscanner is used as another_scanner, optimal method is used
    (scans all files at once).

    Usage: parsemail(scanner1(...) [, scanner2(...)] )

    Example: parsemail(libclam())
    '''
    name = 'ParseMail()'

    def __init__(self, *scanners):
        #self.name = "ParseMail("+scanner.name+")"
        match_any.__init__(self, scanners)

    def scanbuffer(self, buffer, args=None):
        ''' Decode the email in buffer and scan all of its parts. '''
        if args is None:
            # a fresh dict for every call instead of a shared default
            args = {}
        return decode_email(buffer, self.scanners, args).scan()


class attach_name(ascanner):
    r'''
    Attachment name scanner.

    This scanner checks defined name of file in mime attachments.
    By default known executable extensions will be blocked.
    This scanner can be used only if it's parent is parsemail().

    Usage: attach_name({b'VirName': b'regexp_pattern', ..}, flags=re.I)

    Where: 'VirName' is a string, which defines virus name
           'regexp_pattern' is a regular expression. If this expression
             is found in attachment's name, VirName is returned as virus
             name.
           flags is a number, which defines regular expression flags.
             By default IGNORECASE is used.

    Example:
      attach_name({
        'Executable': '\.(exe|com|vxd|dll|cpl|scr|pif|lnk|bat|vbs|js)$'
      })
    '''
    name = 'attach_name()'

    def __init__(self, wrong_types={b'Executable':
                 br'\.(exe|com|pif|lnk|scr|vbs|cpl|vbs|js)$'}, flags=re.I):
        # compile regexps; the default dict is only read, never modified
        if isinstance(wrong_types, bytes):
            # a single pattern without a name
            self.wrong_regs = {b'BLOCKED_NAME': re.compile(wrong_types, flags)}
        else:
            self.wrong_regs = {}
            for vname, reg in list(wrong_types.items()):
                self.wrong_regs[tobytes(vname)] \
                    = re.compile(tobytes(reg), flags)

    def scanbuffer(self, buffer, args=None):
        '''
        Check self.filename (set by parsemail) against all patterns.
        The buffer itself is not examined.
        '''
        debug.echo(7, "attach_name(): filename: ", self.filename)
        for vname, reg in list(self.wrong_regs.items()):
            if reg.search(tobytes(self.filename)):
                return iret(1.0, vname, [
                    self.name+": "+tostr(vname)
                    + ' found in '+self.filename+'!\n'
                ])
        return 0.0, b'', []