Skip to content

Commit e5edd8d

Browse files
committed
feat: add tests for socket pool
1 parent 2ddec99 commit e5edd8d

20 files changed

+367
-68
lines changed

sample/Cnblogs.DashScope.Sample/Program.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,17 @@
8080
}
8181

8282
writer.Close();
83-
Console.WriteLine($"audio saved to {file.FullName}");
83+
84+
var tokenUsage = 0;
85+
await foreach (var message in tts.GetMessagesAsync())
86+
{
87+
if (message.Payload.Usage?.Characters > tokenUsage)
88+
{
89+
tokenUsage = message.Payload.Usage.Characters;
90+
}
91+
}
92+
93+
Console.WriteLine($"audio saved to {file.FullName}, token usage: {tokenUsage}");
8494
break;
8595
}
8696
}

src/Cnblogs.DashScope.AspNetCore/ServiceCollectionInjector.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static IHttpClientBuilder AddDashScopeClient(
7272

7373
if (baseWebsocketAddress != null)
7474
{
75-
o.BaseWebsocketAddress = baseWebsocketAddress;
75+
o.WebsocketBaseAddress = baseWebsocketAddress;
7676
}
7777

7878
o.WorkspaceId = workspaceId;
@@ -87,8 +87,11 @@ private static IHttpClientBuilder AddDashScopeHttpClient(
8787
string? baseAddress,
8888
string? workspaceId)
8989
{
90+
services.AddSingleton<IDashScopeClientWebSocketFactory, DashScopeClientWebSocketFactory>();
9091
services.AddSingleton<DashScopeClientWebSocketPool>(sp
91-
=> new DashScopeClientWebSocketPool(sp.GetRequiredService<IOptions<DashScopeOptions>>().Value));
92+
=> new DashScopeClientWebSocketPool(
93+
sp.GetRequiredService<IDashScopeClientWebSocketFactory>(),
94+
sp.GetRequiredService<IOptions<DashScopeOptions>>().Value));
9295
services.AddScoped<IDashScopeClient, DashScopeClientAspNetCore>();
9396
return services.AddHttpClient(
9497
DashScopeAspNetCoreDefaults.DefaultHttpClientName,

src/Cnblogs.DashScope.Core/DashScopeClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ private static DashScopeClientWebSocketPool GetConfiguredSocketPool(
4949
if (pool is null)
5050
{
5151
pool = new DashScopeClientWebSocketPool(
52+
new DashScopeClientWebSocketFactory(),
5253
new DashScopeOptions
5354
{
5455
ApiKey = apiKey,
55-
BaseWebsocketAddress = baseAddress,
56+
WebsocketBaseAddress = baseAddress,
5657
SocketPoolSize = socketPoolSize,
5758
WorkspaceId = workspaceId
5859
});

src/Cnblogs.DashScope.Core/DashScopeClientCore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ public async Task<SpeechSynthesizerSocketSession> CreateSpeechSynthesizerSocketS
283283
string modelId,
284284
CancellationToken cancellationToken = default)
285285
{
286-
var socket = await _socketPool.RentSocketAsync<SpeechSynthesizerOutput>(cancellationToken);
286+
var socket = await _socketPool.RentSocketAsync(cancellationToken);
287287
return new SpeechSynthesizerSocketSession(socket, modelId);
288288
}
289289

src/Cnblogs.DashScope.Core/DashScopeClientWebSocket.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public sealed class DashScopeClientWebSocket : IDisposable
2323
private Task? _receiveTask;
2424
private TaskCompletionSource<bool> _taskStartedSignal = new();
2525
private Channel<byte>? _binaryOutput;
26+
private Channel<DashScopeWebSocketResponse<JsonElement>>? _jsonOutput;
27+
28+
/// <summary>
29+
/// Unique id of this socket.
30+
/// </summary>
31+
internal Guid Id { get; } = Guid.NewGuid();
2632

2733
/// <summary>
2834
/// The binary output.
@@ -31,6 +37,14 @@ public ChannelReader<byte> BinaryOutput
3137
=> _binaryOutput?.Reader
3238
?? throw new InvalidOperationException("Please call ResetOutput() before accessing output");
3339

40+
/// <summary>
41+
/// The json output.
42+
/// </summary>
43+
/// <exception cref="InvalidOperationException">Throws when ResetOutput is not called.</exception>
44+
public ChannelReader<DashScopeWebSocketResponse<JsonElement>> JsonOutput
45+
=> _jsonOutput?.Reader
46+
?? throw new InvalidOperationException("Please call ResetOutput() before accessing output");
47+
3448
/// <summary>
3549
/// A task that completed when received task-started event.
3650
/// </summary>
@@ -72,13 +86,11 @@ internal DashScopeClientWebSocket(IClientWebSocket socket)
7286
/// <param name="uri">Websocket API uri.</param>
7387
/// <param name="cancellationToken">The cancellation token to use.</param>
7488
/// <returns></returns>
75-
/// <typeparam name="TOutput">The type of the response content.</typeparam>
7689
/// <exception cref="OperationCanceledException">When <paramref name="cancellationToken"/> was request.</exception>
77-
public async Task ConnectAsync<TOutput>(Uri uri, CancellationToken cancellationToken = default)
78-
where TOutput : class
90+
public async Task ConnectAsync(Uri uri, CancellationToken cancellationToken = default)
7991
{
8092
await _socket.ConnectAsync(uri, cancellationToken);
81-
_receiveTask = ReceiveMessagesAsync<TOutput>(cancellationToken);
93+
_receiveTask = ReceiveMessagesAsync(cancellationToken);
8294
State = DashScopeWebSocketState.Ready;
8395
}
8496

@@ -89,6 +101,8 @@ public void ResetOutput()
89101
{
90102
_binaryOutput?.Writer.TryComplete();
91103
_binaryOutput = Channel.CreateUnbounded<byte>(UnboundedChannelOptions);
104+
_jsonOutput?.Writer.TryComplete();
105+
_jsonOutput = Channel.CreateUnbounded<DashScopeWebSocketResponse<JsonElement>>(UnboundedChannelOptions);
92106
_taskStartedSignal.TrySetResult(false);
93107
_taskStartedSignal = new TaskCompletionSource<bool>();
94108
}
@@ -125,7 +139,6 @@ public Task SendMessageAsync<TInput, TParameter>(
125139

126140
private async Task<DashScopeWebSocketResponse<TOutput>?> ReceiveMessageAsync<TOutput>(
127141
CancellationToken cancellationToken = default)
128-
where TOutput : class
129142
{
130143
var buffer = new byte[1024 * 4];
131144
var segment = new ArraySegment<byte>(buffer);
@@ -169,19 +182,22 @@ public Task SendMessageAsync<TInput, TParameter>(
169182
/// Wait for server response.
170183
/// </summary>
171184
/// <param name="cancellationToken">A cancellation token used to propagate notification that this operation should be canceled.</param>
172-
/// <typeparam name="TOutput">Type of the response content.</typeparam>
173185
/// <exception cref="DashScopeException">The task was failed.</exception>
174-
private async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationToken = default)
175-
where TOutput : class
186+
private async Task ReceiveMessagesAsync(CancellationToken cancellationToken = default)
176187
{
177188
while (State != DashScopeWebSocketState.Closed && _socket.CloseStatus == null)
178189
{
179-
var json = await ReceiveMessageAsync<TOutput>(cancellationToken);
190+
var json = await ReceiveMessageAsync<JsonElement>(cancellationToken);
180191
if (json == null)
181192
{
182193
continue;
183194
}
184195

196+
if (_jsonOutput is not null)
197+
{
198+
await _jsonOutput.Writer.WriteAsync(json, cancellationToken);
199+
}
200+
185201
var eventStr = json.Header.Event;
186202
switch (eventStr)
187203
{
@@ -192,10 +208,12 @@ private async Task ReceiveMessagesAsync<TOutput>(CancellationToken cancellationT
192208
case "task-finished":
193209
State = DashScopeWebSocketState.Ready;
194210
_binaryOutput?.Writer.Complete();
211+
_jsonOutput?.Writer.Complete();
195212
break;
196213
case "task-failed":
197214
await CloseAsync(cancellationToken);
198215
_binaryOutput?.Writer.Complete();
216+
_jsonOutput?.Writer.Complete();
199217
throw new DashScopeException(
200218
null,
201219
400,
@@ -230,6 +248,7 @@ private void Dispose(bool disposing)
230248
// Dispose managed resources.
231249
_socket.Dispose();
232250
_binaryOutput?.Writer.TryComplete();
251+
_jsonOutput?.Writer.TryComplete();
233252
}
234253
}
235254

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace Cnblogs.DashScope.Core;
2+
3+
/// <summary>
4+
/// Default implementation for <see cref="IDashScopeClientWebSocketFactory"/>.
5+
/// </summary>
6+
public class DashScopeClientWebSocketFactory : IDashScopeClientWebSocketFactory
7+
{
8+
/// <inheritdoc />
9+
public DashScopeClientWebSocket GetClientWebSocket(string apiKey, string? workspaceId = null)
10+
{
11+
return new DashScopeClientWebSocket(apiKey, workspaceId);
12+
}
13+
}

src/Cnblogs.DashScope.Core/DashScopeClientWebSocketPool.cs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,50 @@ namespace Cnblogs.DashScope.Core;
88
public sealed class DashScopeClientWebSocketPool : IDisposable
99
{
1010
private readonly ConcurrentBag<DashScopeClientWebSocket> _available = new();
11-
private readonly ConcurrentBag<DashScopeClientWebSocket> _active = new();
11+
private readonly ConcurrentDictionary<Guid, DashScopeClientWebSocket> _active = new();
1212
private readonly DashScopeOptions _options;
13+
private readonly IDashScopeClientWebSocketFactory _dashScopeClientWebSocketFactory;
1314

1415
/// <summary>
1516
/// Socket pool for DashScope API.
1617
/// </summary>
18+
/// <param name="dashScopeClientWebSocketFactory"></param>
1719
/// <param name="options">Options for DashScope sdk.</param>
18-
public DashScopeClientWebSocketPool(DashScopeOptions options)
20+
public DashScopeClientWebSocketPool(
21+
IDashScopeClientWebSocketFactory dashScopeClientWebSocketFactory,
22+
DashScopeOptions options)
1923
{
24+
_dashScopeClientWebSocketFactory = dashScopeClientWebSocketFactory;
2025
_options = options;
2126
}
2227

23-
internal DashScopeClientWebSocketPool(IEnumerable<DashScopeClientWebSocket> sockets)
28+
/// <summary>
29+
/// Get available connection count.
30+
/// </summary>
31+
internal int AvailableSocketCount => _available.Count;
32+
33+
/// <summary>
34+
/// Get active connection count.
35+
/// </summary>
36+
internal int ActiveSocketCount => _active.Count;
37+
38+
internal DashScopeClientWebSocketPool(
39+
IEnumerable<DashScopeClientWebSocket> sockets,
40+
IDashScopeClientWebSocketFactory dashScopeClientWebSocketFactory)
2441
{
2542
_options = new DashScopeOptions();
2643
foreach (var socket in sockets)
2744
{
2845
_available.Add(socket);
2946
}
47+
48+
_dashScopeClientWebSocketFactory = dashScopeClientWebSocketFactory;
3049
}
3150

32-
internal void ReturnSocketAsync(DashScopeClientWebSocket socket)
51+
internal void ReturnSocket(DashScopeClientWebSocket socket)
3352
{
53+
_active.Remove(socket.Id, out _);
54+
3455
if (socket.State != DashScopeWebSocketState.Ready)
3556
{
3657
// not returnable, disposing.
@@ -45,11 +66,8 @@ internal void ReturnSocketAsync(DashScopeClientWebSocket socket)
4566
/// Rent or create a socket connection from pool.
4667
/// </summary>
4768
/// <param name="cancellationToken"></param>
48-
/// <typeparam name="TOutput">The output type.</typeparam>
4969
/// <returns></returns>
50-
public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
51-
CancellationToken cancellationToken = default)
52-
where TOutput : class
70+
public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync(CancellationToken cancellationToken = default)
5371
{
5472
var found = false;
5573
DashScopeClientWebSocket? socket = null;
@@ -67,7 +85,7 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
6785
}
6886
else
6987
{
70-
socket = await InitializeNewSocketAsync<TOutput>(_options.BaseWebsocketAddress, cancellationToken);
88+
socket = await InitializeNewSocketAsync(_options.WebsocketBaseAddress, cancellationToken);
7189
found = true;
7290
}
7391
}
@@ -77,22 +95,21 @@ public async Task<DashScopeClientWebSocketWrapper> RentSocketAsync<TOutput>(
7795

7896
private DashScopeClientWebSocketWrapper ActivateSocket(DashScopeClientWebSocket socket)
7997
{
80-
_active.Add(socket);
98+
_active.TryAdd(socket.Id, socket);
8199
return new DashScopeClientWebSocketWrapper(socket, this);
82100
}
83101

84-
private async Task<DashScopeClientWebSocket> InitializeNewSocketAsync<TOutput>(
102+
private async Task<DashScopeClientWebSocket> InitializeNewSocketAsync(
85103
string url,
86104
CancellationToken cancellationToken = default)
87-
where TOutput : class
88105
{
89106
if (_available.Count + _active.Count >= _options.SocketPoolSize)
90107
{
91108
throw new InvalidOperationException("[DashScopeSDK] Socket pool is full");
92109
}
93110

94-
var socket = new DashScopeClientWebSocket(_options.ApiKey, _options.WorkspaceId);
95-
await socket.ConnectAsync<TOutput>(new Uri(url), cancellationToken);
111+
var socket = _dashScopeClientWebSocketFactory.GetClientWebSocket(_options.ApiKey, _options.WorkspaceId);
112+
await socket.ConnectAsync(new Uri(url), cancellationToken);
96113
return socket;
97114
}
98115

@@ -107,10 +124,10 @@ private void Dispose(bool disposing)
107124
socket?.Dispose();
108125
}
109126

110-
while (_active.IsEmpty == false)
127+
var activeSockets = _active.Values;
128+
foreach (var activeSocket in activeSockets)
111129
{
112-
_active.TryTake(out var socket);
113-
socket?.Dispose();
130+
activeSocket.Dispose();
114131
}
115132
}
116133
}

src/Cnblogs.DashScope.Core/DashScopeClientWebSocketWrapper.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace Cnblogs.DashScope.Core;
1+
using System.Text.Json;
2+
3+
namespace Cnblogs.DashScope.Core;
24

35
/// <summary>
46
/// Represents a transient wrapper for rented websocket, should be transient.
@@ -14,15 +16,20 @@ public sealed record DashScopeClientWebSocketWrapper(DashScopeClientWebSocket So
1416
public IAsyncEnumerable<byte> BinaryOutput => Socket.BinaryOutput.ReadAllAsync();
1517

1618
/// <summary>
17-
/// The task that completes when received task-started event from server.
19+
/// The json message output.
1820
/// </summary>
19-
public Task TaskStarted => Socket.TaskStarted;
21+
public IAsyncEnumerable<DashScopeWebSocketResponse<JsonElement>> JsonOutput => Socket.JsonOutput.ReadAllAsync();
2022

2123
/// <summary>
2224
/// Reset task signal and output cannel.
2325
/// </summary>
2426
public void ResetTask() => Socket.ResetOutput();
2527

28+
/// <summary>
29+
/// The task that completes when received task-started event from server.
30+
/// </summary>
31+
public Task TaskStarted => Socket.TaskStarted;
32+
2633
/// <summary>
2734
/// Send message to server.
2835
/// </summary>
@@ -44,7 +51,7 @@ public Task SendMessageAsync<TInput, TParameter>(
4451
/// <inheritdoc />
4552
public void Dispose()
4653
{
47-
Pool.ReturnSocketAsync(Socket);
54+
Pool.ReturnSocket(Socket);
4855
GC.SuppressFinalize(this);
4956
}
5057

src/Cnblogs.DashScope.Core/DashScopeOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class DashScopeOptions
2020
/// <summary>
2121
/// Base address for DashScope websocket API.
2222
/// </summary>
23-
public string BaseWebsocketAddress { get; set; } = DashScopeDefaults.WebsocketApiBaseAddress;
23+
public string WebsocketBaseAddress { get; set; } = DashScopeDefaults.WebsocketApiBaseAddress;
2424

2525
/// <summary>
2626
/// Default workspace Id.

src/Cnblogs.DashScope.Core/DashScopeWebSocketResponse.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@
88
/// <typeparam name="TOutput">Output type of the response.</typeparam>
99
public record DashScopeWebSocketResponse<TOutput>(
1010
DashScopeWebSocketResponseHeader Header,
11-
DashScopeWebSocketResponsePayload<TOutput> Payload)
12-
where TOutput : class;
11+
DashScopeWebSocketResponsePayload<TOutput> Payload);

0 commit comments

Comments
 (0)