@@ -910,21 +910,6 @@ void Context::onUpstreamConnectionClose(CloseType close_type) {
910
910
}
911
911
}
912
912
913
- uint32_t Context::nextHttpCallToken () {
914
- uint32_t token = next_http_call_token_++;
915
- // Handle rollover.
916
- for (;;) {
917
- if (token == 0 ) {
918
- token = next_http_call_token_++;
919
- }
920
- if (!http_request_.count (token)) {
921
- break ;
922
- }
923
- token = next_http_call_token_++;
924
- }
925
- return token;
926
- }
927
-
928
913
// Async call via HTTP
929
914
WasmResult Context::httpCall (std::string_view cluster, const Pairs& request_headers,
930
915
std::string_view request_body, const Pairs& request_trailers,
@@ -961,7 +946,7 @@ WasmResult Context::httpCall(std::string_view cluster, const Pairs& request_head
961
946
timeout = std::chrono::milliseconds (timeout_milliseconds);
962
947
}
963
948
964
- uint32_t token = nextHttpCallToken ();
949
+ uint32_t token = wasm ()-> nextHttpCallId ();
965
950
auto & handler = http_request_[token];
966
951
handler.context_ = this ;
967
952
handler.token_ = token;
@@ -983,22 +968,6 @@ WasmResult Context::httpCall(std::string_view cluster, const Pairs& request_head
983
968
return WasmResult::Ok;
984
969
}
985
970
986
- uint32_t Context::nextGrpcCallToken () {
987
- uint32_t token = next_grpc_token_++;
988
- if (isGrpcStreamToken (token)) {
989
- token = next_grpc_token_++;
990
- }
991
- // Handle rollover. Note: token is always odd.
992
- for (;;) {
993
- if (!grpc_call_request_.count (token)) {
994
- break ;
995
- }
996
- next_grpc_token_++; // Skip stream token.
997
- token = next_grpc_token_++;
998
- }
999
- return token;
1000
- }
1001
-
1002
971
WasmResult Context::grpcCall (std::string_view grpc_service, std::string_view service_name,
1003
972
std::string_view method_name, const Pairs& initial_metadata,
1004
973
std::string_view request, std::chrono::milliseconds timeout,
@@ -1019,7 +988,7 @@ WasmResult Context::grpcCall(std::string_view grpc_service, std::string_view ser
1019
988
return WasmResult::ParseFailure;
1020
989
}
1021
990
}
1022
- uint32_t token = nextGrpcCallToken ();
991
+ uint32_t token = wasm ()-> nextGrpcCallId ();
1023
992
auto & handler = grpc_call_request_[token];
1024
993
handler.context_ = this ;
1025
994
handler.token_ = token;
@@ -1049,26 +1018,6 @@ WasmResult Context::grpcCall(std::string_view grpc_service, std::string_view ser
1049
1018
return WasmResult::Ok;
1050
1019
}
1051
1020
1052
- uint32_t Context::nextGrpcStreamToken () {
1053
- uint32_t token = next_grpc_token_++;
1054
- if (isGrpcCallToken (token)) {
1055
- token = next_grpc_token_++;
1056
- }
1057
- // Handle rollover. Note: token is always even.
1058
- for (;;) {
1059
- if (token == 0 ) {
1060
- next_grpc_token_++; // Skip call token.
1061
- token = next_grpc_token_++;
1062
- }
1063
- if (!grpc_stream_.count (token)) {
1064
- break ;
1065
- }
1066
- next_grpc_token_++; // Skip call token.
1067
- token = next_grpc_token_++;
1068
- }
1069
- return token;
1070
- }
1071
-
1072
1021
WasmResult Context::grpcStream (std::string_view grpc_service, std::string_view service_name,
1073
1022
std::string_view method_name, const Pairs& initial_metadata,
1074
1023
uint32_t * token_ptr) {
@@ -1088,7 +1037,7 @@ WasmResult Context::grpcStream(std::string_view grpc_service, std::string_view s
1088
1037
return WasmResult::ParseFailure;
1089
1038
}
1090
1039
}
1091
- uint32_t token = nextGrpcStreamToken ();
1040
+ uint32_t token = wasm ()-> nextGrpcStreamId ();
1092
1041
auto & handler = grpc_stream_[token];
1093
1042
handler.context_ = this ;
1094
1043
handler.token_ = token;
@@ -1879,7 +1828,7 @@ void Context::onGrpcReceiveWrapper(uint32_t token, ::Envoy::Buffer::InstancePtr
1879
1828
ContextBase::onGrpcReceive (token, response_size);
1880
1829
grpc_receive_buffer_.reset ();
1881
1830
}
1882
- if (isGrpcCallToken (token)) {
1831
+ if (wasm ()-> isGrpcCallId (token)) {
1883
1832
grpc_call_request_.erase (token);
1884
1833
}
1885
1834
}
@@ -1899,9 +1848,9 @@ void Context::onGrpcCloseWrapper(uint32_t token, const Grpc::Status::GrpcStatus&
1899
1848
onGrpcClose (token, status_code_);
1900
1849
status_message_ = " " ;
1901
1850
}
1902
- if (isGrpcCallToken (token)) {
1851
+ if (wasm ()-> isGrpcCallId (token)) {
1903
1852
grpc_call_request_.erase (token);
1904
- } else {
1853
+ } else if ( wasm ()-> isGrpcStreamId (token)) {
1905
1854
auto it = grpc_stream_.find (token);
1906
1855
if (it != grpc_stream_.end ()) {
1907
1856
if (it->second .local_closed_ ) {
@@ -1912,7 +1861,7 @@ void Context::onGrpcCloseWrapper(uint32_t token, const Grpc::Status::GrpcStatus&
1912
1861
}
1913
1862
1914
1863
WasmResult Context::grpcSend (uint32_t token, std::string_view message, bool end_stream) {
1915
- if (isGrpcCallToken (token)) {
1864
+ if (! wasm ()-> isGrpcStreamId (token)) {
1916
1865
return WasmResult::BadArgument;
1917
1866
}
1918
1867
auto it = grpc_stream_.find (token);
@@ -1928,7 +1877,7 @@ WasmResult Context::grpcSend(uint32_t token, std::string_view message, bool end_
1928
1877
}
1929
1878
1930
1879
WasmResult Context::grpcClose (uint32_t token) {
1931
- if (isGrpcCallToken (token)) {
1880
+ if (wasm ()-> isGrpcCallId (token)) {
1932
1881
auto it = grpc_call_request_.find (token);
1933
1882
if (it == grpc_call_request_.end ()) {
1934
1883
return WasmResult::NotFound;
@@ -1937,7 +1886,8 @@ WasmResult Context::grpcClose(uint32_t token) {
1937
1886
it->second .request_ ->cancel ();
1938
1887
}
1939
1888
grpc_call_request_.erase (token);
1940
- } else {
1889
+ return WasmResult::Ok;
1890
+ } else if (wasm ()->isGrpcStreamId (token)) {
1941
1891
auto it = grpc_stream_.find (token);
1942
1892
if (it == grpc_stream_.end ()) {
1943
1893
return WasmResult::NotFound;
@@ -1950,12 +1900,13 @@ WasmResult Context::grpcClose(uint32_t token) {
1950
1900
} else {
1951
1901
it->second .local_closed_ = true ;
1952
1902
}
1903
+ return WasmResult::Ok;
1953
1904
}
1954
- return WasmResult::Ok ;
1905
+ return WasmResult::BadArgument ;
1955
1906
}
1956
1907
1957
1908
WasmResult Context::grpcCancel (uint32_t token) {
1958
- if (isGrpcCallToken (token)) {
1909
+ if (wasm ()-> isGrpcCallId (token)) {
1959
1910
auto it = grpc_call_request_.find (token);
1960
1911
if (it == grpc_call_request_.end ()) {
1961
1912
return WasmResult::NotFound;
@@ -1964,7 +1915,8 @@ WasmResult Context::grpcCancel(uint32_t token) {
1964
1915
it->second .request_ ->cancel ();
1965
1916
}
1966
1917
grpc_call_request_.erase (token);
1967
- } else {
1918
+ return WasmResult::Ok;
1919
+ } else if (wasm ()->isGrpcStreamId (token)) {
1968
1920
auto it = grpc_stream_.find (token);
1969
1921
if (it == grpc_stream_.end ()) {
1970
1922
return WasmResult::NotFound;
@@ -1973,8 +1925,9 @@ WasmResult Context::grpcCancel(uint32_t token) {
1973
1925
it->second .stream_ ->resetStream ();
1974
1926
}
1975
1927
grpc_stream_.erase (token);
1928
+ return WasmResult::Ok;
1976
1929
}
1977
- return WasmResult::Ok ;
1930
+ return WasmResult::BadArgument ;
1978
1931
}
1979
1932
1980
1933
} // namespace Wasm
0 commit comments