Skip to content

Commit fb4667f

Browse files
authored
[INLONG-5072][Sort] Add metric computing of MySQL and PostgreSQL and HBase for user query metric by label (#5073)
1 parent 173b966 commit fb4667f

File tree

23 files changed

+2274
-24
lines changed

23 files changed

+2274
-24
lines changed

inlong-sort/sort-connectors/hbase/pom.xml

+5-7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
</exclusion>
4747
</exclusions>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.apache.hadoop</groupId>
51+
<artifactId>hadoop-common</artifactId>
52+
<scope>provided</scope>
53+
</dependency>
4954
</dependencies>
5055

5156
<dependencyManagement>
@@ -111,13 +116,6 @@
111116
</excludes>
112117
</artifactSet>
113118
<filters>
114-
<filter>
115-
<artifact>org.apache.inlong:sort-connector-*</artifact>
116-
<includes>
117-
<include>org/apache/inlong/**</include>
118-
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
119-
</includes>
120-
</filter>
121119
<filter>
122120
<artifact>*:*</artifact>
123121
<excludes>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.inlong.sort.hbase;
20+
21+
import org.apache.flink.configuration.ConfigOption;
22+
import org.apache.flink.configuration.ReadableConfig;
23+
import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
24+
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
25+
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
26+
import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
27+
import org.apache.flink.table.api.TableSchema;
28+
import org.apache.flink.table.connector.sink.DynamicTableSink;
29+
import org.apache.flink.table.connector.source.DynamicTableSource;
30+
import org.apache.flink.table.factories.DynamicTableSinkFactory;
31+
import org.apache.flink.table.factories.DynamicTableSourceFactory;
32+
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
33+
import org.apache.hadoop.conf.Configuration;
34+
import org.apache.inlong.sort.hbase.sink.HBaseDynamicTableSink;
35+
36+
import java.util.HashSet;
37+
import java.util.Map;
38+
import java.util.Set;
39+
40+
import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_ASYNC;
41+
import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_MAX_ROWS;
42+
import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_CACHE_TTL;
43+
import static org.apache.flink.connector.hbase.options.HBaseOptions.LOOKUP_MAX_RETRIES;
44+
import static org.apache.flink.connector.hbase.options.HBaseOptions.NULL_STRING_LITERAL;
45+
import static org.apache.flink.connector.hbase.options.HBaseOptions.PROPERTIES_PREFIX;
46+
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_INTERVAL;
47+
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
48+
import static org.apache.flink.connector.hbase.options.HBaseOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
49+
import static org.apache.flink.connector.hbase.options.HBaseOptions.TABLE_NAME;
50+
import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_QUORUM;
51+
import static org.apache.flink.connector.hbase.options.HBaseOptions.ZOOKEEPER_ZNODE_PARENT;
52+
import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration;
53+
import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseLookupOptions;
54+
import static org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseWriteOptions;
55+
import static org.apache.flink.connector.hbase.options.HBaseOptions.validatePrimaryKey;
56+
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
57+
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
58+
import static org.apache.inlong.sort.hbase.options.InLongOptions.INLONG_METRIC;
59+
60+
/** HBase connector factory. */
61+
public class HBase2DynamicTableFactory
62+
implements DynamicTableSourceFactory, DynamicTableSinkFactory {
63+
64+
private static final String IDENTIFIER = "hbase-2.2-inlong";
65+
66+
@Override
67+
public DynamicTableSource createDynamicTableSource(Context context) {
68+
TableFactoryHelper helper = createTableFactoryHelper(this, context);
69+
helper.validateExcept(PROPERTIES_PREFIX);
70+
71+
final ReadableConfig tableOptions = helper.getOptions();
72+
73+
TableSchema tableSchema = context.getCatalogTable().getSchema();
74+
Map<String, String> options = context.getCatalogTable().getOptions();
75+
76+
validatePrimaryKey(tableSchema);
77+
78+
String tableName = tableOptions.get(TABLE_NAME);
79+
Configuration hbaseConf = getHBaseConfiguration(options);
80+
HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
81+
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
82+
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
83+
84+
return new HBaseDynamicTableSource(
85+
hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
86+
}
87+
88+
@Override
89+
public DynamicTableSink createDynamicTableSink(Context context) {
90+
TableFactoryHelper helper = createTableFactoryHelper(this, context);
91+
helper.validateExcept(PROPERTIES_PREFIX);
92+
93+
final ReadableConfig tableOptions = helper.getOptions();
94+
95+
TableSchema tableSchema = context.getCatalogTable().getSchema();
96+
Map<String, String> options = context.getCatalogTable().getOptions();
97+
98+
validatePrimaryKey(tableSchema);
99+
100+
String tableName = tableOptions.get(TABLE_NAME);
101+
Configuration hbaseConf = getHBaseConfiguration(options);
102+
HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
103+
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
104+
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
105+
String inLongMetric = tableOptions.get(INLONG_METRIC);
106+
107+
return new HBaseDynamicTableSink(
108+
tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral, inLongMetric);
109+
}
110+
111+
@Override
112+
public String factoryIdentifier() {
113+
return IDENTIFIER;
114+
}
115+
116+
@Override
117+
public Set<ConfigOption<?>> requiredOptions() {
118+
Set<ConfigOption<?>> set = new HashSet<>();
119+
set.add(TABLE_NAME);
120+
return set;
121+
}
122+
123+
@Override
124+
public Set<ConfigOption<?>> optionalOptions() {
125+
Set<ConfigOption<?>> set = new HashSet<>();
126+
set.add(ZOOKEEPER_ZNODE_PARENT);
127+
set.add(ZOOKEEPER_QUORUM);
128+
set.add(NULL_STRING_LITERAL);
129+
set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
130+
set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
131+
set.add(SINK_BUFFER_FLUSH_INTERVAL);
132+
set.add(SINK_PARALLELISM);
133+
set.add(LOOKUP_ASYNC);
134+
set.add(LOOKUP_CACHE_MAX_ROWS);
135+
set.add(LOOKUP_CACHE_TTL);
136+
set.add(LOOKUP_MAX_RETRIES);
137+
set.add(INLONG_METRIC);
138+
return set;
139+
}
140+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.inlong.sort.hbase.metric;
20+
21+
import org.apache.flink.metrics.Counter;
22+
import org.apache.flink.metrics.Meter;
23+
import org.apache.flink.metrics.MeterView;
24+
import org.apache.flink.metrics.MetricGroup;
25+
26+
/**
27+
* A collection class for handling metrics
28+
*/
29+
public class MetricData {
30+
31+
private final MetricGroup metricGroup;
32+
33+
private Counter numRecordsOut;
34+
private Counter numBytesOut;
35+
private Counter dirtyRecords;
36+
private Counter dirtyBytes;
37+
private Meter numRecordsOutPerSecond;
38+
private Meter numBytesOutPerSecond;
39+
private static Integer TIME_SPAN_IN_SECONDS = 60;
40+
private static String STREAM_ID = "streamId";
41+
private static String GROUP_ID = "groupId";
42+
private static String NODE_ID = "nodeId";
43+
44+
public MetricData(MetricGroup metricGroup) {
45+
this.metricGroup = metricGroup;
46+
}
47+
48+
public void registerMetricsForNumRecordsOut(String groupId, String streamId, String nodeId, String metricName) {
49+
numRecordsOut =
50+
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
51+
.counter(metricName);
52+
}
53+
54+
public void registerMetricsForNumBytesOut(String groupId, String streamId, String nodeId, String metricName) {
55+
numBytesOut =
56+
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
57+
.counter(metricName);
58+
}
59+
60+
public void registerMetricsForNumRecordsOutPerSecond(String groupId, String streamId, String nodeId,
61+
String metricName) {
62+
numRecordsOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
63+
nodeId)
64+
.meter(metricName, new MeterView(this.numRecordsOut, TIME_SPAN_IN_SECONDS));
65+
}
66+
67+
public void registerMetricsForNumBytesOutPerSecond(String groupId, String streamId, String nodeId,
68+
String metricName) {
69+
numBytesOutPerSecond = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId)
70+
.addGroup(NODE_ID, nodeId)
71+
.meter(metricName, new MeterView(this.numBytesOut, TIME_SPAN_IN_SECONDS));
72+
}
73+
74+
public void registerMetricsForDirtyRecords(String groupId, String streamId, String nodeId,
75+
String metricName) {
76+
dirtyRecords = metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
77+
.counter(metricName);
78+
}
79+
80+
public void registerMetricsForDirtyBytes(String groupId, String streamId, String nodeId,
81+
String metricName) {
82+
dirtyBytes =
83+
metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID, nodeId)
84+
.counter(metricName);
85+
86+
}
87+
88+
public Counter getNumRecordsOut() {
89+
return numRecordsOut;
90+
}
91+
92+
public Counter getNumBytesOut() {
93+
return numBytesOut;
94+
}
95+
96+
public Counter getDirtyRecords() {
97+
return dirtyRecords;
98+
}
99+
100+
public Counter getDirtyBytes() {
101+
return dirtyBytes;
102+
}
103+
104+
public Meter getNumRecordsOutPerSecond() {
105+
return numRecordsOutPerSecond;
106+
}
107+
108+
public Meter getNumBytesOutPerSecond() {
109+
return numBytesOutPerSecond;
110+
}
111+
112+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.inlong.sort.hbase.options;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.ConfigOption;
23+
import org.apache.flink.configuration.ConfigOptions;
24+
25+
/** InLong Options for HBase. */
26+
@Internal
27+
public class InLongOptions {
28+
29+
public static final ConfigOption<String> INLONG_METRIC =
30+
ConfigOptions.key("inlong.metric")
31+
.stringType()
32+
.defaultValue("")
33+
.withDescription("INLONG GROUP ID + '_' + STREAM ID + '_' + NODE ID");
34+
}

0 commit comments

Comments
 (0)