Commit e1a27285 authored by 黄振令's avatar 黄振令

【修改内容】1. tcp连接粘包问题:1)使用json格式找'{'找到完成的json;2). 收到数据有'\0',用'\0'来区分粘包;2....

【修改内容】1. tcp连接粘包问题:1)使用json格式找'{'找到完成的json;2). 收到数据有'\0',用'\0'来区分粘包;2. 同意发送格式,发送数据接口数据发送修改为不包括'\0';3. tcp连接增加锁
parent a279a29f
......@@ -382,7 +382,7 @@ int KK_Send_CloudState(int state)
cJSON_AddItemToObject(root, MSG_INFO_STR, infoObj);
cJSON_AddItemToObject(root, MSG_PAYLOAD_STR,payloadObj);
out=cJSON_Print(root);
kk_ipc_send(IPC_APP2MID, out, strlen(out)+1);
kk_ipc_send(IPC_APP2MID, out, strlen(out));
cJSON_Delete(root);
free(payloadStr);
free(infoStr);
......@@ -401,6 +401,6 @@ void KK_Sendto_DevData(const char *topic,const char *data)
if(send_data == NULL){
return;
}
kk_ipc_send(IPC_APP2MID, send_data, strlen(send_data)+1);
kk_ipc_send(IPC_APP2MID, send_data, strlen(send_data));
free(send_data);
}
......@@ -100,7 +100,7 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
if (strncmp(validDat,MAGIC_ACK, strlen(MAGIC_ACK)) == 0){
}else{
kk_ipc_send_ex(loop_ctrl->type, MAGIC_ACK, strlen(MAGIC_ACK)+1, chlMark);
kk_ipc_send_ex(loop_ctrl->type, MAGIC_ACK, strlen(MAGIC_ACK), chlMark);
}
loop_ctrl->isconnect =1;
......@@ -453,7 +453,7 @@ int kk_ipc_isconnect(ipc_type type){
{
for(int i =0; i<20;i++){
kk_ipc_send(type, MAGIC, strlen(MAGIC)+1);
kk_ipc_send(type, MAGIC, strlen(MAGIC));
usleep(500000);
if (loop_ctrl->isconnect == 1){
break;
......
......@@ -9,7 +9,7 @@
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <string.h>
#include "com_api.h"
#define CCU_TCP_PORT 16565
......@@ -21,6 +21,7 @@
typedef struct{
void *mutex;
char deviceCode[DEVICE_CODE_LEN];
char ip[MAX_IP_LEN];
int sock;
......@@ -34,9 +35,170 @@ static int g_init = 0;
static ipc_cb* g_cb = NULL;
static struct ev_io w_accept;
static void *_MutexCreate(void)
{
int err_num;
pthread_mutex_t *mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
if (NULL == mutex) {
return NULL;
}
if (0 != (err_num = pthread_mutex_init(mutex, NULL))) {
printf("create mutex failed");
free(mutex);
return NULL;
}
return mutex;
}
static void _MutexDestroy(void *mutex)
{
int err_num;
if (!mutex) {
printf("mutex want to destroy is NULL!");
return;
}
if (0 != (err_num = pthread_mutex_destroy((pthread_mutex_t *)mutex))) {
printf("destroy mutex failed");
}
free(mutex);
}
static void _MutexLock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_lock((pthread_mutex_t *)mutex))) {
printf("lock mutex failed: - '%s' (%d)", strerror(err_num), err_num);
}
}
static void _MutexUnlock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_unlock((pthread_mutex_t *)mutex))) {
printf("unlock mutex failed - '%s' (%d)", strerror(err_num), err_num);
}
}
static char *_next_json(char *str, int* hasFloor)
{
if(str == NULL) {
return NULL;
}
char *ptr = str;
int floor = 0;
// judge if inside the "..."
int quotes = 0;
while(*ptr) {
if(*ptr++ == '{') {
++floor;
break;
}
}
*hasFloor = floor;
if(!floor) {
//print_json();
return NULL;
}
while(floor && *ptr) {
switch(*ptr++) {
case '{':{
if(quotes)
break;
++floor;
//printf("floor is %d \n", floor);
break;
}
case '}':{
if(quotes)
break;
--floor;
//printf("floor is %d \n", floor);
break;
}
case '"':{
quotes = !quotes;
}
/* notice: judgment of '\\' added new without test*/
case '\\':{
break;
++ptr;
break;
}
default:{
break;
}
}
}
if(floor == 0) {
return ptr;
}
return NULL;
}
static char* __do_data(char* buf, int buflen,ipc_cb* cb, char* deviceCode){
char* endIdx = NULL;
char* startIdx = buf;
int hasfloor = 0;
char buf2[1024] = {0};
while(startIdx < buf + buflen){
hasfloor = 0;
endIdx = _next_json(startIdx,&hasfloor);
if(endIdx == NULL && hasfloor == 0){//一般数据
if (cb != NULL){
cb(startIdx, strlen(startIdx),deviceCode);
}
return NULL;
//break;
}else if (endIdx == NULL){ //有json数据没接收完
return startIdx;
}
int cplen = endIdx - startIdx;
memset(buf2,0, sizeof(buf2));
memcpy(buf2,startIdx, cplen);
if (cb != NULL){
cb(buf2,cplen,deviceCode);
}
startIdx = endIdx;
}
return NULL;
}
static char* __do_spilt(char* buf, int buflen,ipc_cb* cb, char* deviceCode){
char* pInx = buf;
int slen = -1;
while (buflen > 0){
pInx += slen + 1;
slen = strlen(pInx);
printf("read_cb slen ====================================slen[%d] \n",slen);
if (cb != NULL){
cb(pInx,slen,deviceCode);
}
buflen -= slen + 1;
}
if (buflen != 0 ){
return pInx;
}
return NULL;
}
// Save/Load the gateway list
void kk_gw_list_save(void)
static void kk_gw_list_save(void)
{
FILE *fp;
uint8_t i;
......@@ -51,7 +213,7 @@ void kk_gw_list_save(void)
}
void kk_gw_list_load(void)
static void kk_gw_list_load(void)
{
uint16_t i;
FILE *fp;
......@@ -98,18 +260,18 @@ static int get_idx_by_ip(char ip[MAX_IP_LEN]){
}
static int get_sock_by_deviceCode(char deviceCode[DEVICE_CODE_LEN]){
static kk_tcp_ctrl_t* get_channel_by_deviceCode(char deviceCode[DEVICE_CODE_LEN]){
int i = 0;
if (deviceCode == NULL || strlen(deviceCode) == 0){
return -1;
return NULL;
}
for(;i < MAX_LISTEN_NUM; i++){
if(strcmp(deviceCode, g_tcp_ctrl[i].deviceCode) == 0){
return g_tcp_ctrl[i].sock;
return &g_tcp_ctrl[i];
}
}
return -1;
return NULL;
}
......@@ -156,7 +318,9 @@ static int reset_by_sock(int sock){
for(;i < MAX_LISTEN_NUM; i++){
if(sock == g_tcp_ctrl[i].sock){
g_tcp_ctrl[i].isConnect = 0;
g_tcp_ctrl[i].isConnect = -1;
g_tcp_ctrl[i].sock = -1;
_MutexDestroy(g_tcp_ctrl[i].mutex);
g_tcp_ctrl[i].mutex = NULL;
return 0;
}
}
......@@ -175,6 +339,15 @@ static int set_sock_by_ip(char ip[MAX_IP_LEN], int sock){
g_tcp_ctrl[i].sock = sock;
printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, ip, g_tcp_ctrl[i].sock);
//create mutex
if (g_tcp_ctrl[i].mutex == NULL){
g_tcp_ctrl[i].mutex = _MutexCreate();
if (g_tcp_ctrl[i].mutex == NULL) {
printf("[%s] _MutexCreate failed \n", __FUNCTION__);
return -1;
}
}
return 0;
}
}
......@@ -193,7 +366,7 @@ int kk_is_tcp_channel(char devCode[DEVICE_CODE_LEN]){
for(;i < MAX_LISTEN_NUM; i++){
if(strcmp(devCode, g_tcp_ctrl[i].deviceCode) == 0){
printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, g_tcp_ctrl[i].ip, g_tcp_ctrl[i].sock);
//printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, g_tcp_ctrl[i].ip, g_tcp_ctrl[i].sock);
return g_tcp_ctrl[i].sock;
}
}
......@@ -312,15 +485,19 @@ err1:
/*读回调*/
static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
printf("================== read_cb \n");
printf("================== read_cb [%d]\n", revents);
char buffer[BUFFER_SIZE] = {0};
if (EV_ERROR & revents) {
printf("read got invalid event...\r\n");
return;
}
int res = 0;
int offset = 0;
kk_tcp_ctrl_t* tcp_ctrl = (kk_tcp_ctrl_t*)watcher->data;
int32_t bytes = recv(watcher->fd, buffer, sizeof(buffer),0);
again:
_MutexLock(tcp_ctrl->mutex);
int32_t bytes = recv(watcher->fd, buffer + offset, sizeof(buffer) - offset - 1,0);
_MutexUnlock(tcp_ctrl->mutex);
if (-1 == bytes) {
//tcp Error
if (EINTR != errno && EAGAIN != errno) {
......@@ -337,11 +514,56 @@ static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_io_stop(loop,watcher);
free(watcher);
} else {
//intf("READ:\r\n %s\r\n", buffer);
//printf("READ:\r\n %s =====[%d]\r\n", buffer, bytes);
//printf("read_cb deviceCode ip sock [%s][%s][%d] \n",tcp_ctrl->deviceCode,tcp_ctrl->ip, tcp_ctrl->sock);
if (g_cb != NULL){
/*if (g_cb != NULL){
g_cb(buffer,bytes,tcp_ctrl->deviceCode);
}*/
char* retpst = NULL;
if (strlen(buffer) == bytes){
retpst = __do_data(buffer, bytes + offset, g_cb, tcp_ctrl->deviceCode);
if (retpst != NULL){
offset = bytes + offset - (retpst - buffer);
memmove(buffer, retpst, offset);
memset(buffer + offset, 0, sizeof(buffer) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto again;
}
}
else{
//处理‘\0’ 结束符粘包
retpst = __do_spilt(buffer, bytes + offset, g_cb, tcp_ctrl->deviceCode);
if (retpst != NULL){
offset = bytes + offset - (retpst - buffer);
memmove(buffer, retpst, offset);
memset(buffer + offset, 0, sizeof(buffer) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto again;
}
/*char* pInx = buffer;
int buflen = offset + bytes;
int slen = -1;
while (buflen > 0){
pInx += slen + 1;
slen = strlen(pInx);
printf("read_cb slen ====================================slen[%d] \n",slen);
if (g_cb != NULL){
g_cb(pInx,slen,tcp_ctrl->deviceCode);
}
buflen -= slen + 1;
}
if (buflen != 0 && bytes == (BUFFER_SIZE - offset)){
printf("read_cb need recv agian====================================[%d] \n",buflen);
offset = buflen + slen + 1;
memmove(buffer,pInx,offset);
memset(buffer + offset, 0, (BUFFER_SIZE - offset));
goto again;
}*/
}
}
}
......@@ -434,16 +656,59 @@ static void loop_tcp_thread(void *arg){
int kk_tcp_channel_ser_send(char* data, int len, char chalMark[DEVICE_CODE_LEN]){
int ret = 0;
fd_set fds;
struct timeval timeout={0,200}; //select等待3秒,3秒轮询,要非阻塞就置0
if (data != NULL){
int sd = get_sock_by_deviceCode(chalMark);
if (sd > -1){
ret = write(sd, data, len);
kk_tcp_ctrl_t* chl_ctrl = get_channel_by_deviceCode(chalMark);
if (NULL == chl_ctrl){
printf("[%s] get_channel_by_deviceCode is NULL!!! \n",__FUNCTION__);
return -1;
}
if (chl_ctrl->sock > -1){
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(chl_ctrl->sock,&fds); //添加描述符
switch(select(chl_ctrl->sock + 1,NULL,&fds,NULL,&timeout)) //select使用
{
case -1:
chl_ctrl->isConnect = 0;
printf(" [%s] select error ret=%d \n", __FUNCTION__, -1);
break; //select错误 退出循环
case 0:
printf(" [%s] select skip=================== ret=%d \n", __FUNCTION__, 0);
break; //再次轮询
default:
if(FD_ISSET(chl_ctrl->sock,&fds)) //测试sock是否可读,即是否网络上有数据
{
_MutexLock(chl_ctrl->mutex);
ret = write(chl_ctrl->sock, data, len);
_MutexUnlock(chl_ctrl->mutex);
if( ret <= 0){
printf("=================write error ret=%d \n",ret);
if (errno != EINTR){
chl_ctrl->isConnect = 0;
printf("write error reconnect!! \n");
break;
}
}
}
break;
}// end switch
/*_MutexLock(chl_ctrl->mutex);
printf("[%s] kk_tcp_channel_ser_send [%s] \n",__FUNCTION__,data);
ret = write(chl_ctrl->sock, data, len);
_MutexUnlock(chl_ctrl->mutex);
if (ret < 0){
printf("[%s] write failed!!!! \n",__FUNCTION__);
}
}*/
}
}
return ret;
}
......@@ -459,6 +724,7 @@ int kk_TCP_channel_init(ipc_cb cb)
g_init = 1;
//memset(g_tcp_ctrl, 0, sizeof(kk_tcp_ctrl_t)*MAX_LISTEN_NUM);
//kk_gw_list_load();
for(i = 0; i < MAX_LISTEN_NUM; i++){
g_tcp_ctrl[i].sock = -1;
}
......@@ -483,6 +749,10 @@ int kk_TCP_channel_deinit(ipc_type type)
if(g_tcp_ctrl[i].sock > -1){
//close(g_tcp_ctrl[i].sock);
}
if(g_tcp_ctrl[i].mutex != NULL){
_MutexDestroy(g_tcp_ctrl[i].mutex);
g_tcp_ctrl[i].mutex = NULL;
}
}
ev_io_stop(g_loop, &w_accept);
......@@ -494,58 +764,9 @@ int kk_TCP_channel_deinit(ipc_type type)
g_init = 0;
return 0;
}
void *_MutexCreate(void)
{
int err_num;
pthread_mutex_t *mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
if (NULL == mutex) {
return NULL;
}
if (0 != (err_num = pthread_mutex_init(mutex, NULL))) {
printf("create mutex failed");
free(mutex);
return NULL;
}
return mutex;
}
void _MutexDestroy(void *mutex)
{
int err_num;
if (!mutex) {
printf("mutex want to destroy is NULL!");
return;
}
if (0 != (err_num = pthread_mutex_destroy((pthread_mutex_t *)mutex))) {
printf("destroy mutex failed");
}
free(mutex);
}
void _MutexLock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_lock((pthread_mutex_t *)mutex))) {
printf("lock mutex failed: - '%s' (%d)", strerror(err_num), err_num);
}
}
return 0;
void _MutexUnlock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_unlock((pthread_mutex_t *)mutex))) {
printf("unlock mutex failed - '%s' (%d)", strerror(err_num), err_num);
}
}
......@@ -630,6 +851,7 @@ static void loop_tcp_client_thread(void *arg){
printf("loop_tcp_client_thread start!\r\n");
char buf[1024]= {0};
int ret = 0;
int offset = 0;
fd_set fds;
struct timeval timeout={0,200}; //select等待3秒,3秒轮询,要非阻塞就置0
......@@ -645,6 +867,7 @@ static void loop_tcp_client_thread(void *arg){
while(g_client_ctrl.isConnect){
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(g_client_ctrl.sd,&fds); //添加描述符
offset = 0;
switch(select(g_client_ctrl.sd + 1,&fds,NULL,NULL,&timeout)) //select使用
{
case -1:
......@@ -658,9 +881,11 @@ static void loop_tcp_client_thread(void *arg){
if(FD_ISSET(g_client_ctrl.sd,&fds)) //测试sock是否可读,即是否网络上有数据
{
//接受网络数据
_MutexLock(g_client_ctrl.mutex);
memset(buf,0x0,sizeof(buf));
ret = read(g_client_ctrl.sd, buf, sizeof(buf));
clientRead:
_MutexLock(g_client_ctrl.mutex);
ret = read(g_client_ctrl.sd, buf + offset, sizeof(buf)- offset - 1);
_MutexUnlock(g_client_ctrl.mutex);
if( ret <= 0){
......@@ -672,9 +897,21 @@ static void loop_tcp_client_thread(void *arg){
}
}else if(ret > 0){
//printf("11buf = %s\n",buf);
#if 0
if (g_client_ctrl.cb != NULL){
g_client_ctrl.cb(buf,ret,"");
}
#else
char* retpst = __do_data(buf, ret + offset, g_client_ctrl.cb, "");
if (retpst != NULL){
offset = ret + offset - (retpst - buf);
memmove(buf, retpst, offset);
memset(buf + offset, 0, sizeof(buf) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto clientRead;
}
#endif
}
}
break;
......@@ -690,7 +927,6 @@ static void loop_tcp_client_thread(void *arg){
}
int kk_get_retry_num(){
return g_client_ctrl.retry;
}
......@@ -702,6 +938,8 @@ int kk_reset_retry_num(){
int kk_tcp_client_send(char* data, int len){
int ret = 0;
int cnt = 0;
fd_set fds;
struct timeval timeout={0,20000};
if ( data != NULL){
while(g_client_ctrl.sd == -1 && cnt < 5){
......@@ -713,14 +951,50 @@ int kk_tcp_client_send(char* data, int len){
printf("[%s] The tcp socket created fialid !!!! \n",__FUNCTION__);
return -1;
}
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(g_client_ctrl.sd,&fds); //添加描述符
switch(select(g_client_ctrl.sd + 1,NULL,&fds,NULL,&timeout)) //select使用
{
case -1:
g_client_ctrl.isConnect = 0;
printf(" [%s] select error ret=%d \n", __FUNCTION__, ret);
break; //select错误 退出循环
case 0:
printf(" [%s] select error ret=%d \n", __FUNCTION__, 0);
break; //再次轮询
default:
if(FD_ISSET(g_client_ctrl.sd,&fds)) //测试sock是否可读,即是否网络上有数据
{
//接受网络数据
_MutexLock(g_client_ctrl.mutex);
//printf("kk_tcp_client_send [%s] =========!\n",data);
ret = write(g_client_ctrl.sd, data, len);
_MutexUnlock(g_client_ctrl.mutex);
if( ret <= 0){
printf("=================send error ret=%d \n",ret);
if (errno != EINTR){
g_client_ctrl.isConnect = 0;
printf("send error reconnect!! \n");
break;
}
}
}
break;
}// end switch
/*_MutexLock(g_client_ctrl.mutex);
printf("kk_tcp_client_send [%s] \n",data);
ret = write(g_client_ctrl.sd, data, len);
_MutexUnlock(g_client_ctrl.mutex);
if (ret < 0){
printf("[%s] write failed ret=%d, reconnect !!!! \n",__FUNCTION__, ret);
g_client_ctrl.isConnect = 0;
ret = -1;
}
}*/
}
return ret;
}
......
......@@ -22,7 +22,7 @@ void kk_sendData2app(void *info, void *payload,int isAsync){
dm_queue_msg_insert4(buf);
}else{
kk_ipc_send(IPC_MID2APP, buf, strlen(buf) + 1);
kk_ipc_send(IPC_MID2APP, buf, strlen(buf) );
free(buf);
}
cJSON_Delete(root);
......
......@@ -153,7 +153,7 @@ void mid2p_cb(void* data, int len, char* chalMark){
void gw2mid_cb(void* data, int len, char* chalMark){
if (data != NULL){
printf("gw2mid_cb chalMark=%s, data: %s RECEIVED \r\n", chalMark, data);
printf("gw2mid_cb chalMark=%s, data: %s [%d]RECEIVED \r\n", chalMark, data,len);
mid2p_cb(data,len,chalMark);
}
}
......
......@@ -549,7 +549,7 @@ int OTA_publishProgress(void *handle, char* payload){
cJSON_AddStringToObject(root, "info", topicBuf);
cJSON_AddStringToObject(root, "payload", payload);
void *buf = cJSON_Print(root);
kk_ipc_send(IPC_MID2APP, buf, strlen(buf) + 1);
kk_ipc_send(IPC_MID2APP, buf, strlen(buf));
free(msgTypeStr);
free(topicBuf);
free(buf);
......
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
......@@ -112,7 +113,7 @@ static int send_result_resp(cJSON * result,
char * str_result = rpc_cJSON_Print(result_root);
printf("send json:\n%s\n",str_result);
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1);
return_value = kk_sendData2CCU(str_result, strlen(str_result));
free(str_result);
rpc_cJSON_Delete(result_root);
return return_value;
......@@ -154,7 +155,7 @@ static int send_error_resp(int code, char* message,
}
char * str_result = rpc_cJSON_Print(result_root);
//printf("alla=========== :%d\n", strlen(str_result)+1);
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1);
return_value = kk_sendData2CCU(str_result, strlen(str_result));
printf("send_error_resp:\n%s\n", str_result);
free(str_result);
rpc_cJSON_Delete(result_root);
......@@ -227,7 +228,7 @@ static int eval_request(struct jrpc_server *server, cJSON *root) {
void _cb(void* data, int len, char* chlmark){
if (data != NULL){
printf("plat_cb: %s RECEIVED \r\n", data);
printf("plat_cb: %s [%d]RECEIVED \r\n", data,len);
cJSON *root;
char *end_ptr = NULL;
......@@ -625,7 +626,7 @@ int jrpc_send_msg(cJSON * msgJson) {
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1);
return_value = kk_sendData2CCU(str_result, strlen(str_result));
free(str_result);
return return_value;
}
......@@ -1013,4 +1014,3 @@ void rpc_reportDevices(void)
}
rpc_report_devices(devicesJson);
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment