Add 4hr cache for filtered messages; add log helper func
This commit is contained in:
@@ -45,6 +45,11 @@ handler = logging.handlers.TimedRotatingFileHandler(
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
# helper function for logging
|
||||
def log(msg, level=logging.INFO):
|
||||
print(msg)
|
||||
logger.log(level, msg)
|
||||
|
||||
|
||||
class O365MailFilter(object):
|
||||
_scopes = [
|
||||
@@ -56,7 +61,10 @@ class O365MailFilter(object):
|
||||
self._config = config
|
||||
self._is_canceled = False
|
||||
self._folders = {}
|
||||
self._normalized = {}
|
||||
self._filtered_cache = {
|
||||
'last_reset': None,
|
||||
'ids': set()
|
||||
}
|
||||
|
||||
# auth with O365
|
||||
self._authenticate()
|
||||
@@ -74,13 +82,13 @@ class O365MailFilter(object):
|
||||
if not self._account.is_authenticated:
|
||||
self._account.authenticate(scopes=self._scopes)
|
||||
|
||||
logger.info('Authentication successful')
|
||||
log('Authentication successful')
|
||||
|
||||
def _load_filters(self):
|
||||
""" load filter code from a file on disk """
|
||||
loader = SourceFileLoader('filters', self._config['FILTERS_FILE'])
|
||||
module = loader.load_module()
|
||||
module.normalize_lists(self)
|
||||
module.init_filters(self)
|
||||
# make 'filter_message()' implemented in the file available for use
|
||||
# within this class as 'self._filter_message()'
|
||||
self._filter_message = module.filter_message
|
||||
@@ -95,17 +103,25 @@ class O365MailFilter(object):
|
||||
for folder in folders:
|
||||
self._folders[folder.name] = folder.folder_id
|
||||
|
||||
def _clear_cache(self):
|
||||
""" clear the filtered message cache """
|
||||
log('Clearing filtered message cache...', logging.DEBUG)
|
||||
self._filtered_cache = {
|
||||
'last_reset': pendulum.now(),
|
||||
'ids': set()
|
||||
}
|
||||
|
||||
def _repr_message(self, message):
|
||||
""" returns a str representation of a message suitable for logging """
|
||||
# to = ','.join([r.address for r in message.to])
|
||||
return f"[FROM: {message.sender.address} SUBJ: {message.subject}]"
|
||||
|
||||
def _log_result(self, message, result):
|
||||
logger.info(f"{self._repr_message(message)} RESULT: {result}")
|
||||
log(f"{self._repr_message(message)} RESULT: {result}")
|
||||
|
||||
def filter(self):
|
||||
self._load_filters()
|
||||
self._load_folders()
|
||||
self._load_filters()
|
||||
|
||||
mailbox = self._account.mailbox()
|
||||
inbox = mailbox.inbox_folder()
|
||||
@@ -122,26 +138,35 @@ class O365MailFilter(object):
|
||||
messages = inbox.get_messages(query=query, limit=limit, batch=25)
|
||||
|
||||
for message in messages:
|
||||
if message.object_id in self._filtered_cache['ids']:
|
||||
# we've already filtered this message, so skip it
|
||||
continue
|
||||
self._filter_message(self, message)
|
||||
self._filtered_cache['ids'].add(message.object_id)
|
||||
|
||||
def run(self):
|
||||
""" run filter as a loop """
|
||||
while not self._is_canceled:
|
||||
# clear the filtered message cache if it's older than 4 hours
|
||||
if (self._filtered_cache['last_reset'] is None or
|
||||
self._filtered_cache['last_reset'] < pendulum.now().subtract(hours=4)):
|
||||
self._clear_cache()
|
||||
|
||||
self.filter()
|
||||
time.sleep(self._config['CHECK_INTERVAL'])
|
||||
|
||||
logger.info('Done.')
|
||||
log('Done.')
|
||||
|
||||
def exit(self):
|
||||
self._is_canceled = True
|
||||
|
||||
|
||||
logger.info('Initializing O365 mail filter...')
|
||||
log('Initializing O365 mail filter...')
|
||||
o365mf = O365MailFilter(config)
|
||||
|
||||
def exit(signum, frame):
|
||||
""" signal handler for a clean exit """
|
||||
logger.info(f"Caught signal {signum}, exiting...")
|
||||
log(f"Caught signal {signum}, exiting...")
|
||||
o365mf.exit()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
Reference in New Issue
Block a user