首页 > 技术文章 > tcp_client

jacob1934 2015-09-10 10:11 原文

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include "fifo.h"


struct fifo_t
{
    uint8_t        *buf;
    uint32_t    size;
    uint32_t    in;
    uint32_t    out;
};
struct fifo_t * fifo_create(uint32_t size)
{
    struct fifo_t * fifo = malloc(sizeof(struct fifo_t));
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]malloc fifo failed:%s", __FUNCTION__, __LINE__, strerror(errno));
        return NULL;
    }
    memset(fifo, 0, sizeof(struct fifo_t));
    fifo->buf = malloc(size + 1);
    if(fifo->buf == NULL)
    {
        fprintf(stderr, "[%s][%d]malloc fifo->buf fialed:%s",__FUNCTION__, __LINE__, strerror(errno));
        free(fifo);
        return NULL;
    }
    memset(fifo->buf, 0, size);
    fifo->size = size;
    fifo->in = 0;
    fifo->out = 0;
    return fifo;
}
/* init/empty    @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@-
 *         in
 *         out
 *
 * in==out    @@@@@@@-@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 * empty            in
 *                 out
 *
 * in > out    @@@@@@@-#################@@@@@@@@@@@@@@
 *                                  in
 *                 out
 *
 * in < out    ######@@@@@@@@@@@@@@@@@@@@@@@-#########
 *               in
 *                                       out
 *
 * in+1=out    #######-###############################
 * full               in
 *             out
 *
 * in=size-1    ######################################-
 * full                                              in
 *         out
 */
void * fifo_write_prepare(struct fifo_t *fifo, uint32_t *size)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return NULL;
    }
    uint8_t *addr = fifo->buf + fifo->in;
    if(fifo->in >= fifo->out)
    {
        if(fifo->out == 0)
            *size = fifo->size - fifo->in - 1;
        else
            *size = fifo->size - fifo->in;
    }
    else
        *size = fifo->out - fifo->in - 1;
    return addr;
}
uint32_t fifo_write_done(struct fifo_t *fifo, uint32_t size)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return 0;
    }
    if(fifo->in >= fifo->out)
    {
        if(fifo->out == 0)
        {
            if(fifo->in + size > fifo->size - 1)
            {
                fifo->in = fifo->size - 1;
                fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->in,fifo->size);
                return fifo->size - 1 - fifo->in;
            }
        }
        else
        {
            if(fifo->in + size > fifo->size)
            {
                fifo->in = 0;
                fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->in,fifo->size);
                return fifo->size - fifo->in;
            }
        }
    }
    else
    {
        if(fifo->in + size >= fifo->out)
        {
            fifo->in = fifo->out - 1;
            fprintf(stderr, "[%s][%d]overwrited!!! in=%u,size=%u,fifo->out=%u\n",__FUNCTION__,__LINE__,fifo->in,size,fifo->out);
            return fifo->out - fifo->in - 1;
        }
    }
    fifo->in += size;
    return size;
}
void * fifo_read_prepare(struct fifo_t *fifo, uint32_t *size)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return 0;
    }
    uint8_t *addr = fifo->buf + fifo->out;
    if(fifo->in >= fifo->out)
        *size = fifo->in - fifo->out;
    else
        *size = fifo->size - fifo->out;
    return addr;
}
uint32_t fifo_read_done(struct fifo_t *fifo, uint32_t size)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return 0;
    }
    if(fifo->in >= fifo->out)
    {
        if(fifo->out + size > fifo->in)
        {
            fifo->out = fifo->in;
            fprintf(stderr, "[%s][%d]overreaded!!! out=%u,fifo->in=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->out,fifo->in,size);
            return fifo->in - fifo->out;
        }
    }
    else
    {
        if(fifo->out + size >= fifo->size)
        {
            fifo->out = 0;
            if(fifo->out + size > fifo->size)
                fprintf(stderr, "[%s][%d]overreaded!!! out=%u,fifo->size=%u,size=%u\n",__FUNCTION__,__LINE__,fifo->out,fifo->size,size);
            return fifo->size - fifo->out;
        }
    }
    fifo->out += size;
    if(fifo->out == fifo->size)
        fifo->out = 0;
    return size;
}
uint32_t fifo_read_data(struct fifo_t *fifo, uint32_t offset, uint32_t len, void * buf)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return 0;
    }
    if(buf == NULL)
    {
        fprintf(stderr, "[%s][%d]buf == NULL\n",__FUNCTION__,__LINE__);
        return 0;
    }
    if(fifo->in >= fifo->out)
    {
        if(fifo->out + offset + len > fifo->in || fifo->out + offset > fifo->in)
        {
//            fprintf(stderr, "[%s][%d]overreaded!!! fifo->out=%u,fifo->in=%u,offset=%u,size=%u\n", __FUNCTION__, __LINE__, fifo->out, fifo->in, offset, size);
            return 0;
        }
        memcpy(buf, &fifo->buf[fifo->out + offset], len);
    }
    else
    {
        if(offset + len > fifo->size - fifo->out + fifo->in || offset > fifo->size - fifo->out + fifo->in)
        {
//            fprintf(stderr, "[%s][%d]overreaded!!! fifo->out=%u,fifo->in=%u,offset=%u,size=%u\n", __FUNCTION__, __LINE__, fifo->out, fifo->in, offset, size);
            return 0;
        }
        if(offset > fifo->size - fifo->out)
        {
            memcpy(    buf,
                &fifo->buf[fifo->out + offset - fifo->size],
                len);
        }
        else
        {
            if(offset + len > fifo->size - fifo->out)
            {
                memcpy(    buf,
                    &fifo->buf[fifo->out + offset],
                    fifo->size - fifo->out - offset);

                uint8_t *buf_l = buf;
                memcpy(    &buf_l[fifo->size - fifo->out],
                    fifo->buf,
                    fifo->out + offset + len - fifo->size);
            }
            else
            {
                memcpy(    buf,
                    &fifo->buf[fifo->out + offset,
                    len);
            }
        }
    }
    return len;
}

