Add DNS resolving

This commit is contained in:
geoffrey 2024-06-19 21:27:39 +02:00
parent 2978fcbcba
commit 926678c6ee
3 changed files with 276 additions and 261 deletions

252
dnsattacks.py Normal file

@ -0,0 +1,252 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from scapy.all import *
from scapy.layers.dns import DNSQR, DNSRR, DNS
import matplotlib.pyplot as plt
import numpy as np
import requests
import re
from reports import generateHtmlReport, createReportsDirectory, getTodayDate
from tunneling import tunnelingDNSAttacks
from config import VT_ATTRIBUTES_MAPPING
def _getType(t):
"""
This function identify the message type according to the RFC 1035:
https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.2
"""
rtype = {
1: 'A', 2: 'NS', 3: 'MD', 4: 'MF',
5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR',
10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO',
15: 'MX', 16: 'TXT',
28: 'AAAA'
}
try:
return rtype[t]
except KeyError:
return t
def readPcapFile(pcap):
"""
This function read the pcap file with scapy
"""
data = None
try:
data = rdpcap(pcap)
except Scapy_Exception:
print(f"Failed to read {pcap}")
return None
return data
def _privateIP(ip):
"""
This function check if the ip is from RFC 1918
https://stackoverflow.com/questions/2814002/private-ip-address-identifier-in-regular-expression
"""
res = False
rfs1918 = [
'(^10\.)',
'(^172\.1[6-9]\.)',
'(^172\.2[0-9]\.)',
'(^172\.3[0-1]\.)',
'(^192\.168\.)',
]
for privateIP in rfs1918:
if re.search(privateIP, ip):
res = True
return res
def getIPVirusTotal(api_key, ip, report):
"""
This function get information of the IP
"""
url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
headers = {
'x-apikey': api_key,
}
res = requests.get(url, headers=headers).json()
data = dict()
data['ip'] = ip
if 'error' in res:
report.append({
'error': res['error']['message'],
'ip': ip
})
return
vt = res['data']['attributes']
for entry in VT_ATTRIBUTES_MAPPING.keys():
if entry in vt:
try:
data[entry] = vt[entry]
except KeyError:
data[entry] = 'Unknown'
report.append(data)
def main():
args = checkArguments()
report = {}
report['queries'] = list()
report['domains'] = list()
report['graphics'] = dict()
report['ip'] = list()
report['vt'] = list()
report['dnstunneling'] = dict()
if not args.file:
print("Please, specify the option --file")
exit(0)
if not args.config:
print("Please, specify the config file ith the parameter --config")
exit(0)
# Read the config file
config = readConfigFile(args.config)
if config is None:
print("Failed to read the config file")
exit(0)
# Get the API key for VirusTotal
checkVt = True
try:
api_key = config["api_key"]
except KeyError:
print("Can't find the key in the config file. Bypass the check to VirusTotal")
checkVt = False
#exit(0)
# Check if DNS BlackList is specified
try:
dnsbl = config["dnsbl"]
except KeyError:
print("Can't find the dnsbl in the config file")
exit(0)
data = readPcapFile(args.file)
if data is None:
print("Failed to read the pcap file")
exit(0)
# For number of request into the domain
numRequest = dict()
numIPDst = dict()
allIPs = list()
allDomains = list()
print("Parsing DNS capture")
for d in data:
if d.haslayer(DNS):
# For DNS query
if isinstance(d.qd, DNSQR):
query = d.qd.qname
qtype = _getType(d.qd.qtype)
src = d[IP].src
dst = d[IP].dst
# print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Query: {query}; TYPE: {qtype}; Size: {len(d[DNS])}")
report['queries'].append({
'id': d[DNS].id,
'src': src,
'dst': dst,
'query': query,
'type': qtype,
'len': len(d[DNS]),
})
# Count the number of request for a domain
if query not in numRequest:
numRequest[query] = 1
else:
numRequest[query] += 1
# Count the number of IP dst
if dst not in numIPDst:
numIPDst[dst] = 1
else:
numIPDst[dst] += 1
if not _privateIP(dst) and dst not in allIPs:
allIPs.append(dst)
allDomains.append({
'query': query,
'src': src,
'len': len(d[DNS]),
})
# For DNS response
if isinstance(d.an, DNSRR):
ans = d.an.rrname
atype = _getType(d.an.type)
src = d[IP].src
dst = d[IP].dst
# print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Answer: {ans}; TYPE: {atype}")
report['queries'].append({
'id': d[DNS].id,
'src': src,
'dst': dst,
'answer': ans,
'type': atype,
'len': len(d[DNS]),
})
# Create directory for the report
print("Generating directory for reports")
reportName = createReportsDirectory()
# Draw matplotlib
today = getTodayDate()
fname_domain = f'domain_{today}.png'
fname_ip = f'ip_{today}.png'
report['graphics']['domain'] = fname_domain
report['graphics']['ip'] = fname_ip
x = list(numRequest.keys())[0:20]
y = list(numRequest.values())[0:20]
plt.figure(figsize=(15, 5))
plt.barh(x, y)
#plt.show()
plt.savefig(
fname=f"reports/{today}/{fname_domain}",
dpi='figure',
format='png'
)
x = list(numIPDst.keys())[0:20]
y = list(numIPDst.values())[0:20]
plt.figure(figsize=(10, 5))
plt.barh(x, y, 0.5)
#plt.show()
plt.savefig(
fname=f"reports/{today}/{fname_ip}",
dpi='figure',
format='png'
)
# Identify DNS Tunneling attacks
print("Analyzing DNS capture for identifying DNS Tunneling")
tunnelingDNSAttacks(report['dnstunneling'], allDomains, dnsbl)
# For external IP, use curl to VirusTotal for analyzing the IP
checkVt = False
if checkVt:
print("Getting IP informations to VirusTotal")
for ip in allIPs:
getIPVirusTotal(api_key, ip, report['vt'])
# We generating the report
print("Generating the report")
generateHtmlReport(report)
print(f"Report generated at this directory: {reportName}")
if __name__ == "__main__":
main()

