|
1 | | - |
2 | 1 | /* |
3 | 2 | * Copyright The Kmesh Authors. |
4 | 3 | * |
@@ -26,6 +25,8 @@ import ( |
26 | 25 | "fmt" |
27 | 26 | "net/http" |
28 | 27 | "net/netip" |
| 28 | + "os" |
| 29 | + "path/filepath" |
29 | 30 | "sort" |
30 | 31 | "strings" |
31 | 32 | "testing" |
@@ -817,6 +818,187 @@ func TestL4Telemetry(t *testing.T) { |
817 | 818 | }) |
818 | 819 | } |
819 | 820 |
|
| 821 | +func TestLongConnL4Telemetry(t *testing.T) { |
| 822 | + framework.NewTest(t).Run(func(t framework.TestContext) { |
| 823 | + |
| 824 | + namespace := apps.Namespace.Name() |
| 825 | + waypoint := "namespace-waypoint" |
| 826 | + |
| 827 | + newWaypointProxyOrFail(t, t, apps.Namespace, waypoint, constants.ServiceTraffic) |
| 828 | + t.Cleanup(func() { |
| 829 | + deleteWaypointProxyOrFail(t, t, apps.Namespace, waypoint) |
| 830 | + }) |
| 831 | + |
| 832 | + SetWaypoint(t, namespace, "", waypoint, Namespace) |
| 833 | + t.Cleanup(func() { |
| 834 | + UnsetWaypoint(t, namespace, "", Namespace) |
| 835 | + }) |
| 836 | + |
| 837 | + // deploying websocket server and client for establishing long connection |
| 838 | + serverPodYAML := ` |
| 839 | +apiVersion: v1 |
| 840 | +kind: Pod |
| 841 | +metadata: |
| 842 | + name: ws-server |
| 843 | + labels: |
| 844 | + app: ws-server |
| 845 | +spec: |
| 846 | + containers: |
| 847 | + - name: ws-server |
| 848 | + image: node:18 |
| 849 | + command: ["/bin/sh", "-c"] |
| 850 | + args: |
| 851 | + - | |
| 852 | + mkdir server |
| 853 | + cd server |
| 854 | + npm init -y |
| 855 | + npm install ws |
| 856 | + cat << EOF > server.js |
| 857 | + const WebSocket = require('ws'); |
| 858 | + const server = new WebSocket.Server({ port: 8080 }); |
| 859 | + server.on('listening', () => { |
| 860 | + console.log('🚀 WebSocket server started on ws://localhost:8080'); |
| 861 | + }); |
| 862 | +
|
| 863 | + server.on('connection', socket => { |
| 864 | + console.log('Client connected'); |
| 865 | + |
| 866 | + setInterval(() => { |
| 867 | + socket.send('Hello world'); |
| 868 | + console.log('Sent message to client'); |
| 869 | + }, 1000); |
| 870 | +
|
| 871 | + socket.on('message', message => { |
| 872 | + console.log('Received'); |
| 873 | + }); |
| 874 | +
|
| 875 | + socket.on('close', () => { |
| 876 | + console.log('Client disconnected'); |
| 877 | + }); |
| 878 | + }); |
| 879 | + EOF |
| 880 | + node server.js |
| 881 | + ports: |
| 882 | + - containerPort: 8080 |
| 883 | +` |
| 884 | + |
| 885 | + tmpFile := filepath.Join(t.TempDir(), "ws-server.yaml") |
| 886 | + if err := os.WriteFile(tmpFile, []byte(serverPodYAML), 0644); err != nil { |
| 887 | + t.Fatalf("failed to write WebSocket YAML: %v", err) |
| 888 | + } |
| 889 | + |
| 890 | + if _, err := shell.Execute(true, |
| 891 | + fmt.Sprintf("kubectl apply -f %s -n %s", tmpFile, namespace), |
| 892 | + ); err != nil { |
| 893 | + t.Fatalf("failed to deploy ws-server: %v", err) |
| 894 | + } |
| 895 | + t.Cleanup(func() { |
| 896 | + if _, err := shell.Execute(true, fmt.Sprintf("kubectl delete -f %s -n %s", tmpFile, namespace)); err != nil { |
| 897 | + t.Fatalf("failed to delete ws-server: %v", err) |
| 898 | + } |
| 899 | + }) |
| 900 | + |
| 901 | + serverServiceYAML := ` |
| 902 | +apiVersion: v1 |
| 903 | +kind: Service |
| 904 | +metadata: |
| 905 | + name: ws-server-service |
| 906 | + labels: |
| 907 | + app: ws-server |
| 908 | +spec: |
| 909 | + selector: |
| 910 | + app: ws-server |
| 911 | + ports: |
| 912 | + - protocol: TCP |
| 913 | + port: 8080 |
| 914 | + targetPort: 8080 |
| 915 | +` |
| 916 | + tmpFile = filepath.Join(t.TempDir(), "ws-server-service.yaml") |
| 917 | + if err := os.WriteFile(tmpFile, []byte(serverServiceYAML), 0644); err != nil { |
| 918 | + t.Fatalf("failed to write WebSocket server service YAML: %v", err) |
| 919 | + } |
| 920 | + |
| 921 | + if _, err := shell.Execute(true, |
| 922 | + fmt.Sprintf("kubectl apply -f %s -n %s", tmpFile, namespace), |
| 923 | + ); err != nil { |
| 924 | + t.Fatalf("failed to deploy ws-server-service: %v", err) |
| 925 | + } |
| 926 | + |
| 927 | + t.Cleanup(func() { |
| 928 | + if _, err := shell.Execute(true, fmt.Sprintf("kubectl delete -f %s -n %s", tmpFile, namespace)); err != nil { |
| 929 | + t.Fatalf("failed to delete ws-server-service: %v", err) |
| 930 | + } |
| 931 | + }) |
| 932 | + |
| 933 | + fetchFn := testKube.NewSinglePodFetch(t.Clusters().Default(), namespace) |
| 934 | + if _, err := testKube.WaitUntilPodsAreReady(fetchFn); err != nil { |
| 935 | + t.Fatalf("failed to wait ws-server pod to be ready: %v", err) |
| 936 | + } |
| 937 | + |
| 938 | + clientPodYaml := fmt.Sprintf(` |
| 939 | +apiVersion: v1 |
| 940 | +kind: Pod |
| 941 | +metadata: |
| 942 | + name: ws-client |
| 943 | + labels: |
| 944 | + app: ws-client |
| 945 | +spec: |
| 946 | + containers: |
| 947 | + - name: ws-client |
| 948 | + image: node:18 |
| 949 | + command: ["/bin/sh", "-c"] |
| 950 | + args: |
| 951 | + - | |
| 952 | + mkdir client |
| 953 | + cd client |
| 954 | + npm init -y |
| 955 | + npm install ws |
| 956 | + cat <<EOF > client.js |
| 957 | + const WebSocket = require('ws'); |
| 958 | + const socket = new WebSocket('ws://ws-server.%s.svc.cluster.local:8080'); |
| 959 | +
|
| 960 | + socket.on('open', () => { |
| 961 | + console.log('Connected to server'); |
| 962 | + socket.send('Hello from client'); |
| 963 | + console.log('Sent message to server'); |
| 964 | + }); |
| 965 | +
|
| 966 | + socket.on('message', data => { |
| 967 | + console.log("Received"); |
| 968 | +
|
| 969 | + }); |
| 970 | +
|
| 971 | + socket.on('close', () => { |
| 972 | + console.log('Connection closed'); |
| 973 | + }); |
| 974 | + EOF |
| 975 | + node client.js |
| 976 | +`, namespace) |
| 977 | + |
| 978 | + tmpFile = filepath.Join(t.TempDir(), "ws-client.yaml") |
| 979 | + if err := os.WriteFile(tmpFile, []byte(clientPodYaml), 0644); err != nil { |
| 980 | + t.Fatalf("failed to write WebSocket client YAML: %v", err) |
| 981 | + } |
| 982 | + |
| 983 | + if _, err := shell.Execute(true, |
| 984 | + fmt.Sprintf("kubectl apply -f %s -n %s", tmpFile, namespace), |
| 985 | + ); err != nil { |
| 986 | + t.Fatalf("failed to deploy ws-client: %v", err) |
| 987 | + } |
| 988 | + |
| 989 | + t.Cleanup(func() { |
| 990 | + if _, err := shell.Execute(true, fmt.Sprintf("kubectl delete -f %s -n %s", tmpFile, namespace)); err != nil { |
| 991 | + t.Fatalf("failed to delete ws-client: %v", err) |
| 992 | + } |
| 993 | + }) |
| 994 | + fetchFn = testKube.NewSinglePodFetch(t.Clusters().Default(), namespace) |
| 995 | + if _, err := testKube.WaitUntilPodsAreReady(fetchFn); err != nil { |
| 996 | + t.Fatalf("failed to wait ws-client pod to be ready: %v", err) |
| 997 | + } |
| 998 | + |
| 999 | + }) |
| 1000 | +} |
| 1001 | + |
820 | 1002 | func buildL4Query(src, dst echo.Instance) prometheus.Query { |
821 | 1003 | query := prometheus.Query{} |
822 | 1004 |
|
|
0 commit comments