baoSOC/dnsattacks.py
2024-06-19 21:27:39 +02:00

253 lines
6.8 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
from scapy.all import *
from scapy.layers.dns import DNSQR, DNSRR, DNS
import matplotlib.pyplot as plt
import numpy as np
import requests
import re
from reports import generateHtmlReport, createReportsDirectory, getTodayDate
from tunneling import tunnelingDNSAttacks
from config import VT_ATTRIBUTES_MAPPING
def _getType(t):
"""
This function identify the message type according to the RFC 1035:
https://datatracker.ietf.org/doc/html/rfc1035#section-3.2.2
"""
rtype = {
1: 'A', 2: 'NS', 3: 'MD', 4: 'MF',
5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG', 9: 'MR',
10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO',
15: 'MX', 16: 'TXT',
28: 'AAAA'
}
try:
return rtype[t]
except KeyError:
return t
def readPcapFile(pcap):
    """
    Read a capture file with scapy.

    :param pcap: path of the capture file, passed unchanged to ``rdpcap``.
    :return: the value returned by ``rdpcap`` on success, or ``None`` when
        the file could not be opened or parsed (a message is printed).
    """
    try:
        return rdpcap(pcap)
    except (Scapy_Exception, OSError):
        # OSError (e.g. FileNotFoundError, PermissionError) comes from
        # opening the path itself; without it a wrong --file argument
        # ends in a traceback instead of the None the caller checks for.
        print(f"Failed to read {pcap}")
        return None
def _privateIP(ip):
"""
This function check if the ip is from RFC 1918
https://stackoverflow.com/questions/2814002/private-ip-address-identifier-in-regular-expression
"""
res = False
rfs1918 = [
'(^10\.)',
'(^172\.1[6-9]\.)',
'(^172\.2[0-9]\.)',
'(^172\.3[0-1]\.)',
'(^192\.168\.)',
]
for privateIP in rfs1918:
if re.search(privateIP, ip):
res = True
return res
def getIPVirusTotal(api_key, ip, report):
    """
    Query VirusTotal about an IP address and append the outcome to *report*.

    Exactly one dict is appended per call:

    * on success: ``{'ip': ip, ...}`` plus every key of
      ``VT_ATTRIBUTES_MAPPING`` that is present in the returned attributes
      (keys missing from the answer are left out);
    * on failure: ``{'error': <message>, 'ip': ip}``, whether the error was
      reported by the API or raised while performing the request.

    :param api_key: value sent in the ``x-apikey`` header.
    :param ip: address to look up.
    :param report: list that receives the result dict.
    :return: ``None``; the result is delivered through *report*.
    """
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
    headers = {
        'x-apikey': api_key,
    }
    try:
        # Without a timeout a single stalled connection blocks the run forever.
        res = requests.get(url, headers=headers, timeout=30).json()
    except (requests.RequestException, ValueError) as err:
        # RequestException: connection/timeout problems.
        # ValueError: the body was not valid JSON.
        report.append({
            'error': str(err),
            'ip': ip
        })
        return

    if 'error' in res:
        report.append({
            'error': res['error']['message'],
            'ip': ip
        })
        return

    vt = res['data']['attributes']
    data = {'ip': ip}
    for entry in VT_ATTRIBUTES_MAPPING:
        if entry in vt:
            data[entry] = vt[entry]
    report.append(data)
def _parseDnsPackets(packets, queries):
    """
    Collect DNS questions and answers from an iterable of scapy packets.

    One dict per question and one per answer is appended to *queries*.

    :param packets: iterable of packets, as returned by ``readPcapFile``.
    :param queries: list that receives the per-packet dicts.
    :return: ``(numRequest, numIPDst, allIPs, allDomains)`` where
        ``numRequest`` counts questions per queried name, ``numIPDst``
        counts questions per destination address, ``allIPs`` lists each
        non-RFC 1918 destination once in first-seen order, and
        ``allDomains`` holds one ``query``/``src``/``len`` dict per question.
    """
    numRequest = {}
    numIPDst = {}
    allIPs = []
    seenIPs = set()  # O(1) duplicate check; allIPs keeps first-seen order
    allDomains = []

    for pkt in packets:
        # pkt[IP] raises when the layer is absent (e.g. DNS carried over
        # IPv6), so packets without an IPv4 layer are skipped instead of
        # aborting the whole analysis.
        if not (pkt.haslayer(DNS) and pkt.haslayer(IP)):
            continue

        src = pkt[IP].src
        dst = pkt[IP].dst

        # For DNS query
        if isinstance(pkt.qd, DNSQR):
            query = pkt.qd.qname
            queries.append({
                'id': pkt[DNS].id,
                'src': src,
                'dst': dst,
                'query': query,
                'type': _getType(pkt.qd.qtype),
                'len': len(pkt[DNS]),
            })
            # Count the number of request for a domain
            numRequest[query] = numRequest.get(query, 0) + 1
            # Count the number of IP dst
            numIPDst[dst] = numIPDst.get(dst, 0) + 1
            if dst not in seenIPs and not _privateIP(dst):
                seenIPs.add(dst)
                allIPs.append(dst)
            allDomains.append({
                'query': query,
                'src': src,
                'len': len(pkt[DNS]),
            })

        # For DNS response
        if isinstance(pkt.an, DNSRR):
            queries.append({
                'id': pkt[DNS].id,
                'src': src,
                'dst': dst,
                'answer': pkt.an.rrname,
                'type': _getType(pkt.an.type),
                'len': len(pkt[DNS]),
            })

    return numRequest, numIPDst, allIPs, allDomains


