@@ -74,12 +74,11 @@ type participant struct {
74
74
keyBuilder * KeyBuilder
75
75
zkClient * uzk.Client
76
76
// Mirrors org.apache.helix.participant.HelixStateMachineEngine
77
- // stateModelName->stateModelProcessor
78
- stateModelProcessors sync. Map
77
+ stateModelProcessorsMu sync. Mutex
78
+ stateModelProcessors map [ string ] * StateModelProcessor // state model name to processor
79
79
stateModelProcessorLocks map [string ]* sync.Mutex
80
80
stateModel StateModel
81
- sync.Mutex
82
- dataAccessor * DataAccessor
81
+ dataAccessor * DataAccessor
83
82
84
83
// fatalErrChan would notify user when a fatal error occurs
85
84
fatalErrChan chan error
@@ -122,6 +121,7 @@ func NewParticipant(
122
121
port : port ,
123
122
keyBuilder : keyBuilder ,
124
123
zkClient : zkClient ,
124
+ stateModelProcessors : make (map [string ]* StateModelProcessor ),
125
125
stateModelProcessorLocks : make (map [string ]* sync.Mutex ),
126
126
dataAccessor : newDataAccessor (zkClient , keyBuilder ),
127
127
stateModel : NewStateModel (),
@@ -159,7 +159,9 @@ func (p *participant) IsConnected() bool {
159
159
160
160
// RegisterStateModel associates state trasition functions with the participant
161
161
func (p * participant ) RegisterStateModel (stateModelName string , processor * StateModelProcessor ) {
162
- p .stateModelProcessors .Store (stateModelName , processor )
162
+ p .stateModelProcessorsMu .Lock ()
163
+ p .stateModelProcessors [stateModelName ] = processor
164
+ p .stateModelProcessorsMu .Unlock ()
163
165
p .stateModelProcessorLocks [stateModelName ] = & sync.Mutex {}
164
166
}
165
167
@@ -435,7 +437,7 @@ func (p *participant) handleMsg(msg *model.Message) error {
435
437
mu , ok := p .stateModelProcessorLocks [msg .GetStateModelDef ()]
436
438
if ! ok {
437
439
p .logger .Error ("failed to find state model in stateModelProcessorLocks" ,
438
- zap .String ("StateModelDefinition " , msg .GetStateModelDef ()),
440
+ zap .String ("stateModelDefinition " , msg .GetStateModelDef ()),
439
441
zap .Any ("stateModelProcessorLocks" , p .stateModelProcessorLocks ))
440
442
return errMsgMissingStateModelDef
441
443
}
@@ -556,8 +558,9 @@ func (p *participant) handleStateTransition(msg *model.Message) error {
556
558
// set the msg execution time
557
559
msg .SetExecuteStartTime (time .Now ())
558
560
559
- if val , ok := p .stateModelProcessors .Load (msg .GetStateModelDef ()); ok {
560
- processor := val .(* StateModelProcessor )
561
+ p .stateModelProcessorsMu .Lock ()
562
+ defer p .stateModelProcessorsMu .Unlock ()
563
+ if processor , ok := p .stateModelProcessors [msg .GetStateModelDef ()]; ok {
561
564
if toStateHandler , ok := processor .Transitions [fromState ]; ok {
562
565
if handler , ok := toStateHandler [toState ]; ok {
563
566
// TODO: deal with handler error
0 commit comments