Skip to content

Commit 490b477

Browse files
authored
Merge pull request #78 from maikebing/main
优化Rowrecord的构造函数,为SessionPool与SessionDataset添加IDisposable pattern
2 parents e51e272 + d9ad8cd commit 490b477

File tree

3 files changed

+85
-27
lines changed

3 files changed

+85
-27
lines changed

src/Apache.IoTDB/DataStructure/RowRecord.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ public class RowRecord
99
{
1010
public long Timestamps { get; }
1111
public List<object> Values { get; }
12-
public List<string> Measurements { get; }
13-
12+
public List<string> Measurements { get; }
13+
14+
public RowRecord(DateTime timestamp, List<object> values, List<string> measurements)
15+
:this(new DateTimeOffset(timestamp.ToUniversalTime()).ToUnixTimeMilliseconds(), values,measurements)
16+
{
17+
}
1418
public RowRecord(long timestamps, List<object> values, List<string> measurements)
1519
{
1620
Timestamps = timestamps;

src/Apache.IoTDB/DataStructure/SessionDataSet.cs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace Apache.IoTDB.DataStructure
88
{
9-
public class SessionDataSet
9+
public class SessionDataSet: System.IDisposable
1010
{
1111
private readonly long _queryId;
1212
private readonly string _sql;
@@ -23,8 +23,8 @@ public class SessionDataSet
2323
private int _rowIndex;
2424
private bool _hasCatchedResult;
2525
private RowRecord _cachedRowRecord;
26-
private readonly bool _isClosed = false;
27-
26+
private bool _isClosed = false;
27+
private bool disposedValue;
2828

2929
private string TimestampStr => "Time";
3030
private int StartIndex => 2;
@@ -295,7 +295,7 @@ public async Task Close()
295295

296296
try
297297
{
298-
await myClient.ServiceClient.closeOperationAsync(req);
298+
var status= await myClient.ServiceClient.closeOperationAsync(req);
299299
}
300300
catch (TException e)
301301
{
@@ -306,6 +306,34 @@ public async Task Close()
306306
_clientQueue.Add(myClient);
307307
}
308308
}
309-
}
309+
}
310+
311+
protected virtual void Dispose(bool disposing)
312+
{
313+
if (!disposedValue)
314+
{
315+
if (disposing)
316+
{
317+
try
318+
{
319+
this.Close().Wait();
320+
}
321+
catch
322+
{
323+
}
324+
}
325+
_queryDataset=null;
326+
_timeBuffer = null;
327+
_valueBufferLst = null;
328+
_bitmapBufferLst = null;
329+
disposedValue = true;
330+
}
331+
}
332+
333+
public void Dispose()
334+
{
335+
Dispose(disposing: true);
336+
GC.SuppressFinalize(this);
337+
}
310338
}
311339
}

src/Apache.IoTDB/SessionPool.cs

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414

1515
namespace Apache.IoTDB
1616
{
17-
public class SessionPool
17+
18+
public class SessionPool:IDisposable
1819
{
1920
private static int SuccessCode => 200;
2021
private static readonly TSProtocolVersion ProtocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
@@ -89,7 +90,9 @@ public SessionPool(
8990
string password = "root",
9091
int fetchSize = 1000,
9192
string zoneId = "UTC+08:00",
92-
int poolSize = 8)
93+
int poolSize = 8,
94+
bool enableRpcCompression = true
95+
)
9396
{
9497
_host = host;
9598
_port = port;
@@ -99,8 +102,12 @@ public SessionPool(
99102
_fetchSize = fetchSize;
100103
_debugMode = false;
101104
_poolSize = poolSize;
102-
}
103-
ILoggerFactory factory;
105+
_enableRpcCompression = enableRpcCompression;
106+
}
107+
108+
ILoggerFactory factory;
109+
private bool disposedValue;
110+
104111
public void OpenDebugMode(Action<ILoggingBuilder> configure)
105112
{
106113
_debugMode = true;
@@ -111,16 +118,22 @@ public void OpenDebugMode(Action<ILoggingBuilder> configure)
111118
public void CloseDebugMode()
112119
{
113120
_debugMode = false;
114-
}
115-
121+
}
122+
116123
public async Task Open(bool enableRpcCompression, CancellationToken cancellationToken = default)
124+
{
125+
_enableRpcCompression = enableRpcCompression;
126+
await Open(cancellationToken);
127+
}
128+
129+
public async Task Open(CancellationToken cancellationToken = default)
117130
{
118-
_clients = new ConcurrentClientQueue();
119-
_enableRpcCompression = enableRpcCompression;
120-
131+
_clients = new ConcurrentClientQueue();
132+
133+
121134
for (var index = 0; index < _poolSize; index++)
122135
{
123-
_clients.Add(await CreateAndOpen(enableRpcCompression, cancellationToken));
136+
_clients.Add(await CreateAndOpen(_enableRpcCompression, cancellationToken));
124137
}
125138
}
126139

@@ -2081,15 +2094,28 @@ public async Task<List<string>> ShowPathsTemplateUsingOnAsync(string templateNam
20812094
{
20822095
_clients.Add(client);
20832096
}
2084-
}
2085-
2086-
2087-
2088-
2089-
2090-
2091-
2092-
2093-
2097+
}
2098+
2099+
protected virtual void Dispose(bool disposing)
2100+
{
2101+
if (!disposedValue)
2102+
{
2103+
if (disposing)
2104+
{
2105+
#if NET461_OR_GREATER || NETSTANDARD2_0
2106+
#else
2107+
_clients.ClientQueue.Clear();
2108+
#endif
2109+
}
2110+
_clients = null;
2111+
disposedValue = true;
2112+
}
2113+
}
2114+
2115+
public void Dispose()
2116+
{
2117+
Dispose(disposing: true);
2118+
GC.SuppressFinalize(this);
2119+
}
20942120
}
20952121
}

0 commit comments

Comments
 (0)