Skip to content

Commit 1773931

Browse files
feat: hot reload eszip bundle when function source changes (#3710)
* fix: file watcher skeleton for serving functions * chore: add debounce file watcher * chore: remove unnecessary log * chore: impose default watch limit * chore: add watcher tests and main.ts (#3717) * chore: add watcher tests and main.ts * chore: add serve tests * chore: fix lints * chore: remove sleep in tests * chore: simulate fs event when possible * chore: avoid flaky tests * chore: undo debounce test --------- Co-authored-by: Qiao Han <qiao@supabase.io> * chore: simplify event ignore logic * chore: add streamer unit tests * chore: simplify streamer test * chore: update serve tests * chore: update unit tests * chore: update watcher tests --------- Co-authored-by: Andrew Valleteau <avallete@users.noreply.github.com>
1 parent e454cde commit 1773931

File tree

12 files changed

+754
-102
lines changed

12 files changed

+754
-102
lines changed

internal/functions/serve/serve.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"strconv"
1111
"strings"
1212

13+
"github.com/docker/cli/cli/compose/loader"
1314
"github.com/docker/docker/api/types/container"
15+
"github.com/docker/docker/api/types/mount"
1416
"github.com/docker/docker/api/types/network"
1517
"github.com/docker/go-connections/nat"
1618
"github.com/go-errors/errors"
@@ -46,6 +48,7 @@ func (mode InspectMode) toFlag() string {
4648
type RuntimeOption struct {
4749
InspectMode *InspectMode
4850
InspectMain bool
51+
fileWatcher *debounceFileWatcher
4952
}
5053

5154
func (i *RuntimeOption) toArgs() []string {
@@ -68,6 +71,36 @@ const (
6871
var mainFuncEmbed string
6972

7073
func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
74+
watcher, err := NewDebounceFileWatcher()
75+
if err != nil {
76+
return err
77+
}
78+
go watcher.Start()
79+
defer watcher.Close()
80+
// TODO: refactor this to edge runtime service
81+
runtimeOption.fileWatcher = watcher
82+
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
83+
return err
84+
}
85+
streamer := NewLogStreamer(ctx)
86+
go streamer.Start(utils.EdgeRuntimeId)
87+
defer streamer.Close()
88+
for {
89+
select {
90+
case <-ctx.Done():
91+
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
92+
return ctx.Err()
93+
case <-watcher.RestartCh:
94+
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
95+
return err
96+
}
97+
case err := <-streamer.ErrCh:
98+
return err
99+
}
100+
}
101+
}
102+
103+
func restartEdgeRuntime(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
71104
// 1. Sanity checks.
72105
if err := flags.LoadConfig(fsys); err != nil {
73106
return err
@@ -84,14 +117,7 @@ func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPa
84117
dbUrl := fmt.Sprintf("postgresql://postgres:postgres@%s:5432/postgres", utils.DbAliases[0])
85118
// 3. Serve and log to console
86119
fmt.Fprintln(os.Stderr, "Setting up Edge Functions runtime...")
87-
if err := ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys); err != nil {
88-
return err
89-
}
90-
if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr); err != nil {
91-
return err
92-
}
93-
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
94-
return nil
120+
return ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys)
95121
}
96122

97123
func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, dbUrl string, runtimeOption RuntimeOption, fsys afero.Fs) error {
@@ -131,6 +157,19 @@ func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool,
131157
if err != nil {
132158
return err
133159
}
160+
if watcher := runtimeOption.fileWatcher; watcher != nil {
161+
var watchPaths []string
162+
for _, b := range binds {
163+
if spec, err := loader.ParseVolume(b); err != nil {
164+
return errors.Errorf("failed to parse docker volume: %w", err)
165+
} else if spec.Type == string(mount.TypeBind) {
166+
watchPaths = append(watchPaths, spec.Source)
167+
}
168+
}
169+
if err := watcher.SetWatchPaths(watchPaths, fsys); err != nil {
170+
return err
171+
}
172+
}
134173
env = append(env, "SUPABASE_INTERNAL_FUNCTIONS_CONFIG="+functionsConfigString)
135174
// 3. Parse entrypoint script
136175
cmd := append([]string{
@@ -215,6 +254,7 @@ func populatePerFunctionConfigs(cwd, importMapPath string, noVerifyJWT *bool, fs
215254
for slug, fc := range functionsConfig {
216255
if !fc.Enabled {
217256
fmt.Fprintln(os.Stderr, "Skipped serving Function:", slug)
257+
delete(functionsConfig, slug)
218258
continue
219259
}
220260
modules, err := deploy.GetBindMounts(cwd, utils.FunctionsDir, "", fc.Entrypoint, fc.ImportMap, fsys)

internal/functions/serve/serve_test.go

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package serve
22

33
import (
44
"context"
5+
"embed"
56
"net/http"
6-
"os"
77
"path/filepath"
8+
"strings"
89
"testing"
910

1011
"github.com/docker/docker/api/types/container"
@@ -17,13 +18,20 @@ import (
1718
"github.com/supabase/cli/pkg/cast"
1819
)
1920

21+
var (
22+
//go:embed testdata/config.toml
23+
testConfig []byte
24+
//go:embed testdata/*
25+
testdata embed.FS
26+
)
27+
2028
func TestServeCommand(t *testing.T) {
2129
t.Run("serves all functions", func(t *testing.T) {
2230
// Setup in-memory fs
2331
fsys := afero.NewMemMapFs()
24-
require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys))
32+
require.NoError(t, afero.WriteFile(fsys, utils.ConfigPath, testConfig, 0644))
2533
require.NoError(t, afero.WriteFile(fsys, utils.FallbackEnvFilePath, []byte{}, 0644))
26-
require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte{}, 0644))
34+
require.NoError(t, afero.WriteFile(fsys, utils.FallbackImportMapPath, []byte("{}"), 0644))
2735
// Setup mock docker
2836
require.NoError(t, apitest.MockDocker(utils.Docker))
2937
defer gock.OffAll()
@@ -36,11 +44,11 @@ func TestServeCommand(t *testing.T) {
3644
Delete("/v" + utils.Docker.ClientVersion() + "/containers/" + containerId).
3745
Reply(http.StatusOK)
3846
apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), containerId)
39-
require.NoError(t, apitest.MockDockerLogs(utils.Docker, containerId, "success"))
40-
// Run test
47+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerId, 1, strings.NewReader("failed")))
48+
// Run test with timeout context
4149
err := Run(context.Background(), "", nil, "", RuntimeOption{}, fsys)
4250
// Check error
43-
assert.NoError(t, err)
51+
assert.ErrorContains(t, err, "error running container: exit 1")
4452
assert.Empty(t, apitest.ListUnmatchedRequests())
4553
})
4654

