From 926678c6eee30360df8978af8fc28b797d94655d Mon Sep 17 00:00:00 2001 From: geoffrey Date: Wed, 19 Jun 2024 21:27:39 +0200 Subject: [PATCH] Add DNS resolving --- dnsattacks.py | 252 +++++++++++++++++++++++++++++++++++++++++ dnsinformations.py | 271 +++------------------------------------------ main.py | 14 +-- 3 files changed, 276 insertions(+), 261 deletions(-) create mode 100644 dnsattacks.py diff --git a/dnsattacks.py b/dnsattacks.py new file mode 100644 index 0000000..7a3576c --- /dev/null +++ b/dnsattacks.py @@ -0,0 +1,252 @@ +#!/usr/bin/venv python +# -*- coding: utf-8 -*- + +from argparse import ArgumentParser +from scapy.all import * +from scapy.layers.dns import DNSQR, DNSRR, DNS +import matplotlib.pyplot as plt +import numpy as np +import requests +import re +from reports import generateHtmlReport, createReportsDirectory, getTodayDate +from tunneling import tunnelingDNSAttacks +from config import VT_ATTRIBUTES_MAPPING + + +def _getType(t): + """ + This function identify the message type according to the RFC 1035: + https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.2 + """ + rtype = { + 1: 'A', 2: 'NS', 3: 'MD', 4: 'MF', + 5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR', + 10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO', + 15: 'MX', 16: 'TXT', + 28: 'AAAA' + } + try: + return rtype[t] + except KeyError: + return t + +def readPcapFile(pcap): + """ + This function read the pcap file with scapy + """ + data = None + try: + data = rdpcap(pcap) + except Scapy_Exception: + print(f"Failed to read {pcap}") + return None + return data + +def _privateIP(ip): + """ + This function check if the ip is from RFC 1918 + https://stackoverflow.com/questions/2814002/private-ip-address-identifier-in-regular-expression + """ + res = False + rfs1918 = [ + '(^10\.)', + '(^172\.1[6-9]\.)', + '(^172\.2[0-9]\.)', + '(^172\.3[0-1]\.)', + '(^192\.168\.)', + ] + for privateIP in rfs1918: + if re.search(privateIP, ip): + res = True + + return res + + +def getIPVirusTotal(api_key, ip, report): + """ + This function get information of the IP + """ + url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}" + headers = { + 'x-apikey': api_key, + } + res = requests.get(url, headers=headers).json() + data = dict() + data['ip'] = ip + + if 'error' in res: + report.append({ + 'error': res['error']['message'], + 'ip': ip + }) + return + + vt = res['data']['attributes'] + for entry in VT_ATTRIBUTES_MAPPING.keys(): + if entry in vt: + try: + data[entry] = vt[entry] + except KeyError: + data[entry] = 'Unknown' + + report.append(data) + +def main(): + args = checkArguments() + report = {} + report['queries'] = list() + report['domains'] = list() + report['graphics'] = dict() + report['ip'] = list() + report['vt'] = list() + report['dnstunneling'] = dict() + + if not args.file: + print("Please, specify the option --file") + exit(0) + + if not args.config: + print("Please, specify the config file ith the parameter --config") + exit(0) + + # Read the config file + config = readConfigFile(args.config) + if config is None: + print("Failed to read the config file") + exit(0) + + # Get the API key for VirusTotal + checkVt = True + try: + api_key = config["api_key"] + except KeyError: + print("Can't find the key in the config file. Bypass the check to VirusTotal") + checkVt = False + #exit(0) + + # Check if DNS BlackList is specified + try: + dnsbl = config["dnsbl"] + except KeyError: + print("Can't find the dnsbl in the config file") + exit(0) + + data = readPcapFile(args.file) + if data is None: + print("Failed to read the pcap file") + exit(0) + + # For number of request into the domain + numRequest = dict() + numIPDst = dict() + allIPs = list() + allDomains = list() + + print("Parsing DNS capture") + for d in data: + if d.haslayer(DNS): + # For DNS query + if isinstance(d.qd, DNSQR): + query = d.qd.qname + qtype = _getType(d.qd.qtype) + src = d[IP].src + dst = d[IP].dst + # print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Query: {query}; TYPE: {qtype}; Size: {len(d[DNS])}") + report['queries'].append({ + 'id': d[DNS].id, + 'src': src, + 'dst': dst, + 'query': query, + 'type': qtype, + 'len': len(d[DNS]), + }) + + # Count the number of request for a domain + if query not in numRequest: + numRequest[query] = 1 + else: + numRequest[query] += 1 + + # Count the number of IP dst + if dst not in numIPDst: + numIPDst[dst] = 1 + else: + numIPDst[dst] += 1 + + if not _privateIP(dst) and dst not in allIPs: + allIPs.append(dst) + + allDomains.append({ + 'query': query, + 'src': src, + 'len': len(d[DNS]), + }) + # For DNS response + if isinstance(d.an, DNSRR): + ans = d.an.rrname + atype = _getType(d.an.type) + src = d[IP].src + dst = d[IP].dst + # print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Answer: {ans}; TYPE: {atype}") + report['queries'].append({ + 'id': d[DNS].id, + 'src': src, + 'dst': dst, + 'answer': ans, + 'type': atype, + 'len': len(d[DNS]), + }) + + # Create directory for the report + print("Generating directory for reports") + reportName = createReportsDirectory() + + # Draw matplotlib + today = getTodayDate() + fname_domain = f'domain_{today}.png' + fname_ip = f'ip_{today}.png' + report['graphics']['domain'] = fname_domain + report['graphics']['ip'] = fname_ip + + x = list(numRequest.keys())[0:20] + y = list(numRequest.values())[0:20] + + plt.figure(figsize=(15, 5)) + plt.barh(x, y) + #plt.show() + plt.savefig( + fname=f"reports/{today}/{fname_domain}", + dpi='figure', + format='png' + ) + x = list(numIPDst.keys())[0:20] + y = list(numIPDst.values())[0:20] + + plt.figure(figsize=(10, 5)) + plt.barh(x, y, 0.5) + #plt.show() + plt.savefig( + fname=f"reports/{today}/{fname_ip}", + dpi='figure', + format='png' + ) + + # Identify DNS Tunneling attacks + print("Analyzing DNS capture for identifying DNS Tunneling") + tunnelingDNSAttacks(report['dnstunneling'], allDomains, dnsbl) + + # For external IP, use curl to VirusTotal for analyzing the IP + checkVt = False + if checkVt: + print("Getting IP informations to VirusTotal") + for ip in allIPs: + getIPVirusTotal(api_key, ip, report['vt']) + + # We generating the report + print("Generating the report") + generateHtmlReport(report) + + print(f"Report generated at this directory: {reportName}") + +if __name__ == "__main__": + main() diff --git a/dnsinformations.py b/dnsinformations.py index 9732672..77a59e5 100644 --- a/dnsinformations.py +++ b/dnsinformations.py @@ -1,42 +1,42 @@ #!/usr/bin/venv python # -*- coding: utf-8 -*- -from argparse import ArgumentParser -from scapy.all import * -from scapy.layers.dns import DNSQR, DNSRR, DNS -import matplotlib.pyplot as plt -import numpy as np -import requests -import re -from reports import generateHtmlReport, createReportsDirectory, getTodayDate -from tunneling import tunnelingDNSAttacks -from config import VT_ATTRIBUTES_MAPPING import whois import dns.resolver +import dns.name from config import DNS_QUERIES_TYPE class DNSInformations: - def __init__(self, api_key): - pass + def __init__(self, api_key, fqdn): + self._fqdn = fqdn - def whois(self, fqdn): + def whois(self): report = dict() - w = whois.whois(fqdn) + w = whois.whois(self._fqdn) report['domain_name'] = w.domain_name report['expiration_date'] = w.expiration_date report['creation_date'] = w.creation_date report['updated_date'] = w.updated_date - #report['data'] = w.text report['ns'] = w.name_servers report['admin_name'] = w.admin_name return report - def resolver(self, fqdn): + def resolver(self): report = dict() + n = dns.name.from_text(self._fqdn) + s = self._fqdn.split(".") + l = len(s) + fqdn = f"{s[l - 2]}.{s[l - 1]}" + print(fqdn) + o = dns.name.from_text(fqdn) + if n.relativize(o): + print(True) + + for t in DNS_QUERIES_TYPE.keys(): - report[t] = self._resolving(fqdn, t, DNS_QUERIES_TYPE[t]) + report[t] = self._resolving(self._fqdn, t, DNS_QUERIES_TYPE[t]) return report def _resolving(self, fqdn, t, attr): @@ -54,240 +54,3 @@ class DNSInformations: }) return report -def _getType(t): - """ - This function identify the message type according to the RFC 1035: - https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.2 - """ - rtype = { - 1: 'A', 2: 'NS', 3: 'MD', 4: 'MF', - 5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR', - 10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO', - 15: 'MX', 16: 'TXT', - 28: 'AAAA' - } - try: - return rtype[t] - except KeyError: - return t - -def readPcapFile(pcap): - """ - This function read the pcap file with scapy - """ - data = None - try: - data = rdpcap(pcap) - except Scapy_Exception: - print(f"Failed to read {pcap}") - return None - return data - -def _privateIP(ip): - """ - This function check if the ip is from RFC 1918 - https://stackoverflow.com/questions/2814002/private-ip-address-identifier-in-regular-expression - """ - res = False - rfs1918 = [ - '(^10\.)', - '(^172\.1[6-9]\.)', - '(^172\.2[0-9]\.)', - '(^172\.3[0-1]\.)', - '(^192\.168\.)', - ] - for privateIP in rfs1918: - if re.search(privateIP, ip): - res = True - - return res - - -def getIPVirusTotal(api_key, ip, report): - """ - This function get information of the IP - """ - url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}" - headers = { - 'x-apikey': api_key, - } - res = requests.get(url, headers=headers).json() - data = dict() - data['ip'] = ip - - if 'error' in res: - report.append({ - 'error': res['error']['message'], - 'ip': ip - }) - return - - vt = res['data']['attributes'] - for entry in VT_ATTRIBUTES_MAPPING.keys(): - if entry in vt: - try: - data[entry] = vt[entry] - except KeyError: - data[entry] = 'Unknown' - - report.append(data) - -def main(): - args = checkArguments() - report = {} - report['queries'] = list() - report['domains'] = list() - report['graphics'] = dict() - report['ip'] = list() - report['vt'] = list() - report['dnstunneling'] = dict() - - if not args.file: - print("Please, specify the option --file") - exit(0) - - if not args.config: - print("Please, specify the config file ith the parameter --config") - exit(0) - - # Read the config file - config = readConfigFile(args.config) - if config is None: - print("Failed to read the config file") - exit(0) - - # Get the API key for VirusTotal - checkVt = True - try: - api_key = config["api_key"] - except KeyError: - print("Can't find the key in the config file. Bypass the check to VirusTotal") - checkVt = False - #exit(0) - - # Check if DNS BlackList is specified - try: - dnsbl = config["dnsbl"] - except KeyError: - print("Can't find the dnsbl in the config file") - exit(0) - - data = readPcapFile(args.file) - if data is None: - print("Failed to read the pcap file") - exit(0) - - # For number of request into the domain - numRequest = dict() - numIPDst = dict() - allIPs = list() - allDomains = list() - - print("Parsing DNS capture") - for d in data: - if d.haslayer(DNS): - # For DNS query - if isinstance(d.qd, DNSQR): - query = d.qd.qname - qtype = _getType(d.qd.qtype) - src = d[IP].src - dst = d[IP].dst - # print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Query: {query}; TYPE: {qtype}; Size: {len(d[DNS])}") - report['queries'].append({ - 'id': d[DNS].id, - 'src': src, - 'dst': dst, - 'query': query, - 'type': qtype, - 'len': len(d[DNS]), - }) - - # Count the number of request for a domain - if query not in numRequest: - numRequest[query] = 1 - else: - numRequest[query] += 1 - - # Count the number of IP dst - if dst not in numIPDst: - numIPDst[dst] = 1 - else: - numIPDst[dst] += 1 - - if not _privateIP(dst) and dst not in allIPs: - allIPs.append(dst) - - allDomains.append({ - 'query': query, - 'src': src, - 'len': len(d[DNS]), - }) - # For DNS response - if isinstance(d.an, DNSRR): - ans = d.an.rrname - atype = _getType(d.an.type) - src = d[IP].src - dst = d[IP].dst - # print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Answer: {ans}; TYPE: {atype}") - report['queries'].append({ - 'id': d[DNS].id, - 'src': src, - 'dst': dst, - 'answer': ans, - 'type': atype, - 'len': len(d[DNS]), - }) - - # Create directory for the report - print("Generating directory for reports") - reportName = createReportsDirectory() - - # Draw matplotlib - today = getTodayDate() - fname_domain = f'domain_{today}.png' - fname_ip = f'ip_{today}.png' - report['graphics']['domain'] = fname_domain - report['graphics']['ip'] = fname_ip - - x = list(numRequest.keys())[0:20] - y = list(numRequest.values())[0:20] - - plt.figure(figsize=(15, 5)) - plt.barh(x, y) - #plt.show() - plt.savefig( - fname=f"reports/{today}/{fname_domain}", - dpi='figure', - format='png' - ) - x = list(numIPDst.keys())[0:20] - y = list(numIPDst.values())[0:20] - - plt.figure(figsize=(10, 5)) - plt.barh(x, y, 0.5) - #plt.show() - plt.savefig( - fname=f"reports/{today}/{fname_ip}", - dpi='figure', - format='png' - ) - - # Identify DNS Tunneling attacks - print("Analyzing DNS capture for identifying DNS Tunneling") - tunnelingDNSAttacks(report['dnstunneling'], allDomains, dnsbl) - - # For external IP, use curl to VirusTotal for analyzing the IP - checkVt = False - if checkVt: - print("Getting IP informations to VirusTotal") - for ip in allIPs: - getIPVirusTotal(api_key, ip, report['vt']) - - # We generating the report - print("Generating the report") - generateHtmlReport(report) - - print(f"Report generated at this directory: {reportName}") - -if __name__ == "__main__": - main() diff --git a/main.py b/main.py index f01b0c1..d1697b8 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,8 @@ def checkArguments(): parser = ArgumentParser(description="baoSOC") parser.add_argument('-c', '--config', help='Config file') parser.add_argument('--hash', help='Hash file', action='store_true') - parser.add_argument('--dns', help='WhoIs') + parser.add_argument('--dns', help='Get domain name information') + parser.add_argument('--dnsattacks', help='Parse DNS pcap file') return parser.parse_args() def usage(): @@ -25,6 +26,7 @@ def usage(): print("-c PATH, --config PATH\t\tConfig file - mandatory") print("--hash FILE\t\t\tHash the file and check in VirusTotal") print("--dns FQDN\t\t\tGet information regarding the domain with whois and VirusTotal") + print("--dnsattacks FILE\t\t\tParse the DNS pcap file and identify some DNS attacks") def mainMenu(): print("\n baoSOC ") @@ -77,23 +79,21 @@ def main(): #print(vt.getIPVirusTotal("1.1.1.1", report)) if args.dns: - dns = DNS(config['api_key']) + dns = DNS(config['api_key'], args.dns) print("IP Informations:\n") - report = dns.resolver(args.dns) + report = dns.resolver() for key in report.keys(): - s = f"{key}: " - print(s) + print(f"{key}: ") for entry in report[key]: for subkey in entry.keys(): - #print(f"\t{subkey}: {entry[subkey].decode()}") value = entry[subkey] if isinstance(value, bytes): value = value.decode() print(f"\t{subkey}: {value}") print("\nReport with Whois:\n") - report = dns.whois(args.dns) + report = dns.whois() for key in report.keys(): if isinstance(report[key], list): print(f"{key}:")