Skip to content

Commit 63d1bf8

Browse files
committed
bug identified - snapshot.
1 parent 9f9850d commit 63d1bf8

File tree

8 files changed

+156
-48
lines changed

8 files changed

+156
-48
lines changed

src/main/java/net/sharksystem/hub/HubConnectionManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import java.io.IOException;
88
import java.util.List;
99

10+
/**
11+
* Interface for applications. It allows connection management with hubs.
12+
*/
1013
public interface HubConnectionManager {
1114
/**
1215
* Connect a hub

src/main/java/net/sharksystem/hub/SharedChannelConnectorImpl.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,7 @@ public StreamPair initDataSession(CharSequence sourcePeerID, CharSequence target
409409
throws ASAPHubException, IOException {
410410

411411
return this.initDataSession(
412-
new ConnectionRequest(sourcePeerID, targetPeerID, System.currentTimeMillis() + timeout, false),
413-
timeout);
412+
ConnectionRequest.createNewConnectRequest(sourcePeerID, targetPeerID), timeout);
414413
}
415414

416415
private Thread threadWaitingForDataConnection = null;
@@ -424,7 +423,13 @@ synchronized protected StreamPair initDataSession(ConnectionRequest connectionRe
424423
if(this.threadWaitingForDataConnection == null) {
425424
Log.writeLog(this, this.toString(), "no other thread waiting - ask for silence and wait");
426425
this.threadWaitingForDataConnection = Thread.currentThread();
427-
this.askForSilence(timeout);
426+
try {
427+
this.askForSilence(timeout);
428+
}
429+
catch(ASAPHubException ahe) {
430+
Log.writeLog(this, this.toString(),
431+
"cannot silence connection - SHOULD PUT PENDING LIST: " + connectionRequest);
432+
}
428433
try {
429434
Thread.sleep(timeout);
430435
} catch (InterruptedException e) {
@@ -435,11 +440,14 @@ synchronized protected StreamPair initDataSession(ConnectionRequest connectionRe
435440
}
436441
Log.writeLog(this, this.toString(), "waiting over");
437442
if(connectionRequest.until < System.currentTimeMillis()) {
443+
Log.writeLogErr(this, this.toString(), "timed out - will not create data connection");
438444
throw new ASAPHubException("timed out - will not create data connection");
439445
}
440446

441447
if(!this.statusInSilence()) {
442-
throw new ASAPHubException("still no silent mode - cannot establish data connection with peer ");
448+
Log.writeLogErr(this, this.toString(),
449+
"still no silent mode - cannot establish data connection with peer");
450+
throw new ASAPHubException("still no silent mode - cannot establish data connection with peer");
443451
}
444452
}
445453

@@ -490,7 +498,7 @@ public void silentRPLY(HubPDUSilentRPLY pdu) {
490498

491499
public String toString() {
492500
String status = null;
493-
if(statusHubConnectorProtocol()) status = "connected";
501+
if(statusHubConnectorProtocol()) status = "hubProtocol";
494502
else if(statusInSilence()) status = "silence";
495503
else if(statusAskedForSilence()) status = "askedForSilence";
496504
else if(statusInDataSession()) status = "dataSession";

src/main/java/net/sharksystem/hub/hubside/ASAPTCPHub.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void run() {
8383
while(!killed) {
8484
Socket newConnection = null;
8585
try {
86-
newConnection = this.serverSocket.accept();
86+
newConnection = this. serverSocket.accept();
8787
}
8888
catch(IOException ioe) {
8989
Log.writeLog(this, "exception when going to accept TCP connections - fatal, give up: "

src/main/java/net/sharksystem/hub/hubside/SharedChannelConnectorHubSideImpl.java

Lines changed: 105 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void createNewConnection(NewConnectionCreatorListener listener,
6969
}
7070

7171
//////////////////////////////////////////////////////////////////////////////////////////////////////////
72-
// connection establisher interface to hub and connector peer side //
72+
// interface to establish connection to hub and connector peer side //
7373
//////////////////////////////////////////////////////////////////////////////////////////////////////////
7474

7575
/**
@@ -98,20 +98,55 @@ public void connectionRequest(CharSequence sourcePeerID, CharSequence targetPeer
9898
this.connectionRequest(targetPeerID);
9999
} else {
100100
// remember call
101-
this.externalConnectionRequestList.add(
102-
new ConnectionRequest(
103-
sourcePeerID, targetPeerID,System.currentTimeMillis() + timeout,
104-
this.canEstablishTCPConnections()));
101+
ConnectionRequest newConnectionRequest = new ConnectionRequest(
102+
sourcePeerID, targetPeerID,System.currentTimeMillis() + timeout,
103+
this.canEstablishTCPConnections());
104+
105+
// check for duplicates
106+
ConnectionRequest duplicate = this.connectionRequestExists(newConnectionRequest);
107+
if(duplicate != null) {
108+
StringBuilder sb = new StringBuilder();
109+
sb.append("ignore new connection request: ");
110+
sb.append("new: " + newConnectionRequest);
111+
sb.append("pending: " + duplicate);
112+
} else {
113+
this.pendingConnectionRequests.add(newConnectionRequest);
114+
this.processPendingConnectionRequestList();
115+
}
116+
}
117+
}
105118

106-
this.handleExternalConnectionRequestList();
119+
private ConnectionRequest connectionRequestExists(ConnectionRequest newConnectionRequest) {
120+
ConnectionRequest duplicate = null;
121+
for(ConnectionRequest pendingRequest : this.pendingConnectionRequests) {
122+
if(this.sameConnectionRequest(pendingRequest, newConnectionRequest)) {
123+
duplicate = pendingRequest; break;
124+
}
107125
}
126+
return duplicate;
127+
}
128+
129+
private boolean sameConnectionRequest(ConnectionRequest requestA, ConnectionRequest requestB) {
130+
return (
131+
(
132+
PeerIDHelper.sameID(requestA.sourcePeerID, requestB.sourcePeerID)
133+
&&
134+
PeerIDHelper.sameID(requestA.targetPeerID, requestB.targetPeerID)
135+
)
136+
||
137+
(
138+
PeerIDHelper.sameID(requestA.sourcePeerID, requestB.targetPeerID)
139+
&&
140+
PeerIDHelper.sameID(requestA.targetPeerID, requestB.sourcePeerID)
141+
)
142+
);
108143
}
109144

110145
void connectionRequest(CharSequence targetPeerID) throws ASAPHubException, IOException {
111146
this.hub.connectionRequest(this.getPeerID(), targetPeerID, this.getTimeOutConnectionRequest());
112147
}
113148

114-
private List<ConnectionRequest> externalConnectionRequestList = new ArrayList<>();
149+
private List<ConnectionRequest> pendingConnectionRequests = new ArrayList<>();
115150

116151
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
117152
// reaction on status changes //
@@ -120,7 +155,7 @@ void connectionRequest(CharSequence targetPeerID) throws ASAPHubException, IOExc
120155
@Override
121156
protected void silenceStarted() {
122157
try {
123-
this.handleExternalConnectionRequestList();
158+
this.processPendingConnectionRequestList();
124159
} catch (ASAPHubException | IOException e) {
125160
e.printStackTrace();
126161
}
@@ -166,28 +201,42 @@ protected void shutdown() {
166201
this.hub.unregister(this.getPeerID());
167202
}
168203

169-
synchronized private boolean handleExternalConnectionRequestList() throws ASAPHubException, IOException {
170-
// lets see if we can start another connection
171-
Log.writeLog(this, this.toString(), "#entries connection request list: "
172-
+ this.externalConnectionRequestList.size());
204+
synchronized private boolean processPendingConnectionRequestList() throws ASAPHubException, IOException {
205+
// let's see if we can start another connection
206+
Log.writeLog(this, this.toString(), "process pending connection request; #entries in list: "
207+
+ this.pendingConnectionRequests.size());
208+
209+
if(this.pendingConnectionRequests.size() > 0) {
210+
boolean first = true;
211+
StringBuilder sb = new StringBuilder();
212+
for(ConnectionRequest request : this.pendingConnectionRequests) {
213+
if(first) first = false;
214+
else sb.append("\n");
215+
sb.append(request.toString());
216+
}
217+
Log.writeLog(this, this.toString(), "list: \n" + sb.toString());
218+
}
173219

174-
if(this.externalConnectionRequestList.isEmpty()) return false; // empty nothing to do
220+
if(this.pendingConnectionRequests.isEmpty()) {
221+
Log.writeLog(this, this.toString(), "no other requests - nothing to do");
222+
return false; // empty nothing to do
223+
}
175224

176225
// remove outdated requests
177-
ConnectionRequest connectionRequest = null;
178-
while(connectionRequest == null && !this.externalConnectionRequestList.isEmpty()) {
179-
connectionRequest = this.externalConnectionRequestList.remove(0);
180-
if(connectionRequest.until < System.currentTimeMillis()) {
226+
ConnectionRequest nextRequestToProcess = null;
227+
while(nextRequestToProcess == null && !this.pendingConnectionRequests.isEmpty()) {
228+
nextRequestToProcess = this.pendingConnectionRequests.remove(0);
229+
if(nextRequestToProcess.until < System.currentTimeMillis()) {
181230
Log.writeLog(this, this.toString(), "discard connection request - timed out");
182-
connectionRequest = null;
231+
nextRequestToProcess = null;
183232
}
184233
}
185234

186-
if(connectionRequest == null) return false; // list empty
235+
if(nextRequestToProcess == null) return false; // list empty
187236

188-
if(this.canEstablishTCPConnections() && connectionRequest.newConnection) {
237+
if(this.canEstablishTCPConnections() && nextRequestToProcess.newConnection) {
189238
try {
190-
return this.initDataSessionOnNewConnection(connectionRequest,
239+
return this.initDataSessionOnNewConnection(nextRequestToProcess,
191240
this.getTimeOutConnectionRequest(), this.getTimeOutDataConnection());
192241
} catch (RuntimeException e) {
193242
Log.writeLog(this, "not yet implemented? Go ahead and try shared channel: "
@@ -201,31 +250,42 @@ synchronized private boolean handleExternalConnectionRequestList() throws ASAPHu
201250
// we are in the right status - take the oldest request
202251

203252
// handle connection request
204-
Log.writeLog(this, this.toString(), "launch data session by request: " + connectionRequest);
253+
Log.writeLog(this, this.toString(), "launch data session by request: " + nextRequestToProcess);
205254

206255
// init data session - this can fail if we are not in silence mode - that's ok, though
207256
StreamPair streamPair = null;
208257
try {
209-
streamPair = this.initDataSession(connectionRequest, this.getTimeOutDataConnection());
258+
streamPair = this.initDataSession(nextRequestToProcess, this.getTimeOutDataConnection());
210259
}
211260
catch(ASAPHubException e) {
212261
Log.writeLog(this, this.toString(), "cannot init data session yet - we can wait");
213262
return false;
214263
}
215264

216265
// tell hub
217-
Log.writeLog(this, this.toString(), "tell hub about newly created data session: " + connectionRequest);
218-
this.hub.startDataSession(this.getPeerID(), connectionRequest.sourcePeerID,
266+
Log.writeLog(this, this.toString(), "tell hub about newly created data session: " + nextRequestToProcess);
267+
this.hub.startDataSession(this.getPeerID(), nextRequestToProcess.sourcePeerID,
219268
streamPair, this.getTimeOutDataConnection());
220269
} else {
221270
Log.writeLog(this, this.toString(), "not in silence mode - ask for silence");
222271
// not in silence - should we asked for silence
223272
if (this.statusHubConnectorProtocol()) { // we are in protocol status - change it
224273
// put request back
225-
this.externalConnectionRequestList.add(connectionRequest);
226-
this.askForSilence(this.getTimeOutSilenceChannel());
274+
Log.writeLog(this, this.toString(), "put request back in pending list: "
275+
+ nextRequestToProcess);
276+
this.pendingConnectionRequests.add(nextRequestToProcess);
277+
try {
278+
this.askForSilence(this.getTimeOutSilenceChannel());
279+
}
280+
catch(ASAPHubException ahe) {
281+
Log.writeLog(this, this.toString(),
282+
"cannot ask for silence; connection request remains in pending list: "
283+
+ ahe.getLocalizedMessage());
284+
}
227285
} else {
228-
Log.writeLog(this, this.toString(), "cannot ask for silence .. not in connector mode");
286+
Log.writeLog(this, this.toString(),
287+
"cannot ask for silence .. not in connector mode - discard connection request "
288+
+ nextRequestToProcess);
229289
}
230290
}
231291
return true;
@@ -239,7 +299,7 @@ protected boolean initDataSessionOnNewConnection(
239299

240300
protected void actionWhenBackFromDataSession() {
241301
try {
242-
if(this.handleExternalConnectionRequestList()) return; // there are pending request
302+
if(this.processPendingConnectionRequestList()) return; // there are pending request
243303
// relaunch Connector thread
244304
} catch (ASAPHubException | IOException e) {
245305
e.printStackTrace();
@@ -280,7 +340,7 @@ private boolean localCall(CharSequence sourcePeerID, CharSequence targetPeerID)
280340
public void disconnect(CharSequence sourcePeerID, CharSequence targetPeerID) throws ASAPHubException {
281341
Log.writeLog(this, "disconnect called");
282342
ConnectionRequest removeRequest = null;
283-
for(ConnectionRequest request : this.externalConnectionRequestList) {
343+
for(ConnectionRequest request : this.pendingConnectionRequests) {
284344
if( PeerIDHelper.sameID(sourcePeerID, request.sourcePeerID)
285345
&& PeerIDHelper.sameID(targetPeerID, request.targetPeerID)) {
286346

@@ -290,7 +350,7 @@ public void disconnect(CharSequence sourcePeerID, CharSequence targetPeerID) thr
290350
}
291351
}
292352

293-
if(removeRequest != null) this.externalConnectionRequestList.remove(removeRequest);
353+
if(removeRequest != null) this.pendingConnectionRequests.remove(removeRequest);
294354
}
295355

296356
/**
@@ -310,12 +370,21 @@ public void startDataSession(CharSequence sourcePeerID, CharSequence targetPeerI
310370
if(this.localCall(sourcePeerID, targetPeerID))
311371
throw new ASAPHubException("a connection started notification cannot come from local peer");
312372

313-
StreamPair stream2Peer = this.initDataSession(sourcePeerID, targetPeerID, timeout);
314-
Log.writeLog(this, this.toString(), "got connection to peer side");
373+
try {
374+
StreamPair stream2Peer = this.initDataSession(sourcePeerID, targetPeerID, timeout);
375+
Log.writeLog(this, this.toString(), "got connection to peer side");
315376

316-
// link stream pair from hub with stream pair to peer
317-
new StreamPairLink(stream2Peer, sourcePeerID, stream2Hub, targetPeerID);
318-
Log.writeLog(this, this.toString(), "created and started stream pair link");
377+
// link stream pair from hub with stream pair to peer
378+
new StreamPairLink(stream2Peer, sourcePeerID, stream2Hub, targetPeerID);
379+
Log.writeLog(this, this.toString(), "created and started stream pair link");
380+
}
381+
catch(ASAPHubException ahe) {
382+
Log.writeLog(this, this.toString(), "could not establish data session: "
383+
+ ahe.getLocalizedMessage());
384+
Log.writeLog(this, this.toString(), "could not establish data session: ");
385+
ConnectionRequest pendingRequest = ConnectionRequest.createNewConnectRequest(sourcePeerID, targetPeerID);
386+
Log.writeLog(this, this.toString(), "REMEMBER REQUEST ?: " + pendingRequest);
387+
}
319388
}
320389

321390
/**

src/main/java/net/sharksystem/hub/peerside/ASAPHubManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ public interface ASAPHubManager {
2626
*
2727
* @param descriptions
2828
* @param asapPeer
29-
* @param killNotDescribed if true: A complete descriptions list is assumed. Meaning:
29+
* @param killConnectionIfNotInList if true: A complete descriptions list is assumed. Meaning:
3030
* Existing connections which are not in the list are stopped.
3131
*/
32-
void connectASAPHubs(Collection<HubConnectorDescription> descriptions, ASAPPeer asapPeer, boolean killNotDescribed);
32+
void connectASAPHubs(Collection<HubConnectorDescription> descriptions, ASAPPeer asapPeer,
33+
boolean killConnectionIfNotInList);
3334

3435
/**
3536
* Produce a hub connector by its description
@@ -54,4 +55,9 @@ public interface ASAPHubManager {
5455
* kill manager thread if running
5556
*/
5657
void kill();
58+
59+
/**
60+
* Force hub manager to sync with hubs right now
61+
*/
62+
void forceSyncWithHubs();
5763
}

