Skip to content

Commit c09bc39

Browse files
committed
fix: Connection.setCatalog() fixed
1 parent 1056424 commit c09bc39

File tree

4 files changed

+338
-278
lines changed

4 files changed

+338
-278
lines changed

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p
7979
public void closeStatement(final StatementHandle statementHandle) {
8080
PreparedStatement preparedStatement =
8181
statementHandlePreparedStatementMap.remove(new StatementHandleKey(statementHandle));
82-
// Testing if the prepared statement was created because the statement can be not created until
82+
// Testing if the prepared statement was created because the statement can be
83+
// not created until
8384
// this moment
8485
if (preparedStatement != null) {
8586
preparedStatement.close();
@@ -224,7 +225,8 @@ public ExecuteResult prepareAndExecute(
224225
MetaResultSet.create(handle.connectionId, handle.id, false, handle.signature, null);
225226
return new ExecuteResult(Collections.singletonList(metaResultSet));
226227
} catch (SQLTimeoutException e) {
227-
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and Runtime
228+
// So far AvaticaStatement(executeInternal) only handles NoSuchStatement and
229+
// Runtime
228230
// Exceptions.
229231
throw new RuntimeException(e);
230232
} catch (SQLException e) {
@@ -253,6 +255,20 @@ public boolean syncResults(
253255
return false;
254256
}
255257

258+
@Override
259+
public ConnectionProperties connectionSync(ConnectionHandle ch, ConnectionProperties connProps) {
260+
final ConnectionProperties result = super.connectionSync(ch, connProps);
261+
final String newCatalog = this.connProps.getCatalog();
262+
if (newCatalog != null) {
263+
try {
264+
((ArrowFlightConnection) connection).getClientHandler().setCatalog(newCatalog);
265+
} catch (SQLException e) {
266+
throw new RuntimeException(e);
267+
}
268+
}
269+
return result;
270+
}
271+
256272
void setDefaultConnectionProperties() {
257273
// TODO Double-check this.
258274
connProps
@@ -268,7 +284,8 @@ PreparedStatement getPreparedStatement(StatementHandle statementHandle) {
268284
return statementHandlePreparedStatementMap.get(new StatementHandleKey(statementHandle));
269285
}
270286

271-
// Helper used to look up prepared statement instances later. Avatica doesn't give us the
287+
// Helper used to look up prepared statement instances later. Avatica doesn't
288+
// give us the
272289
// signature in
273290
// an UPDATE code path so we can't directly use StatementHandle as a map key.
274291
private static final class StatementHandleKey {

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.arrow.flight.FlightStatusCode;
4848
import org.apache.arrow.flight.Location;
4949
import org.apache.arrow.flight.LocationSchemes;
50-
import org.apache.arrow.flight.SessionOptionValue;
5150
import org.apache.arrow.flight.SessionOptionValueFactory;
5251
import org.apache.arrow.flight.SetSessionOptionsRequest;
5352
import org.apache.arrow.flight.SetSessionOptionsResult;
@@ -147,20 +146,26 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
147146
try {
148147
for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
149148
if (endpoint.getLocations().isEmpty()) {
150-
// Create a stream using the current client only and do not close the client at the end.
149+
// Create a stream using the current client only and do not close the client at
150+
// the end.
151151
endpoints.add(
152152
new CloseableEndpointStreamPair(
153153
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
154154
} else {
155155
// Clone the builder and then set the new endpoint on it.
156156

157-
// GH-38574: Currently a new FlightClient will be made for each partition that returns a
158-
// non-empty Location then disposed of. It may be better to cache clients because a server
159-
// may report the same Locations. It would also be good to identify when the reported
157+
// GH-38574: Currently a new FlightClient will be made for each partition that
158+
// returns a
159+
// non-empty Location then disposed of. It may be better to cache clients
160+
// because a server
161+
// may report the same Locations. It would also be good to identify when the
162+
// reported
160163
// location
161-
// is the same as the original connection's Location and skip creating a FlightClient in
164+
// is the same as the original connection's Location and skip creating a
165+
// FlightClient in
162166
// that scenario.
163-
// Also copy the cache to the client so we can share a cache. Cache needs to cache
167+
// Also copy the cache to the client so we can share a cache. Cache needs to
168+
// cache
164169
// negative attempts too.
165170
List<Exception> exceptions = new ArrayList<>();
166171
CloseableEndpointStreamPair stream = null;
@@ -337,7 +342,8 @@ private boolean isBenignCloseException(FlightRuntimeException fre) {
337342
*/
338343
private void logSuppressedCloseException(
339344
FlightRuntimeException fre, String operationDescription) {
340-
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during shutdown
345+
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during
346+
// shutdown
341347
LOGGER.debug("Suppressed error {}", operationDescription, fre);
342348
}
343349

@@ -388,25 +394,40 @@ public interface PreparedStatement extends AutoCloseable {
388394
/** A connection is created with catalog set as a session option. */
389395
private void setSetCatalogInSessionIfPresent() {
390396
if (catalog.isPresent()) {
391-
final SetSessionOptionsRequest setSessionOptionRequest =
392-
new SetSessionOptionsRequest(
393-
ImmutableMap.<String, SessionOptionValue>builder()
394-
.put(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog.get()))
395-
.build());
396-
final SetSessionOptionsResult result =
397-
sqlClient.setSessionOptions(setSessionOptionRequest, getOptions());
397+
try {
398+
setCatalog(catalog.get());
399+
} catch (SQLException e) {
400+
throw CallStatus.INVALID_ARGUMENT
401+
.withDescription(e.getMessage())
402+
.withCause(e)
403+
.toRuntimeException();
404+
}
405+
}
406+
}
398407

408+
/**
409+
* Sets the catalog for the current session.
410+
*
411+
* @param catalog the catalog to set.
412+
* @throws SQLException if an error occurs while setting the catalog.
413+
*/
414+
public void setCatalog(final String catalog) throws SQLException {
415+
final SetSessionOptionsRequest request =
416+
new SetSessionOptionsRequest(
417+
ImmutableMap.of(CATALOG, SessionOptionValueFactory.makeSessionOptionValue(catalog)));
418+
try {
419+
final SetSessionOptionsResult result = sqlClient.setSessionOptions(request, getOptions());
399420
if (result.hasErrors()) {
400-
Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
401-
for (Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
421+
final Map<String, SetSessionOptionsResult.Error> errors = result.getErrors();
422+
for (final Map.Entry<String, SetSessionOptionsResult.Error> error : errors.entrySet()) {
402423
LOGGER.warn(error.toString());
403424
}
404-
throw CallStatus.INVALID_ARGUMENT
405-
.withDescription(
406-
String.format(
407-
"Cannot set session option for catalog = %s. Check log for details.", catalog))
408-
.toRuntimeException();
425+
throw new SQLException(
426+
String.format(
427+
"Cannot set session option for catalog = %s. Check log for details.", catalog));
409428
}
429+
} catch (final FlightRuntimeException e) {
430+
throw new SQLException(e);
410431
}
411432
}
412433

@@ -654,7 +675,8 @@ public static final class Builder {
654675

655676
@VisibleForTesting @Nullable Duration connectTimeout;
656677

657-
// These two middleware are for internal use within build() and should not be exposed by builder
678+
// These two middleware are for internal use within build() and should not be
679+
// exposed by builder
658680
// APIs.
659681
// Note that these middleware may not necessarily be registered.
660682
@VisibleForTesting
@@ -980,15 +1002,17 @@ public Location getLocation() {
9801002
* @throws SQLException on error.
9811003
*/
9821004
public ArrowFlightSqlClientHandler build() throws SQLException {
983-
// Copy middleware so that the build method doesn't change the state of the builder fields
1005+
// Copy middleware so that the build method doesn't change the state of the
1006+
// builder fields
9841007
// itself.
9851008
Set<FlightClientMiddleware.Factory> buildTimeMiddlewareFactories =
9861009
new HashSet<>(this.middlewareFactories);
9871010
FlightClient client = null;
9881011
boolean isUsingUserPasswordAuth = username != null && token == null;
9891012

9901013
try {
991-
// Token should take priority since some apps pass in a username/password even when a token
1014+
// Token should take priority since some apps pass in a username/password even
1015+
// when a token
9921016
// is provided
9931017
if (isUsingUserPasswordAuth) {
9941018
buildTimeMiddlewareFactories.add(authFactory);
@@ -1047,8 +1071,10 @@ public ArrowFlightSqlClientHandler build() throws SQLException {
10471071
allocator, channelBuilder.build(), clientBuilder.middleware());
10481072
final ArrayList<CallOption> credentialOptions = new ArrayList<>();
10491073
if (isUsingUserPasswordAuth) {
1050-
// If the authFactory has already been used for a handshake, use the existing token.
1051-
// This can occur if the authFactory is being re-used for a new connection spawned for
1074+
// If the authFactory has already been used for a handshake, use the existing
1075+
// token.
1076+
// This can occur if the authFactory is being re-used for a new connection
1077+
// spawned for
10521078
// getStream().
10531079
if (authFactory.getCredentialCallOption() != null) {
10541080
credentialOptions.add(authFactory.getCredentialCallOption());

0 commit comments

Comments
 (0)