Commit 3de8f63f authored by 陈伟灿's avatar 陈伟灿

Merge branch 'hzl' into 'master'

【修改内容】1. tcp连接粘包问题:1)使用json格式找'{'找到完成的json;2). 收到数据有'\0',用'\0'来区分粘包;2....

See merge request chenweican/k-sdk!15
parents a279a29f e1a27285
...@@ -382,7 +382,7 @@ int KK_Send_CloudState(int state) ...@@ -382,7 +382,7 @@ int KK_Send_CloudState(int state)
cJSON_AddItemToObject(root, MSG_INFO_STR, infoObj); cJSON_AddItemToObject(root, MSG_INFO_STR, infoObj);
cJSON_AddItemToObject(root, MSG_PAYLOAD_STR,payloadObj); cJSON_AddItemToObject(root, MSG_PAYLOAD_STR,payloadObj);
out=cJSON_Print(root); out=cJSON_Print(root);
kk_ipc_send(IPC_APP2MID, out, strlen(out)+1); kk_ipc_send(IPC_APP2MID, out, strlen(out));
cJSON_Delete(root); cJSON_Delete(root);
free(payloadStr); free(payloadStr);
free(infoStr); free(infoStr);
...@@ -401,6 +401,6 @@ void KK_Sendto_DevData(const char *topic,const char *data) ...@@ -401,6 +401,6 @@ void KK_Sendto_DevData(const char *topic,const char *data)
if(send_data == NULL){ if(send_data == NULL){
return; return;
} }
kk_ipc_send(IPC_APP2MID, send_data, strlen(send_data)+1); kk_ipc_send(IPC_APP2MID, send_data, strlen(send_data));
free(send_data); free(send_data);
} }
...@@ -100,7 +100,7 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents) ...@@ -100,7 +100,7 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
if (strncmp(validDat,MAGIC_ACK, strlen(MAGIC_ACK)) == 0){ if (strncmp(validDat,MAGIC_ACK, strlen(MAGIC_ACK)) == 0){
}else{ }else{
kk_ipc_send_ex(loop_ctrl->type, MAGIC_ACK, strlen(MAGIC_ACK)+1, chlMark); kk_ipc_send_ex(loop_ctrl->type, MAGIC_ACK, strlen(MAGIC_ACK), chlMark);
} }
loop_ctrl->isconnect =1; loop_ctrl->isconnect =1;
...@@ -453,7 +453,7 @@ int kk_ipc_isconnect(ipc_type type){ ...@@ -453,7 +453,7 @@ int kk_ipc_isconnect(ipc_type type){
{ {
for(int i =0; i<20;i++){ for(int i =0; i<20;i++){
kk_ipc_send(type, MAGIC, strlen(MAGIC)+1); kk_ipc_send(type, MAGIC, strlen(MAGIC));
usleep(500000); usleep(500000);
if (loop_ctrl->isconnect == 1){ if (loop_ctrl->isconnect == 1){
break; break;
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include <sys/select.h> #include <sys/select.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <string.h>
#include "com_api.h" #include "com_api.h"
#define CCU_TCP_PORT 16565 #define CCU_TCP_PORT 16565
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
typedef struct{ typedef struct{
void *mutex;
char deviceCode[DEVICE_CODE_LEN]; char deviceCode[DEVICE_CODE_LEN];
char ip[MAX_IP_LEN]; char ip[MAX_IP_LEN];
int sock; int sock;
...@@ -34,9 +35,170 @@ static int g_init = 0; ...@@ -34,9 +35,170 @@ static int g_init = 0;
static ipc_cb* g_cb = NULL; static ipc_cb* g_cb = NULL;
static struct ev_io w_accept; static struct ev_io w_accept;
static void *_MutexCreate(void)
{
int err_num;
pthread_mutex_t *mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
if (NULL == mutex) {
return NULL;
}
if (0 != (err_num = pthread_mutex_init(mutex, NULL))) {
printf("create mutex failed");
free(mutex);
return NULL;
}
return mutex;
}
static void _MutexDestroy(void *mutex)
{
int err_num;
if (!mutex) {
printf("mutex want to destroy is NULL!");
return;
}
if (0 != (err_num = pthread_mutex_destroy((pthread_mutex_t *)mutex))) {
printf("destroy mutex failed");
}
free(mutex);
}
static void _MutexLock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_lock((pthread_mutex_t *)mutex))) {
printf("lock mutex failed: - '%s' (%d)", strerror(err_num), err_num);
}
}
static void _MutexUnlock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_unlock((pthread_mutex_t *)mutex))) {
printf("unlock mutex failed - '%s' (%d)", strerror(err_num), err_num);
}
}
static char *_next_json(char *str, int* hasFloor)
{
if(str == NULL) {
return NULL;
}
char *ptr = str;
int floor = 0;
// judge if inside the "..."
int quotes = 0;
while(*ptr) {
if(*ptr++ == '{') {
++floor;
break;
}
}
*hasFloor = floor;
if(!floor) {
//print_json();
return NULL;
}
while(floor && *ptr) {
switch(*ptr++) {
case '{':{
if(quotes)
break;
++floor;
//printf("floor is %d \n", floor);
break;
}
case '}':{
if(quotes)
break;
--floor;
//printf("floor is %d \n", floor);
break;
}
case '"':{
quotes = !quotes;
}
/* notice: judgment of '\\' added new without test*/
case '\\':{
break;
++ptr;
break;
}
default:{
break;
}
}
}
if(floor == 0) {
return ptr;
}
return NULL;
}
static char* __do_data(char* buf, int buflen,ipc_cb* cb, char* deviceCode){
char* endIdx = NULL;
char* startIdx = buf;
int hasfloor = 0;
char buf2[1024] = {0};
while(startIdx < buf + buflen){
hasfloor = 0;
endIdx = _next_json(startIdx,&hasfloor);
if(endIdx == NULL && hasfloor == 0){//一般数据
if (cb != NULL){
cb(startIdx, strlen(startIdx),deviceCode);
}
return NULL;
//break;
}else if (endIdx == NULL){ //有json数据没接收完
return startIdx;
}
int cplen = endIdx - startIdx;
memset(buf2,0, sizeof(buf2));
memcpy(buf2,startIdx, cplen);
if (cb != NULL){
cb(buf2,cplen,deviceCode);
}
startIdx = endIdx;
}
return NULL;
}
static char* __do_spilt(char* buf, int buflen,ipc_cb* cb, char* deviceCode){
char* pInx = buf;
int slen = -1;
while (buflen > 0){
pInx += slen + 1;
slen = strlen(pInx);
printf("read_cb slen ====================================slen[%d] \n",slen);
if (cb != NULL){
cb(pInx,slen,deviceCode);
}
buflen -= slen + 1;
}
if (buflen != 0 ){
return pInx;
}
return NULL;
}
// Save/Load the gateway list // Save/Load the gateway list
void kk_gw_list_save(void) static void kk_gw_list_save(void)
{ {
FILE *fp; FILE *fp;
uint8_t i; uint8_t i;
...@@ -51,7 +213,7 @@ void kk_gw_list_save(void) ...@@ -51,7 +213,7 @@ void kk_gw_list_save(void)
} }
void kk_gw_list_load(void) static void kk_gw_list_load(void)
{ {
uint16_t i; uint16_t i;
FILE *fp; FILE *fp;
...@@ -98,18 +260,18 @@ static int get_idx_by_ip(char ip[MAX_IP_LEN]){ ...@@ -98,18 +260,18 @@ static int get_idx_by_ip(char ip[MAX_IP_LEN]){
} }
static int get_sock_by_deviceCode(char deviceCode[DEVICE_CODE_LEN]){ static kk_tcp_ctrl_t* get_channel_by_deviceCode(char deviceCode[DEVICE_CODE_LEN]){
int i = 0; int i = 0;
if (deviceCode == NULL || strlen(deviceCode) == 0){ if (deviceCode == NULL || strlen(deviceCode) == 0){
return -1; return NULL;
} }
for(;i < MAX_LISTEN_NUM; i++){ for(;i < MAX_LISTEN_NUM; i++){
if(strcmp(deviceCode, g_tcp_ctrl[i].deviceCode) == 0){ if(strcmp(deviceCode, g_tcp_ctrl[i].deviceCode) == 0){
return g_tcp_ctrl[i].sock; return &g_tcp_ctrl[i];
} }
} }
return -1; return NULL;
} }
...@@ -156,7 +318,9 @@ static int reset_by_sock(int sock){ ...@@ -156,7 +318,9 @@ static int reset_by_sock(int sock){
for(;i < MAX_LISTEN_NUM; i++){ for(;i < MAX_LISTEN_NUM; i++){
if(sock == g_tcp_ctrl[i].sock){ if(sock == g_tcp_ctrl[i].sock){
g_tcp_ctrl[i].isConnect = 0; g_tcp_ctrl[i].isConnect = 0;
g_tcp_ctrl[i].isConnect = -1; g_tcp_ctrl[i].sock = -1;
_MutexDestroy(g_tcp_ctrl[i].mutex);
g_tcp_ctrl[i].mutex = NULL;
return 0; return 0;
} }
} }
...@@ -175,6 +339,15 @@ static int set_sock_by_ip(char ip[MAX_IP_LEN], int sock){ ...@@ -175,6 +339,15 @@ static int set_sock_by_ip(char ip[MAX_IP_LEN], int sock){
g_tcp_ctrl[i].sock = sock; g_tcp_ctrl[i].sock = sock;
printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, ip, g_tcp_ctrl[i].sock); printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, ip, g_tcp_ctrl[i].sock);
//create mutex
if (g_tcp_ctrl[i].mutex == NULL){
g_tcp_ctrl[i].mutex = _MutexCreate();
if (g_tcp_ctrl[i].mutex == NULL) {
printf("[%s] _MutexCreate failed \n", __FUNCTION__);
return -1;
}
}
return 0; return 0;
} }
} }
...@@ -193,7 +366,7 @@ int kk_is_tcp_channel(char devCode[DEVICE_CODE_LEN]){ ...@@ -193,7 +366,7 @@ int kk_is_tcp_channel(char devCode[DEVICE_CODE_LEN]){
for(;i < MAX_LISTEN_NUM; i++){ for(;i < MAX_LISTEN_NUM; i++){
if(strcmp(devCode, g_tcp_ctrl[i].deviceCode) == 0){ if(strcmp(devCode, g_tcp_ctrl[i].deviceCode) == 0){
printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, g_tcp_ctrl[i].ip, g_tcp_ctrl[i].sock); //printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, g_tcp_ctrl[i].ip, g_tcp_ctrl[i].sock);
return g_tcp_ctrl[i].sock; return g_tcp_ctrl[i].sock;
} }
} }
...@@ -312,15 +485,19 @@ err1: ...@@ -312,15 +485,19 @@ err1:
/*读回调*/ /*读回调*/
static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{ {
printf("================== read_cb \n"); printf("================== read_cb [%d]\n", revents);
char buffer[BUFFER_SIZE] = {0}; char buffer[BUFFER_SIZE] = {0};
if (EV_ERROR & revents) { if (EV_ERROR & revents) {
printf("read got invalid event...\r\n"); printf("read got invalid event...\r\n");
return; return;
} }
int res = 0; int res = 0;
int offset = 0;
kk_tcp_ctrl_t* tcp_ctrl = (kk_tcp_ctrl_t*)watcher->data; kk_tcp_ctrl_t* tcp_ctrl = (kk_tcp_ctrl_t*)watcher->data;
int32_t bytes = recv(watcher->fd, buffer, sizeof(buffer),0); again:
_MutexLock(tcp_ctrl->mutex);
int32_t bytes = recv(watcher->fd, buffer + offset, sizeof(buffer) - offset - 1,0);
_MutexUnlock(tcp_ctrl->mutex);
if (-1 == bytes) { if (-1 == bytes) {
//tcp Error //tcp Error
if (EINTR != errno && EAGAIN != errno) { if (EINTR != errno && EAGAIN != errno) {
...@@ -337,11 +514,56 @@ static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) ...@@ -337,11 +514,56 @@ static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_io_stop(loop,watcher); ev_io_stop(loop,watcher);
free(watcher); free(watcher);
} else { } else {
//intf("READ:\r\n %s\r\n", buffer); //printf("READ:\r\n %s =====[%d]\r\n", buffer, bytes);
//printf("read_cb deviceCode ip sock [%s][%s][%d] \n",tcp_ctrl->deviceCode,tcp_ctrl->ip, tcp_ctrl->sock); //printf("read_cb deviceCode ip sock [%s][%s][%d] \n",tcp_ctrl->deviceCode,tcp_ctrl->ip, tcp_ctrl->sock);
if (g_cb != NULL){ /*if (g_cb != NULL){
g_cb(buffer,bytes,tcp_ctrl->deviceCode); g_cb(buffer,bytes,tcp_ctrl->deviceCode);
}*/
char* retpst = NULL;
if (strlen(buffer) == bytes){
retpst = __do_data(buffer, bytes + offset, g_cb, tcp_ctrl->deviceCode);
if (retpst != NULL){
offset = bytes + offset - (retpst - buffer);
memmove(buffer, retpst, offset);
memset(buffer + offset, 0, sizeof(buffer) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto again;
}
}
else{
//处理‘\0’ 结束符粘包
retpst = __do_spilt(buffer, bytes + offset, g_cb, tcp_ctrl->deviceCode);
if (retpst != NULL){
offset = bytes + offset - (retpst - buffer);
memmove(buffer, retpst, offset);
memset(buffer + offset, 0, sizeof(buffer) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto again;
}
/*char* pInx = buffer;
int buflen = offset + bytes;
int slen = -1;
while (buflen > 0){
pInx += slen + 1;
slen = strlen(pInx);
printf("read_cb slen ====================================slen[%d] \n",slen);
if (g_cb != NULL){
g_cb(pInx,slen,tcp_ctrl->deviceCode);
}
buflen -= slen + 1;
} }
if (buflen != 0 && bytes == (BUFFER_SIZE - offset)){
printf("read_cb need recv agian====================================[%d] \n",buflen);
offset = buflen + slen + 1;
memmove(buffer,pInx,offset);
memset(buffer + offset, 0, (BUFFER_SIZE - offset));
goto again;
}*/
}
} }
} }
...@@ -434,16 +656,59 @@ static void loop_tcp_thread(void *arg){ ...@@ -434,16 +656,59 @@ static void loop_tcp_thread(void *arg){
int kk_tcp_channel_ser_send(char* data, int len, char chalMark[DEVICE_CODE_LEN]){ int kk_tcp_channel_ser_send(char* data, int len, char chalMark[DEVICE_CODE_LEN]){
int ret = 0; int ret = 0;
fd_set fds;
struct timeval timeout={0,200}; //select等待3秒,3秒轮询,要非阻塞就置0
if (data != NULL){ if (data != NULL){
int sd = get_sock_by_deviceCode(chalMark); kk_tcp_ctrl_t* chl_ctrl = get_channel_by_deviceCode(chalMark);
if (sd > -1){ if (NULL == chl_ctrl){
ret = write(sd, data, len); printf("[%s] get_channel_by_deviceCode is NULL!!! \n",__FUNCTION__);
return -1;
}
if (chl_ctrl->sock > -1){
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(chl_ctrl->sock,&fds); //添加描述符
switch(select(chl_ctrl->sock + 1,NULL,&fds,NULL,&timeout)) //select使用
{
case -1:
chl_ctrl->isConnect = 0;
printf(" [%s] select error ret=%d \n", __FUNCTION__, -1);
break; //select错误 退出循环
case 0:
printf(" [%s] select skip=================== ret=%d \n", __FUNCTION__, 0);
break; //再次轮询
default:
if(FD_ISSET(chl_ctrl->sock,&fds)) //测试sock是否可读,即是否网络上有数据
{
_MutexLock(chl_ctrl->mutex);
ret = write(chl_ctrl->sock, data, len);
_MutexUnlock(chl_ctrl->mutex);
if( ret <= 0){
printf("=================write error ret=%d \n",ret);
if (errno != EINTR){
chl_ctrl->isConnect = 0;
printf("write error reconnect!! \n");
break;
}
}
}
break;
}// end switch
/*_MutexLock(chl_ctrl->mutex);
printf("[%s] kk_tcp_channel_ser_send [%s] \n",__FUNCTION__,data);
ret = write(chl_ctrl->sock, data, len);
_MutexUnlock(chl_ctrl->mutex);
if (ret < 0){ if (ret < 0){
printf("[%s] write failed!!!! \n",__FUNCTION__); printf("[%s] write failed!!!! \n",__FUNCTION__);
} }*/
} }
} }
return ret;
} }
...@@ -459,6 +724,7 @@ int kk_TCP_channel_init(ipc_cb cb) ...@@ -459,6 +724,7 @@ int kk_TCP_channel_init(ipc_cb cb)
g_init = 1; g_init = 1;
//memset(g_tcp_ctrl, 0, sizeof(kk_tcp_ctrl_t)*MAX_LISTEN_NUM); //memset(g_tcp_ctrl, 0, sizeof(kk_tcp_ctrl_t)*MAX_LISTEN_NUM);
//kk_gw_list_load(); //kk_gw_list_load();
for(i = 0; i < MAX_LISTEN_NUM; i++){ for(i = 0; i < MAX_LISTEN_NUM; i++){
g_tcp_ctrl[i].sock = -1; g_tcp_ctrl[i].sock = -1;
} }
...@@ -483,6 +749,10 @@ int kk_TCP_channel_deinit(ipc_type type) ...@@ -483,6 +749,10 @@ int kk_TCP_channel_deinit(ipc_type type)
if(g_tcp_ctrl[i].sock > -1){ if(g_tcp_ctrl[i].sock > -1){
//close(g_tcp_ctrl[i].sock); //close(g_tcp_ctrl[i].sock);
} }
if(g_tcp_ctrl[i].mutex != NULL){
_MutexDestroy(g_tcp_ctrl[i].mutex);
g_tcp_ctrl[i].mutex = NULL;
}
} }
ev_io_stop(g_loop, &w_accept); ev_io_stop(g_loop, &w_accept);
...@@ -494,58 +764,9 @@ int kk_TCP_channel_deinit(ipc_type type) ...@@ -494,58 +764,9 @@ int kk_TCP_channel_deinit(ipc_type type)
g_init = 0; g_init = 0;
return 0;
}
void *_MutexCreate(void)
{
int err_num;
pthread_mutex_t *mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t));
if (NULL == mutex) {
return NULL;
}
if (0 != (err_num = pthread_mutex_init(mutex, NULL))) {
printf("create mutex failed");
free(mutex);
return NULL;
}
return mutex;
}
void _MutexDestroy(void *mutex) return 0;
{
int err_num;
if (!mutex) {
printf("mutex want to destroy is NULL!");
return;
}
if (0 != (err_num = pthread_mutex_destroy((pthread_mutex_t *)mutex))) {
printf("destroy mutex failed");
}
free(mutex);
}
void _MutexLock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_lock((pthread_mutex_t *)mutex))) {
printf("lock mutex failed: - '%s' (%d)", strerror(err_num), err_num);
}
}
void _MutexUnlock(void *mutex)
{
int err_num;
if (0 != (err_num = pthread_mutex_unlock((pthread_mutex_t *)mutex))) {
printf("unlock mutex failed - '%s' (%d)", strerror(err_num), err_num);
}
} }
...@@ -630,6 +851,7 @@ static void loop_tcp_client_thread(void *arg){ ...@@ -630,6 +851,7 @@ static void loop_tcp_client_thread(void *arg){
printf("loop_tcp_client_thread start!\r\n"); printf("loop_tcp_client_thread start!\r\n");
char buf[1024]= {0}; char buf[1024]= {0};
int ret = 0; int ret = 0;
int offset = 0;
fd_set fds; fd_set fds;
struct timeval timeout={0,200}; //select等待3秒,3秒轮询,要非阻塞就置0 struct timeval timeout={0,200}; //select等待3秒,3秒轮询,要非阻塞就置0
...@@ -645,6 +867,7 @@ static void loop_tcp_client_thread(void *arg){ ...@@ -645,6 +867,7 @@ static void loop_tcp_client_thread(void *arg){
while(g_client_ctrl.isConnect){ while(g_client_ctrl.isConnect){
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化 FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(g_client_ctrl.sd,&fds); //添加描述符 FD_SET(g_client_ctrl.sd,&fds); //添加描述符
offset = 0;
switch(select(g_client_ctrl.sd + 1,&fds,NULL,NULL,&timeout)) //select使用 switch(select(g_client_ctrl.sd + 1,&fds,NULL,NULL,&timeout)) //select使用
{ {
case -1: case -1:
...@@ -658,9 +881,11 @@ static void loop_tcp_client_thread(void *arg){ ...@@ -658,9 +881,11 @@ static void loop_tcp_client_thread(void *arg){
if(FD_ISSET(g_client_ctrl.sd,&fds)) //测试sock是否可读,即是否网络上有数据 if(FD_ISSET(g_client_ctrl.sd,&fds)) //测试sock是否可读,即是否网络上有数据
{ {
//接受网络数据 //接受网络数据
_MutexLock(g_client_ctrl.mutex);
memset(buf,0x0,sizeof(buf)); memset(buf,0x0,sizeof(buf));
ret = read(g_client_ctrl.sd, buf, sizeof(buf)); clientRead:
_MutexLock(g_client_ctrl.mutex);
ret = read(g_client_ctrl.sd, buf + offset, sizeof(buf)- offset - 1);
_MutexUnlock(g_client_ctrl.mutex); _MutexUnlock(g_client_ctrl.mutex);
if( ret <= 0){ if( ret <= 0){
...@@ -672,9 +897,21 @@ static void loop_tcp_client_thread(void *arg){ ...@@ -672,9 +897,21 @@ static void loop_tcp_client_thread(void *arg){
} }
}else if(ret > 0){ }else if(ret > 0){
//printf("11buf = %s\n",buf); //printf("11buf = %s\n",buf);
#if 0
if (g_client_ctrl.cb != NULL){ if (g_client_ctrl.cb != NULL){
g_client_ctrl.cb(buf,ret,""); g_client_ctrl.cb(buf,ret,"");
} }
#else
char* retpst = __do_data(buf, ret + offset, g_client_ctrl.cb, "");
if (retpst != NULL){
offset = ret + offset - (retpst - buf);
memmove(buf, retpst, offset);
memset(buf + offset, 0, sizeof(buf) - offset);
printf("====read not complete, need again offset=%d \n",offset);
goto clientRead;
}
#endif
} }
} }
break; break;
...@@ -690,7 +927,6 @@ static void loop_tcp_client_thread(void *arg){ ...@@ -690,7 +927,6 @@ static void loop_tcp_client_thread(void *arg){
} }
int kk_get_retry_num(){ int kk_get_retry_num(){
return g_client_ctrl.retry; return g_client_ctrl.retry;
} }
...@@ -702,6 +938,8 @@ int kk_reset_retry_num(){ ...@@ -702,6 +938,8 @@ int kk_reset_retry_num(){
int kk_tcp_client_send(char* data, int len){ int kk_tcp_client_send(char* data, int len){
int ret = 0; int ret = 0;
int cnt = 0; int cnt = 0;
fd_set fds;
struct timeval timeout={0,20000};
if ( data != NULL){ if ( data != NULL){
while(g_client_ctrl.sd == -1 && cnt < 5){ while(g_client_ctrl.sd == -1 && cnt < 5){
...@@ -713,14 +951,50 @@ int kk_tcp_client_send(char* data, int len){ ...@@ -713,14 +951,50 @@ int kk_tcp_client_send(char* data, int len){
printf("[%s] The tcp socket created fialid !!!! \n",__FUNCTION__); printf("[%s] The tcp socket created fialid !!!! \n",__FUNCTION__);
return -1; return -1;
} }
FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
FD_SET(g_client_ctrl.sd,&fds); //添加描述符
switch(select(g_client_ctrl.sd + 1,NULL,&fds,NULL,&timeout)) //select使用
{
case -1:
g_client_ctrl.isConnect = 0;
printf(" [%s] select error ret=%d \n", __FUNCTION__, ret);
break; //select错误 退出循环
case 0:
printf(" [%s] select error ret=%d \n", __FUNCTION__, 0);
break; //再次轮询
default:
if(FD_ISSET(g_client_ctrl.sd,&fds)) //测试sock是否可读,即是否网络上有数据
{
//接受网络数据
_MutexLock(g_client_ctrl.mutex); _MutexLock(g_client_ctrl.mutex);
//printf("kk_tcp_client_send [%s] =========!\n",data);
ret = write(g_client_ctrl.sd, data, len);
_MutexUnlock(g_client_ctrl.mutex);
if( ret <= 0){
printf("=================send error ret=%d \n",ret);
if (errno != EINTR){
g_client_ctrl.isConnect = 0;
printf("send error reconnect!! \n");
break;
}
}
}
break;
}// end switch
/*_MutexLock(g_client_ctrl.mutex);
printf("kk_tcp_client_send [%s] \n",data);
ret = write(g_client_ctrl.sd, data, len); ret = write(g_client_ctrl.sd, data, len);
_MutexUnlock(g_client_ctrl.mutex); _MutexUnlock(g_client_ctrl.mutex);
if (ret < 0){ if (ret < 0){
printf("[%s] write failed ret=%d, reconnect !!!! \n",__FUNCTION__, ret); printf("[%s] write failed ret=%d, reconnect !!!! \n",__FUNCTION__, ret);
g_client_ctrl.isConnect = 0; g_client_ctrl.isConnect = 0;
ret = -1; ret = -1;
} }*/
} }
return ret; return ret;
} }
......
...@@ -22,7 +22,7 @@ void kk_sendData2app(void *info, void *payload,int isAsync){ ...@@ -22,7 +22,7 @@ void kk_sendData2app(void *info, void *payload,int isAsync){
dm_queue_msg_insert4(buf); dm_queue_msg_insert4(buf);
}else{ }else{
kk_ipc_send(IPC_MID2APP, buf, strlen(buf) + 1); kk_ipc_send(IPC_MID2APP, buf, strlen(buf) );
free(buf); free(buf);
} }
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -153,7 +153,7 @@ void mid2p_cb(void* data, int len, char* chalMark){ ...@@ -153,7 +153,7 @@ void mid2p_cb(void* data, int len, char* chalMark){
void gw2mid_cb(void* data, int len, char* chalMark){ void gw2mid_cb(void* data, int len, char* chalMark){
if (data != NULL){ if (data != NULL){
printf("gw2mid_cb chalMark=%s, data: %s RECEIVED \r\n", chalMark, data); printf("gw2mid_cb chalMark=%s, data: %s [%d]RECEIVED \r\n", chalMark, data,len);
mid2p_cb(data,len,chalMark); mid2p_cb(data,len,chalMark);
} }
} }
......
...@@ -549,7 +549,7 @@ int OTA_publishProgress(void *handle, char* payload){ ...@@ -549,7 +549,7 @@ int OTA_publishProgress(void *handle, char* payload){
cJSON_AddStringToObject(root, "info", topicBuf); cJSON_AddStringToObject(root, "info", topicBuf);
cJSON_AddStringToObject(root, "payload", payload); cJSON_AddStringToObject(root, "payload", payload);
void *buf = cJSON_Print(root); void *buf = cJSON_Print(root);
kk_ipc_send(IPC_MID2APP, buf, strlen(buf) + 1); kk_ipc_send(IPC_MID2APP, buf, strlen(buf));
free(msgTypeStr); free(msgTypeStr);
free(topicBuf); free(topicBuf);
free(buf); free(buf);
......
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
...@@ -112,7 +113,7 @@ static int send_result_resp(cJSON * result, ...@@ -112,7 +113,7 @@ static int send_result_resp(cJSON * result,
char * str_result = rpc_cJSON_Print(result_root); char * str_result = rpc_cJSON_Print(result_root);
printf("send json:\n%s\n",str_result); printf("send json:\n%s\n",str_result);
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1); return_value = kk_sendData2CCU(str_result, strlen(str_result));
free(str_result); free(str_result);
rpc_cJSON_Delete(result_root); rpc_cJSON_Delete(result_root);
return return_value; return return_value;
...@@ -154,7 +155,7 @@ static int send_error_resp(int code, char* message, ...@@ -154,7 +155,7 @@ static int send_error_resp(int code, char* message,
} }
char * str_result = rpc_cJSON_Print(result_root); char * str_result = rpc_cJSON_Print(result_root);
//printf("alla=========== :%d\n", strlen(str_result)+1); //printf("alla=========== :%d\n", strlen(str_result)+1);
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1); return_value = kk_sendData2CCU(str_result, strlen(str_result));
printf("send_error_resp:\n%s\n", str_result); printf("send_error_resp:\n%s\n", str_result);
free(str_result); free(str_result);
rpc_cJSON_Delete(result_root); rpc_cJSON_Delete(result_root);
...@@ -227,7 +228,7 @@ static int eval_request(struct jrpc_server *server, cJSON *root) { ...@@ -227,7 +228,7 @@ static int eval_request(struct jrpc_server *server, cJSON *root) {
void _cb(void* data, int len, char* chlmark){ void _cb(void* data, int len, char* chlmark){
if (data != NULL){ if (data != NULL){
printf("plat_cb: %s RECEIVED \r\n", data); printf("plat_cb: %s [%d]RECEIVED \r\n", data,len);
cJSON *root; cJSON *root;
char *end_ptr = NULL; char *end_ptr = NULL;
...@@ -625,7 +626,7 @@ int jrpc_send_msg(cJSON * msgJson) { ...@@ -625,7 +626,7 @@ int jrpc_send_msg(cJSON * msgJson) {
return_value = kk_sendData2CCU(str_result, strlen(str_result)+1); return_value = kk_sendData2CCU(str_result, strlen(str_result));
free(str_result); free(str_result);
return return_value; return return_value;
} }
...@@ -1013,4 +1014,3 @@ void rpc_reportDevices(void) ...@@ -1013,4 +1014,3 @@ void rpc_reportDevices(void)
} }
rpc_report_devices(devicesJson); rpc_report_devices(devicesJson);
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment