Skip to content

Commit

Permalink
Ping implemented
Browse files Browse the repository at this point in the history
Added network side type
Ping example added
Named thread factory can be not daemon
Set server connection thread not daemon
Added random math
  • Loading branch information
Kudesunik committed Aug 17, 2021
1 parent 3883950 commit 27142ef
Show file tree
Hide file tree
Showing 29 changed files with 545 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ru.kudesunik.kudesunetwork.packet.Packet3Ping;
import ru.kudesunik.kudesunetwork.packet.Packet4Raw;
import ru.kudesunik.kudesunetwork.packet.PacketRegistrator;
import ru.kudesunik.kudesunetwork.parameters.NetworkParameters;
import ru.kudesunik.kudesunetwork.server.KudesuNetworkServer;
import ru.kudesunik.kudesunetwork.server.NetworkServerListener;
import ru.kudesunik.kudesunetwork.util.Utilities;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/ru/kudesunik/kudesunetwork/NetworkBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ru.kudesunik.kudesunetwork.annotations.Nullable;
import ru.kudesunik.kudesunetwork.packet.Packet;
import ru.kudesunik.kudesunetwork.packet.PacketRegistrator;
import ru.kudesunik.kudesunetwork.parameters.NetworkParameters;

public abstract class NetworkBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

import ru.kudesunik.kudesunetwork.KudesuNetwork;
import ru.kudesunik.kudesunetwork.NetworkBase;
import ru.kudesunik.kudesunetwork.NetworkParameters;
import ru.kudesunik.kudesunetwork.handler.NetworkHandler;
import ru.kudesunik.kudesunetwork.packet.Packet;
import ru.kudesunik.kudesunetwork.packet.PacketRegistrator;
import ru.kudesunik.kudesunetwork.parameters.NetworkParameters;

