自学内容网 自学内容网

DPDK用户态协议栈-TCP Posix API 2

tcp posix api

send发送

/**
 * Queue application data for transmission on a TCP stream (send() analogue).
 *
 * The payload is copied into a freshly allocated fragment carrying
 * ACK|PSH flags and pushed onto the stream's send ring; this function does
 * not transmit the packet itself.
 *
 * @param sockfd  descriptor previously mapped to a stream
 * @param buf     data to send; must be non-NULL when len > 0
 * @param len     number of bytes to send
 * @param flags   ignored
 * @return number of bytes queued, 0 if the descriptor is not a TCP stream,
 *         or -1 on error (unknown descriptor, NULL buffer, allocation
 *         failure, or full send ring)
 */
ssize_t nsend(int sockfd, const void *buf, size_t len, __attribute__((unused))int flags) {

    void* hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    if (buf == NULL && len > 0) {
        return -1;
    }

    struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return 0;
    }

    struct ln_tcp_fragment* fragment = rte_malloc("send frag", sizeof(struct ln_tcp_fragment), 0);
    if (fragment == NULL) {
        /* Report allocation failure as an error, like every other failure path. */
        return -1;
    }
    memset(fragment, 0, sizeof(struct ln_tcp_fragment));

    /* Ports are swapped relative to the stream's fields.
     * NOTE(review): presumably the stream stores them from the peer's
     * point of view -- confirm against the stream setup code. */
    fragment->sport = stream->dport;
    fragment->dport = stream->sport;

    fragment->acknum = stream->recv_next;
    fragment->seqnum = stream->send_next;

    fragment->windows = LN_TCP_INITIAL_WINDOWS;
    fragment->tcp_flags = RTE_TCP_ACK_FLAG | RTE_TCP_PSH_FLAG;
    fragment->hdr_off = 0x50; /* data offset 5 words (20 bytes) in the high nibble */

    /* One spare zeroed byte keeps the payload NUL-terminated. */
    fragment->data = rte_malloc("frag data", len + 1, 0);
    if (fragment->data == NULL) {
        rte_free(fragment);
        return -1;
    }
    memset(fragment->data, 0, len + 1);

    if (len > 0) {
        rte_memcpy(fragment->data, buf, len);
    }
    fragment->length = len;

    /* The fragment must go onto the send ring, not onto the sequence number. */
    if (rte_ring_mp_enqueue(stream->sendbuf, (void*)fragment) < 0) {
        /* Ring full: ownership was not transferred, so release everything. */
        rte_free(fragment->data);
        rte_free(fragment);
        return -1;
    }

    return (ssize_t)len;
}

recv接收

/**
 * Receive data from a TCP stream (blocking recv() analogue).
 *
 * Blocks until a fragment can be dequeued from the stream's receive ring.
 * A zero-length fragment is treated as end-of-stream.
 *
 * @param sockfd  descriptor previously mapped to a stream
 * @param buf     destination buffer of at least len bytes
 * @param len     capacity of buf
 * @param flags   ignored
 * @return number of bytes copied into buf, 0 on end-of-stream or when the
 *         descriptor is not a TCP stream, -1 if the descriptor is unknown
 */
ssize_t nrecv(int sockfd, void *buf, size_t len, __attribute__((unused))int flags) {

    void* hostinfo = get_host_fromfd(sockfd);
    if (hostinfo == NULL) {
        return -1;
    }

    struct ln_tcp_stream* stream = (struct ln_tcp_stream*)hostinfo;
    if (stream->proto != IPPROTO_TCP) {
        return 0;
    }

    struct ln_tcp_fragment* fragment = NULL;

    /* Sleep on the condition variable until the ring yields a fragment. */
    pthread_mutex_lock(&stream->mutex);
    while (rte_ring_mc_dequeue(stream->recvbuf, (void**)&fragment) < 0) {
        pthread_cond_wait(&stream->cond, &stream->mutex);
    }
    pthread_mutex_unlock(&stream->mutex);

    size_t available = (size_t)fragment->length;

    if (available == 0) {
        /* Empty fragment: end-of-stream marker. */
        rte_free(fragment->data); /* rte_free(NULL) is a no-op */
        rte_free(fragment);
        return 0;
    }

    if (available > len) {
        /* Caller's buffer is smaller than the fragment: hand over the first
         * len bytes and keep the remainder for a later call. */
        rte_memcpy(buf, fragment->data, len);

        size_t remaining = available - len;
        memmove(fragment->data, fragment->data + len, remaining);
        fragment->length = remaining;

        /* NOTE(review): the remainder is appended at the ring tail, so it
         * is delivered after any fragments already queued behind it. */
        if (rte_ring_mp_enqueue(stream->recvbuf, (void*)fragment) < 0) {
            /* Could not put it back; free it rather than leak it. */
            rte_free(fragment->data);
            rte_free(fragment);
        }

        /* Report the bytes actually copied, not the bytes left over. */
        return (ssize_t)len;
    }

    /* Whole fragment fits: copy only what the fragment holds. */
    rte_memcpy(buf, fragment->data, available);

    rte_free(fragment->data);
    fragment->data = NULL;
    rte_free(fragment);

    return (ssize_t)available;
}

