From 6ac4c8d1c9f9651545ff95886f757b6e6b9b8591 Mon Sep 17 00:00:00 2001 From: Rob Jansen Date: Wed, 18 Jan 2023 17:43:08 -0500 Subject: [PATCH 1/2] Add graceful shutdowns for fixed-size streams --- src/tgen-stream.c | 121 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 115 insertions(+), 6 deletions(-) diff --git a/src/tgen-stream.c b/src/tgen-stream.c index 9315402..cf2d4e3 100644 --- a/src/tgen-stream.c +++ b/src/tgen-stream.c @@ -20,7 +20,7 @@ /* an auth password so we know both sides understand tgen */ #define TGEN_AUTH_PW "T8nNx9L95LATtckJkR5n" -#define TGEN_PROTO_VERS_MAJ 1 +#define TGEN_PROTO_VERS_MAJ 2 #define TGEN_PROTO_VERS_MIN 0 /* the various states the read side of the connection can take */ @@ -31,6 +31,7 @@ typedef enum _TGenStreamRecvState { TGEN_STREAM_RECV_MODEL, TGEN_STREAM_RECV_PAYLOAD, TGEN_STREAM_RECV_CHECKSUM, + TGEN_STREAM_RECV_FOOTER, TGEN_STREAM_RECV_SUCCESS, TGEN_STREAM_RECV_ERROR, } TGenStreamRecvState; @@ -42,6 +43,7 @@ typedef enum _TGenStreamSendState { TGEN_STREAM_SEND_RESPONSE, TGEN_STREAM_SEND_PAYLOAD, TGEN_STREAM_SEND_CHECKSUM, + TGEN_STREAM_SEND_FOOTER, TGEN_STREAM_SEND_FLUSH, TGEN_STREAM_SEND_SUCCESS, TGEN_STREAM_SEND_ERROR, @@ -59,6 +61,7 @@ typedef enum _TGenStreamErrorType { TGEN_STREAM_ERR_HEADER_MODELSIZE, TGEN_STREAM_ERR_MODEL, TGEN_STREAM_ERR_CHECKSUM, + TGEN_STREAM_ERR_FOOTER, TGEN_STREAM_ERR_READ, TGEN_STREAM_ERR_WRITE, TGEN_STREAM_ERR_READEOF, @@ -184,9 +187,11 @@ struct _TGenStream { gint64 firstPayloadByteRecv; gint64 lastPayloadByteRecv; gint64 checksumRecv; + gint64 footerRecv; gint64 firstPayloadByteSend; gint64 lastPayloadByteSend; gint64 checksumSend; + gint64 footerSend; gint64 lastBytesStatusReport; gint64 lastTimeStatusReport; gint64 lastTimeErrorReport; @@ -224,6 +229,9 @@ static const gchar* _tgenstream_recvStateToString(TGenStreamRecvState state) { case TGEN_STREAM_RECV_CHECKSUM: { return "RECV_CHECKSUM"; } + case TGEN_STREAM_RECV_FOOTER: { + return "RECV_FOOTER"; + } /* success and error are terminal states */ case TGEN_STREAM_RECV_SUCCESS: { return "RECV_SUCCESS"; @@ -253,6 +261,9 @@ static const gchar* _tgenstream_sendStateToString(TGenStreamSendState state) { case TGEN_STREAM_SEND_CHECKSUM: { return "SEND_CHECKSUM"; } + case TGEN_STREAM_SEND_FOOTER: { + return "SEND_FOOTER"; + } case TGEN_STREAM_SEND_FLUSH: { return "SEND_FLUSH"; } @@ -299,6 +310,9 @@ static const gchar* _tgenstream_errorToString(TGenStreamErrorType error) { case TGEN_STREAM_ERR_CHECKSUM: { return "CHECKSUM"; } + case TGEN_STREAM_ERR_FOOTER: { + return "FOOTER"; + } case TGEN_STREAM_ERR_READ: { return "READ"; } @@ -1037,6 +1051,42 @@ static gboolean _tgenstream_readChecksum(TGenStream* stream) { return isSuccess; } +static gboolean _tgenstream_readFooter(TGenStream *stream) { + TGEN_ASSERT(stream); + + if (stream->recv.requestedBytes == 0 && !stream->recv.requestedZero) { + /* we don't handle footers if we are using Markov models and + * don't know the total size, so just move on. */ + tgen_debug("Ignoring footer on stream with no requested bytes"); + return TRUE; + } + + GString *line = _tgenstream_getLine(stream); + if (!line) { + return FALSE; + } + + /* we have read the entire footer from the other end */ + stream->time.footerRecv = _tgenstream_getTime(stream); + + gboolean isSuccess = FALSE; + if (g_ascii_strncasecmp(line->str, "FIN", 3) == 0) { + tgen_info("transport %s stream %s received proper FIN, graceful shutdown succeeded.", + tgentransport_toString(stream->transport), _tgenstream_toString(stream)); + isSuccess = TRUE; + } else { + tgen_message("transport %s stream %s received malformed FIN, graceful shutdown failed.", + tgentransport_toString(stream->transport), _tgenstream_toString(stream)); + _tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_ERROR); + _tgenstream_changeError(stream, TGEN_STREAM_ERR_FOOTER); + isSuccess = FALSE; + } + + g_string_free(line, TRUE); + + return isSuccess; +} + static void _tgenstream_onReadable(TGenStream* stream) { TGEN_ASSERT(stream); @@ -1083,6 +1133,12 @@ static void _tgenstream_onReadable(TGenStream* stream) { if(stream->recv.state == TGEN_STREAM_RECV_CHECKSUM) { if(_tgenstream_readChecksum(stream)) { + _tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_FOOTER); + } + } + + if (stream->recv.state == TGEN_STREAM_RECV_FOOTER) { + if (_tgenstream_readFooter(stream)) { /* yay, now we are done! */ _tgenstream_changeRecvState(stream, TGEN_STREAM_RECV_SUCCESS); } @@ -1438,6 +1494,48 @@ static gboolean _tgenstream_writeChecksum(TGenStream* stream) { } } +static gboolean _tgenstream_writeFooter(TGenStream *stream) { + TGEN_ASSERT(stream); + + if (stream->send.requestedBytes == 0 && !stream->send.requestedZero) { + /* we don't handle footers if we are using Markov models and + * don't know the total size, so just move on. */ + tgen_debug("Ignoring footer on stream with no requested bytes"); + return TRUE; + } + + /* The goal is to send the footer after we have sent everything we want to send + * AND we have received everything that we expect the other side to have sent us + * (including their checksum but not including their footer). Our footer is + * informing the other side that we have received all of their pre-footer bytes. + * Thus, we only proceed to write our footer if we received everything we expect + * from the other side and we are waiting for their footer, or we also got their + * footer and our receive side is successful. */ + if (stream->recv.state != TGEN_STREAM_RECV_FOOTER && + stream->recv.state != TGEN_STREAM_RECV_SUCCESS) { + return FALSE; + } + + /* buffer the footer if we have not done that yet */ + if (!stream->send.buffer) { + stream->send.buffer = g_string_new(NULL); + g_string_printf(stream->send.buffer, "FIN"); + g_string_append_c(stream->send.buffer, '\n'); + tgen_debug("Sending footer '%s'", stream->send.buffer->str); + } + + _tgenstream_flushOut(stream); + + if (!stream->send.buffer) { + /* we were able to send all of the footer */ + stream->time.footerSend = _tgenstream_getTime(stream); + return TRUE; + } else { + /* unable to send entire footer, wait for next chance to write */ + return FALSE; + } +} + static void _tgenstream_onWritable(TGenStream* stream) { TGEN_ASSERT(stream); @@ -1486,6 +1584,13 @@ static void _tgenstream_onWritable(TGenStream* stream) { if(stream->send.state == TGEN_STREAM_SEND_CHECKSUM) { if(_tgenstream_writeChecksum(stream)) { + /* now we just need to make sure we finished flushing */ + _tgenstream_changeSendState(stream, TGEN_STREAM_SEND_FOOTER); + } + } + + if (stream->send.state == TGEN_STREAM_SEND_FOOTER) { + if (_tgenstream_writeFooter(stream)) { /* now we just need to make sure we finished flushing */ _tgenstream_changeSendState(stream, TGEN_STREAM_SEND_FLUSH); } @@ -1753,11 +1858,15 @@ static TGenEvent _tgenstream_computeWantedEvents(TGenStream* stream) { wantedEvents |= TGEN_EVENT_READ; } if(!sendDone && stream->send.state != TGEN_STREAM_SEND_NONE) { - /* check if we should defer writes */ - if(stream->send.deferBarrierMicros > 0) { - wantedEvents |= TGEN_EVENT_WRITE_DEFERRED; - } else { - wantedEvents |= TGEN_EVENT_WRITE; + /* we don't want to write if we need to send the footer but have not + * yet got everything we expected to receive yet. */ + if (stream->send.state != TGEN_STREAM_SEND_FOOTER || stream->time.checksumRecv > 0) { + /* check if we should defer writes */ + if (stream->send.deferBarrierMicros > 0) { + wantedEvents |= TGEN_EVENT_WRITE_DEFERRED; + } else { + wantedEvents |= TGEN_EVENT_WRITE; + } } } } From d9242ad974b52b1ef1d549b98fba421244adfcd2 Mon Sep 17 00:00:00 2001 From: Rob Jansen Date: Fri, 20 Jan 2023 17:22:29 -0500 Subject: [PATCH 2/2] Fix compile errors for set-but-not-used variables --- src/tgen-server.c | 5 ++++- test/test-markovmodel.c | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/tgen-server.c b/src/tgen-server.c index 31bd0d3..52842d7 100644 --- a/src/tgen-server.c +++ b/src/tgen-server.c @@ -73,8 +73,9 @@ TGenIOResponse tgenserver_onEvent(TGenServer* server, gint descriptor, TGenEvent g_assert((events & TGEN_EVENT_READ) && descriptor == server->socketD); gboolean blocked = FALSE; +#ifdef DEBUG gint acceptedCount = 0; - +#endif /* accept as many connections as we have available, until we get EWOULDBLOCK error */ while(!blocked) { gint result = _tgenserver_acceptPeer(server); @@ -86,7 +87,9 @@ TGenIOResponse tgenserver_onEvent(TGenServer* server, gint descriptor, TGenEvent server->socketD, result, errno, g_strerror(errno)); } } else { +#ifdef DEBUG acceptedCount++; +#endif } } diff --git a/test/test-markovmodel.c b/test/test-markovmodel.c index 141a4d3..907d4b8 100644 --- a/test/test-markovmodel.c +++ b/test/test-markovmodel.c @@ -74,6 +74,8 @@ static void generate(TGenMarkovModel* mmodel) { } } } + + tgen_info("%d server packets and %d origin packets", numServerPackets, numOriginPackets); } gint main(gint argc, gchar *argv[]) {