Commit f9a164bf authored by 黄振令's avatar 黄振令
parents 1da343da 7c5d9838
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
static const char* OPT_SEND = "MQTTAsync_sendMessage";
static const char* OPT_SUB = "MQTTAsync_subscribe";
static const char* OPT_UNSUB = "MQTTAsync_unsubscribe";
static void onOptSuccess(void* context, MQTTAsync_successData* response)
{
if(strcmp((char *)context,OPT_SEND)==0)
{
printf("MQTTAsync_sendMessage success,return token:%d,msg length:%d \n",
response->token,response->alt.pub.message.payloadlen);
}
else if(strcmp((char *)context,OPT_SUB)==0)
{
printf("MQTTAsync_subscribe success,return token:%d \n",response->token);
}
else if(strcmp((char *)context,OPT_UNSUB)==0)
{
printf("MQTTAsync_unsubscribe success,return token:%d \n",response->token);
}
}
static void onOptFail(void* context, MQTTAsync_failureData* response)
{
if(strcmp((char *)context,OPT_SEND)==0)
{
printf("MQTTAsync_sendMessage fail,token:%d,code:%d,msg:%s \n",
response->token,response->code,response->message);
}
else if(strcmp((char *)context,OPT_SUB)==0)
{
printf("MQTTAsync_subscribe fail,return token:%d \n",response->token);
}
else if(strcmp((char *)context,OPT_UNSUB)==0)
{
printf("MQTTAsync_unsubscribe fail,return token:%d \n",response->token);
}
}
int KK_MQTT_SubTopic(MQTTAsync handle,char *topicName,int qos,int waitTimeout)
{
printf("to subtopic:%s \n",topicName);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
opts.onSuccess = onOptSuccess;
opts.onFailure = onOptFail;
opts.context = (void*)OPT_SUB;
if ((rc = MQTTAsync_subscribe(handle,topicName, qos, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code:%d.\n", rc);
return -1;
}
return 0;
}
int KK_MQTT_SendMsg(MQTTAsync handle,char *topicName,const char *payload,int qos)
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
opts.onSuccess = onOptSuccess;
opts.onFailure = onOptFail;
opts.context = (void*)OPT_SEND;
pubmsg.payload = (void*)payload;
pubmsg.payloadlen = strlen(payload);
pubmsg.qos = qos;
pubmsg.retained = 0;
printf("mqtt send payload len:%d,qos:%d.\n",pubmsg.payloadlen,qos);
if ((rc = MQTTAsync_sendMessage(handle, topicName, &pubmsg, &opts)) != MQTTASYNC_SUCCESS){
printf("Failed to start sendMessage, return code:%d.\n", rc);
return -1;
}
return rc;
}
int KK_MQTT_RecvMsg(MQTTAsync handle,const char *topicName,const char *payload)
{
if(topicName == NULL)
{
return -1;
}
return 0;
}
int KK_MQTT_UnsubTopic(MQTTAsync handle,const char *topicName)
{
printf("to unsubtopic:%s \n",topicName);
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
opts.onSuccess = onOptSuccess;
opts.onFailure = onOptFail;
opts.context = (void*)OPT_UNSUB;
if ((rc = MQTTAsync_unsubscribe(handle,topicName,&opts)) != MQTTASYNC_SUCCESS){
printf("Failed to start unubscribe, return code:%d.\n", rc);
return -1;
}
return rc;
}
#ifndef MQTT_CONF_H_
#define MQTT_CONF_H_
#define ADDRESS "tcp://106.13.117.117:1883"
#define CLIENTID "1234"
#define TOPIC "cwctest"
#define PAYLOAD "Hello cwc World!"
#define QOS 2
#define TIMEOUT 10000L
#define USRNAME ""
#define PASSWORD ""
#define AUTO_CONN 1
#define CONNECT_TIMEOUT 3
#endif
......@@ -18,24 +18,9 @@
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#include "mqtt_config.h"
#if !defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif
#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif
#define ADDRESS "tcp://mqtt.eclipse.org:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
static MQTTAsync s_Client;
int finished = 0;
void connlost(void *context, char *cause)
......@@ -82,7 +67,6 @@ void onSendFailure(void* context, MQTTAsync_failureData* response)
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
......@@ -93,14 +77,6 @@ void onSend(void* context, MQTTAsync_successData* response)
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = onDisconnect;
opts.onFailure = onDisconnectFailure;
opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
......@@ -129,18 +105,21 @@ void onConnect(void* context, MQTTAsync_successData* response)
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
/* not expecting any messages */
printf("onMessageArrived topic:%s,message length:%d.\n",topicName,message->payloadlen);
printf("payload:%s,\n",message->payload);
KK_MQTT_RecvMsg((MQTTAsync)context,topicName,message->payload);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
static void mqttTraceCallback(enum MQTTASYNC_TRACE_LEVELS level, char *message)
{
//printf("mqttTraceCallback level:%d,msg:%s.\n",level,message);
......@@ -151,63 +130,84 @@ static void onDeliveryComplete(void* context, MQTTAsync_token token)
}
static void onConnectBuild(void *context, char *cause)
{
printf("onConnectBuild,disconnect cause:%s \n",cause);
//MQTTAsync_subscribe
printf("onConnectBuild:%s \n",cause);
KK_MQTT_SubTopic(s_Client,TOPIC,0,2000);
}
static void onDisConnected(void *context, MQTTProperties* properties,enum MQTTReasonCodes reasonCode)
{
printf("onDisConnected,maybe kicked by broker.\n");
}
int main(int argc, char* argv[])
static void mqtt_set_callbacks(void)
{
MQTTAsync_setConnectionLostCallback(s_Client,NULL,connlost);
MQTTAsync_setMessageArrivedCallback(s_Client,NULL,messageArrived);
MQTTAsync_setDeliveryCompleteCallback(s_Client,NULL,onDeliveryComplete);
MQTTAsync_setConnected(s_Client,NULL,onConnectBuild);
MQTTAsync_setDisconnected(s_Client,NULL,onDisConnected);
}
static int mqtt_start(void)
{
MQTTAsync client;
int count = 0;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
int rc = 0;
MQTTAsync_createOptions opts = MQTTAsync_createOptions_initializer;
opts.MQTTVersion = MQTTVERSION_3_1_1;
MQTTAsync_setTraceCallback(mqttTraceCallback);
if ((rc = MQTTAsync_createWithOptions(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL,&opts)) != MQTTASYNC_SUCCESS)
if ((rc = MQTTAsync_createWithOptions(&s_Client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL,&opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to create client object, return code %d\n", rc);
return -1;
}
if ((rc = MQTTAsync_setCallbacks(client, NULL, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
{
printf("Failed to set callback, return code %d\n", rc);
exit(EXIT_FAILURE);
}
MQTTAsync_setConnectionLostCallback(client,NULL,connlost);
MQTTAsync_setMessageArrivedCallback(client,NULL,messageArrived);
MQTTAsync_setDeliveryCompleteCallback(client,NULL,onDeliveryComplete);
MQTTAsync_setConnected(client,NULL,onConnectBuild);
MQTTAsync_setDisconnected(client,NULL,onDisConnected);
mqtt_set_callbacks();
opts.MQTTVersion = MQTTVERSION_3_1_1;
conn_opts.keepAliveInterval = 60;
conn_opts.connectTimeout = CONNECT_TIMEOUT;
conn_opts.automaticReconnect = AUTO_CONN;
conn_opts.minRetryInterval = 1;
conn_opts.maxRetryInterval = 32;
conn_opts.username = USRNAME;
conn_opts.password = PASSWORD;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
conn_opts.context = s_Client;
if ((rc = MQTTAsync_connect(s_Client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
return -1;
}
printf("Waiting for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
PAYLOAD, TOPIC, CLIENTID);
while (!finished)
#if defined(_WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
{
usleep(100000L);
count++;
if(count>50)
{
count = 0;
printf("i am alive now\n");
KK_MQTT_SendMsg(s_Client,TOPIC,"hello my world",2);
}
MQTTAsync_destroy(&client);
}
printf("MQTTAsync_destroy\n");
MQTTAsync_destroy(&s_Client);
return rc;
}
int main(int argc, char* argv[])
{
int rc = 0;
rc = mqtt_start();
return rc;
}
/*******************************************************************************
* Copyright (c) 2012, 2018 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
* Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/
#if !defined(PUBSUB_OPTS_H)
#define PUBSUB_OPTS_H
#include "MQTTAsync.h"
#include "MQTTClientPersistence.h"
struct pubsub_opts
{
/* debug app options */
int publisher; /* publisher app? */
int quiet;
int verbose;
int tracelevel;
char* delimiter;
int maxdatalen;
/* message options */
char* message;
char* filename;
int stdin_lines;
int stdlin_complete;
int null_message;
/* MQTT options */
int MQTTVersion;
char* topic;
char* clientid;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
char* connection;
int keepalive;
/* will options */
char* will_topic;
char* will_payload;
int will_qos;
int will_retain;
/* TLS options */
int insecure;
char* capath;
char* cert;
char* cafile;
char* key;
char* keypass;
char* ciphers;
char* psk_identity;
char* psk;
/* MQTT V5 options */
int message_expiry;
struct {
char *name;
char *value;
} user_property;
};
typedef struct
{
const char* name;
const char* value;
} pubsub_opts_nameValue;
//void usage(struct pubsub_opts* opts, const char* version, const char* program_name);
void usage(struct pubsub_opts* opts, pubsub_opts_nameValue* name_values, const char* program_name);
int getopts(int argc, char** argv, struct pubsub_opts* opts);
char* readfile(int* data_len, struct pubsub_opts* opts);
void logProperties(MQTTProperties *props);
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment