Made some optimizations to TCPSocket

This commit is contained in:
Obique
2017-11-20 00:03:55 -06:00
parent 016f219e80
commit 8505a996c3

View File

@@ -29,10 +29,13 @@ package com.projectswg.common.network;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.projectswg.common.callback.CallbackManager;
import com.projectswg.common.concurrency.Delay;
@@ -44,15 +47,24 @@ public class TCPSocket {
private final CallbackManager<TCPSocketCallback> callbackManager;
private final TCPSocketListener listener;
private final InetSocketAddress address;
private final AtomicReference<SocketState> state;
private final Object stateMutex;
private final int bufferSize;
private SocketChannel socket;
private Socket socket;
private InputStream socketInputStream;
private OutputStream socketOutputStream;
public TCPSocket(InetSocketAddress address, int bufferSize) {
this.callbackManager = new CallbackManager<>("tcpsocket-"+address, 1);
this.listener = new TCPSocketListener();
this.address = address;
this.state = new AtomicReference<>(SocketState.CLOSED);
this.stateMutex = new Object();
this.bufferSize = bufferSize;
this.socket = null;
this.socketInputStream = null;
this.socketOutputStream = null;
}
public int getBufferSize() {
@@ -63,16 +75,20 @@ public class TCPSocket {
return address;
}
public SocketChannel getSocket() {
public Socket getSocket() {
return socket;
}
public boolean isAlive() {
return socket != null && listener.isAlive();
synchronized (stateLock()) {
return socket != null && listener.isAlive();
}
}
public boolean isConnected() {
return socket != null && socket.isConnected();
synchronized (stateLock()) {
return socket != null && socket.isConnected();
}
}
public void setCallback(TCPSocketCallback callback) {
@@ -83,57 +99,114 @@ public class TCPSocket {
callbackManager.clearCallbacks();
}
public boolean connect() {
Assert.isNull(socket, "Socket must be null! Cannot connect twice!");
Assert.test(!listener.isAlive(), "Listener must not be alive! Cannot connect twice!");
try {
callbackManager.start();
socket = SocketChannel.open(address);
if (socket.finishConnect()) {
listener.start();
callbackManager.callOnEach((callback) -> callback.onConnected(this));
return true;
}
} catch (IOException e) {
Log.e(e);
public void createConnection() throws IOException {
synchronized (stateLock()) {
checkAndSetState(SocketState.CLOSED, SocketState.CREATED);
socket = createSocket();
}
socket = null;
callbackManager.stop();
return false;
}
public void startConnection() throws IOException {
synchronized (stateLock()) {
try {
checkAndSetState(SocketState.CREATED, SocketState.CONNECTING);
socket.connect(address);
socketInputStream = socket.getInputStream();
socketOutputStream = socket.getOutputStream();
} catch (IOException e) {
checkAndSetState(SocketState.CONNECTING, SocketState.CLOSED);
socket = null;
socketInputStream = null;
socketOutputStream = null;
throw e;
}
callbackManager.start();
listener.start();
checkAndSetState(SocketState.CONNECTING, SocketState.CONNECTED);
callbackManager.callOnEach((callback) -> callback.onConnected(this));
}
}
public void connect() throws IOException {
createConnection();
startConnection();
}
public boolean disconnect() {
if (socket == null)
return true;
try {
socket.close();
if (listener.isAlive()) {
listener.stop();
listener.awaitTermination();
synchronized (stateLock()) {
if (socket == null)
return true;
try {
checkAndSetState(SocketState.CONNECTED, SocketState.CLOSED);
socket.close();
socket = null;
socketInputStream = null;
socketOutputStream = null;
if (listener.isAlive()) {
listener.stop();
listener.awaitTermination();
}
if (callbackManager.isRunning()) {
callbackManager.callOnEach((callback) -> callback.onDisconnected(this));
callbackManager.stop();
}
return true;
} catch (IOException e) {
Log.e(e);
}
if (callbackManager.isRunning()) {
callbackManager.callOnEach((callback) -> callback.onDisconnected(this));
callbackManager.stop();
}
socket = null;
return true;
} catch (IOException e) {
Log.e(e);
return false;
}
return false;
}
public boolean send(NetBuffer data) {
return send(data.array(), data.position(), data.remaining());
}
public boolean send(ByteBuffer data) {
try {
while (data.hasRemaining()) {
if (socket == null || socket.write(data) <= 0)
return send(data.array(), data.position(), data.remaining());
}
public boolean send(byte [] data) {
return send(data, 0, data.length);
}
public boolean send(byte [] data, int offset, int length) {
synchronized (stateLock()) {
try {
if (socket == null)
return false;
if (length > 0)
socketOutputStream.write(data, offset, length);
return true;
} catch (IOException e) {
Log.e(e);
}
return true;
} catch (IOException e) {
Log.e(e);
return false;
}
return false;
}
protected Socket createSocket() throws IOException {
return new Socket();
}
protected final Object stateLock() {
return stateMutex;
}
/**
* Checks the current state to see if it matches the expected, and if so, changes it to the new state. If not, it fails the assertion
* @param expected the expected state
* @param state the new state
*/
private void checkAndSetState(SocketState expected, SocketState state) {
Assert.notNull(expected, "Expected state cannot be null!");
Assert.notNull(state, "New state cannot be null!");
Assert.test(this.state.compareAndSet(expected, state), "Failed to set state! Was: " + this.state.get() + " Expected: " + expected + " Update: " + state);
}
public interface TCPSocketCallback {
@@ -142,6 +215,13 @@ public class TCPSocket {
void onIncomingData(TCPSocket socket, byte [] data);
}
private enum SocketState {
CLOSED,
CREATED,
CONNECTING,
CONNECTED
}
private class TCPSocketListener implements Runnable {
private final AtomicBoolean running;
@@ -158,7 +238,7 @@ public class TCPSocket {
public void start() {
Assert.test(!running.get(), "Cannot start listener! Already started!");
Assert.isNull(thread, "Cannot start listener! Already started!");
thread = new Thread(this);
thread = new Thread(this, "TCPServer Port#" + address.getPort());
running.set(true);
thread.start();
}
@@ -187,13 +267,10 @@ public class TCPSocket {
public void run() {
try {
alive.set(true);
SocketChannel sc = TCPSocket.this.socket;
int bufferSize = TCPSocket.this.bufferSize;
Assert.notNull(sc, "SocketChannel is null at start of listener run()!");
Assert.test(bufferSize > 0, "Buffer size is <= 0 at start of listener run()!");
ByteBuffer buf = ByteBuffer.allocateDirect(bufferSize);
InputStream input = TCPSocket.this.socketInputStream;
byte [] buffer = new byte[TCPSocket.this.bufferSize];
while (running.get()) {
waitIncoming(sc, buf, bufferSize);
waitIncoming(input, buffer);
}
} catch (Throwable t) {
@@ -205,18 +282,14 @@ public class TCPSocket {
}
}
private void waitIncoming(SocketChannel sc, ByteBuffer buf, int bufferSize) throws IOException {
buf.position(0);
buf.limit(bufferSize);
int n = sc.read(buf);
buf.flip();
private void waitIncoming(InputStream input, byte [] buffer) throws IOException {
int n = input.read(buffer);
if (n == 0)
return;
if (n < 0)
throw new EOFException();
byte [] data = new byte[n];
buf.position(0);
buf.get(data, 0, n);
System.arraycopy(buffer, 0, data, 0, n);
callbackManager.callOnEach((callback) -> callback.onIncomingData(TCPSocket.this, data));
}