Skip to content

Commit 02d99cc

Browse files
committedApr 25, 2021
1
1 parent 7c15cf6 commit 02d99cc

32 files changed

+1697
-0
lines changed
 

‎README.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# jt809 netty-springboot实现
2+
3+
## 实现
4+
5+
只采用了主链路通信
6+
7+
- 登陆请求/应答
8+
- 心跳检测
9+
- 车辆实时定位信息上传
10+
11+
### jt809协议注意点
12+
13+
- jt809应当根据头尾标识进行粘包/半包处理,详见[here](src/main/java/io/github/cloudintheking/jt809/codec/MessageDecoder.java)
14+
- jt809由头标识+数据头+数据体+crc+尾标识, 针对数据头+数据体进行crc校验, 针对数据头+数据体+crc进行转义(去除数据中和头尾标识相同的数据,防止边界模糊)
15+
- server 应该只负责处理解析数据,将解析后的数据通过消息队列或其他方式发给业务模块处理,防止慢io堵塞整个server
16+
17+
jt809协议详见 [here](doc/JTT%20809.pdf)
18+
19+
## server 运行
20+
21+
idea 打开项目后运行 Jt809Application
22+
23+
## client运行
24+
25+
test目录下执行TcpClientDemo进行测试

‎doc/JTT 809.pdf

4.01 MB
Binary file not shown.

