Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip - socket channel notifications #19

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions example/web/elm/src/Chat.elm
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Phoenix.Channel as Channel exposing (Channel)
import Phoenix.Socket as Socket exposing (Socket, AbnormalClose)
import Phoenix.Push as Push
import Time exposing (Time)
import Dict exposing (Dict)


main : Program Never Model Msg
Expand All @@ -35,6 +36,7 @@ type alias Model =
, composedMessage : String
, accessToken : Int
, connectionStatus : ConnectionStatus
, channelStatuses : Dict String Channel.State
, currentTime : Time
}

Expand Down Expand Up @@ -68,6 +70,7 @@ initModel =
, composedMessage = ""
, accessToken = 1
, connectionStatus = Disconnected
, channelStatuses = Dict.empty
, currentTime = 0
}

Expand All @@ -94,6 +97,7 @@ type Msg
| SendComposedMessage
| SocketClosedAbnormally AbnormalClose
| ConnectionStatusChanged ConnectionStatus
| ChannelStatusUpdated String Channel.State
| Tick Time


Expand Down Expand Up @@ -160,6 +164,9 @@ update message model =
}
! []

ChannelStatusUpdated topic status ->
model ! []

ConnectionStatusChanged connectionStatus ->
{ model | connectionStatus = connectionStatus } ! []

Expand Down Expand Up @@ -213,6 +220,7 @@ socket accessToken =
|> Socket.onOpen (ConnectionStatusChanged Connected)
|> Socket.onClose (\_ -> ConnectionStatusChanged Disconnected)
|> Socket.onAbnormalClose SocketClosedAbnormally
|> Socket.onChannelStatus ChannelStatusUpdated
|> Socket.reconnectTimer (\backoffIteration -> (backoffIteration + 1) * 5000 |> toFloat)


Expand Down
136 changes: 112 additions & 24 deletions src/Phoenix.elm
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Phoenix.Internal.Socket as InternalSocket exposing (InternalSocket)
import Phoenix.Channel as Channel exposing (Channel)
import Phoenix.Socket as Socket exposing (Socket)
import Phoenix.Push as Push exposing (Push)
import Set


-- SUBSCRIPTIONS
Expand Down Expand Up @@ -166,6 +167,10 @@ type alias Ref =
Int


type alias Router msg =
Platform.Router msg (Msg msg)



-- INIT

Expand All @@ -180,7 +185,7 @@ init =


onEffects :
Platform.Router msg (Msg msg)
Router msg
-> List (MyCmd msg)
-> List (MySub msg)
-> State msg
Expand All @@ -205,7 +210,57 @@ onEffects router cmds subs state =
getNewSockets
getNewChannels
in
sendPushsHelp cmds state <&> \newState -> updateState newState
sendPushsHelp cmds state
<&> updateState
<&> channelStatusChangedNotifications router state


channelStatusChangedNotifications : Router msg -> State msg -> State msg -> Task Never (State msg)
channelStatusChangedNotifications router previous next =
let
allEndpoints =
Set.toList <|
Set.union
(previous.channels |> Dict.keys |> Set.fromList)
(next.channels |> Dict.keys |> Set.fromList)

notifications =
allEndpoints
|> List.map (endpointNotifications router previous next)
|> Task.sequence
in
notifications
&> Task.succeed next


endpointNotifications : Router msg -> State msg -> State msg -> String -> Task Never ()
endpointNotifications router previous next endpoint =
case getChannelStatusCb endpoint next of
Nothing ->
Task.succeed ()

Just callback ->
let
diff =
InternalChannel.dictStatusDiff
(getChannels endpoint previous)
(getChannels endpoint next)

notifications =
List.map (sendChannelStatusToApp router callback) diff
in
Task.sequence notifications &> Task.succeed ()


sendChannelStatusToApp router callback ( topic, status ) =
callback topic (externalStateFor status)
|> Platform.sendToApp router


getChannels endpoint state =
state.channels
|> Dict.get endpoint
|> Maybe.withDefault Dict.empty



Expand Down Expand Up @@ -259,7 +314,7 @@ sendPushsHelp cmds state =
<&> sendPushsHelp rest


handleSocketsUpdate : Platform.Router msg (Msg msg) -> Dict String (Socket msg) -> InternalSocketsDict msg -> Task Never (InternalSocketsDict msg)
handleSocketsUpdate : Router msg -> Dict String (Socket msg) -> InternalSocketsDict msg -> Task Never (InternalSocketsDict msg)
handleSocketsUpdate router definedSockets stateSockets =
let
-- addedSocketsStep: endpoints where we have to open a new socket connection
Expand All @@ -284,7 +339,7 @@ handleSocketsUpdate router definedSockets stateSockets =
Dict.merge addedSocketsStep retainedSocketsStep removedSocketsStep definedSockets stateSockets (Task.succeed Dict.empty)


