You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2130 lines
56 KiB
2130 lines
56 KiB
/*
|
|
* libwebsockets - small server side websockets and web server implementation
|
|
*
|
|
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to
|
|
* deal in the Software without restriction, including without limitation the
|
|
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
* sell copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
* IN THE SOFTWARE.
|
|
*
|
|
* MQTT v5
|
|
*
|
|
* http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
|
|
*
|
|
* Control Packet structure
|
|
*
|
|
* - Always: 2+ byte: Fixed Hdr
|
|
* - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
|
|
* - Required in some: variable: Payload
|
|
*
|
|
* For CONNECT, the payload fields, if present, MUST appear in this order [MQTT-3.1.3-1]:
|
|
*
|
|
* - Client Identifier
|
|
* - Will Properties
|
|
* - Will Topic
|
|
* - Will Payload
|
|
* - User Name
|
|
* - Password
|
|
*/
|
|
|
|
#include "private-lib-core.h"
|
|
/* #include "lws-mqtt.h" */
|
|
|
|
#include <string.h>
|
|
#include <sys/types.h>
|
|
#include <fcntl.h>
|
|
#include <assert.h>
|
|
|
|
/*
 * Connection state enumeration named for the MQTT protocol server side.
 * A single state is declared; going by its name it presumably represents a
 * connection on which the peer's CONNECT has not arrived yet.
 */
typedef enum {
	LMQPRS_AWAITING_CONNECT,
} lws_mqtt_protocol_server_connstate_t;
|
|
|
|
/*
 * Printable names for the first group of MQTT reason codes.
 *
 * The strings are listed in the order of the MQTT v5 reason codes below
 * 0x80 (0x00, 0x01, 0x02, 0x04, 0x10, 0x11, 0x18, 0x19), packed into
 * consecutive slots.  NOTE(review): the code that maps a reason code to a
 * slot is not in the reviewed part of this file - confirm against it.
 */
const char * const reason_names_g1[] = {
	[0] = "Success / Normal disconnection / QoS0",
	[1] = "QoS1",
	[2] = "QoS2",
	[3] = "Disconnect Will",
	[4] = "No matching subscriber",
	[5] = "No subscription existed",
	[6] = "Continue authentication",
	[7] = "Re-authenticate"
};
|
|
|
|
/*
 * Printable names for the second group of MQTT reason codes, the error
 * codes 0x80 .. 0xA2 of MQTT v5.  Slot i holds the name for code 0x80 + i.
 *
 * The table previously had no entry for 0x90 (Topic Name Invalid), which
 * put every following name one slot early and left 0xA2 without a slot.
 *
 * NOTE(review): the lookup code is not in the reviewed part of this file;
 * confirm that it indexes this table by (reason - 0x80).
 */
