diff --git a/pubsub/schemas/commit_avro_schema.go b/pubsub/schemas/commit_avro_schema.go index b2aea4b982..0fe382055b 100644 --- a/pubsub/schemas/commit_avro_schema.go +++ b/pubsub/schemas/commit_avro_schema.go @@ -21,7 +21,8 @@ import ( "io" "os" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) // commitAvroSchema commits a new Avro schema revision to an existing schema. @@ -30,7 +31,7 @@ func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error { // schemaID := "my-schema-id" // avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } @@ -42,14 +43,18 @@ func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error { return fmt.Errorf("error reading from file: %s", avscFile) } - config := pubsub.SchemaConfig{ + schema := &pubsubpb.Schema{ Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), - Type: pubsub.SchemaAvro, + Type: pubsubpb.Schema_AVRO, Definition: string(avscSource), } - s, err := client.CommitSchema(ctx, schemaID, config) + req := &pubsubpb.CommitSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + Schema: schema, + } + s, err := client.CommitSchema(ctx, req) if err != nil { - return fmt.Errorf("CommitSchema: %w", err) + return fmt.Errorf("error calling CommitSchema: %w", err) } fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s) return nil diff --git a/pubsub/schemas/commit_proto_schema.go b/pubsub/schemas/commit_proto_schema.go index ea9b0584cf..f0621a7c55 100644 --- a/pubsub/schemas/commit_proto_schema.go +++ b/pubsub/schemas/commit_proto_schema.go @@ -21,7 +21,8 @@ import ( "io" "os" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) // commitProtoSchema commits a new proto schema revision to an existing schema. @@ -30,7 +31,7 @@ func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error // schemaID := "my-schema" // protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } @@ -42,12 +43,17 @@ func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error return fmt.Errorf("error reading from file: %s", protoFile) } - config := pubsub.SchemaConfig{ + schema := &pubsubpb.Schema{ + // TODO(hongalex): check if name is necessary here Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), - Type: pubsub.SchemaProtocolBuffer, + Type: pubsubpb.Schema_PROTOCOL_BUFFER, Definition: string(protoSource), } - s, err := client.CommitSchema(ctx, schemaID, config) + req := &pubsubpb.CommitSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + Schema: schema, + } + s, err := client.CommitSchema(ctx, req) if err != nil { return fmt.Errorf("CommitSchema: %w", err) } diff --git a/pubsub/schemas/create_avro_schema.go b/pubsub/schemas/create_avro_schema.go index 9b8e59b625..5450cbe36f 100644 --- a/pubsub/schemas/create_avro_schema.go +++ b/pubsub/schemas/create_avro_schema.go @@ -21,7 +21,8 @@ import ( "io" "os" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) // createAvroSchema creates a schema resource from a JSON-formatted Avro schema file. @@ -30,7 +31,7 @@ func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error { // schemaID := "my-schema" // avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } @@ -41,13 +42,17 @@ func createAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error { return fmt.Errorf("error reading from file: %s", avscFile) } - config := pubsub.SchemaConfig{ - Type: pubsub.SchemaAvro, - Definition: string(avscSource), + req := &pubsubpb.CreateSchemaRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + Schema: &pubsubpb.Schema{ + Type: pubsubpb.Schema_AVRO, + Definition: string(avscSource), + }, + SchemaId: schemaID, } - s, err := client.CreateSchema(ctx, schemaID, config) + s, err := client.CreateSchema(ctx, req) if err != nil { - return fmt.Errorf("CreateSchema: %w", err) + return fmt.Errorf("error calling CreateSchema: %w", err) } fmt.Fprintf(w, "Schema created: %#v\n", s) return nil diff --git a/pubsub/schemas/create_proto_schema.go b/pubsub/schemas/create_proto_schema.go index e372f177cf..ddcdc0594b 100644 --- a/pubsub/schemas/create_proto_schema.go +++ b/pubsub/schemas/create_proto_schema.go @@ -21,7 +21,8 @@ import ( "io" "os" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) // createProtoSchema creates a schema resource from a schema proto file. @@ -30,7 +31,7 @@ func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error // schemaID := "my-schema" // protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } @@ -41,13 +42,17 @@ func createProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error return fmt.Errorf("error reading from file: %s", protoFile) } - config := pubsub.SchemaConfig{ - Type: pubsub.SchemaProtocolBuffer, - Definition: string(protoSource), + req := &pubsubpb.CreateSchemaRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + Schema: &pubsubpb.Schema{ + Type: pubsubpb.Schema_PROTOCOL_BUFFER, + Definition: string(protoSource), + }, + SchemaId: schemaID, } - s, err := client.CreateSchema(ctx, schemaID, config) + s, err := client.CreateSchema(ctx, req) if err != nil { - return fmt.Errorf("CreateSchema: %w", err) + return fmt.Errorf("error calling CreateSchema: %w", err) } fmt.Fprintf(w, "Schema created: %#v\n", s) return nil diff --git a/pubsub/schemas/create_topic_with_schema.go b/pubsub/schemas/create_topic_with_schema.go index 8df5871009..7f7451b417 100644 --- a/pubsub/schemas/create_topic_with_schema.go +++ b/pubsub/schemas/create_topic_with_schema.go @@ -20,10 +20,11 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) -func createTopicWithSchema(w io.Writer, projectID, topicID, schemaID string, encoding pubsub.SchemaEncoding) error { +func createTopicWithSchema(w io.Writer, projectID, topicID, schemaID string, encoding pubsubpb.Encoding) error { // projectID := "my-project-id" // topicID := "my-topic" // schemaID := "my-schema-id" @@ -34,13 +35,14 @@ func createTopicWithSchema(w io.Writer, projectID, topicID, schemaID string, enc return fmt.Errorf("pubsub.NewClient: %w", err) } - tc := &pubsub.TopicConfig{ - SchemaSettings: &pubsub.SchemaSettings{ + topic := &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + SchemaSettings: &pubsubpb.SchemaSettings{ Schema: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), Encoding: encoding, }, } - t, err := client.CreateTopicWithConfig(ctx, topicID, tc) + t, err := client.TopicAdminClient.CreateTopic(ctx, topic) if err != nil { return fmt.Errorf("CreateTopicWithConfig: %w", err) } diff --git a/pubsub/schemas/create_topic_with_schema_revisions.go b/pubsub/schemas/create_topic_with_schema_revisions.go index ecc110c02e..cef46e3cbe 100644 --- a/pubsub/schemas/create_topic_with_schema_revisions.go +++ b/pubsub/schemas/create_topic_with_schema_revisions.go @@ -20,31 +20,33 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) -func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string, encoding pubsub.SchemaEncoding) error { +func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string) error { // projectID := "my-project-id" // topicID := "my-topic" // schemaID := "my-schema-id" // firstRevisionID := "my-revision-id" // lastRevisionID := "my-revision-id" - // encoding := pubsub.EncodingJSON ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { return fmt.Errorf("pubsub.NewClient: %w", err) } - tc := &pubsub.TopicConfig{ - SchemaSettings: &pubsub.SchemaSettings{ + topic := &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + SchemaSettings: &pubsubpb.SchemaSettings{ Schema: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), - FirstRevisionID: firstRevisionID, - LastRevisionID: lastRevisionID, - Encoding: encoding, + FirstRevisionId: firstRevisionID, + LastRevisionId: lastRevisionID, + // Alternative encoding is pubsubpb.Encoding_JSON + Encoding: pubsubpb.Encoding_BINARY, }, } - t, err := client.CreateTopicWithConfig(ctx, topicID, tc) + t, err := client.TopicAdminClient.CreateTopic(ctx, topic) if err != nil { return fmt.Errorf("CreateTopicWithConfig: %w", err) } diff --git a/pubsub/schemas/delete_schema.go b/pubsub/schemas/delete_schema.go index 3564babc2e..bc29710f64 100644 --- a/pubsub/schemas/delete_schema.go +++ b/pubsub/schemas/delete_schema.go @@ -20,20 +20,24 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func deleteSchema(w io.Writer, projectID, schemaID string) error { // projectID := "my-project-id" // schemaID := "my-schema" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - if err := client.DeleteSchema(ctx, schemaID); err != nil { + req := &pubsubpb.DeleteSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + } + if err := client.DeleteSchema(ctx, req); err != nil { return fmt.Errorf("client.DeleteSchema: %w", err) } fmt.Fprintf(w, "Deleted schema: %s", schemaID) diff --git a/pubsub/schemas/delete_schema_revision.go b/pubsub/schemas/delete_schema_revision.go index db4aff9a5f..b9cc7b976d 100644 --- a/pubsub/schemas/delete_schema_revision.go +++ b/pubsub/schemas/delete_schema_revision.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func deleteSchemaRevision(w io.Writer, projectID, schemaID, revisionID string) error { @@ -28,14 +29,17 @@ func deleteSchemaRevision(w io.Writer, projectID, schemaID, revisionID string) e // schemaID := "my-schema-id" // revisionID := "my-revision-id" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - if _, err := client.DeleteSchemaRevision(ctx, schemaID, revisionID); err != nil { - return fmt.Errorf("client.DeleteSchema revision: %w", err) + req := &pubsubpb.DeleteSchemaRevisionRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s@%s", projectID, schemaID, revisionID), + } + if _, err := client.DeleteSchemaRevision(ctx, req); err != nil { + return fmt.Errorf("client.DeleteSchemaRevision: %w", err) } fmt.Fprintf(w, "Deleted a schema revision: %s@%s", schemaID, revisionID) return nil diff --git a/pubsub/schemas/get_schema.go b/pubsub/schemas/get_schema.go index 84e8a1c952..70adea0d2f 100644 --- a/pubsub/schemas/get_schema.go +++ b/pubsub/schemas/get_schema.go @@ -20,25 +20,30 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func getSchema(w io.Writer, projectID, schemaID string) error { // projectID := "my-project-id" // schemaID := "my-schema" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() // Retrieve the full schema view. If you don't want to retrieve the - // definition, pass in pubsub.SchemaViewBasic which retrieves + // definition, pass in pubsubpb.SchemaView_BASIC which retrieves // just the name and type of the schema. - s, err := client.Schema(ctx, schemaID, pubsub.SchemaViewFull) + req := &pubsubpb.GetSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + View: pubsubpb.SchemaView_FULL, + } + s, err := client.GetSchema(ctx, req) if err != nil { - return fmt.Errorf("client.Schema: %w", err) + return fmt.Errorf("client.GetSchema: %w", err) } fmt.Fprintf(w, "Got schema: %#v\n", s) return nil diff --git a/pubsub/schemas/get_schema_revision.go b/pubsub/schemas/get_schema_revision.go index 2cdf12f2ec..57e9811228 100644 --- a/pubsub/schemas/get_schema_revision.go +++ b/pubsub/schemas/get_schema_revision.go @@ -20,22 +20,27 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) func getSchemaRevision(w io.Writer, projectID, schemaID string) error { // projectID := "my-project-id" - // schemaID := "my-schema[@my-schema-revision]" + // schemaID := my-schema@c7cfa2a8 // with revision ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - s, err := client.Schema(ctx, schemaID, pubsub.SchemaViewFull) + req := &pubsubpb.GetSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + View: pubsubpb.SchemaView_FULL, + } + s, err := client.GetSchema(ctx, req) if err != nil { - return fmt.Errorf("client.Schema revision: %w", err) + return fmt.Errorf("client.GetSchema revision: %w", err) } fmt.Fprintf(w, "Got schema revision: %#v\n", s) return nil diff --git a/pubsub/schemas/list_schema_revisions.go b/pubsub/schemas/list_schema_revisions.go index 8bf47670fe..2a67242abf 100644 --- a/pubsub/schemas/list_schema_revisions.go +++ b/pubsub/schemas/list_schema_revisions.go @@ -20,23 +20,28 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "google.golang.org/api/iterator" ) -func listSchemaRevisions(w io.Writer, projectID, schemaID string) ([]*pubsub.SchemaConfig, error) { +func listSchemaRevisions(w io.Writer, projectID, schemaID string) ([]*pubsubpb.Schema, error) { // projectID := "my-project-id" // schemaID := "my-schema-id" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return nil, fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - var schemas []*pubsub.SchemaConfig + var schemas []*pubsubpb.Schema - schemaIter := client.ListSchemaRevisions(ctx, schemaID, pubsub.SchemaViewFull) + req := &pubsubpb.ListSchemaRevisionsRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + View: pubsubpb.SchemaView_FULL, + } + schemaIter := client.ListSchemaRevisions(ctx, req) for { sc, err := schemaIter.Next() if err == iterator.Done { diff --git a/pubsub/schemas/list_schemas.go b/pubsub/schemas/list_schemas.go index fd6903d4de..095a0d5661 100644 --- a/pubsub/schemas/list_schemas.go +++ b/pubsub/schemas/list_schemas.go @@ -20,22 +20,27 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "google.golang.org/api/iterator" ) -func listSchemas(w io.Writer, projectID string) ([]*pubsub.SchemaConfig, error) { +func listSchemas(w io.Writer, projectID string) ([]*pubsubpb.Schema, error) { // projectID := "my-project-id" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return nil, fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - var schemas []*pubsub.SchemaConfig + var schemas []*pubsubpb.Schema - schemaIter := client.Schemas(ctx, pubsub.SchemaViewFull) + req := &pubsubpb.ListSchemasRequest{ + Parent: fmt.Sprintf("projects/%s", projectID), + View: pubsubpb.SchemaView_FULL, + } + schemaIter := client.ListSchemas(ctx, req) for { sc, err := schemaIter.Next() if err == iterator.Done { diff --git a/pubsub/schemas/publish_avro_records.go b/pubsub/schemas/publish_avro_records.go index ea87408ea4..77494a827b 100644 --- a/pubsub/schemas/publish_avro_records.go +++ b/pubsub/schemas/publish_avro_records.go @@ -21,7 +21,8 @@ import ( "io" "os" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "github.com/linkedin/goavro/v2" ) @@ -46,21 +47,23 @@ func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"} // Get the topic encoding type. - t := client.Topic(topicID) - cfg, err := t.Config(ctx) + req := &pubsubpb.GetTopicRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + } + t, err := client.TopicAdminClient.GetTopic(ctx, req) if err != nil { - return fmt.Errorf("topic.Config err: %w", err) + return fmt.Errorf("got err in GetTopic: %w", err) } - encoding := cfg.SchemaSettings.Encoding + encoding := t.SchemaSettings.Encoding var msg []byte switch encoding { - case pubsub.EncodingBinary: + case pubsubpb.Encoding_BINARY: msg, err = codec.BinaryFromNative(nil, record) if err != nil { return fmt.Errorf("codec.BinaryFromNative err: %w", err) } - case pubsub.EncodingJSON: + case pubsubpb.Encoding_JSON: msg, err = codec.TextualFromNative(nil, record) if err != nil { return fmt.Errorf("codec.TextualFromNative err: %w", err) @@ -69,7 +72,8 @@ func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error return fmt.Errorf("invalid encoding: %v", encoding) } - result := t.Publish(ctx, &pubsub.Message{ + publisher := client.Publisher(t.GetName()) + result := publisher.Publish(ctx, &pubsub.Message{ Data: msg, }) _, err = result.Get(ctx) diff --git a/pubsub/schemas/publish_proto_messages.go b/pubsub/schemas/publish_proto_messages.go index 6b6a8e6f70..0b98004176 100644 --- a/pubsub/schemas/publish_proto_messages.go +++ b/pubsub/schemas/publish_proto_messages.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -41,21 +42,23 @@ func publishProtoMessages(w io.Writer, projectID, topicID string) error { } // Get the topic encoding type. - t := client.Topic(topicID) - cfg, err := t.Config(ctx) + req := &pubsubpb.GetTopicRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + } + t, err := client.TopicAdminClient.GetTopic(ctx, req) if err != nil { - return fmt.Errorf("topic.Config err: %w", err) + return fmt.Errorf("got err in GetTopic: %w", err) } - encoding := cfg.SchemaSettings.Encoding + encoding := t.SchemaSettings.Encoding var msg []byte switch encoding { - case pubsub.EncodingBinary: + case pubsubpb.Encoding_BINARY: msg, err = proto.Marshal(state) if err != nil { return fmt.Errorf("proto.Marshal err: %w", err) } - case pubsub.EncodingJSON: + case pubsubpb.Encoding_JSON: msg, err = protojson.Marshal(state) if err != nil { return fmt.Errorf("protojson.Marshal err: %w", err) @@ -64,7 +67,8 @@ func publishProtoMessages(w io.Writer, projectID, topicID string) error { return fmt.Errorf("invalid encoding: %v", encoding) } - result := t.Publish(ctx, &pubsub.Message{ + publisher := client.Publisher(t.GetName()) + result := publisher.Publish(ctx, &pubsub.Message{ Data: msg, }) _, err = result.Get(ctx) diff --git a/pubsub/schemas/rollback_schema.go b/pubsub/schemas/rollback_schema.go index 07e1e9158e..cab94bc8a9 100644 --- a/pubsub/schemas/rollback_schema.go +++ b/pubsub/schemas/rollback_schema.go @@ -20,7 +20,8 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + pubsub "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" ) // rollbackSchema creates a new schema revision that is a copy of the provided revisionID. @@ -29,17 +30,21 @@ func rollbackSchema(w io.Writer, projectID, schemaID, revisionID string) error { // schemaID := "my-schema" // revisionID := "a1b2c3d4" ctx := context.Background() - client, err := pubsub.NewSchemaClient(ctx, projectID) + client, err := pubsub.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } defer client.Close() - s, err := client.RollbackSchema(ctx, schemaID, revisionID) + req := &pubsubpb.RollbackSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), + RevisionId: revisionID, + } + s, err := client.RollbackSchema(ctx, req) if err != nil { return fmt.Errorf("RollbackSchema: %w", err) } - fmt.Fprintf(w, "Rolled back a schema: %#v\n", s) + fmt.Fprintf(w, "Rolled back schema: %#v\n", s) return nil } diff --git a/pubsub/schemas/schemas_test.go b/pubsub/schemas/schemas_test.go index 410b2c4235..8b4a87729c 100644 --- a/pubsub/schemas/schemas_test.go +++ b/pubsub/schemas/schemas_test.go @@ -21,17 +21,18 @@ import ( "context" "fmt" "io" - "io/ioutil" + "os" "strings" "sync" "testing" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + schema "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "github.com/GoogleCloudPlatform/golang-samples/internal/testutil" "github.com/google/go-cmp/cmp" "github.com/google/uuid" - "google.golang.org/api/iterator" ) const ( @@ -49,7 +50,7 @@ const ( // down every time, so this speeds things up. var once sync.Once -func setup(t *testing.T) (*pubsub.Client, *pubsub.SchemaClient) { +func setup(t *testing.T) (*pubsub.Client, *schema.SchemaClient) { ctx := context.Background() tc := testutil.SystemTest(t) @@ -58,72 +59,24 @@ func setup(t *testing.T) (*pubsub.Client, *pubsub.SchemaClient) { t.Fatalf("failed to create client: %v", err) } - schemaClient, err := pubsub.NewSchemaClient(ctx, tc.ProjectID) + schemaClient, err := schema.NewSchemaClient(ctx) if err != nil { t.Fatalf("failed to create schema client: %v", err) } - // Cleanup resources from the previous tests. - // This includes schemas, topics, and subscriptions. + // Cleanup schema resources from the previous tests. once.Do(func() { - wg := sync.WaitGroup{} - - wg.Add(1) - go func() { - scs, err := listSchemas(ioutil.Discard, tc.ProjectID) - if err != nil { - fmt.Printf("failed to list schemas: %v", err) - } - for _, sc := range scs { - schemaName := strings.Split(sc.Name, "/") - schemaID := schemaName[len(schemaName)-1] - if strings.HasPrefix(schemaID, schemaPrefix) { - deleteSchema(ioutil.Discard, tc.ProjectID, schemaID) - } - } - wg.Done() - }() - - wg.Add(1) - go func() { - topicIter := client.Topics(ctx) - for { - topic, err := topicIter.Next() - if err == iterator.Done { - break - } - if err != nil { - fmt.Printf("topicIter.Next got err: %v", err) - } - if strings.HasPrefix(topic.ID(), topicPrefix) { - if err := topic.Delete(ctx); err != nil { - fmt.Printf("topic.Delete got err: %v", err) - } - } - } - wg.Done() - }() - - wg.Add(1) - go func() { - subIter := client.Subscriptions(ctx) - for { - sub, err := subIter.Next() - if err == iterator.Done { - break - } - if err != nil { - fmt.Printf("subIter.Next got err: %v", err) - } - if strings.HasPrefix(sub.ID(), subPrefix) { - if err := sub.Delete(ctx); err != nil { - fmt.Printf("sub.Delete got err: %v", err) - } - } - } - wg.Done() - }() - wg.Wait() + scs, err := listSchemas(io.Discard, tc.ProjectID) + if err != nil { + fmt.Printf("failed to list schemas: %v", err) + } + for _, sc := range scs { + schemaName := strings.Split(sc.Name, "/") + schemaID := schemaName[len(schemaName)-1] + if strings.HasPrefix(schemaID, schemaPrefix) { + deleteSchema(io.Discard, tc.ProjectID, schemaID) + } + } }) return client, schemaClient @@ -163,7 +116,8 @@ func TestSchemas_Admin(t *testing.T) { }) protoSchemaID := schemaPrefix + "proto-" + uuid.NewString() - var protoSchema *pubsub.SchemaConfig + protoSchemaName := fmt.Sprintf("projects/%s/schemas/%s", tc.ProjectID, protoSchemaID) + var protoSchema *pubsubpb.Schema t.Run("createProtoSchema", func(t *testing.T) { testutil.Retry(t, 10, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) @@ -178,9 +132,13 @@ func TestSchemas_Admin(t *testing.T) { ctx := context.Background() var err error - protoSchema, err = sc.Schema(ctx, protoSchemaID, pubsub.SchemaViewFull) + req := &pubsubpb.GetSchemaRequest{ + Name: protoSchemaName, + View: pubsubpb.SchemaView_FULL, + } + protoSchema, err = sc.GetSchema(ctx, req) if err != nil { - r.Errorf("failed to get schema: %v\n", err) + r.Errorf("failed to get proto schema: %v\n", err) } }) }) @@ -202,11 +160,11 @@ func TestSchemas_Admin(t *testing.T) { t.Run("rollbackSchema", func(t *testing.T) { testutil.Retry(t, 5, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - if err := rollbackSchema(buf, tc.ProjectID, protoSchemaID, protoSchema.RevisionID); err != nil { + if err := rollbackSchema(buf, tc.ProjectID, protoSchemaID, protoSchema.RevisionId); err != nil { r.Errorf("rollbackSchema err: %v\n", err) } got := buf.String() - want := "Rolled back a schema" + want := "Rolled back schema" if !strings.Contains(got, want) { r.Errorf("rollbackSchema() got: %q\nwant: %q\n", got, want) } @@ -231,7 +189,7 @@ func TestSchemas_Admin(t *testing.T) { t.Run("getSchemaRevision", func(t *testing.T) { testutil.Retry(t, 5, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - schemaRev := fmt.Sprintf("%s@%s", protoSchemaID, protoSchema.RevisionID) + schemaRev := fmt.Sprintf("%s@%s", protoSchemaID, protoSchema.RevisionId) err := getSchemaRevision(buf, tc.ProjectID, schemaRev) if err != nil { r.Errorf("getSchemaRevision err: %v", err) @@ -276,7 +234,7 @@ func TestSchemas_Admin(t *testing.T) { t.Run("createTopicWithSchemaRevisions", func(t *testing.T) { testutil.Retry(t, 5, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - err := createTopicWithSchemaRevisions(buf, tc.ProjectID, topicID, protoSchemaID, protoSchema.RevisionID, protoSchema.RevisionID, pubsub.EncodingBinary) + err := createTopicWithSchemaRevisions(buf, tc.ProjectID, topicID, protoSchemaID, protoSchema.RevisionId, protoSchema.RevisionId) if err != nil { r.Errorf("createTopicWithSchemaRevisions err: %v", err) } @@ -291,7 +249,7 @@ func TestSchemas_Admin(t *testing.T) { t.Run("deleteSchemaRevision", func(t *testing.T) { testutil.Retry(t, 5, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - if err := deleteSchemaRevision(buf, tc.ProjectID, protoSchemaID, protoSchema.RevisionID); err != nil { + if err := deleteSchemaRevision(buf, tc.ProjectID, protoSchemaID, protoSchema.RevisionId); err != nil { r.Errorf("deleteSchemaRevision err: %v", err) } got := buf.String() @@ -322,7 +280,7 @@ func TestSchemas_AvroSchemaAll(t *testing.T) { topicID := topicPrefix + uuid.NewString() avroSchemaID := schemaPrefix + "avro-" + uuid.NewString() - _, err := defaultSchemaConfig(tc.ProjectID, avroSchemaID, avroFilePath, pubsub.SchemaAvro) + _, err := defaultSchemaConfig(tc.ProjectID, avroSchemaID, avroFilePath, pubsubpb.Schema_AVRO) if err != nil { t.Fatalf("defaultSchemaConfig err: %v", err) } @@ -330,12 +288,12 @@ func TestSchemas_AvroSchemaAll(t *testing.T) { t.Run("createTopicWithSchema", func(t *testing.T) { testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - if err := createAvroSchema(ioutil.Discard, tc.ProjectID, avroSchemaID, avroFilePath); err != nil { + if err := createAvroSchema(io.Discard, tc.ProjectID, avroSchemaID, avroFilePath); err != nil { r.Errorf("createAvroSchema err: %v", err) } buf := new(bytes.Buffer) - err := createTopicWithSchema(buf, tc.ProjectID, topicID, avroSchemaID, pubsub.EncodingJSON) + err := createTopicWithSchema(buf, tc.ProjectID, topicID, avroSchemaID, pubsubpb.Encoding_JSON) if err != nil { r.Errorf("createTopicWithSchema: %v", err) } @@ -345,10 +303,11 @@ func TestSchemas_AvroSchemaAll(t *testing.T) { r.Errorf("createTopicWithSchema mismatch\ngot: %v\nwant: %v\n", got, want) } - subCfg := pubsub.SubscriptionConfig{ - Topic: client.Topic(topicID), + sub := &pubsubpb.Subscription{ + Name: fmt.Sprintf("projects/%s/subscriptions/%s", tc.ProjectID, subID), + Topic: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID), } - if _, err = client.CreateSubscription(ctx, subID, subCfg); err != nil { + if _, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub); err != nil { r.Errorf("client.CreateSubscription err: %v", err) } }) @@ -391,7 +350,7 @@ func TestSchemas_AvroSchemaAll(t *testing.T) { r.Errorf("publishAvroRecords: %v", err) } buf := new(bytes.Buffer) - err = subscribeWithAvroSchemaRevisions(buf, tc.ProjectID, subID, avroFilePath) + err = subscribeWithAvroSchemaRevisions(buf, tc.ProjectID, subID) if err != nil { r.Errorf("subscribeWithAvroSchemaRevisions: %v", err) } @@ -403,9 +362,15 @@ func TestSchemas_AvroSchemaAll(t *testing.T) { }) }) - deleteSchema(ioutil.Discard, tc.ProjectID, avroSchemaID) - client.Subscription(subID).Delete(ctx) - client.Topic(topicID).Delete(ctx) + deleteSchema(io.Discard, tc.ProjectID, avroSchemaID) + dsr := &pubsubpb.DeleteSubscriptionRequest{ + Subscription: fmt.Sprintf("projects/%s/subscriptions/%s", tc.ProjectID, subID), + } + client.SubscriptionAdminClient.DeleteSubscription(ctx, dsr) + dtr := &pubsubpb.DeleteTopicRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID), + } + client.TopicAdminClient.DeleteTopic(ctx, dtr) } func TestSchemas_ProtoSchemaAll(t *testing.T) { @@ -415,7 +380,7 @@ func TestSchemas_ProtoSchemaAll(t *testing.T) { topicID := topicPrefix + uuid.NewString() protoSchemaID := schemaPrefix + "proto-" + uuid.NewString() - _, err := defaultSchemaConfig(tc.ProjectID, protoSchemaID, avroFilePath, pubsub.SchemaAvro) + _, err := defaultSchemaConfig(tc.ProjectID, protoSchemaID, avroFilePath, pubsubpb.Schema_AVRO) if err != nil { t.Fatalf("defaultSchemaConfig err: %v", err) } @@ -423,12 +388,12 @@ func TestSchemas_ProtoSchemaAll(t *testing.T) { t.Run("createResources", func(t *testing.T) { testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - if err := createProtoSchema(ioutil.Discard, tc.ProjectID, protoSchemaID, protoFilePath); err != nil { + if err := createProtoSchema(io.Discard, tc.ProjectID, protoSchemaID, protoFilePath); err != nil { r.Errorf("createProtoSchema err: %v", err) } buf := new(bytes.Buffer) - err := createTopicWithSchema(buf, tc.ProjectID, topicID, protoSchemaID, pubsub.EncodingJSON) + err := createTopicWithSchema(buf, tc.ProjectID, topicID, protoSchemaID, pubsubpb.Encoding_JSON) if err != nil { r.Errorf("createTopicWithSchema: %v", err) } @@ -438,11 +403,12 @@ func TestSchemas_ProtoSchemaAll(t *testing.T) { r.Errorf("createTopicWithSchema mismatch\ngot: %v\nwant: %v\n", got, want) } - subCfg := pubsub.SubscriptionConfig{ - Topic: client.Topic(topicID), + sub := &pubsubpb.Subscription{ + Name: fmt.Sprintf("projects/%s/subscriptions/%s", tc.ProjectID, subID), + Topic: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID), } - if _, err = client.CreateSubscription(ctx, subID, subCfg); err != nil { - r.Errorf("client.CreateSubscription err: %v", err) + if _, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub); err != nil { + r.Errorf("failed to create subscription: %v", err) } }) }) @@ -465,7 +431,7 @@ func TestSchemas_ProtoSchemaAll(t *testing.T) { t.Run("subscribeProtoMessages", func(t *testing.T) { testutil.Retry(t, 10, time.Second, func(r *testutil.R) { buf := new(bytes.Buffer) - err := subscribeWithProtoSchema(buf, tc.ProjectID, subID, protoFilePath) + err := subscribeWithProtoSchema(buf, tc.ProjectID, subID) if err != nil { r.Errorf("subscribeWithProtoSchema: %v", err) } @@ -477,59 +443,66 @@ func TestSchemas_ProtoSchemaAll(t *testing.T) { }) }) - deleteSchema(ioutil.Discard, tc.ProjectID, protoSchemaID) - client.Subscription(subID).Delete(ctx) - client.Topic(topicID).Delete(ctx) + deleteSchema(io.Discard, tc.ProjectID, protoSchemaID) + dsr := &pubsubpb.DeleteSubscriptionRequest{ + Subscription: fmt.Sprintf("projects/%s/subscriptions/%s", tc.ProjectID, subID), + } + client.SubscriptionAdminClient.DeleteSubscription(ctx, dsr) + dtr := &pubsubpb.DeleteTopicRequest{ + Topic: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID), + } + client.TopicAdminClient.DeleteTopic(ctx, dtr) } func TestSchemas_UpdateTopicSchema(t *testing.T) { - _, schemaClient := setup(t) + pubsubClient, schemaClient := setup(t) tc := testutil.SystemTest(t) ctx := context.Background() topicID := topicPrefix + uuid.NewString() + protoSchemaID := schemaPrefix + "proto-" + uuid.NewString() - protoSchemaID2 := schemaPrefix + "proto-" + uuid.NewString() - protoSource, err := ioutil.ReadFile(protoFilePath) + protoSource, err := os.ReadFile(protoFilePath) if err != nil { t.Fatalf("error reading from file: %s", protoFilePath) } - schema, err := schemaClient.CreateSchema(ctx, protoSchemaID, pubsub.SchemaConfig{ - Type: pubsub.SchemaProtocolBuffer, - Definition: string(protoSource), - }) - if err != nil { - t.Fatalf("createProtoSchema err: %v", err) + csr := &pubsubpb.CreateSchemaRequest{ + Parent: fmt.Sprintf("projects/%s", tc.ProjectID), + SchemaId: protoSchemaID, + Schema: &pubsubpb.Schema{ + Type: pubsubpb.Schema_PROTOCOL_BUFFER, + Definition: string(protoSource), + }, } - - _, err = schemaClient.CreateSchema(ctx, protoSchemaID2, pubsub.SchemaConfig{ - Type: pubsub.SchemaProtocolBuffer, - Definition: string(protoSource), - }) + schema, err := schemaClient.CreateSchema(ctx, csr) if err != nil { t.Fatalf("createProtoSchema err: %v", err) } - if err := createTopicWithSchema(ioutil.Discard, tc.ProjectID, topicID, protoSchemaID, pubsub.EncodingJSON); err != nil { - t.Fatalf("createTopicWithSchema: %v", err) - } + pubsubClient.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, topicID), + SchemaSettings: &pubsubpb.SchemaSettings{ + Schema: schema.GetName(), + Encoding: pubsubpb.Encoding_BINARY, + }, + }) buf := new(bytes.Buffer) - if err := updateTopicSchema(buf, tc.ProjectID, topicID, schema.RevisionID, schema.RevisionID); err != nil { + if err := updateTopicSchema(buf, tc.ProjectID, topicID, schema.RevisionId, schema.RevisionId); err != nil { t.Fatalf("updateTopicSchema err : %v", err) } } -func defaultSchemaConfig(projectID, schemaID, schemaFile string, schemaType pubsub.SchemaType) (*pubsub.SchemaConfig, error) { - schemaSource, err := ioutil.ReadFile(schemaFile) +func defaultSchemaConfig(projectID, schemaID, schemaFile string, schemaType pubsubpb.Schema_Type) (*pubsubpb.Schema, error) { + schemaSource, err := os.ReadFile(schemaFile) if err != nil { return nil, err } - cfg := &pubsub.SchemaConfig{ + s := &pubsubpb.Schema{ Name: fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID), Type: schemaType, Definition: string(schemaSource), } - return cfg, nil + return s, nil } diff --git a/pubsub/schemas/subscribe_avro_records.go b/pubsub/schemas/subscribe_avro_records.go index 87719e2e34..97ec6c2d4a 100644 --- a/pubsub/schemas/subscribe_avro_records.go +++ b/pubsub/schemas/subscribe_avro_records.go @@ -23,7 +23,7 @@ import ( "sync" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "github.com/linkedin/goavro/v2" ) @@ -46,7 +46,7 @@ func subscribeWithAvroSchema(w io.Writer, projectID, subID, avscFile string) err return fmt.Errorf("goavro.NewCodec err: %w", err) } - sub := client.Subscription(subID) + sub := client.Subscriber(subID) ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() diff --git a/pubsub/schemas/subscribe_proto_messages.go b/pubsub/schemas/subscribe_proto_messages.go index 5d08ae6ea9..69a501eec8 100644 --- a/pubsub/schemas/subscribe_proto_messages.go +++ b/pubsub/schemas/subscribe_proto_messages.go @@ -22,16 +22,15 @@ import ( "sync" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) -func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) error { +func subscribeWithProtoSchema(w io.Writer, projectID, subID string) error { // projectID := "my-project-id" // subID := "my-sub" - // protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers" ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { @@ -41,7 +40,7 @@ func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) e // Create an instance of the message to be decoded (a single U.S. state). state := &statepb.State{} - sub := client.Subscription(subID) + sub := client.Subscriber(subID) ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() diff --git a/pubsub/schemas/subscribe_with_avro_schema_revisions.go b/pubsub/schemas/subscribe_with_avro_schema_revisions.go index 63b6319c49..2242556f12 100644 --- a/pubsub/schemas/subscribe_with_avro_schema_revisions.go +++ b/pubsub/schemas/subscribe_with_avro_schema_revisions.go @@ -19,25 +19,25 @@ import ( "context" "fmt" "io" - "strings" "sync" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + schema "cloud.google.com/go/pubsub/v2/apiv1" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" "github.com/linkedin/goavro/v2" ) -func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error { +func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID string) error { // projectID := "my-project-id" // topicID := "my-topic" - // avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json" ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { return fmt.Errorf("pubsub.NewClient: %w", err) } - schemaClient, err := pubsub.NewSchemaClient(ctx, projectID) + schemaClient, err := schema.NewSchemaClient(ctx) if err != nil { return fmt.Errorf("pubsub.NewSchemaClient: %w", err) } @@ -45,7 +45,7 @@ func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile st // Create the cache for the codecs for different revision IDs. revisionCodecs := make(map[string]*goavro.Codec) - sub := client.Subscription(subID) + sub := client.Subscriber(subID) ctx2, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -62,10 +62,11 @@ func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile st // codec. It would be more typical to do this asynchronously, but is // shown here in a synchronous way to ease readability. if !ok { - // Extract just the schema resource name - path := strings.Split(name, "/") - name = path[len(path)-1] - schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull) + s := &pubsubpb.GetSchemaRequest{ + Name: fmt.Sprintf("%s@%s", name, revision), + View: pubsubpb.SchemaView_FULL, + } + schema, err := schemaClient.GetSchema(ctx, s) if err != nil { fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err) msg.Nack() diff --git a/pubsub/schemas/update_topic_schema.go b/pubsub/schemas/update_topic_schema.go index d4b00fc8f6..e7737ac21b 100644 --- a/pubsub/schemas/update_topic_schema.go +++ b/pubsub/schemas/update_topic_schema.go @@ -20,12 +20,14 @@ import ( "fmt" "io" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error { // projectID := "my-project-id" - // topicID := "my-topic" + // topicID := "my-topic" // an existing topic that has schema settings attached to it. // firstRevisionID := "my-revision-id" // lastRevisionID := "my-revision-id" ctx := context.Background() @@ -33,18 +35,25 @@ func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRev if err != nil { return fmt.Errorf("pubsub.NewClient: %w", err) } - t := client.Topic(topicID) // This updates the first / last revision ID for the topic's schema. - // To clear the schema entirely, use a zero valued (empty) SchemaSettings. - tc := pubsub.TopicConfigToUpdate{ - SchemaSettings: &pubsub.SchemaSettings{ - FirstRevisionID: firstRevisionID, - LastRevisionID: lastRevisionID, + // To clear the schema entirely, use a zero valued (empty) SchemaSettings + // with the same field mask. + req := &pubsubpb.UpdateTopicRequest{ + Topic: &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + SchemaSettings: &pubsubpb.SchemaSettings{ + FirstRevisionId: firstRevisionID, + LastRevisionId: lastRevisionID, + }, + }, + // Construct a field mask to indicate which field to update in the topic. + // Fields are specified relative to the topic + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{"schema_settings.first_revision_id", "schema_settings.last_revision_id"}, }, } - - gotTopicCfg, err := t.Update(ctx, tc) + gotTopicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, req) if err != nil { fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg) return err