@@ -2,6 +2,7 @@ package main
22
33import (
44 "context"
5+ "errors"
56 "flag"
67 "fmt"
78 "io"
@@ -13,6 +14,7 @@ import (
1314 "syscall"
1415 "time"
1516
17+ "github.com/etherlabsio/healthcheck"
1618 "github.com/prometheus/client_golang/prometheus/promhttp"
1719 "github.com/rs/zerolog"
1820 "github.com/rs/zerolog/log"
@@ -48,7 +50,13 @@ func main() {
4850
4951 metrics .Init ()
5052
51- server := initHTTPServer (cfg .App .ListenAddr )
53+ b , err := bridge .New (cfg , logger )
54+ if err != nil {
55+ logger .Fatal ().Err (err ).Msg ("could not establish bridge from MySQL to Tarantool" )
56+ }
57+
58+ health := initHealthHandler (cfg .App .Health , b )
59+ server := initHTTPServer (cfg .App .ListenAddr , health )
5260 go func () {
5361 logger .Info ().Msgf ("listening on %s" , cfg .App .ListenAddr )
5462
@@ -58,14 +66,9 @@ func main() {
5866 }
5967 }()
6068
61- b , err := bridge .New (cfg , logger )
62- if err != nil {
63- logger .Fatal ().Err (err ).Msg ("could not establish bridge from MySQL to Tarantool" )
64- }
65-
6669 go func () {
67- errors := b .Run ()
68- for errRun := range errors {
70+ runErrors := b .Run ()
71+ for errRun := range runErrors {
6972 logger .Err (errRun ).Msg ("got sync error" )
7073 }
7174
@@ -155,14 +158,52 @@ func newRollingLogFile(cfg *config.Logging) (io.Writer, error) {
155158 }, nil
156159}
157160
158- func initHTTPServer (addr string ) * http.Server {
161+ func initHTTPServer (addr string , health http. Handler ) * http.Server {
159162 server := & http.Server {
160163 Addr : addr ,
161164 ReadTimeout : 5 * time .Second , //nolint:gomnd
162165 WriteTimeout : 5 * time .Second , //nolint:gomnd
163166 }
164167
165168 http .Handle ("/metrics" , promhttp .Handler ())
169+ http .Handle ("/health" , health )
166170
167171 return server
168172}
173+
174+ func initHealthHandler (cfg config.Health , b * bridge.Bridge ) http.Handler {
175+ sbm := uint32 (cfg .SecondsBehindMaster )
176+
177+ return healthcheck .Handler (
178+ healthcheck .WithChecker (
179+ "lag" , healthcheck .CheckerFunc (
180+ func (ctx context.Context ) error {
181+ cur := b .Delay ()
182+ if cur > sbm {
183+ return fmt .Errorf ("replication lag too big: %d" , cur )
184+ }
185+
186+ return nil
187+ },
188+ ),
189+ ),
190+
191+ healthcheck .WithChecker (
192+ "state" , healthcheck .CheckerFunc (
193+ func (ctx context.Context ) error {
194+ dumping := b .Dumping ()
195+ if dumping {
196+ return errors .New ("replicator has not yet finished dump process" )
197+ }
198+
199+ running := b .Running ()
200+ if ! running {
201+ return errors .New ("replication is not running" )
202+ }
203+
204+ return nil
205+ },
206+ ),
207+ ),
208+ )
209+ }
0 commit comments