import configparser
import logging
import logging.handlers
import os
import signal
import time
from importlib.machinery import SourceFileLoader

import pendulum
from O365 import Account, FileSystemTokenBackend

SCRIPTPATH = os.path.dirname(os.path.abspath(__file__))

# parse config file: filter rules, debug flag, poll interval, and logging
# options come from mail-filter.conf; O365 app credentials come from the
# environment so secrets never live on disk next to the script
config = {}
configfile = configparser.ConfigParser()
configfile.read(SCRIPTPATH + "/mail-filter.conf")
config["FILTERS_FILE"] = configfile.get("main", "Filters")
config["IS_DEBUG"] = configfile.getboolean("main", "EnableDebugging")
config["CHECK_INTERVAL"] = int(configfile.get("main", "MailCheckInterval"))
config["LOG_DIR"] = configfile.get("logging", "LogDir")
config["TIMEZONE"] = configfile.get("logging", "Timezone")
config["APP_CLIENT_ID"] = os.getenv("APP_CLIENT_ID")
config["APP_SECRET_KEY"] = os.getenv("APP_SECRET_KEY")
config["APP_TENANT_ID"] = os.getenv("APP_TENANT_ID")


def local_time(record, datefmt=None):
    """Format a log record's creation timestamp in the configured timezone.

    Installed as ``formatter.formatTime`` below, so it matches that hook's
    ``(record, datefmt)`` signature; ``datefmt`` is accepted but ignored.
    """
    return pendulum.from_timestamp(
        record.created, tz=pendulum.timezone(config["TIMEZONE"])
    ).strftime("%Y-%m-%d %H:%M:%S %z")


# set up logger: rotate the log file at midnight, keep 5 old files
logger = logging.getLogger("o365mf")
if config["IS_DEBUG"]:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(module)s [%(levelname)s] %(message)s")
# override timestamp rendering on this instance so %(asctime)s is local time
formatter.formatTime = local_time
log_filename = f"{config['LOG_DIR']}/mail-filter.log"
handler = logging.handlers.TimedRotatingFileHandler(
    log_filename, when="midnight", backupCount=5
)
handler.setFormatter(formatter)
logger.addHandler(handler)


def log(msg, level=logging.INFO):
    """Write *msg* both to stdout (for interactive runs) and to the log file."""
    print(msg)
    logger.log(level, msg)