handleChannelsUpdate : Platform.Router msg (Msg msg) -> InternalChannelsDict msg -> InternalChannelsDict msg -> Task Never (InternalChannelsDict msg)
handleChannelsUpdate : Router msg -> InternalChannelsDict msg -> InternalChannelsDict msg -> Task Never (InternalChannelsDict msg)
handleChannelsUpdate router nextChannels previousChannels =
let
addedChannelsStep endpoint definedEndpointChannels taskChain =
Expand Down Expand Up @@ -320,7 +375,7 @@ handleChannelsUpdate router nextChannels previousChannels =
Dict.merge addedChannelsStep retainedChannelsStep removedChannelsStep nextChannels previousChannels (Task.succeed Dict.empty)


handleEndpointChannelsUpdate : Platform.Router msg (Msg msg) -> Endpoint -> Dict Topic (InternalChannel msg) -> Dict Topic (InternalChannel msg) -> Task Never (Dict Topic (InternalChannel msg))
handleEndpointChannelsUpdate : Router msg -> Endpoint -> Dict Topic (InternalChannel msg) -> Dict Topic (InternalChannel msg) -> Task Never (Dict Topic (InternalChannel msg))
handleEndpointChannelsUpdate router endpoint definedChannels stateChannels =
let
leftStep topic defined getNewChannels =
Expand All @@ -341,7 +396,7 @@ handleEndpointChannelsUpdate router endpoint definedChannels stateChannels =
Dict.merge leftStep bothStep rightStep definedChannels stateChannels (Task.succeed Dict.empty)


sendLeaveChannel : Platform.Router msg (Msg msg) -> Endpoint -> InternalChannel msg -> Task Never ()
sendLeaveChannel : Router msg -> Endpoint -> InternalChannel msg -> Task Never ()
sendLeaveChannel router endpoint internalChannel =
case internalChannel.state of
InternalChannel.Joined ->
Expand All @@ -351,7 +406,7 @@ sendLeaveChannel router endpoint internalChannel =
Task.succeed ()


sendJoinChannel : Platform.Router msg (Msg msg) -> Endpoint -> InternalChannel msg -> Task Never ()
sendJoinChannel : Router msg -> Endpoint -> InternalChannel msg -> Task Never ()
sendJoinChannel router endpoint internalChannel =
Platform.sendToSelf router (JoinChannel endpoint internalChannel)
&> maybeNotifyApp router internalChannel.channel.onRequestJoin
Expand Down Expand Up @@ -423,8 +478,14 @@ type Msg msg
| PushResponse (Push msg) Message


onSelfMsg : Platform.Router msg (Msg msg) -> Msg msg -> State msg -> Task Never (State msg)
onSelfMsg : Router msg -> Msg msg -> State msg -> Task Never (State msg)
onSelfMsg router selfMsg state =
applySelfMsg router selfMsg state
<&> channelStatusChangedNotifications router state


applySelfMsg : Router msg -> Msg msg -> State msg -> Task Never (State msg)
applySelfMsg router selfMsg state =
case selfMsg of
GoodOpen endpoint ws ->
case InternalSocket.get endpoint state.sockets of
Expand All @@ -444,7 +505,7 @@ onSelfMsg router selfMsg state =
in
notifyOnOpen
&> (heartbeat router endpoint state_)
<&> (rejoinAllChannels endpoint)
<&> (rejoinAllChannels router endpoint)

-- somehow the state is messed up
Nothing ->
Expand Down Expand Up @@ -632,7 +693,7 @@ onSelfMsg router selfMsg state =
Task.succeed state


handleSelfcallback : Platform.Router msg (Msg msg) -> Endpoint -> Message -> Dict Ref (SelfCallback msg) -> Task x (Dict Ref (SelfCallback msg))
handleSelfcallback : Router msg -> Endpoint -> Message -> Dict Ref (SelfCallback msg) -> Task x (Dict Ref (SelfCallback msg))
handleSelfcallback router endpoint message selfCallbacks =
case message.ref of
Nothing ->
Expand All @@ -659,7 +720,7 @@ processQueue endpoint messages state =
<&> processQueue endpoint rest


handlePhoenixMessage : Platform.Router msg (Msg msg) -> Endpoint -> Message -> State msg -> Task x (State msg)
handlePhoenixMessage : Router msg -> Endpoint -> Message -> State msg -> Task x (State msg)
handlePhoenixMessage router endpoint message state =
case message.event of
"phx_error" ->
Expand All @@ -680,7 +741,7 @@ handlePhoenixMessage router endpoint message state =
Just onError ->
Platform.sendToApp router onError
in
sendToApp &> sendJoinHelper endpoint [ newChannel ] state
sendToApp &> sendJoinHelper router endpoint [ newChannel ] state

-- TODO do we have to do something here?
"phx_close" ->
Expand All @@ -690,7 +751,7 @@ handlePhoenixMessage router endpoint message state =
Task.succeed state


dispatchMessage : Platform.Router msg (Msg msg) -> Endpoint -> Message -> InternalChannelsDict msg -> Task x ()
dispatchMessage : Router msg -> Endpoint -> Message -> InternalChannelsDict msg -> Task x ()
dispatchMessage router endpoint message channels =
case getEventCb endpoint message channels of
Nothing ->
Expand All @@ -710,7 +771,7 @@ getEventCb endpoint message channels =
Dict.get message.event channel.on


