Skip to content
1 change: 1 addition & 0 deletions neo4j/driver_with_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func NewDriverWithContext(target string, auth auth.TokenManager, configurers ...
}

d.connector.Log = d.log
d.connector.LogId = d.logId
d.connector.RoutingContext = routingContext
d.connector.Config = d.config

Expand Down
4 changes: 4 additions & 0 deletions neo4j/driver_with_context_testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func ForceRoutingTableUpdate(d DriverWithContext, database string, bookmarks []s
return errorutil.WrapError(err)
}

func RegisterDnsResolver(d DriverWithContext, hook func(address string) []string) {
d.(*driverWithContext).connector.TestKitDnsResolver = hook
}

func GetRoutingTable(d DriverWithContext, database string) (*RoutingTable, error) {
driver := d.(*driverWithContext)
router, ok := driver.router.(*router.Router)
Expand Down
5 changes: 2 additions & 3 deletions neo4j/internal/bolt/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/gql"
"time"

idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"

"github.com/neo4j/neo4j-go-driver/v5/neo4j/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype"
idb "github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/db"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/internal/packstream"
"github.com/neo4j/neo4j-go-driver/v5/neo4j/log"
)

const containsSystemUpdatesKey = "contains-system-updates"
Expand Down
42 changes: 33 additions & 9 deletions neo4j/internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ import (
)

type Connector struct {
SkipEncryption bool
SkipVerify bool
Log log.Logger
RoutingContext map[string]string
Network string
Config *config.Config
SupplyConnection func(context.Context, string) (net.Conn, error)
SkipEncryption bool
SkipVerify bool
Log log.Logger
LogId string
RoutingContext map[string]string
Network string
Config *config.Config
SupplyConnection func(context.Context, string) (net.Conn, error)
TestKitDnsResolver func(string) []string
}

