"""O365 mail filter daemon.

Polls an Office 365 inbox for unread messages and applies user-supplied
filter code (loaded from a file on disk) to each one. Configuration comes
from mail-filter.conf next to this script plus APP_* environment variables.
"""

import configparser
import importlib.util
import logging
import logging.handlers
import os
import signal
import time
from importlib.machinery import SourceFileLoader

import pendulum
from O365 import Account, FileSystemTokenBackend

SCRIPTPATH = os.path.dirname(os.path.abspath(__file__))

# parse config file
config = {}
configfile = configparser.ConfigParser()
configfile.read(os.path.join(SCRIPTPATH, 'mail-filter.conf'))
config['FILTERS_FILE'] = configfile.get('main', 'Filters')
config['IS_DEBUG'] = configfile.getboolean('main', 'EnableDebugging')
# getint() parses and validates the integer in one step
config['CHECK_INTERVAL'] = configfile.getint('main', 'MailCheckInterval')
config['LOG_DIR'] = configfile.get('logging', 'LogDir')
config['TIMEZONE'] = configfile.get('logging', 'Timezone')
config['APP_CLIENT_ID'] = os.getenv('APP_CLIENT_ID')
config['APP_SECRET_KEY'] = os.getenv('APP_SECRET_KEY')
config['APP_TENANT_ID'] = os.getenv('APP_TENANT_ID')


def local_time(record, datefmt=None):
    """Render a log record's timestamp in the configured local timezone.

    Signature matches logging.Formatter.formatTime so it can be patched
    onto a Formatter instance below.
    """
    return pendulum.from_timestamp(
        record.created, tz=pendulum.timezone(config['TIMEZONE'])
    ).strftime('%Y-%m-%d %H:%M:%S %z')


# set up logger
logger = logging.getLogger('o365mf')
logger.setLevel(logging.DEBUG if config['IS_DEBUG'] else logging.INFO)
formatter = logging.Formatter(
    '%(asctime)s %(module)s [%(levelname)s] %(message)s')
# replace the default timestamp renderer so %(asctime)s is timezone-aware
formatter.formatTime = local_time
log_filename = f"{config['LOG_DIR']}/mail-filter.log"
handler = logging.handlers.TimedRotatingFileHandler(
    log_filename, when='midnight', backupCount=5)
handler.setFormatter(formatter)
logger.addHandler(handler)


def log(msg, level=logging.INFO):
    """Write *msg* both to stdout and to the rotating log file."""
    print(msg)
    logger.log(level, msg)