数据包收发管理

接收

/**
 * Copy the payload of a received TCP segment into a new fragment, push it
 * onto the stream's receive ring and wake a reader waiting on the stream's
 * condition variable.
 *
 * When the segment carries no payload (tcplen does not exceed the header
 * length) an empty fragment (length 0, data NULL) is queued instead.
 *
 * @param stream  stream whose recvbuf ring receives the fragment
 * @param tcphdr  start of the TCP header; the payload follows the header
 * @param tcplen  length in bytes of TCP header plus payload
 * @return 0 on success, -1 on allocation failure or when the ring is full
 */
static int ln_tcp_enqueue_recvbuf(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {

    struct ln_tcp_fragment* fragment = rte_malloc("tcp frag", sizeof(struct ln_tcp_fragment), 0);
    if (fragment == NULL) {
        return -1;
    }
    /* Zeroing also leaves length = 0 and data = NULL for the no-payload case. */
    memset(fragment, 0, sizeof(struct ln_tcp_fragment));

    fragment->dport = ntohs(tcphdr->dst_port);
    fragment->sport = ntohs(tcphdr->src_port);

    /* data_off holds the header length in 32-bit words in its high nibble. */
    uint8_t hdrlen = tcphdr->data_off >> 4;
    int payloadlen = tcplen - hdrlen * 4;

    if (payloadlen > 0) {

        uint8_t* payload = (uint8_t*)tcphdr + hdrlen * 4;

        /* One spare zeroed byte keeps the payload NUL-terminated. */
        fragment->data = rte_malloc("frag data", payloadlen + 1, 0);
        if (fragment->data == NULL) {
            rte_free(fragment);
            return -1;
        }
        /* Clear the payload buffer, not the fragment header. */
        memset(fragment->data, 0, payloadlen + 1);

        rte_memcpy(fragment->data, payload, payloadlen);
        fragment->length = payloadlen;
    }

    if (rte_ring_mp_enqueue(stream->recvbuf, (void*)fragment) < 0) {
        /* Ring full: ownership was not transferred, so release everything. */
        rte_free(fragment->data); /* rte_free(NULL) is a no-op */
        rte_free(fragment);
        return -1;
    }

    /* Wake a reader blocked on the stream's condition variable. */
    pthread_mutex_lock(&stream->mutex);
    pthread_cond_signal(&stream->cond);
    pthread_mutex_unlock(&stream->mutex);

    return 0;
}

发送

/**
 * Queue a payload-less ACK segment on the stream's send ring.
 *
 * The ACK carries the stream's current recv_next / send_next as its
 * acknowledgement and sequence numbers.
 *
 * @param stream  stream to acknowledge on
 * @param tcphdr  header of the segment being acknowledged (currently unused)
 * @return 0 on success, -1 on allocation failure or when the ring is full
 */
static int ln_tcp_send_ackpkt(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr) {

    (void)tcphdr; /* kept for interface compatibility */

    struct ln_tcp_fragment* ack = rte_malloc("ack frag", sizeof(struct ln_tcp_fragment), 0);
    if (ack == NULL) {
        return -1;
    }
    memset(ack, 0, sizeof(struct ln_tcp_fragment));

    /* Ports are swapped relative to the stream's fields.
     * NOTE(review): presumably the stream stores them from the peer's
     * point of view -- confirm against the stream setup code. */
    ack->sport = stream->dport;
    ack->dport = stream->sport;

    ack->acknum = stream->recv_next;
    ack->seqnum = stream->send_next;

    ack->windows = LN_TCP_INITIAL_WINDOWS;
    ack->tcp_flags = RTE_TCP_ACK_FLAG;
    ack->hdr_off = 0x50; /* data offset 5 words (20 bytes) in the high nibble */
    ack->length = 0;
    ack->data = NULL;

    if (rte_ring_mp_enqueue(stream->sendbuf, (void*)ack) < 0) {
        /* Ring full: ownership was not transferred, so free the fragment. */
        rte_free(ack);
        return -1;
    }

    return 0;
}

ESTABLISHED 状态处理

/**
 * Process a TCP segment for a stream in the established state.
 *
 * PSH: the payload is queued for the application, recv_next advances by
 *      the payload length and an ACK is queued.
 * FIN: an empty fragment is queued as an end-of-stream marker, the stream
 *      moves to CLOSE_WAIT, recv_next advances by one and an ACK is queued.
 * SYN / ACK: currently no action.
 *
 * @param stream  stream the segment belongs to
 * @param tcphdr  start of the TCP header
 * @param tcplen  length in bytes of TCP header plus payload
 * @return 0 on success, -1 if the segment is shorter than its header or a
 *         fragment could not be queued
 */
static int ln_tcp_handle_close_established(struct ln_tcp_stream* stream, struct rte_tcp_hdr* tcphdr, int tcplen) {

    /* data_off holds the header length in 32-bit words in its high nibble. */
    uint8_t hdrlen = tcphdr->data_off >> 4;
    int hdrbytes = hdrlen * 4;

    if (tcphdr->tcp_flags & RTE_TCP_SYN_FLAG) {

        /* no handling yet */
    }

    if (tcphdr->tcp_flags & RTE_TCP_PSH_FLAG) {

        /* Payload length is segment length minus header length. */
        int payloadlen = tcplen - hdrbytes;
        if (payloadlen < 0) {
            return -1;
        }

        /* Do not advance recv_next or ACK data that could not be buffered. */
        if (ln_tcp_enqueue_recvbuf(stream, tcphdr, tcplen) < 0) {
            return -1;
        }

        stream->recv_next = stream->recv_next + payloadlen;
        stream->send_next = ntohl(tcphdr->recv_ack);

        if (ln_tcp_send_ackpkt(stream, tcphdr) < 0) {
            return -1;
        }
    }

    if (tcphdr->tcp_flags & RTE_TCP_ACK_FLAG) {

        /* no handling yet */
    }

    if (tcphdr->tcp_flags & RTE_TCP_FIN_FLAG) {

        /* Passing the header length in bytes as the segment length yields a
         * zero-length fragment, which the reader treats as end-of-stream. */
        if (ln_tcp_enqueue_recvbuf(stream, tcphdr, hdrbytes) < 0) {
            return -1;
        }

        stream->status = LN_TCP_STATUS_CLOSE_WAIT;

        /* FIN consumes one sequence number. */
        stream->recv_next = stream->recv_next + 1;
        stream->send_next = ntohl(tcphdr->recv_ack);

        if (ln_tcp_send_ackpkt(stream, tcphdr) < 0) {
            return -1;
        }
    }

    return 0;
}

tcp server

/**
 * TCP echo server: listens on port 8888, accepts one connection at a time
 * and echoes every received chunk back to the peer until the peer closes.
 *
 * @param arg  unused
 * @return -1 if the listening socket cannot be created; otherwise the
 *         accept loop runs indefinitely
 */
static int tcp_server_entry(__attribute__((unused)) void* arg) {

    int sockfd = nsocket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        return -1;
    }

    printf("tcp sockfd is %d\n", sockfd);

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(8888);

    nbind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));

    nlisten(sockfd, 10);

    while (1) {

        struct sockaddr_in client;
        socklen_t len = sizeof(client);

        int connfd = naccept(sockfd, (struct sockaddr*)&client, &len);
        if (connfd < 0) {
            /* Nothing was accepted; try again instead of reading from an
             * invalid descriptor. */
            continue;
        }

        char buff[BUFFER_SIZE] = {0};

        while (1) {

            /* Reserve one byte so the data can be NUL-terminated for printf. */
            ssize_t n = nrecv(connfd, buff, BUFFER_SIZE - 1, 0); // block
            if (n > 0) {

                /* Defensive clamp: never index past the buffer. */
                if (n > (ssize_t)(BUFFER_SIZE - 1)) {
                    n = BUFFER_SIZE - 1;
                }
                buff[n] = '\0';

                printf("recv: %s\n", buff);
                nsend(connfd, buff, (size_t)n, 0);
            }
            else if (n == 0) {

                /* Peer closed the connection. */
                nclose(connfd);
                break;
            }
            else {

                /* nrecv fails only when the descriptor is unknown, so
                 * retrying would spin forever; go back to accepting. */
                break;
            }
        }
    }

    /* Not reached: the accept loop above never exits. */
    nclose(sockfd);
    return 0;
}

效果展示

回去之后调好了,有点难调

https://imagehyj.oss-cn-hangzhou.aliyuncs.com/blog/20240814234542.png

总结

到目前为止,IP/TCP和IP/UDP的协议栈都写完了,但是没有并发效果;这个后面会解决。下一步是探索一下协议的扩展,写一个dns服务器来看一下如何基于tcp或者udp来扩展协议。

项目地址

项目地址

参考资料:https://github.com/0voice


原文地址:https://blog.csdn.net/H520xcodenodev/article/details/144436475

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!