@@ -54,22 +54,27 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti
5454 param = new MapSqlParameterSource ()
5555 .addValue ("id" , info .id )
5656 .addValue ("succeeded" , Character .toString (SUCCEEDED_STATE ), Types .CHAR );
57- jdbcTemplate .update (
58- "INSERT INTO \" COMPLETED_COMPACTIONS\" (\" CC_ID\" , \" CC_DATABASE\" , "
59- + "\" CC_TABLE\" , \" CC_PARTITION\" , \" CC_STATE\" , \" CC_TYPE\" , \" CC_TBLPROPERTIES\" , \" CC_WORKER_ID\" , "
60- + "\" CC_START\" , \" CC_END\" , \" CC_RUN_AS\" , \" CC_HIGHEST_WRITE_ID\" , \" CC_META_INFO\" , "
61- + "\" CC_HADOOP_JOB_ID\" , \" CC_ERROR_MESSAGE\" , \" CC_ENQUEUE_TIME\" , "
62- + "\" CC_WORKER_VERSION\" , \" CC_INITIATOR_ID\" , \" CC_INITIATOR_VERSION\" , "
63- + "\" CC_NEXT_TXN_ID\" , \" CC_TXN_ID\" , \" CC_COMMIT_TIME\" , \" CC_POOL_NAME\" , \" CC_NUMBER_OF_BUCKETS\" ,"
64- + "\" CC_ORDER_BY\" ) "
65- + "SELECT \" CQ_ID\" , \" CQ_DATABASE\" , \" CQ_TABLE\" , \" CQ_PARTITION\" , "
66- + ":succeeded, \" CQ_TYPE\" , \" CQ_TBLPROPERTIES\" , \" CQ_WORKER_ID\" , \" CQ_START\" , "
67- + getEpochFn (jdbcResource .getDatabaseProduct ()) + ", \" CQ_RUN_AS\" , \" CQ_HIGHEST_WRITE_ID\" , \" CQ_META_INFO\" , "
68- + "\" CQ_HADOOP_JOB_ID\" , \" CQ_ERROR_MESSAGE\" , \" CQ_ENQUEUE_TIME\" , "
69- + "\" CQ_WORKER_VERSION\" , \" CQ_INITIATOR_ID\" , \" CQ_INITIATOR_VERSION\" , "
70- + "\" CQ_NEXT_TXN_ID\" , \" CQ_TXN_ID\" , \" CQ_COMMIT_TIME\" , \" CQ_POOL_NAME\" , \" CQ_NUMBER_OF_BUCKETS\" , "
71- + "\" CQ_ORDER_BY\" "
72- + "FROM \" COMPACTION_QUEUE\" WHERE \" CQ_ID\" = :id" , param );
57+ jdbcTemplate .update ("""
58+ INSERT INTO "COMPLETED_COMPACTIONS"(
59+ "CC_ID", "CC_DATABASE", "CC_TABLE", "CC_PARTITION",
60+ "CC_STATE", "CC_TYPE", "CC_TBLPROPERTIES", "CC_WORKER_ID",
61+ "CC_START", "CC_END", "CC_RUN_AS", "CC_HIGHEST_WRITE_ID", "CC_META_INFO",
62+ "CC_HADOOP_JOB_ID", "CC_ERROR_MESSAGE", "CC_ENQUEUE_TIME",
63+ "CC_WORKER_VERSION", "CC_INITIATOR_ID", "CC_INITIATOR_VERSION",
64+ "CC_NEXT_TXN_ID", "CC_TXN_ID", "CC_COMMIT_TIME", "CC_POOL_NAME", "CC_NUMBER_OF_BUCKETS",
65+ "CC_ORDER_BY")
66+ SELECT
67+ "CQ_ID", "CQ_DATABASE", "CQ_TABLE", "CQ_PARTITION",
68+ :succeeded, "CQ_TYPE", "CQ_TBLPROPERTIES", "CQ_WORKER_ID",
69+ "CQ_START", %s, "CQ_RUN_AS", "CQ_HIGHEST_WRITE_ID", "CQ_META_INFO",
70+ "CQ_HADOOP_JOB_ID", "CQ_ERROR_MESSAGE", "CQ_ENQUEUE_TIME",
71+ "CQ_WORKER_VERSION", "CQ_INITIATOR_ID", "CQ_INITIATOR_VERSION",
72+ "CQ_NEXT_TXN_ID", "CQ_TXN_ID", "CQ_COMMIT_TIME", "CQ_POOL_NAME", "CQ_NUMBER_OF_BUCKETS",
73+ "CQ_ORDER_BY"
74+ FROM "COMPACTION_QUEUE"
75+ WHERE "CQ_ID" = :id""" .formatted (
76+ getEpochFn (jdbcResource .getDatabaseProduct ())),
77+ param );
7378 }
7479
7580 /* Remove compaction queue record corresponding to the compaction which has been successful as well as
@@ -84,22 +89,30 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti
8489 // Remove entries from completed_txn_components as well, so we don't start looking there
8590 // again but only up to the highest write ID include in this compaction job.
8691 //highestWriteId will be NULL in upgrade scenarios
87- String query = "DELETE FROM \" COMPLETED_TXN_COMPONENTS\" WHERE \" CTC_DATABASE\" = :db AND \" CTC_TABLE\" = :table" ;
92+ String deleteQuery = """
93+ DELETE FROM "COMPLETED_TXN_COMPONENTS" WHERE "CTC_DATABASE" = :db AND "CTC_TABLE" = :table
94+ """ ;
8895 if (info .partName != null ) {
89- query += " AND \" CTC_PARTITION\" = :partition" ;
96+ deleteQuery += """
97+ AND "CTC_PARTITION" = :partition
98+ """ ;
9099 }
91100 if (info .highestWriteId != 0 ) {
92- query += " AND \" CTC_WRITEID\" <= :writeId" ;
101+ deleteQuery += """
102+ AND "CTC_WRITEID" <= :writeId
103+ """ ;
93104 }
94105 param = new MapSqlParameterSource ()
95106 .addValue ("db" , info .dbname )
96107 .addValue ("table" , info .tableName )
97108 .addValue ("writeId" , info .highestWriteId );
109+
98110 if (info .partName != null ) {
99111 param .addValue ("partition" , info .partName );
100112 }
101- LOG .debug ("Going to execute update <{}>" , query );
102- int updCount = jdbcTemplate .update (query , param );
113+ LOG .debug ("Going to execute update <{}>" , deleteQuery );
114+ int updCount = jdbcTemplate .update (deleteQuery , param );
115+
103116 if (updCount < 1 ) {
104117 LOG .warn ("Expected to remove at least one row from completed_txn_components when " +
105118 "marking compaction entry as clean!" );
@@ -119,31 +132,34 @@ private void removeTxnComponents(CompactionInfo info, MultiDataSourceJdbcResourc
119132 * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
120133 * See {@link ql.txn.compactor.Cleaner.removeFiles()}
121134 */
122-
123135 MapSqlParameterSource params = new MapSqlParameterSource ()
124136 .addValue ("state" , TxnStatus .ABORTED .getSqlConst (), Types .CHAR )
125137 .addValue ("db" , info .dbname )
126138 .addValue ("table" , info .tableName )
127- .addValue ("partition" , info .partName );
139+ .addValue ("partition" , info .partName , Types .VARCHAR );
140+
141+ String deleteQuery = """
142+ DELETE FROM "TXN_COMPONENTS"
143+ WHERE "TC_TXNID" IN (
144+ SELECT "TXN_ID" FROM "TXNS" WHERE "TXN_STATE" = :state
145+ )
146+ AND "TC_DATABASE" = :db AND "TC_TABLE" = :table
147+ AND (:partition is NULL OR "TC_PARTITION" = :partition)
148+ AND "TC_WRITEID" %s
149+ """ ;
128150
129151 int totalCount = 0 ;
130152 if (!info .hasUncompactedAborts && info .highestWriteId != 0 ) {
131153 totalCount = jdbcResource .getJdbcTemplate ().update (
132- "DELETE FROM \" TXN_COMPONENTS\" WHERE \" TC_TXNID\" IN ( "
133- + "SELECT \" TXN_ID\" FROM \" TXNS\" WHERE \" TXN_STATE\" = :state) "
134- + "AND \" TC_DATABASE\" = :db AND \" TC_TABLE\" = :table "
135- + "AND (:partition is NULL OR \" TC_PARTITION\" = :partition) "
136- + "AND \" TC_WRITEID\" <= :id" ,
154+ deleteQuery .formatted ("<= :id" ),
137155 params .addValue ("id" , info .highestWriteId ));
156+
138157 } else if (CollectionUtils .isNotEmpty (info .writeIds )) {
139- params .addValue ("ids" , new ArrayList <>(info .writeIds ));
140158 totalCount = jdbcResource .execute (new InClauseBatchCommand <>(
141- "DELETE FROM \" TXN_COMPONENTS\" WHERE \" TC_TXNID\" IN ( "
142- + "SELECT \" TXN_ID\" FROM \" TXNS\" WHERE \" TXN_STATE\" = :state) "
143- + "AND \" TC_DATABASE\" = :db AND \" TC_TABLE\" = :table "
144- + "AND (:partition is NULL OR \" TC_PARTITION\" = :partition) "
145- + "AND \" TC_WRITEID\" IN (:ids)" ,
146- params , "ids" , Long ::compareTo ));
159+ deleteQuery .formatted ("IN (:ids)" ),
160+ params .addValue ("ids" , new ArrayList <>(info .writeIds )),
161+ "ids" ,
162+ Long ::compareTo ));
147163 }
148164 LOG .debug ("Removed {} records from txn_components" , totalCount );
149165 }
@@ -156,20 +172,24 @@ private void removeCompactionAndAbortRetryEntries(CompactionInfo info, NamedPara
156172 }
157173
158174 MapSqlParameterSource params = new MapSqlParameterSource ("id" , info .id );
159- String query ;
160- if (info .isAbortedTxnCleanup ()) {
161- query = "DELETE FROM \" COMPACTION_QUEUE\" WHERE \" CQ_ID\" = :id" ;
162- } else {
163- query = "DELETE FROM \" COMPACTION_QUEUE\" WHERE \" CQ_ID\" = :id " +
164- "OR (\" CQ_DATABASE\" = :db AND \" CQ_TABLE\" = :table AND \" CQ_TYPE\" = :type AND (:partition is NULL OR \" CQ_PARTITION\" = :partition))" ;
175+ String deleteQuery = """
176+ DELETE FROM "COMPACTION_QUEUE" WHERE "CQ_ID" = :id
177+ """ ;
178+ if (!info .isAbortedTxnCleanup ()) {
179+ deleteQuery += """
180+ OR ("CQ_DATABASE" = :db AND "CQ_TABLE" = :table
181+ AND (:partition is NULL OR "CQ_PARTITION" = :partition)
182+ AND "CQ_TYPE" = :type)
183+ """ ;
165184 params .addValue ("db" , info .dbname )
166185 .addValue ("table" , info .tableName )
167- .addValue ("type " , Character . toString ( TxnStore . ABORT_TXN_CLEANUP_TYPE ) , Types .CHAR )
168- .addValue ("partition " , info . partName , Types .VARCHAR );
186+ .addValue ("partition " , info . partName , Types .VARCHAR )
187+ .addValue ("type " , Character . toString ( TxnStore . ABORT_TXN_CLEANUP_TYPE ) , Types .CHAR );
169188 }
170189
171- LOG .debug ("Going to execute update <{}>" , query );
172- int rc = jdbcTemplate .update (query , params );
190+ LOG .debug ("Going to execute update <{}>" , deleteQuery );
191+ int rc = jdbcTemplate .update (deleteQuery , params );
192+
173193 LOG .debug ("Removed {} records in COMPACTION_QUEUE" , rc );
174194 }
175195
0 commit comments