src/main/java/net/sharksystem/hub/peerside/ASAPHubManagerImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public HubConnector getHubConnector(HubConnectorDescription hcd) throws SharkExc
136136
}
137137

138138
public void connectASAPHubs(Collection<HubConnectorDescription> descriptions,
139-
ASAPPeer asapPeer, boolean killNotDescribed) {
139+
ASAPPeer asapPeer, boolean killConnectionIfNotInList) {
140140
// for each description
141141
for(HubConnectorDescription hcd : descriptions) {
142142
// already running?
@@ -176,7 +176,7 @@ public void run() {
176176
}
177177
} // end each description for-loop
178178

179-
if(killNotDescribed) {
179+
if(killConnectionIfNotInList) {
180180
///////////////////// kill open connections which are not in the list
181181
Collection<HubConnector> toBeKilled = new ArrayList<>();
182182

@@ -227,6 +227,12 @@ public void disconnectASAPHubs() {
227227
this.disconnectASAPHubs(toBeKilled);
228228
}
229229

230+
public void forceSyncWithHubs() {
231+
if(this.managerThread != null) {
232+
this.managerThread.interrupt();
233+
}
234+
}
235+
230236
private boolean managerThreadStopped = false;
231237

232238
@Override
@@ -260,8 +266,8 @@ public void run() {
260266
Log.writeLog(this, this.toString(), "interrupted - make next round earlier");
261267
}
262268
}
263-
Log.writeLog(this, this.toString(), "hub manager thread ended.");
264269
}
270+
Log.writeLog(this, this.toString(), "hub manager thread ended.");
265271
}
266272

267273

0 commit comments

Comments
 (0)