class O365MailFilter(object):
    """Polls an O365 inbox and applies user-supplied filter rules to new mail."""

    # OAuth scopes required: read/write mail and read mailbox settings
    _scopes = [
        "basic",
        "https://graph.microsoft.com/Mail.ReadWrite",
        "https://graph.microsoft.com/MailboxSettings.Read",
    ]

    def __init__(self, config):
        self._config = config
        self._is_canceled = False
        # folder path -> folder_id cache, populated by _load_folders()
        self._folders = {}
        # category name -> category object cache, populated by _load_categories()
        self._categories = {}
        # message IDs already filtered, so re-polls don't re-process unread mail
        self._filtered_cache = {"last_reset": None, "ids": set()}
        # auth with O365
        self._authenticate()

    def _authenticate(self):
        """Authenticate against O365, reusing a cached token when possible."""
        token_backend = FileSystemTokenBackend(
            token_path=".cache", token_filename="token.txt"
        )
        self._account = Account(
            (self._config["APP_CLIENT_ID"], self._config["APP_SECRET_KEY"]),
            tenant_id=self._config["APP_TENANT_ID"],
            token_backend=token_backend,
        )
        if not self._account.is_authenticated:
            # interactive consent flow; persists the token via token_backend
            self._account.authenticate(scopes=self._scopes)
        log("Authentication successful")

    def _load_filters(self):
        """load filter code from a file on disk"""
        # NOTE(review): SourceFileLoader.load_module() is deprecated and was
        # removed in Python 3.12 — consider exec_module(); left as-is to
        # preserve behavior on the currently targeted interpreter.
        loader = SourceFileLoader("filters", self._config["FILTERS_FILE"])
        module = loader.load_module()
        module.init_filters(self)
        # make 'filter_message()' implemented in the file available for use
        # within this class as 'self._filter_message()'
        self._filter_message = module.filter_message

    def _load_folders(self, mailbox, folders=None, folder_path=None):
        """recursively cache folder IDs for this mailbox

        FIX: previously the cache (self._folders) and folder_path were reset
        unconditionally on every call — including recursive ones — which
        discarded earlier results and ignored the path prefix passed down,
        so cached paths were wrong and only the last subtree survived.
        They are now initialized only on the top-level call.
        """
        if folders is None:
            # top-level call: fetch the root folders and start a fresh cache
            folders = mailbox.get_folders()
            self._folders = {}
        if folder_path is None:
            folder_path = ""
        for folder in folders:
            if folder_path == "":
                current_folder_path = f"{folder.name}"
            else:
                current_folder_path = f"{folder_path}/{folder.name}"
            # fetch children once; get_folders() is a network round-trip
            children = folder.get_folders()
            if not children:
                self._folders[current_folder_path] = folder.folder_id
            else:
                # add child folders to the cache, because get_folders() doesn't
                # descend into sub-folders by default
                self._load_folders(mailbox, children, current_folder_path)

    def _load_categories(self):
        """cache Outlook categories for this account"""
        oc = self._account.outlook_categories()
        categories = oc.get_categories()
        for category in categories:
            self._categories[category.name] = category

    def _clear_cache(self):
        """clear the filtered message cache"""
        log("Clearing filtered message cache...", logging.DEBUG)
        self._filtered_cache = {"last_reset": pendulum.now(), "ids": set()}

    def _repr_message(self, message):
        """returns a str representation of a message suitable for logging"""
        # to = ','.join([r.address for r in message.to])
        return f"[FROM: {message.sender.address} SUBJ: {message.subject}]"

    def _log_result(self, message, result):
        """Log the outcome of filtering a single message."""
        log(f"{self._repr_message(message)} RESULT: {result}")

    def filter(self, is_first_run=False):
        """Fetch unread inbox messages and run the user filter over each.

        On the first run also (re)loads the folder/category caches and the
        filter rule module, so rule edits require a restart to take effect.
        """
        log(f" {pendulum.now()} Getting mailbox...", logging.DEBUG)
        mailbox = self._account.mailbox()
        log(f" {pendulum.now()} Getting folder...", logging.DEBUG)
        inbox = mailbox.inbox_folder()
        if is_first_run:
            log(f" {pendulum.now()} Loading folders and categories...", logging.DEBUG)
            self._load_folders(mailbox)
            self._load_categories()
            log(f" {pendulum.now()} Loading filter rules...", logging.DEBUG)
            self._load_filters()
        # set limit to max allowed by O365, which is 999 messages
        # we have to explicitly set a limit value when calling get_messages() or
        # the O365 library will not paginate results correctly
        limit = self._account.protocol.max_top_value
        query = inbox.new_query()
        query = (
            query.on_attribute("isRead")
            .equals(False)
            .select(
                "to_recipients", "from", "subject", "body", "internet_message_headers"
            )
        )
        log(f" {pendulum.now()} Getting messages from inbox...", logging.DEBUG)
        messages = inbox.get_messages(query=query, limit=limit, batch=25)
        for message in messages:
            log(f" {pendulum.now()} {message}", logging.DEBUG)
            if message.object_id in self._filtered_cache["ids"]:
                # we've already filtered this message, so skip it
                continue
            # filter_message is an unbound module function stored on the
            # instance, so self is passed explicitly
            self._filter_message(self, message)
            self._filtered_cache["ids"].add(message.object_id)

    def run(self):
        """run filter as a loop"""
        is_first_run = True
        while not self._is_canceled:
            # clear the filtered message cache if it's older than 4 hours
            if self._filtered_cache["last_reset"] is None or self._filtered_cache[
                "last_reset"
            ] < pendulum.now().subtract(hours=4):
                self._clear_cache()
            log(f"Filtering the sludge @ {pendulum.now()}...", logging.DEBUG)
            self.filter(is_first_run)
            is_first_run = False
            time.sleep(self._config["CHECK_INTERVAL"])
        log("Done.")

    def exit(self):
        """Request a clean shutdown; run() exits after the current sleep."""
        self._is_canceled = True


log("Initializing O365 mail filter...")
o365mf = O365MailFilter(config)


def exit(signum, frame):
    """signal handler for a clean exit"""
    log(f"Caught signal {signum}, exiting...")
    o365mf.exit()


if __name__ == "__main__":
    # register signal handlers
    signal.signal(signal.SIGTERM, exit)
    signal.signal(signal.SIGHUP, exit)
    signal.signal(signal.SIGINT, exit)
    # run it
    o365mf.run()