import configparser
import logging
import logging.handlers
import os
import pendulum
import signal
import time
from importlib.machinery import SourceFileLoader
from O365 import Account, FileSystemTokenBackend

SCRIPTPATH = os.path.dirname(os.path.abspath(__file__))

# parse config file
config = {}
configfile = configparser.ConfigParser()
configfile.read(SCRIPTPATH + "/mail-filter.conf")
config["FILTERS_FILE"] = configfile.get("main", "Filters")
config["IS_DEBUG"] = configfile.getboolean("main", "EnableDebugging")
config["CHECK_INTERVAL"] = int(configfile.get("main", "MailCheckInterval"))
config["LOG_DIR"] = configfile.get("logging", "LogDir")
config["TIMEZONE"] = configfile.get("logging", "Timezone")
config["APP_CLIENT_ID"] = os.getenv("APP_CLIENT_ID")
config["APP_SECRET_KEY"] = os.getenv("APP_SECRET_KEY")
config["APP_TENANT_ID"] = os.getenv("APP_TENANT_ID")
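# A sketch of what mail-filter.conf might contain, inferred from the keys read
# above (section and key names come from this code; the values are examples):
#
#   [main]
#   Filters = /path/to/filters.py
#   EnableDebugging = no
#   MailCheckInterval = 60
#
#   [logging]
#   LogDir = /var/log/mail-filter
#   Timezone = America/New_York
#
# APP_CLIENT_ID, APP_SECRET_KEY and APP_TENANT_ID are taken from the
# environment rather than the config file, so they must be exported before
# the script is started.
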
# convert timestamp to local time
def local_time(record, datefmt=None):
    return pendulum.from_timestamp(
        record.created, tz=pendulum.timezone(config["TIMEZONE"])
    ).strftime("%Y-%m-%d %H:%M:%S %z")


# set up logger
logger = logging.getLogger("o365mf")
if config["IS_DEBUG"]:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(module)s [%(levelname)s] %(message)s")
formatter.formatTime = local_time
log_filename = f"{config['LOG_DIR']}/mail-filter.log"
handler = logging.handlers.TimedRotatingFileHandler(
    log_filename, when="midnight", backupCount=5
)
handler.setFormatter(formatter)
logger.addHandler(handler)

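# Note: assigning local_time to formatter.formatTime (above) shadows the stock
# logging.Formatter method that renders %(asctime)s, so timestamps are produced
# by pendulum in the configured timezone instead of the default localtime
# formatting.
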
# helper function for logging
def log(msg, level=logging.INFO):
    print(msg)
    logger.log(level, msg)


class O365MailFilter(object):
    _scopes = [
        "basic",
        "https://graph.microsoft.com/Mail.ReadWrite",
        "https://graph.microsoft.com/MailboxSettings.Read",
    ]

    def __init__(self, config):
        self._config = config
        self._is_canceled = False
        self._folders = {}
        self._categories = {}
        self._filtered_cache = {"last_reset": None, "ids": set()}

        # auth with O365
        self._authenticate()

    def _authenticate(self):
        token_backend = FileSystemTokenBackend(
            token_path=".cache", token_filename="token.txt"
        )

        self._account = Account(
            (self._config["APP_CLIENT_ID"], self._config["APP_SECRET_KEY"]),
            tenant_id=self._config["APP_TENANT_ID"],
            token_backend=token_backend,
        )

        if not self._account.is_authenticated:
            self._account.authenticate(scopes=self._scopes)

        log("Authentication successful")

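    # On the very first run there is no cached token, so authenticate() falls
    # back to the O365 library's interactive consent flow (with the default
    # authorization-code flow it prints an authorization URL and waits for the
    # redirect URL to be pasted back). The resulting token is stored by the
    # FileSystemTokenBackend in .cache/token.txt and reused on later runs.
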
    def _load_filters(self):
        """load filter code from a file on disk"""
        loader = SourceFileLoader("filters", self._config["FILTERS_FILE"])
        module = loader.load_module()
        module.init_filters(self)
        # make 'filter_message()' implemented in the file available for use
        # within this class as 'self._filter_message()'
        self._filter_message = module.filter_message

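    # The filters file referenced by the 'Filters' config key is expected to
    # define init_filters(mf) and filter_message(mf, message). The sketch below
    # is a hypothetical example of that interface; the folder name and the
    # message.mark_as_read()/message.move() calls are illustrative, not part of
    # this script:
    #
    #   def init_filters(mf):
    #       # one-time setup; 'mf' is the O365MailFilter instance, so the
    #       # cached folder IDs and categories are available here
    #       pass
    #
    #   def filter_message(mf, message):
    #       if "newsletter" in message.subject.lower():
    #           message.mark_as_read()
    #           message.move(mf._folders["Inbox/Newsletters"])
    #           mf._log_result(message, "moved to Inbox/Newsletters")
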
    def _load_folders(self, mailbox, folders=None, folder_path=None):
        """recursively cache folder IDs for this mailbox"""
        if folders is None:
            folders = mailbox.get_folders()
            self._folders = {}
            folder_path = ""

        for folder in folders:
            if folder_path == "":
                current_folder_path = f"{folder.name}"
            else:
                current_folder_path = f"{folder_path}/{folder.name}"

            if not folder.get_folders():
                self._folders[current_folder_path] = folder.folder_id
            else:
                # add child folders to the cache, because get_folders() doesn't
                # descend into sub-folders by default
                self._load_folders(mailbox, folder.get_folders(), current_folder_path)

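    # After _load_folders() runs, self._folders maps the slash-separated path of
    # each leaf folder (one with no sub-folders) to its O365 folder ID, e.g.
    # (hypothetical names and IDs):
    #   {"Inbox/Receipts": "AAMk...", "Archive/2023": "AAMk..."}
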
    def _load_categories(self):
        """cache Outlook categories for this account"""
        oc = self._account.outlook_categories()
        categories = oc.get_categories()
        for category in categories:
            self._categories[category.name] = category

    def _clear_cache(self):
        """clear the filtered message cache"""
        log("Clearing filtered message cache...", logging.DEBUG)
        self._filtered_cache = {"last_reset": pendulum.now(), "ids": set()}

    def _repr_message(self, message):
        """returns a str representation of a message suitable for logging"""
        # to = ','.join([r.address for r in message.to])
        return f"[FROM: {message.sender.address} SUBJ: {message.subject}]"

    def _log_result(self, message, result):
        log(f"{self._repr_message(message)} RESULT: {result}")

    def filter(self, is_first_run=False):
        log(f" {pendulum.now()} Getting mailbox...", logging.DEBUG)
        mailbox = self._account.mailbox()
        log(f" {pendulum.now()} Getting folder...", logging.DEBUG)
        inbox = mailbox.inbox_folder()

        if is_first_run:
            log(f" {pendulum.now()} Loading folders and categories...", logging.DEBUG)
            self._load_folders(mailbox)
            self._load_categories()
            log(f" {pendulum.now()} Loading filter rules...", logging.DEBUG)
            self._load_filters()

        # set limit to max allowed by O365, which is 999 messages
        # we have to explicitly set a limit value when calling get_messages() or
        # the O365 library will not paginate results correctly
        limit = self._account.protocol.max_top_value
        query = inbox.new_query()
        query = (
            query.on_attribute("isRead")
            .equals(False)
            .select(
                "to_recipients", "from", "subject", "body", "internet_message_headers"
            )
        )
        log(f" {pendulum.now()} Getting messages from inbox...", logging.DEBUG)
        messages = inbox.get_messages(query=query, limit=limit, batch=25)

        for message in messages:
            log(f" {pendulum.now()} {message}", logging.DEBUG)
            if message.object_id in self._filtered_cache["ids"]:
                # we've already filtered this message, so skip it
                continue
            self._filter_message(self, message)
            self._filtered_cache["ids"].add(message.object_id)

    def run(self):
        """run filter as a loop"""
        is_first_run = True
        while not self._is_canceled:
            # clear the filtered message cache if it's older than 4 hours
            if self._filtered_cache["last_reset"] is None or self._filtered_cache[
                "last_reset"
            ] < pendulum.now().subtract(hours=4):
                self._clear_cache()

            log(f"Filtering the sludge @ {pendulum.now()}...", logging.DEBUG)
            self.filter(is_first_run)
            is_first_run = False
            time.sleep(self._config["CHECK_INTERVAL"])

        log("Done.")

    def exit(self):
        self._is_canceled = True


log("Initializing O365 mail filter...")
|
|
o365mf = O365MailFilter(config)
|
|
|
|
|
|
def exit(signum, frame):
|
|
"""signal handler for a clean exit"""
|
|
log(f"Caught signal {signum}, exiting...")
|
|
o365mf.exit()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# register signal handlers
|
|
signal.signal(signal.SIGTERM, exit)
|
|
signal.signal(signal.SIGHUP, exit)
|
|
signal.signal(signal.SIGINT, exit)
|
|
|
|
# run it
|
|
o365mf.run()
|