def _saveBarChart(counts, path, figsize, height=0.8):
    """
    Save a horizontal bar chart of the first 20 entries of *counts* as PNG.

    :param counts: dict mapping a label to its count (insertion order is
        used; the entries are not sorted).
    :param path: destination file.
    :param figsize: ``(width, height)`` of the figure.
    :param height: bar height passed to ``barh`` (0.8 is its default).
    """
    labels = list(counts.keys())[0:20]
    values = list(counts.values())[0:20]
    fig = plt.figure(figsize=figsize)
    plt.barh(labels, values, height)
    plt.savefig(
        fname=path,
        dpi='figure',
        format='png'
    )
    # Release the figure once it is on disk.
    plt.close(fig)


def main():
    """
    Entry point: analyse a DNS capture and produce the HTML report.

    Steps, in order: validate the ``--file`` / ``--config`` arguments, read
    the config (``dnsbl`` is mandatory, ``api_key`` optional), parse the
    capture, save the two bar charts under ``reports/<today>/``, run the
    DNS tunneling analysis and generate the report.

    Exits with status 1 when an argument or the ``dnsbl`` setting is
    missing, or when the config or capture file cannot be read.
    """
    # Function-scope import: sys.exit is used instead of the site-provided
    # exit() helper, without relying on the module-level import list.
    import sys

    args = checkArguments()
    report = {
        'queries': [],
        'domains': [],
        'graphics': {},
        'ip': [],
        'vt': [],
        'dnstunneling': {},
    }

    # Every failure below exits with a non-zero status so that calling
    # scripts can tell an aborted run from a successful one.
    if not args.file:
        print("Please, specify the option --file")
        sys.exit(1)
    if not args.config:
        print("Please, specify the config file with the parameter --config")
        sys.exit(1)

    # Read the config file
    config = readConfigFile(args.config)
    if config is None:
        print("Failed to read the config file")
        sys.exit(1)

    # Get the API key for VirusTotal (optional: without it the lookup is skipped)
    checkVt = True
    api_key = None
    try:
        api_key = config["api_key"]
    except KeyError:
        print("Can't find the key in the config file. Bypass the check to VirusTotal")
        checkVt = False

    # Check if DNS BlackList is specified
    try:
        dnsbl = config["dnsbl"]
    except KeyError:
        print("Can't find the dnsbl in the config file")
        sys.exit(1)

    data = readPcapFile(args.file)
    if data is None:
        print("Failed to read the pcap file")
        sys.exit(1)

    print("Parsing DNS capture")
    numRequest, numIPDst, allIPs, allDomains = _parseDnsPackets(
        data, report['queries']
    )

    # Create directory for the report
    print("Generating directory for reports")
    reportName = createReportsDirectory()

    # Draw matplotlib
    today = getTodayDate()
    fname_domain = f'domain_{today}.png'
    fname_ip = f'ip_{today}.png'
    report['graphics']['domain'] = fname_domain
    report['graphics']['ip'] = fname_ip
    _saveBarChart(numRequest, f"reports/{today}/{fname_domain}", (15, 5))
    _saveBarChart(numIPDst, f"reports/{today}/{fname_ip}", (10, 5), 0.5)

    # Identify DNS Tunneling attacks
    print("Analyzing DNS capture for identifying DNS Tunneling")
    tunnelingDNSAttacks(report['dnstunneling'], allDomains, dnsbl)

    # For external IP, query VirusTotal for analyzing the IP.
    # NOTE(review): the next assignment unconditionally disables the lookup,
    # overriding the value derived from the config above. It is kept as found
    # because enabling it sends every external IP to a third-party service;
    # remove the line to re-enable the check.
    checkVt = False
    if checkVt:
        print("Getting IP informations to VirusTotal")
        for ip in allIPs:
            getIPVirusTotal(api_key, ip, report['vt'])

    # We generating the report
    print("Generating the report")
    generateHtmlReport(report)
    print(f"Report generated at this directory: {reportName}")
# Run the analysis only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()