-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathclient.rb
137 lines (103 loc) · 3.23 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2015-2024, by Samuel Williams.
# Copyright, 2019, by Bryan Powell.
# Copyright, 2019, by Janko Marohnić.
# Copyright, 2023, by Thomas Morgan.
require_relative "request"
require_relative "connection"
require "protocol/websocket/headers"
require "protocol/websocket/extensions"
require "protocol/http/middleware"
require "async/http/client"
require "delegate"
module Async
module WebSocket
# This is a basic synchronous websocket client:
class Client < ::Protocol::HTTP::Middleware
include ::Protocol::WebSocket::Headers
# Build a websocket client on top of a new HTTP client for the given endpoint.
#
# Without a block, the client is returned and the caller is responsible for closing it.
# With a block, the client is yielded and then closed when the block exits, whether normally or by raising.
#
# @param endpoint [Object] forwarded to `HTTP::Client.new`.
# @param options [Hash] keyword options forwarded to `HTTP::Client.new`.
# @yield [Client] the newly created client.
# @return [Client] a client which can be used to establish websocket connections to the given endpoint, or the value of the block when one is given.
def self.open(endpoint, **options, &block)
  http_client = HTTP::Client.new(endpoint, **options)
  websocket_client = self.new(http_client, mask: true)

  if block_given?
    begin
      yield websocket_client
    ensure
      websocket_client.close
    end
  else
    websocket_client
  end
end
# Wraps a connection so that closing it also closes the client that was opened for it.
#
# Every other method is delegated to the wrapped connection by `SimpleDelegator`.
class ClientCloseDecorator < SimpleDelegator
  # @param client [#close] the client to close together with the connection.
  # @param connection [Object] the connection that method calls are delegated to.
  def initialize(client, connection)
    @client = client
    super(connection)
  end

  # Close the wrapped connection and then the owning client.
  #
  # The client is closed from an `ensure` clause so that it is still released when closing the connection raises.
  # After the client has been closed once, later calls only close the connection.
  #
  # @return [nil]
  def close(...)
    # There is no `close` on SimpleDelegator itself, so `super` falls through to the delegated connection.
    super(...)
    nil
  ensure
    if @client
      @client.close
      @client = nil
    end
  end
end
# Open a client for the endpoint and establish a websocket connection through it.
#
# Without a block, the connection is returned wrapped in a {ClientCloseDecorator}, so closing it also closes the client.
# With a block, the connection is yielded, and both the connection and the client are closed when the block exits.
#
# If establishing the connection fails, the client opened here is closed before the error propagates.
#
# @param endpoint [#authority, #path] the endpoint to connect to.
# @param arguments [Array] extra positional arguments forwarded to {open}.
# @param options [Hash] keyword options forwarded to the client's `connect` (not to {open}).
# @yield [Connection] the established connection.
# @return [Connection] an open websocket connection to the given endpoint, or the value of the block when one is given.
def self.connect(endpoint, *arguments, **options, &block)
  Sync do
    client = self.open(endpoint, *arguments)

    connected = false
    begin
      connection = client.connect(endpoint.authority, endpoint.path, **options)
      connected = true
    ensure
      # Nothing else holds a reference to the client yet, so a failed connect would otherwise leak it.
      client.close unless connected
    end

    return ClientCloseDecorator.new(client, connection) unless block_given?

    begin
      yield connection
    ensure
      begin
        connection&.close
      ensure
        # Close the client even when closing the connection raises.
        client&.close
      end
    end
  end
end
# Wrap an existing client as the middleware delegate.
#
# @param client [Object] passed to the superclass initializer; `connect` and `close` act on it.
# @param options [Hash] stored and later splatted into `handler.call` by {connect}.
def initialize(client, **options)
  super(client)

  @options = options
end
# A framer that remembers which pooled connection backs its stream, and hands that connection back to the pool when it is closed.
class Framer < ::Protocol::WebSocket::Framer
  # @return [Object, nil] the pooled connection, or nil once it has been released.
  attr_reader :connection

  # @param pool [#release] the pool the connection was acquired from.
  # @param connection [Object] the connection to release on close.
  # @param stream [Object] passed to the superclass initializer.
  def initialize(pool, connection, stream)
    super(stream)

    @pool, @connection = pool, connection
  end

  # Close the framer, then release the connection back to the pool.
  # The release happens at most once; later calls only invoke the superclass `close`.
  #
  # @return [nil]
  def close
    super

    return unless @pool

    @pool.release(@connection)
    @pool = @connection = nil
  end
end
# Perform the websocket handshake over a pooled connection and hand the resulting stream to a handler.
#
# A connection is acquired from `@delegate.pool`. Once the handshake succeeds, ownership of that connection
# passes to a {Framer}, which releases it back to the pool when closed. If anything fails before that
# hand-off, the `ensure` clause releases the connection instead.
#
# @param authority [Object] passed to `Request.new`.
# @param path [Object] passed to `Request.new`.
# @param scheme [Object] passed to `Request.new`; defaults to `@delegate.scheme`.
# @param headers [Object, nil] request headers, normalized with `::Protocol::HTTP::Headers[]`.
# @param handler [#call] called with the framer, protocol and extensions; defaults to `Connection`.
# @param extensions [Object, nil] extensions to offer; defaults to `::Protocol::WebSocket::Extensions::Client.default`.
# @param options [Hash] extra keyword options forwarded to `Request.new`.
# @return [Object] whatever `handler.call` returns.
# @raise [ConnectionError] if the response does not provide a stream.
def connect(authority, path, scheme: @delegate.scheme, headers: nil, handler: Connection, extensions: ::Protocol::WebSocket::Extensions::Client.default, **options, &block)
headers = ::Protocol::HTTP::Headers[headers]
# Add one extensions header per offered extension (skipped entirely when `extensions` is nil).
extensions&.offer do |extension|
headers.add(SEC_WEBSOCKET_EXTENSIONS, extension.join("; "))
end
request = Request.new(scheme, authority, path, headers, **options)
pool = @delegate.pool
connection = pool.acquire
response = request.call(connection)
# Without a stream there is nothing to frame, so the handshake is treated as failed.
unless response.stream?
raise ConnectionError.new("Failed to negotiate connection!", response.unwrap)
end
# Only the first value of the protocol header is used.
protocol = response.headers[SEC_WEBSOCKET_PROTOCOL]&.first
stream = response.stream
framer = Framer.new(pool, connection, stream)
# The framer now owns the connection; clear the local so the `ensure` below does not release it as well.
connection = nil
# NOTE(review): unlike the offer above, this call is not nil-safe. If `extensions` is nil and the response
# still carries this header, it raises NoMethodError. Confirm whether that is intended.
if extension_headers = response.headers[SEC_WEBSOCKET_EXTENSIONS]
extensions.accept(extension_headers)
end
# Presumably cleared so these objects are not kept referenced while the handler runs. TODO confirm.
response = nil
stream = nil
# NOTE(review): if `extensions.accept` or the handler raises, nothing here closes the framer, so the
# connection stays checked out unless the handler cleans up. Confirm.
return handler.call(framer, protocol, extensions, **@options, &block)
ensure
# `connection` is only non-nil here when the hand-off to the framer never happened.
pool.release(connection) if connection
end
end
end
end