@@ -38,14 +38,13 @@ func ReadCommand(ch *Helper) *cobra.Command {
3838 os .Exit (1 )
3939 }
4040
41- ch .Logger .Log (internal .LOGLEVEL_INFO , "Checking connection" )
42-
4341 psc , err := parseSource (ch .FileReader , readSourceConfigFilePath )
4442 if err != nil {
4543 fmt .Fprintln (cmd .OutOrStdout (), "Please provide path to a valid configuration file" )
4644 return
4745 }
4846
47+ ch .Logger .Log (internal .LOGLEVEL_INFO , "Ensure database" )
4948 if err := ch .EnsureDB (psc ); err != nil {
5049 fmt .Fprintln (cmd .OutOrStdout (), "Unable to connect to PlanetScale Database" )
5150 return
@@ -57,12 +56,14 @@ func ReadCommand(ch *Helper) *cobra.Command {
5756 }
5857 }()
5958
59+ ch .Logger .Log (internal .LOGLEVEL_INFO , "Checking connection" )
6060 cs , err := checkConnectionStatus (ctx , ch .Database , psc )
6161 if err != nil {
6262 ch .Logger .ConnectionStatus (cs )
6363 return
6464 }
6565
66+ ch .Logger .Log (internal .LOGLEVEL_INFO , "Reading catalog" )
6667 catalog , err := readCatalog (readSourceCatalogPath )
6768 if err != nil {
6869 ch .Logger .Error (fmt .Sprintf ("Unable to read catalog: %+v" , err ))
@@ -84,24 +85,27 @@ func ReadCommand(ch *Helper) *cobra.Command {
8485 }
8586 state = string (b )
8687 }
88+
89+ ch .Logger .Log (internal .LOGLEVEL_INFO , "Listing shards" )
8790 shards , err := ch .Database .ListShards (ctx , psc )
8891 if err != nil {
8992 ch .Logger .Error (fmt .Sprintf ("Unable to list shards : %v" , err ))
9093 os .Exit (1 )
9194 }
9295
96+ ch .Logger .Log (internal .LOGLEVEL_INFO , "Reading state" )
9397 syncState , err := readState (state , psc , catalog .Streams , shards , ch .Logger )
9498 if err != nil {
9599 ch .Logger .Error (fmt .Sprintf ("Unable to read state : %v" , err ))
96100 os .Exit (1 )
97101 }
98102
99- for _ , table := range catalog .Streams {
100- keyspaceOrDatabase := table .Stream .Namespace
103+ for _ , configuredStream := range catalog .Streams {
104+ keyspaceOrDatabase := configuredStream .Stream .Namespace
101105 if keyspaceOrDatabase == "" {
102106 keyspaceOrDatabase = psc .Database
103107 }
104- streamStateKey := keyspaceOrDatabase + ":" + table .Stream .Name
108+ streamStateKey := keyspaceOrDatabase + ":" + configuredStream .Stream .Name
105109 streamState , ok := syncState .Streams [streamStateKey ]
106110 if ! ok {
107111 ch .Logger .Error (fmt .Sprintf ("Unable to read state for stream %v" , streamStateKey ))
@@ -111,14 +115,14 @@ func ReadCommand(ch *Helper) *cobra.Command {
111115 for shardName , shardState := range streamState .Shards {
112116 var tc * psdbconnectv1alpha1.TableCursor
113117
114- tc , err = shardState .SerializedCursorToTableCursor (table )
118+ tc , err = shardState .SerializedCursorToTableCursor (configuredStream )
115119 ch .Logger .Log (internal .LOGLEVEL_INFO , fmt .Sprintf ("Using serialized cursor for stream %s" , streamStateKey ))
116120 if err != nil {
117121 ch .Logger .Error (fmt .Sprintf ("Invalid serialized cursor for stream %v, failed with [%v]" , streamStateKey , err ))
118122 os .Exit (1 )
119123 }
120124
121- sc , err := ch .Database .Read (ctx , cmd .OutOrStdout (), psc , table , tc )
125+ sc , err := ch .Database .Read (ctx , cmd .OutOrStdout (), psc , configuredStream , tc )
122126 if err != nil {
123127 ch .Logger .Error (err .Error ())
124128 os .Exit (1 )
0 commit comments