@ -1,42 +1,42 @@
#!/usr/bin/venv python
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from scapy.all import *
from scapy.layers.dns import DNSQR, DNSRR, DNS
import matplotlib.pyplot as plt
import numpy as np
import requests
import re
from reports import generateHtmlReport, createReportsDirectory, getTodayDate
from tunneling import tunnelingDNSAttacks
from config import VT_ATTRIBUTES_MAPPING
import whois
import dns.resolver
import dns.name
from config import DNS_QUERIES_TYPE
class DNSInformations:
def __init__(self, api_key):
pass
def __init__(self, api_key, fqdn):
self._fqdn = fqdn
def whois(self, fqdn):
def whois(self):
report = dict()
w = whois.whois(fqdn)
w = whois.whois(self._fqdn)
report['domain_name'] = w.domain_name
report['expiration_date'] = w.expiration_date
report['creation_date'] = w.creation_date
report['updated_date'] = w.updated_date
#report['data'] = w.text
report['ns'] = w.name_servers
report['admin_name'] = w.admin_name
return report
def resolver(self, fqdn):
def resolver(self):
report = dict()
n = dns.name.from_text(self._fqdn)
s = self._fqdn.split(".")
l = len(s)
fqdn = f"{s[l - 2]}.{s[l - 1]}"
print(fqdn)
o = dns.name.from_text(fqdn)
if n.relativize(o):
print(True)
for t in DNS_QUERIES_TYPE.keys():
report[t] = self._resolving(fqdn, t, DNS_QUERIES_TYPE[t])
report[t] = self._resolving(self._fqdn, t, DNS_QUERIES_TYPE[t])
return report
def _resolving(self, fqdn, t, attr):
@ -54,240 +54,3 @@ class DNSInformations:
})
return report
def _getType(t):
"""
This function identify the message type according to the RFC 1035:
https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.2
"""
rtype = {
1: 'A', 2: 'NS', 3: 'MD', 4: 'MF',
5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR',
10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO',
15: 'MX', 16: 'TXT',
28: 'AAAA'
}
try:
return rtype[t]
except KeyError:
return t
def readPcapFile(pcap):
"""
This function read the pcap file with scapy
"""
data = None
try:
data = rdpcap(pcap)
except Scapy_Exception:
print(f"Failed to read {pcap}")
return None
return data
def _privateIP(ip):
"""
This function check if the ip is from RFC 1918
https://stackoverflow.com/questions/2814002/private-ip-address-identifier-in-regular-expression
"""
res = False
rfs1918 = [
'(^10\.)',
'(^172\.1[6-9]\.)',
'(^172\.2[0-9]\.)',
'(^172\.3[0-1]\.)',
'(^192\.168\.)',
]
for privateIP in rfs1918:
if re.search(privateIP, ip):
res = True
return res
def getIPVirusTotal(api_key, ip, report):
"""
This function get information of the IP
"""
url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
headers = {
'x-apikey': api_key,
}
res = requests.get(url, headers=headers).json()
data = dict()
data['ip'] = ip
if 'error' in res:
report.append({
'error': res['error']['message'],
'ip': ip
})
return
vt = res['data']['attributes']
for entry in VT_ATTRIBUTES_MAPPING.keys():
if entry in vt:
try:
data[entry] = vt[entry]
except KeyError:
data[entry] = 'Unknown'
report.append(data)
def main():
args = checkArguments()
report = {}
report['queries'] = list()
report['domains'] = list()
report['graphics'] = dict()
report['ip'] = list()
report['vt'] = list()
report['dnstunneling'] = dict()
if not args.file:
print("Please, specify the option --file")
exit(0)
if not args.config:
print("Please, specify the config file ith the parameter --config")
exit(0)
# Read the config file
config = readConfigFile(args.config)
if config is None:
print("Failed to read the config file")
exit(0)
# Get the API key for VirusTotal
checkVt = True
try:
api_key = config["api_key"]
except KeyError:
print("Can't find the key in the config file. Bypass the check to VirusTotal")
checkVt = False
#exit(0)
# Check if DNS BlackList is specified
try:
dnsbl = config["dnsbl"]
except KeyError:
print("Can't find the dnsbl in the config file")
exit(0)
data = readPcapFile(args.file)
if data is None:
print("Failed to read the pcap file")
exit(0)
# For number of request into the domain
numRequest = dict()
numIPDst = dict()
allIPs = list()
allDomains = list()
print("Parsing DNS capture")
for d in data:
if d.haslayer(DNS):
# For DNS query
if isinstance(d.qd, DNSQR):
query = d.qd.qname
qtype = _getType(d.qd.qtype)
src = d[IP].src
dst = d[IP].dst
# print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Query: {query}; TYPE: {qtype}; Size: {len(d[DNS])}")
report['queries'].append({
'id': d[DNS].id,
'src': src,
'dst': dst,
'query': query,
'type': qtype,
'len': len(d[DNS]),
})
# Count the number of request for a domain
if query not in numRequest:
numRequest[query] = 1
else:
numRequest[query] += 1
# Count the number of IP dst
if dst not in numIPDst:
numIPDst[dst] = 1
else:
numIPDst[dst] += 1
if not _privateIP(dst) and dst not in allIPs:
allIPs.append(dst)
allDomains.append({
'query': query,
'src': src,
'len': len(d[DNS]),
})
# For DNS response
if isinstance(d.an, DNSRR):
ans = d.an.rrname
atype = _getType(d.an.type)
src = d[IP].src
dst = d[IP].dst
# print(f"{d[DNS].id}; SRC: {src}; DST: {dst}, Answer: {ans}; TYPE: {atype}")
report['queries'].append({
'id': d[DNS].id,
'src': src,
'dst': dst,
'answer': ans,
'type': atype,
'len': len(d[DNS]),
})
# Create directory for the report
print("Generating directory for reports")
reportName = createReportsDirectory()
# Draw matplotlib
today = getTodayDate()
fname_domain = f'domain_{today}.png'
fname_ip = f'ip_{today}.png'
report['graphics']['domain'] = fname_domain
report['graphics']['ip'] = fname_ip
x = list(numRequest.keys())[0:20]
y = list(numRequest.values())[0:20]
plt.figure(figsize=(15, 5))
plt.barh(x, y)
#plt.show()
plt.savefig(
fname=f"reports/{today}/{fname_domain}",
dpi='figure',
format='png'
)
x = list(numIPDst.keys())[0:20]
y = list(numIPDst.values())[0:20]
plt.figure(figsize=(10, 5))
plt.barh(x, y, 0.5)
#plt.show()
plt.savefig(
fname=f"reports/{today}/{fname_ip}",
dpi='figure',
format='png'
)
# Identify DNS Tunneling attacks
print("Analyzing DNS capture for identifying DNS Tunneling")
tunnelingDNSAttacks(report['dnstunneling'], allDomains, dnsbl)
# For external IP, use curl to VirusTotal for analyzing the IP
checkVt = False
if checkVt:
print("Getting IP informations to VirusTotal")
for ip in allIPs:
getIPVirusTotal(api_key, ip, report['vt'])
# We generating the report
print("Generating the report")
generateHtmlReport(report)
print(f"Report generated at this directory: {reportName}")
if __name__ == "__main__":
main()

