Commit 09bfaf87 authored by 黄振令's avatar 黄振令

【修改内容】1. midwareu与platform(或网关)交换通过nanomsg的pipe(plat或gw ->...

【修改内容】1. midwareu与platform(或网关)交换通过nanomsg的pipe(plat或gw -> midware(ccu))和subpub(midware(ccu) -> plat或gw)的方式交互;2. platform或gw初始化nanomsg是需要传递唯一标识作为通道标志,建议用gw的mac地址;3. application与midware用nanomsg的pair协议
【提交人】huang.zhenling
parent d59b0f9c
......@@ -68,7 +68,7 @@ int main(int argc, char* argv[])
/*set the callback to get the device date to cloud*/
HAL_SetProduct_Type(PRODUCT_TPYE);
HAL_SetProduct_Code(PRODUCT_CODE);
kk_ipc_init(IPC_APP2MID,KK_Data_FromDev);
kk_ipc_init(IPC_APP2MID,KK_Data_FromDev,NULL,NULL);
rc = mqtt_start();
return rc;
......
//=========kk=============
#include "com_api.h"
#define APP2MID "ipc:///tmp/app2mid.ipc"
#define PLAT2MID "ipc:///tmp/plat2mid.ipc"
#define GW2CCU_PIPE "tcp://%s:5555"
#define GW2CCU_PUBSUB "tcp://%s:5557"
#define MAGIC "magic12"
#define FILTERSTR "|"
#ifndef _ZLOG_
#undef INFO_PRINT
#undef WARNING_PRINT
#undef ERROR_PRINT
#define INFO_PRINT printf
#define WARNING_PRINT printf
#define ERROR_PRINT printf
#else
#include "kk_log.h"
#endif
#define APP2MID "ipc:///tmp/app2mid.ipc"
#define PLAT2MID "ipc:///tmp/plat2mid.ipc"
#define MAGIC "magic12"
typedef struct {
......@@ -15,12 +30,15 @@ typedef struct {
typedef struct {
nanomsg_info_t ba;
nanomsg_info_t ab;
char subscrStr[20];
struct ev_io watcher;
ipc_cb* cb;
ipc_type type;
int isconnect;
}Bloop_ctrl_t;
Bloop_ctrl_t Bloop_ctrl;
Bloop_ctrl_t Mloop_ctrl;
......@@ -28,32 +46,68 @@ struct ev_loop* gloop = NULL;
pthread_t g_pTh = NULL;
static char* _parse_data_by_subscribe(char* data, int len, int *outLen, void** chlMark){
char* foundStr = NULL;
if (data == NULL|| len <= 0 ){
return data;
}
if ( (foundStr= strstr(data, FILTERSTR)) != NULL){
int ind = foundStr - data;
data[ind] = 0;
*chlMark = data;
*outLen = (len - (foundStr - data) -1);
return (foundStr + 1);
}
*outLen = len;
return data;
}
static void watcher_cb (struct ev_loop *loop ,struct ev_io *w, int revents)
{
INFO_PRINT("watcher_cb !! ");
void *user_data = ev_userdata(loop);
INFO_PRINT("watcher_cb !! \n");
//void *user_data = ev_userdata(loop);
Bloop_ctrl_t *loop_ctrl = (Bloop_ctrl_t *)w->data;
uint8_t *dat = NULL;
uint32_t bytes = nn_recv(loop_ctrl->ba.n, &dat, NN_MSG, NN_DONTWAIT);
if (bytes <= 0) {
uint8_t *validDat = NULL;
uint8_t *chlMark = NULL;
uint32_t bytes =0;
uint32_t validLen =0;
if (loop_ctrl->type == IPC_PLAT2MID){
bytes = nn_recv(loop_ctrl->ab.n, &dat, NN_MSG, NN_DONTWAIT);
}else{
bytes = nn_recv(loop_ctrl->ba.n, &dat, NN_MSG, NN_DONTWAIT);
}
if (bytes <= 0 || dat == NULL) {
ERROR_PRINT(" recived data is null or len is 0 \n");
return;
}
INFO_PRINT("watcher_cb:%s recived\r\n\r\n", (char *)dat);
//if sub, need filter sbuscribe str
if (IPC_PLAT2MID == loop_ctrl->type || IPC_MID2PLAT == loop_ctrl->type){
validDat = _parse_data_by_subscribe(dat, bytes, &validLen, &chlMark);
}else{
validDat = dat;
validLen = bytes;
}
//for test ipc connect or not
if (loop_ctrl->isconnect == 0 ){
loop_ctrl->isconnect =1;
if (strncmp(dat,MAGIC, strlen(MAGIC)) == 0){
kk_ipc_send(loop_ctrl->type, dat, bytes);
if (strncmp(validDat,MAGIC, strlen(MAGIC)) == 0){
kk_ipc_send(loop_ctrl->type, validDat, bytes);
nn_freemsg(dat);
return;
}
}
if (loop_ctrl->cb != NULL){
loop_ctrl->cb((void *)dat, bytes);
loop_ctrl->cb((void *)validDat, validLen, chlMark);
}
nn_freemsg(dat);
}
......@@ -63,40 +117,119 @@ void __loop_init(Bloop_ctrl_t *loop_ctrl, struct ev_loop* loop)
{
loop_ctrl->watcher.data = loop_ctrl;
ev_io_init (&(loop_ctrl->watcher), watcher_cb, loop_ctrl->ba.s, EV_READ);
if (loop_ctrl->type == IPC_PLAT2MID){
ev_io_init (&(loop_ctrl->watcher), watcher_cb, loop_ctrl->ab.s, EV_READ);
}else{
ev_io_init (&(loop_ctrl->watcher), watcher_cb, loop_ctrl->ba.s, EV_READ);
}
ev_io_start (loop, &(loop_ctrl->watcher));
}
int __nanomsg_init(Bloop_ctrl_t *loop_ctrl, ipc_type type)
int __nanomsg_init(Bloop_ctrl_t *loop_ctrl, ipc_type type,char* chlMark, char* ip)
{
loop_ctrl->ba.n = nn_socket(AF_SP, NN_PAIR);
INFO_PRINT("__nanomsg_init loop_ctrl->ba.n=%d \r\n",loop_ctrl->ba.n);
if (loop_ctrl->ba.n < 0) {
return -1;
}
loop_ctrl->ba.n = -1;
loop_ctrl->ba.s = -1;
loop_ctrl->ab.n = -1;
loop_ctrl->ab.s = -1;
memset(loop_ctrl->subscrStr, 0, sizeof(loop_ctrl->subscrStr));
if (chlMark != NULL){
if (strlen(chlMark) > sizeof(loop_ctrl->subscrStr) +1){
ERROR_PRINT("__nanomsg_init chlMark is too long, need less than %d \r\n",
sizeof(loop_ctrl->subscrStr));
return -1;
}
memcpy(loop_ctrl->subscrStr, chlMark, strlen(chlMark));
}
switch (type) {
case IPC_APP2MID:{
loop_ctrl->ba.n = nn_socket(AF_SP, NN_PAIR);
if (loop_ctrl->ba.n < 0) {
ERROR_PRINT("__nanomsg_init loop_ctrl->ba.n=%d \r\n",loop_ctrl->ba.n);
return -1;
}
if (nn_connect(loop_ctrl->ba.n, APP2MID) < 0) {
return -1;
}
}
break;
case IPC_PLAT2MID: {
if (nn_connect(loop_ctrl->ba.n, PLAT2MID) < 0) {
case IPC_MID2APP: {
loop_ctrl->ba.n = nn_socket(AF_SP, NN_PAIR);
if (loop_ctrl->ba.n < 0) {
ERROR_PRINT("__nanomsg_init loop_ctrl->ba.n=%d \r\n",loop_ctrl->ba.n);
return -1;
}
if (nn_bind(loop_ctrl->ba.n, APP2MID) < 0) {
return -1;
}
}
break;
case IPC_MID2APP: {
if (nn_bind(loop_ctrl->ba.n, APP2MID) < 0) {
case IPC_PLAT2MID: {
//创建2个通道 pipe和pub/sub
char addr[30] = {0};
loop_ctrl->ba.n = nn_socket(AF_SP, NN_PUSH);
if (loop_ctrl->ba.n < 0) {
ERROR_PRINT("loop_ctrl->ba.n =%d \r\n",loop_ctrl->ba.n);
return -1;
}
loop_ctrl->ab.n = nn_socket(AF_SP, NN_SUB);
if (loop_ctrl->ab.n < 0) {
ERROR_PRINT("loop_ctrl->ab.n =%d \r\n",loop_ctrl->ab.n);
return -1;
}
sprintf(addr, GW2CCU_PIPE, ip);
if (nn_connect(loop_ctrl->ba.n, addr) < 0) {//pipe
return -1;
}
//订阅
memset(addr,0, sizeof(addr));
memcpy(addr, loop_ctrl->subscrStr, strlen(loop_ctrl->subscrStr));
memcpy(addr + strlen(loop_ctrl->subscrStr), FILTERSTR, strlen(FILTERSTR));
if (nn_setsockopt(loop_ctrl->ab.n, NN_SUB, NN_SUB_SUBSCRIBE, addr, strlen(addr)) < 0) {
ERROR_PRINT("nn_setsockopt failed ");
return -1;
}
memset(addr,0, sizeof(addr));
sprintf(addr, GW2CCU_PUBSUB, ip);
if (nn_connect(loop_ctrl->ab.n, addr) < 0) {//sub
return -1;
}
}
break;
case IPC_MID2PLAT: {
if (nn_bind(loop_ctrl->ba.n, PLAT2MID) < 0) {
//创建2个通道 pipe和pub/sub
char addr[30] = {0};
loop_ctrl->ba.n = nn_socket(AF_SP, NN_PULL);
if (loop_ctrl->ba.n < 0) {
ERROR_PRINT("__nanomsg_init loop_ctrl->ba.n =%d \r\n",loop_ctrl->ba.n);
return -1;
}
loop_ctrl->ab.n = nn_socket(AF_SP, NN_PUB);
if (loop_ctrl->ab.n < 0) {
ERROR_PRINT("__nanomsg_init loop_ctrl->ab.n =%d \r\n",loop_ctrl->ab.n);
return -1;
}
sprintf(addr, GW2CCU_PIPE, ip);
if (nn_bind(loop_ctrl->ba.n, addr) < 0) {//pipe
return -1;
}
memset(addr,0, sizeof(addr));
sprintf(addr, GW2CCU_PUBSUB, ip);
if (nn_bind(loop_ctrl->ab.n, addr) < 0) {//pub
return -1;
}
}
......@@ -107,9 +240,18 @@ int __nanomsg_init(Bloop_ctrl_t *loop_ctrl, ipc_type type)
}
size_t size = sizeof(size_t);
if (nn_getsockopt(loop_ctrl->ba.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&loop_ctrl->ba.s, &size) < 0) {
return -1;
if (IPC_PLAT2MID == type){
if (nn_getsockopt(loop_ctrl->ab.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&loop_ctrl->ab.s, &size) < 0) {
ERROR_PRINT("nn_getsockopt IPC_PLAT2MID loop_ctrl->ab.s = %d \n", loop_ctrl->ab.s);
return -1;
}
}else{
if (nn_getsockopt(loop_ctrl->ba.n, NN_SOL_SOCKET, NN_RCVFD, (char *)&loop_ctrl->ba.s, &size) < 0) {
ERROR_PRINT("nn_getsockopt IPC_PLAT2MID loop_ctrl->ba.s = %d \n", loop_ctrl->ba.s);
return -1;
}
}
return 0;
}
......@@ -125,7 +267,15 @@ void loop_thread(void *arg){
}
int kk_ipc_init(ipc_type type, ipc_cb cb)
/*=================================
* for gateway, the "id" and "ip" is necessary
* chlMark: is unique, and suggest use gateway mac
* ip: ccu ip
* for the ccu, id and ip are null
*
*
==================================*/
int kk_ipc_init(ipc_type type, ipc_cb cb, char* chlMark, char* ip)
{
Bloop_ctrl_t* loop_ctrl;
......@@ -135,13 +285,18 @@ int kk_ipc_init(ipc_type type, ipc_cb cb)
loop_ctrl = &Bloop_ctrl;
}
if (type == IPC_PLAT2MID && (chlMark == NULL || ip == NULL)){
ERROR_PRINT("parameter is error \r\n");
return -1;
}
if(loop_ctrl->cb != NULL){
WARNING_PRINT("middleware to platform ipc has been inited!\r\n");
return -1;
}
if (__nanomsg_init(loop_ctrl, type) < 0) {
if (__nanomsg_init(loop_ctrl, type, chlMark, ip) < 0) {
ERROR_PRINT("nanomsg init failed\r\n");
return -1;
}
......@@ -155,6 +310,8 @@ int kk_ipc_init(ipc_type type, ipc_cb cb)
}
}
loop_ctrl->type = type;
__loop_init(loop_ctrl, gloop);
if (g_pTh ==NULL && 0 != pthread_create(&g_pTh, NULL, loop_thread, NULL)) {
......@@ -183,6 +340,9 @@ int kk_ipc_dinit(ipc_type type)
if (loop_ctrl->ba.n > -1){
nn_shutdown(loop_ctrl->ba.n, 0);
}
if (loop_ctrl->ab.n > -1){
nn_shutdown(loop_ctrl->ab.n, 0);
}
ev_io_stop(gloop, &loop_ctrl->watcher);
loop_ctrl->cb = NULL;
......@@ -202,12 +362,45 @@ int kk_ipc_dinit(ipc_type type)
int kk_ipc_send(ipc_type type, void* data, int len)
{
if (data != NULL){
char *chalMark = NULL;
if (type == IPC_MID2PLAT){
ERROR_PRINT(" type=IPC_MID2PLAT, please use kk_ipc_send_ex() api");
return -1;
}else if(type == IPC_PLAT2MID){
if (strlen(Bloop_ctrl.subscrStr) > 0){
chalMark = Bloop_ctrl.subscrStr;
}
}
return kk_ipc_send_ex(type, data, len, chalMark);
}
void* buf = nn_allocmsg(len, 0);
memcpy(buf, data, len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark)
{
if (data != NULL){
int filterlen = 0;
void* buf = NULL;
if (chalMark != NULL){
filterlen = strlen(chalMark) + strlen(FILTERSTR);
}
buf = nn_allocmsg(len+filterlen, 0);
if (buf == NULL){
ERROR_PRINT("nn_allocmsg failed");
return -1;
}
if (filterlen > 0){
memcpy(buf, chalMark, strlen(chalMark));
memcpy(buf + strlen(chalMark), FILTERSTR, strlen(FILTERSTR));
}
memcpy(buf + filterlen, data, len);
if (type == IPC_MID2PLAT){
nn_send(Mloop_ctrl.ba.n, &buf, NN_MSG, NN_DONTWAIT);
nn_send(Mloop_ctrl.ab.n, &buf, NN_MSG, NN_DONTWAIT);
}else{
nn_send(Bloop_ctrl.ba.n, &buf, NN_MSG, NN_DONTWAIT);
}
......
......@@ -15,7 +15,8 @@ extern "C" {
#include "ev.h"
#include "nn.h"
#include "pair.h"
#include "pubsub.h"
#include "pipeline.h"
//=====kk======================
......@@ -62,10 +63,11 @@ typedef enum{
#define MSG_INFO_STR "info"
#define MSG_INDENTIFIER_STR "identifier"
typedef void ipc_cb(void* data, int len);
int kk_ipc_init(ipc_type type, ipc_cb cb);
typedef void ipc_cb(void* data, int len, char* chalMark);
int kk_ipc_init(ipc_type type, ipc_cb cb, char* chalMark, char* ip);
int kk_ipc_dinit();
int kk_ipc_send(ipc_type type, void* data, int len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* chalMark);
#if defined(__cplusplus)
}
......
LIBSO_TARGET := libapi_com.so
LIBA_TARGET := libapi_com.a
CFLAGS += -I$(TOP_DIR)/common/nanomsg/include
CFLAGS += -I$(TOP_DIR)/common/ev/include
......
LIBSO_TARGET := libzlog.so
LIBA_TARGET := libzlog.a
#include "kk_log.h"
zlog_category_t *g_zlogC;
int kk_zlog_init(char* module)
{
......@@ -8,7 +7,7 @@ int kk_zlog_init(char* module)
rc = zlog_init("zlog.conf");
if (rc) {
printf("init failed\n");
printf(" kk_zlog_init init failed\n");
return -1;
}
......@@ -16,7 +15,7 @@ int kk_zlog_init(char* module)
g_zlogC = zlog_get_category(module);
if (!g_zlogC) {
printf("get cat fail\n");
printf("kk_zlog_init get cat fail\n");
zlog_fini();
return -2;
}
......
......@@ -11,7 +11,8 @@
#include "klist.h"
typedef struct {
iotx_dm_event_types_t type;
//iotx_dm_event_types_t type;
char* chalMark[30];
char *data;
} dm_queue_msg_t;
......
......@@ -74,7 +74,7 @@ void mid_cb(void* data, int len){
cJSON_AddStringToObject(jsonplay, "mac", mac);
void* out = cJSON_Print(jsonplay);
printf("dm_mgr_search_mac_by_topic out: %s \r\n", out);
kk_ipc_send(IPC_MID2PLAT, out, strlen(out));
kk_ipc_send_ex(IPC_MID2PLAT, out, strlen(out), "1122334455667788");
free(out);
cJSON_Delete(jsonplay);
cJSON_Delete(json);
......@@ -84,13 +84,32 @@ void mid_cb(void* data, int len){
}
}
void mid2p_cb(void* data, int len){
void mid2p_cb(void* data, int len, char* chalMark){
if (data != NULL){
//printf("mid2plat_cb: %s RECEIVED \r\n", data);
void* buf = malloc(len);
int res = 0;
void* buf = NULL;
dm_queue_msg_t *queue_msg = NULL;
queue_msg = malloc(sizeof(dm_queue_msg_t));
if (queue_msg == NULL){
ERROR_PRINT("mid2p_cb malloc queue_msg failed ");
return;
}
buf = malloc(len);
if (buf == NULL){
ERROR_PRINT("mid2p_cb malloc buf failed ");
return;
}
memcpy(buf, data, len);
int res = dm_queue_msg_insert2((void *)buf);
queue_msg.data = buf;
memset(queue_msg.chalMark, 0, sizeof(queue_msg.chalMark));
if(chalMark != NULL){
memcpy(queue_msg.chalMark, chalMark, sizeof(chalMark));
}
res = dm_queue_msg_insert2((void *)queue_msg);
if (res != SUCCESS_RETURN) {
free(queue_msg);
free(buf);
return ;
}
......@@ -99,7 +118,7 @@ void mid2p_cb(void* data, int len){
}
}
void kk_platMsg_handle(void* data){
void kk_platMsg_handle(void* data, char* chalMark){
char *out;
int res = 0;
......@@ -149,13 +168,15 @@ void kk_platMsg_dispatch(void)
while (CONFIG_DISPATCH_QUEUE_MAXLEN == 0 || count++ < CONFIG_DISPATCH_QUEUE_MAXLEN) {
if (dm_queue_msg_next2(&data) == SUCCESS_RETURN) {
//dm_queue_msg_t *msg = (dm_queue_msg_t *)data;
dm_queue_msg_t *msg = (dm_queue_msg_t *)data;
INFO_PRINT("kk_handle_platMsg_dispatch get call \n");
if (kk_platMsg_handle) {
kk_platMsg_handle( data);
kk_platMsg_handle(msg->data,msg->chalMark);
}
if (msg->data != NULL){
free(msg->data);
}
free(data);
data = NULL;
} else {
......@@ -239,8 +260,8 @@ int main(const int argc, const char **argv)
kk_set_product_info();
kk_tsl_api_init();
kk_ipc_init(IPC_MID2APP, mid_cb);
kk_ipc_init(IPC_MID2PLAT, mid2p_cb);
kk_ipc_init(IPC_MID2APP, mid_cb, NULL, NULL);
kk_ipc_init(IPC_MID2PLAT, mid2p_cb, NULL, "*");
......
......@@ -15,7 +15,8 @@ extern "C" {
#include "ev.h"
#include "nn.h"
#include "pair.h"
#include "pubsub.h"
#include "pipeline.h"
//=====kk======================
......@@ -26,15 +27,48 @@ typedef enum {
IPC_PLAT2MID,
IPC_UNDEF
} ipc_type;
typedef enum{
/******MIDDWARE TO APP**************/
MSG_REGISTER = 0,
MSG_UNREGISTER,
MSG_TOPOADD,
MSG_TOPODELETE,
MSG_TOPOGET,
MSG_LISTFOUND,
MSG_LOGIN,
MSG_LOGOUT,
MSG_PROPERTYPOST,
MSG_EVENTPOST,
MSG_SERVICERESPONSE,
MSG_SETREPLY,
MSG_OTA_PROCESS,
MSG_OTA_INFORM,
/*******APP TO MIDDWARE**************/
MSG_REGISTER_REPLY,
MSG_TOPOADD_REPLY,
MSG_OFFLINE_REPLY,
MSG_LOGIN_REPLY,
MSG_PROPERTYSET,
MSG_OTA_UPGRADE,
MSG_INVALID,
}kk_msg_type_t;
#define MSG_TYPE_STR "msgtype"
#define MSG_PRODUCT_TYPE_STR "product_type"
#define MSG_DEVICE_NAME_STR "device_name"
#define MSG_PAYLOAD_STR "payload"
#define MSG_INFO_STR "info"
#define MSG_INDENTIFIER_STR "identifier"
typedef void ipc_cb(void* data, int len);
int kk_ipc_init(ipc_type type, ipc_cb cb);
typedef void ipc_cb(void* data, int len, char* chlMark);
int kk_ipc_init(ipc_type type, ipc_cb cb, char* mac, char* ip);
int kk_ipc_dinit();
int kk_ipc_send(ipc_type type, void* data, int len);
int kk_ipc_send_ex(ipc_type type, void* data, int len, char* filter);
#if defined(__cplusplus)
}
#endif
#endif
......@@ -21,6 +21,8 @@
#include "com_api.h"
#include "kk_test.h"
//#include "kk_log.h"
static struct jrpc_server my_server;
......@@ -219,6 +221,7 @@ void _cb(void* data){
int _init_param(struct jrpc_server *server) {
memset(server, 0, sizeof(struct jrpc_server));
//kk_zlog_init("paltform");
printf("getenv\r\n");
char * debug_level_env = getenv("HOME");
......@@ -237,7 +240,7 @@ void ipcHandle(void)
{
emberAfAppPrint( "Thread rpc Interface Parse create\n" );
_init_param(&my_server);
kk_ipc_init(IPC_PLAT2MID, _cb);
kk_ipc_init(IPC_PLAT2MID, _cb, "1122334455667788","127.0.0.1");
emberAfAppPrint("sizeof(rpc_table)=%d,sizeof(rpc_table_s)=%d,%d\n",sizeof(rpc_table),sizeof(rpc_table_s),sizeof(rpc_table)/sizeof(rpc_table_s));
for(int i=0;i<sizeof(rpc_table)/sizeof(rpc_table_s);i++){
emberAfAppPrint("i=%d,%s\r\n",i,rpc_table[i].name);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment