diff --git a/build.gradle b/build.gradle index 9a0aa33..b3b9ca6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,9 +1,10 @@ plugins { id 'java' + id "org.javamodularity.moduleplugin" } -sourceCompatibility = 9 -targetCompatibility = 9 +sourceCompatibility = 11 +targetCompatibility = 11 repositories { jcenter() diff --git a/src/main/java/com/projectswg/forwarder/Forwarder.java b/src/main/java/com/projectswg/forwarder/Forwarder.java index 4d3b703..1e0e319 100644 --- a/src/main/java/com/projectswg/forwarder/Forwarder.java +++ b/src/main/java/com/projectswg/forwarder/Forwarder.java @@ -139,10 +139,12 @@ public class Forwarder { public static class ForwarderData { private InetSocketAddress address = null; + private boolean verifyServer = true; private String username = null; private String password = null; private int loginPort = 0; private int zonePort = 0; + private int pingPort = 0; private int outboundTunerMaxSend = 100; private int outboundTunerInterval = 20; @@ -152,6 +154,10 @@ public class Forwarder { return address; } + public boolean isVerifyServer() { + return verifyServer; + } + public String getUsername() { return username; } @@ -168,6 +174,10 @@ public class Forwarder { return zonePort; } + public int getPingPort() { + return pingPort; + } + public int getOutboundTunerMaxSend() { return outboundTunerMaxSend; } @@ -180,6 +190,10 @@ public class Forwarder { this.address = address; } + public void setVerifyServer(boolean verifyServer) { + this.verifyServer = verifyServer; + } + public void setUsername(String username) { this.username = username; } @@ -196,6 +210,10 @@ public class Forwarder { this.zonePort = zonePort; } + public void setPingPort(int pingPort) { + this.pingPort = pingPort; + } + public void setOutboundTunerMaxSend(int outboundTunerMaxSend) { this.outboundTunerMaxSend = outboundTunerMaxSend; } diff --git a/src/main/java/com/projectswg/forwarder/intents/client/SendPongIntent.java b/src/main/java/com/projectswg/forwarder/intents/client/SendPongIntent.java new file mode 100644 index 0000000..e8d42c8 --- /dev/null +++ b/src/main/java/com/projectswg/forwarder/intents/client/SendPongIntent.java @@ -0,0 +1,17 @@ +package com.projectswg.forwarder.intents.client; + +import me.joshlarson.jlcommon.control.Intent; + +public class SendPongIntent extends Intent { + + private final byte [] data; + + public SendPongIntent(byte [] data) { + this.data = data; + } + + public byte [] getData() { + return data; + } + +} diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/ClientServer.java b/src/main/java/com/projectswg/forwarder/resources/networking/ClientServer.java index 973adb5..0b81aba 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/ClientServer.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/ClientServer.java @@ -2,5 +2,6 @@ package com.projectswg.forwarder.resources.networking; public enum ClientServer { LOGIN, - ZONE + ZONE, + PING } diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/NetInterceptor.java b/src/main/java/com/projectswg/forwarder/resources/networking/NetInterceptor.java index 88a8fcb..d821382 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/NetInterceptor.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/NetInterceptor.java @@ -58,7 +58,7 @@ public class NetInterceptor { for (Galaxy g : cluster.getGalaxies()) { g.setAddress("127.0.0.1"); g.setZonePort(this.data.getZonePort()); - g.setPingPort(this.data.getZonePort()); + g.setPingPort(this.data.getPingPort()); } return cluster.encode().array(); } diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/data/Packager.java b/src/main/java/com/projectswg/forwarder/resources/networking/data/Packager.java index 617dc6e..346ba3e 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/data/Packager.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/data/Packager.java @@ -7,16 +7,17 @@ import com.projectswg.forwarder.resources.networking.packets.Fragmented; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class Packager { private final AtomicInteger size; private final List dataChannel; - private final Queue outboundRaw; + private final BlockingQueue outboundRaw; private final ConnectionStream outboundPackaged; - public Packager(Queue outboundRaw, ConnectionStream outboundPackaged, ProtocolStack stack) { + public Packager(BlockingQueue outboundRaw, ConnectionStream outboundPackaged, ProtocolStack stack) { this.size = new AtomicInteger(8); this.dataChannel = new ArrayList<>(); this.outboundRaw = outboundRaw; @@ -27,16 +28,17 @@ public class Packager { byte [] packet; int packetSize; - while (!outboundRaw.isEmpty() && outboundPackaged.size() < maxPackaged) { + while (outboundPackaged.size() < maxPackaged) { packet = outboundRaw.poll(); if (packet == null) break; + packetSize = getPacketLength(packet); - - if (size.get() + packetSize >= 496) // overflowed previous packet + + if (size.get() + packetSize >= 16384) // max data channel size sendDataChannel(); - - if (packetSize < 496) { + + if (packetSize < 16384) { // if overflowed, must go into fragmented addToDataChannel(packet, packetSize); } else { sendFragmented(packet); diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/data/ProtocolStack.java b/src/main/java/com/projectswg/forwarder/resources/networking/data/ProtocolStack.java index 59a7dee..94691f5 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/data/ProtocolStack.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/data/ProtocolStack.java @@ -9,6 +9,9 @@ import org.jetbrains.annotations.NotNull; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiConsumer; public class ProtocolStack { @@ -17,7 +20,7 @@ public class ProtocolStack { private final InetSocketAddress source; private final BiConsumer sender; private final ClientServer server; - private final Queue outboundRaw; + private final BlockingQueue outboundRaw; private final ConnectionStream inbound; private final ConnectionStream outbound; private final Packager packager; @@ -30,7 +33,7 @@ public class ProtocolStack { this.source = source; this.sender = sender; this.server = server; - this.outboundRaw = new LinkedList<>(); + this.outboundRaw = new LinkedBlockingQueue<>(); this.inbound = new ConnectionStream<>(); this.outbound = new ConnectionStream<>(); this.packager = new Packager(outboundRaw, outbound, this); @@ -94,7 +97,11 @@ public class ProtocolStack { } public void addOutbound(@NotNull byte [] data) { - outboundRaw.offer(data); + try { + outboundRaw.put(data); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } public short getFirstUnacknowledgedOutbound() { @@ -108,11 +115,11 @@ public class ProtocolStack { outbound.removeOrdered(sequence); } - public void fillOutboundPackagedBuffer(int maxPackaged) { + public synchronized void fillOutboundPackagedBuffer(int maxPackaged) { packager.handle(maxPackaged); } - public int fillOutboundBuffer(SequencedOutbound [] buffer) { + public synchronized int fillOutboundBuffer(SequencedOutbound [] buffer) { return outbound.fillBuffer(buffer); } @@ -126,31 +133,30 @@ public class ProtocolStack { private final PriorityQueue sequenced; private final PriorityQueue queued; - private short sequence; + private long sequence; public ConnectionStream() { this.sequenced = new PriorityQueue<>(); this.queued = new PriorityQueue<>(); - this.sequence = 0; } public short getSequence() { - return sequence; + return (short) sequence; } public synchronized SequencedStatus addUnordered(@NotNull T packet) { - if (SequencedPacket.compare(sequence, packet.getSequence()) > 0) { + if (SequencedPacket.compare(getSequence(), packet.getSequence()) > 0) { T peek = peek(); - return peek != null && peek.getSequence() == sequence ? SequencedStatus.READY : SequencedStatus.STALE; + return peek != null && peek.getSequence() == getSequence() ? SequencedStatus.READY : SequencedStatus.STALE; } - if (packet.getSequence() == sequence) { + if (packet.getSequence() == getSequence()) { sequenced.add(packet); sequence++; // Add queued OOO packets T queue; - while ((queue = queued.peek()) != null && queue.getSequence() == sequence) { + while ((queue = queued.peek()) != null && queue.getSequence() == getSequence()) { sequenced.add(queued.poll()); sequence++; } @@ -164,7 +170,7 @@ public class ProtocolStack { } public synchronized void addOrdered(@NotNull T packet) { - packet.setSequence(sequence); + packet.setSequence(getSequence()); addUnordered(packet); } diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/packets/DataChannel.java b/src/main/java/com/projectswg/forwarder/resources/networking/packets/DataChannel.java index c0c1d68..3f8d341 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/packets/DataChannel.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/packets/DataChannel.java @@ -66,6 +66,11 @@ public class DataChannel extends Packet implements SequencedPacket { content.addAll(Arrays.asList(packets)); } + public DataChannel(byte [] packet) { + this(); + content.add(packet); + } + @Override public void decode(ByteBuffer data) { super.decode(data); @@ -103,29 +108,18 @@ public class DataChannel extends Packet implements SequencedPacket { @Override public ByteBuffer encode() { - NetBuffer data; - if (content.size() == 1) { - byte[] pData = content.get(0); - data = NetBuffer.allocate(4 + pData.length); - data.addNetShort(channel.getOpcode()); - data.addNetShort(sequence); - data.addRawArray(pData); - } else if (content.size() > 1) { - data = NetBuffer.allocate(getLength()); - data.addNetShort(channel.getOpcode()); - data.addNetShort(sequence); - data.addNetShort(0x19); - for (byte[] pData : content) { - if (pData.length >= 0xFF) { - data.addByte(0xFF); - data.addNetShort(pData.length); - } else { - data.addByte(pData.length); - } - data.addRawArray(pData); + NetBuffer data = NetBuffer.allocate(getLength()); + data.addNetShort(channel.getOpcode()); + data.addNetShort(sequence); + data.addNetShort(0x19); + for (byte[] pData : content) { + if (pData.length >= 0xFF) { + data.addByte(0xFF); + data.addNetShort(pData.length); + } else { + data.addByte(pData.length); } - } else { - data = NetBuffer.allocate(0); + data.addRawArray(pData); } return data.getBuffer(); } @@ -139,16 +133,12 @@ public class DataChannel extends Packet implements SequencedPacket { } public int getLength() { - if (content.size() == 1) { - return 4 + content.get(0).length; - } else { - int length = 6; - for (byte[] packet : content) { - int addLength = packet.length; - length += 1 + addLength + ((addLength >= 0xFF) ? 2 : 0); - } - return length; + int length = 6; + for (byte[] packet : content) { + int addLength = packet.length; + length += 1 + addLength + ((addLength >= 0xFF) ? 2 : 0); } + return length; } @Override diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/packets/Fragmented.java b/src/main/java/com/projectswg/forwarder/resources/networking/packets/Fragmented.java index f6b3b8b..7d9507a 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/packets/Fragmented.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/packets/Fragmented.java @@ -106,11 +106,11 @@ public class Fragmented extends Packet implements SequencedPacket { public static byte [][] split(byte [] data) { int offset = 0; - int packetCount = (int) Math.ceil((data.length+4)/489.0); + int packetCount = (int) Math.ceil((data.length+4)/16377.0); // 489 byte [][] packets = new byte[packetCount][]; for (int i = 0; i < packetCount; i++) { int header = (i == 0) ? 4 : 0; - ByteBuffer segment = ByteBuffer.allocate(Math.min(data.length-offset-header, 489)).order(ByteOrder.BIG_ENDIAN); + ByteBuffer segment = ByteBuffer.allocate(Math.min(data.length-offset-header, 16377)).order(ByteOrder.BIG_ENDIAN); if (i == 0) segment.putInt(data.length); int segmentLength = segment.remaining(); diff --git a/src/main/java/com/projectswg/forwarder/resources/networking/packets/PingPacket.java b/src/main/java/com/projectswg/forwarder/resources/networking/packets/PingPacket.java index c93260d..d875c62 100644 --- a/src/main/java/com/projectswg/forwarder/resources/networking/packets/PingPacket.java +++ b/src/main/java/com/projectswg/forwarder/resources/networking/packets/PingPacket.java @@ -1,5 +1,7 @@ package com.projectswg.forwarder.resources.networking.packets; +import com.projectswg.common.utilities.ByteUtilities; + import java.nio.ByteBuffer; public class PingPacket extends Packet { @@ -24,4 +26,9 @@ public class PingPacket extends Packet { return payload; } + @Override + public String toString() { + return "PingPacket[payload=" + ByteUtilities.getHexString(payload) + "]"; + } + } diff --git a/src/main/java/com/projectswg/forwarder/resources/recording/PacketRecorder.java b/src/main/java/com/projectswg/forwarder/resources/recording/PacketRecorder.java index cd00113..b2dbc1f 100644 --- a/src/main/java/com/projectswg/forwarder/resources/recording/PacketRecorder.java +++ b/src/main/java/com/projectswg/forwarder/resources/recording/PacketRecorder.java @@ -3,14 +3,15 @@ package com.projectswg.forwarder.resources.recording; import java.io.*; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; +import java.time.Instant; +import java.time.ZoneId; import java.util.Map; import java.util.Map.Entry; -import java.util.TimeZone; import java.util.TreeMap; public class PacketRecorder implements AutoCloseable, Closeable { - private static final byte VERSION = 2; + private static final byte VERSION = 3; private final DataOutputStream dataOut; @@ -26,7 +27,7 @@ public class PacketRecorder implements AutoCloseable, Closeable { public void record(boolean server, byte [] data) { synchronized (dataOut) { try { - dataOut.writeByte(server?1:0); + dataOut.writeBoolean(server); dataOut.writeLong(System.currentTimeMillis()); dataOut.writeShort(data.length); dataOut.write(data); @@ -47,15 +48,14 @@ public class PacketRecorder implements AutoCloseable, Closeable { private void writeSystemHeader() { OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); - TimeZone tz = TimeZone.getDefault(); Map systemStrings = new TreeMap<>(); systemStrings.put("os.arch", os.getArch()); systemStrings.put("os.details", os.getName()+":"+os.getVersion()); systemStrings.put("os.processor_count", Integer.toString(os.getAvailableProcessors())); systemStrings.put("java.version", System.getProperty("java.version")); systemStrings.put("java.vendor", System.getProperty("java.vendor")); - systemStrings.put("time.time_zone", tz.getID()+":"+tz.getDisplayName()); - systemStrings.put("time.current_time", Long.toString(System.currentTimeMillis())); + systemStrings.put("time.time_zone", ZoneId.systemDefault().getId()); + systemStrings.put("time.current_time", Instant.now().toString()); try { dataOut.writeByte(systemStrings.size()); // Count of strings for (Entry e : systemStrings.entrySet()) diff --git a/src/main/java/com/projectswg/forwarder/services/client/ClientOutboundDataService.java b/src/main/java/com/projectswg/forwarder/services/client/ClientOutboundDataService.java index 1272505..9040f09 100644 --- a/src/main/java/com/projectswg/forwarder/services/client/ClientOutboundDataService.java +++ b/src/main/java/com/projectswg/forwarder/services/client/ClientOutboundDataService.java @@ -3,7 +3,6 @@ package com.projectswg.forwarder.services.client; import com.projectswg.common.network.NetBuffer; import com.projectswg.common.network.packets.PacketType; import com.projectswg.common.network.packets.swg.zone.HeartBeat; -import com.projectswg.forwarder.Forwarder.ForwarderData; import com.projectswg.forwarder.intents.client.*; import com.projectswg.forwarder.intents.control.StartForwarderIntent; import com.projectswg.forwarder.resources.networking.ClientServer; @@ -12,9 +11,9 @@ import com.projectswg.forwarder.resources.networking.data.SequencedOutbound; import com.projectswg.forwarder.resources.networking.packets.Acknowledge; import com.projectswg.forwarder.resources.networking.packets.OutOfOrder; import com.projectswg.forwarder.resources.networking.packets.Packet; +import me.joshlarson.jlcommon.concurrency.BasicScheduledThread; import me.joshlarson.jlcommon.concurrency.BasicThread; import me.joshlarson.jlcommon.concurrency.Delay; -import me.joshlarson.jlcommon.concurrency.SmartLock; import me.joshlarson.jlcommon.control.IntentHandler; import me.joshlarson.jlcommon.control.IntentMultiplexer; import me.joshlarson.jlcommon.control.IntentMultiplexer.Multiplexer; @@ -25,34 +24,34 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; public class ClientOutboundDataService extends Service { + private final SequencedOutbound [] outboundBuffer; private final IntentMultiplexer multiplexer; private final Set activeStacks; private final BasicThread sendThread; - private final SmartLock signaller; - - private ForwarderData data; + private final BasicScheduledThread heartbeatThread; public ClientOutboundDataService() { + this.outboundBuffer = new SequencedOutbound[4096]; this.multiplexer = new IntentMultiplexer(this, ProtocolStack.class, Packet.class); this.activeStacks = ConcurrentHashMap.newKeySet(); this.sendThread = new BasicThread("outbound-sender", this::persistentSend); - this.signaller = new SmartLock(); - this.data = null; + this.heartbeatThread = new BasicScheduledThread("heartbeat", this::heartbeat); } @Override public boolean terminate() { - sendThread.stop(true); - return sendThread.awaitTermination(1000); + if (sendThread.isExecuting()) + sendThread.stop(true); + if (heartbeatThread.isRunning()) + heartbeatThread.stop(); + return sendThread.awaitTermination(1000) && heartbeatThread.awaitTermination(1000); } @IntentHandler private void handleStartForwarderIntent(StartForwarderIntent sfi) { - data = sfi.getData(); } @IntentHandler @@ -65,6 +64,7 @@ public class ClientOutboundDataService extends Service { if (sendThread.isExecuting()) return; sendThread.start(); + heartbeatThread.startWithFixedRate(0, 500); } @IntentHandler @@ -72,7 +72,9 @@ public class ClientOutboundDataService extends Service { if (!sendThread.isExecuting()) return; sendThread.stop(true); + heartbeatThread.stop(); sendThread.awaitTermination(1000); + heartbeatThread.awaitTermination(1000); } @IntentHandler @@ -110,8 +112,7 @@ public class ClientOutboundDataService extends Service { HeartBeat heartbeat = new HeartBeat(); heartbeat.decode(NetBuffer.wrap(dpoi.getData())); if (heartbeat.getPayload().length > 0) { - for (ProtocolStack stack : activeStacks) - stack.sendPing(heartbeat.getPayload()); + new SendPongIntent(heartbeat.getPayload()).broadcast(getIntentManager()); return; } break; @@ -121,14 +122,14 @@ public class ClientOutboundDataService extends Service { final ClientServer finalFilterServer = filterServer; ProtocolStack stack = activeStacks.stream().filter(s -> s.getServer() == finalFilterServer).findFirst().orElse(null); if (stack == null) { - Log.d("Data/Oubound Sending %s [len=%d] to %s", type, dpoi.getData().length, activeStacks); - for (ProtocolStack active : activeStacks) + Log.d("Data/Outbound Sending %s [len=%d] to %s", type, dpoi.getData().length, activeStacks); + for (ProtocolStack active : activeStacks) { active.addOutbound(dpoi.getData()); + } } else { Log.d("Data/Outbound Sending %s [len=%d] to %s", type, dpoi.getData().length, stack); stack.addOutbound(dpoi.getData()); } - signaller.signal(); } @Multiplexer @@ -143,47 +144,41 @@ public class ClientOutboundDataService extends Service { } private void persistentSend() { - int maxSend = data.getOutboundTunerMaxSend(); - if (maxSend <= 0) - maxSend = 400; - int interval = data.getOutboundTunerInterval(); - if (interval <= 0) - interval = 1000; - - int bucket = 0; - int [] sendCounts = new int[interval / 20]; - SequencedOutbound [] buffer = new SequencedOutbound[maxSend]; Log.d("Data/Outbound Starting Persistent Send"); + int iteration = 0; while (!Delay.isInterrupted()) { - sendCounts[bucket] = 0; - - int previousInterval = 0; - for (int count : sendCounts) - previousInterval += count; - if (previousInterval < maxSend) { - for (ProtocolStack stack : activeStacks) { - stack.fillOutboundPackagedBuffer(maxSend); - int count = stack.fillOutboundBuffer(buffer); - for (int i = 0; i < count && Delay.sleepMicro(50); i++) { - stack.send(buffer[i].getData()); - } - - if (count > 0) - Log.t("Data/Outbound Sent %d Start: %d", count, buffer[0].getSequence()); - sendCounts[bucket] += count; - } - try { - signaller.await(20, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - break; - } - } else { - Delay.sleepMilli(20); - } - - bucket = (bucket + 1) % sendCounts.length; + flushPackaged(iteration % 20 == 0); + iteration++; + Delay.sleepMilli(50); } Log.d("Data/Outbound Stopping Persistent Send"); } + private void heartbeat() { + for (ProtocolStack stack : activeStacks) { + stack.send(new HeartBeat().encode().array()); + } + } + + private void flushPackaged(boolean overrideSent) { + for (ProtocolStack stack : activeStacks) { + stack.fillOutboundPackagedBuffer(outboundBuffer.length); + int count = stack.fillOutboundBuffer(outboundBuffer); + int sent = 0; + for (int i = 0; i < count; i++) { + SequencedOutbound out = outboundBuffer[i]; + if (overrideSent || !out.isSent()) { + stack.send(out.getData()); + out.setSent(true); + sent++; + } + } + + if (sent > 0) + Log.t("Data/Outbound Sent %d - %d [%d]", outboundBuffer[0].getSequence(), outboundBuffer[count-1].getSequence(), count); + else if (count > 0) + Log.t("Data/Outbound Waiting to send %d - %d [count=%d]", outboundBuffer[0].getSequence(), outboundBuffer[count-1].getSequence(), count); + } + } + } diff --git a/src/main/java/com/projectswg/forwarder/services/client/ClientServerService.java b/src/main/java/com/projectswg/forwarder/services/client/ClientServerService.java index 2f2f0ea..548dbb1 100644 --- a/src/main/java/com/projectswg/forwarder/services/client/ClientServerService.java +++ b/src/main/java/com/projectswg/forwarder/services/client/ClientServerService.java @@ -19,6 +19,7 @@ import me.joshlarson.jlcommon.network.UDPServer; import me.joshlarson.jlcommon.utilities.ByteUtilities; import java.net.*; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.EnumMap; import java.util.Map; @@ -31,24 +32,34 @@ public class ClientServerService extends Service { private final Map stacks; private final AtomicBoolean serverConnection; private final AtomicReference cachedSessionRequest; + private final AtomicReference lastPing; private ForwarderData data; private UDPServer loginServer; private UDPServer zoneServer; + private UDPServer pingServer; public ClientServerService() { this.intentChain = new IntentChain(); this.stacks = new EnumMap<>(ClientServer.class); this.serverConnection = new AtomicBoolean(false); this.cachedSessionRequest = new AtomicReference<>(null); + this.lastPing = new AtomicReference<>(null); this.data = null; this.loginServer = null; this.zoneServer = null; + this.pingServer = null; } @Override public boolean isOperational() { - return data == null || (loginServer != null && zoneServer != null && loginServer.isRunning() && zoneServer.isRunning()); + if (data == null) + return true; + if (loginServer == null || !loginServer.isRunning()) + return false; + if (zoneServer == null || !zoneServer.isRunning()) + return false; + return pingServer != null && pingServer.isRunning(); } @Override @@ -66,24 +77,32 @@ public class ClientServerService extends Service { loginServer = new UDPServer(new InetSocketAddress(InetAddress.getLoopbackAddress(), data.getLoginPort()), 496, this::onLoginPacket); Log.t("Initializing zone udp server..."); zoneServer = new UDPServer(new InetSocketAddress(InetAddress.getLoopbackAddress(), data.getZonePort()), 496, this::onZonePacket); + Log.t("Initializing ping udp server..."); + pingServer = new UDPServer(new InetSocketAddress(InetAddress.getLoopbackAddress(), data.getPingPort()), 496, this::onPingPacket); Log.t("Binding to login server..."); loginServer.bind(this::customizeUdpServer); Log.t("Binding to zone server..."); zoneServer.bind(this::customizeUdpServer); + Log.t("Binding to ping server..."); + pingServer.bind(this::customizeUdpServer); data.setLoginPort(loginServer.getPort()); data.setZonePort(zoneServer.getPort()); + data.setPingPort(pingServer.getPort()); - Log.i("Initialized login (%d) and zone servers (%d)", loginServer.getPort(), zoneServer.getPort()); + Log.i("Initialized login (%d), zone (%d), and ping (%d) servers", loginServer.getPort(), zoneServer.getPort(), pingServer.getPort()); } catch (SocketException e) { Log.a(e); if (loginServer != null) loginServer.close(); if (zoneServer != null) zoneServer.close(); + if (pingServer != null) + pingServer.close(); loginServer = null; zoneServer = null; + pingServer = null; } data = sfi.getData(); } @@ -96,9 +115,13 @@ public class ClientServerService extends Service { Log.t("Closing the zone udp server..."); if (zoneServer != null) zoneServer.close(); + Log.t("Closing the ping udp server..."); + if (pingServer != null) + pingServer.close(); loginServer = null; zoneServer = null; - Log.i("Closed the login and zone udp servers"); + pingServer = null; + Log.i("Closed the login, zone, and ping udp servers"); } @IntentHandler @@ -116,6 +139,14 @@ public class ClientServerService extends Service { serverConnection.set(false); closeConnection(ClientServer.LOGIN); closeConnection(ClientServer.ZONE); + closeConnection(ClientServer.PING); + } + + @IntentHandler + private void handleSendPongIntent(SendPongIntent spi) { + InetSocketAddress lastPing = this.lastPing.get(); + if (lastPing != null) + send(lastPing, ClientServer.PING, spi.getData()); } private void customizeUdpServer(DatagramSocket socket) { @@ -138,6 +169,18 @@ public class ClientServerService extends Service { process((InetSocketAddress) packet.getSocketAddress(), ClientServer.ZONE, packet.getData()); } + private void onPingPacket(DatagramPacket packet) { + InetSocketAddress source = (InetSocketAddress) packet.getSocketAddress(); + lastPing.set(source); + ProtocolStack stack = stacks.get(ClientServer.ZONE); + PingPacket pingPacket = new PingPacket(packet.getData()); + pingPacket.setAddress(source.getAddress()); + pingPacket.setPort(source.getPort()); + + if (stack != null) + intentChain.broadcastAfter(getIntentManager(), new SonyPacketInboundIntent(stack, pingPacket)); + } + private void send(InetSocketAddress addr, ClientServer server, byte [] data) { switch (server) { case LOGIN: @@ -146,13 +189,22 @@ public class ClientServerService extends Service { case ZONE: zoneServer.send(addr, data); break; + case PING: + pingServer.send(addr, data); + break; } } private void process(InetSocketAddress source, ClientServer server, byte [] data) { - Packet parsed = parse(data); - if (parsed == null) + Packet parsed; + try { + parsed = (server == ClientServer.PING) ? new PingPacket(data) : parse(data); + if (parsed == null) + return; + } catch (BufferUnderflowException e) { + Log.w("Failed to parse packet: %s", ByteUtilities.getHexString(data)); return; + } parsed.setAddress(source.getAddress()); parsed.setPort(source.getPort()); if (parsed instanceof MultiPacket) { @@ -175,11 +227,14 @@ public class ClientServerService extends Service { } private ProtocolStack process(InetSocketAddress source, ClientServer server, Packet parsed) { + Log.t("Process [%b] %s", serverConnection.get(), parsed); if (!serverConnection.get()) { if (parsed instanceof SessionRequest) { cachedSessionRequest.set((SessionRequest) parsed); intentChain.broadcastAfter(getIntentManager(), new RequestServerConnectionIntent()); } + for (ClientServer serverType : ClientServer.values()) + closeConnection(serverType); return null; } if (parsed instanceof SessionRequest) @@ -275,8 +330,6 @@ public class ClientServerService extends Service { case 0x17: case 0x18: return new Acknowledge(data); default: - if (rawData.length == 4) - return new PingPacket(rawData); if (rawData.length >= 6) return new RawSWGPacket(rawData); Log.w("Unknown SOE packet: %d %s", opcode, ByteUtilities.getHexString(data.array())); diff --git a/src/main/java/com/projectswg/forwarder/services/server/ServerConnectionService.java b/src/main/java/com/projectswg/forwarder/services/server/ServerConnectionService.java index 38ec17c..965cab4 100644 --- a/src/main/java/com/projectswg/forwarder/services/server/ServerConnectionService.java +++ b/src/main/java/com/projectswg/forwarder/services/server/ServerConnectionService.java @@ -1,7 +1,7 @@ package com.projectswg.forwarder.services.server; -import com.projectswg.connection.HolocoreSocket; -import com.projectswg.connection.RawPacket; +import com.projectswg.holocore.client.HolocoreSocket; +import com.projectswg.holocore.client.RawPacket; import com.projectswg.forwarder.Forwarder.ForwarderData; import com.projectswg.forwarder.intents.client.ClientDisconnectedIntent; import com.projectswg.forwarder.intents.client.DataPacketInboundIntent; @@ -55,9 +55,12 @@ public class ServerConnectionService extends Service { @IntentHandler private void handleRequestServerConnectionIntent(RequestServerConnectionIntent rsci) { - if (thread.isExecuting()) - stopRunningLoop(); - thread.start(); + HolocoreSocket holocore = this.holocore; + if (holocore != null) + return; // It's trying to connect - give it a little more time + + if (stopRunningLoop()) + thread.start(); } @IntentHandler @@ -73,12 +76,15 @@ public class ServerConnectionService extends Service { } private boolean stopRunningLoop() { + if (!thread.isExecuting()) + return true; thread.stop(true); return thread.awaitTermination(1000); } private void primaryConnectionLoop() { - try (HolocoreSocket holocore = new HolocoreSocket(data.getAddress().getAddress(), data.getAddress().getPort())) { + boolean didConnect = false; + try (HolocoreSocket holocore = new HolocoreSocket(data.getAddress().getAddress(), data.getAddress().getPort(), data.isVerifyServer())) { this.holocore = holocore; Log.t("Attempting to connect to server at %s", holocore.getRemoteAddress()); if (!holocore.connect(CONNECT_TIMEOUT)) { @@ -87,11 +93,15 @@ public class ServerConnectionService extends Service { } Log.i("Successfully connected to server at %s", holocore.getRemoteAddress()); + didConnect = true; intentChain.broadcastAfter(getIntentManager(), new ServerConnectedIntent()); while (holocore.isConnected()) { RawPacket inbound = holocore.receive(); if (inbound == null) { - Log.w("Server closed connection!"); + if (holocore.isConnected()) + Log.w("Server connection interrupted"); + else + Log.w("Server closed connection!"); return; } intentChain.broadcastAfter(getIntentManager(), new DataPacketOutboundIntent(interceptor.interceptServer(inbound.getData()))); @@ -101,7 +111,8 @@ public class ServerConnectionService extends Service { Log.w(t); } finally { Log.i("Disconnected from server."); - intentChain.broadcastAfter(getIntentManager(), new ServerDisconnectedIntent()); + if (didConnect) + intentChain.broadcastAfter(getIntentManager(), new ServerDisconnectedIntent()); this.holocore = null; } } diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 0000000..ce576ef --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,13 @@ +module com.projectswg.forwarder { + requires com.projectswg.common; + requires com.projectswg.holocore.client; + requires org.jetbrains.annotations; + + requires java.management; + + exports com.projectswg.forwarder; + + opens com.projectswg.forwarder.services.client to me.joshlarson.jlcommon; + opens com.projectswg.forwarder.services.crash to me.joshlarson.jlcommon; + opens com.projectswg.forwarder.services.server to me.joshlarson.jlcommon; +}