-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhour_pred.R
342 lines (236 loc) · 11.3 KB
/
hour_pred.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#' Function to run predictions on hourly level
#' @param df data frame must contain date, timestamp, landingContentGroup2, country, deviceCategory, operatingSystem, ses, rev, rps hpred, dpred, avg
#' @param current_date date representing "current date" can be used to backtest by setting date in the past
#' @return data frame containg new hpred predictions
predictor_hour <- function(df, current_date) {
numCores <- detectCores()-1 # get the number of cores available
#system(paste("echo '\n",current_date,"'"))
df_amt <- df%>%
filter(date<=current_date)%>%
filter(date>current_date-3)%>%
group_by(date, landingContentGroup2, country, deviceCategory, operatingSystem)%>%
summarise(ses = sum(ses, na.rm = TRUE))%>%
filter(ses >= 100)%>%
group_by(landingContentGroup2, country, deviceCategory, operatingSystem)%>%
summarise(count = n(), ses = sum(ses, na.rm = TRUE))%>%
arrange(desc(count))
df_check <- df%>%
filter(date == current_date+1, !is.na(hpred))%>%
select(date, timestamp, landingContentGroup2, country, deviceCategory, operatingSystem, hpred)
inner_func_hour <- function(row){
name1<-df_amt[row,]
df_hour_tbats<-df_check%>%
filter(landingContentGroup2 == name1$landingContentGroup2 , country == name1$country ,
deviceCategory == name1$deviceCategory, operatingSystem == name1$operatingSystem)%>%
select(date, timestamp, landingContentGroup2, country, deviceCategory, operatingSystem, hpred)
print(nrow(df_hour_tbats))
if(nrow(df_hour_tbats)!=24){
df_1<-df%>%
filter(landingContentGroup2 == name1$landingContentGroup2 , country == name1$country ,
deviceCategory == name1$deviceCategory, operatingSystem == name1$operatingSystem)%>%
filter(date > current_date-30)%>%
filter(date <= current_date)%>%
arrange(timestamp)%>%
select(timestamp, rps, ses)%>%
rename(metric = 2, weight = 3)
df_hour_tbats <-fit_tbats_concourse(df_1, current_date = current_date, forecast = 37)%>%
mutate(date = date(timestamp))%>%
filter(date == current_date+1)%>%
rename(hpred = 2)%>%
select(timestamp, hpred, date)%>%
mutate(landingContentGroup2 = name1$landingContentGroup2 , country = name1$country ,
deviceCategory = name1$deviceCategory, operatingSystem = name1$operatingSystem)%>%
select(date, timestamp, landingContentGroup2, country, deviceCategory, operatingSystem, hpred)
#end pred
}
return(df_hour_tbats)
}
mcresults <- pbmclapply(1:nrow(df_amt),
FUN=function(i) inner_func_hour(i),
mc.cores = numCores,
mc.cleanup = TRUE,
mc.preschedule = FALSE)
mcresults <-bind_rows(mcresults)
return(mcresults)
}
fit_tbats_concourse <- function(df, label, id, extra = list(),
minweight = 0, maxsparsity = 0.25,
aggfun = function(metric, weight) weighted.mean(metric, weight, na.rm = TRUE),
aggintervals = c(1, 2, 3, 4),
interval = '1 hour', frequency = c(24, 24 * 7),
forecast = NULL,
outliers = c('tsoutliers', NA),
na.impute = c('na.interp', 'na.kalman'),
current_date = NULL) {
outliers <- match.arg(outliers)
na.impute <- match.arg(na.impute)
assert_data_frame(df, min.rows = 1)
assert_colname(df, c('timestamp', 'metric', 'weight'))
## parse interval
interval_value <- as.numeric(strsplit(interval, ' ')[[1]][1])
interval_unit <- strsplit(interval, ' ')[[1]][2]
## make sure it's a data.table object
df <- data.table(df)
setorder(df, timestamp)
## pre-aggregate metrics
agginterval <- head(aggintervals, 1)
df[, aggtimestamp := floor_date(timestamp, unit = paste(agginterval, interval_unit))] # nolint
df[, metric := aggfun(metric, weight), by = aggtimestamp] # nolint
df[, weight := sum(weight, na.rm = TRUE), by = aggtimestamp] # nolint
df[, aggtimestamp := NULL] # nolint
## get rid of meaningless rows (so that the next checks do not fail on NAs)
df <- df[!is.na(metric) & is.finite(metric)] # nolint
## the most recent ZERO values are most likely to be actually missing data
if (nrow(df) > 0 && tail(df$metric, 1) == 0) {
## so drop all trailing ZERO metrics
df <- head(df, -1 * (min(which(cumsum(rev(df$metric)) != 0)) - 1))
}
placeholder <- -1
if(as.numeric(difftime(as.POSIXct(current_date+1),df[, max(timestamp)], units = c("hours")))>=12){
#print(paste("not recent data: ", df[, max(timestamp)]))
predf <- data.frame(timestamp = seq(from = as.POSIXct(current_date+1),
by = interval,
length.out = forecast),
rep(placeholder, forecast))
return(predf)
}
if(difftime( max(df$timestamp, na.rm = TRUE) , min(df$timestamp, na.rm = TRUE), units = c("hours")) < frequency[1] *4){
predf <- data.frame(timestamp = seq(from = as.POSIXct(current_date+1),
by = interval,
length.out = forecast),
rep(placeholder, forecast))
return(predf)
}
## remove values with low weights
df <- df[weight > minweight] # nolint
## remove further outliers
if (nrow(df) > 0 && !is.na(outliers) && outliers == 'tsoutliers') {
tryCatch(
df[tsoutliers(ts(df$metric, frequency = frequency[1]))$index,
c('metric', 'weight') := NA],
error = function(e) {
# print(
# 'Outlier removal failed on model id {id} due to {shQuote(e$message)} ',
# 'using {frequency[1]} frequency on the following vector: ',
# paste(capture.output(dput(df$metric)), collapse = ' '))
})
}
## outliers <- AnomalyDetectionTs(
## x = data.frame(df[, .(timestamp, metric)]),
## plot = FALSE,
## max_anoms = 0.05, direction = 'pos')
## if (nrow(outliers$anoms) > 0) {
## TODO
## }
## find gaps in the time series
if (nrow(na.omit(df)) > 0) {
df <- data.table(suppressMessages(pad(df, interval = interval, by = 'timestamp')))
}
ndf <- nrow(df) # nolint
## fall back to most recent records if full data is too sparse
recent_sparsity <- original_sparsity <- sparsity <- df[, round(mean(is.na(metric)), 4)] # nolint
recent_df <- df
while (recent_sparsity > maxsparsity &&
## allow at least 4 periods to figure out at least some seasonality
nrow(recent_df) > frequency[1] * 4) {
## drop 1 frequency of data
recent_df <- tail(recent_df, -frequency[1])
## drop leading NAs
if (!all(is.na(recent_df$metric)) && is.na(recent_df$metric[1])) {
recent_df <- tail(recent_df, -1 * (which(cumsum(!is.na(recent_df$metric)) != 0)[1] - 1))
}
recent_sparsity <- recent_df[, round(mean(is.na(metric)), 4)] # nolint
if (recent_sparsity < sparsity) {
df <- recent_df
sparsity <- recent_sparsity
}
}
if (is.finite(sparsity) && sparsity > maxsparsity) {
## fall back to further interval aggregates if data is still too sparse
if (length(aggintervals) > 1) {
mc <- match.call()
mc$aggintervals <- tail(aggintervals, -1)
return(eval(mc, envir = parent.frame()))
} else {
## time to give up -- by dropping all records
# print(paste(
# ' after dropping all records from the original {ndf} ',
# 'with {original_sparsity * 100}% original sparsity ',
# 'that we could not fix with aggregation and other tweaks'))
predf <- data.frame(timestamp = seq(from = as.POSIXct(current_date+1),
by = interval,
length.out = forecast),
rep(placeholder, forecast))
return(predf)
}
}
## number of missing values
dfna <- df[is.na(metric), .N] # nolint
## impute missing intervals
tsobj <- xts(df$metric, order.by = df$timestamp)
if (dfna > 0) {
if (na.impute == 'na.kalman') {
tsobj <- tryCatch(
## Kalman smoothing & structural time series models
na.kalman(tsobj), error = function(e)
## use last observation if Kalman fails
tryCatch(na.locf(tsobj), error = function(e) tsobj))
df <- merge(setnames(as.data.table(tsobj), c('timestamp', 'metric')),
df[, .(timestamp, weight)], by = 'timestamp', all.x = TRUE)
}
if (na.impute == 'na.interp') {
df[, metric := tryCatch(
as.numeric(na.interp(ts(metric, frequency = frequency[1]))),
error = function(e)
as.numeric(na.locf(ts(metric, frequency = frequency[1]))))]
tsobj <- xts(df$metric, order.by = df$timestamp)
}
}
## build model using all frequencies
model <- tryCatch(qc_tbats(tbats(msts(tsobj, seasonal.periods = frequency))),
error = function(e) e)
## fall back to less seasonal effects if no seasonality found
if (hasName(model, 'seasonal.periods') && length(model$seasonal.periods) < 1 &&
length(frequency) > 1) {
frequency <- head(frequency, -1)
model <- tryCatch(qc_tbats(tbats(msts(tsobj, seasonal.periods = frequency))), error = function(e) e)
}
## fall back to further interval aggregates if
if (( # nolint
## no seasonality found
(hasName(model, 'seasonal.periods') && length(model$seasonal.periods) < 1) |
## low number of observations to capture a real seasonal effect
(nrow(na.omit(tsobj)) < frequency[1] * 4)) &&
## there are further aggintervals available
length(aggintervals) > 1) {
mc <- match.call()
mc$aggintervals <- tail(aggintervals, -1)
return(eval(mc, envir = parent.frame()))
}
## make sure we fail the model on low number of (unique) observations
if (nrow(na.omit(tsobj)) < frequency[1] | length(unique(na.omit(tsobj))) < 3 ) {
model <- structure(
list(message = "failing on low number of observations"),
class = 'error')
}
if (!inherits(model, 'error')) {
## bump number of periods to forecast based on the current hour
forecast_adj <- forecast
preds <- forecast(model, h = forecast_adj)
predf <- as.data.table(preds)
predp <- predf[['Point Forecast']]
pred <- round(head(predp, 1), 2) # nolint
## add timestamps to the predictions for JSON array
predf <- data.frame(timestamp = tail(seq(from = df[, max(timestamp)],
by = interval,
length.out = length(predp) + 1), -1),
predf)
}else{
predf <- data.frame(timestamp = seq(from = as.POSIXct(current_date+1),
by = interval,
length.out = forecast),
rep(placeholder, forecast))
}
return(predf)
## return
}