|
1 | 1 | package com.alibaba.otter.canal.deployer;
|
2 | 2 |
|
3 |
| -import java.util.Map; |
4 |
| -import java.util.Properties; |
5 |
| - |
6 |
| -import org.I0Itec.zkclient.IZkStateListener; |
7 |
| -import org.I0Itec.zkclient.exception.ZkNoNodeException; |
8 |
| -import org.I0Itec.zkclient.exception.ZkNodeExistsException; |
9 |
| -import org.apache.commons.lang.BooleanUtils; |
10 |
| -import org.apache.commons.lang.StringUtils; |
11 |
| -import org.apache.zookeeper.Watcher.Event.KeeperState; |
12 |
| -import org.slf4j.Logger; |
13 |
| -import org.slf4j.LoggerFactory; |
14 |
| -import org.slf4j.MDC; |
15 |
| - |
16 | 3 | import com.alibaba.otter.canal.common.utils.AddressUtils;
|
17 | 4 | import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
|
18 | 5 | import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
|
|
36 | 23 | import com.google.common.base.Function;
|
37 | 24 | import com.google.common.collect.MapMaker;
|
38 | 25 | import com.google.common.collect.MigrateMap;
|
| 26 | +import org.I0Itec.zkclient.IZkStateListener; |
| 27 | +import org.I0Itec.zkclient.exception.ZkNoNodeException; |
| 28 | +import org.I0Itec.zkclient.exception.ZkNodeExistsException; |
| 29 | +import org.apache.commons.lang.BooleanUtils; |
| 30 | +import org.apache.commons.lang.StringUtils; |
| 31 | +import org.apache.zookeeper.Watcher.Event.KeeperState; |
| 32 | +import org.slf4j.Logger; |
| 33 | +import org.slf4j.LoggerFactory; |
| 34 | +import org.slf4j.MDC; |
| 35 | + |
| 36 | +import java.util.ArrayList; |
| 37 | +import java.util.List; |
| 38 | +import java.util.Map; |
| 39 | +import java.util.Properties; |
| 40 | +import java.util.regex.Matcher; |
| 41 | +import java.util.regex.Pattern; |
| 42 | +import java.util.stream.Collectors; |
| 43 | + |
| 44 | +import static com.alibaba.otter.canal.deployer.CanalConstants.CANAL_DESTINATIONS; |
| 45 | +import static com.alibaba.otter.canal.deployer.CanalConstants.CANAL_DESTINATIONS_EXPR; |
39 | 46 |
|
40 | 47 | /**
|
41 | 48 | * canal调度控制器
|
@@ -390,7 +397,7 @@ private PlainCanalConfigClient getManagerClient(String managerAddress) {
|
390 | 397 | }
|
391 | 398 |
|
392 | 399 | private void initInstanceConfig(Properties properties) {
|
393 |
| - String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); |
| 400 | + String destinationStr = getDestinations(properties); |
394 | 401 | String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);
|
395 | 402 |
|
396 | 403 | for (String destination : destinations) {
|
@@ -461,6 +468,40 @@ public static String getProperty(Properties properties, String key) {
|
461 | 468 | return StringUtils.trim(value);
|
462 | 469 | }
|
463 | 470 |
|
| 471 | + public static String getDestinations(Properties properties) { |
| 472 | + String expr = getProperty(properties, CANAL_DESTINATIONS_EXPR); |
| 473 | + if (StringUtils.isNotBlank(expr)) { |
| 474 | + return parseExpr(expr); |
| 475 | + } else { |
| 476 | + return getProperty(properties, CANAL_DESTINATIONS); |
| 477 | + } |
| 478 | + } |
| 479 | + |
| 480 | + private static String parseExpr(String expr) { |
| 481 | + String prefix = StringUtils.substringBefore(expr, "{"); |
| 482 | + String range = StringUtils.substringAfter(expr, "{"); |
| 483 | + range = StringUtils.substringBefore(range, "}"); |
| 484 | + |
| 485 | + String regex = "(\\d+)-(\\d+)"; |
| 486 | + Pattern pattern = Pattern.compile(regex); |
| 487 | + Matcher matcher = pattern.matcher(range); |
| 488 | + if (matcher.find()) { |
| 489 | + String head = matcher.group(1); |
| 490 | + String tail = matcher.group(2); |
| 491 | + int start = Integer.parseInt(head); |
| 492 | + int end = Integer.parseInt(tail); |
| 493 | + |
| 494 | + List<String> list = new ArrayList<>(); |
| 495 | + for (int i = start; i <= end; i++) { |
| 496 | + String d = prefix + i; |
| 497 | + list.add(d); |
| 498 | + } |
| 499 | + return list.stream().map(Object::toString).collect(Collectors.joining(",")); |
| 500 | + } else { |
| 501 | + throw new CanalServerException("invalid destinations expr " + expr); |
| 502 | + } |
| 503 | + } |
| 504 | + |
464 | 505 | public void start() throws Throwable {
|
465 | 506 | logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
|
466 | 507 | // 创建整个canal的工作节点
|
|
0 commit comments