Commit b6f4de61 authored by 黄振令's avatar 黄振令

【修改内容】1. 修改nanomsg(使用pipe和subpub组合使用)支持多个网关连接;2. 主机和网关连接协议(tcp广播)

parent 26b97c85
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
extern zlog_category_t *g_zlogC; extern zlog_category_t *g_zlogC;
#define DEBUG_PRINT(info,...) zlog_debug(g_zlogC, info, ##__VA_ARGS__)
#define INFO_PRINT(info,...) zlog_info(g_zlogC, info, ##__VA_ARGS__) #define INFO_PRINT(info,...) zlog_info(g_zlogC, info, ##__VA_ARGS__)
#define ERROR_PRINT(info,...) zlog_error(g_zlogC, info, ##__VA_ARGS__) #define ERROR_PRINT(info,...) zlog_error(g_zlogC, info, ##__VA_ARGS__)
#define WARNING_PRINT(info,...) zlog_warn(g_zlogC, info, ##__VA_ARGS__) #define WARNING_PRINT(info,...) zlog_warn(g_zlogC, info, ##__VA_ARGS__)
......
...@@ -390,10 +390,13 @@ int dm_mgr_init(void) ...@@ -390,10 +390,13 @@ int dm_mgr_init(void)
_dm_init_tsl_params(devId); _dm_init_tsl_params(devId);
} }
res = dm_mgr_device_create(KK_DM_DEVICE_GATEWAY,KK_DM_GW_DEVICE_PRODUCT_TYPE,KK_DM_GW_DEVICE_PRODUCT_CODE,"588E81FFFED3834A",deviceCode,&devId); //res = dm_mgr_device_create(KK_DM_DEVICE_GATEWAY,KK_DM_GW_DEVICE_PRODUCT_TYPE,KK_DM_GW_DEVICE_PRODUCT_CODE,"1122334455667788",deviceCode,&devId);
if (res != SUCCESS_RETURN) { //if (res != SUCCESS_RETURN) {
goto ERROR; // goto ERROR;
} //}
//sleep(3);
//kk_dm_subdev_topo_add(devId);
return SUCCESS_RETURN; return SUCCESS_RETURN;
ERROR: ERROR:
...@@ -722,10 +725,13 @@ int dm_mgr_upstream_thing_topo_add(_IN_ int devid) ...@@ -722,10 +725,13 @@ int dm_mgr_upstream_thing_topo_add(_IN_ int devid)
} }
res = dm_mgr_search_dev_by_devid(devid, &node); res = dm_mgr_search_dev_by_devid(devid, &node);
if (res != SUCCESS_RETURN) { if (res != SUCCESS_RETURN) {
ERROR_PRINT("ERROR res:%d\n",res);
return FAIL_RETURN; return FAIL_RETURN;
} }
res = dm_mgr_get_device_by_mac(node->fatherMac,&gw_node); res = dm_mgr_get_device_by_mac(node->fatherMac,&gw_node);
if (res != SUCCESS_RETURN) { if (res != SUCCESS_RETURN) {
ERROR_PRINT("ERROR [%s][%d] res:%d\n",__FUNCTION__,__LINE__,res); ERROR_PRINT("ERROR [%s][%d] res:%d\n",__FUNCTION__,__LINE__,res);
return FAIL_RETURN; return FAIL_RETURN;
...@@ -1102,3 +1108,16 @@ int dm_mgr_subdev_create(_IN_ char productType[PRODUCT_TYPE_MAXLEN], ...@@ -1102,3 +1108,16 @@ int dm_mgr_subdev_create(_IN_ char productType[PRODUCT_TYPE_MAXLEN],
} }
return res; return res;
} }
int dm_mgr_gw_create(_IN_ char productType[PRODUCT_TYPE_MAXLEN],
_IN_ char productCode[PRODUCT_CODE_MAXLEN], _IN_ char deviceCode[DEVICE_CODE_MAXLEN],_IN_ char fatherMac[DEVICE_MAC_MAXLEN], _OU_ int *devid){
int res = 0;
res = dm_mgr_device_create(KK_DM_DEVICE_GATEWAY,productType,productCode,deviceCode,fatherMac, devid);
if(TSL_ALREADY_EXIST == res)
{
ERROR_PRINT("GATEWAY ALREADY EXIST!!!\n");
}
return res;
}
This diff is collapsed.
...@@ -56,17 +56,18 @@ typedef enum{ ...@@ -56,17 +56,18 @@ typedef enum{
}kk_msg_type_t; }kk_msg_type_t;
#define MSG_TYPE_STR "msgtype" #define MSG_TYPE_STR "msgtype"
#define MSG_PRODUCT_TYPE_STR "product_type" #define MSG_PRODUCT_TYPE_STR "productType"
#define MSG_DEVICE_NAME_STR "device_name" #define MSG_PRODUCT_CODE_STR "productCode"
#define MSG_DEVICE_CODE_STR "deviceCode"
#define MSG_PAYLOAD_STR "payload" #define MSG_PAYLOAD_STR "payload"
#define MSG_INFO_STR "info" #define MSG_INFO_STR "info"
#define MSG_INDENTIFIER_STR "identifier" #define MSG_INDENTIFIER_STR "identifier"
typedef void ipc_cb(void* data, int len, char* chlMark); typedef void ipc_cb(void* data, int len, char* chalMark);
int kk_ipc_init(ipc_type type, ipc_cb cb, char* mac, char* ip); int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip);
int kk_ipc_dinit(); int kk_ipc_dinit();
int kk_ipc_send(ipc_type type, void* data, int len); int kk_ipc_send(ipc_type type, void* data, int len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* filter); int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <signal.h> #include <signal.h>
#include <time.h> #include <time.h>
#include <fcntl.h>
#include "RPC_API.h" #include "RPC_API.h"
#include "./rpc_api/inc/rpc_interface_parse.h" #include "./rpc_api/inc/rpc_interface_parse.h"
...@@ -236,11 +237,133 @@ int _init_param(struct jrpc_server *server) { ...@@ -236,11 +237,133 @@ int _init_param(struct jrpc_server *server) {
return 0; return 0;
} }
int addGW_and_getIP(char* ip){
//char rgMessage[128] = "search_kk_gw|null";
char revMessage[128] = {0};
int sock;
int sk_recv;
int iSendbytes;
int iOptval = 1;
int flag;
int iAddrLength;
int recvLen = 0;
struct sockaddr_in Addrto;
struct sockaddr_in AddrRev;
if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
{
printf("addGW_and_getIP socket fail\n");
return -1;
}
if ((sk_recv = socket(AF_INET, SOCK_DGRAM, 0)) == -1)
{
printf("addGW_and_getIP socket sk_recv fail\n");
close(sock);
return -1;
}
if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST | SO_REUSEADDR, &iOptval, sizeof(int)) < 0)
{
printf("addGW_and_getIP setsockopt failed!");
close(sock);
close(sk_recv);
return -1;
}
if (setsockopt(sk_recv, SOL_SOCKET, SO_REUSEADDR, &iOptval, sizeof(int)) < 0)
{
printf("addGW_and_getIP setsockopt failed!");
close(sock);
close(sk_recv);
return -1;
}
flag = fcntl(sk_recv, F_GETFL, 0);
if (flag < 0)
{
printf("addGW_and_getIP fcntl failed.\n");
close(sock);
close(sk_recv);
return -1;;
}
flag |= O_NONBLOCK;
if (fcntl(sk_recv, F_SETFL, flag) < 0)
{
printf("addGW_and_getIP fcntl failed.\n");
close(sock);
close(sk_recv);
return -1;
}
memset(&Addrto, 0, sizeof(struct sockaddr_in));
Addrto.sin_family = AF_INET;
Addrto.sin_addr.s_addr = inet_addr("255.255.255.255");
Addrto.sin_port = htons(25556);
memset(&AddrRev, 0, sizeof(struct sockaddr_in));
AddrRev.sin_family = AF_INET;
AddrRev.sin_addr.s_addr = INADDR_ANY;
AddrRev.sin_port = htons(25555);
iAddrLength = sizeof(struct sockaddr);
if (bind(sk_recv, (struct sockaddr *)&AddrRev, sizeof(AddrRev)) == -1)
{
printf("addGW_and_getIP bind failed!\n");
close(sock);
close(sk_recv);
return -1;
}
printf("addGW_and_getIP allan ==============\n");
char info[] = "{\"msgtype\":\"/thing/topo/add\",\"productType\":\"gw\",\"productCode\":\"2\",\"deviceCode\":\"1122334455667788\"}";
char payload[] = "{\"msgId\":\"1\",\"version\":\"1.0\",\"params\":{\"deviceCode\":\"1122334455667788\",\"productCode\":\"2\",\"mac\":\"1122334455667788\"}}";
cJSON* root = cJSON_CreateObject();
cJSON* infoObj = cJSON_Parse(info);
cJSON* payloadObj = cJSON_Parse(payload);
cJSON_AddItemToObject(root, "info", infoObj);
cJSON_AddItemToObject(root, "payload",payloadObj);
char* outbuf = cJSON_Print(root);
cJSON_Delete(root);
printf("addGW_and_getIP allan ===========111111===\n");
while (1)
{
if ((iSendbytes = sendto(sock, outbuf, strlen(outbuf)+1, 0, (struct sockaddr*)&Addrto, sizeof(struct sockaddr))) == -1)
{
printf("addGW_and_getIP sendto fail, errno=%d\n", errno);
close(sock);
close(sk_recv);
free(outbuf);
return -1;
}
sleep(1);
recvLen = recvfrom(sk_recv, revMessage, sizeof(revMessage), 0, (struct sockaddr *)&AddrRev, &iAddrLength);
if (recvLen > 0){
printf("addGW_and_getIP recv:%s\n", revMessage);
if (strstr(revMessage, "/thing/topo/add_reply") != NULL){
memcpy(ip, inet_ntoa(AddrRev.sin_addr), strlen(inet_ntoa(AddrRev.sin_addr)));
printf(" recv ip:%s\n", ip);
break;
}
}
}
close(sock);
close(sk_recv);
free(outbuf);
return 0;
}
void ipcHandle(void) void ipcHandle(void)
{ {
char ip[16] = {0};
emberAfAppPrint( "Thread rpc Interface Parse create\n" ); emberAfAppPrint( "Thread rpc Interface Parse create\n" );
addGW_and_getIP(ip);
_init_param(&my_server); _init_param(&my_server);
kk_ipc_init(IPC_PLAT2MID, _cb, "1122334455667788","127.0.0.1"); kk_ipc_init(IPC_PLAT2MID, _cb, "1122334455667788",ip);
emberAfAppPrint("sizeof(rpc_table)=%d,sizeof(rpc_table_s)=%d,%d\n",sizeof(rpc_table),sizeof(rpc_table_s),sizeof(rpc_table)/sizeof(rpc_table_s)); emberAfAppPrint("sizeof(rpc_table)=%d,sizeof(rpc_table_s)=%d,%d\n",sizeof(rpc_table),sizeof(rpc_table_s),sizeof(rpc_table)/sizeof(rpc_table_s));
for(int i=0;i<sizeof(rpc_table)/sizeof(rpc_table_s);i++){ for(int i=0;i<sizeof(rpc_table)/sizeof(rpc_table_s);i++){
emberAfAppPrint("i=%d,%s\r\n",i,rpc_table[i].name); emberAfAppPrint("i=%d,%s\r\n",i,rpc_table[i].name);
......
...@@ -30,7 +30,7 @@ normal = "%d %-6V (%f:%L) %m%n" ...@@ -30,7 +30,7 @@ normal = "%d %-6V (%f:%L) %m%n"
[rules] [rules]
*.INFO >stdout; normal *.DEBUG >stdout; normal
*.* "/home/kk/share/%c.log", 1MB*2; normal *.* "/home/kk/share/%c.log", 1MB*2; normal
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment