@@ -6,6 +6,42 @@ configDotenv()
66import amqp from 'amqplib'
77import k8s from '@kubernetes/client-node' ;
88
9+ const LOG_LEVEL = ( process . env . LOG_LEVEL || 'info' ) . toLowerCase ( ) ;
10+ const LOG_LEVELS = { error : 0 , warn : 1 , info : 2 , debug : 3 } ;
11+ const shouldLog = ( level ) => LOG_LEVELS [ level ] <= ( LOG_LEVELS [ LOG_LEVEL ] ?? 2 ) ;
12+ const log = ( level , ...args ) => {
13+ if ( ! shouldLog ( level ) ) return ;
14+ const ts = new Date ( ) . toISOString ( ) ;
15+ const out = level === 'error' ? console . error : console . log ;
16+ out ( `[${ ts } ] [${ level . toUpperCase ( ) } ]` , ...args ) ;
17+ } ;
18+
19+ const getPodErrorReason = ( pod ) => {
20+ const waitingReasons = new Set ( [
21+ 'ErrImagePull' ,
22+ 'ImagePullBackOff' ,
23+ 'CrashLoopBackOff' ,
24+ 'CreateContainerConfigError' ,
25+ 'InvalidImageName'
26+ ] ) ;
27+ const containerStatuses = pod ?. status ?. containerStatuses || [ ] ;
28+ for ( const cs of containerStatuses ) {
29+ const reason = cs ?. state ?. waiting ?. reason ;
30+ if ( reason && waitingReasons . has ( reason ) ) {
31+ return reason ;
32+ }
33+ }
34+
35+ const conditions = pod ?. status ?. conditions || [ ] ;
36+ for ( const c of conditions ) {
37+ if ( c ?. reason === 'Unschedulable' ) {
38+ return 'Unschedulable' ;
39+ }
40+ }
41+
42+ return null ;
43+ } ;
44+
945
1046
1147
@@ -40,12 +76,14 @@ const checkIfPodExists = async (podName) => {
4076
4177 } catch ( error ) {
4278 // console.log(error.body);
43- if ( error . body . reason == 'NotFound' ) {
79+ if ( error ? .body ? .reason == 'NotFound' ) {
4480 // Provison new POD
4581 return { found : false }
4682 }
4783 }
4884
85+ return { found : false , status : 'Unknown' }
86+
4987}
5088const deploy = async ( name ,
5189 env
@@ -97,19 +135,28 @@ const deploy = async (name,
97135 const namespace = 'hypermine-development'
98136 const res = await k8sApi . readNamespacedPod ( name , namespace ) ;
99137 const pod = res . body
100- console . log ( `Name: ${ pod . metadata . name } ` ) ;
101- console . log ( `Namespace: ${ pod . metadata . namespace } ` ) ;
102- console . log ( ` Status: ${ pod . status . phase } ` ) ;
103- console . log ( ` Containers: ${ pod . spec . containers . map ( container => container . name ) . join ( ', ' ) } ` ) ;
104- console . log ( ` Conditions: ${ pod . status . conditions . map ( c => `${ c . type } : ${ c . status } ` ) . join ( ', ' ) } ` ) ;
105- console . log ( '---' ) ;
106- console . log ( pod . status . phase ) ;
138+ log ( 'debug' , `Pod ${ pod . metadata . name } in ${ pod . metadata . namespace } is ${ pod . status . phase } ` ) ;
139+
140+ const errorReason = getPodErrorReason ( pod ) ;
141+ if ( errorReason ) {
142+ clearInterval ( interval )
143+ set . delete ( pod . metadata . name )
144+ await k8sApi . deleteNamespacedPod ( name , "hypermine-development" )
145+ delete globalThis [ name ]
146+
147+ log ( 'warn' , `Pod ${ name } deleted due to error: ${ errorReason } ` ) ;
148+ return ;
149+ }
150+
107151 if ( pod . status . phase === 'Pending' ) {
108- console . log ( globalThis [ name ] ) ;
109- console . log ( name ) ;
152+ log ( 'debug' , `Pod ${ name } pending count: ${ globalThis [ name ] } ` ) ;
110153 if ( globalThis [ name ] > 10 ) {
154+ clearInterval ( interval )
111155 set . delete ( pod . metadata . name )
112156 await k8sApi . deleteNamespacedPod ( name , "hypermine-development" )
157+ delete globalThis [ name ]
158+
159+ log ( 'warn' , `Pod ${ name } deleted after pending timeout` ) ;
113160
114161 }
115162 globalThis [ name ] ++
@@ -119,10 +166,11 @@ const deploy = async (name,
119166
120167
121168 const data = await k8sApi . deleteNamespacedPod ( name , "hypermine-development" )
122- const intervals = set . get ( name )
123169 clearInterval ( interval )
124170
125171 set . delete ( name )
172+ delete globalThis [ name ]
173+ log ( 'info' , `Pod ${ name } completed with status ${ pod . status . phase } and was deleted` ) ;
126174 }
127175
128176 } , 5000 )
@@ -132,7 +180,7 @@ const deploy = async (name,
132180 // const data = await k8sApi.deleteNamespacedPod("txn-processor-wallet", "hypermine-development")
133181 // console.log(data.body.status.phase);
134182 } catch ( err ) {
135- console . error ( err ) ;
183+ log ( 'error' , err ) ;
136184 }
137185} ;
138186
@@ -142,7 +190,7 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL
142190
143191( async ( ) => {
144192 try {
145- console . log ( " Start Service" ) ;
193+ log ( 'info' , ' Start Service' ) ;
146194
147195 const namespace = 'hypermine-development'
148196
@@ -156,11 +204,14 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL
156204 } )
157205 await channel . consume ( queueName , async ( message ) => {
158206 let queueMsg ;
159- console . log ( "Trying to consume" )
207+ log ( 'debug' , 'Trying to consume' )
208+
209+ if ( ! message ) {
210+ return
211+ }
160212
161213 try {
162214
163- console . log ( message ) ;
164215 const msg = message . content . toString ( )
165216 const parsedMessage = JSON . parse ( msg )
166217 queueMsg = {
@@ -173,34 +224,37 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL
173224 // parse and create a pod to kubernetes
174225
175226 const { found, status } = await checkIfPodExists ( podName )
176- if ( found == 'Succeeded' ) {
227+ if ( found && ( status === 'Succeeded' || status === 'Failed' ) ) {
177228 await k8sApi . deleteNamespacedPod ( podName , "hypermine-development" )
178229 }
179- if ( found && status !== "Succeeded" ) {
180-
230+ if ( found && status !== "Succeeded" && status !== "Failed" ) {
231+ log ( 'info' , `Pod ${ podName } already exists with status ${ status } ` ) ;
232+ channel . ack ( message )
181233 return
182234 } else {
183235
184236 await deploy ( podName , queueMsg )
185237 channel . ack ( message )
186238
239+ log ( 'info' , `Pod ${ podName } deployment requested` ) ;
187240 }
188241
189242 } catch ( error ) {
190- console . log ( error . message ) ;
243+ log ( 'error' , error . message ) ;
244+ channel . nack ( message , false , false )
191245
192246 }
193247
194248 } )
195249
196250 process . on ( 'SIGINT' , async ( ) => {
197- console . log ( 'Closing RabbitMQ connection...' ) ;
251+ log ( 'info' , 'Closing RabbitMQ connection...' ) ;
198252 await channel . close ( ) ;
199253 await connection . close ( ) ;
200254 process . exit ( 0 ) ;
201255 } ) ;
202256 } catch ( error ) {
203- console . log ( error . message )
257+ log ( 'error' , error . message )
204258 }
205259} ) ( )
206260
0 commit comments