@@ -66,7 +66,7 @@ PG_FUNCTION_INFO_V1(sqlite_fdw_disconnect);
6666PG_FUNCTION_INFO_V1 (sqlite_fdw_disconnect_all );
6767
6868static void sqlite_make_new_connection (ConnCacheEntry * entry , ForeignServer * server );
69- void sqlite_do_sql_command (sqlite3 * conn , const char * sql , int level );
69+ void sqlite_do_sql_command (sqlite3 * conn , const char * sql , int level , List * * busy_connection );
7070static void sqlite_begin_remote_xact (ConnCacheEntry * entry );
7171static void sqlitefdw_xact_callback (XactEvent event , void * arg );
7272static void sqlitefdw_reset_xact_state (ConnCacheEntry * entry , bool toplevel );
@@ -75,13 +75,20 @@ static void sqlitefdw_subxact_callback(SubXactEvent event,
7575 SubTransactionId parentSubid ,
7676 void * arg );
7777static void sqlitefdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue );
78- static void sqlitefdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel );
78+ static void sqlitefdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel , List * * busy_connection );
7979#if PG_VERSION_NUM >= 140000
8080static bool sqlite_disconnect_cached_connections (Oid serverid );
8181#endif
8282static void sqlite_finalize_list_stmt (List * * list );
8383static List * sqlite_append_stmt_to_list (List * list , sqlite3_stmt * stmt );
8484
85+ typedef struct BusyHandlerArg
86+ {
87+ sqlite3 * conn ;
88+ const char * sql ;
89+ int level ;
90+ } BusyHandlerArg ;
91+
8592/*
8693 * sqlite_get_connection:
8794 * Get a connection which can be used to execute queries on
@@ -264,18 +271,33 @@ sqlite_cleanup_connection(void)
264271 }
265272}
266273
267-
268274/*
269275 * Convenience subroutine to issue a non-data-returning SQL command to remote
270276 */
271277void
272- sqlite_do_sql_command (sqlite3 * conn , const char * sql , int level )
278+ sqlite_do_sql_command (sqlite3 * conn , const char * sql , int level , List * * busy_connection )
273279{
274280 char * err = NULL ;
281+ int rc ;
275282
276283 elog (DEBUG3 , "sqlite_fdw do_sql_command %s" , sql );
277284
278- if (sqlite3_exec (conn , sql , NULL , NULL , & err ) != SQLITE_OK )
285+ rc = sqlite3_exec (conn , sql , NULL , NULL , & err );
286+
287+ if (busy_connection && rc == SQLITE_BUSY )
288+ {
289+ /* Busy case will be handled later, not here */
290+ BusyHandlerArg * arg = palloc0 (sizeof (BusyHandlerArg ));
291+
292+ arg -> conn = conn ;
293+ arg -> sql = sql ;
294+ arg -> level = level ;
295+ * busy_connection = lappend (* busy_connection , arg );
296+
297+ return ;
298+ }
299+
300+ if (rc != SQLITE_OK )
279301 {
280302 char * perr = NULL ;
281303
@@ -319,7 +341,7 @@ sqlite_begin_remote_xact(ConnCacheEntry *entry)
319341
320342 sql = "BEGIN" ;
321343
322- sqlite_do_sql_command (entry -> conn , sql , ERROR );
344+ sqlite_do_sql_command (entry -> conn , sql , ERROR , NULL );
323345 entry -> xact_depth = 1 ;
324346
325347 }
@@ -334,7 +356,7 @@ sqlite_begin_remote_xact(ConnCacheEntry *entry)
334356 char sql [64 ];
335357
336358 snprintf (sql , sizeof (sql ), "SAVEPOINT s%d" , entry -> xact_depth + 1 );
337- sqlite_do_sql_command (entry -> conn , sql , ERROR );
359+ sqlite_do_sql_command (entry -> conn , sql , ERROR , NULL );
338360 entry -> xact_depth ++ ;
339361 }
340362}
@@ -377,6 +399,8 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
377399{
378400 HASH_SEQ_STATUS scan ;
379401 ConnCacheEntry * entry ;
402+ ListCell * lc ;
403+ List * busy_connection = NIL ;
380404
381405 /* Quick exit if no connections were touched in this transaction. */
382406 if (!xact_got_connection )
@@ -408,7 +432,7 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
408432
409433 /* Commit all remote transactions during pre-commit */
410434 if (!sqlite3_get_autocommit (entry -> conn ))
411- sqlite_do_sql_command (entry -> conn , "COMMIT" , ERROR );
435+ sqlite_do_sql_command (entry -> conn , "COMMIT" , ERROR , & busy_connection );
412436 /* Finalize all prepared statements */
413437 sqlite_finalize_list_stmt (& entry -> stmtList );
414438 break ;
@@ -436,7 +460,7 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
436460 case XACT_EVENT_PARALLEL_ABORT :
437461 case XACT_EVENT_ABORT :
438462 {
439- sqlitefdw_abort_cleanup (entry , true);
463+ sqlitefdw_abort_cleanup (entry , true, & busy_connection );
440464 break ;
441465 }
442466 }
@@ -446,6 +470,20 @@ sqlitefdw_xact_callback(XactEvent event, void *arg)
446470 sqlitefdw_reset_xact_state (entry , true);
447471 }
448472
473+ /* Execute again the query after server is available */
474+ foreach (lc , busy_connection )
475+ {
476+ BusyHandlerArg * arg = lfirst (lc );
477+
478+ /*
479+ * If there is still error, we can not do anything more, just raise it.
480+ * requireBusyHandler is set to false, and NULL busy_connection list.
481+ */
482+ sqlite_do_sql_command (arg -> conn , arg -> sql , arg -> level , NULL );
483+ }
484+
485+ list_free (busy_connection );
486+
449487 /*
450488 * Regardless of the event type, we can now mark ourselves as out of the
451489 * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
@@ -491,6 +529,8 @@ sqlitefdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
491529 HASH_SEQ_STATUS scan ;
492530 ConnCacheEntry * entry ;
493531 int curlevel ;
532+ ListCell * lc ;
533+ List * busy_connection = NIL ;
494534
495535 /* Nothing to do at subxact start, nor after commit. */
496536 if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
@@ -529,7 +569,7 @@ sqlitefdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
529569 {
530570 /* Commit all remote subtransactions during pre-commit */
531571 snprintf (sql , sizeof (sql ), "RELEASE SAVEPOINT s%d" , curlevel );
532- sqlite_do_sql_command (entry -> conn , sql , ERROR );
572+ sqlite_do_sql_command (entry -> conn , sql , ERROR , & busy_connection );
533573
534574 }
535575 else if (in_error_recursion_trouble ())
@@ -542,12 +582,26 @@ sqlitefdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
542582 else
543583 {
544584 /* Rollback all remote subtransactions during abort */
545- sqlitefdw_abort_cleanup (entry , false);
585+ sqlitefdw_abort_cleanup (entry , false, & busy_connection );
546586 }
547587
548588 /* OK, we're outta that level of subtransaction */
549589 sqlitefdw_reset_xact_state (entry , false);
550590 }
591+
592+ /* Execute again the query after server is available */
593+ foreach (lc , busy_connection )
594+ {
595+ BusyHandlerArg * arg = lfirst (lc );
596+
597+ /*
598+ * If there is still error, we can not do anything more, just raise it.
599+ * requireBusyHandler is set to false, and NULL busy_connection list.
600+ */
601+ sqlite_do_sql_command (arg -> conn , arg -> sql , arg -> level , NULL );
602+ }
603+
604+ list_free (busy_connection );
551605}
552606
553607/*
@@ -812,7 +866,7 @@ sqlite_fdw_disconnect_all(PG_FUNCTION_ARGS)
812866 * rollbacked, false otherwise.
813867 */
814868static void
815- sqlitefdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel )
869+ sqlitefdw_abort_cleanup (ConnCacheEntry * entry , bool toplevel , List * * busy_connection )
816870{
817871 if (toplevel )
818872 {
@@ -826,7 +880,7 @@ sqlitefdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
826880 * already rollback
827881 */
828882 if (!sqlite3_get_autocommit (entry -> conn ))
829- sqlite_do_sql_command (entry -> conn , "ROLLBACK" , WARNING );
883+ sqlite_do_sql_command (entry -> conn , "ROLLBACK" , WARNING , busy_connection );
830884 }
831885 else
832886 {
@@ -836,7 +890,7 @@ sqlitefdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
836890 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d" ,
837891 curlevel , curlevel );
838892 if (!sqlite3_get_autocommit (entry -> conn ))
839- sqlite_do_sql_command (entry -> conn , sql , ERROR );
893+ sqlite_do_sql_command (entry -> conn , sql , ERROR , busy_connection );
840894 }
841895}
842896
0 commit comments