class O365MailFilter(object):
    """Polls an O365 mailbox and applies user-defined filters to unread mail."""

    _scopes = [
        'basic',
        'https://graph.microsoft.com/Mail.ReadWrite',
        'https://graph.microsoft.com/MailboxSettings.Read'
    ]

    def __init__(self, config):
        self._config = config
        self._is_canceled = False
        self._folders = {}      # folder path -> folder_id cache
        self._categories = {}   # category name -> category object cache
        # cache of already-filtered message IDs, reset periodically in run()
        self._filtered_cache = {
            'last_reset': None,
            'ids': set()
        }
        # auth with O365
        self._authenticate()

    def _authenticate(self):
        """Authenticate with O365, reusing a cached token when available."""
        token_backend = FileSystemTokenBackend(token_path='.cache',
                                               token_filename='token.txt')
        self._account = Account(
            (self._config['APP_CLIENT_ID'], self._config['APP_SECRET_KEY']),
            tenant_id=self._config['APP_TENANT_ID'],
            token_backend=token_backend
        )
        if not self._account.is_authenticated:
            self._account.authenticate(scopes=self._scopes)
        log('Authentication successful')

    def _load_filters(self):
        """Load filter code from a file on disk.

        The filters file must define init_filters(obj) and
        filter_message(obj, message).
        """
        # SourceFileLoader.load_module() is deprecated (removed in 3.12);
        # drive the loader through the spec/exec_module machinery instead.
        loader = SourceFileLoader('filters', self._config['FILTERS_FILE'])
        spec = importlib.util.spec_from_loader(loader.name, loader)
        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)
        module.init_filters(self)
        # make 'filter_message()' implemented in the file available for use
        # within this class as 'self._filter_message()'
        self._filter_message = module.filter_message

    def _load_folders(self, mailbox, folders=None, folder_path=None):
        """Recursively cache folder IDs for this mailbox.

        Only leaf folders (no children) are cached, keyed by their
        slash-separated path.
        """
        if folders is None:
            # Top-level call: fetch root folders and start a fresh cache.
            # The reset must happen ONLY here — doing it on every
            # (recursive) call would wipe the cache and discard the
            # accumulated folder path.
            folders = mailbox.get_folders()
            self._folders = {}
            folder_path = ''
        for folder in folders:
            if folder_path == '':
                current_folder_path = f"{folder.name}"
            else:
                current_folder_path = f"{folder_path}/{folder.name}"
            # fetch children once; the original called get_folders() twice,
            # doubling the remote API round-trips per folder
            child_folders = folder.get_folders()
            if not child_folders:
                self._folders[current_folder_path] = folder.folder_id
            else:
                # add child folders to the cache, because get_folders() doesn't
                # descend into sub-folders by default
                self._load_folders(mailbox, child_folders, current_folder_path)

    def _load_categories(self):
        """Cache Outlook categories for this account, keyed by name."""
        oc = self._account.outlook_categories()
        for category in oc.get_categories():
            self._categories[category.name] = category

    def _clear_cache(self):
        """Clear the filtered-message cache and stamp the reset time."""
        log('Clearing filtered message cache...', logging.DEBUG)
        self._filtered_cache = {
            'last_reset': pendulum.now(),
            'ids': set()
        }

    def _repr_message(self, message):
        """Return a str representation of a message suitable for logging."""
        return f"[FROM: {message.sender.address} SUBJ: {message.subject}]"

    def _log_result(self, message, result):
        """Log the outcome of filtering a single message."""
        log(f"{self._repr_message(message)} RESULT: {result}")

    def filter(self):
        """Fetch unread inbox messages and run the loaded filters on them."""
        mailbox = self._account.mailbox()
        inbox = mailbox.inbox_folder()
        self._load_folders(mailbox)
        self._load_categories()
        self._load_filters()
        # set limit to max allowed by O365, which is 999 messages
        # we have to explicitly set a limit value when calling get_messages() or
        # the O365 library will not paginate results correctly
        limit = self._account.protocol.max_top_value
        query = inbox.new_query()
        query = query.on_attribute('isRead').equals(False).select(
            'to_recipients', 'from', 'subject', 'body',
            'internet_message_headers'
        )
        messages = inbox.get_messages(query=query, limit=limit, batch=25)
        for message in messages:
            if message.object_id in self._filtered_cache['ids']:
                # we've already filtered this message, so skip it
                continue
            # filter_message is a plain function from the filters module,
            # so pass self explicitly
            self._filter_message(self, message)
            self._filtered_cache['ids'].add(message.object_id)

    def run(self):
        """Run the filter loop until exit() is called."""
        while not self._is_canceled:
            # clear the filtered message cache if it's older than 4 hours
            if (self._filtered_cache['last_reset'] is None or
                    self._filtered_cache['last_reset'] <
                    pendulum.now().subtract(hours=4)):
                self._clear_cache()
            self.filter()
            time.sleep(self._config['CHECK_INTERVAL'])
        log('Done.')

    def exit(self):
        """Request a clean shutdown of the run loop."""
        self._is_canceled = True


log('Initializing O365 mail filter...')
o365mf = O365MailFilter(config)


def handle_exit(signum, frame):
    """Signal handler for a clean exit.

    Renamed from 'exit' to avoid shadowing the builtin.
    """
    log(f"Caught signal {signum}, exiting...")
    o365mf.exit()


if __name__ == '__main__':
    # register signal handlers
    signal.signal(signal.SIGTERM, handle_exit)
    signal.signal(signal.SIGHUP, handle_exit)
    signal.signal(signal.SIGINT, handle_exit)
    # run it
    o365mf.run()