#!/usr/bin/python ''' Sagators statistics collector, distributor and graph maker (c) 2003-2011 Jan ONDREJ (SAL) This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. Examples: python stats.py / python stats.py mrtg Total-count Filtered-count python stats.py rrdtool /var/www/html/sagator ''' __pychecker__ = 'unusednames=_ \ maxreturns=20 maxlines=300 \ no-local no-argsused no-stringiter no-abstract no-isliteral \ no-stdlib no-callinit no-badexcept' import socket, re, os, sys import urllib, time, signal, select, struct, resource import interscan.match from avlib import * __all__ = ['collector', 'statistics', 'status'] COLLECTOR_SERVER = () COLLECTOR_STATFILE = None MYCOLLECTOR = None def uptime(starttime): up = time.time()-starttime mins = int(up/60)%60 hours = int(up/3600)%24 days = int(up/86400) if days>0: return "%d days, %02d:%02d" % (days, hours, mins) else: return "%02d:%02d" % (hours, mins) class collector: pid=0 data={ 'Total-count':0.0, 'Total-bytes':0.0, 'Filtered-count':0.0, 'Filtered-bytes':0.0, 'Clean-count':0.0, 'Clean-bytes':0.0, 'Total-time':0.0 } def __init__(self, bindaddr, statusfile): global MYCOLLECTOR, COLLECTOR_SERVER MYCOLLECTOR = self COLLECTOR_SERVER = bindaddr # startup initialization self.bindaddr = bindaddr self.statusfile = statusfile # create a server if self.bindaddr: self.s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) try: self.s.bind(self.bindaddr) self.s.listen(10) except socket.error,(ec, es): debug.echo(0, "collector(): ERROR: BIND %s (%s)" % (es,self.bindaddr)) sys.exit(1) else: self.s = None def start(self): self.lastsave = time.time() self.pipe_r, self.pipe_w = os.pipe() # pipe for updates self.pid = os.fork() if self.pid==0: # main process # chroot from aglib import dochroot, sigchld dochroot() signal.signal(signal.SIGTERM, self.sigterm) signal.signal(signal.SIGHUP, self.sighup) # reopen log #signal.signal(signal.SIGUSR1, self.read_pipe) # do update signal.signal(signal.SIGCHLD, sigchld) # remove zombie os.close(self.pipe_w) self.data['Start-at'] = self.lastsave self.loadstats() self.data['Restart-at'] = self.lastsave self.data['Hostname'] = socket.gethostname() self.avg1m = average(2) self.mypid = os.getpid() debug.echo(1,"collector(): service started," " waiting for connections ... [%d]" % self.mypid) while 1: try: self.read_pipe() if not self.s: time.sleep(0.1) # no socket defined, continue to read pipe again continue try: socket_settimeout(self.s, 0.1) self.conn,addr=self.s.accept() except socket.timeout: continue except socket.error, eces: if eces[0]==4: # Interrupted system call continue else: raise self.handle_request() try: self.conn.shutdown(2) self.conn.close() except socket.error: pass except (SystemExit, KeyboardInterrupt): raise except Exception, e: debug.echo(1, "collector(): ERROR: ", e) debug.traceback(4, "collector(): ") try: self.conn.shutdown(2) self.conn.close() except: pass # while 1 ... end else: if self.s: self.s.close() os.close(self.pipe_r) return self.pid def handle_request(self): socket_settimeout(self.conn, 8) try: f = self.conn.makefile("rw", 0) req = f.readline() debug.echo(5,"collector(): QUERY: ",req.strip()) proto = 0.0 reg1 = re.search("^GET /([^ ]*) HTTP/(1.[01])",req) if reg1: dirs = reg1.group(1).split("/") proto = float(reg1.group(2)) else: reg1 = re.search("^GET /([^ ]*)$",req.rstrip()) if reg1: proto = 1.0 dirs = reg1.group(1).split("/") if proto>=1.0: args = self.parseargs(dirs[1:]) lines = {} while proto>=1.1: line = f.readline() if (line=="\r\n") or (line=="\n"): break a = line.split(": ") if len(a)>1: lines[a[0]] = a[1].strip() else: lines[a[0]] = a[0].strip() self.conn.sendall("HTTP/1.1 200 OK\r\n") self.conn.sendall("Content-Type: text/plain;" " charset=iso-8859-1\r\n") self.conn.sendall("\r\n"); self.data["Uptime"] = uptime(self.data['Start-at']) self.data["Restart-age"] = uptime(self.data['Restart-at']) self.data['Avg-count-1m'], self.data['Avg-bytes-1m']\ = self.avg1m.get() sstr='' if dirs[0]=="": for key,value in self.data.items(): sstr += key+":\t"+str(value)+"\r\n" self.conn.sendall(sstr) elif dirs[0]=="mrtg": for key in dirs[1:3]+['Uptime','Hostname']: if key in self.data.keys(): try: o="%-20.0f" % self.data[key] sstr += o.strip()+"\r\n" except TypeError: sstr += str(self.data[key])+"\r\n" else: # return 0 for unknown key sstr += "0\r\n" self.conn.sendall(sstr) else: self.message("404 Not Found") else: self.message("501 Method Not Implemented") self.conn.sendall(req) except socket.error,(ec,es): if not ec in [32,104,107]: debug.echo(3, "collector(): socket.error: ", ec, es) debug.traceback(4, "collector(): ") except: debug.traceback(3, "collector(): ") def sighup(self, sn, stack): debug.reopen() def sigterm(self, sn, stack): try: os.close(self.pipe_r) self.savestats() except: pass debug.echo(1,"collector(): Exiting - SIGTERM ...") os._exit(0) def update(self, dirs): d = '/'.join(dirs)+"\n"+' '*1024 try: if select.select([], [self.pipe_w], [], 1)==([], [self.pipe_w], []): os.write(self.pipe_w, d[:1024]) else: debug.echo(1, "collector(): WARNING: Pipe full! Ignoring data.") except OSError,(ec,es): signal.alarm(0) if ec==4: # Interrupted system call debug.echo(3,"collector(): writing to pipe: ",es) except: debug.echo(1,"collector(): unknown error on update") debug.traceback(3,"collector()") def sigalrm(self, sn, stack): # ignore this signal, only do an "interrupted system call" debug.echo(1,"collector(): WARNING: Interrupting read from pipe!" " (pipe empty?)") def read_pipe(self, sn=None, stack=None): while True: try: if select.select([self.pipe_r],[],[], 0.1)==([],[],[]): break signal.signal(signal.SIGALRM, self.sigalrm) signal.alarm(1) sstr=os.read(self.pipe_r,1024) signal.alarm(0) except select.error, (ec, es): if ec==4: # Interrupted system call continue else: raise except OSError, (ec,es): signal.alarm(0) args=self.parseargs(sstr.rstrip(' ').split("/")) for k,v in args.items(): try: self.data[k] += float(v) except KeyError: self.data[k] = float(v) except ValueError: self.data[k] = v except TypeError: debug.echo(0,"collector(): TypeError: ",k,v,type(v)) # update averages try: self.avg1m.update(float(args['Total-count']), float(args['Total-bytes'])) except KeyError: # do not update totals for policy data pass # save stats minutely ctime = time.time() if (ctime-self.lastsave)>60: self.lastsave = ctime self.savestats() def savestats(self): self.data["Uptime"] = uptime(self.data["Start-at"]) self.data["Restart-age"] = uptime(self.data["Restart-at"]) self.data["Avg-count-1m"], self.data["Avg-bytes-1m"]\ = self.avg1m.get() out="" for k,v in sorted(self.data.items()): if type(v)==float: out += "%s:\t%f\n" % (k, v) else: out += "%s:\t%s\n" % (k, v) debug.echo(4, "collector(): Saving stats ...") open(safe.fn(self.statusfile), "w").write(out) def loadstats(self): try: f=open(safe.fn(self.statusfile), 'r') for line in f.readlines(): a=line.split(":\t") try: self.data[a[0]]=float(a[1]) except ValueError: self.data[a[0]]=a[1] except IndexError: # ignore wrong lines from status file pass f.close() except IOError,(ec,es): debug.echo(1,"collector(): loadstat error: ",es) except: debug.echo(1,"collector(): Can't load status file.") debug.traceback(1,"collector()") def message(self, sstr): self.conn.sendall("HTTP/1.1 "+sstr+"\r\n\r\n") def parseargs(self, dirs): args = {} for arg in dirs: a = arg.split("=") k = urllib.unquote_plus(a[0]).rstrip(": \t\r\n") if re.search("^[^: \t\r\n]*$", k): if len(a)>1: args[k]=urllib.unquote_plus(a[1]) else: args[k]="1" else: debug.echo(1,"collector(): Unknown key: ", k) return args class statistics(object): total_time=None def __init__(self): self.t0=time.time() self.user_time0,self.sys_time0=\ resource.getrusage(resource.RUSAGE_SELF)[0:2] def end(self): user_time1,sys_time1=resource.getrusage(resource.RUSAGE_SELF)[0:2] self.total_time = time.time() - self.t0 self.user_time = user_time1 - self.user_time0 self.sys_time = sys_time1 - self.sys_time0 return self.total_time def update(self, bs=0, infected=0, tempfail=0): ''' Update statistics by 'bs' bytes, if infected, then update also filtered bytes,files. ''' if self.total_time==None: self.end() args=['Total-count', 'Total-bytes='+str(bs), 'Total-time='+str(self.total_time), 'User-time='+str(self.user_time), 'System-time='+str(self.sys_time), 'Temp-fail='+str(tempfail)] if infected: args.extend(['Filtered-count','Filtered-bytes='+str(bs)]) else: args.extend(['Clean-count','Clean-bytes='+str(bs)]) try: MYCOLLECTOR.update(args+status.collect) status.collect=[] except: pass def policy_update(self): ''' Update policy statistics. ''' args=['Policy-count'] try: MYCOLLECTOR.update(args+status.collect) status.collect=[] except: pass try: import rrdtool except ImportError: class rrdtool_class: '''rrdtool module simulator''' def cmd(self, cmd, args): os.system("rrdtool %s %s" \ % (cmd, ' '.join(['"'+x+'"' for x in args if x])) ) def create(self, *args): self.cmd("create", args) def update(self, *args): self.cmd("update", args) def graph(self, *args): self.cmd("graph", args) rrdtool=rrdtool_class() class rrd: RRDDATA={ 'Total-bytes': 'totalbs', 'Clean-bytes': 'cleanbs', 'Virus-bytes': 'virusbs', 'Spam-bytes': 'spambs', 'Total-count': 'total', 'Clean-count': 'clean', 'Virus-count': 'virus', 'Spam-count': 'spam', 'Temp-fail': 'fails', 'Total-time': 'time' } DATES=[ [3600*24, 'Day', 1, 600], [3600*24*7, 'Week', 6, 700], [3600*24*31, 'Month', 24, 775], [3600*24*365, 'Year', 288, 797] ] PARAMS=['--imgformat', 'PNG', '--interlaced', '--width', '576'] def __init__(self,path='./'): self.RRDPATH=path self.RRDFILE=os.path.join(path,'sagator.rrd') def create(self): # create an RRD cmd=[self.RRDFILE] for ds in self.RRDDATA.values(): cmd.append('DS:%s:DERIVE:600:0:%d' % (ds,2**30)) for p1 in ['AVERAGE','MAX']: for p2 in self.DATES: cmd.append('RRA:%s:0.5:%d:%d' % (p1,p2[2],p2[3])) rrdtool.create(*cmd) return cmd def stat_query(self, *args): if not COLLECTOR_SERVER: data = open(COLLECTOR_STATFILE, 'r').read() if len(args)>0 and args[0]=='mrtg': out = [] for key in ['Uptime','Hostname']+args[1]: out.append( re.compile('^('+key+':.*)$', re.M).search(data).group(1) ) return '\n'.join(out) else: return data for counter in range(3): try: s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) socket_settimeout(s,30) s.connect(COLLECTOR_SERVER) urlargv='' for arg in args[1]: urlargv+="/"+urllib.quote_plus(arg,"/=") s.sendall("GET /"+args[0]+urlargv+" HTTP/1.1\r\n\r\n") f=s.makefile("r",0) lines=[] for _ in range(512): line=f.readline() if (line=="\r\n") or (line=="\n") or (line==""): break; lines.append(line) lines=[] s.shutdown(2) ret='' for i in range(1024): r=s.recv(2048) if not r: break ret+=r return ret.rstrip() except socket.error, err: debug.echo(3, "collector(): stat_query(%s, %s): socket.error: %s" % (args[0], args[1], err)) time.sleep(0.7) return '' def update(self): try: open(self.RRDFILE) except IOError: self.create() q=self.stat_query('/', []) cmd=[self.RRDFILE, 'N'] for key,value in self.RRDDATA.items(): reg1=re.compile('^%s:[ \t]*([0-9]+)' % key,re.M).search(q) if reg1: cmd[-1]+=':'+reg1.group(1) else: cmd[-1]+=':U' #print "WARNING: Key %s not found!" % key rrdtool.update(*cmd) return self def time(self): return time.strftime("%c") def range(self,delta,r,xparams): rrdtool.graph(*[ '%s/%s-count.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'msgs/min', '--units-exponent', '0', '--start', '-%s' % r, 'DEF:total=%s:total:AVERAGE' % self.RRDFILE, 'DEF:clean=%s:clean:AVERAGE' % self.RRDFILE, 'DEF:virus=%s:virus:AVERAGE' % self.RRDFILE, 'DEF:spam=%s:spam:AVERAGE' % self.RRDFILE, 'CDEF:mtotal=total,60,*', 'CDEF:mclean=clean,60,*', 'CDEF:mvirus=virus,60,*', 'CDEF:mspam=spam,60,*', 'CDEF:dtotal=total,UN,0,total,IF,%s,*' % self.STEP, 'CDEF:stotal=PREV,UN,dtotal,PREV,IF,dtotal,+', 'CDEF:dclean=clean,UN,0,clean,IF,%s,*' % self.STEP, 'CDEF:sclean=PREV,UN,dclean,PREV,IF,dclean,+', 'CDEF:dvirus=virus,UN,0,virus,IF,%s,*' % self.STEP, 'CDEF:svirus=PREV,UN,dvirus,PREV,IF,dvirus,+', 'CDEF:dspam=spam,UN,0,spam,IF,%s,*' % self.STEP, 'CDEF:sspam=PREV,UN,dspam,PREV,IF,dspam,+', 'CDEF:rclean=sclean,100,*,stotal,/', 'CDEF:rvirus=svirus,100,*,stotal,/', 'CDEF:rspam=sspam,100,*,stotal,/', 'AREA:mtotal#00a000:Total ', 'GPRINT:stotal:MAX:%8.0lf msgs ', 'GPRINT:mtotal:AVERAGE:avg\\: %5.2lf%s/min ', 'GPRINT:mtotal:MAX:max\\: %5.2lf%s/min\\l', 'LINE2:mclean#c0c000:Clean ', 'GPRINT:sclean:MAX:%8.0lf msgs ', 'GPRINT:rclean:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mvirus#a00000:Virus ', 'GPRINT:svirus:MAX:%8.0lf msgs ', 'GPRINT:rvirus:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mspam#0000a0:Spam ', 'GPRINT:sspam:MAX:%8.0lf msgs ', 'GPRINT:rspam:AVERAGE:(%6.2lf %%)\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) rrdtool.graph(*[ '%s/%s-bytes.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'bytes/min', '--start', '-%s' % r, 'DEF:total=%s:totalbs:AVERAGE' % self.RRDFILE, 'DEF:clean=%s:cleanbs:AVERAGE' % self.RRDFILE, 'DEF:virus=%s:virusbs:AVERAGE' % self.RRDFILE, 'DEF:spam=%s:spambs:AVERAGE' % self.RRDFILE, 'CDEF:mtotal=total,60,*', 'CDEF:mclean=clean,60,*', 'CDEF:mvirus=virus,60,*', 'CDEF:mspam=spam,60,*', 'CDEF:dtotal=total,UN,0,total,IF,%s,*' % self.STEP, 'CDEF:stotal=PREV,UN,dtotal,PREV,IF,dtotal,+', 'CDEF:dclean=clean,UN,0,clean,IF,%s,*' % self.STEP, 'CDEF:sclean=PREV,UN,dclean,PREV,IF,dclean,+', 'CDEF:dvirus=virus,UN,0,virus,IF,%s,*' % self.STEP, 'CDEF:svirus=PREV,UN,dvirus,PREV,IF,dvirus,+', 'CDEF:dspam=spam,UN,0,spam,IF,%s,*' % self.STEP, 'CDEF:sspam=PREV,UN,dspam,PREV,IF,dspam,+', 'CDEF:rclean=sclean,100,*,stotal,/', 'CDEF:rvirus=svirus,100,*,stotal,/', 'CDEF:rspam=sspam,100,*,stotal,/', 'AREA:mtotal#00a000:Total ', 'GPRINT:stotal:MAX:%8.2lf %sB ', 'GPRINT:mtotal:AVERAGE:avg\\: %5.2lf %sB/min ', 'GPRINT:mtotal:MAX:max\\: %5.2lf %sB/min\\l', 'LINE2:mclean#c0c000:Clean ', 'GPRINT:sclean:MAX:%8.2lf %sB ', 'GPRINT:rclean:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mvirus#a00000:Virus ', 'GPRINT:svirus:MAX:%8.2lf %sB ', 'GPRINT:rvirus:AVERAGE:(%6.2lf %%)\\l', 'LINE2:mspam#0000a0:Spam ', 'GPRINT:sspam:MAX:%8.2lf %sB ', 'GPRINT:rspam:AVERAGE:(%6.2lf %%)\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) rrdtool.graph(*[ '%s/%s-time.png' % (self.RRDPATH, delta) ] + self.PARAMS + xparams + [ '--vertical-label', 'Sagator Load', '--units-exponent', '0', '--start', '-%s' % r, 'DEF:time=%s:time:MAX' % self.RRDFILE, 'AREA:time#00a000:Sagator load', 'GPRINT:time:AVERAGE:avg\\: %5.2lf ', 'GPRINT:time:MAX:max\\: %5.2lf\\l', 'COMMENT:Generated at %s by SAGATOR \\r' % self.time() ]) def graph(self,lazy=['--lazy']): xparams=[] for d in self.DATES: self.STEP=str(300*d[2]) self.range(d[1].lower(),d[0],xparams) xparams=lazy return self class rrd12(rrd): def time(self): return time.strftime("%c").replace(':','\:') class status(interscan.match.match_any): ''' This interscanner can be used to collect some other statistics. Usage: status("String",scanner1(),scanner2(),...) Where: "String" is a string, which defines a prefix for status update This scring can't contain spaces. These string will be replaced: %(VIRNAME)s virus name %(LEVEL)s detected level as float %(STARS)s detected level as stars scanner1(),... are sagator's scanners Preffered usage: SCANNERS=[ status("Virus",virus_scanner1(),...), status("Spam",spam_scanner1(),...) ] This collects Virus-count,Virus-bytes,Spam-count,Spam-bytes in collector. ''' name='status()' collect=[] def __init__(self,s,*scanners): self.status_string=s interscan.match.match_any.__init__(self,scanners) def scanbuffer(self,buffer,args={}): level,detected,ret=interscan.match.match_any.scanbuffer(self,buffer,args) if is_infected(level,detected): repl_vars={ 'VNAME': detected.replace(' ','_').replace(':','_'), 'VIRNAME': detected.replace(' ','_').replace(':','_'), 'LEVEL': str(level), 'STARS': '*'*int(level) } status_string=replace_tmpl(self.status_string,repl_vars) debug.echo(7,'stats(): %s: %d' % (status_string,len(buffer))) status.collect.extend([status_string+"-count", status_string+"-bytes="+str(len(buffer))]) return level,detected,ret if __name__ == '__main__': if len(sys.argv)>1: # get host and port from collector service from etc import SRV, CHROOT safe.ROOT_PATH=CHROOT for service in SRV: if service.name=='collector()': COLLECTOR_SERVER=service.BINDTO COLLECTOR_STATFILE=safe.fn(service.STATFILE) if sys.argv[1]=='rrdtool10': r=rrd(sys.argv[2]) try: r.update() except Exception, e: debug.traceback(4) r.graph() elif sys.argv[1] in ('rrdtool','rrdtool12'): r=rrd12(sys.argv[2]) try: r.update() except Exception, e: debug.traceback(4) r.graph() elif sys.argv[1]=='rrdupdate': rrd(sys.argv[2]).update() elif sys.argv[1]=='rrdgraph': rrd(sys.argv[2]).graph([]) else: print rrd12().stat_query(sys.argv[1],sys.argv[2:])