14
main.py

@ -13,7 +13,8 @@ def checkArguments():
parser = ArgumentParser(description="baoSOC")
parser.add_argument('-c', '--config', help='Config file')
parser.add_argument('--hash', help='Hash file', action='store_true')
parser.add_argument('--dns', help='WhoIs')
parser.add_argument('--dns', help='Get domain name information')
parser.add_argument('--dnsattacks', help='Parse DNS pcap file')
return parser.parse_args()
def usage():
@ -25,6 +26,7 @@ def usage():
print("-c PATH, --config PATH\t\tConfig file - mandatory")
print("--hash FILE\t\t\tHash the file and check in VirusTotal")
print("--dns FQDN\t\t\tGet information regarding the domain with whois and VirusTotal")
print("--dnsattacks FILE\t\t\tParse the DNS pcap file and identify some DNS attacks")
def mainMenu():
print("\n baoSOC ")
@ -77,23 +79,21 @@ def main():
#print(vt.getIPVirusTotal("1.1.1.1", report))
if args.dns:
dns = DNS(config['api_key'])
dns = DNS(config['api_key'], args.dns)
print("IP Informations:\n")
report = dns.resolver(args.dns)
report = dns.resolver()
for key in report.keys():
s = f"{key}: "
print(s)
print(f"{key}: ")
for entry in report[key]:
for subkey in entry.keys():
#print(f"\t{subkey}: {entry[subkey].decode()}")
value = entry[subkey]
if isinstance(value, bytes):
value = value.decode()
print(f"\t{subkey}: {value}")
print("\nReport with Whois:\n")
report = dns.whois(args.dns)
report = dns.whois()
for key in report.keys():
if isinstance(report[key], list):
print(f"{key}:")