void fifo_destroy(struct fifo_t *fifo)
{
    if(fifo == NULL)
    {
        fprintf(stderr, "[%s][%d]fifo==NULL\n", __FUNCTION__, __LINE__);
        return;
    }
    free(fifo->buf);
    free(fifo);
}
void fifo_reset(struct fifo_t *fifo)
{
    fifo->in = 0;
    fifo->out = 0;
}

 

 

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdbool.h>
#include <linux/in.h>
#include <endian.h>
#include "fifo.h"
struct transmit_t
{
    uint32_t    server_ip;
    uint16_t    server_port;
    int        fd;
    pthread_t    th_transmit;
    struct fifo_t    *fifo_recv;
    struct fifo_t    *fifo_send;
};
static struct transmit_t transmit =
{
    .fd = -1
};
int transmit_init()
{
    transmit.fd         = -1;
    transmit.fifo_recv     = fifo_create(512*1024);
    transmit.fifo_send     = fifo_create(512*1024);
    return pthread_create(transmit.th_transmit, NULL, thread_transmit, NULL);
}
static bool __do_connect()
{
    transmit.fd = socket(AF_INET, SOCK_STREAM, SOCK_NONBLOCK);
    struct sockaddr_in server_addr =
    {
        .sin_family        = AF_INET,
        .sin_port        = htobe16(transmit.server_port),
        .sin_addr.s_addr    = inet_addr("127.0.0.1");//htobe32(transmit.server_ip)
    };
    socklen_t len = (socklen_t)sizeof(server_addr);
    int ret = connect(transmit.fd, &server_addr, len);
    if(ret < 0)
    {
        fprintf(stderr,"[%s][%d]connect failed:%s\n",__FUNCTION__,__LINE__, strerror(errno));
        close(transmit.fd);
        transmit.fd = -1;
        return false;
    }
    return true;
}
static bool __do_transmit()
{
    struct pollfd pollfd_recv =
    {
        .fd    = transmit.fd,
        .events    = POLL_IN
    };

    // 检查是否有发送的数据
    uint32_t to_send_size = 0;
    fifo_read_prepare(transmit.fifo_send, &to_send_size);
    if(to_send_size > 0)
        pollfd_recv.events |= POLL_OUT;

    // 等待socket就绪,poll/ppoll是线程取消点
    //sigset_t sigset = SIGPIPE | SIGINT | SIGALRM;
    //int num = ppoll(&pollfd_recv, 1, NULL, &sigset);
    int num = poll(&pollfd_recv, 1, -1);
    if(num < 0)
    {
        if(errno == EINTR)
            return;
    }
    if(num > 0)
    {
        // 接收
        if(pollfd_recv.revents & POLL_IN != 0)
        {
            void * buf = fifo_write_prepare(transmit.fifo_recv, &size);
            int size = recv(transmit.fd, buf, size, 0);
            if(size < 0)        //error
            {
                fprintf(sdterr, "[%s][%d]recv failed:%s\n", __FUNCTION__,__LINE__,strerror(errno));
                return false;
            }
            else if(size == 0)    // connection lost
            {
                fprintf(sdterr, "[%s][%d]connection lost\n", __FUNCTION__,__LINE__);
                return false;
            }
            fifo_write_done(transmit.fifo_recv, size);
            while(decode_do() == true) ;
        }

        // 发送
        if(pollfd_recv.revents & POLL_OUT != 0)
        {
            void * buf = fifo_read_prepare(transmit.fifo_recv, &size);
            int size = send(transmit.fd, buf, size);
            if(size < 0)
            {
                fprintf(sdterr, "[%s][%d]recv failed:%s\n", __FUNCTION__,__LINE__,strerror(errno));
                return false;
            }
            fifo_read_done(transmit.fifo_send, size);
        }
    }
    return true;
}