public class KudesuNetworkClient extends NetworkBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface NetworkClientListener extends NetworkListener {

public void onPacketReceive(Packet packet);

public void onPing(int count);
public boolean onPing(long id, long sendedTimestamp, long receivedTimestamp);

public void onDisconnection();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@

import ru.kudesunik.kudesunetwork.KudesuNetwork;
import ru.kudesunik.kudesunetwork.NetworkBase;
import ru.kudesunik.kudesunetwork.NetworkParameters;
import ru.kudesunik.kudesunetwork.annotations.ThreadSafe;
import ru.kudesunik.kudesunetwork.client.NetworkClientListener;
import ru.kudesunik.kudesunetwork.packet.Packet;
import ru.kudesunik.kudesunetwork.packet.Packet1Handshake;
import ru.kudesunik.kudesunetwork.packet.Packet2Authorization;
import ru.kudesunik.kudesunetwork.packet.Packet3Ping;
import ru.kudesunik.kudesunetwork.parameters.NetworkParameters;
import ru.kudesunik.kudesunetwork.server.NetworkServerListener;
import ru.kudesunik.kudesunetwork.util.NamedThreadFactory;
import ru.kudesunik.kudesunetwork.util.task.TaskManager;
Expand Down Expand Up @@ -45,6 +46,8 @@ public class NetworkHandler {
private final ExecutorService readerExecutorService;
private final ExecutorService workerExecutorService;

private final NetworkSide networkSide;

private final NetworkReader networkReader;
private final NetworkWorker networkWorker;

Expand All @@ -58,8 +61,10 @@ public NetworkHandler(Socket socket, NetworkBase base, NetworkListener listener,
this.outputStream = socket.getOutputStream();
if(listener instanceof NetworkClientListener) {
this.clientListener = (NetworkClientListener) listener;
this.networkSide = NetworkSide.CLIENT;
} else {
this.serverListener = (NetworkServerListener) listener;
this.networkSide = NetworkSide.SERVER;
}
this.parameters = parameters;
this.useProtocol = useProtocol;
Expand All @@ -84,7 +89,7 @@ public void start() {
}
}

@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
public void receivePacket(Packet packet) {
switch(packet.getId()) {
case Packet1Handshake.ID:
Expand All @@ -93,19 +98,22 @@ public void receivePacket(Packet packet) {
case Packet2Authorization.ID:
receiveAuthorizationPacket((Packet2Authorization) packet);
break;
case Packet3Ping.ID:
receivePingPacket((Packet3Ping) packet);
break;
default:
if(clientListener != null) {
if(networkSide == NetworkSide.CLIENT) {
clientListener.onPacketReceive(packet);
} else {
serverListener.onPacketReceive(socket.getPort(), packet);
}
}
}

@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void receiveHandshakePacket(Packet1Handshake packet) {
boolean result = false;
if(clientListener != null) {
if(networkSide == NetworkSide.CLIENT) {
result = clientListener.onHandshake(packet.getProtocolName(), packet.getProtocolVersion());
} else {
result = serverListener.onHandshake(socket.getPort(), packet.getProtocolName(), packet.getProtocolVersion());
Expand All @@ -116,10 +124,10 @@ private void receiveHandshakePacket(Packet1Handshake packet) {
}
}

@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void receiveAuthorizationPacket(Packet2Authorization packet) {
boolean result = false;
if(clientListener != null) {
if(networkSide == NetworkSide.CLIENT) {
result = clientListener.onAuthorization(packet.getData());
} else {
result = serverListener.onAuthorization(socket.getPort(), packet.getData());
Expand All @@ -132,31 +140,25 @@ private void receiveAuthorizationPacket(Packet2Authorization packet) {
}
}

@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void receivePingPacket(Packet3Ping packet) {
boolean result = false;
if(networkSide == NetworkSide.CLIENT) {
result = clientListener.onPing(packet.getPingId(), packet.getTimestampSended(), packet.getTimestampReceived());
} else {
result = serverListener.onPing(socket.getPort(), packet.getPingId(), packet.getTimestampSended(), packet.getTimestampReceived());
}
if(!result) {
KudesuNetwork.log(Level.ERROR, "Ping check failed!");
requestDropConnection();
}
}

@ThreadSafe(callerThread = "Unknown")
public void sendPacket(Packet packet) {
networkWorker.givePacketToSend(packet);
}

public boolean isAlive() {
return socket.isConnected() && !socket.isClosed();
}

public boolean isPacketExist(int packetId) {
return base.isPacketExist(packetId);
}

public boolean isProtocolPacket(int packetId) {
return base.isProtocolPacket(packetId);
}

public Packet getPacketContainer(int packetId) {
return base.getPacketContainer(packetId);
}

public boolean isNetworkReady() {
return isNetworkReady;
}

@ThreadSafe(callerThread = "Unknown")
public void requestDropConnection() {
if(!isDropConnectionCalled) {
Expand All @@ -182,7 +184,7 @@ private void dropConnectionInternal() {
stopWorkerThreads();
base.onConnectionDropped(socket.getPort());
KudesuNetwork.log(Level.INFO, "Connection closed: " + socket.getInetAddress() + ":" + socket.getPort() + " / " + socket.getLocalPort());
if(clientListener != null) {
if(networkSide == NetworkSide.CLIENT) {
clientListener.onDisconnection();
} else {
serverListener.onDisconnection(socket.getLocalPort());
Expand Down Expand Up @@ -219,11 +221,43 @@ private void awaitTermination(ExecutorService executorService, String threadName
}
}

public boolean isAlive() {
return socket.isConnected() && !socket.isClosed();
}

public boolean isPacketExist(int packetId) {
return base.isPacketExist(packetId);
}

public boolean isProtocolPacket(int packetId) {
return base.isProtocolPacket(packetId);
}

public Packet getPacketContainer(int packetId) {
return base.getPacketContainer(packetId);
}

public boolean isNetworkReady() {
return isNetworkReady;
}

public InputStream getInputStream() {
return inputStream;
}

public OutputStream getOutputStream() {
return outputStream;
}

public boolean useProtocol() {
return useProtocol;
}

public NetworkParameters getParameters() {
return parameters;
}

public NetworkSide getNetworkSide() {
return networkSide;
}
}
29 changes: 29 additions & 0 deletions src/main/java/ru/kudesunik/kudesunetwork/handler/NetworkSide.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ru.kudesunik.kudesunetwork.handler;

public enum NetworkSide {

CLIENT((byte) 0), SERVER((byte) 1);

private static final NetworkSide[] values = values();
private static final NetworkSide[] byId = new NetworkSide[values.length];

private final byte id;

private NetworkSide(byte id) {
this.id = id;
}

public byte getId() {
return id;
}

public static NetworkSide getById(int id) {
return byId[id];
}

static {
for(NetworkSide ps : values) {
byId[ps.id] = ps;
}
}
}
73 changes: 54 additions & 19 deletions src/main/java/ru/kudesunik/kudesunetwork/handler/NetworkWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
import org.apache.logging.log4j.Level;

import ru.kudesunik.kudesunetwork.KudesuNetwork;
import ru.kudesunik.kudesunetwork.annotations.NonNull;
import ru.kudesunik.kudesunetwork.annotations.ThreadSafe;
import ru.kudesunik.kudesunetwork.packet.Packet;
import ru.kudesunik.kudesunetwork.packet.Packet3Ping;
import ru.kudesunik.kudesunetwork.packet.Packet4Raw;
import ru.kudesunik.kudesunetwork.parameters.PingParameters;
import ru.kudesunik.kudesunetwork.util.task.TaskManager;
import ru.kudesunik.kudesunetwork.util.task.TaskManagerTask;

public class NetworkWorker implements Runnable {

Expand All @@ -21,10 +26,13 @@ public class NetworkWorker implements Runnable {
private final ConcurrentLinkedDeque<Packet> recievedPackets;
private final ConcurrentLinkedDeque<Packet> packetsToSend;

private final PingParameters pingParameters;

private volatile boolean isWorking;

public NetworkWorker(NetworkHandler handler, boolean useProtocol) {
this.handler = handler;
this.pingParameters = handler.getParameters().getPingParameters();
this.outputStream = handler.getOutputStream();
this.recievedPackets = new ConcurrentLinkedDeque<>();
this.packetsToSend = new ConcurrentLinkedDeque<>();
Expand All @@ -35,8 +43,17 @@ public NetworkWorker(NetworkHandler handler, boolean useProtocol) {
* Put received packet to worker queue for further sending it to listener
* @param packet - recieved packet
*/
@ThreadSafe(callerThread = "KudeSocket Reader")
@ThreadSafe(callerThread = "KudesuNetwork Reader")
public void receivePacket(Packet packet) {
if(packet.getId() == Packet3Ping.ID) {
Packet3Ping pingPacket = (Packet3Ping) packet;
if(pingPacket.getNetworkSide() == handler.getNetworkSide()) {
pingPacket.setTimestampReceived(); //Set real receive timestamp and handle it
} else {
givePacketToSend(pingPacket); //Send packet back safely, don't handle
return;
}
}
synchronized(recievedPackets) {
this.recievedPackets.addLast(packet);
}
Expand All @@ -48,7 +65,7 @@ public void receivePacket(Packet packet) {
/**
* Send recieved packet to listener
*/
@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void sendReceivedPacket() {
Packet packet;
synchronized(recievedPackets) {
Expand Down Expand Up @@ -78,37 +95,46 @@ public void givePacketToSend(Packet packet) {
/**
* Send packet in queue to server or client
*/
@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void sendPacket() {
Packet packet;
synchronized(packetsToSend) {
packet = packetsToSend.poll();
}
if(packet != null) {
sendPacket(packet);
}
}

@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void sendPacket(@NonNull Packet packet) {
byte packetId = packet.getId();
if(handler.isPacketExist(packetId)) {
if(packet.getId() == Packet3Ping.ID) {
Packet3Ping pingPacket = (Packet3Ping) packet;
pingPacket.setTimestampSended();
}
if(packet.getId() == Packet4Raw.ID) {
sendRawPacket(packet);
return;
}
byte packetId = packet.getId();
if(handler.isPacketExist(packetId)) {
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream packetData = new DataOutputStream(data);
DataOutputStream overallData = new DataOutputStream(outputStream);
try {
packet.write(packetData);
overallData.writeByte(packetId);
overallData.writeShort(packetData.size());
overallData.write(data.toByteArray());
} catch(IOException ex) {
handler.requestDropConnection();
}
} else {
KudesuNetwork.log(Level.ERROR, "Packet not found!");
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream packetData = new DataOutputStream(data);
DataOutputStream overallData = new DataOutputStream(outputStream);
try {
packet.write(packetData);
overallData.writeByte(packetId);
overallData.writeShort(packetData.size());
overallData.write(data.toByteArray());
} catch(IOException ex) {
handler.requestDropConnection();
}
} else {
KudesuNetwork.log(Level.ERROR, "Packet not found!");
}
}

@ThreadSafe(callerThread = "KudeSocket Worker")
@ThreadSafe(callerThread = "KudesuNetwork Worker")
private void sendRawPacket(Packet packet) {
try {
packet.write(new DataOutputStream(outputStream));
Expand All @@ -128,6 +154,15 @@ public Packet getPacketContainer(int packetId) {
@Override
public void run() {
boolean repeatFlag = true;
if(pingParameters.isEnabled() && handler.useProtocol()) {
TaskManagerTask task = new TaskManagerTask("Ping task", () -> {
Packet3Ping pingPacket = new Packet3Ping();
pingPacket.setSide(handler.getNetworkSide());
givePacketToSend(pingPacket);
});
task.setUpdateTime(pingParameters.getDelay());
TaskManager.execute(task);
}
while(isWorking) {
synchronized(recievedPackets) {
while(!recievedPackets.isEmpty()) {
Expand Down
Loading

0 comments on commit 27142ef

Please sign in to comment.