From e32e7a9e327d7ed7af105f1d2a8c283f75980e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=A2=E6=98=A5=E4=BA=AE?= <946240095@qq.com> Date: Wed, 8 Dec 2021 15:35:19 +0800 Subject: [PATCH] [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. --- inlong-sort-standalone/pom.xml | 5 +- .../sort-standalone-common/pom.xml | 28 ++- .../holder/SortClusterConfigHolder.java | 1 - .../config/pojo/SortClusterResponse.java | 108 --------- .../config/pojo/type/CacheType.java | 4 +- .../standalone/config/pojo/type/DataType.java | 4 +- .../standalone/config/pojo/type/SortType.java | 4 +- .../standalone/utils/InlongLoggerFactory.java | 22 -- .../sort-standalone-source/pom.xml | 55 ++--- .../PropertiesConfigurationProvider.java | 64 +++++ .../inlong/sort/standalone/SortCluster.java | 135 +++++++++++ .../standalone/SortStandaloneApplication.java | 23 +- .../inlong/sort/standalone/SortTask.java | 212 +++++++++++++++++ .../channel/BufferQueueChannel.java | 180 ++++++++++++++ .../sort/standalone/channel/ProfileEvent.java | 121 ++++++++++ .../channel/ProfileTransaction.java | 110 +++++++++ .../standalone/dispatch/DispatchManager.java | 157 +++++++++++++ .../standalone/dispatch/DispatchProfile.java | 173 ++++++++++++++ .../sort/standalone/sink/SinkContext.java | 220 ++++++++++++++++++ 19 files changed, 1441 insertions(+), 185 deletions(-) create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java create mode 100644 inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml index c329ff121f8..067287695b0 100644 --- a/inlong-sort-standalone/pom.xml +++ b/inlong-sort-standalone/pom.xml @@ -44,6 +44,7 @@ 3.2.0 2.7.2 4.13 + 2.0.2 19.0 false UTF-8 @@ -85,13 +86,13 @@ org.powermock powermock-module-junit4 - 2.0.2 + ${powermock.version} test org.powermock powermock-api-mockito2 - 2.0.2 + ${powermock.version} test diff --git a/inlong-sort-standalone/sort-standalone-common/pom.xml b/inlong-sort-standalone/sort-standalone-common/pom.xml index 4c99a28e17a..c4fdccc3eb3 100644 --- a/inlong-sort-standalone/sort-standalone-common/pom.xml +++ b/inlong-sort-standalone/sort-standalone-common/pom.xml @@ -1,14 +1,22 @@ - + sortTasks = new ArrayList<>(); -// clusterConfig.setSortTasks(sortTasks); -// SortTaskConfig taskConfig = new SortTaskConfig(); -// sortTasks.add(taskConfig); -// taskConfig.setName("sid_tdbank_atta6th_v3"); -// taskConfig.setType(SortType.TQTDBANK); -// // -// Map sinkParams = new HashMap<>(); -// taskConfig.setSinkParams(sinkParams); -// sinkParams.put("b_pcg_venus_szrecone_124_153_utf8", "10.56.15.195:46801,10.56.15.212:46801," -// + "10.56.15.220:46801,10.56.15.221:46801," -// + "10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801," -// + "10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801," -// + "10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801," -// + "10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801," -// + "10.56.84.17:46801"); -// // -// List> idParams = new ArrayList<>(); -// Map idParam = new HashMap<>(); -// idParams.add(idParam); -// taskConfig.setIdParams(idParams); -// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046"); -// idParam.put(Constants.INLONG_STREAM_ID, ""); -// idParam.put(TdbankConfig.KEY_BID, "b_pcg_venus_szrecone_124_153_utf8"); -// idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046"); -// idParam.put(TdbankConfig.KEY_DATA_TYPE, TdbankConfig.DATA_TYPE_ATTA_TEXT); -// return clusterConfig; -// } -// -// /** -// * generateCdmqConfig -// * -// * @return -// */ -// public static SortClusterConfig generateCdmqConfig() { -// SortClusterConfig clusterConfig = new SortClusterConfig(); -// clusterConfig.setClusterName("cdmqv3-sz-sz1"); -// // -// List sortTasks = new ArrayList<>(); -// clusterConfig.setSortTasks(sortTasks); -// SortTaskConfig taskConfig = new SortTaskConfig(); -// sortTasks.add(taskConfig); -// taskConfig.setName("sid_cdmq_kg_videorequest_v3"); -// taskConfig.setType(SortType.CDMQ); -// // -// Map sinkParams = new HashMap<>(); -// taskConfig.setSinkParams(sinkParams); -// sinkParams.put("cdmqAccessPoint", "cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033"); -// sinkParams.put("cdmqClusterId", "kg_videorequest"); -// sinkParams.put("clientId", "p_video_atta_196"); -// sinkParams.put("batchSize", "122880"); -// sinkParams.put("maxRequestSize", "8388608"); -// sinkParams.put("lingerMs", "150"); -// // -// List> idParams = new ArrayList<>(); -// Map idParam = new HashMap<>(); -// idParams.add(idParam); -// taskConfig.setIdParams(idParams); -// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046"); -// idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046"); -// return clusterConfig; -// } -// -// /** -// * main -// * -// * @param args -// */ -// public static void main(String[] args) { -// // tdbank -// { -// SortClusterConfig config = generateTdbankConfig(); -// String configString = JSON.toJSONString(config, false); -// System.out.println("tdbank:" + configString); -// String md5 = DigestUtils.md5Hex(configString); -// SortClusterResponse response = new SortClusterResponse(); -// response.setResult(true); -// response.setErrCode(SUCC); -// response.setMd5(md5); -// response.setData(config); -// String responseString = JSON.toJSONString(response, true); -// System.out.println("tdbank responseString:" + responseString); -// } -// // cdmq -// { -// SortClusterConfig config = generateCdmqConfig(); -// String configString = JSON.toJSONString(config, false); -// System.out.println("cdmq:" + configString); -// String md5 = DigestUtils.md5Hex(configString); -// SortClusterResponse response = new SortClusterResponse(); -// response.setResult(true); -// response.setErrCode(SUCC); -// response.setMd5(md5); -// response.setData(config); -// String responseString = JSON.toJSONString(response, true); -// System.out.println("cdmq responseString:" + responseString); -// } -// } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java index 963dc6ed648..845ccaafc16 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java @@ -22,7 +22,7 @@ */ public enum CacheType { - TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n"); + TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), UNKNOWN("n"); private final String value; @@ -65,6 +65,6 @@ public static CacheType convert(String value) { return v; } } - return N; + return UNKNOWN; } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java index 882e29e67b1..d5a7d64ad79 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java @@ -22,7 +22,7 @@ */ public enum DataType { - TEXT("text"), PB("pb"), JCE("jce"), N("n"); + TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n"); private final String value; @@ -65,6 +65,6 @@ public static DataType convert(String value) { return v; } } - return N; + return UNKNOWN; } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java index e066928a5c2..6444821e2ee 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java @@ -24,7 +24,7 @@ public enum SortType { HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), THTDBANK( - "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n"); + "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), UNKNOWN("n"); private final String value; @@ -67,6 +67,6 @@ public static SortType convert(String value) { return v; } } - return N; + return UNKNOWN; } } \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java index d4bdea757df..274483afcff 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java @@ -60,26 +60,4 @@ public static String getClassNamePrefix(String className, int layer) { String namePrefix = className.substring(0, index); return namePrefix; } - -// /** -// * main -// * @param args -// */ -// public static void main(String[] args) { -// int layer = 3; -// String className = ""; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "org.ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "org.apache.ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "org.apache.inlong.ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "org.apache.inlong.sort.ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// className = "org.apache.inlong.sort.standalone.ccc"; -// System.out.println(className + ":" + getClassNamePrefix(className, layer)); -// } } diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml index 630592ae2fe..3abc5a53cb9 100644 --- a/inlong-sort-standalone/sort-standalone-source/pom.xml +++ b/inlong-sort-standalone/sort-standalone-source/pom.xml @@ -1,14 +1,22 @@ - + UTF-8 1.8 1.8 - 4.13 - 19.0 - false - com.google.guava - guava - ${guava.version} - - - org.powermock - powermock-module-junit4 - 2.0.2 - test - - - org.powermock - powermock-api-mockito2 - 2.0.2 - test - - - junit - junit - ${junit.version} - test + org.apache.inlong + sort-standalone-common + ${project.version} \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java new file mode 100644 index 00000000000..853bfc2c6c7 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.flume.conf.FlumeConfiguration; +import org.apache.flume.node.AbstractConfigurationProvider; +import org.slf4j.Logger; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; + +/** + * + * PropertiesConfigurationProvider + */ +public class PropertiesConfigurationProvider extends + AbstractConfigurationProvider { + + public static final Logger LOG = InlongLoggerFactory.getLogger(PropertiesConfigurationProvider.class); + + private final Map flumeConf; + + /** + * PropertiesConfigurationProvider + * + * @param agentName + * @param flumeConf + */ + public PropertiesConfigurationProvider(String agentName, Map flumeConf) { + super(agentName); + this.flumeConf = flumeConf; + } + + /** + * getFlumeConfiguration + * + * @return + */ + @Override + public FlumeConfiguration getFlumeConfiguration() { + try { + return new FlumeConfiguration(flumeConf); + } catch (Exception e) { + LOG.error("exception catch:" + e.getMessage(), e); + } + return new FlumeConfiguration(new HashMap()); + } +} \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java new file mode 100644 index 00000000000..c91f985c288 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone; + +import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig; +import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + +/** + * + * SortCluster + */ +public class SortCluster { + + public static final Logger LOG = InlongLoggerFactory.getLogger(SortCluster.class); + + private Timer reloadTimer; + private Map taskMap = new ConcurrentHashMap<>(); + private List deletingTasks = new ArrayList<>(); + + /** + * start + */ + public void start() { + try { + this.reload(); + this.setReloadTimer(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * close + */ + public void close() { + try { + this.reloadTimer.cancel(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * setReloadTimer + */ + private void setReloadTimer() { + reloadTimer = new Timer(true); + TimerTask task = new TimerTask() { + + /** + * run + */ + public void run() { + reload(); + } + }; + long reloadInterval = CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L); + reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval); + } + + /** + * reload + */ + public void reload() { + try { + // get new config + SortClusterConfig newConfig = SortClusterConfigHolder.getClusterConfig(); + if (newConfig == null) { + return; + } + // add new task + for (SortTaskConfig taskConfig : newConfig.getSortTasks()) { + String newTaskName = taskConfig.getName(); + if (taskMap.containsKey(newTaskName)) { + continue; + } + SortTask newTask = new SortTask(newTaskName); + newTask.start(); + this.taskMap.put(newTaskName, newTask); + } + // remove task + for (Entry entry : taskMap.entrySet()) { + String taskName = entry.getKey(); + boolean isFound = false; + for (SortTaskConfig taskConfig : newConfig.getSortTasks()) { + if (taskName.equals(taskConfig.getName())) { + isFound = true; + break; + } + } + if (!isFound) { + this.deletingTasks.add(entry.getValue()); + } + } + // stop deleting task list + for (SortTask task : deletingTasks) { + task.stop(); + taskMap.remove(task.getTaskName()); + } + deletingTasks.clear(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java index c0dc143ace3..48c04707b85 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java @@ -17,17 +17,36 @@ package org.apache.inlong.sort.standalone; +import org.apache.flume.node.Application; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.metrics.MetricObserver; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + /** * - * Application + * SortStandaloneApplication */ public class SortStandaloneApplication { + public static final Logger LOG = InlongLoggerFactory.getLogger(Application.class); + /** * main * * @param args */ public static void main(String[] args) { + LOG.info("start to sort-standalone"); + try { + SortCluster cluster = new SortCluster(); + // + cluster.start(); + // metrics + MetricObserver.init(CommonPropertiesHolder.get()); + Thread.sleep(5000); + } catch (Exception e) { + LOG.error("A fatal error occurred while running. Exception follows.", e); + } } -} +} \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java new file mode 100644 index 00000000000..7748ae6b86b --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.flume.Channel; +import org.apache.flume.SinkRunner; +import org.apache.flume.SourceRunner; +import org.apache.flume.lifecycle.LifecycleState; +import org.apache.flume.lifecycle.LifecycleSupervisor; +import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy; +import org.apache.flume.node.MaterializedConfiguration; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + +import com.google.common.eventbus.Subscribe; + +/** + * + * SortTask + */ +public class SortTask { + + public static final Logger LOG = InlongLoggerFactory.getLogger(SortTask.class); + + private final String taskName; + private final LifecycleSupervisor supervisor; + private MaterializedConfiguration materializedConfiguration; + private final ReentrantLock lifecycleLock = new ReentrantLock(); + + /** + * Constructor + * + * @param taskName + */ + public SortTask(String taskName) { + this.taskName = taskName; + this.supervisor = new LifecycleSupervisor(); + } + + /** + * get taskName + * + * @return the taskName + */ + public String getTaskName() { + return taskName; + } + + /** + * start + */ + public void start() { + SortTaskConfig config = SortClusterConfigHolder.getTaskConfig(taskName); + if (config == null) { + return; + } + + // + Map flumeConfiguration = config.generateFlumeConfiguration(); + LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration); + PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider( + config.getName(), flumeConfiguration); + this.handleConfigurationEvent(configurationProvider.getConfiguration()); + } + + /** + * handleConfigurationEvent + * + * @param conf + */ + @Subscribe + public void handleConfigurationEvent(MaterializedConfiguration conf) { + try { + lifecycleLock.lockInterruptibly(); + stopAllComponents(); + startAllComponents(conf); + } catch (InterruptedException e) { + LOG.info("Interrupted while trying to handle configuration event"); + return; + } finally { + // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock + if (lifecycleLock.isHeldByCurrentThread()) { + lifecycleLock.unlock(); + } + } + } + + /** + * stop + */ + public void stop() { + lifecycleLock.lock(); + stopAllComponents(); + try { + supervisor.stop(); + } finally { + lifecycleLock.unlock(); + } + } + + /** + * stopAllComponents + */ + private void stopAllComponents() { + if (this.materializedConfiguration != null) { + LOG.info("Shutting down configuration: {}", this.materializedConfiguration); + for (Entry entry : this.materializedConfiguration.getSourceRunners().entrySet()) { + try { + LOG.info("Stopping Source " + entry.getKey()); + supervisor.unsupervise(entry.getValue()); + } catch (Exception e) { + LOG.error("Error while stopping {}", entry.getValue(), e); + } + } + + for (Entry entry : this.materializedConfiguration.getSinkRunners().entrySet()) { + try { + LOG.info("Stopping Sink " + entry.getKey()); + supervisor.unsupervise(entry.getValue()); + } catch (Exception e) { + LOG.error("Error while stopping {}", entry.getValue(), e); + } + } + + for (Entry entry : this.materializedConfiguration.getChannels().entrySet()) { + try { + LOG.info("Stopping Channel " + entry.getKey()); + supervisor.unsupervise(entry.getValue()); + } catch (Exception e) { + LOG.error("Error while stopping {}", entry.getValue(), e); + } + } + } + } + + /** + * startAllComponents + * + * @param materializedConfiguration + */ + private void startAllComponents(MaterializedConfiguration materializedConfiguration) { + LOG.info("Starting new configuration:{}", materializedConfiguration); + + this.materializedConfiguration = materializedConfiguration; + + for (Entry entry : materializedConfiguration.getChannels().entrySet()) { + try { + LOG.info("Starting Channel " + entry.getKey()); + supervisor.supervise(entry.getValue(), + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + } catch (Exception e) { + LOG.error("Error while starting {}", entry.getValue(), e); + } + } + + /* + * Wait for all channels to start. + */ + for (Channel ch : materializedConfiguration.getChannels().values()) { + while (ch.getLifecycleState() != LifecycleState.START + && !supervisor.isComponentInErrorState(ch)) { + try { + LOG.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for channel to start.", e); + } + } + } + + for (Entry entry : materializedConfiguration.getSinkRunners().entrySet()) { + try { + LOG.info("Starting Sink " + entry.getKey()); + supervisor.supervise(entry.getValue(), + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + } catch (Exception e) { + LOG.error("Error while starting {}", entry.getValue(), e); + } + } + + for (Entry entry : materializedConfiguration.getSourceRunners().entrySet()) { + try { + LOG.info("Starting Source " + entry.getKey()); + supervisor.supervise(entry.getValue(), + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + } catch (Exception e) { + LOG.error("Error while starting {}", entry.getValue(), e); + } + } + } +} \ No newline at end of file diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java new file mode 100644 index 00000000000..b15a396e8fb --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.channel; + +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.flume.ChannelException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.AbstractChannel; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.utils.BufferQueue; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.apache.inlong.sort.standalone.utils.SizeSemaphore; +import org.slf4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * + * BufferQueueChannel + */ +public class BufferQueueChannel extends AbstractChannel { + + public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class); + + public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; + public static final String KEY_RELOADINTERVAL = "reloadInterval"; + public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; + + // global buffer size + private static SizeSemaphore globalBufferQueueSizeKb; + private BufferQueue bufferQueue; + private ThreadLocal currentTransaction = new ThreadLocal(); + protected Timer channelTimer; + private AtomicLong takeCounter = new AtomicLong(0); + private AtomicLong putCounter = new AtomicLong(0); + + /** + * Constructor + */ + public BufferQueueChannel() { + Context context = CommonPropertiesHolder.getContext(); + SizeSemaphore globalSize = getGlobalBufferQueueSizeKb(context); + this.bufferQueue = new BufferQueue<>(globalSize.maxSize() / 3, globalSize); + } + + /** + * put + * + * @param event + * @throws ChannelException + */ + @Override + public void put(Event event) throws ChannelException { + putCounter.incrementAndGet(); + int eventSize = event.getBody().length; + this.bufferQueue.acquire(eventSize); + ProfileTransaction transaction = currentTransaction.get(); + Preconditions.checkState(transaction != null, "No transaction exists for this thread"); + if (event instanceof ProfileEvent) { + ProfileEvent profile = (ProfileEvent) event; + transaction.doPut(profile); + } else { + ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders()); + transaction.doPut(profile); + } + } + + /** + * take + * + * @return Event + * @throws ChannelException + */ + @Override + public Event take() throws ChannelException { + ProfileEvent event = this.bufferQueue.pollRecord(); + if (event != null) { + ProfileTransaction transaction = currentTransaction.get(); + Preconditions.checkState(transaction != null, "No transaction exists for this thread"); + transaction.doTake(event); + takeCounter.incrementAndGet(); + } + return event; + } + + /** + * getTransaction + * + * @return + */ + @Override + public Transaction getTransaction() { + ProfileTransaction newTransaction = new ProfileTransaction(this.bufferQueue); + this.currentTransaction.set(newTransaction); + return newTransaction; + } + + /** + * start + */ + @Override + public void start() { + super.start(); + try { + this.setReloadTimer(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * setReloadTimer + */ + protected void setReloadTimer() { + channelTimer = new Timer(true); + long reloadInterval = CommonPropertiesHolder.getLong(KEY_RELOADINTERVAL, 60000L); + TimerTask channelTask = new TimerTask() { + + public void run() { + LOG.info("queueSize:{},availablePermits:{},put:{},take:{}", + bufferQueue.size(), + bufferQueue.availablePermits(), + putCounter.getAndSet(0), + takeCounter.getAndSet(0)); + } + }; + channelTimer.schedule(channelTask, + new Date(System.currentTimeMillis() + reloadInterval), + reloadInterval); + } + + /** + * configure + * + * @param context + */ + @Override + public void configure(Context context) { + } + + /** + * getGlobalBufferQueueSizeKb + * + * @return + */ + public static SizeSemaphore getGlobalBufferQueueSizeKb(Context context) { + if (globalBufferQueueSizeKb != null) { + return globalBufferQueueSizeKb; + } + synchronized (LOG) { + if (globalBufferQueueSizeKb != null) { + return globalBufferQueueSizeKb; + } + int maxBufferQueueSizeKb = context.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, DEFAULT_MAX_BUFFERQUEUE_SIZE_KB); + globalBufferQueueSizeKb = new SizeSemaphore(maxBufferQueueSizeKb, SizeSemaphore.ONEKB); + return globalBufferQueueSizeKb; + } + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java new file mode 100644 index 00000000000..153cc25db05 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.channel; + +import java.util.Map; + +import org.apache.commons.lang.math.NumberUtils; +import org.apache.flume.event.SimpleEvent; +import org.apache.inlong.sort.standalone.config.pojo.InlongId; +import org.apache.inlong.sort.standalone.utils.Constants; + +/** + * + * ProfileEvent + */ +public class ProfileEvent extends SimpleEvent { + + private final String inlongGroupId; + private final String inlongStreamId; + private final String uid; + + private final long rawLogTime; + private final long fetchTime; + private long sendTime; + + /** + * Constructor + * + * @param body + * @param headers + */ + public ProfileEvent(byte[] body, Map headers) { + super.setBody(body); + super.setHeaders(headers); + this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID); + this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID); + this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); + this.fetchTime = System.currentTimeMillis(); + this.sendTime = fetchTime; + this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), fetchTime); + } + + /** + * get sendTime + * + * @return the sendTime + */ + public long getSendTime() { + return sendTime; + } + + /** + * set sendTime + * + * @param sendTime the sendTime to set + */ + public void setSendTime(long sendTime) { + this.sendTime = sendTime; + } + + /** + * get inlongGroupId + * + * @return the inlongGroupId + */ + public String getInlongGroupId() { + return inlongGroupId; + } + + /** + * get inlongStreamId + * + * @return the inlongStreamId + */ + public String getInlongStreamId() { + return inlongStreamId; + } + + /** + * get rawLogTime + * + * @return the rawLogTime + */ + public long getRawLogTime() { + return rawLogTime; + } + + /** + * get fetchTime + * + * @return the fetchTime + */ + public long getFetchTime() { + return fetchTime; + } + + /** + * get uid + * + * @return the uid + */ + public String getUid() { + return uid; + } + +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java new file mode 100644 index 00000000000..a885224fb7d --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.channel; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flume.Transaction; +import org.apache.inlong.sort.standalone.utils.BufferQueue; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + +/** + * + * ProfileTransaction + */ +public class ProfileTransaction implements Transaction { + + public static final Logger LOG = InlongLoggerFactory.getLogger(ProfileTransaction.class); + + private BufferQueue bufferQueue; + private List takeList = new ArrayList<>(); + private List putList = new ArrayList<>(); + + /** + * Constructor + * + * @param bufferQueue + */ + public ProfileTransaction(BufferQueue bufferQueue) { + this.bufferQueue = bufferQueue; + } + + /** + * begin + */ + @Override + public void begin() { + } + + /** + * commit + */ + @Override + public void commit() { + for (ProfileEvent event : takeList) { + bufferQueue.release(event.getBody().length); + } + this.takeList.clear(); + for (ProfileEvent event : putList) { + this.bufferQueue.offer(event); + } + this.putList.clear(); + } + + /** + * rollback + */ + @Override + public void rollback() { + for (ProfileEvent event : takeList) { + this.bufferQueue.offer(event); + } + this.takeList.clear(); + for (ProfileEvent event : putList) { + bufferQueue.release(event.getBody().length); + } + this.putList.clear(); + } + + /** + * close + */ + @Override + public void close() { + } + + /** + * doTake + * + * @param event + */ + public void doTake(ProfileEvent event) { + this.takeList.add(event); + } + + /** + * doPut + * + * @param event + */ + public void doPut(ProfileEvent event) { + this.putList.add(event); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java new file mode 100644 index 00000000000..3d9a3262e84 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.dispatch; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.flume.Context; +import org.apache.inlong.sort.standalone.channel.BufferQueueChannel; +import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + +/** + * DispatchManager + */ +public class DispatchManager { + + public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class); + public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout"; + public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount"; + public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize"; + public static final long DEFAULT_DISPATCH_TIMEOUT = 2000; + public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256; + public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680; + + private LinkedBlockingQueue dispatchQueue; + private final long dispatchTimeout; + private final long maxPackCount; + private final long maxPackSize; + private ConcurrentHashMap profileCache = new ConcurrentHashMap<>(); + // + private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false); + + /** + * Constructor + * + * @param context + * @param dispatchQueue + */ + public DispatchManager(Context context, LinkedBlockingQueue dispatchQueue) { + this.dispatchQueue = dispatchQueue; + this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT); + this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT); + this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE); + } + + /** + * addEvent + * + * @param event + */ + public void addEvent(ProfileEvent event) { + if (needOutputOvertimeData.get()) { + this.outputOvertimeData(); + this.needOutputOvertimeData.set(false); + } + // parse + String uid = event.getUid(); + // + DispatchProfile dispatchProfile = this.profileCache.get(uid); + if (dispatchProfile == null) { + dispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(), event.getInlongStreamId()); + this.profileCache.put(uid, dispatchProfile); + } + // + boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize); + if (!addResult) { + DispatchProfile newDispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(), + event.getInlongStreamId()); + DispatchProfile oldDispatchProfile = this.profileCache.put(uid, newDispatchProfile); + this.dispatchQueue.offer(oldDispatchProfile); + newDispatchProfile.addEvent(event, maxPackCount, maxPackSize); + } + } + + /** + * outputOvertimeData + * + * @return + */ + public void outputOvertimeData() { + LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}", + profileCache.size(), dispatchQueue.size()); + long currentTime = System.currentTimeMillis(); + long createThreshold = currentTime - dispatchTimeout; + List removeKeys = new ArrayList<>(); + long eventCount = 0; + for (Entry entry : this.profileCache.entrySet()) { + DispatchProfile dispatchProfile = entry.getValue(); + eventCount += dispatchProfile.getCount(); + if (!dispatchProfile.isTimeout(createThreshold)) { + continue; + } + removeKeys.add(entry.getKey()); + } + // output + removeKeys.forEach((key) -> { + dispatchQueue.offer(this.profileCache.remove(key)); + }); + LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}", + profileCache.size(), dispatchQueue.size(), eventCount); + } + + /** + * get dispatchTimeout + * + * @return the dispatchTimeout + */ + public long getDispatchTimeout() { + return dispatchTimeout; + } + + /** + * get maxPackCount + * + * @return the maxPackCount + */ + public long getMaxPackCount() { + return maxPackCount; + } + + /** + * get maxPackSize + * + * @return the maxPackSize + */ + public long getMaxPackSize() { + return maxPackSize; + } + + /** + * setNeedOutputOvertimeData + */ + public void setNeedOutputOvertimeData() { + this.needOutputOvertimeData.set(true); + } +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java new file mode 100644 index 00000000000..2a5d5c65a84 --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.dispatch; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.inlong.sort.standalone.channel.ProfileEvent; + +/** + * + * DispatchProfile + */ +public class DispatchProfile { + + private final String inlongGroupId; + private final String inlongStreamId; + private final String uid; + private List events = new ArrayList<>(); + private long createTime = System.currentTimeMillis(); + private long count = 0; + private long size = 0; + private long minEventTime = System.currentTimeMillis(); + + /** + * Constructor + * + * @param uid + * @param inlongGroupId + * @param inlongStreamId + */ + public DispatchProfile(String uid, String inlongGroupId, String inlongStreamId) { + this.uid = uid; + this.inlongGroupId = inlongGroupId; + this.inlongStreamId = inlongStreamId; + } + + /** + * addEvent + * + * @param event + * @param maxPackCount + * @param maxPackSize + * @return + */ + public boolean addEvent(ProfileEvent event, long maxPackCount, long maxPackSize) { + long eventLength = event.getBody().length; + if (count >= maxPackCount || (count > 0 && size + eventLength > maxPackSize)) { + return false; + } + this.events.add(event); + this.count++; + this.size += eventLength; + this.minEventTime = Math.min(minEventTime, event.getRawLogTime()); + return true; + } + + /** + * isTimeout + * + * @param createThreshold + * @return + */ + public boolean isTimeout(long createThreshold) { + return createThreshold >= createTime; + } + + /** + * get uid + * + * @return the uid + */ + public String getUid() { + return uid; + } + + /** + * get events + * + * @return the events + */ + public List getEvents() { + return events; + } + + /** + * set events + * + * @param events the events to set + */ + public void setEvents(List events) { + this.events = events; + } + + /** + * get count + * + * @return the count + */ + public long getCount() { + return count; + } + + /** + * set count + * + * @param count the count to set + */ + public void setCount(long count) { + this.count = count; + } + + /** + * get size + * + * @return the size + */ + public long getSize() { + return size; + } + + /** + * set size + * + * @param size the size to set + */ + public void setSize(long size) { + this.size = size; + } + + /** + * get minEventTime + * + * @return the minEventTime + */ + public long getMinEventTime() { + return minEventTime; + } + + /** + * get inlongGroupId + * + * @return the inlongGroupId + */ + public String getInlongGroupId() { + return inlongGroupId; + } + + /** + * get inlongStreamId + * + * @return the inlongStreamId + */ + public String getInlongStreamId() { + return inlongStreamId; + } + +} diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java new file mode 100644 index 00000000000..8dac973e5da --- /dev/null +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.standalone.sink; + +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.inlong.commons.config.metrics.MetricRegister; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder; +import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig; +import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet; +import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; +import org.slf4j.Logger; + +/** + * + * SinkContext + */ +public class SinkContext { + + public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class); + + public static final String KEY_MAX_THREADS = "maxThreads"; + public static final String KEY_PROCESSINTERVAL = "processInterval"; + public static final String KEY_RELOADINTERVAL = "reloadInterval"; + + protected final String clusterId; + protected final String taskName; + protected final String sinkName; + protected final Context sinkContext; + + protected SortTaskConfig sortTaskConfig; + + protected final Channel channel; + // + protected final int maxThreads; + protected final long processInterval; + protected final long reloadInterval; + // + protected final SortMetricItemSet metricItemSet; + protected Timer reloadTimer; + + /** + * Constructor + * + * @param sinkName + * @param context + * @param channel + */ + public SinkContext(String sinkName, Context context, Channel channel) { + this.sinkName = sinkName; + this.sinkContext = context; + this.channel = channel; + this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID); + this.taskName = context.getString(SortTaskConfig.KEY_TASK_NAME); + this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10); + this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 100); + this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L); + // + this.metricItemSet = new SortMetricItemSet(sinkName); + MetricRegister.register(this.metricItemSet); + } + + /** + * start + */ + public void start() { + try { + this.reload(); + this.setReloadTimer(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * close + */ + public void close() { + try { + this.reloadTimer.cancel(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * setReloadTimer + */ + protected void setReloadTimer() { + reloadTimer = new Timer(true); + TimerTask task = new TimerTask() { + + public void run() { + reload(); + } + }; + reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval); + } + + /** + * reload + */ + public void reload() { + try { + this.sortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } + + /** + * get clusterId + * + * @return the clusterId + */ + public String getClusterId() { + return clusterId; + } + + /** + * get taskName + * + * @return the taskName + */ + public String getTaskName() { + return taskName; + } + + /** + * get sinkName + * + * @return the sinkName + */ + public String getSinkName() { + return sinkName; + } + + /** + * get sinkContext + * + * @return the sinkContext + */ + public Context getSinkContext() { + return sinkContext; + } + + /** + * get sortTaskConfig + * + * @return the sortTaskConfig + */ + public SortTaskConfig getSortTaskConfig() { + return sortTaskConfig; + } + + /** + * get channel + * + * @return the channel + */ + public Channel getChannel() { + return channel; + } + + /** + * get maxThreads + * + * @return the maxThreads + */ + public int getMaxThreads() { + return maxThreads; + } + + /** + * get processInterval + * + * @return the processInterval + */ + public long getProcessInterval() { + return processInterval; + } + + /** + * get reloadInterval + * + * @return the reloadInterval + */ + public long getReloadInterval() { + return reloadInterval; + } + + /** + * get metricItemSet + * + * @return the metricItemSet + */ + public SortMetricItemSet getMetricItemSet() { + return metricItemSet; + } +}