@@ -20,30 +20,44 @@ func InternalSync(
20
20
rules []models.Rule ,
21
21
ruleToEndpoints map [string ][]string ,
22
22
) error {
23
+ tx , err := db .BeginTx (ctx , nil )
24
+ if err != nil {
25
+ return err
26
+ }
27
+ defer tx .Rollback ()
28
+
23
29
updatedAt := models .NewTime (time .Now ())
24
30
25
31
for _ , end := range endpoints {
26
- if err := internalEndpointUpsert (ctx , db , end , updatedAt ); err != nil {
32
+ if err := internalEndpointUpsert (ctx , tx , end , updatedAt ); err != nil {
27
33
return err
28
34
}
29
35
}
30
36
31
37
for _ , rule := range rules {
32
- if err := internalRuleUpsert (ctx , db , rule , updatedAt ); err != nil {
38
+ if err := internalRuleUpsert (ctx , tx , rule , updatedAt ); err != nil {
33
39
return err
34
40
}
35
41
}
36
42
43
+ if err := internalDeleteOlderThan (ctx , tx , updatedAt ); err != nil {
44
+ return err
45
+ }
46
+
37
47
for k , v := range ruleToEndpoints {
38
- if err := internalRuleEndpointsUpsert (ctx , db , k , v , updatedAt ); err != nil {
48
+ if err := internalRuleEndpointsInsert (ctx , tx , k , v ); err != nil {
39
49
return err
40
50
}
41
51
}
42
52
43
- return internalDeleteOlderThan (ctx , db , updatedAt )
53
+ if err := tx .Commit (); err != nil {
54
+ return err
55
+ }
56
+
57
+ return nil
44
58
}
45
59
46
- func internalEndpointUpsert (ctx context.Context , db database.Querier , r models.Endpoint , updatedAt models.Time ) error {
60
+ func internalEndpointUpsert (ctx context.Context , tx database.QuerierTx , r models.Endpoint , updatedAt models.Time ) error {
47
61
m := struct {
48
62
models.Endpoint
49
63
UpdatedAt models.Time
@@ -53,8 +67,9 @@ func internalEndpointUpsert(ctx context.Context, db database.Querier, r models.E
53
67
UpdatedAt : updatedAt ,
54
68
CreatedAt : models .NewTime (time .Now ()),
55
69
}
56
- _ , err := Endpoints .
57
- INSERT (
70
+
71
+ res , err := Endpoints .
72
+ UPDATE (
58
73
Endpoints .Internal ,
59
74
Endpoints .InternalID ,
60
75
Endpoints .Name ,
@@ -65,26 +80,41 @@ func internalEndpointUpsert(ctx context.Context, db database.Querier, r models.E
65
80
Endpoints .Kind ,
66
81
Endpoints .Config ,
67
82
Endpoints .UpdatedAt ,
68
- Endpoints .CreatedAt ,
69
83
).
70
84
MODEL (m ).
71
- ON_CONFLICT (Endpoints .InternalID ).
72
- DO_UPDATE (SET (
73
- Endpoints .InternalID .SET (Endpoints .EXCLUDED .InternalID ),
74
- Endpoints .Name .SET (Endpoints .EXCLUDED .Name ),
75
- Endpoints .AttachmentDisable .SET (Endpoints .EXCLUDED .AttachmentDisable ),
76
- Endpoints .TextDisable .SET (Endpoints .EXCLUDED .TextDisable ),
77
- Endpoints .TitleTemplate .SET (Endpoints .EXCLUDED .TitleTemplate ),
78
- Endpoints .BodyTemplate .SET (Endpoints .EXCLUDED .BodyTemplate ),
79
- Endpoints .Kind .SET (Endpoints .EXCLUDED .Kind ),
80
- Endpoints .Config .SET (Endpoints .EXCLUDED .Config ),
81
- Endpoints .UpdatedAt .SET (Endpoints .EXCLUDED .UpdatedAt ),
82
- )).
83
- ExecContext (ctx , db )
84
- return err
85
+ WHERE (Endpoints .InternalID .EQ (String (r .InternalID .String ))).
86
+ ExecContext (ctx , tx )
87
+ if err != nil {
88
+ return err
89
+ }
90
+
91
+ count , err := res .RowsAffected ()
92
+ if count == 0 {
93
+ _ , err := Endpoints .
94
+ INSERT (
95
+ Endpoints .Internal ,
96
+ Endpoints .InternalID ,
97
+ Endpoints .Name ,
98
+ Endpoints .AttachmentDisable ,
99
+ Endpoints .TextDisable ,
100
+ Endpoints .TitleTemplate ,
101
+ Endpoints .BodyTemplate ,
102
+ Endpoints .Kind ,
103
+ Endpoints .Config ,
104
+ Endpoints .UpdatedAt ,
105
+ Endpoints .CreatedAt ,
106
+ ).
107
+ MODEL (m ).
108
+ ExecContext (ctx , tx )
109
+ if err != nil {
110
+ return err
111
+ }
112
+ }
113
+
114
+ return nil
85
115
}
86
116
87
- func internalRuleUpsert (ctx context.Context , db database.Querier , r models.Rule , updatedAt models.Time ) error {
117
+ func internalRuleUpsert (ctx context.Context , tx database.QuerierTx , r models.Rule , updatedAt models.Time ) error {
88
118
m := struct {
89
119
models.Rule
90
120
UpdatedAt models.Time
@@ -94,47 +124,68 @@ func internalRuleUpsert(ctx context.Context, db database.Querier, r models.Rule,
94
124
UpdatedAt : updatedAt ,
95
125
CreatedAt : models .NewTime (time .Now ()),
96
126
}
97
- _ , err := Rules .
98
- INSERT (
127
+
128
+ res , err := Rules .
129
+ UPDATE (
99
130
Rules .Internal ,
100
131
Rules .InternalID ,
101
132
Rules .Name ,
102
133
Rules .Expression ,
103
- Rules .Enable ,
104
134
Rules .UpdatedAt ,
105
- Rules .CreatedAt ,
106
135
).
107
136
MODEL (m ).
108
- ON_CONFLICT (Rules .InternalID ).
109
- DO_UPDATE (SET (
110
- Rules .Name .SET (Rules .EXCLUDED .Name ),
111
- Rules .Expression .SET (Rules .EXCLUDED .Expression ),
112
- Rules .UpdatedAt .SET (Rules .EXCLUDED .UpdatedAt ),
113
- )).
114
- ExecContext (ctx , db )
137
+ WHERE (Rules .InternalID .EQ (String (r .InternalID .String ))).
138
+ ExecContext (ctx , tx )
139
+ if err != nil {
140
+ return err
141
+ }
142
+
143
+ count , err := res .RowsAffected ()
144
+ if count == 0 {
145
+ _ , err := Rules .
146
+ INSERT (
147
+ Rules .Internal ,
148
+ Rules .InternalID ,
149
+ Rules .Name ,
150
+ Rules .Expression ,
151
+ Rules .Enable ,
152
+ Rules .UpdatedAt ,
153
+ Rules .CreatedAt ,
154
+ ).
155
+ MODEL (m ).
156
+ ExecContext (ctx , tx )
157
+ if err != nil {
158
+ return err
159
+ }
160
+ }
115
161
116
162
return err
117
163
}
118
164
119
- func internalRuleEndpointsUpsert (ctx context.Context , db database.Querier , ruleInternalID string , endpointInternalIDs []string , updatedAt models.Time ) error {
120
- if len (endpointInternalIDs ) == 0 {
121
- return nil
165
+ func internalRuleEndpointsInsert (ctx context.Context , tx database.QuerierTx , ruleInternalID string , endpointInternalIDs []string ) error {
166
+ _ , err := RulesToEndpoints .
167
+ DELETE ().
168
+ WHERE (AND (
169
+ RulesToEndpoints .RuleID .IN (Rules .SELECT (Rules .ID ).WHERE (Rules .InternalID .EQ (String (ruleInternalID )))),
170
+ RulesToEndpoints .Internal .EQ (Bool (true )),
171
+ )).
172
+ ExecContext (ctx , tx )
173
+ if err != nil {
174
+ return err
122
175
}
123
176
124
177
for _ , endpointInternalID := range endpointInternalIDs {
125
- // TODO: refactor this
126
- res , err := db .ExecContext (ctx , `
178
+ res , err := tx .ExecContext (ctx , `
127
179
INSERT INTO rules_to_endpoints (
128
180
internal,
129
181
rule_id,
130
- endpoint_id,
131
- updated_at,
132
- created_at
133
- ) SELECT "1" AS internal, rules.id AS rule_id, endpoints.id AS endpoint_id, ?, ?
182
+ endpoint_id
183
+ ) SELECT "1" AS internal, rules.id AS rule_id, endpoints.id AS endpoint_id
134
184
FROM rules, endpoints
135
- WHERE rules.internal_id=? AND endpoints.internal_id IN (?)
136
- ON CONFLICT (rule_id, endpoint_id) DO UPDATE SET updated_at=EXCLUDED.updated_at, internal=EXCLUDED.internal
137
- ` , updatedAt , updatedAt , ruleInternalID , endpointInternalID )
185
+ WHERE rules.internal_id=? AND endpoints.internal_id IN (?)
186
+ LIMIT 1
187
+ ON CONFLICT (rule_id, endpoint_id) DO UPDATE SET internal=EXCLUDED.internal
188
+ ` , ruleInternalID , endpointInternalID )
138
189
if err != nil {
139
190
return err
140
191
}
@@ -150,24 +201,14 @@ func internalRuleEndpointsUpsert(ctx context.Context, db database.Querier, ruleI
150
201
return nil
151
202
}
152
203
153
- func internalDeleteOlderThan (ctx context.Context , db database.Querier , date models.Time ) error {
154
- _ , err := RulesToEndpoints .
155
- DELETE ().
156
- WHERE (AND (
157
- RulesToEndpoints .Internal .IS_TRUE (),
158
- RulesToEndpoints .UpdatedAt .LT (RawTimestamp (muhTypeAffinity (date ))),
159
- )).
160
- ExecContext (ctx , db )
161
- if err != nil {
162
- return err
163
- }
164
- _ , err = Rules .
204
+ func internalDeleteOlderThan (ctx context.Context , tx database.QuerierTx , date models.Time ) error {
205
+ _ , err := Rules .
165
206
DELETE ().
166
207
WHERE (AND (
167
208
Rules .Internal .IS_TRUE (),
168
209
Rules .UpdatedAt .LT (RawTimestamp (muhTypeAffinity (date ))),
169
210
)).
170
- ExecContext (ctx , db )
211
+ ExecContext (ctx , tx )
171
212
if err != nil {
172
213
return err
173
214
}
@@ -177,6 +218,6 @@ func internalDeleteOlderThan(ctx context.Context, db database.Querier, date mode
177
218
Endpoints .Internal .IS_TRUE (),
178
219
Endpoints .UpdatedAt .LT (RawTimestamp (muhTypeAffinity (date ))),
179
220
)).
180
- ExecContext (ctx , db )
221
+ ExecContext (ctx , tx )
181
222
return err
182
223
}
0 commit comments