From 0a87d84fd9fd9fbe9b3d53f91277e948fa1329c2 Mon Sep 17 00:00:00 2001 From: geoffrey Date: Sat, 22 Jun 2024 17:45:40 +0200 Subject: [PATCH] First commit --- .gitignore | 3 + README.md | 15 +++ config.py | 21 ++++ dnschecker.py | 74 +++++++++++ emailchecker.py | 36 ++++++ hashing.py | 38 ++++++ main.py | 318 +++++++++++++++++++++++++++++++++++++++++++++++ reports.py | 157 +++++++++++++++++++++++ requirements.txt | 10 ++ vt.py | 123 ++++++++++++++++++ 10 files changed, 795 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.py create mode 100644 dnschecker.py create mode 100644 emailchecker.py create mode 100644 hashing.py create mode 100644 main.py create mode 100644 reports.py create mode 100644 requirements.txt create mode 100644 vt.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f0f0cee --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.**.swp +__pycache__/** +config diff --git a/README.md b/README.md new file mode 100644 index 0000000..919d7f6 --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +# Introduction +This tool can help SOC analyst to identify any threat + +# Implementation +First, you should create a virtualenv: + +``` +$ virtualenv ~/venv/baoSOC +$ source ~/venv/baoSOC/bin/activate +``` + +And install all packages the tool need: +``` +$ pip install -r requirements.txt +``` diff --git a/config.py b/config.py new file mode 100644 index 0000000..a94cfd5 --- /dev/null +++ b/config.py @@ -0,0 +1,21 @@ +#!/usr/bin/venv python +# -*- coding: utf-8 -*- + +PROJECT_NAME = "baoSOC" + +VT_ATTRIBUTES_MAPPING = { + 'asn': 'str', + 'as_owner': 'int', + 'continent': 'str', + 'country': 'str', + 'last_analysis_date': 'date', + 'regional_internet_registry': 'str', + 'network': 'str', + 'ip': 'str' +} + +DNS_QUERIES_TYPE = { + 'A': 'address', + 'MX': ['exchange', 'preference'], + 'TXT': 'strings', +} diff --git a/dnschecker.py b/dnschecker.py new file mode 100644 index 0000000..5902975 --- /dev/null +++ b/dnschecker.py @@ -0,0 +1,74 @@ +#!/usr/bin/venv python +# -*- coding: utf-8 -*- + +import whois +import dns.resolver +import dns.name +from config import DNS_QUERIES_TYPE + + +class DNSChecker: + def __init__(self, api_key, fqdn, rrtype=DNS_QUERIES_TYPE): + self._fqdn = fqdn + self._rrtype = rrtype + + def checkDomainExist(self): + """ + This function check if the domain exist + """ + try: + res_query = dns.resolver.resolve(self._fqdn, 'NS') + except dns.resolver.NoAnswer: + return False + except dns.resolver.NXDOMAIN: + return False + return True + + def whois(self): + """ + This function will get an whois request for having some information + regarding the domain + """ + report = dict() + w = whois.whois(self._fqdn) + report['domain_name'] = w.domain_name + report['expiration_date'] = w.expiration_date + report['creation_date'] = w.creation_date + report['updated_date'] = w.updated_date + report['ns'] = w.name_servers + report['admin_name'] = w.admin_name + report['registrar'] = w.registrar + return report + + def resolver(self): + """ + This function will resolv the FQDN with different type of RR + """ + report = dict() + + for t in self._rrtype.keys(): + report[t] = self._resolving(self._fqdn, t, self._rrtype[t]) + return report + + def _resolving(self, fqdn, t, attr): + report = list() + try: + res_query = dns.resolver.resolve(fqdn, t) + for rdata in res_query: + if isinstance(attr, list): + data = dict() + for a in attr: + data[a] = getattr(rdata, a) + report.append(data) + else: + report.append({ + attr: getattr(rdata, attr) + }) + except dns.resolver.NoAnswer: + pass + except dns.resolver.NXDOMAIN: + report.append({'error': 'Domain not exist'}) + except dns.resolver.NoNameservers: + report.append({'error': 'Domain not exist'}) + return report + diff --git a/emailchecker.py b/emailchecker.py new file mode 100644 index 0000000..6d7a961 --- /dev/null +++ b/emailchecker.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +from requests import get + + +class EmailChecker: + def __init__(self, key, email): + self._url = "https://emailrep.io" + self._headers = { + 'Key': key, + 'accept': 'application/json', + } + self._email = email + + def reportEmailRep(self): + """ + This function get the report of the email + """ + report = dict() + + res = get( + f"{self._url}/{self._email}", + headers=self._headers + ) + js = res.json() + if res.status_code == 401: + report['error'] = js['reason'] + return report + if res.status_code != 200: + report['error'] = 'Failed to get the report of the email' + return report + + report['reputation'] = js['reputation'] + report['suspicious'] = js['suspicious'] + + return report diff --git a/hashing.py b/hashing.py new file mode 100644 index 0000000..efa5063 --- /dev/null +++ b/hashing.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import hashlib + + +class Hash: + def __init__(self): + pass + + def hashMd5(self, f): + md5 = hashlib.md5() + with open(f, "rb") as data: + md5.update(data.read()) + return md5.hexdigest() + + def hashSha1(self, f): + sha1 = hashlib.sha1() + with open(f, "rb") as data: + sha1.update(data.read()) + return sha1.hexdigest() + + def hashSha256(self, f): + sha256 = hashlib.sha256() + with open(f, "rb") as data: + sha256.update(data.read()) + return sha256.hexdigest() + + def hashSha384(self, f): + sha384 = hashlib.sha384() + with open(f, "rb") as data: + sha384.update(data.read()) + return sha384.hexdigest() + + def hashSha512(self, f): + sha512 = hashlib.sha512() + with open(f, "rb") as data: + sha512.update(data.read()) + return sha512.hexdigest() diff --git a/main.py b/main.py new file mode 100644 index 0000000..842476d --- /dev/null +++ b/main.py @@ -0,0 +1,318 @@ +#!/usr/bin/venv python +# -*- coding: utf-8 -*- + +from argparse import ArgumentParser +from config import VT_ATTRIBUTES_MAPPING, PROJECT_NAME +from vt import VT +from dnschecker import DNSChecker as DNS +from emailchecker import EmailChecker +import ipaddress +from datetime import datetime +from hashing import Hash +from os.path import exists +from re import search + + +def checkArguments(): + parser = ArgumentParser(description=PROJECT_NAME) + parser.add_argument('-c', '--config', help='Config file') + parser.add_argument('--dns', help='Get domain name information', action="store_true") + # For dns command + parser.add_argument('--domain', help='Get domain name information') + parser.add_argument('--host', help='Get domain name information') + parser.add_argument('--ip', help='Get IP information') + # For hash command + parser.add_argument('--hashfile', help='Hash file', action='store_true') + parser.add_argument('--scanvt', help='If specified, scan the hash with VirusTotal', action='store_true') + parser.add_argument('--md5', help='Hash file') + parser.add_argument('--sha1', help='Hash file') + parser.add_argument('--sha256', help='Hash file') + parser.add_argument('--sha384', help='Hash file') + parser.add_argument('--sha512', help='Hash file') + parser.add_argument('--hash', help='Get information about the hash') + # For email command + parser.add_argument('--email', help='Get email reputation', action='store_true') + parser.add_argument('--emailrep', help='Get email reputation') + + + return parser.parse_args() + +def usage(): + print("------------------------------") + print(f"| {PROJECT_NAME} |") + print("------------------------------\n") + print("A tool for SOC analyst\n") + print("Usage: main.py [COMMAND]") + print("-c PATH, --config PATH\t\tConfig file - mandatory") + print("--hashfile\t\t\tHash the file and check in VirusTotal") + print("--hash HASH\t\t\tAnalyse the hash from VirusTotal") + print("--dns \t\t\t\tGet information regarding the domain with whois and VirusTotal") + print("--email\t\t\t\tGet informations about an email and check if has been compromised") + + print("\n--dns command:") + print("\t --domain FQDN\t\tScan and get domain information") + print("\t --host HOST\t\tScan and get host information") + print("\t --ip IP\t\tScan and get IP information") + + print("\n--hashfile command:") + print("\t --md5 FILE\t\tGet the MD5 hash of the file") + print("\t --sha1 FILE\t\tGet the SHA256 of the file") + print("\t --sha256 FILE\t\tGet the SHA256 of the file") + print("\t --sha384 FILE\t\tGet the SHA384 of the file") + print("\t --sha512 FILE\t\tGet the SHA512 of the file") + + print("\n--email command") + print("\t --emailrep\t\tGet the email reputation report") + +def mainMenu(): + print(f"\n {PROJECT_NAME} ") + print(" What would you like to do? ") + print("\n OPTION 1: Sanitise URL For emails ") + print(" OPTION 2: Decoders (PP, URL, SafeLinks) ") + print(" OPTION 3: Reputation Checker") + print(" OPTION 4: DNS Tools") + print(" OPTION 5: Hashing Function") + print(" OPTION 6: Phishing Analysis") + print(" OPTION 7: URL scan") + print(" OPTION 9: Extras") + print(" OPTION 0: Exit Tool") + +def readConfigFile(config): + """ + This function read the config file + """ + data = {} + try: + with open(config, 'r') as f: + lines = f.readlines() + + # Split each line into te dictionary + for line in lines: + l = line.split(":") + lineParsed = l[1].replace(" ", "") + lineParsed = lineParsed.replace("\n", "") + data[l[0]] = lineParsed + + except FileNotFoundError: + return None + return data + +def main(): + args = checkArguments() + + if not args.config: + usage() + exit(1); + + # Read the config file + config = readConfigFile(args.config) + if config is None: + print("Failed to read the config file") + exit(0) + + report = dict() + + # Analyse DNS + if args.dns: + if args.domain: + _parsingDomain(config, args.domain, report) + if args.host: + _parsingHost(config, args.host, report) + if args.ip: + _parsingIP(config, args.ip, report) + # Analyse hash file + if args.hashfile: + h = Hash() + dispatcher = { + "MD5": h.hashMd5, + "SHA1": h.hashSha1, + "SHA256": h.hashSha256, + "SHA384": h.hashSha384, + "SHA512": h.hashSha512, + } + if args.md5: + hashType = "MD5" + filename = args.md5 + if args.sha1: + hashType = "SHA1" + filename = args.sha1 + if args.sha256: + hashType = "SHA256" + filename = args.sha256 + if args.sha384: + hashType = "SHA384" + filename = args.sha384 + if args.sha512: + hashType = "SHA512" + filename = args.sha512 + + if not exists(filename): + print(f"File {filename} do not exist") + else: + res = dispatcher[hashType](filename) + print(f"{hashType} hash: {res}") + + if args.scanvt: + _parsingHash(config, res, report) + + # Analyse the hash + if args.hash: + _parsingHash(config, args.hash, report) + + # Analyse the email + if args.email: + if args.emailrep: + _parsingEmail(config, args.emailrep) + +def _parsingEmail(config, email): + # Check if the email specified is correct + regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' + if not search(regex, email): + print("Please, specify a valid email address") + return + + report = dict() + emailChecker = EmailChecker(config['api_key_emailrep'], email) + report['emailrep'] = emailChecker.reportEmailRep() + + print("----------------------------") + print("| Email reputation |") + print("----------------------------") + + if 'error' in report['emailrep']: + print(f"Error: {report['emailrep']['error']}") + return + + emailrep = report['emailrep'] + print(f"Reputation: {emailrep['reputation']}") + print(f"Suspicious: {emailrep['suspicious']}") + +def _parsingHash(config, h, report): + report = dict() + vt = VT(config['api_key_vt']) + + vt.getRateFromHash(h, report) + print("\n----------------------") + print("| VirusTotal |") + print("----------------------") + + if 'error' in report: + print("Error, can not find any informations") + return + + # File + print("\nFile information:") + f = report['results']['file'] + print(f"\tMD5: {f['md5']}") + print(f"\tSHA1: {f['sha1']}") + print(f"\tMagic: {f['magic']}") + print(f"\tFile type: {f['filetype']}") + print(f"\tSize: {f['size']}") + print(f"\tExtension: {f['extension']}") + print(f"\tFirst seen: {f['first_seen']}") + print(f"\tLast analysis: {f['last_analysis']}") + + # Vendors + vendors = report['results']['vendors'] + print("\nVendors:") + print(f"\tTotal vendors: {vendors['total_vendors']}") + print(f"\tCategorized as malicious: {vendors['total_malicious']}\n") + for key in vendors['result']: + print(f"\t{key}: {vendors['result'][key]}") + +def _parsingIP(config, ip, report): + # Check if it's an IP address + vt = VT(config['api_key_vt']) + try: + ip_obj = ipaddress.ip_address(ip) + except ValueError: + print("The IP is not valid") + return + + report['ip'] = dict() + report['ip'] = vt.getIPVirusTotal(ip) + + if 'error' not in report['ip']: + print("----------------------------") + print("| IP Informations |") + print("----------------------------") + for vt in VT_ATTRIBUTES_MAPPING.keys(): + try: + vtAttributes = VT_ATTRIBUTES_MAPPING[vt] + if 'date' in vtAttributes: + value = datetime.fromtimestamp(int(report['ip'][vt])) + else: + value = report['ip'][vt] + print(f"{vt}: {value}") + except KeyError: + print(f"Cannot find the key {vt}") + +def _parsingHost(config, fqdn, report): + vt = VT(config['api_key_vt']) + dns = DNS(config['api_key_vt'], fqdn, {'A': 'address'}) + + # Resolv and print results + report['resolving'] = dns.resolver() + _printDNSResolving(report['resolving']) + + # Print VirusTotal + report['vt'] = dict() + vt.getDomainReport(fqdn, report['vt']) + _printDNSVirusTotal(report['vt']) + +def _parsingDomain(config, fqdn, report): + vt = VT(config['api_key_vt']) + dns = DNS(config['api_key_vt'], fqdn) + + # Check if domain exist + if not dns.checkDomainExist(): + print(f"The domain {fqdn} do not exist") + return + + # Resolving domain + report['resolving'] = dns.resolver() + _printDNSResolving(report['resolving']) + + # Whois request and print the result + report['whois'] = dns.whois() + _printDNSWhois(report['whois']) + + # Print VirusTotal + report['vt'] = dict() + vt.getDomainReport(fqdn, report['vt']) + _printDNSVirusTotal(report['vt']) + +def _printDNSResolving(report): + print("----------------------") + print("| resolving |") + print("----------------------") + for key in report.keys(): + print(f"{key}: ") + for entry in report[key]: + for subkey in entry.keys(): + value = entry[subkey] + if isinstance(value, bytes): + value = value.decode() + print(f"\t{subkey}: {value}") + +def _printDNSVirusTotal(report): + print("\n----------------------") + print("| VirusTotal |") + print("----------------------") + for key in report: + print(f"{key}: {report[key]}") + +def _printDNSWhois(report): + print("\n----------------------") + print("| whois |") + print("----------------------") + for key in report.keys(): + if isinstance(report[key], list): + print(f"{key}:") + for value in report[key]: + print(f"\t{value}") + else: + print(f"{key}: {report[key]}") + +if __name__ == "__main__": + main() diff --git a/reports.py b/reports.py new file mode 100644 index 0000000..3188c7b --- /dev/null +++ b/reports.py @@ -0,0 +1,157 @@ +#!/usr/bin/venv python +# -*- coding: utf-8 -*- + +from datetime import datetime +from os import mkdir +from config import VT_ATTRIBUTES_MAPPING +import jinja2 + + +def generateHtmlReport(data): + env = jinja2.Environment( + loader=jinja2.FileSystemLoader("reports/templates"), + autoescape=jinja2.select_autoescape() + ) + + _queriesReport(data['queries'], env) + _graphicsReport(data['graphics'], env) + _vtReport(data['vt'], env) + _dnsTunnelingReports(data['dnstunneling'], env) + +def _indexReport(): + pass + +def _queriesReport(queries, env): + """ + This function generate the report for queries + """ + today = getTodayDate() + dataJinja2 = dict() + dataJinja2['title'] = 'Queries' + dataJinja2['year'] = '2023' + dataJinja2['queries'] = queries + + tmpl = env.get_template('queries.html.j2') + + render = tmpl.render(data=dataJinja2) + + with open(f"reports/{today}/reports_queries.html", "w") as f: + f.write(render) + +def _graphicsReport(graphics, env): + today = getTodayDate() + + dataJinja2 = dict() + dataJinja2['title'] = 'Graphics' + dataJinja2['year'] = '2023' + dataJinja2['graphics'] = graphics + + tmpl = env.get_template('graphics.html.j2') + + render = tmpl.render(data=dataJinja2) + + with open(f"reports/{today}/reports_graphics.html", "w") as f: + f.write(render) + +def _vtReport(vt, env): + today = getTodayDate() + # For testing + #vt = list() + #vt.append({ + # 'ip': '1.2.3.4', + # 'asn': 3215, + # 'as_owner': 'Orange', + # 'continent': 'EU', + # 'country': 'FR', + # 'last_analysis_date': 1686839532, + # 'regional_internet_registry': 'RIPE NCC', + # 'network': '1.2.3.0/24' + #}) + #vt.append({ + # 'ip': '2.2.2.1', + # 'asn': 3215, + # 'as_owner': 'Orange', + # 'continent': 'EU', + # 'country': 'FR', + # 'last_analysis_date': 1686839532, + # 'regional_internet_registry': 'RIPE NCC', + # 'network': '2.2.2.0/24' + #}) + #vt.append({ + # 'ip': '3.3.3.1', + # 'asn': 3215, + # 'as_owner': 'Orange', + # 'continent': 'EU', + # 'country': 'FR', + # 'last_analysis_date': 1686839532, + # 'regional_internet_registry': 'RIPE NCC', + # 'network': '3.3.3.0/24' + #}) + + dataJinja2 = dict() + dataJinja2['title'] = 'VirusTotal' + dataJinja2['year'] = '2023' + dataJinja2['vt'] = list() + + tmpl = env.get_template('vt.html.j2') + + body = str() + + for entry in vt: + vtEntry = dict() + if 'error' not in entry: + for vt in VT_ATTRIBUTES_MAPPING.keys(): + try: + vtAttributes = VT_ATTRIBUTES_MAPPING[vt] + if 'date' in vtAttributes: + value = datetime.fromtimestamp(int(entry[vt])) + else: + value = entry[vt] + vtEntry[vt] = value + except KeyError: + pass + dataJinja2['vt'].append({ + 'ip': entry['ip'], + 'data': vtEntry + }) + + render = tmpl.render(data=dataJinja2) + + with open(f"reports/{today}/reports_vt.html", "w") as f: + f.write(render) + +def _dnsTunnelingReports(dnstunneling, env): + today = getTodayDate() + + dataJinja2 = dict() + dataJinja2['title'] = 'DNS Tunneling' + dataJinja2['year'] = '2023' + dataJinja2['dnstunneling'] = dnstunneling + + tmpl = env.get_template('dnsTunneling.html.j2') + + render = tmpl.render(data=dataJinja2) + + with open(f"reports/{today}/reports_dns_tunneling.html", "w") as f: + f.write(render) + +def createReportsDirectory(): + """ + This function will create the reports directory + Return the report name or None if failed + """ + today = getTodayDate() + name = f"reports/{today}" + try: + mkdir(name) + except FileExistsError: + print("Reports directory already created") + return name + return name + +def getTodayDate(): + """ + This function genrate the today datetime at this format: + year_month_day + """ + return datetime.now().isoformat()[0:10].replace("-", "_") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d5168c2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +pip +scapy +ArgumentParser +matplotlib +numpy +vt-py +requests +jinja2 +python-whois +git+https://github.com/rthalley/dnspython.git diff --git a/vt.py b/vt.py new file mode 100644 index 0000000..8ebdd36 --- /dev/null +++ b/vt.py @@ -0,0 +1,123 @@ +#!/ur/bin/env python3 + +import requests +from config import VT_ATTRIBUTES_MAPPING +from datetime import datetime + + +class VT: + def __init__(self, api_key): + self._url = "https://www.virustotal.com/api/v3" + self._headers = { + 'x-apikey': api_key, + } + + def getIPVirusTotal(self, ip): + """ + This function get IP information from VirusTotal + """ + res = requests.get( + f"{self._url}/ip_addresses/{ip}", + headers=self._headers + ).json() + + data = dict() + data['ip'] = ip + + if 'error' in res: + report.append({ + 'error': res['error']['message'], + 'ip': ip + }) + return + + vt = res['data']['attributes'] + for entry in VT_ATTRIBUTES_MAPPING.keys(): + if entry in vt: + try: + data[entry] = vt[entry] + except KeyError: + data[entry] = 'Unknown' + return data + + def getDomainReport(self, fqdn, report): + """ + This function get the report for the specific domain + """ + res = requests.get( + f"{self._url}/domains/{fqdn}", + headers=self._headers + ) + + js = res.json() + report['reputation'] = js['data']['attributes']['reputation'] + report['last_update'] = \ + datetime.fromtimestamp(js['data']['attributes']['last_update_date']) + + # Get number of security vendors + report['total_vendors'] = 0 + report['clean'] = 0 + report['unrated'] = 0 + report['malicious'] = 0 + vendors = js['data']['attributes']['last_analysis_results'] + + for entry in vendors: + report['total_vendors'] += 1 + if vendors[entry]['result'] == 'clean': + report['clean'] += 1 + elif vendors[entry]['result'] == 'unrated': + report['unrated'] += 1 + elif vendors[entry]['result'] == 'malicious': + report['malicious'] += 1 + + def getRateFromHash(self, h, report): + """ + This function get the report of the hash specified by the parameter h + """ + headers = self._headers + + res = requests.get( + f"{self._url}/files/{h}", + headers=self._headers + ).json() + + if 'error' in res: + report["error"] = "Can not find the result" + return + + attributes = res['data']['attributes'] + report['results'] = dict() + report['results']['file'] = dict() + report['results']['file']['magic'] = attributes['magic'] + report['results']['file']['sha1'] = attributes['sha1'] + report['results']['file']['md5'] = attributes['md5'] + report['results']['file']['filetype'] = attributes['detectiteasy']['filetype'] + report['results']['file']['size'] = attributes['size'] + report['results']['file']['extension'] = attributes['type_extension'] + try: + report['results']['file']['first_seen'] = \ + datetime.fromtimestamp(attributes['first_seen_itw_date']) + except KeyError: + report['results']['file']['first_seen'] = "Unknown" + + report['results']['file']['last_analysis'] = \ + datetime.fromtimestamp(attributes['last_analysis_date']) + + # Identify vendors + report['results']['vendors'] = dict() + report['results']['vendors']['total_vendors'] = 0 + report['results']['vendors']['total_malicious'] = 0 + report['results']['vendors']['result'] = dict() + report['results']['vendors']['result']['undetected'] = 0 + results = res['data']['attributes']['last_analysis_results'] + for entry in results: + report['results']['vendors']['total_vendors'] += 1 + if results[entry]['category'] == 'undetected': + report['results']['vendors']['result']['undetected'] += 1 + else: + result = results[entry]['result'] + if result not in report['results']['vendors']['result']: + report['results']['vendors']['result'][result] = 0 + report['results']['vendors']['result'][result] += 1 + report['results']['vendors']['total_malicious'] += 1 +