@@ -88,7 +96,6 @@ func TestServeCommand(t *testing.T) {
8896
})
8997

9098
t.Run("throws error on missing import map", func(t *testing.T) {
91-
utils.CurrentDirAbs = "/"
9299
// Setup in-memory fs
93100
fsys := afero.NewMemMapFs()
94101
require.NoError(t, utils.InitConfig(utils.InitParams{ProjectId: "test"}, fsys))
@@ -105,6 +112,62 @@ func TestServeCommand(t *testing.T) {
105112
// Run test
106113
err := Run(context.Background(), ".env", cast.Ptr(true), "import_map.json", RuntimeOption{}, fsys)
107114
// Check error
108-
assert.ErrorIs(t, err, os.ErrNotExist)
115+
assert.ErrorContains(t, err, "failed to resolve relative path:")
116+
})
117+
}
118+
119+
func TestServeFunctions(t *testing.T) {
120+
require.NoError(t, utils.Config.Load("testdata/config.toml", testdata))
121+
utils.UpdateDockerIds()
122+
123+
t.Run("runs inspect mode", func(t *testing.T) {
124+
// Setup in-memory fs
125+
fsys := afero.FromIOFS{FS: testdata}
126+
// Setup mock docker
127+
require.NoError(t, apitest.MockDocker(utils.Docker))
128+
defer gock.OffAll()
129+
apitest.MockDockerStart(utils.Docker, utils.GetRegistryImageUrl(utils.Config.EdgeRuntime.Image), utils.EdgeRuntimeId)
130+
// Run test
131+
err := ServeFunctions(context.Background(), "", nil, "", "", RuntimeOption{
132+
InspectMode: cast.Ptr(InspectModeRun),
133+
InspectMain: true,
134+
}, fsys)
135+
// Check error
136+
assert.NoError(t, err)
137+
assert.Empty(t, apitest.ListUnmatchedRequests())
138+
})
139+
140+
t.Run("parses env file", func(t *testing.T) {
141+
envPath := "/project/.env"
142+
// Setup in-memory fs
143+
fsys := afero.NewMemMapFs()
144+
require.NoError(t, utils.WriteFile(envPath, []byte(`
145+
DATABASE_URL=postgresql://localhost:5432/test
146+
API_KEY=secret123
147+
DEBUG=true
148+
`), fsys))
149+
// Run test
150+
env, err := parseEnvFile(envPath, fsys)
151+
// Check error
152+
assert.NoError(t, err)
153+
assert.ElementsMatch(t, []string{
154+
"DATABASE_URL=postgresql://localhost:5432/test",
155+
"API_KEY=secret123",
156+
"DEBUG=true",
157+
}, env)
158+
})
159+
160+
t.Run("parses function config", func(t *testing.T) {
161+
// Setup in-memory fs
162+
fsys := afero.FromIOFS{FS: testdata}
163+
// Run test
164+
binds, configString, err := populatePerFunctionConfigs("/", "", nil, fsys)
165+
// Check error
166+
assert.NoError(t, err)
167+
assert.ElementsMatch(t, []string{
168+
"supabase_edge_runtime_test:/root/.cache/deno:rw",
169+
"/supabase/functions/:/supabase/functions/:ro",
170+
}, binds)
171+
assert.Equal(t, `{"hello":{"verifyJWT":true,"entrypointPath":"testdata/functions/hello/index.ts","staticFiles":["testdata/image.png"]}}`, configString)
109172
})
110173
}