void * thread_transmit()
{

    while(1)
    {

        bool ret = __do_connect();
        if(ret == false)
        {
            sleep(3);
            continue;
        }
        while(1)
        {
            ret = __do_transmit();
            if(ret == false)
                break;
        }
        close(transmit.fd);
    }
    return NULL;
}
void transmit_send_ready()
{
    pthread_kill(&transmit.th_transmit, SIGALRM);
}
void transmit_destroy()
{

}
struct fifo_t *transmit_get_recv_fifo()
{
    return transmit.fifo_recv;
}
struct fifo_t *transmit_get_send_fifo()
{
    return transmit.fifo_send;
}

 

 

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "depackage.h"
#include "protocal.h"
enum DECODE_E
{
    DECODE__SYNC_HEAD,
    DECODE__LEN,
    DECODE__SYNC_END,
    DECODE__CHECKSUM,
    DECODE__EXCUTE,
    DECODE__COUNT
};

struct decode_t
{
    enum DECODE_E    state;
    uint16_t     cmd;
    uint32_t    msgid_in;
    uint32_t    msgid_response;
    uint32_t     data_len;
    pthread_t    th_decode;
};
static struct decode_t decode =
{
    .state = DECODE__SYNC_HEAD,
};
void decode_reset()
{
    decode.state        = DECODE__SYNC_HEAD;
    decode.cmd        = 0;
    decode.msgid_in        = 0;
    decode.msgid_response    = 0;
    decode.data_len        = 0;
}
/*
 * return:    true:    data decoded and executed,re-call again
 *         false:    not enough data to decode
 * */
bool decode_do()
{
    struct fifo_t *fifo_recv = transmit_get_recv_fifo();
    static uint8_t data[256*1024] = {0};
    switch(decode.state)
    {
        case DECODE__SYNC_HEAD:
        {
            uint32_t sync_head = 0;
            uint32_t len = fifo_read_data(fifo_recv, 0, SYNC_HEAD__LEN, &sync_end);
            if(len < sizeof(sync_head))
                return false;
            if(sync_head != SYNC_HEAD)
                fifo_read_done(fifo_recv, 1);
            else
                decode.state = DECODE__LEN;
            break;
        }
        case DECODE__LEN:
        {
            uint32_t len = fifo_read_data(fifo_recv, DATA_LEN__OFFSET, DATA_LEN__LEN,    &decode.data_len);
            if(len < DATA_LEN__LEN)
                return false;

            fifo_read_data(fifo_recv, CMD__OFFSET,            CMD__LEN,        &decode.cmd);
            fifo_read_data(fifo_recv, MSGID_SEND__OFFSET,        MSGID_SEND__LEN,    &decode.msgid_in);
            fifo_read_data(fifo_recv, MSGID_RESPONSE__OFFSET,    MSGID_RESPONSE__LEN,    &decode.msgid_response);
            if(decode.data_len > DATA_LEN_MAX)
            {
                fprintf(sdterr, "[%s][%d]data_len too big:%u,max=%u,msgid_send=%u\n", __FUNCTION__, __LINE__, decode.data_len, DATA_LEN_MAX, decode.msgid_in);
                fifo_read_done(fifo_recv, 1);
                decode.state    = DECODE__SYNC_HEAD;
                return true;
            }

            decode.state        = DECODE__SYNC_END;
            break;
        }
        case DECODE__SYNC_END:
        {
            uint32_t sync_end = 0;
            uint32_t len = fifo_read_data(    fifo_recv,
                            DATA__OFFSET + decode.data_len + CHECKSUM__LEN,
                            SYNC_END__LEN,
                            &sync_end);
            if(len < SYNC_END__LEN)
                return false;
            if(sync_end != SYNC_END)
            {
                fprintf(sdterr, "[%s][%d]SYNC_END error:%x\n", __FUNCTION__, __LINE__, sync_end);
                fifo_read_done(fifo_recv, 1);
                decode.state    = DECODE__SYNC_HEAD;
                return true;
            }
            decode.state        = DECODE__CHECKSUM;
            break;
        }
        case DECODE__CHECKSUM:
            fifo_read_data(    fifo_recv,    DATA__OFFSET,    decode.data_len,    data);
            //
            decode.state        = DECODE__EXCUTE;
            break;
        case DECODE__EXCUTE:
            protocal_excute(decode.cmd, decode.msgid_in, data, decode.data_len);
            fifo_read_done(fifo_recv, DATA__OFFSET + decode.data_len + CHECKSUM__LEN + SYNC_END__LEN);
            break;
        default:
            decode.state    = DECODE__SYNC_HEAD;
            return true;
    }
    return true;
}
void *thread_decode()
{
    while(1)
    {
        pthread_mutex_lock(&decode.mutex);
        while(decode_do() == false)
            pthread_cond_wait(&decode.cond, &decode.mutex);
        pthread_mutex_unlock(&decode.mutex);
        usleep(1);
    }
}
int decode_init()
{
    memset(&decode, 0, sizeof(decode));
    decode.state    = DECODE__SYNC_HEAD;
    return pthread_create(&decode.th_decode, NULL, thread_decode, NULL);
}
void decode_destroy()
{
    pthread_cancel(decode.th_decode);
    pthread_join(decode.th_decode, NULL);
}

 

推荐阅读