‎pom.xml

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>io.github.cloudintheking</groupId>
8+
<artifactId>jt809</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<parent>
12+
<groupId>org.springframework.boot</groupId>
13+
<artifactId>spring-boot-starter-parent</artifactId>
14+
<version>2.2.4.RELEASE</version>
15+
<relativePath/>
16+
</parent>
17+
18+
<properties>
19+
<maven.compiler.source>8</maven.compiler.source>
20+
<maven.compiler.target>8</maven.compiler.target>
21+
<netty-all.version>4.1.6.Final</netty-all.version>
22+
<lombok.version>1.16.12</lombok.version>
23+
<fastjson.version>1.2.29</fastjson.version>
24+
<junit.version>4.12</junit.version>
25+
</properties>
26+
27+
<dependencies>
28+
29+
<dependency>
30+
<groupId>org.springframework.boot</groupId>
31+
<artifactId>spring-boot-starter-amqp</artifactId>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>org.springframework.boot</groupId>
36+
<artifactId>spring-boot-configuration-processor</artifactId>
37+
<optional>true</optional>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>io.netty</groupId>
42+
<artifactId>netty-all</artifactId>
43+
<version>${netty-all.version}</version>
44+
</dependency>
45+
46+
<dependency>
47+
<groupId>org.projectlombok</groupId>
48+
<artifactId>lombok</artifactId>
49+
<!-- <version>${lombok.version}</version>-->
50+
</dependency>
51+
52+
<dependency>
53+
<groupId>com.alibaba</groupId>
54+
<artifactId>fastjson</artifactId>
55+
<version>${fastjson.version}</version>
56+
</dependency>
57+
58+
<dependency>
59+
<groupId>junit</groupId>
60+
<artifactId>junit</artifactId>
61+
<version>${junit.version}</version>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>org.apache.commons</groupId>
66+
<artifactId>commons-lang3</artifactId>
67+
</dependency>
68+
69+
</dependencies>
70+
71+
<build>
72+
<plugins>
73+
<plugin>
74+
<groupId>org.springframework.boot</groupId>
75+
<artifactId>spring-boot-maven-plugin</artifactId>
76+
</plugin>
77+
</plugins>
78+
</build>
79+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.cloudintheking.jt809;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Jt809Application {
8+
public static void main(String[] args) {
9+
SpringApplication.run(Jt809Application.class, args);
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.github.cloudintheking.jt809.attribute;
2+
3+
import io.netty.util.AttributeKey;
4+
5+
public interface Attributes {
6+
AttributeKey<Session> SESSION = AttributeKey.newInstance("session");
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.github.cloudintheking.jt809.attribute;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
@AllArgsConstructor
9+
@NoArgsConstructor
10+
@Data
11+
@Builder(toBuilder = true)
12+
public class Session {
13+
14+
private Integer userId;
15+
private String password;
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.github.cloudintheking.jt809.client;
2+
3+
import io.github.cloudintheking.jt809.client.handler.HeartBeatHandler;
4+
import io.github.cloudintheking.jt809.client.handler.LoginResponseHandler;
5+
import io.github.cloudintheking.jt809.codec.MessageDecoder;
6+
import io.github.cloudintheking.jt809.codec.MessageEncoder;
7+
import io.netty.bootstrap.Bootstrap;
8+
import io.netty.channel.Channel;
9+
import io.netty.channel.ChannelFuture;
10+
import io.netty.channel.ChannelInitializer;
11+
import io.netty.channel.ChannelOption;
12+
import io.netty.channel.nio.NioEventLoopGroup;
13+
import io.netty.channel.socket.SocketChannel;
14+
import io.netty.channel.socket.nio.NioSocketChannel;
15+
import io.netty.handler.logging.LogLevel;
16+
import io.netty.handler.logging.LoggingHandler;
17+
import lombok.Data;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.net.InetSocketAddress;
22+
23+
24+
@Data
25+
public class TcpClient809 {
26+
private Logger LOG = LoggerFactory.getLogger(getClass());
27+
private static final int DEFAULT_PORT = 9000;
28+
29+
private long connectTimeoutMillis = 3000;
30+
31+
private int port = DEFAULT_PORT;
32+
33+
private boolean tcpNoDelay = false;
34+
35+
private boolean reuseAddress = true;
36+
37+
private boolean keepAlive = true;
38+
39+
private int workerCount = 4;
40+
41+
public static TcpClient809 INSTANCE = new TcpClient809();
42+
43+
public Channel getChannel(String address, int port) {
44+
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
45+
Bootstrap bootstrap = new Bootstrap();
46+
bootstrap.group(workerGroup)
47+
.channel(NioSocketChannel.class)
48+
.option(ChannelOption.TCP_NODELAY, tcpNoDelay)
49+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeoutMillis)
50+
.option(ChannelOption.SO_REUSEADDR, reuseAddress)
51+
.option(ChannelOption.SO_KEEPALIVE, keepAlive)
52+
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
53+
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)
54+
.handler(new ChannelInitializer<SocketChannel>() {
55+
@Override
56+
protected void initChannel(SocketChannel socketChannel) throws Exception {
57+
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.ERROR));
58+
socketChannel.pipeline().addLast(new HeartBeatHandler());//心跳发送包处理handler
59+
socketChannel.pipeline().addLast(new MessageDecoder());//解码
60+
socketChannel.pipeline().addLast(MessageEncoder.INSTANCE);//编码
61+
socketChannel.pipeline().addLast(new LoginResponseHandler());//反馈数据处理
62+
}
63+
});
64+
ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
65+
future.awaitUninterruptibly();
66+
if (future.isSuccess()) {
67+
return future.channel();
68+
} else {
69+
return null;
70+
}
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.github.cloudintheking.jt809.client.handler;
2+
3+
4+
import io.github.cloudintheking.jt809.constants.MsgIdConstant;
5+
import io.github.cloudintheking.jt809.protocol.Message;
6+
import io.netty.buffer.ByteBuf;
7+
import io.netty.buffer.Unpooled;
8+
import io.netty.channel.ChannelHandlerContext;
9+
import io.netty.handler.timeout.IdleState;
10+
import io.netty.handler.timeout.IdleStateEvent;
11+
import io.netty.handler.timeout.IdleStateHandler;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* 心跳处理
19+
*/
20+
public class HeartBeatHandler extends IdleStateHandler {
21+
22+
private static Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class);
23+
24+
private static final int WRITE_IDLE_TIME = 60;
25+
26+
public HeartBeatHandler() {
27+
super(0, WRITE_IDLE_TIME, 0, TimeUnit.SECONDS);
28+
}
29+
30+
@Override
31+
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
32+
if (e.state() == IdleState.WRITER_IDLE) {
33+
LOG.info("链路空闲,发送心跳!");
34+
Message msg = new Message(MsgIdConstant.UP_LINKETEST_REQ);
35+
ByteBuf channelBuffer = Unpooled.buffer(0);
36+
msg.setMsgBody(channelBuffer);
37+
ctx.channel().writeAndFlush(msg);
38+
super.channelIdle(ctx, e);
39+
}
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.github.cloudintheking.jt809.client.handler;
2+
3+
4+
import io.github.cloudintheking.jt809.attribute.Attributes;
5+
import io.github.cloudintheking.jt809.attribute.Session;
6+
import io.github.cloudintheking.jt809.constants.MsgIdConstant;
7+
import io.github.cloudintheking.jt809.protocol.Message;
8+
import io.netty.buffer.ByteBuf;
9+
import io.netty.channel.ChannelHandlerContext;
10+
import io.netty.channel.SimpleChannelInboundHandler;
11+
import lombok.extern.slf4j.Slf4j;
12+
13+
/**
14+
* 登陆响应处理
15+
*/
16+
@Slf4j
17+
public class LoginResponseHandler extends SimpleChannelInboundHandler<Message> {
18+
19+
@Override
20+
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message msg) throws Exception {
21+
if (msg.getMsgId() == MsgIdConstant.UP_CONNECT_RSP) {
22+
log.info("接收来自server登陆响应");
23+
ByteBuf msgBody = msg.getMsgBody();
24+
int result = msgBody.readByte();
25+
if (result == MsgIdConstant.UP_CONNECT_RSP_SUCCESS) {
26+
channelHandlerContext.channel().attr(Attributes.SESSION).set(Session.builder().build());
27+
log.info("登录成功");
28+
} else {
29+
log.error("登录异常,请检查0x0:{}", Integer.toHexString(result));
30+
}
31+
}
32+
}
33+
}

0 commit comments

Comments
 (0)