Menu

[r10]: / trunk / Whitelist.py  Maximize  Restore  History

Download this file

107 lines (81 with data), 4.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
import anydbm,logging,os,re
from stat import *
from SmtpException import *
LogLineFormat = '%(asctime)s %(levelname)s: %(message)s'
logging.addLevelName (25, 'NOTICE')
Log = logging.getLogger ('greyd')
Log.setLevel (logging.DEBUG)
Log.notice = lambda m: Log.log(25, m)
class Whitelist:
def __init__(self, config):
self._filename = config['filename']
self._mtime = 0
self._db = anydbm.open (config['database'], 'c')
Log.notice('whitelist database opened')
def destroy(self):
self._db.close()
Log.notice('whitelist database closed')
def startup(self):
self.refreshDB()
def reset(self):
self.refreshDB()
def remoteIP(self, uuid, ip):
Log.debug('Whitelist.remoteIP (%s, %s)' % (uuid, ip))
if self._db.has_key('H=%s' % ip):
Log.debug('host (%s) found in whitelist' % ip)
raise SmtpAccept ("%s: host (%s) found in whitelist" % (uuid, ip))
def mailFrom(self, uuid, email):
Log.debug('Whitelist.mailFrom(%s, %s)' % (uuid, email))
if self._db.has_key('F=%s' % email):
Log.debug("\temail (%s) found in whitelist" % email)
raise SmtpAccept ('%s: email (%s) found in whitelist' % (uuid, email))
elif self._db.has_key('F=%s' % self._emailDomain(email)):
Log.debug("\temail domain (%s) found in whitelist" % self._emailDomain(email))
raise SmtpAccept ('%s: email domain (%s) found in whitelist' % (uuid, self._emailDomain(email)))
def mailTo(self, uuid, email):
Log.debug('Whitelist.mailTo(%s, %s)' % (uuid, email))
for emailaddr in email:
if self._db.has_key('T=%s' % emailaddr):
Log.debug("\temail (%s) found in whitelist" % emailaddr)
raise SmtpAccept ('%s: email (%s) found in whitelist' % (uuid, emailaddr))
elif self._db.has_key('T=%s' % self._emailDomain(emailaddr)):
Log.debug("\temail domain (%s) found in whitelist" % self._emailDomain(emailaddr))
raise SmtpAccept ('%s: email domain (%s) found in whitelist' % (uuid, self._emailDomain(emailaddr)))
def refreshDB(self):
"""Check to see if the whitelist text file has been updated and
force a reload of the whitelist database if the file has been
updated since the last load."""
Log.debug('Whitelist.refreshDB()')
mtime = (os.stat(self._filename))[ST_MTIME]
if mtime > self._mtime:
# first, remove everything out of the database
for key in self._db.keys():
del self._db[key]
# setup a few key regex
host_re = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
email_re = re.compile(r'([\w\.\-%+_]*@.+)\s*')
# open up and process text file version
fh = open(self._filename, 'r')
for line in fh.readlines():
key = None
item = line.strip() # get rid of any whitespace
match = host_re.match(item)
if match:
key = 'H=%s' % match.group(1)
match = email_re.match(item)
if match:
key = 'F=%s' % match.group(1)
if key:
self._db[key] = '1'
fh.close()
self._mtime = mtime
Log.notice('whitelist database refreshed')
def _emailDomain(self, email):
"""Break email address up and return just the domain portion."""
user,domain = email.split('@')
return '@' + domain
def _emailUser(self, email):
"""Break email address up and return just the user portion."""
user, domain = email.split('@')
return user