Skip to content

Commit 813f2f3

Browse files
committed
Finish auth implementation
1 parent b153a7c commit 813f2f3

9 files changed

+51
-12
lines changed

README.md

-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ Ok(ReceivedMessage("spoke-test", "Hello from spoke!", False))
5858
## Development status
5959

6060
### Missing MQTT 3.1.1 features
61-
- Auth
6261
- Persistent sessions across restarts
6362

6463
### Transport channels

src/spoke.gleam

+18-4
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub type TransportOptions {
2323
}
2424

2525
pub type AuthDetails {
26-
AuthDetails(user_name: String, password: Option(BitArray))
26+
AuthDetails(username: String, password: Option(BitArray))
2727
}
2828

2929
pub type ConnectOptions {
@@ -171,16 +171,21 @@ pub fn server_timeout_ms(
171171

172172
pub fn using_auth(
173173
options: ConnectOptions,
174-
details: AuthDetails,
174+
username: String,
175+
password: Option(BitArray),
175176
) -> ConnectOptions {
176-
ConnectOptions(..options, authentication: Some(details))
177+
ConnectOptions(
178+
..options,
179+
authentication: Some(AuthDetails(username, password)),
180+
)
177181
}
178182

179183
/// Starts a new MQTT client with the given options.
180184
/// Does not connect to the server, until `connect` is called.
181185
pub fn start(connect_options: ConnectOptions) -> Client {
182186
start_with_ms_keep_alive(
183187
connect_options.client_id,
188+
connect_options.authentication,
184189
connect_options.keep_alive_seconds * 1000,
185190
connect_options.server_timeout_ms,
186191
connect_options.transport_options,
@@ -288,13 +293,15 @@ fn call_or_disconnect(
288293
@internal
289294
pub fn start_with_ms_keep_alive(
290295
client_id: String,
296+
auth: Option(AuthDetails),
291297
keep_alive_ms: Int,
292298
server_timeout_ms: Int,
293299
transport_opts: TransportOptions,
294300
) -> Client {
295301
let updates = process.new_subject()
296302
let connect = fn() { create_channel(transport_opts) }
297-
let config = Config(client_id, keep_alive_ms, server_timeout_ms, connect)
303+
let config =
304+
Config(client_id, auth, keep_alive_ms, server_timeout_ms, connect)
298305
let assert Ok(client) =
299306
actor.start_spec(actor.Spec(fn() { init(config, updates) }, 100, run_client))
300307
Client(client, updates, config)
@@ -310,6 +317,7 @@ fn create_channel(options: TransportOptions) -> ChannelResult(ByteChannel) {
310317
type Config {
311318
Config(
312319
client_id: String,
320+
auth: Option(AuthDetails),
313321
keep_alive: Int,
314322
server_timeout: Int,
315323
connect: fn() -> ChannelResult(ByteChannel),
@@ -452,6 +460,11 @@ fn handle_connect(
452460
return: actor.continue(state),
453461
)
454462

463+
let auth =
464+
option.map(state.config.auth, fn(auth) {
465+
packet.AuthOptions(auth.username, auth.password)
466+
})
467+
455468
let will =
456469
option.map(will, fn(will) {
457470
let data = packet.MessageData(will.topic, will.payload, will.retain)
@@ -466,6 +479,7 @@ fn handle_connect(
466479
clean_session,
467480
config.keep_alive,
468481
config.server_timeout,
482+
auth,
469483
will,
470484
)
471485

src/spoke/internal/connection.gleam

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ pub type Update {
4040
ReceivedPacket(incoming.Packet)
4141
}
4242

43-
// TODO: Add auth, will, etc
4443
/// Starts the connection.
4544
/// The connection will be alive until there is a channel error,
4645
/// protocol violation, or explicitly disconnected.
@@ -51,6 +50,7 @@ pub fn connect(
5150
clean_session: Bool,
5251
keep_alive_ms: Int,
5352
server_timeout_ms: Int,
53+
auth: Option(packet.AuthOptions),
5454
will: Option(#(packet.MessageData, packet.QoS)),
5555
) -> Result(Connection, String) {
5656
// Start linked, as this *really* shouldn't fail
@@ -60,7 +60,7 @@ pub fn connect(
6060
let messages = process.new_subject()
6161
let connect = process.new_subject()
6262
process.send(ack_subject, #(messages, connect))
63-
establish_channel(messages, connect, clean_session, will)
63+
establish_channel(messages, connect, clean_session, auth, will)
6464
})
6565

6666
// Start monitoring and unlink
@@ -160,6 +160,7 @@ fn establish_channel(
160160
mailbox: Subject(Message),
161161
connect: Subject(Config),
162162
clean_session: Bool,
163+
auth: Option(packet.AuthOptions),
163164
will: Option(#(packet.MessageData, packet.QoS)),
164165
) -> Nil {
165166
// If we can't receive in time, the parent is very stuck, so dying is fine
@@ -187,13 +188,12 @@ fn establish_channel(
187188
connection_state,
188189
)
189190

190-
// TODO auth, will
191191
let connect_options =
192192
packet.ConnectOptions(
193193
clean_session:,
194194
client_id: config.client_id,
195195
keep_alive_seconds: config.keep_alive_ms / 1000,
196-
auth: None,
196+
auth:,
197197
will:,
198198
)
199199

src/spoke/internal/packet.gleam

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub type MessageData {
1717

1818
// MQTT does not allow using only a password
1919
pub type AuthOptions {
20-
AuthOptions(user_name: String, password: Option(BitArray))
20+
AuthOptions(username: String, password: Option(BitArray))
2121
}
2222

2323
pub type ConnectOptions {

src/spoke/internal/packet/encode.gleam

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ pub fn connect(options: ConnectOptions) -> BytesTree {
6060
Some(auth) -> {
6161
let payload =
6262
payload
63-
|> bytes_tree.append(string(auth.user_name))
63+
|> bytes_tree.append(string(auth.username))
6464

6565
case auth.password {
6666
Some(password) -> {

test/connect_test.gleam

+22
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,28 @@ pub fn connect_with_will_test() {
140140
spoke.disconnect(client)
141141
}
142142

143+
pub fn connect_with_auth_test() {
144+
let server = fake_server.start_server()
145+
let client =
146+
fake_server.default_options(server.port)
147+
|> spoke.connect_with_id(default_client_id)
148+
|> spoke.using_auth("user", Some(<<"Hunter2">>))
149+
|> spoke.start
150+
151+
spoke.connect(client, True)
152+
153+
let server = fake_server.expect_connection_established(server)
154+
fake_server.expect_packet_matching(server, fn(packet) {
155+
let assert server_in.Connect(connect_data) = packet
156+
case connect_data.auth {
157+
Some(packet.AuthOptions("user", Some(<<"Hunter2">>))) -> True
158+
_ -> False
159+
}
160+
})
161+
162+
spoke.disconnect(client)
163+
}
164+
143165
fn connect_with_error(
144166
error: packet.ConnectError,
145167
client_id: String,

test/fake_server.gleam

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import gleam/bit_array
22
import gleam/bytes_tree.{type BytesTree}
33
import gleam/erlang/process
4+
import gleam/option.{None}
45
import gleam/string
56
import gleeunit/should
67
import glisten/socket.{type ListenSocket, type Socket}
@@ -38,6 +39,7 @@ pub fn set_up_connected_client_with_timeout(
3839
let client =
3940
spoke.start_with_ms_keep_alive(
4041
"ping-client",
42+
None,
4143
1000,
4244
timeout,
4345
default_options(server.port),

test/options_test.gleam

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub fn connect_options_test() {
1919
) =
2020
spoke.default_tcp_options("")
2121
|> spoke.connect_with_id("client-id")
22-
|> spoke.using_auth(spoke.AuthDetails("user", Some(<<"Hunter2">>)))
22+
|> spoke.using_auth("user", Some(<<"Hunter2">>))
2323
|> spoke.keep_alive_seconds(42)
2424
|> spoke.server_timeout_ms(420)
2525
as "Connect option modifiers should be properly applied"

test/ping_test.gleam

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fake_server.{type ConnectedServer}
22
import gleam/erlang/process
3+
import gleam/option.{None}
34
import spoke.{AtMostOnce}
45
import spoke/internal/packet/server/incoming as server_in
56
import spoke/internal/packet/server/outgoing as server_out
@@ -79,6 +80,7 @@ fn set_up_connected() -> #(spoke.Client, ConnectedServer) {
7980
let client =
8081
spoke.start_with_ms_keep_alive(
8182
"ping-client",
83+
None,
8284
keep_alive,
8385
server_timeout,
8486
fake_server.default_options(server.port),

0 commit comments

Comments
 (0)