From 8505a996c3758d5b1d4054d302474d4cd6979808 Mon Sep 17 00:00:00 2001 From: Obique Date: Mon, 20 Nov 2017 00:03:55 -0600 Subject: [PATCH] Made some optimizations to TCPSocket --- .../projectswg/common/network/TCPSocket.java | 189 ++++++++++++------ 1 file changed, 131 insertions(+), 58 deletions(-) diff --git a/src/com/projectswg/common/network/TCPSocket.java b/src/com/projectswg/common/network/TCPSocket.java index 51e8bfa..b61241a 100644 --- a/src/com/projectswg/common/network/TCPSocket.java +++ b/src/com/projectswg/common/network/TCPSocket.java @@ -29,10 +29,13 @@ package com.projectswg.common.network; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import com.projectswg.common.callback.CallbackManager; import com.projectswg.common.concurrency.Delay; @@ -44,15 +47,24 @@ public class TCPSocket { private final CallbackManager callbackManager; private final TCPSocketListener listener; private final InetSocketAddress address; + private final AtomicReference state; + private final Object stateMutex; private final int bufferSize; - private SocketChannel socket; + private Socket socket; + private InputStream socketInputStream; + private OutputStream socketOutputStream; public TCPSocket(InetSocketAddress address, int bufferSize) { this.callbackManager = new CallbackManager<>("tcpsocket-"+address, 1); this.listener = new TCPSocketListener(); this.address = address; + this.state = new AtomicReference<>(SocketState.CLOSED); + this.stateMutex = new Object(); this.bufferSize = bufferSize; + this.socket = null; + this.socketInputStream = null; + this.socketOutputStream = null; } public int getBufferSize() { @@ -63,16 +75,20 @@ public class TCPSocket { return address; } - public SocketChannel getSocket() { + public Socket getSocket() { return socket; } public boolean isAlive() { - return socket != null && listener.isAlive(); + synchronized (stateLock()) { + return socket != null && listener.isAlive(); + } } public boolean isConnected() { - return socket != null && socket.isConnected(); + synchronized (stateLock()) { + return socket != null && socket.isConnected(); + } } public void setCallback(TCPSocketCallback callback) { @@ -83,57 +99,114 @@ public class TCPSocket { callbackManager.clearCallbacks(); } - public boolean connect() { - Assert.isNull(socket, "Socket must be null! Cannot connect twice!"); - Assert.test(!listener.isAlive(), "Listener must not be alive! Cannot connect twice!"); - try { - callbackManager.start(); - socket = SocketChannel.open(address); - if (socket.finishConnect()) { - listener.start(); - callbackManager.callOnEach((callback) -> callback.onConnected(this)); - return true; - } - } catch (IOException e) { - Log.e(e); + public void createConnection() throws IOException { + synchronized (stateLock()) { + checkAndSetState(SocketState.CLOSED, SocketState.CREATED); + socket = createSocket(); } - socket = null; - callbackManager.stop(); - return false; + } + + public void startConnection() throws IOException { + synchronized (stateLock()) { + try { + checkAndSetState(SocketState.CREATED, SocketState.CONNECTING); + socket.connect(address); + socketInputStream = socket.getInputStream(); + socketOutputStream = socket.getOutputStream(); + } catch (IOException e) { + checkAndSetState(SocketState.CONNECTING, SocketState.CLOSED); + socket = null; + socketInputStream = null; + socketOutputStream = null; + throw e; + } + + callbackManager.start(); + listener.start(); + checkAndSetState(SocketState.CONNECTING, SocketState.CONNECTED); + callbackManager.callOnEach((callback) -> callback.onConnected(this)); + } + } + + public void connect() throws IOException { + createConnection(); + startConnection(); } public boolean disconnect() { - if (socket == null) - return true; - try { - socket.close(); - if (listener.isAlive()) { - listener.stop(); - listener.awaitTermination(); + synchronized (stateLock()) { + if (socket == null) + return true; + try { + checkAndSetState(SocketState.CONNECTED, SocketState.CLOSED); + socket.close(); + socket = null; + socketInputStream = null; + socketOutputStream = null; + + if (listener.isAlive()) { + listener.stop(); + listener.awaitTermination(); + } + + if (callbackManager.isRunning()) { + callbackManager.callOnEach((callback) -> callback.onDisconnected(this)); + callbackManager.stop(); + } + return true; + } catch (IOException e) { + Log.e(e); } - if (callbackManager.isRunning()) { - callbackManager.callOnEach((callback) -> callback.onDisconnected(this)); - callbackManager.stop(); - } - socket = null; - return true; - } catch (IOException e) { - Log.e(e); + return false; } - return false; + } + + public boolean send(NetBuffer data) { + return send(data.array(), data.position(), data.remaining()); } public boolean send(ByteBuffer data) { - try { - while (data.hasRemaining()) { - if (socket == null || socket.write(data) <= 0) + return send(data.array(), data.position(), data.remaining()); + } + + public boolean send(byte [] data) { + return send(data, 0, data.length); + } + + public boolean send(byte [] data, int offset, int length) { + synchronized (stateLock()) { + try { + if (socket == null) return false; + + if (length > 0) + socketOutputStream.write(data, offset, length); + + return true; + } catch (IOException e) { + Log.e(e); } - return true; - } catch (IOException e) { - Log.e(e); + return false; } - return false; + } + + protected Socket createSocket() throws IOException { + return new Socket(); + } + + protected final Object stateLock() { + return stateMutex; + } + + /** + * Checks the current state to see if it matches the expected, and if so, changes it to the new state. If not, it fails the assertion + * @param expected the expected state + * @param state the new state + */ + private void checkAndSetState(SocketState expected, SocketState state) { + Assert.notNull(expected, "Expected state cannot be null!"); + Assert.notNull(state, "New state cannot be null!"); + Assert.test(this.state.compareAndSet(expected, state), "Failed to set state! Was: " + this.state.get() + " Expected: " + expected + " Update: " + state); } public interface TCPSocketCallback { @@ -142,6 +215,13 @@ public class TCPSocket { void onIncomingData(TCPSocket socket, byte [] data); } + private enum SocketState { + CLOSED, + CREATED, + CONNECTING, + CONNECTED + } + private class TCPSocketListener implements Runnable { private final AtomicBoolean running; @@ -158,7 +238,7 @@ public class TCPSocket { public void start() { Assert.test(!running.get(), "Cannot start listener! Already started!"); Assert.isNull(thread, "Cannot start listener! Already started!"); - thread = new Thread(this); + thread = new Thread(this, "TCPServer Port#" + address.getPort()); running.set(true); thread.start(); } @@ -187,13 +267,10 @@ public class TCPSocket { public void run() { try { alive.set(true); - SocketChannel sc = TCPSocket.this.socket; - int bufferSize = TCPSocket.this.bufferSize; - Assert.notNull(sc, "SocketChannel is null at start of listener run()!"); - Assert.test(bufferSize > 0, "Buffer size is <= 0 at start of listener run()!"); - ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize); + InputStream input = TCPSocket.this.socketInputStream; + byte [] buffer = new byte[TCPSocket.this.bufferSize]; while (running.get()) { - waitIncoming(sc, buf, bufferSize); + waitIncoming(input, buffer); } } catch (Throwable t) { @@ -205,18 +282,14 @@ public class TCPSocket { } } - private void waitIncoming(SocketChannel sc, ByteBuffer buf, int bufferSize) throws IOException { - buf.position(0); - buf.limit(bufferSize); - int n = sc.read(buf); - buf.flip(); + private void waitIncoming(InputStream input, byte [] buffer) throws IOException { + int n = input.read(buffer); if (n == 0) return; if (n < 0) throw new EOFException(); byte [] data = new byte[n]; - buf.position(0); - buf.get(data, 0, n); + System.arraycopy(buffer, 0, data, 0, n); callbackManager.callOnEach((callback) -> callback.onIncomingData(TCPSocket.this, data)); }