From dcfdcd62e6b49a1741cb48fd7e4101ec6b04b8a3 Mon Sep 17 00:00:00 2001 From: Richard Case <198425+richardcase@users.noreply.github.com> Date: Thu, 22 Aug 2019 11:59:44 +0100 Subject: [PATCH] feat: Implemented additional REST API methods Added support for the following: - Getting the status of a specific task for a connector - Restarting a specific task for a connector - Getting a list of connector plugins Signed-off-by: Richard Case <198425+richardcase@users.noreply.github.com> --- connectors.go | 18 +++++++++ connectors_test.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++ plugins.go | 21 ++++++++++ plugins_test.go | 51 +++++++++++++++++++++++++ 4 files changed, 185 insertions(+) create mode 100644 plugins.go create mode 100644 plugins_test.go diff --git a/connectors.go b/connectors.go index 12b99af..9b3dde0 100644 --- a/connectors.go +++ b/connectors.go @@ -148,6 +148,16 @@ func (c *Client) UpdateConnectorConfig(name string, config ConnectorConfig) (*Co return connector, response, err } +// GetConnectorTaskStatus gets the status of task for a connector. +// +// See: https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status +func (c *Client) GetConnectorTaskStatus(name string, taskID int) (*TaskState, *http.Response, error) { + path := fmt.Sprintf("connectors/%v/tasks/%v/status", name, taskID) + status := new(TaskState) + respnse, err := c.get(path, status) + return status, respnse, err +} + // DeleteConnector deletes a connector with the given name, halting all tasks // and deleting its configuration. // @@ -182,3 +192,11 @@ func (c *Client) RestartConnector(name string) (*http.Response, error) { path := fmt.Sprintf("connectors/%v/restart", name) return c.doRequest("POST", path, nil, nil) } + +// RestartConnectorTask restarts a tasks for a connector. +// +// See https://docs.confluent.io/current/connect/references/restapi.html#post--connectors-(string-name)-tasks-(int-taskid)-restart +func (c *Client) RestartConnectorTask(name string, taskID int) (*http.Response, error) { + path := fmt.Sprintf("connectors/%v/tasks/%v/restart", name, taskID) + return c.doRequest("POST", path, nil, nil) +} diff --git a/connectors_test.go b/connectors_test.go index 8c9a484..7692126 100644 --- a/connectors_test.go +++ b/connectors_test.go @@ -422,6 +422,50 @@ var _ = Describe("Connector CRUD", func() { }) }) }) + + Describe("GetConnectorTaskStatus", func() { + var statusCode int + resultStatus := &TaskState{ + ID: 0, + State: "RUNNING", + WorkerID: "127.0.0.1:8083", + } + + BeforeEach(func() { + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/connectors/local-file-source/tasks/0/status"), + ghttp.VerifyHeader(jsonAcceptHeader), + ghttp.RespondWithJSONEncodedPtr(&statusCode, &resultStatus), + ), + ) + }) + + Context("when existing task id is given", func() { + BeforeEach(func() { + statusCode = http.StatusOK + }) + + It("returns connector task status", func() { + status, _, err := client.GetConnectorTaskStatus("local-file-source", 0) + Expect(err).NotTo(HaveOccurred()) + Expect(status).To(Equal(resultStatus)) + }) + }) + + Context("when nonexisting task id is given", func() { + BeforeEach(func() { + statusCode = http.StatusNotFound + }) + + It("returns an error response", func() { + status, resp, err := client.GetConnectorTaskStatus("local-file-source", 0) + Expect(err).To(HaveOccurred()) + Expect(*status).To(BeZero()) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + }) + }) + }) }) var _ = Describe("Connector Lifecycle", func() { @@ -560,4 +604,55 @@ var _ = Describe("Connector Lifecycle", func() { }) }) }) + + Describe("RestartConnectorTask", func() { + var statusCode int + + BeforeEach(func() { + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("POST", "/connectors/local-file-source/tasks/0/restart"), + ghttp.VerifyHeader(jsonAcceptHeader), + ghttp.RespondWithPtr(&statusCode, nil), + ), + ) + }) + + Context("when existing task id is given", func() { + BeforeEach(func() { + statusCode = http.StatusOK + }) + + It("restarts connector", func() { + resp, err := client.RestartConnectorTask("local-file-source", 0) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + }) + + Context("when rebalance is in process", func() { + BeforeEach(func() { + statusCode = http.StatusConflict + }) + + It("returns error with a conflict response", func() { + resp, err := client.RestartConnectorTask("local-file-source", 0) + Expect(err).To(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusConflict)) + }) + }) + }) + + Context("when nonexisting task id is given", func() { + BeforeEach(func() { + // The API actually throws a 500 on POST to nonexistent + statusCode = http.StatusInternalServerError + }) + + It("returns error with a server error response", func() { + resp, err := client.RestartConnectorTask("local-file-source", 0) + Expect(err).To(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) + }) + }) + }) }) diff --git a/plugins.go b/plugins.go new file mode 100644 index 0000000..f4075ab --- /dev/null +++ b/plugins.go @@ -0,0 +1,21 @@ +package connect + +import "net/http" + +// Plugin represents a Kafka Connect connector plugin +type Plugin struct { + Class string `json:"class"` + Type string `json:"type"` + Version string `json:"version"` +} + +// ListPlugins retrieves a list of the installed plugins. +// +// See: https://docs.confluent.io/current/connect/references/restapi.html#get--connector-plugins- +func (c *Client) ListPlugins() ([]*Plugin, *http.Response, error) { + path := "connector-plugins" + var names []*Plugin + + response, err := c.get(path, &names) + return names, response, err +} diff --git a/plugins_test.go b/plugins_test.go new file mode 100644 index 0000000..b454a1b --- /dev/null +++ b/plugins_test.go @@ -0,0 +1,51 @@ +package connect_test + +import ( + "net/http" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/ghttp" + + . "github.com/go-kafka/connect" +) + +var _ = Describe("Plugins Tests", func() { + BeforeEach(func() { + server = ghttp.NewServer() + client = NewClient(server.URL()) + }) + + AfterEach(func() { + server.Close() + }) + + Describe("ListPlugins", func() { + var statusCode int + resultPlugins := []*Plugin{ + &Plugin{ + Class: "test-class", + Type: "source", + Version: "5.3.0", + }, + } + + BeforeEach(func() { + statusCode = http.StatusOK + + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/connector-plugins"), + ghttp.VerifyHeader(jsonAcceptHeader), + ghttp.RespondWithJSONEncodedPtr(&statusCode, &resultPlugins), + ), + ) + }) + + It("returns list of connector plugins", func() { + plugins, _, err := client.ListPlugins() + Expect(err).NotTo(HaveOccurred()) + Expect(plugins).To(Equal(resultPlugins)) + }) + }) +})