Skip to content

Commit 070db5c

Browse files
committed
The following changes (as well as changes in shared-util) enable uAMQP server mode when the client is AMQPlite.net.
1. When detecting an invalid header (e.g. due to protocol id 3 being sent as in the case of SASL, per spec the server must reply with its desired header then close the connection. Registering a sent complete callback and then invoke the error handler to clean up. 2. connection_listen opens the underlying IO. Since header detect io is already opened by the endpoint callback, the callbacks are not properly registered causing a prefetch on null later. 3. A link attach request needs to have a link attach response. This was not sent since the link was initialized already to be in HALF_ATTACH state. Remove initial Set and have the session state change callback take care of the attach. 4. Attach properties was not initialized, causing a crash on send attach later. 5. Flow sends incoming id, even though incoming id was not yet initialized in a flow message from client. This is not per spec, as the id is not initialized and thus random, thus client closes the connection. Ensure that incoming id is not added to the flow message on first send. 6. Likewise, on first received flow frame, per spec the window is calculated from the initial outgoing-id of the endpoint (in our case 0). 7. Adding the wsio_open changes required due to xio_open signature change. Fixing sasl_io to honor the close callback. All changes were tested using the local server and client samples as well as the amqpnodeserver for fgw. These changes depend on latest develop branch state in amqp-shared-c, so this change also forwards the commit id of the shared submodule.
1 parent 1b13a97 commit 070db5c

File tree

8 files changed

+159
-124
lines changed

8 files changed

+159
-124
lines changed

inc/wsio.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ typedef struct WSIO_CONFIG_TAG
2929

3030
extern CONCRETE_IO_HANDLE wsio_create(void* io_create_parameters, LOGGER_LOG logger_log);
3131
extern void wsio_destroy(CONCRETE_IO_HANDLE ws_io);
32-
extern int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, ON_BYTES_RECEIVED on_bytes_received, ON_IO_ERROR on_io_error, void* callback_context);
32+
extern int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context);
3333
extern int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context);
3434
extern int wsio_send(CONCRETE_IO_HANDLE ws_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context);
3535
extern void wsio_dowork(CONCRETE_IO_HANDLE ws_io);

src/header_detect_io.c

+34-21
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_
5555
}
5656
}
5757

58+
static void on_underlying_io_error(void* context);
59+
static void on_send_complete_close(void* context, IO_SEND_RESULT send_result)
60+
{
61+
on_underlying_io_error(context);
62+
}
63+
5864
static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size)
5965
{
6066
HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
@@ -69,7 +75,10 @@ static void on_underlying_io_bytes_received(void* context, const unsigned char*
6975
case IO_STATE_WAIT_FOR_HEADER:
7076
if (amqp_header[header_detect_io_instance->header_pos] != buffer[0])
7177
{
72-
header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
78+
/* Send expected header, then close as per spec. We do not care if we fail */
79+
(void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context);
80+
81+
header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
7382
indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
7483
size = 0;
7584
}
@@ -227,7 +236,12 @@ int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE
227236
{
228237
HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;
229238

230-
if (header_detect_io_instance->io_state == IO_STATE_OPEN)
239+
if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN &&
240+
header_detect_io_instance->io_state != IO_STATE_OPEN)
241+
{
242+
result = __LINE__;
243+
}
244+
else
231245
{
232246
header_detect_io_instance->on_bytes_received = on_bytes_received;
233247
header_detect_io_instance->on_io_open_complete = on_io_open_complete;
@@ -236,25 +250,24 @@ int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE
236250
header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context;
237251
header_detect_io_instance->on_io_error_context = on_io_error_context;
238252

239-
result = 0;
240-
}
241-
else if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN)
242-
{
243-
result = __LINE__;
244-
}
245-
else
246-
{
247-
header_detect_io_instance->header_pos = 0;
248-
header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;
249-
250-
if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
251-
{
252-
result = __LINE__;
253-
}
254-
else
255-
{
256-
result = 0;
257-
}
253+
if (header_detect_io_instance->io_state == IO_STATE_OPEN)
254+
{
255+
indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
256+
}
257+
else
258+
{
259+
header_detect_io_instance->header_pos = 0;
260+
header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;
261+
262+
if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
263+
{
264+
result = __LINE__;
265+
}
266+
else
267+
{
268+
result = 0;
269+
}
270+
}
258271
}
259272
}
260273

