Skip to content

Commit a211674

Browse files
authored
make passive heartbeat case respond async (#299)
## Why - we've traded all of our `received out-of-order msg, closing connection` for `invariant violation: would have sent out of order msg` - why are we sending things out of order? lets get more info ## What changed - we should never hit the construct on consumed session case but lets triple quadruple check - also track last 10 constructed messages <!-- Describe the changes you made in this pull request or pointers for the reviewer --> ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 161d5f7 commit a211674

File tree

5 files changed

+17
-5
lines changed

5 files changed

+17
-5
lines changed

package-lock.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.207.0",
4+
"version": "0.207.1",
55
"type": "module",
66
"exports": {
77
".": {

transport/sessionStateMachine/SessionConnected.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,13 @@ export class SessionConnected<
226226
// if we are not actively heartbeating, we are in passive
227227
// heartbeat mode and should send a response to the ack
228228
if (!this.isActivelyHeartbeating) {
229-
this.sendHeartbeat();
229+
// purposefully make this async to avoid weird browser behavior
230+
// where _some_ browsers will decide that it is ok to interrupt fully
231+
// synchronous code execution (e.g. an existing .send) to receive a
232+
// websocket message and hit this codepath
233+
void Promise.resolve().then(() => {
234+
this.sendHeartbeat();
235+
});
230236
}
231237
};
232238

transport/sessionStateMachine/common.ts

+4
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ export abstract class IdentifiedSession extends CommonSession {
281281
constructMsg<Payload>(
282282
partialMsg: PartialTransportMessage<Payload>,
283283
): TransportMessage<Payload> {
284+
if (this._isConsumed) {
285+
throw new Error(ERR_CONSUMED);
286+
}
287+
284288
const msg = {
285289
...partialMsg,
286290
id: generateId(),

transport/sessionStateMachine/stateMachine.test.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1958,7 +1958,9 @@ describe('session state machine', () => {
19581958
);
19591959

19601960
// make sure the session acks the heartbeat
1961-
expect(conn.send).toHaveBeenCalledTimes(1);
1961+
await waitFor(() => {
1962+
expect(conn.send).toHaveBeenCalledTimes(1);
1963+
});
19621964
});
19631965

19641966
test('does not dispatch acks', async () => {

0 commit comments

Comments
 (0)