handleChannelJoinReply : Platform.Router msg (Msg msg) -> Endpoint -> Topic -> Message -> InternalChannel.State -> InternalChannelsDict msg -> Task x (InternalChannelsDict msg)
handleChannelJoinReply : Router msg -> Endpoint -> Topic -> Message -> InternalChannel.State -> InternalChannelsDict msg -> Task x (InternalChannelsDict msg)
handleChannelJoinReply router endpoint topic message prevState channels =
let
maybeChannel =
Expand Down Expand Up @@ -759,7 +820,7 @@ handleChannelJoinReply router endpoint topic message prevState channels =
|> Maybe.withDefault (Task.succeed channels)


handleChannelDisconnect : Platform.Router msg (Msg msg) -> Endpoint -> State msg -> Task x (State msg)
handleChannelDisconnect : Router msg -> Endpoint -> State msg -> Task x (State msg)
handleChannelDisconnect router endpoint state =
case Dict.get endpoint state.channels of
Nothing ->
Expand Down Expand Up @@ -797,7 +858,7 @@ handleChannelDisconnect router endpoint state =
)


heartbeat : Platform.Router msg (Msg msg) -> Endpoint -> State msg -> Task x (State msg)
heartbeat : Router msg -> Endpoint -> State msg -> Task x (State msg)
heartbeat router endpoint state =
case Dict.get endpoint state.sockets of
Just { socket } ->
Expand All @@ -816,18 +877,18 @@ heartbeatMessage =
Message.init "phoenix" "heartbeat"


rejoinAllChannels : Endpoint -> State msg -> Task x (State msg)
rejoinAllChannels endpoint state =
rejoinAllChannels : Router msg -> Endpoint -> State msg -> Task x (State msg)
rejoinAllChannels router endpoint state =
case Dict.get endpoint state.channels of
Nothing ->
Task.succeed state

Just endpointChannels ->
sendJoinHelper endpoint (Dict.values endpointChannels) state
sendJoinHelper router endpoint (Dict.values endpointChannels) state


sendJoinHelper : Endpoint -> List (InternalChannel msg) -> State msg -> Task x (State msg)
sendJoinHelper endpoint channels state =
sendJoinHelper : Router msg -> Endpoint -> List (InternalChannel msg) -> State msg -> Task x (State msg)
sendJoinHelper router endpoint channels state =
case channels of
[] ->
Task.succeed state
Expand All @@ -847,7 +908,7 @@ sendJoinHelper endpoint channels state =
Helpers.insertIn endpoint internalChannel.channel.topic newChannel state.channels
in
pushSocket_ endpoint message (Just selfCb) (updateChannels newChannels state)
<&> \newState -> sendJoinHelper endpoint rest newState
<&> \newState -> sendJoinHelper router endpoint rest newState



Expand Down Expand Up @@ -927,7 +988,7 @@ pushSocket endpoint message selfCb state =
-- OPENING WEBSOCKETS WITH EXPONENTIAL BACKOFF


attemptOpen : Platform.Router msg (Msg msg) -> Float -> InternalSocket msg -> Task x Process.Id
attemptOpen : Router msg -> Float -> InternalSocket msg -> Task x Process.Id
attemptOpen router backoff ({ connection, socket } as internalSocket) =
let
goodOpen ws =
Expand All @@ -943,7 +1004,7 @@ attemptOpen router backoff ({ connection, socket } as internalSocket) =
Process.spawn (after backoff &> actuallyAttemptOpen)


open : InternalSocket msg -> Platform.Router msg (Msg msg) -> Task WS.BadOpen WS.WebSocket
open : InternalSocket msg -> Router msg -> Task WS.BadOpen WS.WebSocket
open socket router =
let
onMessage _ msg =
Expand Down Expand Up @@ -981,3 +1042,30 @@ maybeNotifyApp router maybeTagger =

maybeAndMap =
Maybe.map2 (|>)


externalStateFor : InternalChannel.State -> Channel.State
externalStateFor internalState =
case internalState of
InternalChannel.Closed ->
Channel.Closed

InternalChannel.Joining ->
Channel.Joining

InternalChannel.Joined ->
Channel.Joined

InternalChannel.Errored ->
Channel.Errored

InternalChannel.Disconnected ->
Channel.Disconnected


getChannelStatusCb : String -> State msg -> Maybe (String -> Channel.State -> msg)
getChannelStatusCb endpoint state =
state.sockets
|> Dict.get endpoint
|> Maybe.map .socket
|> Maybe.andThen .onChannelStatus
9 changes: 9 additions & 0 deletions src/Phoenix/Channel.elm
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Phoenix.Channel
, onLeaveError
, withDebug
, map
, State(..)
)

{-| A channel declares which topic should be joined, registers event handlers and has various callbacks for possible lifecycle events.
Expand All @@ -33,6 +34,14 @@ import Json.Decode as Decode exposing (Value)
-- CHANNEL


type State
= Closed
| Joining
| Joined
| Errored
| Disconnected


{-| Representation of a Phoenix Channel
-}
type alias Channel msg =
Expand Down
Loading