Commit a04fc3d1 authored by chen.weican's avatar chen.weican

【修改内容】增加子设备心跳检测机制

【提交人】陈伟灿
parent 0d796981
......@@ -20,6 +20,8 @@
#define KK_FILTER_EVENT_POST_REPLY "/thing/event/property/post_reply"
#define KK_FILTER_STATUS_ONLINE "/thing/status/online"
#define KK_FILTER_STATUS_ONLINE_REPLY "/thing/status/online_reply"
#define KK_FILTER_STATUS_OFFLINE "/thing/status/offline"
#define KK_FILTER_STATUS_OFFLINE_REPLY "/thing/status/offline_reply"
#define KK_CLOUDSTATE_MSG "/thing/ccu/cloudstate"
......@@ -92,7 +94,7 @@ static int _check_invalid_topic(const char* topic)
strstr(topic,KK_FILTER_LOGIN_TOPIC_REPLY) == NULL){
return 1;
}
else if(strstr(topic, KK_FILTER_SET_TOPIC_REPLY) != NULL){
else if(strstr(topic, KK_FILTER_SET_TOPIC_REPLY) != NULL){
return 1;
}
else if(strstr(topic, KK_FILTER_EVENT_POST_TOPIC) != NULL && \
......@@ -103,6 +105,10 @@ static int _check_invalid_topic(const char* topic)
strstr(topic,KK_FILTER_STATUS_ONLINE_REPLY) == NULL){
return 1;
}
else if(strstr(topic, KK_FILTER_STATUS_OFFLINE) != NULL && \
strstr(topic,KK_FILTER_STATUS_OFFLINE_REPLY) == NULL){
return 1;
}
return 0;
}
......
#include <string.h>
#include <stdlib.h>
#include "klist.h"
#include "kk_log.h"
#include "kk_tsl_common.h"
#include "kk_dm_mng.h"
typedef struct {
void *mutex;
void *hearbeat_thread;
int hearbeat_thread_running;
//struct list_head dev_list;
} kk_heartbeat_ctx_t;
#if 0
typedef struct {
unsigned int timerstamp;
char deviceCode[DEVICE_CODE_MAXLEN];
struct list_head linked_list;
} kk_dev_heartbeat_node_t;
#endif
static kk_heartbeat_ctx_t s_kk_heartbeat_ctx = {0};
static kk_heartbeat_ctx_t *_kk_heartbeat_get_ctx(void)
{
return &s_kk_heartbeat_ctx;
}
static void _kk_heartbeat_lock(void)
{
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
if (ctx->mutex) {
HAL_MutexLock(ctx->mutex);
}
}
static void _kk_heartbeat_unlock(void)
{
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
if (ctx->mutex) {
HAL_MutexUnlock(ctx->mutex);
}
}
#if 0
static int _kk_add_heartbeat_node(const char* deviceCode,unsigned int timestamp)
{
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
kk_dev_heartbeat_node_t *node = NULL;
node = malloc(sizeof(kk_dev_heartbeat_node_t));
if (node == NULL) {
return MEMORY_NOT_ENOUGH;
}
memset(node,0x0,sizeof(kk_dev_heartbeat_node_t));
memcpy(node->deviceCode,deviceCode,strlen(deviceCode));
node->timerstamp = timestamp;
INIT_LIST_HEAD(&node->linked_list);
list_add_tail(&node->linked_list, &ctx->dev_list);
return SUCCESS_RETURN
}
int kk_heartbeat_update_time(const char deviceCode[DEVICE_CODE_MAXLEN],unsigned int timestamp)
{
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
kk_dev_heartbeat_node_t *search_node = NULL;
_kk_heartbeat_lock();
list_for_each_entry(search_node, &ctx->dev_list, linked_list, kk_dev_heartbeat_node_t) {
if ( (strlen(search_node->deviceCode) == strlen(deviceCode)) &&
(memcmp(search_node->deviceCode, deviceCode, strlen(deviceCode)) == 0)) {
search_node->timerstamp = timestamp;
_kk_heartbeat_unlock();
return SUCCESS_RETURN;
}
}
_kk_add_heartbeat_node(deviceCode,timestamp);
_kk_heartbeat_unlock();
return FAIL_RETURN;
}
#endif
void *kk_heartbeat_yield(void *args)
{
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
uint64_t current_time = 0;
dm_mgr_dev_node_t *search_node = NULL;
while (ctx->hearbeat_thread_running) {
current_time = HAL_UptimeMs();
_kk_heartbeat_lock();
if(dm_mgr_check_heartbeat_timeout(current_time) == DEVICE_HEARTBEAT_TIMEOUT){
INFO_PRINT("dev timeout,send offline\n");
}
_kk_heartbeat_unlock();
sleep(30);
}
return NULL;
}
int kk_heartbeat_init(void)
{
int res = 0;
kk_heartbeat_ctx_t *ctx = _kk_heartbeat_get_ctx();
/* Create Mutex */
ctx->mutex = HAL_MutexCreate();
if (ctx->mutex == NULL) {
return FAIL_RETURN;
}
/* Init Device Id*/
ctx->hearbeat_thread_running = 1;
res = pthread_create(&ctx->hearbeat_thread, NULL, kk_heartbeat_yield, NULL);
if (res < 0) {
ERROR_PRINT("HAL_ThreadCreate ota Failed\n");
//IOT_Linkkit_Close(mid_ctx->master_devid);
return FAIL_RETURN;
}
return SUCCESS_RETURN;
/* Init Device List */
// INIT_LIST_HEAD(&ctx->dev_list);
}
......@@ -11,6 +11,7 @@
//const char DM_URI_SYS_PREFIX[] DM_READ_ONLY = "/sys/%s/%s/";
const char DM_URI_EXT_SESSION_PREFIX[] DM_READ_ONLY = "/ext/session/%s/%s/";
const char DM_URI_EXT_NTP_PREFIX[] DM_READ_ONLY = "/ext/ntp/%s/%s/";
......@@ -191,7 +192,7 @@ int dm_mgr_device_create(_IN_ int dev_type,_IN_ char productCode[PRODUCT_CODE_MA
if (dev_type != KK_DM_DEVICE_CCU && fatherDeviceCode != NULL) {
memcpy(node->fatherDeviceCode, fatherDeviceCode, strlen(fatherDeviceCode));
}
node->timestamp = HAL_UptimeMs();
//node->dev_status = IOTX_DM_DEV_STATUS_AUTHORIZED;
tsl_str = kk_load_json(productCode, dev_type);
if(tsl_str != NULL)
......@@ -278,6 +279,50 @@ int dm_mgr_get_device_by_devicecode(_IN_ char deviceCode[DEVICE_CODE_MAXLEN], _O
ERROR_PRINT("Device Not Found, deviceCode: %s\n", deviceCode);
return FAIL_RETURN;
}
int dm_mgr_update_timestamp_by_devicecode(_IN_ char deviceCode[DEVICE_CODE_MAXLEN],uint64_t timestamp)
{
dm_mgr_ctx *ctx = _dm_mgr_get_ctx();
dm_mgr_dev_node_t *search_node = NULL;
_dm_mgr_mutex_lock();
list_for_each_entry(search_node, &ctx->dev_list, linked_list, dm_mgr_dev_node_t) {
if ((strlen(search_node->deviceCode) == strlen(deviceCode)) &&
(memcmp(search_node->deviceCode, deviceCode, strlen(deviceCode)) == 0)) {
/* dm_log_debug("Device Found, Product Key: %s, Device Name: %s", product_key, device_name); */
search_node->timestamp = timestamp;
_dm_mgr_mutex_unlock();
return SUCCESS_RETURN;
}
}
_dm_mgr_mutex_unlock();
ERROR_PRINT("Device Not Found, deviceCode: %s\n", deviceCode);
return FAIL_RETURN;
}
#define TEST_TIMEOUT 60000 // one minute
int dm_mgr_check_heartbeat_timeout(uint64_t timestamp)
{
dm_mgr_ctx *ctx = _dm_mgr_get_ctx();
dm_mgr_dev_node_t *search_node = NULL;
_dm_mgr_mutex_lock();
list_for_each_entry(search_node, &ctx->dev_list, linked_list, dm_mgr_dev_node_t) {
printf("search_node->devid--------->%d\n",search_node->devid);
printf("search_node->dev_type--------->%d\n",search_node->dev_type);
if(search_node->dev_type == KK_DM_DEVICE_CCU){
continue;
}
if((timestamp - search_node->timestamp) >= TEST_TIMEOUT/*search_node->hb_timeout*/){
INFO_PRINT("---------->dev timeout,send offline\n");
printf("search_node->devid--------->%d\n",search_node->devid);
iotx_dm_dev_offline(search_node->devid);
//_dm_mgr_mutex_unlock();
//return DEVICE_HEARTBEAT_TIMEOUT;
}
}
_dm_mgr_mutex_unlock();
return SUCCESS_RETURN;
}
int dm_mgr_get_devId_by_devicecode(_IN_ char deviceCode[DEVICE_CODE_MAXLEN],_OU_ int *devid)
{
......@@ -931,6 +976,67 @@ int dm_mgr_upstream_status_online(_IN_ int devid)
return res;
}
const char DM_URI_STATUS_OFFLINE[] = "/thing/status/offline";
int dm_mgr_upstream_status_offline(_IN_ int devid)
{
int res = 0;
dm_mgr_dev_node_t *node = NULL;
dm_mgr_dev_node_t *gw_node = NULL;
dm_msg_request_t request;
if (devid < 0) {
return INVALID_PARAMETER;
}
memset(&request, 0, sizeof(dm_msg_request_t));
res = dm_mgr_search_dev_by_devid(devid, &node);
if (res != SUCCESS_RETURN) {
return FAIL_RETURN;
}
if(strlen(node->fatherDeviceCode) > 0){
res = dm_mgr_get_device_by_devicecode(node->fatherDeviceCode,&gw_node);
if (res != SUCCESS_RETURN) {
ERROR_PRINT("ERROR [%s][%d] res:%d\n",__FUNCTION__,__LINE__,res);
return FAIL_RETURN;
}
memcpy(request.productCode,gw_node->productCode,strlen(gw_node->productCode));
memcpy(request.deviceCode,gw_node->deviceCode,strlen(gw_node->deviceCode));
}else{
memcpy(request.productCode,node->productCode,strlen(node->productCode));
memcpy(request.deviceCode,node->deviceCode,strlen(node->deviceCode));
}
request.msgTypeStr = DM_URI_STATUS_OFFLINE;
/* Get Params And Method */
res = dm_msg_status_offline( node->productCode, node->deviceCode, &request);
if (res != SUCCESS_RETURN) {
return FAIL_RETURN;
}
/* Get Msg ID */
request.msgid = iotx_report_id();
/* Get Dev ID */
request.devid = devid;
/* Callback */
//request.callback = dm_client_combine_login_reply;
/* Send Message To Cloud */
res = dm_msg_request(&request);
if (res == SUCCESS_RETURN) {
res = request.msgid;
}
free(request.params);
return res;
}
const char DM_URI_COMBINE_LOGIN[] = "/thing/combine/login";
int dm_mgr_upstream_combine_login(_IN_ int devid)
{
......
......@@ -24,6 +24,8 @@ typedef struct {
char productCode[PRODUCT_CODE_MAXLEN];
char deviceCode[DEVICE_CODE_MAXLEN];
char fatherDeviceCode[DEVICE_CODE_MAXLEN];
int hb_timeout; //heartbeat time
uint64_t timestamp;
struct list_head linked_list;
} dm_mgr_dev_node_t;
......
......@@ -303,6 +303,49 @@ const char DM_MSG_COMBINE_STATUS_ONLINE[] DM_READ_ONLY =
return SUCCESS_RETURN;
}
const char DM_MSG_COMBINE_STATUS_OFFLINE_METHOD[] DM_READ_ONLY = "thing.status.offline";
const char DM_MSG_COMBINE_STATUS_OFFLINE[] DM_READ_ONLY =
"{\"deviceCode\":\"%s\"}";
int dm_msg_status_offline(_IN_ char productCode[PRODUCT_CODE_MAXLEN],
_IN_ char deviceCode[DEVICE_CODE_MAXLEN], _OU_ dm_msg_request_t *request)
{
char *params = NULL;
int params_len = 0;
char timestamp[DM_UTILS_UINT64_STRLEN] = {0};
char client_id[PRODUCT_TYPE_MAXLEN + DEVICE_CODE_MAXLEN + 20] = {0};
char *sign_method = DM_MSG_SIGN_METHOD_HMACSHA1;
char sign[64] = {0};
if (request == NULL ||
deviceCode == NULL || productCode == NULL ||
(strlen(deviceCode) >= DEVICE_CODE_MAXLEN) ||
(strlen(productCode) >= PRODUCT_CODE_MAXLEN) ||
(strlen(request->deviceCode) >= DEVICE_CODE_MAXLEN)) {
return INVALID_PARAMETER;
}
/* TimeStamp */
HAL_Snprintf(timestamp, DM_UTILS_UINT64_STRLEN, "%llu", (unsigned long long)HAL_UptimeMs());
/* dm_log_debug("Time Stamp: %s", timestamp); */
/* Params */
request->method = (char *)DM_MSG_COMBINE_STATUS_OFFLINE_METHOD;
params_len = strlen(DM_MSG_COMBINE_STATUS_OFFLINE) + strlen(deviceCode) + 1;
params = malloc(params_len);
if (params == NULL) {
return DM_MEMORY_NOT_ENOUGH;
}
memset(params, 0, params_len);
HAL_Snprintf(params, params_len, DM_MSG_COMBINE_STATUS_OFFLINE,deviceCode);
request->params = params;
request->params_len = strlen(request->params);
return SUCCESS_RETURN;
}
const char DM_MSG_COMBINE_LOGIN_SIGN_SOURCE[] DM_READ_ONLY = "clientId%sdeviceCode%stimestamp%s";
const char DM_MSG_COMBINE_LOGIN_METHOD[] DM_READ_ONLY = "combine.login";
......
......@@ -795,6 +795,7 @@ int main(const int argc, const char **argv)
} while (res < 0);*/
kk_init_dmproc();
kk_subDb_init();
kk_heartbeat_init();
mid_ctx->g_mid_dispatch_thread_running = 1;
res = pthread_create(&mid_ctx->g_mid_dispatch_thread, NULL, mid_dispatch_yield, NULL);
if (res < 0) {
......
......@@ -3,6 +3,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#ifndef _IN_
#define _IN_
......@@ -11,6 +12,7 @@
#ifndef _OU_
#define _OU_
#endif
//typedef unsigned long int uint64_t;
#define PRODUCT_TYPE_MAXLEN (32 + 1)
#define PRODUCT_CODE_MAXLEN (32 + 1)
......@@ -82,6 +84,7 @@ typedef enum {
} kk_tsl_data_target_e;
typedef enum {
DEVICE_HEARTBEAT_TIMEOUT = -15,
TSL_ALREADY_EXIST = -14,
TSL_SERVICE_GET_FAILED = -13,
TSL_SERVICE_SET_FAILED = -12,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment