Change arbo and create config file
This commit is contained in:
parent
0312e1dbe4
commit
39e83d2336
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,3 +1,5 @@
|
|||||||
__pycache__/
|
__pycache__/
|
||||||
__pycache__/**
|
__pycache__/**
|
||||||
**.swp
|
**.swp
|
||||||
|
reports/
|
||||||
|
reports/**
|
||||||
|
1
audit/system/plugins/postfix/__init__.py
Normal file
1
audit/system/plugins/postfix/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
#!/usr/bin/env python3
|
1
audit/system/plugins/sysctl/__init__.py
Normal file
1
audit/system/plugins/sysctl/__init__.py
Normal file
@ -0,0 +1 @@
|
|||||||
|
#!/usr/bin/env python3
|
@ -78,7 +78,6 @@ class Parsing(ParsingBase):
|
|||||||
def _generateReport(self, objects):
|
def _generateReport(self, objects):
|
||||||
# We can generate the report
|
# We can generate the report
|
||||||
for sysctl in self._reports['file']['sysctl']:
|
for sysctl in self._reports['file']['sysctl']:
|
||||||
#self._reports['file']['sysctl'][sysctl] = vulnerabilityFound[sysctl]
|
|
||||||
self._reports['file']['sysctl'][sysctl] = objects[sysctl]
|
self._reports['file']['sysctl'][sysctl] = objects[sysctl]
|
||||||
|
|
||||||
def _parsingFile(self, line, obj, vulnerabilityFound) -> bool:
|
def _parsingFile(self, line, obj, vulnerabilityFound) -> bool:
|
@ -1,6 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# Constantes
|
|
||||||
HIGH = "high"
|
|
||||||
MEDIUM = "medium"
|
|
||||||
LOW = "low"
|
|
15
config.yaml
Normal file
15
config.yaml
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# Audit system
|
||||||
|
system:
|
||||||
|
exclude_plugins:
|
||||||
|
- ""
|
||||||
|
postfix:
|
||||||
|
postfix_file: "/etc/postfix/master.cf"
|
||||||
|
sysctl:
|
||||||
|
sysctl_file: "/etc/sysctl.conf"
|
||||||
|
|
||||||
|
# Audit application
|
||||||
|
application:
|
||||||
|
pattern_file:
|
||||||
|
- ".py"
|
||||||
|
- ".php"
|
||||||
|
- ".c"
|
103
core/config.py
Normal file
103
core/config.py
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
from utils import ConfigError
|
||||||
|
|
||||||
|
# Constantes
|
||||||
|
HIGH = "high"
|
||||||
|
MEDIUM = "medium"
|
||||||
|
LOW = "low"
|
||||||
|
|
||||||
|
AUDIT_SYSTEM = [
|
||||||
|
"sysctl",
|
||||||
|
"postfix",
|
||||||
|
]
|
||||||
|
|
||||||
|
AUDIT_APPLICATION = [
|
||||||
|
'keywords',
|
||||||
|
'calls',
|
||||||
|
]
|
||||||
|
|
||||||
|
def generateConfig() -> dict:
|
||||||
|
config = dict()
|
||||||
|
# System
|
||||||
|
config["system"] = dict()
|
||||||
|
config["system"]["postfix"] = dict()
|
||||||
|
config["system"]["postfix"]["postfix_file"] = "/etc/postfix/main.cf"
|
||||||
|
config["system"]["sysctl"] = dict()
|
||||||
|
config["system"]["sysctl"]["sysctl_file"] = "/etc/sysctl.conf"
|
||||||
|
config["system"]["exclude_plugins"] = list()
|
||||||
|
# Application
|
||||||
|
config["application"] = dict()
|
||||||
|
config["application"]["pattern_file"] = list()
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
def _get_exclude_plugins():
|
||||||
|
pass
|
||||||
|
|
||||||
|
def parsingConfigFile(filename, configs):
|
||||||
|
# This function overwrite the config
|
||||||
|
try:
|
||||||
|
if not filename.endswith(".yaml"):
|
||||||
|
raise ConfigError(
|
||||||
|
"You must specified a YAML config file",
|
||||||
|
filename
|
||||||
|
)
|
||||||
|
with open(filename, 'rb') as f:
|
||||||
|
yamlConfig = yaml.safe_load(f)
|
||||||
|
|
||||||
|
# Mapping config file to the config dict
|
||||||
|
# TODO: recursive function
|
||||||
|
for category in yamlConfig:
|
||||||
|
if "system" in category:
|
||||||
|
for plugin in yamlConfig["system"]:
|
||||||
|
if plugin not in configs["system"]:
|
||||||
|
raise ConfigError(
|
||||||
|
f"{plugin} unknown",
|
||||||
|
filename
|
||||||
|
)
|
||||||
|
for flag in yamlConfig["system"][plugin]:
|
||||||
|
try:
|
||||||
|
if flag is not None:
|
||||||
|
if isinstance(configs["system"][plugin], list):
|
||||||
|
configs["system"][plugin].append(flag)
|
||||||
|
else:
|
||||||
|
if flag not in configs["system"][plugin]:
|
||||||
|
raise ConfigError(
|
||||||
|
f"{flag} unknown",
|
||||||
|
filename
|
||||||
|
)
|
||||||
|
configs["system"][plugin][flag] = yamlConfig["system"][plugin][flag]
|
||||||
|
except TypeError as e:
|
||||||
|
raise e
|
||||||
|
|
||||||
|
#if "application" in category:
|
||||||
|
# for plugin in yamlConfig["application"]:
|
||||||
|
# for flag in yamlConfig["application"][plugin]:
|
||||||
|
# try:
|
||||||
|
# configs["application"][plugin][flag] = yamlConfig["application"][plugin][flag]
|
||||||
|
# except TypeError:
|
||||||
|
# pass
|
||||||
|
|
||||||
|
|
||||||
|
#fdata = f.read()
|
||||||
|
#lines = fdata.splitlines()
|
||||||
|
|
||||||
|
#for line in lines:
|
||||||
|
# line = line.decode('utf-8')
|
||||||
|
# try:
|
||||||
|
# sLine = line.split("=")
|
||||||
|
# flag = sLine[0].strip()
|
||||||
|
# value = sLine[1].strip()
|
||||||
|
|
||||||
|
# if flag in configs:
|
||||||
|
# if flag == "exclude_plugins":
|
||||||
|
# value = value.replace("\"", "")
|
||||||
|
# value = value.split(",")
|
||||||
|
# configs[flag] = value
|
||||||
|
# except IndexError:
|
||||||
|
# pass
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"Config file {filename} not found. Bypass it")
|
20
core/dispatcher.py
Normal file
20
core/dispatcher.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
class Dispatcher:
|
||||||
|
_plugins = dict()
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def runPlugin(self, plugin, *args) -> dict:
|
||||||
|
"""
|
||||||
|
We run the puglin. The result of this function
|
||||||
|
is the report of the audit
|
||||||
|
"""
|
||||||
|
return self._plugins[plugin](self, *args)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def register_plugins(cls, plugin):
|
||||||
|
cls._plugins[plugin.__name__] = plugin
|
||||||
|
return plugin
|
||||||
|
|
65
core/main.py
Normal file
65
core/main.py
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
from core.sysctl import Sysctl
|
||||||
|
from core.postfix import Postfix
|
||||||
|
from core.report import generateHtmlReport
|
||||||
|
from core.config import AUDIT_SYSTEM, AUDIT_APPLICATION, generateConfig, parsingConfigFile
|
||||||
|
from core.dispatcher import Dispatcher
|
||||||
|
|
||||||
|
|
||||||
|
def checkArguments():
|
||||||
|
args = ArgumentParser(description="Check Gitlab repositories")
|
||||||
|
args.add_argument('-a', '--audit', help="Kind of audit", choices=['system', 'application'])
|
||||||
|
args.add_argument('-c', '--config', help="Config file")
|
||||||
|
return args.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = checkArguments()
|
||||||
|
|
||||||
|
# If audit is not specified
|
||||||
|
if args.audit is None:
|
||||||
|
print("Please, you must specify the audit type")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
# If config file is specified
|
||||||
|
configs = generateConfig()
|
||||||
|
if args.config is not None:
|
||||||
|
parsingConfigFile(args.config, configs)
|
||||||
|
|
||||||
|
# Report
|
||||||
|
report = dict()
|
||||||
|
report['system'] = dict()
|
||||||
|
|
||||||
|
# Create our dispatcher
|
||||||
|
dispatcher = Dispatcher()
|
||||||
|
|
||||||
|
print(configs)
|
||||||
|
|
||||||
|
if args.audit == "system":
|
||||||
|
for audit in AUDIT_SYSTEM:
|
||||||
|
if audit not in configs["system"]["exclude_plugins"]:
|
||||||
|
report["system"][audit] = dispatcher.runPlugin(audit, configs["system"][audit])
|
||||||
|
|
||||||
|
if args.audit == "application":
|
||||||
|
pass
|
||||||
|
|
||||||
|
print(report)
|
||||||
|
generateHtmlReport(report)
|
||||||
|
|
||||||
|
@Dispatcher.register_plugins
|
||||||
|
def sysctl(*args) -> dict:
|
||||||
|
sysctl = Sysctl(args[1])
|
||||||
|
sysctl.runAudit()
|
||||||
|
return sysctl.getReports()
|
||||||
|
|
||||||
|
@Dispatcher.register_plugins
|
||||||
|
def postfix(*args) -> dict:
|
||||||
|
arguments = args[1]
|
||||||
|
postfix = Postfix()
|
||||||
|
postfix.runAudit()
|
||||||
|
return postfix.getReports()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
@ -1,7 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from parsing.postfix import Parsing
|
from audit.system.plugins.postfix.parsing import Parsing
|
||||||
from issues.postfix import postfix
|
from audit.system.plugins.postfix.postfix import postfix
|
||||||
|
|
||||||
class Postfix:
|
class Postfix:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@ -14,3 +14,9 @@ class Postfix:
|
|||||||
|
|
||||||
def _postfix(self):
|
def _postfix(self):
|
||||||
self._objects = postfix()
|
self._objects = postfix()
|
||||||
|
|
||||||
|
def runAudit(self):
|
||||||
|
print("Running test for postfix")
|
||||||
|
|
||||||
|
def getReports(self) -> dict:
|
||||||
|
return self._reports
|
||||||
|
@ -4,27 +4,24 @@ from datetime import datetime
|
|||||||
|
|
||||||
def generateHtmlReport(data):
|
def generateHtmlReport(data):
|
||||||
today = datetime.now().isoformat()[0:10].replace("-", "_")
|
today = datetime.now().isoformat()[0:10].replace("-", "_")
|
||||||
html = "<!doctype html>" \
|
html = _getHeader()
|
||||||
"<html>" \
|
html += "<body>" \
|
||||||
"<head>" \
|
|
||||||
"</head>" \
|
|
||||||
"<body>" \
|
|
||||||
f"<h1>Reports of {today}</h1>"
|
f"<h1>Reports of {today}</h1>"
|
||||||
|
|
||||||
body = str()
|
body = str()
|
||||||
# For sysctl
|
# For sysctl
|
||||||
for entry in data['sysctl']:
|
#for entry in data['sysctl']:
|
||||||
body += f"<h2>Sysctl</h2>"
|
# body += f"<h2>Sysctl</h2>"
|
||||||
|
|
||||||
# For file
|
# # For file
|
||||||
body += f"<h3>File</h3>"
|
# body += f"<h3>File</h3>"
|
||||||
for f in data['sysctl']['file']:
|
#for f in data['sysctl']['file']:
|
||||||
body += f"<h4>{data['sysctl']['file']['filename']}</h4>"
|
# body += f"<h4>{data['sysctl']['file']['filename']}</h4>"
|
||||||
for vul in data['sysctl']['file']['sysctl']:
|
# for vul in data['sysctl']['file']['sysctl']:
|
||||||
#print(data['sysctl']['file']['sysctl'][vul])
|
# #print(data['sysctl']['file']['sysctl'][vul])
|
||||||
body += f"<h5>{vul}</h5>"
|
# body += f"<h5>{vul}</h5>"
|
||||||
body += f"<p>"
|
# body += f"<p>"
|
||||||
body += f"Results:<br />"
|
# body += f"Results:<br />"
|
||||||
#for result in data['sysctl']['file']['sysctl'][vul]:
|
#for result in data['sysctl']['file']['sysctl'][vul]:
|
||||||
# print(result)
|
# print(result)
|
||||||
# body += f"Line: {result['lineNumber']}<br />"
|
# body += f"Line: {result['lineNumber']}<br />"
|
||||||
@ -39,3 +36,10 @@ def generateHtmlReport(data):
|
|||||||
with open(f"reports/reports_{today}.html", "w") as f:
|
with open(f"reports/reports_{today}.html", "w") as f:
|
||||||
f.write(html)
|
f.write(html)
|
||||||
|
|
||||||
|
def _getHeader() -> str:
|
||||||
|
header = "<!doctype html>" \
|
||||||
|
"<html>" \
|
||||||
|
"<head>" \
|
||||||
|
"</head>" \
|
||||||
|
|
||||||
|
return header
|
@ -1,18 +1,18 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from parsing.sysctl import Parsing
|
from audit.system.plugins.sysctl.parsing import Parsing
|
||||||
from issues.sysctl import sysctl
|
from audit.system.plugins.sysctl.sysctl import sysctl
|
||||||
|
|
||||||
|
|
||||||
class Sysctl:
|
class Sysctl:
|
||||||
def __init__(self):
|
def __init__(self, args):
|
||||||
self._objects = dict()
|
self._objects = dict()
|
||||||
self._reports = dict()
|
self._reports = dict()
|
||||||
self._audit = list()
|
self._audit = list()
|
||||||
|
|
||||||
self._audit.append({
|
self._audit.append({
|
||||||
'audit': 'file',
|
'audit': 'file',
|
||||||
'value': '/etc/sysctl.conf',
|
'value': args["sysctl_file"],
|
||||||
})
|
})
|
||||||
self._audit.append({
|
self._audit.append({
|
||||||
'audit': 'process',
|
'audit': 'process',
|
||||||
@ -27,6 +27,7 @@ class Sysctl:
|
|||||||
self._objects = sysctl()
|
self._objects = sysctl()
|
||||||
|
|
||||||
def runAudit(self):
|
def runAudit(self):
|
||||||
|
print("Running test for sysctl")
|
||||||
# Read /etc/sysctl.conf
|
# Read /etc/sysctl.conf
|
||||||
self._parsing.runParsing()
|
self._parsing.runParsing()
|
||||||
#self._reports.append(self._parsing.getResults())
|
#self._reports.append(self._parsing.getResults())
|
||||||
|
1
design_pattern.txt
Normal file
1
design_pattern.txt
Normal file
@ -0,0 +1 @@
|
|||||||
|
https://refactoring.guru/design-patterns/python
|
@ -1,6 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
def system() -> list:
|
|
||||||
system = list()
|
|
||||||
|
|
||||||
return system
|
|
36
main.py
36
main.py
@ -1,41 +1,7 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
from argparse import ArgumentParser
|
from core.main import main
|
||||||
from core.sysctl import Sysctl
|
|
||||||
from core.postfix import Postfix
|
|
||||||
from report import generateHtmlReport
|
|
||||||
|
|
||||||
|
|
||||||
def checkArguments():
|
|
||||||
args = ArgumentParser(description="Check Gitlab repositories")
|
|
||||||
args.add_argument('-a', '--audit', help="Kind of audit", choices=['system', 'application'])
|
|
||||||
return args.parse_args()
|
|
||||||
|
|
||||||
def main():
|
|
||||||
args = checkArguments()
|
|
||||||
|
|
||||||
# If audit is not specified
|
|
||||||
if args.audit is None:
|
|
||||||
print("Please, you must specify the audit type")
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
# Report
|
|
||||||
report = dict()
|
|
||||||
report['system'] = None
|
|
||||||
|
|
||||||
# Audit application
|
|
||||||
if args.audit == "application":
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Audit system
|
|
||||||
if args.audit == "system":
|
|
||||||
sysctl = Sysctl()
|
|
||||||
sysctl.runAudit()
|
|
||||||
|
|
||||||
# Getting reports
|
|
||||||
report['sysctl'] = sysctl.getReports()
|
|
||||||
|
|
||||||
generateHtmlReport(report)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
41
parsing.py
41
parsing.py
@ -1,41 +0,0 @@
|
|||||||
import re
|
|
||||||
from json import dumps
|
|
||||||
|
|
||||||
class Parsing:
|
|
||||||
def __init__(self, objects, audit):
|
|
||||||
self._parsing = dict()
|
|
||||||
self._results = dict()
|
|
||||||
self._objects = objects
|
|
||||||
self._audit = audit
|
|
||||||
|
|
||||||
def runParsing(self):
|
|
||||||
for audit in self._audit:
|
|
||||||
if audit['audit'] == 'file':
|
|
||||||
with open(audit['value'], 'rb') as fdata:
|
|
||||||
self._parseFile(fdata)
|
|
||||||
|
|
||||||
def _parseFile(self, fdata):
|
|
||||||
data = fdata.read()
|
|
||||||
lines = data.splitlines()
|
|
||||||
|
|
||||||
for line in lines:
|
|
||||||
self._parsingFile(line, self._objects['sysctl'])
|
|
||||||
|
|
||||||
def _parsingFile(self, line, item) -> dict:
|
|
||||||
"""
|
|
||||||
This function parse the line and try to find the item in it
|
|
||||||
"""
|
|
||||||
res = None
|
|
||||||
|
|
||||||
line = line.decode("utf-8")
|
|
||||||
for entry in item:
|
|
||||||
groupLine = re.search(entry['item'], line)
|
|
||||||
if groupLine:
|
|
||||||
sLine = line.split('=')
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
def getResults(self) -> dict:
|
|
||||||
result = dict()
|
|
||||||
|
|
||||||
return result
|
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
pyyaml
|
||||||
|
jinja2
|
8
utils.py
8
utils.py
@ -4,6 +4,14 @@ import re
|
|||||||
from subprocess import run
|
from subprocess import run
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigError(Exception):
|
||||||
|
"""Raised when the config file fails validation."""
|
||||||
|
|
||||||
|
def __init__(self, message, config_file):
|
||||||
|
self.config_file = config_file
|
||||||
|
self.message = f"{config_file} : {message}"
|
||||||
|
super().__init__(self.message)
|
||||||
|
|
||||||
def identifySystem():
|
def identifySystem():
|
||||||
os = None
|
os = None
|
||||||
with open('/etc/issue', 'r') as f:
|
with open('/etc/issue', 'r') as f:
|
||||||
|
Loading…
Reference in New Issue
Block a user