diff --git a/exec.sh b/exec.sh index ebe860b..5479fd9 100755 --- a/exec.sh +++ b/exec.sh @@ -1,6 +1,15 @@ #!/usr/bin/sh +use_influxdb=true +influxdb_host="" +influxdb_orgid="" +influxdb_token="" +influxdb_bucket="tcp_metrics" + clang-11 -g -O2 -target bpf -c tp_tcp.c -o tp_tcp.o && \ gcc ic.c load_bpf.c -o load_bpf -lbpf && \ -#sudo ./load_bpf 192.168.1.68 f32d493484526abc _HK7-cwCZuOiaBIFi17J3riUeQ8OeR6oOp9o3QZMNpehdJMTkleR4B7-CczXSzwhx656GMZi3m6h15h59burbg== tcp_metrics -sudo ./load_bpf 10.231.246.26 392fcb79fc296d8e EwmJtlAXAlJO_e1zjYwxLL2lD3E9jgDRAbba3Wsssn7HcqXKv1OrsmZ66ZlEVwwNMG6gx3_AxqMFnpr6MjuSZQ== tcp_metrics +if [ "$use_influxdb" = true ]; then + sudo ./load_bpf $influxdb_host $influxdb_orgid $influxdb_token $influxdb_bucket +else + sudo ./load_bpf --no-influxdb +fi diff --git a/ic_v1.c b/ic_v1.c deleted file mode 100644 index 860f465..0000000 --- a/ic_v1.c +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Influx C (ic) client for data capture - * Developer: Nigel Griffiths. - * (C) Copyright 2021 Nigel Griffiths - - This program is free software: you can redistribute it and/or modify - it under the terms of the gnu general public license as published by - the free software foundation, either version 3 of the license, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but without any warranty; without even the implied warranty of - merchantability or fitness for a particular purpose. see the - gnu general public license for more details. - - You should have received a copy of the gnu general public license - along with this program. if not, see . - - Compile: cc ic.c -g -O3 -o ic - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define DEBUG if(debug) -#define MEGABYTE ( 1024 * 1024 ) /* USed as the default buffer sizes */ - -int debug = 0; /* 0=off, 1=on basic, 2=trace like output */ - -char influx_hostname[1024 + 1] = { 0 };/* details of the influxdb server or telegraf */ -char influx_ip[16 + 1] = { 0 }; -long influx_port = 0; - -char influx_database[256+1]; /* the influxdb database */ -char influx_username[64+1]; /* optional for influxdb access */ -char influx_password[64+1]; /* optional for influxdb access */ - -char *output; /* all the stats must fit in this buffer */ -long output_size = 0; -long output_char = 0; - -char *influx_tags; /* saved tags for every influxdb line protocol mesurement */ - -int subended = 0; /* stop ic_subend and ic_measureend both enig the measure */ -int first_sub = 0; /* need to remove the ic_measure measure before adding ic_sub measure */ -char saved_section[64]; -char saved_sub[64]; - -int sockfd; /* file desciptor for socket connection */ - -void error(char *buf) -{ - fprintf(stderr, "error: \"%s\" errno=%d meaning=\"%s\"\n", buf, errno, strerror(errno)); - close(sockfd); - sleep(2); /* this can help the socket close cleanly at the remote end */ - exit(1); -} - -void ic_debug(int level) -{ - debug = level; -} - -/* ic_tags() argument is the measurement tags for influddb */ -/* example: "host=vm1234" note:the comma & hostname of the virtual machine sending the data */ -/* complex: "host=lpar42,serialnum=987654,arch=power9" note:the comma separated list */ -void ic_tags(char *t) -{ - DEBUG fprintf(stderr,"ic_tags(%s)\n",t); - if( influx_tags == (char *) 0) { - if( (influx_tags = (char *)malloc(MEGABYTE)) == (char *)-1) - error("failed to malloc() tags buffer"); - } - - strncpy(influx_tags,t,256); -} - -void ic_influx_database(char *host, long port, char *db) /* note: converts influxdb hostname to ip address */ -{ - struct hostent *he; - char errorbuf[1024 +1 ]; - - influx_port = port; - strncpy(influx_database,db,256); - - if(host[0] <= '0' && host[0] <='9') { - DEBUG fprintf(stderr,"ic_influx(ipaddr=%s,port=%ld,database=%s))\n",host,port,db); - strncpy(influx_ip,host,16); - } else { - DEBUG fprintf(stderr,"ic_influx_by_hostname(host=%s,port=%ld,database=%s))\n",host,port,db); - strncpy(influx_hostname,host,1024); - if (isalpha(host[0])) { - - he = gethostbyname(host); - if (he == NULL) { - sprintf(errorbuf, "influx host=%s to ip address convertion failed gethostbyname(), bailing out\n", host); - error(errorbuf); - } - /* this could return multiple ip addresses but we assume its the first one */ - if (he->h_addr_list[0] != NULL) { - strcpy(influx_ip, inet_ntoa(*(struct in_addr *) (he->h_addr_list[0]))); - DEBUG fprintf(stderr,"ic_influx_by_hostname hostname=%s converted to ip address %s))\n",host,influx_ip); - } else { - sprintf(errorbuf, "influx host=%s to ip address convertion failed (empty list), bailing out\n", host); - error(errorbuf); - } - } else { - strcpy( influx_ip, host); /* perhaps the hostname is actually an ip address */ - } - } -} - -void ic_influx_userpw(char *user, char *pw) -{ - DEBUG fprintf(stderr,"ic_influx_userpw(username=%s,pssword=%s))\n",user,pw); - strncpy(influx_username,user,64); - strncpy(influx_password,pw,64); -} - -int create_socket() /* returns 1 for error and 0 for ok */ -{ - int i; - static char buffer[4096]; - static struct sockaddr_in serv_addr; - - if(debug) DEBUG fprintf(stderr, "socket: trying to connect to \"%s\":%ld\n", influx_ip, influx_port); - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - error("socket() call failed"); - return 0; - } - - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = inet_addr(influx_ip); - serv_addr.sin_port = htons(influx_port); - - /* connect tot he socket offered by the web server */ - if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - DEBUG fprintf(stderr, " connect() call failed errno=%d", errno); - return 0; - } - return 1; -} - -void ic_check(long adding) /* Check the buffer space */ -{ - if(output == (char *)0) { /* First time create the buffer * - if( (output = (char *)malloc(MEGABYTE)) == (char *)-1) - error("failed to malloc() output buffer"); - } - if(output_char + (2*adding) > output_size) /* When near the end of the output buffer, extend it*/ - if( (output = (char *)realloc(output, output_size + MEGABYTE)) == (char *)-1) - error("failed to realloc() output buffer"); - } -} - -void remove_ending_comma_if_any() -{ - if (output[output_char - 1] == ',') { - output[output_char - 1] = 0; /* remove the char */ - output_char--; - } -} - -void ic_measure(char *section) -{ - ic_check( strlen(section) + strlen(influx_tags) + 3); - - output_char += sprintf(&output[output_char], "%s,%s ", section, influx_tags); - strcpy(saved_section, section); - first_sub = 1; - subended = 0; - DEBUG fprintf(stderr, "ic_measure(\"%s\") count=%ld\n", section, output_char); -} - -void ic_measureend() -{ - ic_check( 4 ); - remove_ending_comma_if_any(); - if (!subended) { - output_char += sprintf(&output[output_char], " \n"); - } - subended = 0; - DEBUG fprintf(stderr, "ic_measureend()\n"); -} - -/* Note this added a further tag to the measurement of the "resource_name" */ -/* measurement might be "disks" */ -/* sub might be "sda1", "sdb1", etc */ -void ic_sub(char *resource) -{ - int i; - - ic_check( strlen(saved_section) + strlen(influx_tags) +strlen(saved_sub) + strlen(resource) + 9); - - /* remove previously added section */ - if (first_sub) { - for (i = output_char - 1; i > 0; i--) { - if (output[i] == '\n') { - output[i + 1] = 0; - output_char = i + 1; - break; - } - } - } - first_sub = 0; - - /* remove the trailing s */ - strcpy(saved_sub, saved_section); - if (saved_sub[strlen(saved_sub) - 1] == 's') { - saved_sub[strlen(saved_sub) - 1] = 0; - } - output_char += sprintf(&output[output_char], "%s,%s,%s_name=%s ", saved_section, influx_tags, saved_sub, resource); - subended = 0; - DEBUG fprintf(stderr, "ic_sub(\"%s\") count=%ld\n", resource, output_char); -} - -void ic_subend() -{ - ic_check( 4 ); - remove_ending_comma_if_any(); - output_char += sprintf(&output[output_char], " \n"); - subended = 1; - DEBUG fprintf(stderr, "ic_subend()\n"); -} - -void ic_long(char *name, long long value) -{ - ic_check( strlen(name) + 16 + 4 ); - output_char += sprintf(&output[output_char], "%s=%lldi,", name, value); - DEBUG fprintf(stderr, "ic_long(\"%s\",%lld) count=%ld\n", name, value, output_char); -} - -void ic_double(char *name, double value) -{ - ic_check( strlen(name) + 16 + 4 ); - if (isnan(value) || isinf(value)) { /* not-a-number or infinity */ - DEBUG fprintf(stderr, "ic_double(%s,%.1f) - nan error\n", name, value); - } else { - output_char += sprintf(&output[output_char], "%s=%.3f,", name, value); - DEBUG fprintf(stderr, "ic_double(\"%s\",%.1f) count=%ld\n", name, value, output_char); - } -} - -void ic_string(char *name, char *value) -{ - int i; - int len; - - ic_check( strlen(name) + strlen(value) + 4 ); - len = strlen(value); - for (i = 0; i < len; i++) /* replace problem characters and with a space */ - if (value[i] == '\n' || iscntrl(value[i])) - value[i] = ' '; - output_char += sprintf(&output[output_char], "%s=\"%s\",", name, value); - DEBUG fprintf(stderr, "ic_string(\"%s\",\"%s\") count=%ld\n", name, value, output_char); -} - -void ic_push() -{ - char header[1024]; - char result[1024]; - char buffer[1024 * 8]; - int ret; - int i; - int total; - int sent; - int code; - - if (output_char == 0) /* nothing to send so skip this operation */ - return; - if (influx_port) { - DEBUG fprintf(stderr, "ic_push() size=%ld\n", output_char); - if (create_socket() == 1) { - - sprintf(buffer, "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%ld\r\nContent-Length: %ld\r\n\r\n", - influx_database, influx_username, influx_password, influx_hostname, influx_port, output_char); - DEBUG fprintf(stderr, "buffer size=%ld\nbuffer=<%s>\n", strlen(buffer), buffer); - if ((ret = write(sockfd, buffer, strlen(buffer))) != strlen(buffer)) { - fprintf(stderr, "warning: \"write post to sockfd failed.\" errno=%d\n", errno); - } - total = output_char; - sent = 0; - if (debug == 2) - fprintf(stderr, "output size=%d output=\n<%s>\n", total, output); - while (sent < total) { - ret = write(sockfd, &output[sent], total - sent); - DEBUG fprintf(stderr, "written=%d bytes sent=%d total=%d\n", ret, sent, total); - if (ret < 0) { - fprintf(stderr, "warning: \"write body to sockfd failed.\" errno=%d\n", errno); - break; - } - sent = sent + ret; - } - for (i = 0; i < 1024; i++) /* empty the buffer */ - result[i] = 0; - if ((ret = read(sockfd, result, sizeof(result))) > 0) { - result[ret] = 0; - DEBUG fprintf(stderr, "received bytes=%d data=<%s>\n", ret, result); - sscanf(result, "HTTP/1.1 %d", &code); - for (i = 13; i < 1024; i++) - if (result[i] == '\r') - result[i] = 0; - if (debug == 2) - fprintf(stderr, "http-code=%d text=%s [204=Success]\n", code, &result[13]); - if (code != 204) - fprintf(stderr, "code %d -->%s<--\n", code, result); - } - close(sockfd); - sockfd = 0; - DEBUG fprintf(stderr, "ic_push complete\n"); - } else { - DEBUG fprintf(stderr, "socket create failed\n"); - } - } else error("influx port is not set, bailing out"); - - output[0] = 0; - output_char = 0; -} diff --git a/load_bpf.c b/load_bpf.c index c021d08..fe7f9b4 100644 --- a/load_bpf.c +++ b/load_bpf.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -23,6 +24,22 @@ static void clean_obj(struct bpf_object *obj){ static void usage(char *app){ printf("Usage: %s \n", app); } +static int check_arguments(int argc, char *argv[]){ + if (argc < 1){ + usage(argv[0]); + return -1; + } + + if (strcmp(argv[1], "--no-influxdb") == 0) + return 0; + + if (argc < CNT_ARGS){ + usage(argv[0]); + return -1; + } + + return argc; // Return the number of arguments +} int main(int argc, char *argv[]){ const char *fileObj = "tp_tcp.o"; struct bpf_object *obj; @@ -42,29 +59,35 @@ int main(int argc, char *argv[]){ char orgID[INFLUXDB_SIZE]; char token[TOKEN_SIZE]; char bucket[BUCKET_SIZE]; + int use_influxdb = 0; + int debug = 1; - if (argc < CNT_ARGS){ - usage(argv[0]); + // We get args + err = check_arguments(argc, argv); + if (err == -1) return -1; + + if (err == CNT_ARGS){ + strncpy(host, argv[1], INFLUXDB_SIZE); + strncpy(orgID, argv[2], INFLUXDB_SIZE); + strncpy(token, argv[3], TOKEN_SIZE); + strncpy(bucket, argv[4], BUCKET_SIZE); + use_influxdb = 1; } - strncpy(host, argv[1], INFLUXDB_SIZE); - strncpy(orgID, argv[2], INFLUXDB_SIZE); - strncpy(token, argv[3], TOKEN_SIZE); - strncpy(bucket, argv[4], BUCKET_SIZE); - // Connect to InfluxDB - ic_influx_database(host, 8086, bucket); - ic_influx_orgID(orgID); - ic_influx_token(token); + if (use_influxdb) { + ic_influx_database(host, 8086, bucket); + ic_influx_orgID(orgID); + ic_influx_token(token); + } obj = bpf_object__open_file(fileObj, NULL); if (!obj){ printf("Failed to open %s\n", fileObj); return -1; } - //LIBBPF_OPTS(bpf_map_create_opts, opts, .map_flags = BPF_F_MMAPABLE); - //map_fd = bpf_create_map(BPF_MAP_TYPE_HASH, sizeof(int), sizeof(struct reset), 4096, BPF_ANY); + //map_fd = bpf_create_map(BPF_MAP_TYPE_ARRAY, sizeof(int), sizeof(struct reset), 4096, BPF_ANY); err = bpf_object__load(obj); @@ -75,7 +98,7 @@ int main(int argc, char *argv[]){ return -1; } - program = bpf_object__find_program_by_name(obj, "tcp_retransmit"); + program = bpf_object__find_program_by_name(obj, "tcp_rst_stats"); if (!program){ printf("Failed to find the program\n"); clean_obj(obj); @@ -124,7 +147,6 @@ int main(int argc, char *argv[]){ __s16 f = AF_INET; err = bpf_map_update_elem(map_fd_filter_family, &keys, &f, BPF_ANY); - printf("Waiting for new packets\n"); while(1){ err = bpf_map_lookup_elem(map_fd_index, &keys, &indexPackets); @@ -143,21 +165,22 @@ int main(int argc, char *argv[]){ memcpy(tmp, s, 35); d = inet_ntoa(*dest); - printf("Sport: %d; dport: %d %d %d %s - %s\n", s_reset.sport, s_reset.dport, s_reset.family, s_reset.proto, tmp, d); + if (debug) + printf("Sport: %d; dport: %d %d %d %s - %s\n", s_reset.sport, s_reset.dport, s_reset.family, s_reset.proto, tmp, d); - // Get the last value from InfluxDB - //lastValue = ic_read(s, d); + if (use_influxdb) { + printf("Send data to influx\n"); + // Send data to InfluxDB + snprintf(buf, BUF_SIZE, "host=%s", s); + ic_tags(buf); - // Send data to InfluxDB - snprintf(buf, BUF_SIZE, "host=%s", s); - ic_tags(buf); - - ic_measure("tcp_reset"); - ic_long(d, 1); - ic_measureend(); - ic_push(); + ic_measure("tcp_reset"); + ic_long("value", 1); + ic_measureend(); + ic_push(); - memset(buf, 0, BUF_SIZE); + memset(buf, 0, BUF_SIZE); + } } memset(&s_reset, 0, sizeof(struct reset)); } diff --git a/tests/main b/tests/main deleted file mode 100755 index c579cf5..0000000 Binary files a/tests/main and /dev/null differ diff --git a/tests/main.c b/tests/main.c deleted file mode 100644 index 5a9b353..0000000 --- a/tests/main.c +++ /dev/null @@ -1,44 +0,0 @@ -#include -#include -#include -#include "ic.h" - -int main(int argc, char *argv[], char *argp[]){ - char buf[300]; - char host[64]; - char orgID[64]; - char token[128]; - int stats = 0; - - if (argc < 3){ - printf("Usage: ./main "); - return 0; - } - printf("%s %s %s\n", argv[1], argv[2], argv[3]); - memcpy(host, argv[1], 64); - memcpy(orgID, argv[2], 64); - memcpy(token, argv[3], 128); - printf("Host: %s\n", host); - printf("orgID: %s\n", orgID); - printf("Token: %s\n", token); - - ic_influx_database(host, 8086, "tcp_metrics"); - ic_influx_orgID(orgID); - ic_influx_token(token); - ic_debug(2); - - while(1){ - snprintf(buf, 300, "host=%s", host); - ic_tags(buf); - - ic_measure("tcp_reset"); - - stats += 1; - - ic_measureend(); - ic_push(); - sleep(30); - } - - return 0; -} diff --git a/tests/main.py b/tests/main.py deleted file mode 100644 index 59add27..0000000 --- a/tests/main.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python3 - -from bcc import BPF - -with open("tp_tcp.c", 'r') as f: - data = f.read() - -b = BPF(text=data) -#b = BPF(src_file="tp_tcp_py.c") diff --git a/tests/read.sh b/tests/read.sh index 2eb9b4a..223d680 100755 --- a/tests/read.sh +++ b/tests/read.sh @@ -4,10 +4,9 @@ curl -i -X POST "http://192.168.1.68:8086/api/v2/query?bucket=tcp_metrics&orgID= --header "Authorization: Token $1" \ --header "Accept: application/csv" \ --header "Content-Type: application/vnd.flux" \ - --data 'from(bucket: "tcp_metrics")|> range(start: -1m)|> filter(fn: (r) => r["_measurement"] == "tcp_reset")|> filter(fn: (r) => r["host"] == "127.0.0.1")|> filter(fn: (r) => r["_field"] == "127.0.0.1")|> last()' -# --data 'from(bucket: "tcp_metrics") -# |> range(start: -5m) -# |> filter(fn: (r) => r["_measurement"] == "tcp_reset") -# |> filter(fn: (r) => r["host"] == "127.0.0.1") -# |> filter(fn: (r) => r["_field"] == "127.0.0.1") -# |> yield(name: "mean")' + --data 'from(bucket: "tcp_metrics") + |> range(start: -5m) + |> filter(fn: (r) => r["_measurement"] == "tcp_reset") + |> filter(fn: (r) => r["host"] == "127.0.0.1") + |> filter(fn: (r) => r["_field"] == "value") + |> yield(name: "mean")' diff --git a/tests/write.sh b/tests/write.sh index e168010..54edc5e 100644 --- a/tests/write.sh +++ b/tests/write.sh @@ -1,7 +1,5 @@ #!/usr/bin/bash -#curl -i -X POST 'http://localhost:8086/api/v2/write?bucket=tcp_metrics&orgID=392fcb79fc296d8e&rp&precision=ns' \ curl -i -X POST "http://localhost:8086/api/v2/write?bucket=tcp_metrics&orgID=392fcb79fc296d8e" \ --header "Authorization: Token $1" \ - --data "cpu_load_short,host=server01,region=us-west value=1" -# --data-raw 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000' + --data "cpu_load_short,host=server01, value=1" diff --git a/tp_tcp.c b/tp_tcp.c index 06257f2..1c4284d 100644 --- a/tp_tcp.c +++ b/tp_tcp.c @@ -10,22 +10,7 @@ char LICENSE[] SEC("license") = "Dual BSD/GPL"; -struct ctx_receive_reset { - __u16 common_type; // unsigned short - __u8 common_flags; // unsigned char - __u8 common_count; // unsigned char - __s32 pid; // int - - const void *skaddr; - __u16 sport; - __u16 dport; - __u16 family; - __u8 saddr[4]; - __u8 daddr[4]; - __u8 saddr_v6[16]; - __u8 daddr_v6[16]; - __u64 sock_cookie; -}; +// Format: /sys/kernel/debug/tracing/events/tcp/tcp_send_reset/format struct ctx_send_reset { __u16 common_type; // unsigned short __u8 common_flags; // unsigned char @@ -74,31 +59,13 @@ struct { // sudo tcpdump -i any 'tcp[13] & 4 != 0' -n -> filter TCP reset flags /* - * This project do not trace any sniffing ports, because, the tracepoint tcp:tcp_send_reset - * works only for an establish socket, but, if you have a lot of TCP RST, you can have - * an issue with your system - */ - -/* - * Identify all tracepoint available - * - cat /sys/kernel/tracing/available_events - * Enable an event: - * - echo 'tcp_receive_reset' >> /sys/kernel/tracing/set_event -> important to add the '>>' - * Docs: https://docs.kernel.org/trace/events.html - * https://events.linuxfoundation.org/wp-content/uploads/2022/10/elena-zannoni-tracing-tutorial-LF-2021.pdf - * https://docs.kernel.org/trace/tracepoints.html * Why we need to detect RST: * When we scan the port, the scanner send an SYN flag and if the port is block, we receive a RST flag: - * listening on any, link-type LINUX_SLL2 (Linux cooked v2), snapshot length 262144 bytes -10:48:28.531295 lo In IP localhost.43961 > localhost.tproxy: Flags [S], seq 2197047013, win 1024, options [mss 1460], length 0 -10:48:28.531306 lo In IP localhost.tproxy > localhost.43961: Flags [R.], seq 0, ack 2197047014, win 0, length 0 * But we can also block all receive RST: iptables -I INPUT -p tcp --dport -j REJECT --reject-with tcp-reset */ -//SEC("tp/tcp_retransmit_synack") -//SEC("tracepoint/tcp/tcp_receive_reset") SEC("tracepoint/tcp/tcp_send_reset") -int tcp_retransmit(struct ctx_send_reset *ctx){ +int tcp_rst_stats(struct ctx_send_reset *ctx){ struct reset s_reset = {}; int *index; int keys = 0;