func (c Connector) Connect(
func (c *Connector) Connect(
ctx context.Context,
address string,
auth *db.ReAuthToken,
Expand Down Expand Up @@ -138,7 +140,29 @@ func (c Connector) createConnection(ctx context.Context, address string) (net.Co
dialer.KeepAlive = -1 * time.Second // Turns keep-alive off
}

return dialer.DialContext(ctx, c.Network, address)
if c.TestKitDnsResolver == nil {
c.Log.Debugf(log.Driver, c.LogId, "dialing %s", address)
return dialer.DialContext(ctx, c.Network, address)
}

addresses := c.TestKitDnsResolver(address)

if len(addresses) == 0 {
return nil, errors.New("TestKit DNS resolver returned no address")
}

var (
err error
con net.Conn
)
for _, address := range addresses {
con, err = dialer.DialContext(ctx, c.Network, address)
if err == nil {
c.Log.Debugf(log.Driver, c.LogId, "dialing %s", address)
return con, nil
}
}
return nil, err
}

func (c Connector) tlsConfig(serverName string) *tls.Config {
Expand Down
102 changes: 74 additions & 28 deletions testkit-backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications"
"io"
"math"
"net"
"net/url"
"regexp"
"strings"
Expand All @@ -52,6 +53,7 @@ type backend struct {
explicitTransactions map[string]neo4j.ExplicitTransaction
recordedErrors map[string]error
resolvedAddresses map[string][]any
dnsResolutions map[string][]any
authTokenManagers map[string]auth.TokenManager
resolvedGetAuthTokens map[string]neo4j.AuthToken
resolvedHandleSecurityException map[string]bool
Expand All @@ -64,6 +66,7 @@ type backend struct {
bookmarkManagers map[string]neo4j.BookmarkManager
clientCertificateProviders map[string]auth.ClientCertificateProvider
resolvedClientCertificates map[string]auth.ClientCertificate
closed bool
}

// To implement transactional functions a bit of extra state is needed on the
Expand Down Expand Up @@ -148,6 +151,7 @@ func newBackend(rd *bufio.Reader, wr io.Writer) *backend {
explicitTransactions: make(map[string]neo4j.ExplicitTransaction),
recordedErrors: make(map[string]error),
resolvedAddresses: make(map[string][]any),
dnsResolutions: make(map[string][]any),
authTokenManagers: make(map[string]auth.TokenManager),
resolvedGetAuthTokens: make(map[string]neo4j.AuthToken),
resolvedHandleSecurityException: make(map[string]bool),
Expand All @@ -159,6 +163,7 @@ func newBackend(rd *bufio.Reader, wr io.Writer) *backend {
consumedBookmarks: make(map[string]struct{}),
clientCertificateProviders: make(map[string]auth.ClientCertificateProvider),
resolvedClientCertificates: make(map[string]auth.ClientCertificate),
closed: false,
}
}

Expand Down Expand Up @@ -295,6 +300,7 @@ func (b *backend) process() bool {
for {
line, err := b.rd.ReadString('\n')
if err != nil {
b.closed = true
return false
}

Expand Down Expand Up @@ -329,6 +335,10 @@ func (b *backend) writeResponse(name string, data any) {
if err != nil {
panic(err.Error())
}
if b.closed {
fmt.Print("RES ignored because backend is closed\n")
return
}
// Make sure that logging framework doesn't write anything inbetween here...
b.wrLock.Lock()
defer b.wrLock.Unlock()
Expand Down Expand Up @@ -443,8 +453,7 @@ func (b *backend) handleTransactionFunc(isRead bool, data map[string]any) {
b.managedTransactions[txId] = tx
b.writeResponse("RetryableTry", map[string]any{"id": txId})
// Process all things that the client might do within the transaction
for {
b.process()
for b.process() {
switch sessionState.retryableState {
case retryablePositive:
// Client succeeded and wants to commit
Expand All @@ -460,6 +469,7 @@ func (b *backend) handleTransactionFunc(isRead bool, data map[string]any) {
// Client did something not related to the retryable state
}
}
return nil, nil
}
var err error
if isRead {
Expand All @@ -482,8 +492,7 @@ func (b *backend) customAddressResolverFunction() config.ServerAddressResolver {
"id": id,
"address": fmt.Sprintf("%s:%s", address.Hostname(), address.Port()),
})
for {
b.process()
for b.process() {
if addresses, ok := b.resolvedAddresses[id]; ok {
delete(b.resolvedAddresses, id)
result := make([]config.ServerAddress, len(addresses))
Expand All @@ -493,6 +502,35 @@ func (b *backend) customAddressResolverFunction() config.ServerAddressResolver {
return result
}
}
return nil
}
}

func (b *backend) dnsResolverFunction() func(address string) []string {
return func(address string) []string {
id := b.nextId()
host, port, err := net.SplitHostPort(address)
if err != nil {
b.writeError(fmt.Errorf(
"couldn't parse address for custom DNS resulution (probably a bug in backend): %w", err,
))
return nil
}
b.writeResponse("DomainNameResolutionRequired", map[string]string{
"id": id,
"name": host,
})
for b.process() {
if addresses, ok := b.dnsResolutions[id]; ok {
delete(b.dnsResolutions, id)
result := make([]string, len(addresses))
for i, address := range addresses {
result[i] = fmt.Sprintf("%s:%s", address, port)
}
return result
}
}
return nil
}
}

Expand Down Expand Up @@ -533,6 +571,11 @@ func (b *backend) handleRequest(req map[string]any) {
fmt.Printf("REQ: %s %s\n", name, dataJson)
switch name {

case "DomainNameResolutionCompleted":
requestId := data["requestId"].(string)
addresses := data["addresses"].([]any)
b.dnsResolutions[requestId] = addresses

case "ResolverResolutionCompleted":
requestId := data["requestId"].(string)
addresses := data["addresses"].([]any)
Expand Down Expand Up @@ -640,6 +683,11 @@ func (b *backend) handleRequest(req map[string]any) {
b.writeError(err)
return
}

if data["domainNameResolverRegistered"] != nil && data["domainNameResolverRegistered"].(bool) {
neo4j.RegisterDnsResolver(driver, b.dnsResolverFunction())
}

idKey := b.nextId()
b.drivers[idKey] = driver
b.writeResponse("Driver", map[string]any{"id": idKey})
Expand Down Expand Up @@ -1150,13 +1198,13 @@ func (b *backend) handleRequest(req map[string]any) {
"id": id,
"authTokenManagerId": managerId,
})
for {
b.process()
for b.process() {
if token, ok := b.resolvedGetAuthTokens[id]; ok {
delete(b.resolvedGetAuthTokens, id)
return token
}
}
return neo4j.AuthToken{}
},
HandleSecurityExceptionFunc: func(token neo4j.AuthToken, error *db.Neo4jError) bool {
id := b.nextId()
Expand All @@ -1168,13 +1216,13 @@ func (b *backend) handleRequest(req map[string]any) {
"auth": serializeAuth(token),
"errorCode": error.Code,
})
for {
b.process()
for b.process() {
if handled, ok := b.resolvedHandleSecurityException[id]; ok {
delete(b.resolvedHandleSecurityException, id)
return handled
}
}
return false
},
}
b.authTokenManagers[managerId] = manager
Expand Down Expand Up @@ -1203,13 +1251,13 @@ func (b *backend) handleRequest(req map[string]any) {
"id": id,
"basicAuthTokenManagerId": managerId,
})
for {
b.process()
for b.process() {
if basicToken, ok := b.resolvedBasicTokens[id]; ok {
delete(b.resolvedBasicTokens, id)
return basicToken.token, nil
}
}
return neo4j.AuthToken{}, nil
})
b.authTokenManagers[managerId] = manager
b.writeResponse("BasicAuthTokenManager", map[string]any{"id": managerId})
Expand All @@ -1229,13 +1277,13 @@ func (b *backend) handleRequest(req map[string]any) {
"id": id,
"bearerAuthTokenManagerId": managerId,
})
for {
b.process()
for b.process() {
if bearerToken, ok := b.resolvedBearerTokens[id]; ok {
delete(b.resolvedBearerTokens, id)
return bearerToken.token, bearerToken.expiration, nil
}
}
return neo4j.AuthToken{}, nil, nil
})
b.authTokenManagers[managerId] = manager
b.writeResponse("BearerAuthTokenManager", map[string]any{"id": managerId})
Expand Down Expand Up @@ -1709,20 +1757,18 @@ func testSkips() map[string]string {
"stub.routing.test_routing_v3.RoutingV3.test_should_fail_when_writing_on_unexpectedly_interrupting_writer_on_run_using_tx_run": "Won't fix - only Bolt 3 affected (not officially supported by this driver): broken servers are not removed from routing table",
"stub.routing.test_routing_v3.RoutingV3.test_should_fail_when_writing_on_unexpectedly_interrupting_writer_using_tx_run": "Won't fix - only Bolt 3 affected (not officially supported by this driver): broken servers are not removed from routing table",

// Missing message support in testkit backend
"stub.routing.*.*.test_should_request_rt_from_all_initial_routers_until_successful_on_unknown_failure": "Add DNS resolver TestKit message and connection timeout support",
"stub.routing.*.*.test_should_request_rt_from_all_initial_routers_until_successful_on_authorization_expired": "Add DNS resolver TestKit message and connection timeout support",

// To fix/to decide whether to fix
"stub.routing.*.*.test_should_successfully_acquire_rt_when_router_ip_changes": "Backend lacks custom DNS resolution and Go driver RT discovery differs.",
"stub.routing.test_routing_v*.RoutingV*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors": "Driver always uses configured URL first and custom resolver only if that fails",
"stub.routing.test_routing_v*.RoutingV*.test_should_read_successfully_from_reachable_db_after_trying_unreachable_db": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_write_successfully_after_leader_switch_using_tx_run": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_when_writing_without_writers_using_session_run": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_accept_routing_table_without_writers_and_then_rediscover": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_on_routing_table_with_no_reader": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_discovery_when_router_fails_with_unknown_code": "Unify: other drivers have a list of fast failing errors during discover: on anything else, the driver will try the next router",
"stub.routing.test_routing_v*.RoutingV*.test_should_drop_connections_failing_liveness_check": "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83",
"stub.routing.*.*.test_should_successfully_acquire_rt_when_router_ip_changes": "Backend lacks custom DNS resolution and Go driver RT discovery differs.",
"stub.routing.test_routing_v*.RoutingV*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors": "Driver always uses configured URL first and custom resolver only if that fails",
"stub.routing.test_routing_v*.RoutingV*.test_should_request_rt_from_all_initial_routers_until_successful_on_authorization_expired": "Driver always uses configured URL first and custom resolver only if that fails",
"stub.routing.test_routing_v*.RoutingV*test_should_request_rt_from_all_initial_routers_until_successful_on_unknown_failure": "Driver always uses configured URL first and custom resolver only if that fails",
"stub.routing.test_routing_v*.RoutingV*.test_should_read_successfully_from_reachable_db_after_trying_unreachable_db": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_write_successfully_after_leader_switch_using_tx_run": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_when_writing_without_writers_using_session_run": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_accept_routing_table_without_writers_and_then_rediscover": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_on_routing_table_with_no_reader": "Driver retries to fetch a routing table up to 100 times if it's empty",
"stub.routing.test_routing_v*.RoutingV*.test_should_fail_discovery_when_router_fails_with_unknown_code": "Unify: other drivers have a list of fast failing errors during discover: on anything else, the driver will try the next router",
"stub.routing.test_routing_v*.RoutingV*.test_should_drop_connections_failing_liveness_check": "Liveness check error handling is not (yet) unified: https://github.com/neo-technology/drivers-adr/pull/83",
"stub.*.test_0_timeout": "Fixme: driver omits 0 as tx timeout value",
"stub.summary.test_summary.TestSummaryBasicInfo.test_server_info": "pending unification: should the server address be pre or post DNS resolution?",
}
Expand Down Expand Up @@ -1810,13 +1856,13 @@ func (b *backend) consumeBookmarks(bookmarkManagerId string) func(context.Contex
"bookmarkManagerId": bookmarkManagerId,
"bookmarks": bookmarks,
})
for {
b.process()
for b.process() {
if _, found := b.consumedBookmarks[id]; found {
delete(b.consumedBookmarks, id)
return nil
break
}
}
return nil
}
}

Expand Down