internal/functions/serve/streamer.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package serve
2+
3+
import (
4+
"context"
5+
"os"
6+
"time"
7+
8+
"github.com/cenkalti/backoff/v4"
9+
"github.com/containerd/errdefs"
10+
"github.com/docker/docker/api/types/container"
11+
"github.com/go-errors/errors"
12+
"github.com/supabase/cli/internal/utils"
13+
)
14+
15+
type logStreamer struct {
16+
ctx context.Context
17+
Close context.CancelFunc
18+
ErrCh chan error
19+
}
20+
21+
func NewLogStreamer(ctx context.Context) logStreamer {
22+
cancelCtx, cancel := context.WithCancel(ctx)
23+
return logStreamer{
24+
ctx: cancelCtx,
25+
Close: cancel,
26+
ErrCh: make(chan error, 1),
27+
}
28+
}
29+
30+
// Used by unit tests
31+
var retryInterval = time.Millisecond * 400
32+
33+
func (s *logStreamer) Start(containerID string) {
34+
// Retry indefinitely until stream is closed
35+
policy := backoff.WithContext(backoff.NewConstantBackOff(retryInterval), s.ctx)
36+
fetch := func() error {
37+
if err := utils.DockerStreamLogs(s.ctx, containerID, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
38+
lo.Timestamps = true
39+
}); errdefs.IsNotFound(err) || errdefs.IsConflict(err) || errors.Is(err, utils.ErrContainerKilled) {
40+
return err
41+
} else if err != nil {
42+
return &backoff.PermanentError{Err: err}
43+
}
44+
return errors.Errorf("container exited gracefully: %s", containerID)
45+
}
46+
s.ErrCh <- backoff.Retry(fetch, policy)
47+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package serve
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/h2non/gock"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"github.com/supabase/cli/internal/testing/apitest"
14+
"github.com/supabase/cli/internal/utils"
15+
)
16+
17+
func TestLogStreamer(t *testing.T) {
18+
containerID := "test-container"
19+
retryInterval = 0
20+
21+
t.Run("streams logs from container", func(t *testing.T) {
22+
// Setup mock docker
23+
require.NoError(t, apitest.MockDocker(utils.Docker))
24+
defer gock.OffAll()
25+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
26+
// Run test
27+
streamer := NewLogStreamer(context.Background())
28+
streamer.Start(containerID)
29+
// Check error
30+
select {
31+
case err := <-streamer.ErrCh:
32+
assert.ErrorContains(t, err, "error running container: exit 1")
33+
case <-time.After(2 * time.Second):
34+
assert.Fail(t, "missing error signal from closing")
35+
}
36+
assert.Empty(t, apitest.ListUnmatchedRequests())
37+
})
38+
39+
t.Run("retries on container exit", func(t *testing.T) {
40+
// Setup mock docker
41+
require.NoError(t, apitest.MockDocker(utils.Docker))
42+
defer gock.OffAll()
43+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 0, strings.NewReader("")))
44+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 137, strings.NewReader("")))
45+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
46+
// Run test
47+
streamer := NewLogStreamer(context.Background())
48+
streamer.Start(containerID)
49+
// Check error
50+
select {
51+
case err := <-streamer.ErrCh:
52+
assert.ErrorContains(t, err, "error running container: exit 1")
53+
case <-time.After(2 * time.Second):
54+
assert.Fail(t, "missing error signal from closing")
55+
}
56+
assert.Empty(t, apitest.ListUnmatchedRequests())
57+
})
58+
59+
t.Run("retries on missing container", func(t *testing.T) {
60+
// Setup mock docker
61+
require.NoError(t, apitest.MockDocker(utils.Docker))
62+
defer gock.OffAll()
63+
gock.New(utils.Docker.DaemonHost()).
64+
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
65+
Reply(http.StatusNotFound).
66+
BodyString("No such container")
67+
gock.New(utils.Docker.DaemonHost()).
68+
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
69+
Reply(http.StatusConflict).
70+
BodyString("can not get logs from container which is dead or marked for removal")
71+
gock.New(utils.Docker.DaemonHost()).
72+
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/logs").
73+
Reply(http.StatusOK)
74+
gock.New(utils.Docker.DaemonHost()).
75+
Get("/v" + utils.Docker.ClientVersion() + "/containers/" + containerID + "/json").
76+
Reply(http.StatusNotFound).
77+
BodyString("No such object")
78+
require.NoError(t, apitest.MockDockerLogsStream(utils.Docker, containerID, 1, strings.NewReader("")))
79+
// Run test
80+
streamer := NewLogStreamer(context.Background())
81+
streamer.Start(containerID)
82+
// Check error
83+
select {
84+
case err := <-streamer.ErrCh:
85+
assert.ErrorContains(t, err, "error running container: exit 1")
86+
case <-time.After(2 * time.Second):
87+
assert.Fail(t, "missing error signal from closing")
88+
}
89+
assert.Empty(t, apitest.ListUnmatchedRequests())
90+
})
91+
}

0 commit comments

Comments
 (0)