const char * const reason_names_g2[] = {
	"Unspecified error",			/* 0x80 */
	"Malformed packet",			/* 0x81 */
	"Protocol error",			/* 0x82 */
	"Implementation specific error",	/* 0x83 */
	"Unsupported protocol",			/* 0x84 */
	"Client ID invalid",			/* 0x85 */
	"Bad credentials",			/* 0x86 */
	"Not Authorized",			/* 0x87 */
	"Server Unavailable",			/* 0x88 */
	"Server Busy",				/* 0x89 */
	"Banned",				/* 0x8a */
	"Server Shutting Down",			/* 0x8b */
	"Bad Authentication Method",		/* 0x8c */
	"Keepalive Timeout",			/* 0x8d */
	"Session taken over",			/* 0x8e */
	"Topic Filter Invalid",			/* 0x8f */
	"Topic Name Invalid",			/* 0x90 */
	"Packet ID in use",			/* 0x91 */
	"Packet ID not found",			/* 0x92 */
	"Max RX Exceeded",			/* 0x93 */
	"Topic Alias Invalid",			/* 0x94 */
	"Packet too large",			/* 0x95 */
	"Ratelimit",				/* 0x96 */
	"Quota Exceeded",			/* 0x97 */
	"Administrative Action",		/* 0x98 */
	"Payload format invalid",		/* 0x99 */
	"Retain not supported",			/* 0x9a */
	"QoS not supported",			/* 0x9b */
	"Use another server",			/* 0x9c */
	"Server Moved",				/* 0x9d */
	"Shared subscriptions not supported",	/* 0x9e */
	"Connection rate exceeded",		/* 0x9f */
	"Maximum Connect Time",			/* 0xa0 */
	"Subscription IDs not supported",	/* 0xa1 */
	"Wildcard subscriptions not supported"	/* 0xa2 */
};
|
|
|
|
#define LMQCP_WILL_PROPERTIES 0
|
|
|
|
/* For each property, a bitmap describing which commands it is valid for */
|
|
|
|
static const uint16_t property_valid[] = {
|
|
[LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_CTOS_SUBSCRIBE),
|
|
[LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_DISCONNECT),
|
|
[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_AUTH),
|
|
[LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_AUTH),
|
|
[LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
|
|
[LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
|
|
[LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
|
|
[LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_DISCONNECT),
|
|
[LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_PUBACK) |
|
|
(1 << LMQCP_PUBREC) |
|
|
(1 << LMQCP_PUBREL) |
|
|
(1 << LMQCP_PUBCOMP) |
|
|
(1 << LMQCP_STOC_SUBACK) |
|
|
(1 << LMQCP_STOC_UNSUBACK) |
|
|
(1 << LMQCP_DISCONNECT) |
|
|
(1 << LMQCP_AUTH),
|
|
[LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
|
|
[LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK) |
|
|
(1 << LMQCP_PUBLISH) |
|
|
(1 << LMQCP_WILL_PROPERTIES) |
|
|
(1 << LMQCP_PUBACK) |
|
|
(1 << LMQCP_PUBREC) |
|
|
(1 << LMQCP_PUBREL) |
|
|
(1 << LMQCP_PUBCOMP) |
|
|
(1 << LMQCP_CTOS_SUBSCRIBE) |
|
|
(1 << LMQCP_STOC_SUBACK) |
|
|
(1 << LMQCP_CTOS_UNSUBSCRIBE) |
|
|
(1 << LMQCP_STOC_UNSUBACK) |
|
|
(1 << LMQCP_DISCONNECT) |
|
|
(1 << LMQCP_AUTH),
|
|
[LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
|
|
(1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
|
|
[LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
|
|
};
|
|
|
|
|
|
/*
|
|
* For each command index, maps flags, id, qos and payload legality
|
|
* notice in most cases PUBLISH requires further processing
|
|
*/
|
|
static const uint8_t map_flags[] = {
|
|
[LMQCP_RESERVED] = 0x00,
|
|
[LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PAYLOAD |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
|
|
LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
|
|
[LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
|
|
[LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
|
|
[LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
|
|
[LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
|
|
[LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PAYLOAD |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
|
|
[LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PAYLOAD |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
|
|
[LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PAYLOAD |
|
|
LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
|
|
[LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PAYLOAD |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
[LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
|
|
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
|
|
};
|
|
|
|
void
|
|
lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
|
|
{
|
|
lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
|
|
c->estate = s;
|
|
}
|
|
|
|
static int
|
|
lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
|
|
{
|
|
par->consumed += consumed;
|
|
|
|
if (par->consumed > par->props_len)
|
|
return -1;
|
|
|
|
/* more properties coming */
|
|
|
|
if (par->consumed < par->props_len) {
|
|
par->state = LMQCPP_PROP_ID_VBI;
|
|
return 0;
|
|
}
|
|
|
|
/* properties finished: are we headed for payload or idle? */
|
|
|
|
if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
|
|
/* A PUBLISH packet MUST NOT contain a Packet Identifier if
|
|
* its QoS value is set to 0 [MQTT-2.2.1-2]. */
|
|
(ctl_pkt_type(par) != LMQCP_PUBLISH ||
|
|
(par->packet_type_flags & 6))) {
|
|
par->state = LMQCPP_PAYLOAD;
|
|
return 0;
|
|
}
|
|
|
|
par->state = LMQCPP_IDLE;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
lws_mqtt_set_client_established(struct lws *wsi)
|
|
{
|
|
lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
|
|
&role_ops_mqtt);
|
|
|
|
if (user_callback_handle_rxflow(wsi->protocol->callback,
|
|
wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
|
|
wsi->user_space, NULL, 0) < 0) {
|
|
lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
|
|
|
|
return -1;
|
|
}
|
|
/*
|
|
* If we made a new connection and got the ACK, our connection is
|
|
* definitely working in both directions at the moment
|
|
*/
|
|
lws_validity_confirmed(wsi);
|
|
|
|
/* clear connection timeout */
|
|
lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
|
|
|
|
return 0;
|
|
}
|
|
|
|
lws_mqtt_subs_t *
|
|
lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
|
|
{
|
|
lws_mqtt_subs_t *s = mqtt->subs_head;
|
|
|
|
while (s) {
|
|
if (!strcmp((const char *)s->topic, topic))
|
|
return s;
|
|
s = s->next;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static lws_mqtt_subs_t *
|
|
lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
|
|
{
|
|
lws_mqtt_subs_t *mysub;
|
|
|
|
mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
|
|
if (!mysub)
|
|
return NULL;
|
|
|
|
mysub->next = mqtt->subs_head;
|
|
mqtt->subs_head = mysub;
|
|
memcpy(mysub->topic, topic, strlen(topic) + 1);
|
|
mysub->ref_count = 1;
|
|
|
|
lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
|
|
__func__, mysub, mqtt);
|
|
|
|
return mysub;
|
|
}
|
|
|
|
static int
|
|
lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
|
|
{
|
|
lws_mqtt_subs_t *s = mqtt->subs_head;
|
|
lws_mqtt_subs_t *temp = NULL;
|
|
|
|
|
|
lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
|
|
__func__, mqtt);
|
|
|
|
while (s && s->next) {
|
|
if (s->next->ref_count == 0)
|
|
break;
|
|
s = s->next;
|
|
}
|
|
|
|
if (s && s->next) {
|
|
temp = s->next;
|
|
lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
|
|
__func__, temp, mqtt);
|
|
s->next = temp->next;
|
|
lws_free(temp);
|
|
return 0;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
int
|
|
_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
|
|
const uint8_t *buf, size_t len)
|
|
{
|
|
struct lws *w;
|
|
int n;
|
|
|
|
if (par->flag_pending_send_reason_close)
|
|
return 0;
|
|
|
|
/*
|
|
* Stateful, fragmentation-immune parser
|
|
*
|
|
* Notice that len can always be 1 if under attack, even over tls if
|
|
* the server is compromised or malicious.
|
|
*/
|
|
|
|
while (len) {
|
|
lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
|
|
switch (par->state) {
|
|
case LMQCPP_IDLE:
|
|
par->packet_type_flags = *buf++;
|
|
len--;
|
|
|
|
#if defined(LWS_WITH_CLIENT)
|
|
/*
|
|
* The case where we sent the connect, but we received
|
|
* something else before any CONNACK
|
|
*/
|
|
if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
|
|
par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
|
|
lwsl_notice("%s: server sent non-CONNACK\n",
|
|
__func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
#endif /* LWS_WITH_CLIENT */
|
|
|
|
n = map_flags[par->packet_type_flags >> 4];
|
|
/*
|
|
* Where a flag bit is marked as “Reserved”, it is
|
|
* reserved for future use and MUST be set to the value
|
|
* listed [MQTT-2.1.3-1].
|
|
*/
|
|
if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
|
|
((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
|
|
lwsl_notice("%s: wsi %p: bad flags, 0x%02x mask 0x%02x (len %d)\n",
|
|
__func__, wsi, par->packet_type_flags, n, (int)len + 1);
|
|
lwsl_hexdump_err(buf - 1, len + 1);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
|
|
lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
|
|
__func__, par->packet_type_flags >> 4,
|
|
par->packet_type_flags & 0xf);
|
|
|
|
/* allows us to know if a property that can only be
|
|
* given once, appears twice */
|
|
memset(par->props_seen, 0, sizeof(par->props_seen));
|
|
par->state = par->packet_type_flags & 0xf0;
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_PACKET:
|
|
lwsl_debug("%s: received CONNECT pkt\n", __func__);
|
|
par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_REMAINING_LEN_VBI:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
n = map_flags[ctl_pkt_type(par)];
|
|
lws_mqtt_str_init(&par->s_temp, par->temp,
|
|
sizeof(par->temp), 0);
|
|
par->state = LMQCPP_CONNECT_VH_PNAME;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_VH_PNAME:
|
|
switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
if (par->s_temp.len != 4 ||
|
|
memcmp(par->s_temp.buf, "MQTT",
|
|
par->s_temp.len)) {
|
|
lwsl_notice("%s: protocol name: %.*s\n",
|
|
__func__, par->s_temp.len,
|
|
par->s_temp.buf);
|
|
goto send_unsupp_connack_and_close;
|
|
}
|
|
par->state = LMQCPP_CONNECT_VH_PVERSION;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: bad protocol name\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_VH_PVERSION:
|
|
par->conn_protocol_version = *buf++;
|
|
len--;
|
|
if (par->conn_protocol_version != 5) {
|
|
lwsl_info("%s: unsupported MQTT version %d\n",
|
|
__func__, par->conn_protocol_version);
|
|
goto send_unsupp_connack_and_close;
|
|
}
|
|
par->state = LMQCPP_CONNECT_VH_FLAGS;
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_VH_FLAGS:
|
|
par->cpkt_flags = *buf++;
|
|
len--;
|
|
if (par->cpkt_flags & 1) {
|
|
/*
|
|
* The Server MUST validate that the reserved
|
|
* flag in the CONNECT packet is set to 0
|
|
* [MQTT-3.1.2-3].
|
|
*/
|
|
par->reason = LMQCP_REASON_MALFORMED_PACKET;
|
|
goto send_reason_and_close;
|
|
}
|
|
/*
|
|
* conn_flags specifies the Will Properties that should
|
|
* appear in the payload section
|
|
*/
|
|
lws_mqtt_2byte_init(&par->vbit);
|
|
par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
|
|
break;
|
|
|
|
case LMQCPP_CONNECT_VH_KEEPALIVE:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->keepalive = (uint16_t)par->vbit.value;
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: ka bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PINGRESP_ZERO:
|
|
len--;
|
|
/* second byte of PINGRESP must be zero */
|
|
if (*buf++)
|
|
goto send_protocol_error_and_close;
|
|
goto cmd_completion;
|
|
|
|
case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
/* reset consumption counter */
|
|
par->consumed = 0;
|
|
par->props_len = par->vbit.value;
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
par->state = LMQCPP_PROP_ID_VBI;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: connpr bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PUBLISH_PACKET:
|
|
if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
|
|
lwsl_notice("%s: Topic rx before subscribing\n",
|
|
__func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
lwsl_info("%s: received PUBLISH pkt\n", __func__);
|
|
par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
break;
|
|
case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
lwsl_debug("%s: PUBLISH pkt len = %d\n",
|
|
__func__, (int)par->cpkt_remlen);
|
|
/* Move on to PUBLISH's variable header */
|
|
par->state = LMQCPP_PUBLISH_VH_TOPIC;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: pubrem bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PUBLISH_VH_TOPIC:
|
|
{
|
|
lws_mqtt_publish_param_t *pub = NULL;
|
|
|
|
if (len < 2) {
|
|
lwsl_notice("%s: topic too short\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
/* Topic len */
|
|
par->n = lws_ser_ru16be(buf);
|
|
buf += 2;
|
|
len -= 2;
|
|
|
|
if (len < par->n) {/* the way this is written... */
|
|
lwsl_notice("%s: len breakage\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
/* Invalid topic len */
|
|
if (par->n == 0) {
|
|
lwsl_notice("%s: zero topic len\n", __func__);
|
|
par->reason = LMQCP_REASON_MALFORMED_PACKET;
|
|
goto send_reason_and_close;
|
|
}
|
|
lwsl_debug("%s: PUBLISH topic len %d\n",
|
|
__func__, (int)par->n);
|
|
assert(!wsi->mqtt->rx_cpkt_param);
|
|
wsi->mqtt->rx_cpkt_param = lws_zalloc(
|
|
sizeof(lws_mqtt_publish_param_t), "rx pub param");
|
|
if (!wsi->mqtt->rx_cpkt_param)
|
|
goto oom;
|
|
pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
|
|
|
|
pub->topic_len = par->n;
|
|
|
|
/* Topic Name */
|
|
pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
|
|
"rx publish topic");
|
|
if (!pub->topic)
|
|
goto oom;
|
|
lws_strncpy(pub->topic, (const char *)buf,
|
|
(size_t)pub->topic_len + 1);
|
|
buf += pub->topic_len;
|
|
len -= pub->topic_len;
|
|
|
|
/* Extract QoS Level from Fixed Header Flags */
|
|
pub->qos = (lws_mqtt_qos_levels_t)
|
|
((par->packet_type_flags >> 1) & 0x3);
|
|
|
|
pub->payload_pos = 0;
|
|
|
|
pub->payload_len = par->cpkt_remlen -
|
|
(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
|
|
|
|
switch (pub->qos) {
|
|
case QOS0:
|
|
par->state = LMQCPP_PAYLOAD;
|
|
if (pub->payload_len == 0)
|
|
goto cmd_completion;
|
|
|
|
break;
|
|
case QOS1:
|
|
case QOS2:
|
|
par->state = LMQCPP_PUBLISH_VH_PKT_ID;
|
|
break;
|
|
default:
|
|
par->reason = LMQCP_REASON_MALFORMED_PACKET;
|
|
lws_free_set_NULL(pub->topic);
|
|
lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
|
|
goto send_reason_and_close;
|
|
}
|
|
break;
|
|
}
|
|
case LMQCPP_PUBLISH_VH_PKT_ID:
|
|
{
|
|
lws_mqtt_publish_param_t *pub =
|
|
(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
|
|
|
|
if (len < 2) {
|
|
lwsl_notice("%s: len breakage 2\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
par->cpkt_id = lws_ser_ru16be(buf);
|
|
buf += 2;
|
|
len -= 2;
|
|
wsi->mqtt->ack_pkt_id = par->cpkt_id;
|
|
lwsl_debug("%s: Packet ID %d\n",
|
|
__func__, (int)par->cpkt_id);
|
|
par->state = LMQCPP_PAYLOAD;
|
|
pub->payload_pos = 0;
|
|
pub->payload_len = par->cpkt_remlen -
|
|
(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
|
|
if (pub->payload_len == 0)
|
|
goto cmd_completion;
|
|
|
|
break;
|
|
}
|
|
case LMQCPP_PAYLOAD:
|
|
{
|
|
lws_mqtt_publish_param_t *pub =
|
|
(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
|
|
if (pub == NULL) {
|
|
lwsl_err("%s: Uninitialized pub_param\n",
|
|
__func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
|
|
pub->payload = buf;
|
|
goto cmd_completion;
|
|
}
|
|
|
|
case LMQCPP_CONNACK_PACKET:
|
|
if (!lwsi_role_client(wsi)) {
|
|
lwsl_err("%s: CONNACK is only Server to Client",
|
|
__func__);
|
|
goto send_unsupp_connack_and_close;
|
|
}
|
|
|
|
lwsl_debug("%s: received CONNACK pkt\n", __func__);
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
lwsl_debug("%s: CONNACK pkt len = %d\n",
|
|
__func__, (int)par->cpkt_remlen);
|
|
if (par->cpkt_remlen != 2)
|
|
goto send_protocol_error_and_close;
|
|
|
|
par->state = LMQCPP_CONNACK_VH_FLAGS;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: connack bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_CONNACK_VH_FLAGS:
|
|
{
|
|
lws_mqttc_t *c = &wsi->mqtt->client;
|
|
par->cpkt_flags = *buf++;
|
|
len--;
|
|
|
|
if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
|
|
/*
|
|
* Byte 1 is the "Connect Acknowledge
|
|
* Flags". Bits 7-1 are reserved and
|
|
* MUST be set to 0.
|
|
*/
|
|
par->reason = LMQCP_REASON_MALFORMED_PACKET;
|
|
goto send_reason_and_close;
|
|
}
|
|
/*
|
|
* If the Server accepts a connection with
|
|
* CleanSession set to 1, the Server MUST set
|
|
* Session Present to 0 in the CONNACK packet
|
|
* in addition to setting a zero return code
|
|
* in the CONNACK packet [MQTT-3.2.2-1]. If
|
|
* the Server accepts a connection with
|
|
* CleanSession set to 0, the value set in
|
|
* Session Present depends on whether the
|
|
* Server already has stored Session state for
|
|
* the supplied client ID. If the Server has
|
|
* stored Session state, it MUST set
|
|
* SessionPresent to 1 in the CONNACK packet
|
|
* [MQTT-3.2.2-2]. If the Server does not have
|
|
* stored Session state, it MUST set Session
|
|
* Present to 0 in the CONNACK packet. This is
|
|
* in addition to setting a zero return code
|
|
* in the CONNACK packet [MQTT-3.2.2-3].
|
|
*/
|
|
if ((c->conn_flags & LMQCFT_CLEAN_START) &&
|
|
(par->cpkt_flags & LMQCFT_SESSION_PRESENT))
|
|
goto send_protocol_error_and_close;
|
|
|
|
wsi->mqtt->session_resumed = (par->cpkt_flags &
|
|
LMQCFT_SESSION_PRESENT);
|
|
|
|
/* Move on to Connect Return Code */
|
|
par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
|
|
break;
|
|
}
|
|
case LMQCPP_CONNACK_VH_RETURN_CODE:
|
|
par->conn_rc = *buf++;
|
|
len--;
|
|
/*
|
|
* If a server sends a CONNACK packet containing a
|
|
* non-zero return code it MUST then close the Network
|
|
* Connection [MQTT-3.2.2-5]
|
|
*/
|
|
switch (par->conn_rc) {
|
|
case 0:
|
|
goto cmd_completion;
|
|
case 1:
|
|
case 2:
|
|
case 3:
|
|
case 4:
|
|
case 5:
|
|
par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
|
|
par->conn_rc - 1;
|
|
goto send_reason_and_close;
|
|
default:
|
|
lwsl_notice("%s: bad connack retcode\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
/* SUBACK */
|
|
case LMQCPP_SUBACK_PACKET:
|
|
if (!lwsi_role_client(wsi)) {
|
|
lwsl_err("%s: SUBACK is only Server to Client",
|
|
__func__);
|
|
goto send_unsupp_connack_and_close;
|
|
}
|
|
|
|
lwsl_debug("%s: received SUBACK pkt\n", __func__);
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
lwsl_debug("%s: SUBACK pkt len = %d\n",
|
|
__func__, (int)par->cpkt_remlen);
|
|
if (par->cpkt_remlen <= 2)
|
|
goto send_protocol_error_and_close;
|
|
par->state = LMQCPP_SUBACK_VH_PKT_ID;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: suback bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_SUBACK_VH_PKT_ID:
|
|
|
|
if (len < 2) {
|
|
lwsl_notice("%s: len breakage 4\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
par->cpkt_id = lws_ser_ru16be(buf);
|
|
wsi->mqtt->ack_pkt_id = par->cpkt_id;
|
|
buf += 2;
|
|
len -= 2;
|
|
par->cpkt_remlen -= 2;
|
|
par->n = 0;
|
|
par->state = LMQCPP_SUBACK_PAYLOAD;
|
|
*par->temp = 0;
|
|
break;
|
|
|
|
case LMQCPP_SUBACK_PAYLOAD:
|
|
{
|
|
lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
|
|
|
|
len--;
|
|
switch (qos) {
|
|
case QOS0:
|
|
case QOS1:
|
|
case QOS2:
|
|
break;
|
|
case FAILURE_QOS_LEVEL:
|
|
goto send_protocol_error_and_close;
|
|
|
|
default:
|
|
par->reason = LMQCP_REASON_MALFORMED_PACKET;
|
|
goto send_reason_and_close;
|
|
}
|
|
|
|
if (++(par->n) == par->cpkt_remlen) {
|
|
par->n = 0;
|
|
goto cmd_completion;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
/* UNSUBACK */
|
|
case LMQCPP_UNSUBACK_PACKET:
|
|
if (!lwsi_role_client(wsi)) {
|
|
lwsl_err("%s: UNSUBACK is only Server to Client",
|
|
__func__);
|
|
goto send_unsupp_connack_and_close;
|
|
}
|
|
|
|
lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
lwsl_debug("%s: UNSUBACK pkt len = %d\n",
|
|
__func__, (int)par->cpkt_remlen);
|
|
if (par->cpkt_remlen < 2)
|
|
goto send_protocol_error_and_close;
|
|
par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: unsuback bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_UNSUBACK_VH_PKT_ID:
|
|
|
|
if (len < 2) {
|
|
lwsl_notice("%s: len breakage 3\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
par->cpkt_id = lws_ser_ru16be(buf);
|
|
wsi->mqtt->ack_pkt_id = par->cpkt_id;
|
|
buf += 2;
|
|
len -= 2;
|
|
par->cpkt_remlen -= 2;
|
|
par->n = 0;
|
|
|
|
goto cmd_completion;
|
|
|
|
case LMQCPP_PUBACK_PACKET:
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->cpkt_remlen = par->vbit.value;
|
|
lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
|
|
(int)par->cpkt_remlen);
|
|
/*
|
|
* must be 4 or more, with special case that 2
|
|
* means success with no reason code or props
|
|
*/
|
|
if (par->cpkt_remlen <= 1 ||
|
|
par->cpkt_remlen == 3)
|
|
goto send_protocol_error_and_close;
|
|
|
|
par->state = LMQCPP_PUBACK_VH_PKT_ID;
|
|
par->fixed_seen[2] = par->fixed_seen[3] = 0;
|
|
par->fixed = 0;
|
|
par->n = 0;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: puback bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PUBACK_VH_PKT_ID:
|
|
/*
|
|
* There are 3 fixed bytes and then a VBI for the
|
|
* property section length
|
|
*/
|
|
par->fixed_seen[par->fixed++] = *buf++;
|
|
if (len < par->cpkt_remlen - par->n) {
|
|
lwsl_notice("%s: len breakage 4\n", __func__);
|
|
return -1;
|
|
}
|
|
len--;
|
|
par->n++;
|
|
if (par->fixed == 2)
|
|
par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
|
|
|
|
if (par->fixed == 3) {
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
par->props_consumed = 0;
|
|
par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
|
|
}
|
|
/* length of 2 is truncated packet and we completed it */
|
|
if (par->cpkt_remlen == par->fixed)
|
|
goto cmd_completion;
|
|
break;
|
|
|
|
case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->props_len = par->vbit.value;
|
|
lwsl_info("%s: PUBACK props len = %d\n",
|
|
__func__, (int)par->cpkt_remlen);
|
|
/*
|
|
* If there are no properties, this is a
|
|
* command completion event in itself
|
|
*/
|
|
if (!par->props_len)
|
|
goto cmd_completion;
|
|
|
|
/*
|
|
* Otherwise consume the properties before
|
|
* completing the command
|
|
*/
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
par->state = LMQCPP_PUBACK_VH_PKT_ID;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: puback pr bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
|
|
/*
|
|
* TODO: stash the props
|
|
*/
|
|
par->props_consumed++;
|
|
len--;
|
|
buf++;
|
|
if (par->props_len != par->props_consumed)
|
|
break;
|
|
|
|
cmd_completion:
|
|
/*
|
|
* We come here when we understood we just processed
|
|
* the last byte of a command packet, regardless of the
|
|
* packet type
|
|
*/
|
|
par->state = LMQCPP_IDLE;
|
|
|
|
switch (par->packet_type_flags >> 4) {
|
|
case LMQCP_STOC_CONNACK:
|
|
lwsl_info("%s: cmd_completion: CONNACK\n",
|
|
__func__);
|
|
|
|
/*
|
|
* Getting the CONNACK means we are the first,
|
|
* the nwsi, and we succeeded to create a new
|
|
* network connection ourselves.
|
|
*
|
|
* Since others may join us sharing the nwsi,
|
|
* and we may close while they still want to use
|
|
* it, our wsi lifecycle alone can no longer
|
|
* define the lifecycle of the nwsi... it means
|
|
* we need to do a "magic trick" and instead of
|
|
* being both the nwsi and act like a child
|
|
* stream, create a new wsi to take over the
|
|
* nwsi duties and turn our wsi into a child of
|
|
* the nwsi with its own lifecycle.
|
|
*
|
|
* The nwsi gets a mostly empty wsi->nwsi used
|
|
* to track already-subscribed topics globally
|
|
* for the connection.
|
|
*/
|
|
|
|
/* we were under SENT_CLIENT_HANDSHAKE timeout */
|
|
lws_set_timeout(wsi, 0, 0);
|
|
|
|
w = lws_create_new_server_wsi(wsi->vhost,
|
|
wsi->tsi);
|
|
if (!w) {
|
|
lwsl_notice("%s: sid 1 migrate failed\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
|
|
wsi->mux.highest_sid = 1;
|
|
lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
|
|
|
|
wsi->mux_substream = 1;
|
|
w->mux_substream = 1;
|
|
w->client_mux_substream = 1;
|
|
wsi->client_mux_migrated = 1;
|
|
wsi->told_user_closed = 1; /* don't tell nwsi closed */
|
|
|
|
lwsi_set_state(w, LRS_ESTABLISHED);
|
|
lwsi_set_state(wsi, LRS_ESTABLISHED);
|
|
lwsi_set_role(w, lwsi_role(wsi));
|
|
|
|
#if defined(LWS_WITH_CLIENT)
|
|
w->flags = wsi->flags;
|
|
#endif
|
|
|
|
w->mqtt = wsi->mqtt;
|
|
wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
|
|
if (!wsi->mqtt)
|
|
return -1;
|
|
w->mqtt->wsi = w;
|
|
w->protocol = wsi->protocol;
|
|
if (w->user_space &&
|
|
!w->user_space_externally_allocated)
|
|
lws_free_set_NULL(w->user_space);
|
|
w->user_space = wsi->user_space;
|
|
wsi->user_space = NULL;
|
|
w->user_space_externally_allocated =
|
|
wsi->user_space_externally_allocated;
|
|
if (lws_ensure_user_space(w))
|
|
goto bail1;
|
|
w->opaque_user_data = wsi->opaque_user_data;
|
|
wsi->opaque_user_data = NULL;
|
|
w->stash = wsi->stash;
|
|
wsi->stash = NULL;
|
|
|
|
lws_mux_mark_immortal(w);
|
|
|
|
lwsl_notice("%s: migrated nwsi %p to sid 1 %p\n",
|
|
__func__, wsi, w);
|
|
|
|
#if defined(LWS_WITH_SERVER_STATUS)
|
|
wsi->vhost->conn_stats.h2_subs++;
|
|
#endif
|
|
|
|
/*
|
|
* It was the last thing we were waiting for
|
|
* before we can be fully ESTABLISHED
|
|
*/
|
|
if (lws_mqtt_set_client_established(w)) {
|
|
lwsl_notice("%s: set EST fail\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
/* get the ball rolling */
|
|
lws_validity_confirmed(wsi);
|
|
|
|
/* well, add the queued guys as children */
|
|
lws_wsi_mux_apply_queue(wsi);
|
|
break;
|
|
|
|
bail1:
|
|
/* undo the insert */
|
|
wsi->mux.child_list = w->mux.sibling_list;
|
|
wsi->mux.child_count--;
|
|
|
|
w->context->count_wsi_allocated--;
|
|
|
|
if (w->user_space)
|
|
lws_free_set_NULL(w->user_space);
|
|
w->vhost->protocols[0].callback(w,
|
|
LWS_CALLBACK_WSI_DESTROY,
|
|
NULL, NULL, 0);
|
|
lws_vhost_unbind_wsi(w);
|
|
lws_free(w);
|
|
|
|
return 0;
|
|
|
|
case LMQCP_PUBACK:
|
|
lwsl_info("%s: cmd_completion: PUBACK\n",
|
|
__func__);
|
|
|
|
/*
|
|
* Figure out which child asked for this
|
|
*/
|
|
|
|
n = 0;
|
|
lws_start_foreach_ll(struct lws *, w,
|
|
wsi->mux.child_list) {
|
|
if (w->mqtt->unacked_publish &&
|
|
w->mqtt->ack_pkt_id == par->cpkt_id) {
|
|
char requested_close = 0;
|
|
|
|
w->mqtt->unacked_publish = 0;
|
|
if (user_callback_handle_rxflow(
|
|
w->protocol->callback,
|
|
w, LWS_CALLBACK_MQTT_ACK,
|
|
w->user_space, NULL, 0) < 0) {
|
|
lwsl_info("%s: MQTT_ACK requests close\n",
|
|
__func__);
|
|
requested_close = 1;
|
|
}
|
|
n = 1;
|
|
|
|
/*
|
|
* We got an assertive PUBACK,
|
|
* no need for ACK timeout wait
|
|
* any more
|
|
*/
|
|
lws_sul_schedule(lws_get_context(w), 0,
|
|
&w->mqtt->sul_qos1_puback_wait, NULL,
|
|
LWS_SET_TIMER_USEC_CANCEL);
|
|
|
|
if (requested_close) {
|
|
__lws_close_free_wsi(w,
|
|
0, "ack cb");
|
|
break;
|
|
}
|
|
|
|
break;
|
|
}
|
|
} lws_end_foreach_ll(w, mux.sibling_list);
|
|
|
|
if (!n) {
|
|
lwsl_err("%s: unsolicited PUBACK\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
|
|
/*
|
|
* If we published something and it was acked,
|
|
* our connection is definitely working in both
|
|
* directions at the moment.
|
|
*/
|
|
lws_validity_confirmed(wsi);
|
|
break;
|
|
|
|
case LMQCP_STOC_PINGRESP:
|
|
lwsl_info("%s: cmd_completion: PINGRESP\n",
|
|
__func__);
|
|
/*
|
|
* If we asked for a PINGRESP and it came,
|
|
* our connection is definitely working in both
|
|
* directions at the moment.
|
|
*/
|
|
lws_validity_confirmed(wsi);
|
|
break;
|
|
|
|
case LMQCP_STOC_SUBACK:
|
|
lwsl_info("%s: cmd_completion: SUBACK\n",
|
|
__func__);
|
|
|
|
/*
|
|
* Figure out which child asked for this
|
|
*/
|
|
|
|
n = 0;
|
|
lws_start_foreach_ll(struct lws *, w,
|
|
wsi->mux.child_list) {
|
|
if (w->mqtt->inside_subscribe &&
|
|
w->mqtt->ack_pkt_id == par->cpkt_id) {
|
|
w->mqtt->inside_subscribe = 0;
|
|
if (user_callback_handle_rxflow(
|
|
w->protocol->callback,
|
|
w, LWS_CALLBACK_MQTT_SUBSCRIBED,
|
|
w->user_space, NULL, 0) < 0) {
|
|
lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
n = 1;
|
|
break;
|
|
}
|
|
} lws_end_foreach_ll(w, mux.sibling_list);
|
|
|
|
if (!n) {
|
|
lwsl_err("%s: unsolicited SUBACK\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
|
|
/*
|
|
* If we subscribed to something and SUBACK came,
|
|
* our connection is definitely working in both
|
|
* directions at the moment.
|
|
*/
|
|
lws_validity_confirmed(wsi);
|
|
|
|
break;
|
|
|
|
case LMQCP_STOC_UNSUBACK:
|
|
{
|
|
char requested_close = 0;
|
|
lwsl_info("%s: cmd_completion: UNSUBACK\n",
|
|
__func__);
|
|
/*
|
|
* Figure out which child asked for this
|
|
*/
|
|
n = 0;
|
|
lws_start_foreach_ll(struct lws *, w,
|
|
wsi->mux.child_list) {
|
|
if (w->mqtt->inside_unsubscribe &&
|
|
w->mqtt->ack_pkt_id == par->cpkt_id) {
|
|
struct lws *nwsi = lws_get_network_wsi(w);
|
|
|
|
/*
|
|
* No more subscribers left,
|
|
* remove the topic from nwsi
|
|
*/
|
|
lws_mqtt_client_remove_subs(nwsi->mqtt);
|
|
|
|
w->mqtt->inside_unsubscribe = 0;
|
|
if (user_callback_handle_rxflow(
|
|
w->protocol->callback,
|
|
w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
|
|
w->user_space, NULL, 0) < 0) {
|
|
lwsl_info("%s: MQTT_UNSUBACK requests close\n",
|
|
__func__);
|
|
requested_close = 1;
|
|
}
|
|
n = 1;
|
|
|
|
if (requested_close) {
|
|
__lws_close_free_wsi(w,
|
|
0, "unsub ack cb");
|
|
break;
|
|
}
|
|
break;
|
|
}
|
|
} lws_end_foreach_ll(w, mux.sibling_list);
|
|
|
|
if (!n) {
|
|
lwsl_err("%s: unsolicited UNSUBACK\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
|
|
|
|
/*
|
|
* If we unsubscribed to something and
|
|
* UNSUBACK came, our connection is
|
|
* definitely working in both
|
|
* directions at the moment.
|
|
*/
|
|
lws_validity_confirmed(wsi);
|
|
|
|
break;
|
|
}
|
|
case LMQCP_PUBLISH:
|
|
{
|
|
lws_mqtt_publish_param_t *pub =
|
|
(lws_mqtt_publish_param_t *)
|
|
wsi->mqtt->rx_cpkt_param;
|
|
size_t chunk;
|
|
|
|
if (pub == NULL) {
|
|
lwsl_notice("%s: no pub\n", __func__);
|
|
return -1;
|
|
}
|
|
|
|
/*
|
|
* RX PUBLISH is delivered to any children that
|
|
* registered for the related topic
|
|
*/
|
|
|
|
n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
|
|
|
|
chunk = pub->payload_len - pub->payload_pos;
|
|
if (chunk > len)
|
|
chunk = len;
|
|
|
|
lws_start_foreach_ll(struct lws *, w,
|
|
wsi->mux.child_list) {
|
|
if (lws_mqtt_find_sub(w->mqtt,
|
|
pub->topic))
|
|
if (w->protocol->callback(
|
|
w, n,
|
|
w->user_space,
|
|
(void *)pub,
|
|
chunk))
|
|
return 1;
|
|
} lws_end_foreach_ll(w, mux.sibling_list);
|
|
|
|
|
|
pub->payload_pos += (uint32_t)chunk;
|
|
len -= chunk;
|
|
buf += chunk;
|
|
|
|
lwsl_debug("%s: post pos %d, plen %d, len %d\n",
|
|
__func__, (int)pub->payload_pos,
|
|
(int)pub->payload_len, (int)len);
|
|
|
|
if (pub->payload_pos != pub->payload_len) {
|
|
/*
|
|
* More chunks of the payload pending,
|
|
* blocking this connection from doing
|
|
* anything else
|
|
*/
|
|
par->state = LMQCPP_PAYLOAD;
|
|
break;
|
|
}
|
|
|
|
/* For QOS>0, send out PUBACK */
|
|
if (pub->qos) {
|
|
wsi->mqtt->send_puback = 1;
|
|
lws_callback_on_writable(wsi);
|
|
}
|
|
|
|
par->payload_consumed = 0;
|
|
lws_free_set_NULL(pub->topic);
|
|
lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
|
|
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
break;
|
|
|
|
|
|
case LMQCPP_PROP_ID_VBI:
|
|
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
par->consumed += par->vbit.consumed;
|
|
if (par->vbit.value >
|
|
LWS_ARRAY_SIZE(property_valid)) {
|
|
lwsl_notice("%s: undef prop id 0x%x\n",
|
|
__func__, (int)par->vbit.value);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
if (!(property_valid[par->vbit.value] &
|
|
(1 << ctl_pkt_type(par)))) {
|
|
lwsl_notice("%s: prop id 0x%x invalid for"
|
|
" control pkt %d\n", __func__,
|
|
(int)par->vbit.value,
|
|
ctl_pkt_type(par));
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
par->prop_id = par->vbit.value;
|
|
par->flag_prop_multi =
|
|
par->props_seen[par->prop_id >> 3] &
|
|
(1 << (par->prop_id & 7));
|
|
par->props_seen[par->prop_id >> 3] |=
|
|
(1 << (par->prop_id & 7));
|
|
/*
|
|
* even if it's not a vbi property arg,
|
|
* .consumed of this will be zero the first time
|
|
*/
|
|
lws_mqtt_vbi_init(&par->vbit);
|
|
/*
|
|
* if it's a string, next state must set the
|
|
* destination and size limit itself. But
|
|
* resetting it generically here lets it use
|
|
* lws_mqtt_str_first() to understand it's the
|
|
* first time around.
|
|
*/
|
|
lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
|
|
|
|
/* property arg state enums are so encoded */
|
|
par->state = 0x100 | par->vbit.value;
|
|
break;
|
|
default:
|
|
lwsl_notice("%s: prop id bad vbi\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
/*
|
|
* All possible property payloads... restricting which ones
|
|
* can appear in which control packets is already done above
|
|
* in LMQCPP_PROP_ID_VBI
|
|
*/
|
|
|
|
case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
|
|
case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
|
|
case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
|
|
case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
|
|
case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
|
|
case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
|
|
case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
|
|
case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
|
|
if (par->flag_prop_multi)
|
|
goto singular_prop_seen_twice;
|
|
par->payload_format = *buf++;
|
|
len--;
|
|
if (lws_mqtt_pconsume(par, 1))
|
|
goto send_protocol_error_and_close;
|
|
break;
|
|
|
|
case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
|
|
case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
|
|
case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
|
|
case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
|
|
if (par->flag_prop_multi)
|
|
goto singular_prop_seen_twice;
|
|
|
|
if (lws_mqtt_mb_first(&par->vbit))
|
|
lws_mqtt_4byte_init(&par->vbit);
|
|
|
|
switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
if (lws_mqtt_pconsume(par, par->vbit.consumed))
|
|
goto send_protocol_error_and_close;
|
|
break;
|
|
default:
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
|
|
case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
|
|
case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
|
|
case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
|
|
if (par->flag_prop_multi)
|
|
goto singular_prop_seen_twice;
|
|
|
|
if (lws_mqtt_mb_first(&par->vbit))
|
|
lws_mqtt_2byte_init(&par->vbit);
|
|
|
|
switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
if (lws_mqtt_pconsume(par, par->vbit.consumed))
|
|
goto send_protocol_error_and_close;
|
|
break;
|
|
default:
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
|
|
case LMQCPP_PROP_AUTH_METHOD_UTF8S:
|
|
case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
|
|
case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
|
|
case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
|
|
case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
|
|
case LMQCPP_PROP_REASON_STRING_UTF8S:
|
|
case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
|
|
case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
|
|
if (par->flag_prop_multi)
|
|
goto singular_prop_seen_twice;
|
|
|
|
if (lws_mqtt_str_first(&par->s_temp))
|
|
lws_mqtt_str_init(&par->s_temp, par->temp,
|
|
sizeof(par->temp), 0);
|
|
|
|
switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
|
|
case LMSPR_NEED_MORE:
|
|
break;
|
|
case LMSPR_COMPLETED:
|
|
if (lws_mqtt_pconsume(par, par->s_temp.len))
|
|
goto send_protocol_error_and_close;
|
|
break;
|
|
|
|
default:
|
|
lwsl_info("%s: bad protocol name\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
}
|
|
break;
|
|
|
|
case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
|
|
|
|
case LMQCPP_PROP_CORRELATION_BINDATA:
|
|
case LMQCPP_PROP_AUTH_DATA_BINDATA:
|
|
|
|
/* TODO */
|
|
lwsl_err("%s: Unimplemented packet state 0x%x\n",
|
|
__func__, par->state);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
|
|
oom:
|
|
lwsl_err("%s: OOM!\n", __func__);
|
|
goto send_protocol_error_and_close;
|
|
|
|
singular_prop_seen_twice:
|
|
lwsl_info("%s: property appears twice\n", __func__);
|
|
|
|
send_protocol_error_and_close:
|
|
lwsl_notice("%s: peac\n", __func__);
|
|
par->reason = LMQCP_REASON_PROTOCOL_ERROR;
|
|
|
|
send_reason_and_close:
|
|
lwsl_notice("%s: srac\n", __func__);
|
|
par->flag_pending_send_reason_close = 1;
|
|
goto ask;
|
|
|
|
send_unsupp_connack_and_close:
|
|
lwsl_notice("%s: unsupac\n", __func__);
|
|
par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
|
|
par->flag_pending_send_connack_close = 1;
|
|
|
|
ask:
|
|
/* Should we ask for clients? */
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return -1;
|
|
}
|
|
|
|
int
|
|
lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
|
|
uint8_t dup, lws_mqtt_qos_levels_t qos,
|
|
uint8_t retain)
|
|
{
|
|
lws_mqtt_fixed_hdr_t hdr;
|
|
|
|
hdr.bits = 0;
|
|
hdr.flags.ctrl_pkt_type = (uint8_t) ctrl_pkt_type;
|
|
|
|
switch(ctrl_pkt_type) {
|
|
case LMQCP_PUBLISH:
|
|
hdr.flags.dup = !!dup;
|
|
/*
|
|
* A PUBLISH Packet MUST NOT have both QoS bits set to
|
|
* 1. If a Server or Client receives a PUBLISH Packet
|
|
* which has both QoS bits set to 1 it MUST close the
|
|
* Network Connection [MQTT-3.3.1-4].
|
|
*/
|
|
if (qos >= RESERVED_QOS_LEVEL) {
|
|
lwsl_err("%s: Unsupport QoS level 0x%x\n",
|
|
__func__, qos);
|
|
return -1;
|
|
}
|
|
hdr.flags.qos = (uint8_t)qos;
|
|
hdr.flags.retain = !!retain;
|
|
break;
|
|
|
|
case LMQCP_CTOS_CONNECT:
|
|
case LMQCP_STOC_CONNACK:
|
|
case LMQCP_PUBACK:
|
|
case LMQCP_PUBREC:
|
|
case LMQCP_PUBCOMP:
|
|
case LMQCP_STOC_SUBACK:
|
|
case LMQCP_STOC_UNSUBACK:
|
|
case LMQCP_CTOS_PINGREQ:
|
|
case LMQCP_STOC_PINGRESP:
|
|
case LMQCP_DISCONNECT:
|
|
case LMQCP_AUTH:
|
|
hdr.bits &= 0xf0;
|
|
break;
|
|
|
|
/*
|
|
* Bits 3,2,1 and 0 of the fixed header of the PUBREL,
|
|
* SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
|
|
* MUST be set to 0,0,1 and 0 respectively. The Server MUST
|
|
* treat any other value as malformed and close the Network
|
|
* Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
|
|
*/
|
|
case LMQCP_PUBREL:
|
|
case LMQCP_CTOS_SUBSCRIBE:
|
|
case LMQCP_CTOS_UNSUBSCRIBE:
|
|
hdr.bits |= 0x02;
|
|
break;
|
|
|
|
default:
|
|
return -1;
|
|
}
|
|
|
|
*p = hdr.bits;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
|
|
* the timeout period
|
|
*/
|
|
|
|
static void
|
|
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
|
|
{
|
|
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
|
|
struct _lws_mqtt_related, sul_qos1_puback_wait);
|
|
|
|
lwsl_notice("%s: wsi %p\n", __func__, mqtt->wsi);
|
|
|
|
if (mqtt->wsi->protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
|
|
mqtt->wsi->user_space, NULL, 0))
|
|
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
|
|
}
|
|
|
|
int
|
|
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
|
|
const void *buf, uint32_t len, int is_complete)
|
|
{
|
|
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
|
uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
|
|
struct lws *nwsi = lws_get_network_wsi(wsi);
|
|
lws_mqtt_str_t mqtt_vh_payload;
|
|
uint32_t vh_len, rem_len;
|
|
|
|
assert(pub->topic);
|
|
|
|
lwsl_debug("%s: len = %d, is_complete = %d\n",
|
|
__func__, (int)len, (int)is_complete);
|
|
|
|
if (lwsi_state(wsi) != LRS_ESTABLISHED) {
|
|
lwsl_err("%s: wsi %p: unknown state 0x%x\n", __func__, wsi,
|
|
lwsi_state(wsi));
|
|
assert(0);
|
|
return 1;
|
|
}
|
|
|
|
if (wsi->mqtt->inside_payload) {
|
|
/*
|
|
* Headers are filled, we are sending
|
|
* the payload - a buffer with LWS_PRE
|
|
* in front it.
|
|
*/
|
|
start = (uint8_t *)buf;
|
|
p = start + len;
|
|
if (is_complete)
|
|
wsi->mqtt->inside_payload = 0;
|
|
goto do_write;
|
|
}
|
|
|
|
start = b + LWS_PRE;
|
|
p = start;
|
|
/*
|
|
* Fill headers and the first chunk of the
|
|
* payload (if any)
|
|
*/
|
|
if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
|
|
0, pub->qos, 0)) {
|
|
lwsl_err("%s: Failed to fill fixed header\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
* Topic len field + Topic len + Packet ID
|
|
* (for QOS>0) + Payload len
|
|
*/
|
|
vh_len = 2 + pub->topic_len + ((pub->qos) ? 2 : 0);
|
|
rem_len = vh_len + pub->payload_len;
|
|
lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
|
|
|
|
/* Will the chunk of payload fit? */
|
|
if ((vh_len + len) >=
|
|
(wsi->context->pt_serv_buf_size - LWS_PRE)) {
|
|
lwsl_err("%s: Payload is too big\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
p += lws_mqtt_vbi_encode(rem_len, p);
|
|
|
|
/* Topic's Len */
|
|
lws_ser_wu16be(p, pub->topic_len);
|
|
p += 2;
|
|
|
|
/*
|
|
* Init lws_mqtt_str for "MQTT Variable
|
|
* Headers + payload" (only the supplied
|
|
* chuncked payload)
|
|
*/
|
|
lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
|
|
(pub->topic_len + ((pub->qos) ? 2 : 0) + len),
|
|
0);
|
|
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
|
|
lwsl_err("%s: a\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/* Packet ID */
|
|
if (pub->qos != QOS0) {
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
|
|
lwsl_debug("%s: pkt_id = %d\n", __func__,
|
|
(int)wsi->mqtt->ack_pkt_id);
|
|
lws_ser_wu16be(p, pub->packet_id);
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
|
|
lwsl_err("%s: b\n", __func__);
|
|
return 1;
|
|
}
|
|
}
|
|
/*
|
|
* A non-empty Payload is expected and a chunk
|
|
* is present
|
|
*/
|
|
if (pub->payload_len && len) {
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
memcpy(p, buf, len);
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, len))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
}
|
|
|
|
if (!is_complete)
|
|
nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
|
|
|
|
do_write:
|
|
|
|
// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
|
|
|
|
if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
|
|
lws_ptr_diff(p, start)) {
|
|
lwsl_err("%s: write failed\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
if (!is_complete) {
|
|
/* still some more chunks to come... */
|
|
lws_callback_on_writable(wsi);
|
|
|
|
return 0;
|
|
}
|
|
|
|
wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
|
|
|
|
if (pub->qos != QOS0)
|
|
wsi->mqtt->unacked_publish = 1;
|
|
|
|
/* this was the last part of the publish message */
|
|
|
|
if (pub->qos == QOS0) {
|
|
/*
|
|
* There won't be any real PUBACK, act like we got one
|
|
* so the user callback logic is the same for QoS0 or
|
|
* QoS1
|
|
*/
|
|
if (wsi->protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
|
|
wsi->user_space, NULL, 0)) {
|
|
lwsl_err("%s: ACK callback exited\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
|
|
|
|
wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
|
|
__lws_sul_insert(&pt->pt_sul_owner, &wsi->mqtt->sul_qos1_puback_wait,
|
|
3 * LWS_USEC_PER_SEC);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
|
|
{
|
|
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
|
uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
|
|
struct lws *nwsi = lws_get_network_wsi(wsi);
|
|
lws_mqtt_str_t mqtt_vh_payload;
|
|
uint8_t exists[8], extant;
|
|
lws_mqtt_subs_t *mysub;
|
|
uint32_t rem_len;
|
|
#if defined(_DEBUG)
|
|
uint32_t tops;
|
|
#endif
|
|
uint32_t n;
|
|
|
|
assert(sub->num_topics);
|
|
assert(sub->num_topics < sizeof(exists));
|
|
|
|
switch (lwsi_state(wsi)) {
|
|
case LRS_ESTABLISHED: /* Protocol connection established */
|
|
if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
|
|
0, 0, 0)) {
|
|
lwsl_err("%s: Failed to fill fixed header\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
* The stream wants to subscribe to one or more topic, but
|
|
* the shared nwsi may already be subscribed to some or all of
|
|
* them from interactions with other streams. For those cases,
|
|
* we filter them from the list the child wants until we just
|
|
* have ones that are new to the nwsi. If nothing left, we just
|
|
* synthesize the callback to the child as if SUBACK had come
|
|
* and we're done, otherwise just ask the server for topics that
|
|
* are new to the wsi.
|
|
*/
|
|
|
|
extant = 0;
|
|
memset(&exists, 0, sizeof(exists));
|
|
for (n = 0; n < sub->num_topics; n++) {
|
|
lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
|
|
__func__, (int)n, sub->topic[n].name);
|
|
|
|
mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
|
|
if (mysub && mysub->ref_count) {
|
|
mysub->ref_count++; /* another stream using it */
|
|
exists[n] = 1;
|
|
extant++;
|
|
}
|
|
|
|
/*
|
|
* Attach the topic we're subscribing to, to wsi->mqtt
|
|
*/
|
|
if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
|
|
lwsl_err("%s: create sub fail\n", __func__);
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
if (extant == sub->num_topics) {
|
|
/*
|
|
* It turns out there's nothing to do here, the nwsi has
|
|
* already subscribed to all the topics this stream
|
|
* wanted. Just tell it it can have them.
|
|
*/
|
|
lwsl_notice("%s: all topics already subscribed\n", __func__);
|
|
if (user_callback_handle_rxflow(
|
|
wsi->protocol->callback,
|
|
wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
|
|
wsi->user_space, NULL, 0) < 0) {
|
|
lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
|
|
__func__);
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
#if defined(_DEBUG)
|
|
/*
|
|
* zero or more of the topics already existed, but not all,
|
|
* so we must go to the server with a filtered list of the
|
|
* new ones only
|
|
*/
|
|
|
|
tops = sub->num_topics - extant;
|
|
#endif
|
|
|
|
/*
|
|
* Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
|
|
*/
|
|
rem_len = 2;
|
|
for (n = 0; n < sub->num_topics; n++)
|
|
if (!exists[n])
|
|
rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
|
|
|
|
wsi->mqtt->sub_size = rem_len;
|
|
|
|
#if defined(_DEBUG)
|
|
lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
|
|
__func__, (int)tops, (int)rem_len);
|
|
#endif
|
|
|
|
p += lws_mqtt_vbi_encode(rem_len, p);
|
|
|
|
if ((rem_len + lws_ptr_diff(p, start)) >=
|
|
wsi->context->pt_serv_buf_size) {
|
|
lwsl_err("%s: Payload is too big\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/* Init lws_mqtt_str */
|
|
lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
/* Packet ID */
|
|
wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
|
|
lwsl_debug("%s: pkt_id = %d\n", __func__,
|
|
(int)wsi->mqtt->ack_pkt_id);
|
|
lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
|
|
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
|
|
return 1;
|
|
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
for (n = 0; n < sub->num_topics; n++) {
|
|
lwsl_info("%s: topics[%d] = %s\n", __func__,
|
|
(int)n, sub->topic[n].name);
|
|
|
|
/* if the nwsi already has it, don't ask server for it */
|
|
if (exists[n]) {
|
|
lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
|
|
__func__, (int)n, sub->topic[n].name);
|
|
continue;
|
|
}
|
|
|
|
/*
|
|
* Attach the topic we're subscribing to, to nwsi->mqtt
|
|
* so we know the nwsi itself has a subscription to it
|
|
*/
|
|
|
|
if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
|
|
return 1;
|
|
|
|
/* Topic's Len */
|
|
lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
/* Topic Name */
|
|
lws_strncpy((char *)p, sub->topic[n].name,
|
|
strlen(sub->topic[n].name) + 1);
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload,
|
|
(int)strlen(sub->topic[n].name)))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
/* QoS */
|
|
*p = sub->topic[n].qos;
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
return 1;
|
|
}
|
|
|
|
if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
|
|
lws_ptr_diff(p, start))
|
|
return 1;
|
|
|
|
wsi->mqtt->inside_subscribe = 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
lws_mqtt_client_send_unsubcribe(struct lws *wsi,
|
|
const lws_mqtt_subscribe_param_t *unsub)
|
|
{
|
|
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
|
uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
|
|
struct lws *nwsi = lws_get_network_wsi(wsi);
|
|
lws_mqtt_str_t mqtt_vh_payload;
|
|
uint8_t send_unsub[8], orphaned;
|
|
uint32_t rem_len, n;
|
|
lws_mqtt_subs_t *mysub;
|
|
#if defined(_DEBUG)
|
|
uint32_t tops;
|
|
#endif
|
|
|
|
lwsl_info("%s: Enter\n", __func__);
|
|
|
|
switch (lwsi_state(wsi)) {
|
|
case LRS_ESTABLISHED: /* Protocol connection established */
|
|
orphaned = 0;
|
|
memset(&send_unsub, 0, sizeof(send_unsub));
|
|
for (n = 0; n < unsub->num_topics; n++) {
|
|
mysub = lws_mqtt_find_sub(nwsi->mqtt,
|
|
unsub->topic[n].name);
|
|
assert(mysub);
|
|
|
|
if (--mysub->ref_count == 0) {
|
|
lwsl_notice("%s: Need to send UNSUB\n", __func__);
|
|
send_unsub[n] = 1;
|
|
orphaned++;
|
|
}
|
|
}
|
|
|
|
if (!orphaned) {
|
|
/*
|
|
* The nwsi still has other subscribers bound to the
|
|
* topics.
|
|
*
|
|
* So, don't send UNSUB to server, and just fake the
|
|
* UNSUB ACK event for the guy going away.
|
|
*/
|
|
lwsl_notice("%s: unsubscribed!\n", __func__);
|
|
if (user_callback_handle_rxflow(
|
|
wsi->protocol->callback,
|
|
wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
|
|
wsi->user_space, NULL, 0) < 0) {
|
|
/*
|
|
* We can't directly close here, because the
|
|
* caller still has the wsi. Inform the
|
|
* caller that we want to close
|
|
*/
|
|
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
#if defined(_DEBUG)
|
|
/*
|
|
* one or more of the topics needs to be unsubscribed
|
|
* from, so we must go to the server with a filtered
|
|
* list of the new ones only
|
|
*/
|
|
|
|
tops = orphaned;
|
|
#endif
|
|
|
|
if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
|
|
0, 0, 0)) {
|
|
lwsl_err("%s: Failed to fill fixed header\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
* Pid + (Topic len field + Topic len) x Num of Topics
|
|
*/
|
|
rem_len = 2;
|
|
for (n = 0; n < unsub->num_topics; n++)
|
|
if (send_unsub[n])
|
|
rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
|
|
|
|
wsi->mqtt->sub_size = rem_len;
|
|
|
|
lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
|
|
__func__, (int)tops, (int)rem_len);
|
|
|
|
p += lws_mqtt_vbi_encode(rem_len, p);
|
|
|
|
if ((rem_len + lws_ptr_diff(p, start)) >=
|
|
wsi->context->pt_serv_buf_size) {
|
|
lwsl_err("%s: Payload is too big\n", __func__);
|
|
return 1;
|
|
}
|
|
|
|
/* Init lws_mqtt_str */
|
|
lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
/* Packet ID */
|
|
wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
|
|
lwsl_debug("%s: pkt_id = %d\n", __func__,
|
|
(int)wsi->mqtt->ack_pkt_id);
|
|
lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
|
|
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
|
|
return 1;
|
|
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
for (n = 0; n < unsub->num_topics; n++) {
|
|
lwsl_info("%s: topics[%d] = %s\n", __func__,
|
|
(int)n, unsub->topic[n].name);
|
|
|
|
/*
|
|
* Subscriber still bound to it, don't UBSUB
|
|
* from the server
|
|
*/
|
|
if (!send_unsub[n])
|
|
continue;
|
|
|
|
/* Topic's Len */
|
|
lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
|
|
/* Topic Name */
|
|
lws_strncpy((char *)p, unsub->topic[n].name,
|
|
strlen(unsub->topic[n].name) + 1);
|
|
if (lws_mqtt_str_advance(&mqtt_vh_payload,
|
|
(int)strlen(unsub->topic[n].name)))
|
|
return 1;
|
|
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
|
}
|
|
break;
|
|
|
|
default:
|
|
return 1;
|
|
}
|
|
|
|
if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
|
|
lws_ptr_diff(p, start))
|
|
return 1;
|
|
|
|
wsi->mqtt->inside_unsubscribe = 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* This is called when child streams bind to an already-existing and compatible
|
|
* MQTT stream
|
|
*/
|
|
|
|
struct lws *
|
|
lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
|
|
{
|
|
/* no more children allowed by parent? */
|
|
|
|
if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
|
|
lwsl_err("%s: reached concurrent stream limit\n", __func__);
|
|
return NULL;
|
|
}
|
|
|
|
#if defined(LWS_WITH_CLIENT)
|
|
wsi->client_mux_substream = 1;
|
|
#endif
|
|
|
|
lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
|
|
|
|
if (lws_ensure_user_space(wsi))
|
|
goto bail1;
|
|
|
|
lws_mqtt_set_client_established(wsi);
|
|
lws_callback_on_writable(wsi);
|
|
|
|
#if defined(LWS_WITH_SERVER_STATUS)
|
|
wsi->vhost->conn_stats.mqtt_subs++;
|
|
#endif
|
|
|
|
return wsi;
|
|
|
|
bail1:
|
|
/* undo the insert */
|
|
parent_wsi->mux.child_list = wsi->mux.sibling_list;
|
|
parent_wsi->mux.child_count--;
|
|
|
|
if (wsi->user_space)
|
|
lws_free_set_NULL(wsi->user_space);
|
|
|
|
wsi->protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
|
|
lws_free(wsi);
|
|
|
|
return NULL;
|
|
}
|
|
|