Commit 89a95333 authored by 黄振令's avatar 黄振令

【修改内容】1. 网关连接主机失败,重现广播发现,解决主机ip地址变了无法连接问题;2.优化nnmsg代码

【提交人】huang.zhenling
parent 4d5b2765
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
#define GW2CCU_PIPE "tcp://%s:5555" #define GW2CCU_PIPE "tcp://%s:5555"
#define GW2CCU_PUBSUB "tcp://%s:5557" #define GW2CCU_PUBSUB "tcp://%s:5557"
#define MAGIC "magic12" #define MAGIC "magic12"
#define MAGIC_ACK "magic12ack"
#define FILTERSTR "|" #define FILTERSTR "|"
#ifndef _ZLOG_ #ifndef _ZLOG_
...@@ -36,6 +38,7 @@ typedef struct { ...@@ -36,6 +38,7 @@ typedef struct {
ipc_cb* cb; ipc_cb* cb;
ipc_type type; ipc_type type;
int isconnect; int isconnect;
int sendErrCnt;
}Bloop_ctrl_t; }Bloop_ctrl_t;
...@@ -84,10 +87,12 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents) ...@@ -84,10 +87,12 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
} }
if (bytes <= 0 || dat == NULL) { if (bytes <= 0 || dat == NULL) {
ERROR_PRINT(" recived data is null or len is 0 \n"); ERROR_PRINT(" recived data is null or len is 0 \n");
ERROR_PRINT("nn_recv failed with error code %d, %s \n", nn_errno(), nn_strerror(nn_errno ()));
return; return;
} }
INFO_PRINT("watcher_cb:%s recived\r\n\r\n", (char *)dat); INFO_PRINT("watcher_cb:%s recived\r\n\r\n", (char *)dat);
loop_ctrl->isconnect = 1;
//if sub, need filter sbuscribe str //if sub, need filter sbuscribe str
if (IPC_PLAT2MID == loop_ctrl->type || IPC_MID2PLAT == loop_ctrl->type){ if (IPC_PLAT2MID == loop_ctrl->type || IPC_MID2PLAT == loop_ctrl->type){
validDat = _parse_data_by_subscribe(dat, bytes, &validLen, &chlMark); validDat = _parse_data_by_subscribe(dat, bytes, &validLen, &chlMark);
...@@ -106,10 +111,13 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents) ...@@ -106,10 +111,13 @@ static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
} }
}*/ }*/
if (strncmp(validDat,MAGIC, strlen(MAGIC)) == 0){ if (strncmp(validDat,MAGIC, strlen(MAGIC)) == 0){
if (loop_ctrl->isconnect == 0){ if (strncmp(validDat,MAGIC_ACK, strlen(MAGIC_ACK)) == 0){
kk_ipc_send_ex(loop_ctrl->type, validDat, bytes, chlMark);
loop_ctrl->isconnect =1; }else{
kk_ipc_send_ex(loop_ctrl->type, MAGIC_ACK, strlen(MAGIC_ACK)+1, chlMark);
} }
loop_ctrl->isconnect =1;
nn_freemsg(dat); nn_freemsg(dat);
return; return;
}else if (IPC_MID2PLAT == loop_ctrl->type){// }else if (IPC_MID2PLAT == loop_ctrl->type){//
...@@ -335,7 +343,7 @@ int kk_ipc_init(ipc_type type, ipc_cb cb, char* chlMark, char* ip) ...@@ -335,7 +343,7 @@ int kk_ipc_init(ipc_type type, ipc_cb cb, char* chlMark, char* ip)
} }
int kk_ipc_dinit(ipc_type type) int kk_ipc_deinit(ipc_type type)
{ {
Bloop_ctrl_t* loop_ctrl; Bloop_ctrl_t* loop_ctrl;
if (Bloop_ctrl.type == type){ if (Bloop_ctrl.type == type){
...@@ -389,10 +397,11 @@ int kk_ipc_send(ipc_type type, void* data, int len) ...@@ -389,10 +397,11 @@ int kk_ipc_send(ipc_type type, void* data, int len)
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark) int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark)
{ {
int ret = 0;
if (data != NULL){ if (data != NULL){
int filterlen = 0; int filterlen = 0;
void* buf = NULL; void* buf = NULL;
Bloop_ctrl_t* loop_ctrl;
if (chalMark != NULL){ if (chalMark != NULL){
filterlen = strlen(chalMark) + strlen(FILTERSTR); filterlen = strlen(chalMark) + strlen(FILTERSTR);
...@@ -410,9 +419,21 @@ int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark) ...@@ -410,9 +419,21 @@ int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark)
memcpy(buf + filterlen, data, len); memcpy(buf + filterlen, data, len);
if (type == IPC_MID2PLAT){ if (type == IPC_MID2PLAT){
nn_send(Mloop_ctrl.ab.n, &buf, NN_MSG, NN_DONTWAIT);//NN_DONTWAIT loop_ctrl = &Mloop_ctrl;
ret =nn_send(Mloop_ctrl.ab.n, &buf, NN_MSG, NN_DONTWAIT);//NN_DONTWAIT
}else{
loop_ctrl = &Bloop_ctrl;
ret = nn_send(Bloop_ctrl.ba.n, &buf, NN_MSG, NN_DONTWAIT);
}
if (ret < 0){
ERROR_PRINT("nn_send failed with error code %d, str=%s \n", nn_errno(), nn_strerror(nn_errno()));
loop_ctrl->sendErrCnt++;
loop_ctrl->isconnect = 0;
nn_freemsg(buf);
}else{ }else{
nn_send(Bloop_ctrl.ba.n, &buf, NN_MSG, NN_DONTWAIT); loop_ctrl->sendErrCnt = 0;
loop_ctrl->isconnect = 1;
} }
} }
...@@ -421,6 +442,18 @@ int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark) ...@@ -421,6 +442,18 @@ int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark)
} }
int kk_ipc_get_senderrcnt(ipc_type type){
Bloop_ctrl_t* loop_ctrl;
if (IPC_MID2PLAT == type){
loop_ctrl = &Mloop_ctrl;
}else {
loop_ctrl = &Bloop_ctrl;
}
return loop_ctrl->sendErrCnt;
}
int kk_ipc_isconnect(ipc_type type){ int kk_ipc_isconnect(ipc_type type){
Bloop_ctrl_t* loop_ctrl; Bloop_ctrl_t* loop_ctrl;
...@@ -430,9 +463,8 @@ int kk_ipc_isconnect(ipc_type type){ ...@@ -430,9 +463,8 @@ int kk_ipc_isconnect(ipc_type type){
loop_ctrl = &Bloop_ctrl; loop_ctrl = &Bloop_ctrl;
} }
if (loop_ctrl->isconnect == 1){ loop_ctrl->isconnect = 0;
return 1; {
}else{
for(int i =0; i<20;i++){ for(int i =0; i<20;i++){
kk_ipc_send(type, MAGIC, strlen(MAGIC)+1); kk_ipc_send(type, MAGIC, strlen(MAGIC)+1);
......
...@@ -40,7 +40,7 @@ typedef enum { ...@@ -40,7 +40,7 @@ typedef enum {
typedef void ipc_cb(void* data, int len, char* chalMark); typedef void ipc_cb(void* data, int len, char* chalMark);
int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip); int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip);
int kk_ipc_dinit(); int kk_ipc_deinit();
int kk_ipc_send(ipc_type type, void* data, int len); int kk_ipc_send(ipc_type type, void* data, int len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark); int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark);
......
...@@ -193,8 +193,8 @@ int kk_is_tcp_channel(char devCode[DEVICE_CODE_LEN]){ ...@@ -193,8 +193,8 @@ int kk_is_tcp_channel(char devCode[DEVICE_CODE_LEN]){
for(;i < MAX_LISTEN_NUM; i++){ for(;i < MAX_LISTEN_NUM; i++){
if(strcmp(devCode, g_tcp_ctrl[i].deviceCode) == 0){ if(strcmp(devCode, g_tcp_ctrl[i].deviceCode) == 0){
printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, devCode, g_tcp_ctrl[i].sock); printf("[%s] idx=%d ip=%s sock=%d\n", __FUNCTION__,i, g_tcp_ctrl[i].ip, g_tcp_ctrl[i].sock);
return i; return g_tcp_ctrl[i].sock;
} }
} }
return -1; return -1;
......
...@@ -28,7 +28,6 @@ typedef enum { ...@@ -28,7 +28,6 @@ typedef enum {
IPC_UNDEF IPC_UNDEF
} ipc_type; } ipc_type;
#define MSG_TYPE_STR "msgtype" #define MSG_TYPE_STR "msgtype"
#define MSG_PRODUCT_TYPE_STR "productType" #define MSG_PRODUCT_TYPE_STR "productType"
#define MSG_PRODUCT_CODE_STR "productCode" #define MSG_PRODUCT_CODE_STR "productCode"
...@@ -36,10 +35,12 @@ typedef enum { ...@@ -36,10 +35,12 @@ typedef enum {
#define MSG_PAYLOAD_STR "payload" #define MSG_PAYLOAD_STR "payload"
#define MSG_INFO_STR "info" #define MSG_INFO_STR "info"
#define MSG_INDENTIFIER_STR "identifier" #define MSG_INDENTIFIER_STR "identifier"
#define MSG_PARAMS_STR "params"
#define MSG_IOTClOUDSTATE_STR "IOTCloudState"
typedef void ipc_cb(void* data, int len, char* chalMark); typedef void ipc_cb(void* data, int len, char* chalMark);
int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip); int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip);
int kk_ipc_dinit(); int kk_ipc_deinit();
int kk_ipc_send(ipc_type type, void* data, int len); int kk_ipc_send(ipc_type type, void* data, int len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark); int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark);
......
...@@ -495,6 +495,16 @@ void* _msg_event_property_post(char ip[16], int port){ ...@@ -495,6 +495,16 @@ void* _msg_event_property_post(char ip[16], int port){
} }
int kk_connect_check(){
if(strcmp(GW2CCU_PROTOCOL, "tcp") == 0){
return kk_get_retry_num() > 20;
}else{
if (kk_ipc_get_senderrcnt(IPC_PLAT2MID) > 0){
return kk_ipc_isconnect(IPC_PLAT2MID)==0?1:0;
}
return 0;
}
}
void ipcHandle(void) void ipcHandle(void)
{ {
...@@ -554,6 +564,29 @@ void ipcHandle(void) ...@@ -554,6 +564,29 @@ void ipcHandle(void)
free(postmsg); free(postmsg);
} }
if (kk_connect_check()){
//discover ccu
search_ccu(deviceCode, ip, &port);
if(strcmp(GW2CCU_PROTOCOL, "tcp") == 0){
kk_tcp_client_init(ip, port, _cb);
}else{
kk_ipc_deinit(IPC_PLAT2MID);
kk_ipc_init(IPC_PLAT2MID, _cb, macString/*GW_DEVICE_CODE*/, ip);
}
//send add gw to ccu
outbuf = _msg_topo_add();
if (outbuf == NULL){
printf("[%s] topo add msg failed, exit\n",__FUNCTION__);
return;
}
kk_sendData2CCU(outbuf, strlen(outbuf));
free(outbuf);
cnt = 0;
kk_reset_retry_num();
}
} }
//jrpc_server_run(&my_server); //jrpc_server_run(&my_server);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment