Update project

This commit is contained in:
geoffrey 2024-07-12 17:36:38 +02:00
parent 91591626fb
commit d89959dc0c
9 changed files with 69 additions and 451 deletions

13
exec.sh

@ -1,6 +1,15 @@
#!/usr/bin/sh #!/usr/bin/sh
use_influxdb=true
influxdb_host="<ip>"
influxdb_orgid="<org ID>"
influxdb_token="<token>"
influxdb_bucket="tcp_metrics"
clang-11 -g -O2 -target bpf -c tp_tcp.c -o tp_tcp.o && \ clang-11 -g -O2 -target bpf -c tp_tcp.c -o tp_tcp.o && \
gcc ic.c load_bpf.c -o load_bpf -lbpf && \ gcc ic.c load_bpf.c -o load_bpf -lbpf && \
#sudo ./load_bpf 192.168.1.68 f32d493484526abc _HK7-cwCZuOiaBIFi17J3riUeQ8OeR6oOp9o3QZMNpehdJMTkleR4B7-CczXSzwhx656GMZi3m6h15h59burbg== tcp_metrics if [ "$use_influxdb" = true ]; then
sudo ./load_bpf 10.231.246.26 392fcb79fc296d8e EwmJtlAXAlJO_e1zjYwxLL2lD3E9jgDRAbba3Wsssn7HcqXKv1OrsmZ66ZlEVwwNMG6gx3_AxqMFnpr6MjuSZQ== tcp_metrics sudo ./load_bpf $influxdb_host $influxdb_orgid $influxdb_token $influxdb_bucket
else
sudo ./load_bpf --no-influxdb
fi

325
ic_v1.c

@ -1,325 +0,0 @@
/*
* Influx C (ic) client for data capture
* Developer: Nigel Griffiths.
* (C) Copyright 2021 Nigel Griffiths
This program is free software: you can redistribute it and/or modify
it under the terms of the gnu general public license as published by
the free software foundation, either version 3 of the license, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
merchantability or fitness for a particular purpose. see the
gnu general public license for more details.
You should have received a copy of the gnu general public license
along with this program. if not, see <http://www.gnu.org/licenses/>.
Compile: cc ic.c -g -O3 -o ic
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
#include <math.h>
#include <string.h>
#include <sys/errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#define DEBUG if(debug)
#define MEGABYTE ( 1024 * 1024 ) /* USed as the default buffer sizes */
int debug = 0; /* 0=off, 1=on basic, 2=trace like output */
char influx_hostname[1024 + 1] = { 0 };/* details of the influxdb server or telegraf */
char influx_ip[16 + 1] = { 0 };
long influx_port = 0;
char influx_database[256+1]; /* the influxdb database */
char influx_username[64+1]; /* optional for influxdb access */
char influx_password[64+1]; /* optional for influxdb access */
char *output; /* all the stats must fit in this buffer */
long output_size = 0;
long output_char = 0;
char *influx_tags; /* saved tags for every influxdb line protocol mesurement */
int subended = 0; /* stop ic_subend and ic_measureend both enig the measure */
int first_sub = 0; /* need to remove the ic_measure measure before adding ic_sub measure */
char saved_section[64];
char saved_sub[64];
int sockfd; /* file desciptor for socket connection */
void error(char *buf)
{
fprintf(stderr, "error: \"%s\" errno=%d meaning=\"%s\"\n", buf, errno, strerror(errno));
close(sockfd);
sleep(2); /* this can help the socket close cleanly at the remote end */
exit(1);
}
void ic_debug(int level)
{
debug = level;
}
/* ic_tags() argument is the measurement tags for influddb */
/* example: "host=vm1234" note:the comma & hostname of the virtual machine sending the data */
/* complex: "host=lpar42,serialnum=987654,arch=power9" note:the comma separated list */
void ic_tags(char *t)
{
DEBUG fprintf(stderr,"ic_tags(%s)\n",t);
if( influx_tags == (char *) 0) {
if( (influx_tags = (char *)malloc(MEGABYTE)) == (char *)-1)
error("failed to malloc() tags buffer");
}
strncpy(influx_tags,t,256);
}
void ic_influx_database(char *host, long port, char *db) /* note: converts influxdb hostname to ip address */
{
struct hostent *he;
char errorbuf[1024 +1 ];
influx_port = port;
strncpy(influx_database,db,256);
if(host[0] <= '0' && host[0] <='9') {
DEBUG fprintf(stderr,"ic_influx(ipaddr=%s,port=%ld,database=%s))\n",host,port,db);
strncpy(influx_ip,host,16);
} else {
DEBUG fprintf(stderr,"ic_influx_by_hostname(host=%s,port=%ld,database=%s))\n",host,port,db);
strncpy(influx_hostname,host,1024);
if (isalpha(host[0])) {
he = gethostbyname(host);
if (he == NULL) {
sprintf(errorbuf, "influx host=%s to ip address convertion failed gethostbyname(), bailing out\n", host);
error(errorbuf);
}
/* this could return multiple ip addresses but we assume its the first one */
if (he->h_addr_list[0] != NULL) {
strcpy(influx_ip, inet_ntoa(*(struct in_addr *) (he->h_addr_list[0])));
DEBUG fprintf(stderr,"ic_influx_by_hostname hostname=%s converted to ip address %s))\n",host,influx_ip);
} else {
sprintf(errorbuf, "influx host=%s to ip address convertion failed (empty list), bailing out\n", host);
error(errorbuf);
}
} else {
strcpy( influx_ip, host); /* perhaps the hostname is actually an ip address */
}
}
}
void ic_influx_userpw(char *user, char *pw)
{
DEBUG fprintf(stderr,"ic_influx_userpw(username=%s,pssword=%s))\n",user,pw);
strncpy(influx_username,user,64);
strncpy(influx_password,pw,64);
}
int create_socket() /* returns 1 for error and 0 for ok */
{
int i;
static char buffer[4096];
static struct sockaddr_in serv_addr;
if(debug) DEBUG fprintf(stderr, "socket: trying to connect to \"%s\":%ld\n", influx_ip, influx_port);
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
error("socket() call failed");
return 0;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(influx_ip);
serv_addr.sin_port = htons(influx_port);
/* connect tot he socket offered by the web server */
if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
DEBUG fprintf(stderr, " connect() call failed errno=%d", errno);
return 0;
}
return 1;
}
void ic_check(long adding) /* Check the buffer space */
{
if(output == (char *)0) { /* First time create the buffer *
if( (output = (char *)malloc(MEGABYTE)) == (char *)-1)
error("failed to malloc() output buffer");
}
if(output_char + (2*adding) > output_size) /* When near the end of the output buffer, extend it*/
if( (output = (char *)realloc(output, output_size + MEGABYTE)) == (char *)-1)
error("failed to realloc() output buffer");
}
}
void remove_ending_comma_if_any()
{
if (output[output_char - 1] == ',') {
output[output_char - 1] = 0; /* remove the char */
output_char--;
}
}
void ic_measure(char *section)
{
ic_check( strlen(section) + strlen(influx_tags) + 3);
output_char += sprintf(&output[output_char], "%s,%s ", section, influx_tags);
strcpy(saved_section, section);
first_sub = 1;
subended = 0;
DEBUG fprintf(stderr, "ic_measure(\"%s\") count=%ld\n", section, output_char);
}
void ic_measureend()
{
ic_check( 4 );
remove_ending_comma_if_any();
if (!subended) {
output_char += sprintf(&output[output_char], " \n");
}
subended = 0;
DEBUG fprintf(stderr, "ic_measureend()\n");
}
/* Note this added a further tag to the measurement of the "resource_name" */
/* measurement might be "disks" */
/* sub might be "sda1", "sdb1", etc */
void ic_sub(char *resource)
{
int i;
ic_check( strlen(saved_section) + strlen(influx_tags) +strlen(saved_sub) + strlen(resource) + 9);
/* remove previously added section */
if (first_sub) {
for (i = output_char - 1; i > 0; i--) {
if (output[i] == '\n') {
output[i + 1] = 0;
output_char = i + 1;
break;
}
}
}
first_sub = 0;
/* remove the trailing s */
strcpy(saved_sub, saved_section);
if (saved_sub[strlen(saved_sub) - 1] == 's') {
saved_sub[strlen(saved_sub) - 1] = 0;
}
output_char += sprintf(&output[output_char], "%s,%s,%s_name=%s ", saved_section, influx_tags, saved_sub, resource);
subended = 0;
DEBUG fprintf(stderr, "ic_sub(\"%s\") count=%ld\n", resource, output_char);
}
void ic_subend()
{
ic_check( 4 );
remove_ending_comma_if_any();
output_char += sprintf(&output[output_char], " \n");
subended = 1;
DEBUG fprintf(stderr, "ic_subend()\n");
}
void ic_long(char *name, long long value)
{
ic_check( strlen(name) + 16 + 4 );
output_char += sprintf(&output[output_char], "%s=%lldi,", name, value);
DEBUG fprintf(stderr, "ic_long(\"%s\",%lld) count=%ld\n", name, value, output_char);
}
void ic_double(char *name, double value)
{
ic_check( strlen(name) + 16 + 4 );
if (isnan(value) || isinf(value)) { /* not-a-number or infinity */
DEBUG fprintf(stderr, "ic_double(%s,%.1f) - nan error\n", name, value);
} else {
output_char += sprintf(&output[output_char], "%s=%.3f,", name, value);
DEBUG fprintf(stderr, "ic_double(\"%s\",%.1f) count=%ld\n", name, value, output_char);
}
}
void ic_string(char *name, char *value)
{
int i;
int len;
ic_check( strlen(name) + strlen(value) + 4 );
len = strlen(value);
for (i = 0; i < len; i++) /* replace problem characters and with a space */
if (value[i] == '\n' || iscntrl(value[i]))
value[i] = ' ';
output_char += sprintf(&output[output_char], "%s=\"%s\",", name, value);
DEBUG fprintf(stderr, "ic_string(\"%s\",\"%s\") count=%ld\n", name, value, output_char);
}
void ic_push()
{
char header[1024];
char result[1024];
char buffer[1024 * 8];
int ret;
int i;
int total;
int sent;
int code;
if (output_char == 0) /* nothing to send so skip this operation */
return;
if (influx_port) {
DEBUG fprintf(stderr, "ic_push() size=%ld\n", output_char);
if (create_socket() == 1) {
sprintf(buffer, "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%ld\r\nContent-Length: %ld\r\n\r\n",
influx_database, influx_username, influx_password, influx_hostname, influx_port, output_char);
DEBUG fprintf(stderr, "buffer size=%ld\nbuffer=<%s>\n", strlen(buffer), buffer);
if ((ret = write(sockfd, buffer, strlen(buffer))) != strlen(buffer)) {
fprintf(stderr, "warning: \"write post to sockfd failed.\" errno=%d\n", errno);
}
total = output_char;
sent = 0;
if (debug == 2)
fprintf(stderr, "output size=%d output=\n<%s>\n", total, output);
while (sent < total) {
ret = write(sockfd, &output[sent], total - sent);
DEBUG fprintf(stderr, "written=%d bytes sent=%d total=%d\n", ret, sent, total);
if (ret < 0) {
fprintf(stderr, "warning: \"write body to sockfd failed.\" errno=%d\n", errno);
break;
}
sent = sent + ret;
}
for (i = 0; i < 1024; i++) /* empty the buffer */
result[i] = 0;
if ((ret = read(sockfd, result, sizeof(result))) > 0) {
result[ret] = 0;
DEBUG fprintf(stderr, "received bytes=%d data=<%s>\n", ret, result);
sscanf(result, "HTTP/1.1 %d", &code);
for (i = 13; i < 1024; i++)
if (result[i] == '\r')
result[i] = 0;
if (debug == 2)
fprintf(stderr, "http-code=%d text=%s [204=Success]\n", code, &result[13]);
if (code != 204)
fprintf(stderr, "code %d -->%s<--\n", code, result);
}
close(sockfd);
sockfd = 0;
DEBUG fprintf(stderr, "ic_push complete\n");
} else {
DEBUG fprintf(stderr, "socket create failed\n");
}
} else error("influx port is not set, bailing out");
output[0] = 0;
output_char = 0;
}

@ -1,3 +1,4 @@
#include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <bpf/bpf.h> #include <bpf/bpf.h>
#include <bpf/libbpf.h> #include <bpf/libbpf.h>
@ -23,6 +24,22 @@ static void clean_obj(struct bpf_object *obj){
static void usage(char *app){ static void usage(char *app){
printf("Usage: %s <host> <ordID> <token> <bucket>\n", app); printf("Usage: %s <host> <ordID> <token> <bucket>\n", app);
} }
static int check_arguments(int argc, char *argv[]){
if (argc < 1){
usage(argv[0]);
return -1;
}
if (strcmp(argv[1], "--no-influxdb") == 0)
return 0;
if (argc < CNT_ARGS){
usage(argv[0]);
return -1;
}
return argc; // Return the number of arguments
}
int main(int argc, char *argv[]){ int main(int argc, char *argv[]){
const char *fileObj = "tp_tcp.o"; const char *fileObj = "tp_tcp.o";
struct bpf_object *obj; struct bpf_object *obj;
@ -42,29 +59,35 @@ int main(int argc, char *argv[]){
char orgID[INFLUXDB_SIZE]; char orgID[INFLUXDB_SIZE];
char token[TOKEN_SIZE]; char token[TOKEN_SIZE];
char bucket[BUCKET_SIZE]; char bucket[BUCKET_SIZE];
int use_influxdb = 0;
int debug = 1;
if (argc < CNT_ARGS){ // We get args
usage(argv[0]); err = check_arguments(argc, argv);
if (err == -1)
return -1; return -1;
if (err == CNT_ARGS){
strncpy(host, argv[1], INFLUXDB_SIZE);
strncpy(orgID, argv[2], INFLUXDB_SIZE);
strncpy(token, argv[3], TOKEN_SIZE);
strncpy(bucket, argv[4], BUCKET_SIZE);
use_influxdb = 1;
} }
strncpy(host, argv[1], INFLUXDB_SIZE);
strncpy(orgID, argv[2], INFLUXDB_SIZE);
strncpy(token, argv[3], TOKEN_SIZE);
strncpy(bucket, argv[4], BUCKET_SIZE);
// Connect to InfluxDB // Connect to InfluxDB
ic_influx_database(host, 8086, bucket); if (use_influxdb) {
ic_influx_orgID(orgID); ic_influx_database(host, 8086, bucket);
ic_influx_token(token); ic_influx_orgID(orgID);
ic_influx_token(token);
}
obj = bpf_object__open_file(fileObj, NULL); obj = bpf_object__open_file(fileObj, NULL);
if (!obj){ if (!obj){
printf("Failed to open %s\n", fileObj); printf("Failed to open %s\n", fileObj);
return -1; return -1;
} }
//LIBBPF_OPTS(bpf_map_create_opts, opts, .map_flags = BPF_F_MMAPABLE);
//map_fd = bpf_create_map(BPF_MAP_TYPE_HASH, sizeof(int), sizeof(struct reset), 4096, BPF_ANY);
//map_fd = bpf_create_map(BPF_MAP_TYPE_ARRAY, sizeof(int), sizeof(struct reset), 4096, BPF_ANY); //map_fd = bpf_create_map(BPF_MAP_TYPE_ARRAY, sizeof(int), sizeof(struct reset), 4096, BPF_ANY);
err = bpf_object__load(obj); err = bpf_object__load(obj);
@ -75,7 +98,7 @@ int main(int argc, char *argv[]){
return -1; return -1;
} }
program = bpf_object__find_program_by_name(obj, "tcp_retransmit"); program = bpf_object__find_program_by_name(obj, "tcp_rst_stats");
if (!program){ if (!program){
printf("Failed to find the program\n"); printf("Failed to find the program\n");
clean_obj(obj); clean_obj(obj);
@ -124,7 +147,6 @@ int main(int argc, char *argv[]){
__s16 f = AF_INET; __s16 f = AF_INET;
err = bpf_map_update_elem(map_fd_filter_family, &keys, &f, BPF_ANY); err = bpf_map_update_elem(map_fd_filter_family, &keys, &f, BPF_ANY);
printf("Waiting for new packets\n");
while(1){ while(1){
err = bpf_map_lookup_elem(map_fd_index, &keys, &indexPackets); err = bpf_map_lookup_elem(map_fd_index, &keys, &indexPackets);
@ -143,21 +165,22 @@ int main(int argc, char *argv[]){
memcpy(tmp, s, 35); memcpy(tmp, s, 35);
d = inet_ntoa(*dest); d = inet_ntoa(*dest);
printf("Sport: %d; dport: %d %d %d %s - %s\n", s_reset.sport, s_reset.dport, s_reset.family, s_reset.proto, tmp, d); if (debug)
printf("Sport: %d; dport: %d %d %d %s - %s\n", s_reset.sport, s_reset.dport, s_reset.family, s_reset.proto, tmp, d);
// Get the last value from InfluxDB if (use_influxdb) {
//lastValue = ic_read(s, d); printf("Send data to influx\n");
// Send data to InfluxDB
snprintf(buf, BUF_SIZE, "host=%s", s);
ic_tags(buf);
// Send data to InfluxDB ic_measure("tcp_reset");
snprintf(buf, BUF_SIZE, "host=%s", s); ic_long("value", 1);
ic_tags(buf); ic_measureend();
ic_push();
ic_measure("tcp_reset");
ic_long(d, 1);
ic_measureend();
ic_push();
memset(buf, 0, BUF_SIZE); memset(buf, 0, BUF_SIZE);
}
} }
memset(&s_reset, 0, sizeof(struct reset)); memset(&s_reset, 0, sizeof(struct reset));
} }

Binary file not shown.

@ -1,44 +0,0 @@
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include "ic.h"
int main(int argc, char *argv[], char *argp[]){
char buf[300];
char host[64];
char orgID[64];
char token[128];
int stats = 0;
if (argc < 3){
printf("Usage: ./main <host> <orgID> <token>");
return 0;
}
printf("%s %s %s\n", argv[1], argv[2], argv[3]);
memcpy(host, argv[1], 64);
memcpy(orgID, argv[2], 64);
memcpy(token, argv[3], 128);
printf("Host: %s\n", host);
printf("orgID: %s\n", orgID);
printf("Token: %s\n", token);
ic_influx_database(host, 8086, "tcp_metrics");
ic_influx_orgID(orgID);
ic_influx_token(token);
ic_debug(2);
while(1){
snprintf(buf, 300, "host=%s", host);
ic_tags(buf);
ic_measure("tcp_reset");
stats += 1;
ic_measureend();
ic_push();
sleep(30);
}
return 0;
}

@ -1,9 +0,0 @@
#!/usr/bin/env python3
from bcc import BPF
with open("tp_tcp.c", 'r') as f:
data = f.read()
b = BPF(text=data)
#b = BPF(src_file="tp_tcp_py.c")

@ -4,10 +4,9 @@ curl -i -X POST "http://192.168.1.68:8086/api/v2/query?bucket=tcp_metrics&orgID=
--header "Authorization: Token $1" \ --header "Authorization: Token $1" \
--header "Accept: application/csv" \ --header "Accept: application/csv" \
--header "Content-Type: application/vnd.flux" \ --header "Content-Type: application/vnd.flux" \
--data 'from(bucket: "tcp_metrics")|> range(start: -1m)|> filter(fn: (r) => r["_measurement"] == "tcp_reset")|> filter(fn: (r) => r["host"] == "127.0.0.1")|> filter(fn: (r) => r["_field"] == "127.0.0.1")|> last()' --data 'from(bucket: "tcp_metrics")
# --data 'from(bucket: "tcp_metrics") |> range(start: -5m)
# |> range(start: -5m) |> filter(fn: (r) => r["_measurement"] == "tcp_reset")
# |> filter(fn: (r) => r["_measurement"] == "tcp_reset") |> filter(fn: (r) => r["host"] == "127.0.0.1")
# |> filter(fn: (r) => r["host"] == "127.0.0.1") |> filter(fn: (r) => r["_field"] == "value")
# |> filter(fn: (r) => r["_field"] == "127.0.0.1") |> yield(name: "mean")'
# |> yield(name: "mean")'

@ -1,7 +1,5 @@
#!/usr/bin/bash #!/usr/bin/bash
#curl -i -X POST 'http://localhost:8086/api/v2/write?bucket=tcp_metrics&orgID=392fcb79fc296d8e&rp&precision=ns' \
curl -i -X POST "http://localhost:8086/api/v2/write?bucket=tcp_metrics&orgID=392fcb79fc296d8e" \ curl -i -X POST "http://localhost:8086/api/v2/write?bucket=tcp_metrics&orgID=392fcb79fc296d8e" \
--header "Authorization: Token $1" \ --header "Authorization: Token $1" \
--data "cpu_load_short,host=server01,region=us-west value=1" --data "cpu_load_short,host=server01, value=1"
# --data-raw 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

@ -10,22 +10,7 @@
char LICENSE[] SEC("license") = "Dual BSD/GPL"; char LICENSE[] SEC("license") = "Dual BSD/GPL";
struct ctx_receive_reset { // Format: /sys/kernel/debug/tracing/events/tcp/tcp_send_reset/format
__u16 common_type; // unsigned short
__u8 common_flags; // unsigned char
__u8 common_count; // unsigned char
__s32 pid; // int
const void *skaddr;
__u16 sport;
__u16 dport;
__u16 family;
__u8 saddr[4];
__u8 daddr[4];
__u8 saddr_v6[16];
__u8 daddr_v6[16];
__u64 sock_cookie;
};
struct ctx_send_reset { struct ctx_send_reset {
__u16 common_type; // unsigned short __u16 common_type; // unsigned short
__u8 common_flags; // unsigned char __u8 common_flags; // unsigned char
@ -74,31 +59,13 @@ struct {
// sudo tcpdump -i any 'tcp[13] & 4 != 0' -n -> filter TCP reset flags // sudo tcpdump -i any 'tcp[13] & 4 != 0' -n -> filter TCP reset flags
/* /*
* This project do not trace any sniffing ports, because, the tracepoint tcp:tcp_send_reset
* works only for an establish socket, but, if you have a lot of TCP RST, you can have
* an issue with your system
*/
/*
* Identify all tracepoint available
* - cat /sys/kernel/tracing/available_events
* Enable an event:
* - echo 'tcp_receive_reset' >> /sys/kernel/tracing/set_event -> important to add the '>>'
* Docs: https://docs.kernel.org/trace/events.html
* https://events.linuxfoundation.org/wp-content/uploads/2022/10/elena-zannoni-tracing-tutorial-LF-2021.pdf
* https://docs.kernel.org/trace/tracepoints.html
* Why we need to detect RST: * Why we need to detect RST:
* When we scan the port, the scanner send an SYN flag and if the port is block, we receive a RST flag: * When we scan the port, the scanner send an SYN flag and if the port is block, we receive a RST flag:
* listening on any, link-type LINUX_SLL2 (Linux cooked v2), snapshot length 262144 bytes
10:48:28.531295 lo In IP localhost.43961 > localhost.tproxy: Flags [S], seq 2197047013, win 1024, options [mss 1460], length 0
10:48:28.531306 lo In IP localhost.tproxy > localhost.43961: Flags [R.], seq 0, ack 2197047014, win 0, length 0
* But we can also block all receive RST: iptables -I INPUT -p tcp --dport <port> -j REJECT --reject-with tcp-reset * But we can also block all receive RST: iptables -I INPUT -p tcp --dport <port> -j REJECT --reject-with tcp-reset
*/ */
//SEC("tp/tcp_retransmit_synack")
//SEC("tracepoint/tcp/tcp_receive_reset")
SEC("tracepoint/tcp/tcp_send_reset") SEC("tracepoint/tcp/tcp_send_reset")
int tcp_retransmit(struct ctx_send_reset *ctx){ int tcp_rst_stats(struct ctx_send_reset *ctx){
struct reset s_reset = {}; struct reset s_reset = {};
int *index; int *index;
int keys = 0; int keys = 0;