src/link.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,8 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
548548
result->initial_delivery_count = 0;
549549
result->max_message_size = 0;
550550
result->is_underlying_session_begun = 0;
551-
result->source = amqpvalue_clone(target);
551+
result->attach_properties = NULL;
552+
result->source = amqpvalue_clone(target);
552553
result->target = amqpvalue_clone(source);
553554
if (role == role_sender)
554555
{
@@ -579,7 +580,6 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
579580
(void)strcpy(result->name, name);
580581
result->on_link_state_changed = NULL;
581582
result->callback_context = NULL;
582-
set_link_state(result, LINK_STATE_HALF_ATTACHED);
583583
result->link_endpoint = link_endpoint;
584584
}
585585
}

src/saslclientio.c

+4-1
Original file line numberDiff line numberDiff line change
@@ -968,7 +968,7 @@ int saslclientio_open(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_OPEN_COMPLETE on_
968968
return result;
969969
}
970970

971-
int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
971+
int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* on_io_close_complete_context)
972972
{
973973
int result = 0;
974974

@@ -991,6 +991,9 @@ int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE o
991991
{
992992
sasl_client_io_instance->io_state = IO_STATE_CLOSING;
993993

994+
sasl_client_io_instance->on_io_close_complete = on_io_close_complete;
995+
sasl_client_io_instance->on_io_close_complete_context = on_io_close_complete_context;
996+
994997
/* Codes_SRS_SASLCLIENTIO_01_015: [saslclientio_close shall close the underlying io handle passed in saslclientio_create by calling xio_close.] */
995998
if (xio_close(sasl_client_io_instance->underlying_io, on_underlying_io_close_complete, sasl_client_io_instance) != 0)
996999
{

src/session.c

+17-4
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,11 @@ static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t p
414414
{
415415
end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint");
416416
}
417-
else
417+
else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0)
418+
{
419+
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
420+
}
421+
else
418422
{
419423
if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target))
420424
{
@@ -493,8 +497,18 @@ static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t p
493497
uint32_t remote_handle;
494498
transfer_number flow_next_incoming_id;
495499
uint32_t flow_incoming_window;
500+
501+
if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0)
502+
{
503+
/*
504+
If the next-incoming-id field of the flow frame is not set,
505+
then remote-incomingwindow is computed as follows:
506+
initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)
507+
*/
508+
flow_next_incoming_id = session_instance->next_outgoing_id;
509+
}
510+
496511
if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) ||
497-
(flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0) ||
498512
(flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0))
499513
{
500514
flow_destroy(flow_handle);
@@ -1152,8 +1166,7 @@ int session_send_flow(LINK_ENDPOINT_HANDLE link_endpoint, FLOW_HANDLE flow)
11521166

11531167
result = 0;
11541168

1155-
if ((session_instance->session_state == SESSION_STATE_BEGIN_RCVD) ||
1156-
((session_instance->session_state == SESSION_STATE_MAPPED)))
1169+
if (session_instance->session_state == SESSION_STATE_BEGIN_RCVD)
11571170
{
11581171
if (flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0)
11591172
{

src/wsio.c

+22-16
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ typedef struct PENDING_SOCKET_IO_TAG
3838
typedef struct WSIO_INSTANCE_TAG
3939
{
4040
ON_BYTES_RECEIVED on_bytes_received;
41-
ON_IO_OPEN_COMPLETE on_io_open_complete;
42-
ON_IO_ERROR on_io_error;
43-
LOGGER_LOG logger_log;
44-
void* open_callback_context;
41+
void* on_bytes_received_context;
42+
ON_IO_OPEN_COMPLETE on_io_open_complete;
43+
void* on_io_open_complete_context;
44+
ON_IO_ERROR on_io_error;
45+
void* on_io_error_context;
46+
LOGGER_LOG logger_log;
4547
IO_STATE io_state;
4648
LIST_HANDLE pending_io_list;
4749
struct lws_context* ws_context;
@@ -60,7 +62,7 @@ static void indicate_error(WSIO_INSTANCE* wsio_instance)
6062
wsio_instance->io_state = IO_STATE_ERROR;
6163
if (wsio_instance->on_io_error != NULL)
6264
{
63-
wsio_instance->on_io_error(wsio_instance->open_callback_context);
65+
wsio_instance->on_io_error(wsio_instance->on_io_error_context);
6466
}
6567
}
6668

@@ -70,7 +72,7 @@ static void indicate_open_complete(WSIO_INSTANCE* ws_io_instance, IO_OPEN_RESULT
7072
if (ws_io_instance->on_io_open_complete != NULL)
7173
{
7274
/* Codes_SRS_WSIO_01_039: [The callback_context argument shall be passed to on_io_open_complete as is.] */
73-
ws_io_instance->on_io_open_complete(ws_io_instance->open_callback_context, open_result);
75+
ws_io_instance->on_io_open_complete(ws_io_instance->on_io_open_complete_context, open_result);
7476
}
7577
}
7678

@@ -390,7 +392,7 @@ static int on_ws_callback(struct lws *wsi, enum lws_callback_reasons reason, voi
390392
/* Codes_SRS_WSIO_01_084: [The bytes argument shall point to the received bytes as indicated by the LWS_CALLBACK_CLIENT_RECEIVE in argument.] */
391393
/* Codes_SRS_WSIO_01_085: [The length argument shall be set to the number of received bytes as indicated by the LWS_CALLBACK_CLIENT_RECEIVE len argument.] */
392394
/* Codes_SRS_WSIO_01_086: [The callback_context shall be set to the callback_context that was passed in wsio_open.] */
393-
wsio_instance->on_bytes_received(wsio_instance->open_callback_context, in, len);
395+
wsio_instance->on_bytes_received(wsio_instance->on_bytes_received_context, in, len);
394396
}
395397
}
396398

@@ -528,11 +530,13 @@ CONCRETE_IO_HANDLE wsio_create(void* io_create_parameters, LOGGER_LOG logger_log
528530
result = amqpalloc_malloc(sizeof(WSIO_INSTANCE));
529531
if (result != NULL)
530532
{
531-
result->on_bytes_received = NULL;
532-
result->on_io_open_complete = NULL;
533-
result->on_io_error = NULL;
534-
result->logger_log = logger_log;
535-
result->open_callback_context = NULL;
533+
result->on_bytes_received = NULL;
534+
result->on_bytes_received_context = NULL;
535+
result->on_io_open_complete = NULL;
536+
result->on_io_open_complete_context = NULL;
537+
result->on_io_error = NULL;
538+
result->on_io_error_context = NULL;
539+
result->logger_log = logger_log;
536540
result->wsi = NULL;
537541
result->ws_context = NULL;
538542

@@ -678,7 +682,7 @@ void wsio_destroy(CONCRETE_IO_HANDLE ws_io)
678682
}
679683
}
680684

681-
int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, ON_BYTES_RECEIVED on_bytes_received, ON_IO_ERROR on_io_error, void* callback_context)
685+
int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
682686
{
683687
int result = 0;
684688

@@ -698,9 +702,11 @@ int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete,
698702
else
699703
{
700704
wsio_instance->on_bytes_received = on_bytes_received;
705+
wsio_instance->on_bytes_received_context = on_bytes_received_context;
701706
wsio_instance->on_io_open_complete = on_io_open_complete;
707+
wsio_instance->on_io_open_complete_context = on_io_open_complete_context;
702708
wsio_instance->on_io_error = on_io_error;
703-
wsio_instance->open_callback_context = callback_context;
709+
wsio_instance->on_io_error_context = on_io_error_context;
704710

705711
int ietf_version = -1; /* latest */
706712
struct lws_context_creation_info info;
@@ -775,7 +781,7 @@ int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete,
775781
return result;
776782
}
777783

778-
int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
784+
int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* on_io_close_complete_context)
779785
{
780786
int result = 0;
781787

@@ -844,7 +850,7 @@ int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_comple
844850
{
845851
/* Codes_SRS_WSIO_01_047: [The callback on_io_close_complete shall be called after the close action has been completed in the context of wsio_close (wsio_close is effectively blocking).] */
846852
/* Codes_SRS_WSIO_01_048: [The callback_context argument shall be passed to on_io_close_complete as is.] */
847-
on_io_close_complete(callback_context);
853+
on_io_close_complete(on_io_close_complete_context);
848854
}
849855

850856
/* Codes_SRS_WSIO_01_044: [On success wsio_close shall return 0.] */

0 commit comments

Comments
 (0)