From 7bfb138511bd8f411a7d365280498e715e8b5d92 Mon Sep 17 00:00:00 2001 From: gbucchino Date: Mon, 11 Sep 2023 16:49:30 +0200 Subject: [PATCH] Add new vulnerabilities --- audit/system/plugins/grub.py | 11 ++ audit/system/plugins/kernel_va_space.py | 13 ++ audit/system/plugins/localaccount.py | 17 +++ audit/system/plugins/{sysctl => }/sysctl.py | 0 audit/system/plugins/sysctl/__init__.py | 1 - audit/system/plugins/sysctl/parsing.py | 132 ------------------- core/config.py | 2 + core/main.py | 7 ++ core/plugins/apache.py | 12 +- core/plugins/localaccount.py | 69 ++++++++++ core/plugins/sysctl.py | 133 ++++++++++++++++++-- reports/templates/localaccount.html.j2 | 43 +++++++ 12 files changed, 288 insertions(+), 152 deletions(-) create mode 100644 audit/system/plugins/grub.py create mode 100644 audit/system/plugins/kernel_va_space.py create mode 100644 audit/system/plugins/localaccount.py rename audit/system/plugins/{sysctl => }/sysctl.py (100%) delete mode 100644 audit/system/plugins/sysctl/__init__.py delete mode 100644 audit/system/plugins/sysctl/parsing.py create mode 100644 core/plugins/localaccount.py create mode 100644 reports/templates/localaccount.html.j2 diff --git a/audit/system/plugins/grub.py b/audit/system/plugins/grub.py new file mode 100644 index 0000000..5e91d9e --- /dev/null +++ b/audit/system/plugins/grub.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python3 + +def grub() -> list: + grub = list() + grub.append({ + 'description': 'Boot permission', + 'filename': '/boot/grub/grub.cfg' + 'chmod': 600, + }) + return grub + diff --git a/audit/system/plugins/kernel_va_space.py b/audit/system/plugins/kernel_va_space.py new file mode 100644 index 0000000..fab6582 --- /dev/null +++ b/audit/system/plugins/kernel_va_space.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 + +def kernel_va_space() -> list: + va = list() + sysctl.append({ + "from": "cis", + "id": "", + "description": "", + "flag": "kernel.randomize_va_space", + "value": 2, + "level": "medium", + }) + return va diff --git a/audit/system/plugins/localaccount.py b/audit/system/plugins/localaccount.py new file mode 100644 index 0000000..e5018ff --- /dev/null +++ b/audit/system/plugins/localaccount.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 + +def profile() -> dict: + profile = dict() + profile['filename'] = '/etc/profile' + profile['data'] = list() + profile['data'].append({ + 'description': 'Set timeout for session', + 'flag': 'TMOUT', + 'value': 600, + }) + return profile + +def password_quality() -> list: + passwd = list() + + return passwd diff --git a/audit/system/plugins/sysctl/sysctl.py b/audit/system/plugins/sysctl.py similarity index 100% rename from audit/system/plugins/sysctl/sysctl.py rename to audit/system/plugins/sysctl.py diff --git a/audit/system/plugins/sysctl/__init__.py b/audit/system/plugins/sysctl/__init__.py deleted file mode 100644 index e5a0d9b..0000000 --- a/audit/system/plugins/sysctl/__init__.py +++ /dev/null @@ -1 +0,0 @@ -#!/usr/bin/env python3 diff --git a/audit/system/plugins/sysctl/parsing.py b/audit/system/plugins/sysctl/parsing.py deleted file mode 100644 index adab247..0000000 --- a/audit/system/plugins/sysctl/parsing.py +++ /dev/null @@ -1,132 +0,0 @@ -#!/usr/bin/env python3 - -import re -from json import dumps -from parsing.base import ParsingBase - -class Parsing(ParsingBase): - def __init__(self, objects, audit): - self._parsing = dict() - self._reports = dict() - self._objects = objects - self._audit = audit - - def runParsing(self): - # Generate report - self._constructReports() - - # TODO: - # We are not parse file and process. We parse file and process and for each - # line, we try to find in the file and in the process - for audit in self._audit: - if audit['audit'] == 'file': - with open(audit['value'], 'rb') as fdata: - self._parseFile(fdata) - if audit['audit'] == 'process': - self._parseProcess() - - def _parseFile(self, fdata): - data = fdata.read() - lines = data.splitlines() - numLines = 1 - - vulnerabilityFound = dict() - - # I create an array which contains all flag we need to find - # After that, for each data, I put the number of occurence I found. - # If the array is empty, no entry found for a flag, otherwise, we check the value - for obj in self._objects: - vulnerabilityFound[obj['flag']] = dict() - vulnerabilityFound[obj['flag']]['recommand_value'] = obj['value'] - vulnerabilityFound[obj['flag']]['occurence'] = 0 - for item in obj: - vulnerabilityFound[obj['flag']][item] = obj[item] - - for line in lines: - line = line.decode("utf-8") - - for obj in self._objects: - result = self._parsingFile(line, obj, vulnerabilityFound) - if result: - vulnerabilityFound[obj['flag']]['lineNumber'] = numLines - vulnerabilityFound[obj['flag']]['occurence'] += 1 - - numLines += 1 - - # Now, we can check if the value is specified or not - # And check if the flag is specified and need to put on the sysctl config - for entry in vulnerabilityFound: - obj = vulnerabilityFound[entry] - vulnerabilityFound[entry]['result'] = dict() - if obj['occurence'] > 0: - #print(entry) - #print(obj) - if obj['current_value'] != obj['recommand_value']: - vulnerabilityFound[entry]['result']['result'] = "failed" - else: - vulnerabilityFound[entry]['result']['result'] = "success" - else: - # No find the flag, we recommand to enable it - vulnerabilityFound[entry]['result']['result'] = "failed" - - # Generate report - self._generateReport(vulnerabilityFound) - - def _parseProcess(self): - vulnerabilityFound = dict() - - # Generate report - #self._generateReport(vulnerabilityFound) - - def _generateReport(self, objects): - # We can generate the report - for sysctl in self._reports['file']['sysctl']: - self._reports['file']['sysctl'][sysctl] = objects[sysctl] - - def _parsingFile(self, line, obj, vulnerabilityFound) -> bool: - """ - This function parse the line and try to find the item in it - """ - result = bool() - - groupLine = re.search(obj['flag'], line) - if groupLine: - # Avoid the comment - if not line.startswith('#'): - sLine = line.split('=') - flag = sLine[0].strip() - value = int(sLine[1].strip()) - - vulnerabilityFound[flag]['current_value'] = value - - result = True - - return result - - def _constructReports(self): - """ - Construct dictionary for result of the tests - Each entry contains: - Key: - - filename: filename of the test - - line: line of the test - - parse: Display the line where the vulnerabilites has been found - - description: description of the vulnerability - - level: high, medium or low - """ - # For file - self._reports['file'] = dict() - self._reports['file']['filename'] = self._audit[0]['value'] - self._reports['file']['sysctl'] = dict() - - # For process - self._reports['process'] = dict() - self._reports['process']['sysctl'] = dict() - - - for sysctl in self._objects: - self._reports['file']['sysctl'][sysctl['flag']] = dict() - self._reports['process']['sysctl'][sysctl['flag']] = dict() - - def getResults(self) -> dict: - return self._reports diff --git a/core/config.py b/core/config.py index 9210423..42ed578 100644 --- a/core/config.py +++ b/core/config.py @@ -12,6 +12,7 @@ AUDIT_SYSTEM = [ "sysctl", "postfix", "apache", + "localaccount", ] AUDIT_APPLICATION = [ @@ -29,6 +30,7 @@ def generateConfig() -> dict: config["system"]["apache"]["apache_directory"] = "/etc/apache2/" config["system"]["sysctl"] = dict() config["system"]["sysctl"]["sysctl_file"] = "/etc/sysctl.conf" + config['system']['localaccount'] = dict() config["system"]["exclude_plugins"] = list() # Application config["application"] = dict() diff --git a/core/main.py b/core/main.py index 387de73..3f3fe97 100644 --- a/core/main.py +++ b/core/main.py @@ -4,6 +4,7 @@ from argparse import ArgumentParser from core.plugins.sysctl import Sysctl from core.plugins.postfix import Postfix from core.plugins.apache import Apache +from core.plugins.localaccount import LocalAccount from core.report import generateHtmlReport from core.config import AUDIT_SYSTEM, AUDIT_APPLICATION, generateConfig, parsingConfigFile from core.dispatcher import Dispatcher @@ -95,5 +96,11 @@ def apache(*args) -> dict: apache.runAudit() return apache.getReports() +@Dispatcher.register_plugins +def localaccount(*args) -> dict: + account = LocalAccount(args[1]) + account.runAudit() + return account.getReports() + if __name__ == "__main__": main() diff --git a/core/plugins/apache.py b/core/plugins/apache.py index dd73133..08b35d6 100644 --- a/core/plugins/apache.py +++ b/core/plugins/apache.py @@ -28,12 +28,12 @@ class Apache: def _runParsing(self): # Check if the file exist path = f"{self._apache_directory}/sites-available" - if isdir(self._apache_directory): - for site in listdir(path): - with open(f"{path}/{site}", 'rb') as f: - self._parseFile(f) - else: - self._reports["apache"]["test"] = "No directory found" + #if isdir(self._apache_directory): + # for site in listdir(path): + # with open(f"{path}/{site}", 'rb') as f: + # self._parseFile(f) + #else: + # self._reports["apache"]["test"] = "No directory found" def _parseFile(self, fdata): data = fdata.read() diff --git a/core/plugins/localaccount.py b/core/plugins/localaccount.py new file mode 100644 index 0000000..953e9c6 --- /dev/null +++ b/core/plugins/localaccount.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 + +import re +from os import listdir +from os.path import isdir +from audit.system.plugins.localaccount import profile, password_quality + + +class LocalAccount: + def __init__(self, arguments): + self._profile = profile() + self._passwd_quality = password_quality() + self._reports = dict() + + # Create the report + self._constructReports() + + # Report + #self._reports[""] = self._apache_directory + + def runAudit(self): + print("Running test for Local account") + self._analyzingProfile() + self._analyzingPasswordQuality() + + print(self._reports) + + def getReports(self) -> dict: + return self._reports + + def _analyzingProfile(self): + # Check if the file exist + path = self._profile['filename'] + try: + with open(path, 'rb') as fdata: + self._parseFile(fdata) + except FileNotFoundError: + self._reports['localaccount']['profile']['error'] = \ + f'File {path} not found' + + def _parseFile(self, fdata): + data = fdata.read() + lines = data.splitlines() + + for line in lines: + line = line.decode('utf-8') + + for obj in self._profile['data']: + grFlag = re.search(f"^{obj['flag']}", line) + if grFlag: + print(line) + + def _analyzingPasswordQuality(self): + pass + + def _constructReports(self): + """ + Construct dictionary for result of the tests + Each entry contains: + Key: + - filename: filename of the test + - line: line of the test + - parse: Display the line where the vulnerabilites has been found + - description: description of the vulnerability + - level: high, medium or low + """ + self._reports['localaccount'] = dict() + self._reports['localaccount']['profile'] = dict() + self._reports['localaccount']['pwd_quality'] = dict() diff --git a/core/plugins/sysctl.py b/core/plugins/sysctl.py index f219e16..cc603dd 100644 --- a/core/plugins/sysctl.py +++ b/core/plugins/sysctl.py @@ -1,15 +1,15 @@ #!/usr/bin/env python3 -from audit.system.plugins.sysctl.parsing import Parsing -from audit.system.plugins.sysctl.sysctl import sysctl +import re +from audit.system.plugins.sysctl import sysctl class Sysctl: def __init__(self, args): - self._objects = dict() + self._objects = sysctl() self._reports = dict() self._audit = list() - + self._audit.append({ 'audit': 'file', 'value': args["sysctl_file"], @@ -19,19 +19,126 @@ class Sysctl: 'value': 'sysctl -a', }) - self._sysctl() - - self._parsing = Parsing(self._objects, self._audit) - - def _sysctl(self): - self._objects = sysctl() + self._constructReports() def runAudit(self): print("Running test for sysctl") - self._parsing.runParsing() - self._reports = self._parsing.getResults() + self.runParsing() + #self._reports = self.getResults() + + def runParsing(self): + # Generate report + + # TODO: + # We are not parse file and process. We parse file and process and for each + # line, we try to find in the file and in the process + for audit in self._audit: + if audit['audit'] == 'file': + with open(audit['value'], 'rb') as fdata: + self._parseFile(fdata) + if audit['audit'] == 'process': + self._parseProcess() + + def _parseFile(self, fdata): + data = fdata.read() + lines = data.splitlines() + numLines = 1 + + vulnerabilityFound = dict() + + # I create an array which contains all flag we need to find + # After that, for each data, I put the number of occurence I found. + # If the array is empty, no entry found for a flag, otherwise, we check the value + for obj in self._objects: + vulnerabilityFound[obj['flag']] = dict() + vulnerabilityFound[obj['flag']]['recommand_value'] = obj['value'] + vulnerabilityFound[obj['flag']]['occurence'] = 0 + for item in obj: + vulnerabilityFound[obj['flag']][item] = obj[item] + + for line in lines: + line = line.decode("utf-8") + + for obj in self._objects: + result = self._parsingFile(line, obj, vulnerabilityFound) + if result: + vulnerabilityFound[obj['flag']]['lineNumber'] = numLines + vulnerabilityFound[obj['flag']]['occurence'] += 1 + + numLines += 1 + + # Now, we can check if the value is specified or not + # And check if the flag is specified and need to put on the sysctl config + for entry in vulnerabilityFound: + obj = vulnerabilityFound[entry] + vulnerabilityFound[entry]['result'] = dict() + if obj['occurence'] > 0: + if obj['current_value'] != obj['recommand_value']: + vulnerabilityFound[entry]['result']['result'] = "failed" + else: + vulnerabilityFound[entry]['result']['result'] = "success" + else: + # No find the flag, we recommand to enable it + vulnerabilityFound[entry]['result']['result'] = "failed" + + # Generate report + self._generateReport(vulnerabilityFound) + + def _parseProcess(self): + vulnerabilityFound = dict() + + # Generate report + #self._generateReport(vulnerabilityFound) + + def _generateReport(self, objects): + # We can generate the report + for sysctl in self._reports['file']['sysctl']: + self._reports['file']['sysctl'][sysctl] = objects[sysctl] + + def _parsingFile(self, line, obj, vulnerabilityFound) -> bool: + """ + This function parse the line and try to find the item in it + """ + result = bool() + + groupLine = re.search(obj['flag'], line) + if groupLine: + # Avoid the comment + if not line.startswith('#'): + sLine = line.split('=') + flag = sLine[0].strip() + value = int(sLine[1].strip()) + + vulnerabilityFound[flag]['current_value'] = value + + result = True + + return result + + def _constructReports(self): + """ + Construct dictionary for result of the tests + Each entry contains: + Key: + - filename: filename of the test + - line: line of the test + - parse: Display the line where the vulnerabilites has been found + - description: description of the vulnerability + - level: high, medium or low + """ + # For file + self._reports['file'] = dict() + self._reports['file']['filename'] = self._audit[0]['value'] + self._reports['file']['sysctl'] = dict() + + # For process + self._reports['process'] = dict() + self._reports['process']['sysctl'] = dict() + + for sysctl in self._objects: + self._reports['file']['sysctl'][sysctl['flag']] = dict() + self._reports['process']['sysctl'][sysctl['flag']] = dict() def getReports(self) -> dict: return self._reports - diff --git a/reports/templates/localaccount.html.j2 b/reports/templates/localaccount.html.j2 new file mode 100644 index 0000000..a107073 --- /dev/null +++ b/reports/templates/localaccount.html.j2 @@ -0,0 +1,43 @@ +

Apache

+ +{% for item in data['postfix']['vulnerabilities'] %} +
+
+

+ +

+
+
+ {{ data['postfix']['vulnerabilities'][item]['description'] }}.
+ {% if data['postfix']['vulnerabilities'][item]['result'] == 'success' %} +
+
+

+                {{ data['postfix']['vulnerabilities'][item]['flagFound'] }}
+	            
+
+
+ {% else %} + For resolving the issue, add this line in the {{ data['postfix']['filename'] }} vulnerabilities: +
+
+

+            {% for value in data['postfix']['vulnerabilities'][item]['recommand_value'] %}
+ 	          {{ data['postfix']['vulnerabilities'][item]['flag'] }} = {{ value }}
+            {% endfor %}
+	        
+
+
+ {% endif %} +
+
+
+
+{% endfor %}