Skip to content

Commit b13e9da

Browse files
authored
Merge pull request #11 from Adam-Software/stream_exp
Channge algoritm send/recive code/result
2 parents 38b31b0 + 6cd3e20 commit b13e9da

File tree

2 files changed

+59
-61
lines changed

2 files changed

+59
-61
lines changed

src/RemotePythonExecution.Interface/RemotePythonExecution.Interface.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<ItemGroup>
1313
<PackageReference Include="AdamServices.Extensions.DefaultArgumets" Version="0.0.4" />
1414
<PackageReference Include="AdamServices.Extensions.ServiceFileCreator" Version="0.1.8" />
15+
<PackageReference Include="SimpleUdp" Version="2.0.2" />
1516
<PackageReference Include="WatsonTcp" Version="6.0.9" />
1617
<PackageReference Include="Microsoft.Extensions.Configuration" Version="9.0.5" />
1718
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="9.0.5" />

src/RemotePythonExecution.Services/RemotePythonExecutionService.cs

Lines changed: 58 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
using Microsoft.Extensions.Logging;
44
using Microsoft.Extensions.Options;
55
using RemotePythonExecution.Interface.RemotePythonExecutionServiceDependency.JsonModel;
6+
using SimpleUdp;
67
using System;
78
using System.Collections.Generic;
89
using System.Diagnostics;
910
using System.IO;
10-
using System.Net.Sockets;
11+
using System.Linq;
1112
using System.Runtime.InteropServices;
1213
using System.Text;
1314
using System.Threading;
@@ -30,16 +31,18 @@ public class RemotePythonExecutionService : BackgroundService, IHostedService
3031

3132
private WatsonTcpServer mTcpServer;
3233
private Process mProcess;
33-
public Guid CurrentConnectionGuid;
34+
private UdpEndpoint mUdpEndpoint;
35+
private (Guid Guid, string ip) mCurrentClientParam;
36+
3437
private bool mIsDisposed;
3538

3639
private string mInterpreterPath = string.Empty;
3740
private string mWorkingDirrectoryPath = string.Empty;
3841
private string mSourceCodeSavePath = string.Empty;
3942

40-
private bool mIsProcessEnded = false;
4143
private bool mIsOutputEnded = false;
4244

45+
4346
#endregion
4447

4548
#region ~
@@ -56,11 +59,13 @@ public RemotePythonExecutionService(IServiceProvider serviceProvider)
5659
SetPath(mAppSettingsMonitor.CurrentValue);
5760

5861
mTcpServer = new WatsonTcpServer(Ip, Port);
62+
mUdpEndpoint = new UdpEndpoint(Ip, Port);
63+
5964
mAppSettingsMonitor.OnChange(OnChangeSettings);
6065

6166
Subscribe();
6267
mTcpServer.Start();
63-
mLogger.LogInformation("Server runing on {ip}:{port}", Ip, Port);
68+
mLogger.LogInformation("Servers runing on {ip}:{port}", Ip, Port);
6469
}
6570

6671
#endregion
@@ -73,6 +78,7 @@ private void Subscribe()
7378
mTcpServer.Events.ClientDisconnected += ClientDisconnected;
7479
mTcpServer.Events.MessageReceived += MessageReceived;
7580
mTcpServer.Events.ExceptionEncountered += ExceptionEncountered;
81+
7682
AppDomain.CurrentDomain.ProcessExit += AppProcessExit;
7783

7884
mAppLifetime.ApplicationStopping.Register(OnStopping);
@@ -84,6 +90,7 @@ private void UnSubscribe()
8490
mTcpServer.Events.ClientDisconnected -= ClientDisconnected;
8591
mTcpServer.Events.MessageReceived -= MessageReceived;
8692
mTcpServer.Events.ExceptionEncountered -= ExceptionEncountered;
93+
8794
AppDomain.CurrentDomain.ProcessExit -= AppProcessExit;
8895
}
8996

@@ -98,30 +105,35 @@ private void OnChangeSettings(AppSettings settings, string arg2)
98105
}
99106

100107
private void ClientConnected(object sender, ConnectionEventArgs e)
101-
{
102-
mLogger.LogInformation("Client connected. Connection id: {Guid}", e.Client.Guid);
103-
CurrentConnectionGuid = e.Client.Guid;
108+
{
109+
var ip = e.Client.IpPort.Split(':').FirstOrDefault();
110+
mCurrentClientParam = (e.Client.Guid, ip);
104111

105112
IsOutputEnded = false;
106-
IsProcessEnded = false;
113+
mLogger.LogInformation("Client connected. Connection id: {Guid}. Client ip: {ip}", e.Client.Guid, ip);
107114
}
108115

109116
private void ClientDisconnected(object sender, DisconnectionEventArgs e)
110117
{
111118
mLogger.LogInformation("Client disconnected. Connection id: {Guid}", e.Client.Guid);
112-
113119
KillProcess();
114-
IsOutputEnded = true;
115120
}
116121

117122
private void ExceptionEncountered(object sender, ExceptionEventArgs e)
118123
{
119124
if (e.Exception is IOException)
125+
{
126+
mLogger.LogError("IOException");
120127
return;
128+
}
129+
121130

122-
if (e.Exception is SocketException)
131+
if (e.Exception is OperationCanceledException)
132+
{
133+
mLogger.LogError("OperationCanceledException");
123134
return;
124-
135+
}
136+
125137
mLogger.LogError("Error happened {errorMessage}", e.Exception);
126138
}
127139

@@ -166,11 +178,9 @@ private void MessageReceived(object sender, MessageReceivedEventArgs e)
166178

167179
case "exit":
168180
{
181+
mLogger.LogInformation("Exit by client request");
169182

170-
mLogger.LogDebug("Exit by client request");
171-
172183
KillProcess();
173-
mIsOutputEnded = true;
174184
break;
175185
}
176186
}
@@ -183,9 +193,9 @@ private void ProcessErrorDataReceived(object sender, DataReceivedEventArgs e)
183193
{
184194
try
185195
{
186-
mTcpServer.SendAsync(CurrentConnectionGuid, e.Data);
196+
mUdpEndpoint.SendAsync(mCurrentClientParam.ip, Port, e.Data);
187197
mLogger.LogDebug("{data}", e.Data);
188-
IsOutputEnded = false;
198+
189199
}
190200
catch (Exception exp)
191201
{
@@ -195,20 +205,17 @@ private void ProcessErrorDataReceived(object sender, DataReceivedEventArgs e)
195205
}
196206
}
197207

198-
private async void ProcessOutputDataReceived(object sender, DataReceivedEventArgs e)
208+
private void ProcessOutputDataReceived(object sender, DataReceivedEventArgs e)
199209
{
200210
if (e.Data != null)
201211
{
202212
try
203213
{
204-
await mTcpServer.SendAsync(CurrentConnectionGuid, e.Data);
214+
if(mTcpServer.IsClientConnected(mCurrentClientParam.Guid))
215+
mUdpEndpoint.SendAsync(mCurrentClientParam.ip, Port, e.Data);
216+
205217
mLogger.LogDebug("{data}", e.Data);
206218
}
207-
catch (TaskCanceledException)
208-
{
209-
mLogger.LogError("Task was canceled");
210-
IsOutputEnded = true;
211-
}
212219
catch (Exception exp)
213220
{
214221
mLogger.LogError("Catch happened {exp}", exp);
@@ -217,16 +224,14 @@ private async void ProcessOutputDataReceived(object sender, DataReceivedEventArg
217224
}
218225
else
219226
{
220-
mLogger.LogDebug("Output ended happened");
221-
await Task.Delay(100);
227+
mLogger.LogInformation("Output ended happened");
222228
IsOutputEnded = true;
223229
}
224230
}
225231

226232
private void ProcessExited(object sender, EventArgs e)
227233
{
228-
mLogger.LogDebug("Process ended happened");
229-
IsProcessEnded = true;
234+
mLogger.LogInformation("Process ended happened");
230235
}
231236

232237
private void AppProcessExit(object sender, EventArgs e)
@@ -255,11 +260,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
255260
{
256261
while (!stoppingToken.IsCancellationRequested)
257262
{
258-
while (IsOutputEnded && IsProcessEnded)
263+
while (IsOutputEnded)
259264
{
260265
IsOutputEnded = false;
261-
IsProcessEnded = false;
262-
263266
ExitData exitData = new();
264267

265268
try
@@ -279,7 +282,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
279282
};
280283

281284
mProcess.Dispose();
282-
mProcess = null;
283285
}
284286
}
285287
catch(Exception exception)
@@ -290,13 +292,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
290292
var exitJson = mTcpServer.SerializationHelper.SerializeJson(exitData);
291293
var exitDictonary = new Dictionary<string, object>() { { "exitData", exitJson } };
292294

293-
if (mTcpServer.IsClientConnected(CurrentConnectionGuid))
295+
if (mTcpServer.IsClientConnected(mCurrentClientParam.Guid))
294296
{
295-
await mTcpServer.SendAsync(CurrentConnectionGuid, string.Empty, exitDictonary, token: stoppingToken);
296-
await mTcpServer.DisconnectClientAsync(CurrentConnectionGuid, MessageStatus.Removed, true, stoppingToken);
297+
mLogger.LogInformation("Send exit data");
298+
await mTcpServer.SendAsync(mCurrentClientParam.Guid, string.Empty, exitDictonary, token: stoppingToken);
299+
await mTcpServer.DisconnectClientAsync(mCurrentClientParam.Guid, MessageStatus.Removed, true, stoppingToken);
297300
}
298-
299-
CurrentConnectionGuid = Guid.Empty;
301+
302+
mCurrentClientParam.Guid = Guid.Empty;
303+
mCurrentClientParam.ip = string.Empty;
300304
}
301305
}
302306
}
@@ -317,20 +321,6 @@ private bool IsOutputEnded
317321
}
318322
}
319323

