First commit

This commit is contained in:
geoffrey 2024-07-02 09:12:58 +02:00
commit f0be826c6e
13 changed files with 214983 additions and 0 deletions

3
.gitignore vendored Normal file

@ -0,0 +1,3 @@
.**.swp
__pycache__/**
config

15
README.md Normal file

@ -0,0 +1,15 @@
# Introduction
This tool can help SOC analyst to identify any threat
# Implementation
First, you should create a virtualenv:
```
$ virtualenv ~/venv/baoSOC
$ source ~/venv/baoSOC/bin/activate
```
And install all packages the tool need:
```
$ pip install -r requirements.txt
```

21
config.py Normal file

@ -0,0 +1,21 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
PROJECT_NAME = "baoSOC"
VT_ATTRIBUTES_MAPPING = {
'asn': 'str',
'as_owner': 'int',
'continent': 'str',
'country': 'str',
'last_analysis_date': 'date',
'regional_internet_registry': 'str',
'network': 'str',
'ip': 'str'
}
DNS_QUERIES_TYPE = {
'A': 'address',
'MX': ['exchange', 'preference'],
'TXT': 'strings',
}

74
dnschecker.py Normal file

@ -0,0 +1,74 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
import whois
import dns.resolver
import dns.name
from config import DNS_QUERIES_TYPE
class DNSChecker:
def __init__(self, api_key, fqdn, rrtype=DNS_QUERIES_TYPE):
self._fqdn = fqdn
self._rrtype = rrtype
def checkDomainExist(self):
"""
This function check if the domain exist
"""
try:
res_query = dns.resolver.resolve(self._fqdn, 'NS')
except dns.resolver.NoAnswer:
return False
except dns.resolver.NXDOMAIN:
return False
return True
def whois(self):
"""
This function will get an whois request for having some information
regarding the domain
"""
report = dict()
w = whois.whois(self._fqdn)
report['domain_name'] = w.domain_name
report['expiration_date'] = w.expiration_date
report['creation_date'] = w.creation_date
report['updated_date'] = w.updated_date
report['ns'] = w.name_servers
report['admin_name'] = w.admin_name
report['registrar'] = w.registrar
return report
def resolver(self):
"""
This function will resolv the FQDN with different type of RR
"""
report = dict()
for t in self._rrtype.keys():
report[t] = self._resolving(self._fqdn, t, self._rrtype[t])
return report
def _resolving(self, fqdn, t, attr):
report = list()
try:
res_query = dns.resolver.resolve(fqdn, t)
for rdata in res_query:
if isinstance(attr, list):
data = dict()
for a in attr:
data[a] = getattr(rdata, a)
report.append(data)
else:
report.append({
attr: getattr(rdata, attr)
})
except dns.resolver.NoAnswer:
pass
except dns.resolver.NXDOMAIN:
report.append({'error': 'Domain not exist'})
except dns.resolver.NoNameservers:
report.append({'error': 'Domain not exist'})
return report

36
emailchecker.py Normal file

@ -0,0 +1,36 @@
#!/usr/bin/env python3
from requests import get
class EmailChecker:
def __init__(self, key, email):
self._url = "https://emailrep.io"
self._headers = {
'Key': key,
'accept': 'application/json',
}
self._email = email
def reportEmailRep(self):
"""
This function get the report of the email
"""
report = dict()
res = get(
f"{self._url}/{self._email}",
headers=self._headers
)
js = res.json()
if res.status_code == 401:
report['error'] = js['reason']
return report
if res.status_code != 200:
report['error'] = 'Failed to get the report of the email'
return report
report['reputation'] = js['reputation']
report['suspicious'] = js['suspicious']
return report

38
hashing.py Normal file

@ -0,0 +1,38 @@
#!/usr/bin/env python3
import hashlib
class Hash:
def __init__(self):
pass
def hashMd5(self, f):
md5 = hashlib.md5()
with open(f, "rb") as data:
md5.update(data.read())
return md5.hexdigest()
def hashSha1(self, f):
sha1 = hashlib.sha1()
with open(f, "rb") as data:
sha1.update(data.read())
return sha1.hexdigest()
def hashSha256(self, f):
sha256 = hashlib.sha256()
with open(f, "rb") as data:
sha256.update(data.read())
return sha256.hexdigest()
def hashSha384(self, f):
sha384 = hashlib.sha384()
with open(f, "rb") as data:
sha384.update(data.read())
return sha384.hexdigest()
def hashSha512(self, f):
sha512 = hashlib.sha512()
with open(f, "rb") as data:
sha512.update(data.read())
return sha512.hexdigest()

78
macchecker.py Normal file

@ -0,0 +1,78 @@
#!/usr/bin/env python3
from requests import get
from os.path import isfile
from re import compile, match
class MACChecker:
def __init__(self):
self._url = "https://standards-oui.ieee.org/oui/oui.txt"
self._ouiTextFile = "oui.txt"
def updateOUIDb(self):
print("Updating the OUI database from IEEE")
report = dict()
# We download the data from IEEE
oui = get(self._url)
if oui.status_code != 200:
report['success'] = False
exit(1)
with open(self._ouiTextFile, "w") as f:
f.write(oui.text)
report['success'] = True
return report
def parseMACAddress(self, mac):
report = dict()
report['hw'] = mac
if ':' in mac:
mac = mac.replace(":", "-")
# Check if mac is valid
regex = "[a-fA-F0-9]{2}-[a-fA-F0-9]{2}-[a-fA-F0-9]{2}-[a-fA-F0-9]{2}-[a-fA-F0-9]{2}-[a-fA-F0-9]{2}"
compiled = compile(regex)
if not compiled.match(mac):
print("Not a valid MAC address")
return
# Check if the oui.txt file exist
if not isfile(self._ouiTextFile):
report['db'] = self.updateOUIDb()
entries = self._parseOUIFile()
macSplitted = mac.split("-")
oui = macSplitted[0] + macSplitted[1] + macSplitted[2]
oui = oui.upper()
report['oui'] = oui
f = self._parseOUIFile()
for entries in f.keys():
if oui == entries:
report['vendor'] = f[entries][0]
return report
def _parseOUIFile(self):
data = list()
entries = dict()
with open(self._ouiTextFile, "r") as f:
data = f.readlines()
# Remove the "header" on the file
d = data[4:]
regex = "[a-zA-Z0-9]{6}"
compiled = compile(regex)
for entry in d:
s_entry = entry.split("\t")
s = len(s_entry)
if compiled.match(s_entry[0]):
oui = s_entry[0].split(" ")
entries[oui[0]] = s_entry[s - 1: s]
return entries

338
main.py Normal file

@ -0,0 +1,338 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from config import VT_ATTRIBUTES_MAPPING, PROJECT_NAME
from vt import VT
from dnschecker import DNSChecker as DNS
from emailchecker import EmailChecker
from macchecker import MACChecker
import ipaddress
from datetime import datetime
from hashing import Hash
from os.path import exists
def checkArguments():
parser = ArgumentParser(description=PROJECT_NAME)
parser.add_argument('-c', '--config', help='Config file')
# For dns command
parser.add_argument('--dns', help='Get domain name information', action="store_true")
parser.add_argument('--domain', help='Get domain name information')
parser.add_argument('--host', help='Get domain name information')
parser.add_argument('--ip', help='Get IP information')
# For hash command
parser.add_argument('--hashfile', help='Hash file', action='store_true')
parser.add_argument('--scanvt', help='If specified, scan the hash with VirusTotal', action='store_true')
parser.add_argument('--md5', help='Hash file')
parser.add_argument('--sha1', help='Hash file')
parser.add_argument('--sha256', help='Hash file')
parser.add_argument('--sha384', help='Hash file')
parser.add_argument('--sha512', help='Hash file')
parser.add_argument('--hash', help='Get information about the hash')
# For email command
parser.add_argument('--email', help='Get email reputation', action='store_true')
parser.add_argument('--emailrep', help='Get email reputation')
# For mac command
parser.add_argument('--mac', help='Get mac information')
parser.add_argument('--macdb', help='Update database of OUI', action="store_true")
return parser.parse_args()
def usage():
print("------------------------------")
print(f"| {PROJECT_NAME} |")
print("------------------------------\n")
print("A tool for SOC analyst\n")
print("Usage: main.py [COMMAND]")
print("-c PATH, --config PATH\t\tConfig file - mandatory")
print("--hashfile\t\t\tHash the file and check in VirusTotal")
print("--hash HASH\t\t\tAnalyse the hash from VirusTotal")
print("--dns \t\t\t\tGet information regarding the domain with whois and VirusTotal")
print("--email\t\t\t\tGet informations about an email and check if has been compromised")
print("\n--dns command:")
print("\t --domain FQDN\t\tScan and get domain information")
print("\t --host HOST\t\tScan and get host information")
print("\t --ip IP\t\tScan and get IP information")
print("\n--hashfile command:")
print("\t --md5 FILE\t\tGet the MD5 hash of the file")
print("\t --sha1 FILE\t\tGet the SHA256 of the file")
print("\t --sha256 FILE\t\tGet the SHA256 of the file")
print("\t --sha384 FILE\t\tGet the SHA384 of the file")
print("\t --sha512 FILE\t\tGet the SHA512 of the file")
print("\n--email command")
print("\t --emailrep\t\tGet the email reputation report")
print("\n--mac command")
print("--mac MAC\t\t\tGet mac information")
print("--macdb\t\t\t\tUpdate the OUI database")
def readConfigFile(config):
"""
This function read the config file
"""
data = {}
try:
with open(config, 'r') as f:
lines = f.readlines()
# Split each line into te dictionary
for line in lines:
l = line.split(":")
lineParsed = l[1].replace(" ", "")
lineParsed = lineParsed.replace("\n", "")
data[l[0]] = lineParsed
except FileNotFoundError:
return None
return data
def main():
args = checkArguments()
if not args.config:
usage()
exit(1);
# Read the config file
config = readConfigFile(args.config)
if config is None:
print("Failed to read the config file")
exit(0)
report = dict()
# Analyse DNS
if args.dns:
if args.domain:
_parsingDomain(config, args.domain, report)
if args.host:
_parsingHost(config, args.host, report)
if args.ip:
_parsingIP(config, args.ip, report)
# Analyse hash file
if args.hashfile:
h = Hash()
dispatcher = {
"MD5": h.hashMd5,
"SHA1": h.hashSha1,
"SHA256": h.hashSha256,
"SHA384": h.hashSha384,
"SHA512": h.hashSha512,
}
if args.md5:
hashType = "MD5"
filename = args.md5
if args.sha1:
hashType = "SHA1"
filename = args.sha1
if args.sha256:
hashType = "SHA256"
filename = args.sha256
if args.sha384:
hashType = "SHA384"
filename = args.sha384
if args.sha512:
hashType = "SHA512"
filename = args.sha512
if not exists(filename):
print(f"File {filename} do not exist")
else:
res = dispatcher[hashType](filename)
print(f"{hashType} hash: {res}")
if args.scanvt:
_parsingHash(config, res, report)
# Analyse the hash
if args.hash:
_parsingHash(config, args.hash, report)
# Analyse the email
if args.email:
if args.emailrep:
_parsingEmail(config, args.emailrep)
# Analyse mac address
if args.macdb:
_parseMACAddress(mac=None, db=True)
if args.mac:
_parseMACAddress(mac=args.mac)
def _parseMACAddress(mac=None, db=False):
macchecker = MACChecker()
report = dict()
if db:
report['db'] = macchecker.updateOUIDb()
if mac is not None:
report['mac'] = macchecker.parseMACAddress(mac)
print("----------------------------")
print("| MAC report |")
print("----------------------------")
if 'db' in report:
print(f"The update of the OUI db: {report['db']['success']}")
if 'mac' in report:
print(f"MAC address: {report['mac']['hw']}")
print(f"OUI: {report['mac']['oui']}")
print(f"Vendor: {report['mac']['vendor']}")
def _parsingEmail(config, email):
# Check if the email specified is correct
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
if not search(regex, email):
print("Please, specify a valid email address")
return
report = dict()
emailChecker = EmailChecker(config['api_key_emailrep'], email)
report['emailrep'] = emailChecker.reportEmailRep()
print("----------------------------")
print("| Email reputation |")
print("----------------------------")
if 'error' in report['emailrep']:
print(f"Error: {report['emailrep']['error']}")
return
emailrep = report['emailrep']
print(f"Reputation: {emailrep['reputation']}")
print(f"Suspicious: {emailrep['suspicious']}")
def _parsingHash(config, h, report):
report = dict()
vt = VT(config['api_key_vt'])
vt.getRateFromHash(h, report)
print("\n----------------------")
print("| VirusTotal |")
print("----------------------")
if 'error' in report:
print("Error, can not find any informations")
return
# File
print("\nFile information:")
f = report['results']['file']
print(f"\tMD5: {f['md5']}")
print(f"\tSHA1: {f['sha1']}")
print(f"\tMagic: {f['magic']}")
print(f"\tFile type: {f['filetype']}")
print(f"\tSize: {f['size']}")
print(f"\tExtension: {f['extension']}")
print(f"\tFirst seen: {f['first_seen']}")
print(f"\tLast analysis: {f['last_analysis']}")
# Vendors
vendors = report['results']['vendors']
print("\nVendors:")
print(f"\tTotal vendors: {vendors['total_vendors']}")
print(f"\tCategorized as malicious: {vendors['total_malicious']}\n")
for key in vendors['result']:
print(f"\t{key}: {vendors['result'][key]}")
def _parsingIP(config, ip, report):
# Check if it's an IP address
vt = VT(config['api_key_vt'])
try:
ip_obj = ipaddress.ip_address(ip)
except ValueError:
print("The IP is not valid")
return
report['ip'] = dict()
report['ip'] = vt.getIPVirusTotal(ip)
if 'error' not in report['ip']:
print("----------------------------")
print("| IP Informations |")
print("----------------------------")
for vt in VT_ATTRIBUTES_MAPPING.keys():
try:
vtAttributes = VT_ATTRIBUTES_MAPPING[vt]
if 'date' in vtAttributes:
value = datetime.fromtimestamp(int(report['ip'][vt]))
else:
value = report['ip'][vt]
print(f"{vt}: {value}")
except KeyError:
print(f"Cannot find the key {vt}")
def _parsingHost(config, fqdn, report):
vt = VT(config['api_key_vt'])
dns = DNS(config['api_key_vt'], fqdn, {'A': 'address'})
# Resolv and print results
report['resolving'] = dns.resolver()
_printDNSResolving(report['resolving'])
# Print VirusTotal
report['vt'] = dict()
vt.getDomainReport(fqdn, report['vt'])
_printDNSVirusTotal(report['vt'])
def _parsingDomain(config, fqdn, report):
vt = VT(config['api_key_vt'])
dns = DNS(config['api_key_vt'], fqdn)
# Check if domain exist
if not dns.checkDomainExist():
print(f"The domain {fqdn} do not exist")
return
# Resolving domain
report['resolving'] = dns.resolver()
_printDNSResolving(report['resolving'])
# Whois request and print the result
report['whois'] = dns.whois()
_printDNSWhois(report['whois'])
# Print VirusTotal
report['vt'] = dict()
vt.getDomainReport(fqdn, report['vt'])
_printDNSVirusTotal(report['vt'])
def _printDNSResolving(report):
print("----------------------")
print("| resolving |")
print("----------------------")
for key in report.keys():
print(f"{key}: ")
for entry in report[key]:
for subkey in entry.keys():
value = entry[subkey]
if isinstance(value, bytes):
value = value.decode()
print(f"\t{subkey}: {value}")
def _printDNSVirusTotal(report):
print("\n----------------------")
print("| VirusTotal |")
print("----------------------")
for key in report:
print(f"{key}: {report[key]}")
def _printDNSWhois(report):
print("\n----------------------")
print("| whois |")
print("----------------------")
for key in report.keys():
if isinstance(report[key], list):
print(f"{key}:")
for value in report[key]:
print(f"\t{value}")
else:
print(f"{key}: {report[key]}")
if __name__ == "__main__":
main()

214059
oui.txt Normal file

File diff suppressed because it is too large Load Diff

157
reports.py Normal file

@ -0,0 +1,157 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
from datetime import datetime
from os import mkdir
from config import VT_ATTRIBUTES_MAPPING
import jinja2
def generateHtmlReport(data):
env = jinja2.Environment(
loader=jinja2.FileSystemLoader("reports/templates"),
autoescape=jinja2.select_autoescape()
)
_queriesReport(data['queries'], env)
_graphicsReport(data['graphics'], env)
_vtReport(data['vt'], env)
_dnsTunnelingReports(data['dnstunneling'], env)
def _indexReport():
pass
def _queriesReport(queries, env):
"""
This function generate the report for queries
"""
today = getTodayDate()
dataJinja2 = dict()
dataJinja2['title'] = 'Queries'
dataJinja2['year'] = '2023'
dataJinja2['queries'] = queries
tmpl = env.get_template('queries.html.j2')
render = tmpl.render(data=dataJinja2)
with open(f"reports/{today}/reports_queries.html", "w") as f:
f.write(render)
def _graphicsReport(graphics, env):
today = getTodayDate()
dataJinja2 = dict()
dataJinja2['title'] = 'Graphics'
dataJinja2['year'] = '2023'
dataJinja2['graphics'] = graphics
tmpl = env.get_template('graphics.html.j2')
render = tmpl.render(data=dataJinja2)
with open(f"reports/{today}/reports_graphics.html", "w") as f:
f.write(render)
def _vtReport(vt, env):
today = getTodayDate()
# For testing
#vt = list()
#vt.append({
# 'ip': '1.2.3.4',
# 'asn': 3215,
# 'as_owner': 'Orange',
# 'continent': 'EU',
# 'country': 'FR',
# 'last_analysis_date': 1686839532,
# 'regional_internet_registry': 'RIPE NCC',
# 'network': '1.2.3.0/24'
#})
#vt.append({
# 'ip': '2.2.2.1',
# 'asn': 3215,
# 'as_owner': 'Orange',
# 'continent': 'EU',
# 'country': 'FR',
# 'last_analysis_date': 1686839532,
# 'regional_internet_registry': 'RIPE NCC',
# 'network': '2.2.2.0/24'
#})
#vt.append({
# 'ip': '3.3.3.1',
# 'asn': 3215,
# 'as_owner': 'Orange',
# 'continent': 'EU',
# 'country': 'FR',
# 'last_analysis_date': 1686839532,
# 'regional_internet_registry': 'RIPE NCC',
# 'network': '3.3.3.0/24'
#})
dataJinja2 = dict()
dataJinja2['title'] = 'VirusTotal'
dataJinja2['year'] = '2023'
dataJinja2['vt'] = list()
tmpl = env.get_template('vt.html.j2')
body = str()
for entry in vt:
vtEntry = dict()
if 'error' not in entry:
for vt in VT_ATTRIBUTES_MAPPING.keys():
try:
vtAttributes = VT_ATTRIBUTES_MAPPING[vt]
if 'date' in vtAttributes:
value = datetime.fromtimestamp(int(entry[vt]))
else:
value = entry[vt]
vtEntry[vt] = value
except KeyError:
pass
dataJinja2['vt'].append({
'ip': entry['ip'],
'data': vtEntry
})
render = tmpl.render(data=dataJinja2)
with open(f"reports/{today}/reports_vt.html", "w") as f:
f.write(render)
def _dnsTunnelingReports(dnstunneling, env):
today = getTodayDate()
dataJinja2 = dict()
dataJinja2['title'] = 'DNS Tunneling'
dataJinja2['year'] = '2023'
dataJinja2['dnstunneling'] = dnstunneling
tmpl = env.get_template('dnsTunneling.html.j2')
render = tmpl.render(data=dataJinja2)
with open(f"reports/{today}/reports_dns_tunneling.html", "w") as f:
f.write(render)
def createReportsDirectory():
"""
This function will create the reports directory
Return the report name or None if failed
"""
today = getTodayDate()
name = f"reports/{today}"
try:
mkdir(name)
except FileExistsError:
print("Reports directory already created")
return name
return name
def getTodayDate():
"""
This function genrate the today datetime at this format:
year_month_day
"""
return datetime.now().isoformat()[0:10].replace("-", "_")

10
requirements.txt Normal file

@ -0,0 +1,10 @@
pip
scapy
ArgumentParser
matplotlib
numpy
vt-py
requests
jinja2
python-whois
git+https://github.com/rthalley/dnspython.git

31
tests/oui.py Normal file

@ -0,0 +1,31 @@
#!/usr/bin/env python3
import re
import requests
data = str()
# We download the data from IEEE
oui = requests.get("https://standards-oui.ieee.org/oui/oui.txt")
if oui.status_code != 200:
exit(1)
# Convert to list
l = list()
line = str()
for c in oui.text:
line += c
if c == "\n":
l.append(line)
line = str()
# Remove the "header" on the file
d = l[4:]
# We get all OUI
regex = "[a-zA-Z0-9]{6}"
compiled = re.compile(regex)
for entry in d:
s_entry = entry.split(" ")
if compiled.match(s_entry[0]):
print(s_entry[0])

123
vt.py Normal file

@ -0,0 +1,123 @@
#!/ur/bin/env python3
import requests
from config import VT_ATTRIBUTES_MAPPING
from datetime import datetime
class VT:
def __init__(self, api_key):
self._url = "https://www.virustotal.com/api/v3"
self._headers = {
'x-apikey': api_key,
}
def getIPVirusTotal(self, ip):
"""
This function get IP information from VirusTotal
"""
res = requests.get(
f"{self._url}/ip_addresses/{ip}",
headers=self._headers
).json()
data = dict()
data['ip'] = ip
if 'error' in res:
report.append({
'error': res['error']['message'],
'ip': ip
})
return
vt = res['data']['attributes']
for entry in VT_ATTRIBUTES_MAPPING.keys():
if entry in vt:
try:
data[entry] = vt[entry]
except KeyError:
data[entry] = 'Unknown'
return data
def getDomainReport(self, fqdn, report):
"""
This function get the report for the specific domain
"""
res = requests.get(
f"{self._url}/domains/{fqdn}",
headers=self._headers
)
js = res.json()
report['reputation'] = js['data']['attributes']['reputation']
report['last_update'] = \
datetime.fromtimestamp(js['data']['attributes']['last_update_date'])
# Get number of security vendors
report['total_vendors'] = 0
report['clean'] = 0
report['unrated'] = 0
report['malicious'] = 0
vendors = js['data']['attributes']['last_analysis_results']
for entry in vendors:
report['total_vendors'] += 1
if vendors[entry]['result'] == 'clean':
report['clean'] += 1
elif vendors[entry]['result'] == 'unrated':
report['unrated'] += 1
elif vendors[entry]['result'] == 'malicious':
report['malicious'] += 1
def getRateFromHash(self, h, report):
"""
This function get the report of the hash specified by the parameter h
"""
headers = self._headers
res = requests.get(
f"{self._url}/files/{h}",
headers=self._headers
).json()
if 'error' in res:
report["error"] = "Can not find the result"
return
attributes = res['data']['attributes']
report['results'] = dict()
report['results']['file'] = dict()
report['results']['file']['magic'] = attributes['magic']
report['results']['file']['sha1'] = attributes['sha1']
report['results']['file']['md5'] = attributes['md5']
report['results']['file']['filetype'] = attributes['detectiteasy']['filetype']
report['results']['file']['size'] = attributes['size']
report['results']['file']['extension'] = attributes['type_extension']
try:
report['results']['file']['first_seen'] = \
datetime.fromtimestamp(attributes['first_seen_itw_date'])
except KeyError:
report['results']['file']['first_seen'] = "Unknown"
report['results']['file']['last_analysis'] = \
datetime.fromtimestamp(attributes['last_analysis_date'])
# Identify vendors
report['results']['vendors'] = dict()
report['results']['vendors']['total_vendors'] = 0
report['results']['vendors']['total_malicious'] = 0
report['results']['vendors']['result'] = dict()
report['results']['vendors']['result']['undetected'] = 0
results = res['data']['attributes']['last_analysis_results']
for entry in results:
report['results']['vendors']['total_vendors'] += 1
if results[entry]['category'] == 'undetected':
report['results']['vendors']['result']['undetected'] += 1
else:
result = results[entry]['result']
if result not in report['results']['vendors']['result']:
report['results']['vendors']['result'][result] = 0
report['results']['vendors']['result'][result] += 1
report['results']['vendors']['total_malicious'] += 1