320-
321-
322-
private bool IsProcessEnded
323-
{
324-
get { return mIsProcessEnded; }
325-
set
326-
{
327-
if(value == mIsProcessEnded)
328-
return;
329-
330-
mIsProcessEnded = value;
331-
}
332-
}
333-
334324
private string InterpreterPath
335325
{
336326
get { return mInterpreterPath; }
@@ -377,7 +367,6 @@ private string SourceCodeSavePath
377367
mSourceCodeSavePath = value;
378368
mLogger.LogDebug("New path for {name} register with values {value}", nameof(SourceCodeSavePath), SourceCodeSavePath);
379369
}
380-
381370
}
382371

383372
private string mIp = string.Empty;
@@ -426,15 +415,19 @@ private int Port
426415

427416
private void KillProcess()
428417
{
429-
if(mProcess == null)
430-
return;
431-
432418
try
433419
{
434420
mProcess.CancelErrorRead();
435421
mProcess.CancelOutputRead();
436422

437423
mProcess.Kill(entireProcessTree: true);
424+
mLogger.LogInformation("KillProcess(): {id},", mProcess.Id);
425+
426+
IsOutputEnded = true;
427+
}
428+
catch (InvalidOperationException)
429+
{
430+
mLogger.LogError("KillProcess(): InvalidOperationException happened");
438431
}
439432
catch (Exception ex)
440433
{
@@ -452,17 +445,23 @@ private void OnServerAddressChange()
452445
if(mTcpServer.Connections != 0)
453446
{
454447
KillProcess();
455-
IsOutputEnded = true;
456448
}
457449

458450
mTcpServer.Stop();
451+
452+
mTcpServer.Dispose();
453+
mUdpEndpoint.Dispose();
454+
459455
UnSubscribe();
460456
}
461457

462458
mTcpServer = new WatsonTcpServer(Ip, Port);
459+
mUdpEndpoint = new UdpEndpoint(Ip, Port);
460+
463461
Subscribe();
464462
mTcpServer.Start();
465-
mLogger.LogInformation("Server runing on {ip}:{port}", Ip, Port);
463+
464+
mLogger.LogInformation("Servers runing on {ip}:{port}", Ip, Port);
466465
}
467466

468467
private void SetPath(AppSettings appSettings)
@@ -519,7 +518,7 @@ private void StartProcess(bool withDebug = false)
519518
mProcess = new()
520519
{
521520
StartInfo = proccesInfo,
522-
EnableRaisingEvents = true,
521+
EnableRaisingEvents = true
523522
};
524523

525524
mProcess.Exited += ProcessExited;
@@ -529,7 +528,7 @@ private void StartProcess(bool withDebug = false)
529528

530529
mProcess.BeginOutputReadLine();
531530
mProcess.BeginErrorReadLine();
532-
531+
533532
mProcess.WaitForExit();
534533
}
535534

@@ -542,8 +541,6 @@ protected virtual void Dispose(bool disposing)
542541
if (disposing)
543542
{
544543
KillProcess();
545-
IsOutputEnded = true;
546-
547544
UnSubscribe();
548545
mTcpServer.Stop();
549546
mTcpServer.Dispose();

0 commit comments

Comments
 (0)