diff --git a/.gitignore b/.gitignore index 4028a7a..0055802 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ bin/ build/ build.properties /target +.classpath +.project diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs new file mode 100644 index 0000000..e9441bb --- /dev/null +++ b/.settings/org.eclipse.core.resources.prefs @@ -0,0 +1,3 @@ +eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding/=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 0000000..ec4300d --- /dev/null +++ b/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7 +org.eclipse.jdt.core.compiler.compliance=1.7 +org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning +org.eclipse.jdt.core.compiler.source=1.7 diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs new file mode 100644 index 0000000..f897a7f --- /dev/null +++ b/.settings/org.eclipse.m2e.core.prefs @@ -0,0 +1,4 @@ +activeProfiles= +eclipse.preferences.version=1 +resolveWorkspaceProjects=true +version=1 diff --git a/README.md b/README.md index 9c5b920..6ffa21b 100644 --- a/README.md +++ b/README.md @@ -10,17 +10,22 @@ log files into BigQuery columns. App Engine provides access to logs for each request through the [LogService API](https://developers.google.com/appengine/docs/java/logservice/). -The framework starts from a cron job that runs fairly often. The cron job simply enqueues a bunch of named tasks. These tasks are named such that one and only one task will run for a given time window. This time window is adjustable in your configuration class (see below). Every time this task runs, the task queries the log service for the logs for its specified time window, parses the logs into bigquery columns using built in extractor and/or extractors you write. It then exports this to bigquery using the streaming ingestion input method. +The framework starts from a cron job that runs fairly often. +The cron job simply enqueues a bunch of named tasks. +These tasks are named such that one and only one task will run for a given time window. +This time window is adjustable in your configuration class (see below). +Every time this task runs, the task queries the log service for the logs for its specified time window, +parses the logs into bigquery columns using the built-in extractors and/or extractors you write. +It then exports this to bigquery using the streaming ingestion input method. ## Customizing the columns exported The default implementation uses a set of user-defined exporters to parse each log into a CSV line, which it then outputs to Google BigQuery. -The exporters to run are defined by a BigqueryFieldExporterSet. The framework -includes a set of default exporters that export most of the basic request log -information. See the "Writing your own exporter" section for the details of -adding exporters specific to your application's logs. +The exporters to run are defined by a com.streak.logging.analysis.LogsFieldExporterSet. +The framework includes a set of default exporters that export most of the basic request log information. +See the "Writing your own exporter" section for the details of adding exporters specific to your application's logs. ``` @@ -31,8 +36,10 @@ adding exporters specific to your application's logs. 
``` # Customizing the export + ## Parameters to the CRON task - **logsExportConfiguration** specify a fully qualified class name for a class that implements the **LogsExportConfiguration** interface + ## Writing your own exporter You can export any field that your heart desires, as long as your heart desires one of the following data types: - **string** up to 64k @@ -40,7 +47,8 @@ You can export any field that your heart desires, as long as your heart desires - **float** - **boolean** -You define fields to export by implementing com.streak.logging.analysis.BigqueryFieldExporter. It has the following methods that are run once for each log export to enumerate the schema: +You define fields to export by implementing com.streak.logging.analysis.LogsFieldExporter. +It has the following methods that are run once for each log export to enumerate the schema: - **getFieldCount()** returns the number of fields parsed by this exporter. - **getFieldName(int)** takes an integer between 0 and getFieldCount() - 1, and returns the field name at that index. The ordering isn't important, but must be consistent with *getFieldType()*. - **getFieldType(int)** takes an integer between 0 and getFieldCount() - 1, and returns the field type at that index. The ordering isn't important, but must be consistent with *getFieldName()*. @@ -51,21 +59,29 @@ It also contains the following method that is run once per log entry: After each call to *processLog(RequestLogs)*, the following method is called once for each field defined in the schema: - **getField(String)** returns the value for the given field name. The field name is guaranteed to be an interned string for efficient comparison. The return type should be appropriate to the data type you gave in *getFieldType*, but can be any object for which the *toString()* can be parsed appropriately by BigQuery (i.e. for an integer, either an Integer or a Long can be returned). If there is an error parsing the field, return null to abort the export. To indicate a lack of value, return an empty string. -In order to run your BigqueryFieldExporter, you will need to implement a com.streak.logging.analysis.BigqueryFieldExporterSet. It only has one method: - - **getExporters()** returns the list of BigqueryFieldExporters. +In order to run your LogsFieldExporter, you will need to implement a com.streak.logging.analysis.LogsFieldExporterSet. +It has the following methods: + - **getExporters()** returns the list of LogsFieldExporters. + - **skipLog(RequestLogs log)** returns true if the given request log should be skipped. + - **applicationVersionsToExport()** returns a list of "major" application versions, or null if you want to export logs for the application this task is currently running on. Checkout the documentation in LogsExportConfiguration. # Exporting Datastore Entities to BigQuery -We've been working on this functionality or a little bit of time but recently Google launched the ability for you to import datastore backups into BigQuery. The feature however is a manual process. Mache has built the ability for you to automatically kickoff backups of desired entity kinds and automatically start BigQuery ingestion jobs when the backup is complete. +Google offers the ability for you to import datastore backups into BigQuery. +The feature, however, is a manual process. +Mache has built the ability for you to automatically kick off backups of desired entity kinds +and automatically start BigQuery ingestion jobs when the backup is complete. ## Getting Started With Datastore to BigQuery Exports -1. 
Add the mache JAR to your project -2. Add the URL's listed in the logging section to your web.xml -3. Create a class which implements BuiltinDatastoreExportConfiguration -4. Call /bqlogging/builtinDatastoreExport?builtinDatastoreExportConfig= +1. Optional, only if you want to use the datastore export: You need to include the [google-api-java-client](https://code.google.com/p/google-api-java-client/) jars in your appengine project. The [setup page](https://code.google.com/p/google-api-java-client/wiki/Setup) tells you which jars to use. You will also need the jar file of the [BigQuery API Client Library for Java](https://developers.google.com/api-client-library/java/apis/bigquery/v2). +2. Add the mache JAR to your project +3. Add the URLs listed in the logging section to your web.xml +4. Create a class which implements BuiltinDatastoreExportConfiguration +5. Call /bqlogging/builtinDatastoreExport?builtinDatastoreExportConfig=<fully-qualified-classname-of-datastore-export-config> -You can put this call in your cron.xml to have the bigquery tables updated periodically. Checkout the documentation in BuiltinDatastoreExportConfiguration. +You can put this call in your cron.xml to have the bigquery tables updated periodically. +Check out the documentation in BuiltinDatastoreExportConfiguration. # Sample web.xml @@ -140,16 +156,35 @@ If you haven't already, sign up for BigQuery at https://code.google.com/apis/con You will need to enable billing under the Billing tab. -## Register your App Engine app with BigQuery - +### Register your App Engine app with BigQuery 1. Go to the Google APIs console at https://code.google.com/apis/console/ 2. Go to the Team tab. 3. In the "Add a teammate:" field, enter {your appid}@appspot.gserviceaccount.com and give it owner permissions. +### Turn on the BigQuery API in your appengine project +1. Go to https://console.developers.google.com/ +2. Navigate to your appengine project and select "APIs". +3. Switch on the BigQuery API. + ### Create your BigQuery dataset 1. Go to the BigQuery browser tool at https://bigquery.cloud.google.com/. 2. Choose the blue triangular dropdown next to the name of your Google APIs project. 3. Choose "Create new dataset". Note the name you chose. This is your BigQuery dataset ID. ### Get your Google APIs project ID -1. Go to the Google APIs console at https://code.google.com/apis/console/, select the Google Cloud Storage tab, and make note of the number following "x-goog-project-id:". This is your Goole APIs project ID. +1. Go to the Google APIs console at https://code.google.com/apis/console/, select the Google Cloud Storage tab, and make note of the number following "x-goog-project-id:". This is your Google APIs project ID. + + +# Migration from earlier versions + +## 0.3.0 + +To migrate from version 0.2.x, you must add a method to your implementation of BuiltinDatastoreExportConfiguration. + +``` + public boolean runAsService() { + return true; + } +``` + +If you previously had errors like "Could not create backup via link: INSUFFICIENT_PERMISSION", returning false may solve this problem. 
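As a worked illustration of the "Writing your own exporter" section above, here is a minimal sketch of an exporter and exporter set. It is not part of the patch: the class names and the exported fields (`status`, `method`) are hypothetical, and the method signatures simply follow the README's description of com.streak.logging.analysis.LogsFieldExporter and LogsFieldExporterSet, so they may need adjusting against the real interfaces.

```
import java.util.Arrays;
import java.util.List;

import com.google.appengine.api.log.RequestLogs;
import com.streak.logging.analysis.LogsFieldExporter;
import com.streak.logging.analysis.LogsFieldExporterSet;

// Sketch only: signatures are assumed from the README text above, not copied from the sources.
public class MyExporterSet implements LogsFieldExporterSet {

	/** Exports the HTTP status and method of each request log (hypothetical example fields). */
	static class StatusAndMethodExporter implements LogsFieldExporter {
		private int status;
		private String method;

		public int getFieldCount() { return 2; }
		public String getFieldName(int i) { return i == 0 ? "status" : "method"; }
		public String getFieldType(int i) { return i == 0 ? "integer" : "string"; }

		// Called once per log entry; cache the values so getField() can return them.
		public void processLog(RequestLogs log) {
			status = log.getStatus();
			method = log.getMethod();
		}

		// Field names are interned strings per the README, so == comparison is safe here.
		public Object getField(String name) {
			if (name == "status") return status;
			if (name == "method") return method;
			return ""; // empty string means "no value"
		}
	}

	public List<LogsFieldExporter> getExporters() {
		return Arrays.<LogsFieldExporter>asList(new StatusAndMethodExporter());
	}

	// Return true to drop a request log from the export entirely.
	public boolean skipLog(RequestLogs log) {
		return false;
	}

	// null means: export logs for the application this task is currently running on.
	public List<String> applicationVersionsToExport() {
		return null;
	}
}
```

See "Parameters to the CRON task" above and the documentation in LogsExportConfiguration for how such a set is wired in.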
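Similarly, a minimal sketch of a BuiltinDatastoreExportConfiguration for the datastore export described above, including the runAsService() method added in 0.3.0. The entity kinds, bucket, dataset, project id and backup prefix below are placeholders, not defaults; the method list follows the interface as it appears later in this patch.

```
import java.util.Arrays;
import java.util.List;

import com.streak.datastore.analysis.builtin.BuiltinDatastoreExportConfiguration;

// Sketch only: all concrete values are placeholders for your own project.
public class ExampleDatastoreExportConfig implements BuiltinDatastoreExportConfiguration {

	public List<String> getEntityKindsToExport() {
		return Arrays.asList("User", "Order"); // hypothetical entity kinds
	}

	public String getBucketName() {
		return "my-backup-bucket"; // hypothetical cloud storage bucket
	}

	public String getBigqueryDatasetId() {
		return "datastore_export"; // hypothetical BigQuery dataset
	}

	public String getBigqueryProjectId() {
		return "123456789"; // your x-goog-project-id from the API console
	}

	// false = each export overwrites the tables of the previous export
	public boolean appendTimestampToDatatables() {
		return false;
	}

	// null = use the default task queue
	public String getQueueName() {
		return null;
	}

	// false = also ingest the backup into BigQuery, not only back it up to cloud storage
	public boolean shouldSkipExportToBigquery() {
		return false;
	}

	// New in 0.3.0; return false if backups fail with INSUFFICIENT_PERMISSION
	public boolean runAsService() {
		return true;
	}

	public String getBackupNamePrefix() {
		return "mache_backup"; // see the note on getBackupNamePrefix about null
	}
}
```

The fully qualified name of this class is what you pass as the builtinDatastoreExportConfig parameter in step 5 above.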
diff --git a/pom.xml b/pom.xml index 213d1f3..e288911 100644 --- a/pom.xml +++ b/pom.xml @@ -4,11 +4,11 @@ com.streak mache - 0.2.0-SNAPSHOT + 0.5.0-NVE jar Mache - https://github.com/StreakYC/mache + https://github.com/nverwer/mache @@ -25,11 +25,6 @@ http://www.streak.com - - scm:git:git@github.com:StreakYC/mache.git - scm:git:git@github.com:StreakYC/mache.git - git@github.com:StreakYC/mache.git - diff --git a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreExportConfiguration.java b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreExportConfiguration.java index 33fddac..c055653 100644 --- a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreExportConfiguration.java +++ b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreExportConfiguration.java @@ -18,54 +18,65 @@ import java.util.List; +import com.streak.logging.utils.AnalysisUtility; + public interface BuiltinDatastoreExportConfiguration { /** - * specifies which entity kinds should be exported to bigquery + * Specifies which entity kinds should be exported to bigquery. * @return a list of strings, each representing the name of the entity kind */ public List getEntityKindsToExport(); /** - * + * Specifies the Google Cloud Storage bucket that will receive the datastore backup. * @return the bucket name you'd like the datastore backup stored to */ public String getBucketName(); /** - * + * Specifies the BigQuery dataset in which the exported tables will be created. * @return the dataset in bigquery that you'd like tables created in for this datastore export */ public String getBigqueryDatasetId(); /** - * + * Specifies the x-goog-project-id of your cloud storage project. * @return your project id available in the api console */ public String getBigqueryProjectId(); /** - * - * @return in bigquery whether or not to append the timestamp of the export to the - * names of the tables created in bigquery. If you wawant your export to overwrite - * previous exports in bigquery you should set this to false that way it overrides the last export. + * Specifies whether to append a timestamp to the BigQuery table names. + * @return whether or not to append the timestamp of the export to the names of the tables created in bigquery. + * If you want your export to overwrite previous exports in bigquery, set this to false so that each export overrides the last one. */ public boolean appendTimestampToDatatables(); /** - * + * Specifies the task queue to use for this job; by default this is the default queue. * @return the task queue to use for this job. Return null if you want to use default queue */ public String getQueueName(); /** - * Return true if you want to just use mache for automatic backup creation to cloud storage and not exporting to bigquery. Normally this should be set to false. - * @return whether or not to export to bigquery + * Return true if you want to just use mache for automatic backup creation to cloud storage and not exporting to bigquery. + * Normally this should be set to false. + * @return whether or not to export to bigquery */ public boolean shouldSkipExportToBigquery(); + + /** + * Return true if you want to run tasks as a service. Normally this should be set to true. + * Set to false if you do not have enough permissions to run as a service. + * If you do not have enough permissions, backup will fail with "Could not create backup via link: INSUFFICIENT_PERMISSION". 
+ * @return whether to run the backup as a service + */ + public boolean runAsService(); /** - * This allows you to name the backups created in cloudstrage and the datastore admin console for future reference + * This allows you to name the backups created in cloud storage and the datastore admin console for future reference * @return the name of the backup to be used in Google Cloud Storage, or null to use the default + **TODO: There is no default for null at the moment, see AnalysisUtility.getPreBackupName. */ public String getBackupNamePrefix(); } \ No newline at end of file diff --git a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryCronTask.java b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryCronTask.java index d3f2f3d..0a6eb4b 100644 --- a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryCronTask.java +++ b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryCronTask.java @@ -55,6 +55,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc String bigqueryDatasetId = exporterConfig.getBigqueryDatasetId(); String bigqueryProjectId = exporterConfig.getBigqueryProjectId(); boolean skipExport = exporterConfig.shouldSkipExportToBigquery(); + boolean runAsService = exporterConfig.runAsService(); if (!AnalysisUtility.areParametersValid(bucketName, bigqueryProjectId) || (!skipExport && !AnalysisUtility.areParametersValid(bigqueryDatasetId))) { resp.getWriter().write(AnalysisUtility.failureJson("Exporter config returned null for one of the params")); @@ -74,7 +75,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc String backupName = AnalysisUtility.getPreBackupName(timestamp, exporterConfig.getBackupNamePrefix()); // start the backup task - TaskOptions t = createBackupTaskOptions(backupName, exporterConfig.getEntityKindsToExport(), bucketName, queue.getQueueName()); + TaskOptions t = createBackupTaskOptions(backupName, exporterConfig.getEntityKindsToExport(), bucketName, queue.getQueueName(), runAsService); queue.add(t); // start another task to do the actual import into bigquery @@ -85,7 +86,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc resp.getWriter().println(AnalysisUtility.successJson("successfully kicked off backup job: " + backupName + ", export to bigquery will begin once backup is complete.")); } - private TaskOptions createBackupTaskOptions(String backupName, List kindsToExport, String bucketName, String queueName) { + private TaskOptions createBackupTaskOptions(String backupName, List kindsToExport, String bucketName, String queueName, boolean runAsService) { TaskOptions t = TaskOptions.Builder.withUrl("/_ah/datastore_admin/backup.create"); t.param("name", backupName); for (String kind : kindsToExport) { @@ -93,7 +94,8 @@ private TaskOptions createBackupTaskOptions(String backupName, List kind } t.param("filesystem", "gs"); t.param("gs_bucket_name", bucketName); - t.param("run_as_a_service", Boolean.TRUE.toString()); + // Do not include run_as_a_service parameter when runAsService is false, because setting it to 'false' will not work. 
+ if (runAsService) t.param("run_as_a_service", Boolean.valueOf(runAsService).toString()); t.param("queue", queueName); diff --git a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryIngesterTask.java b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryIngesterTask.java index 702e44b..3ad9ef5 100644 --- a/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryIngesterTask.java +++ b/src/main/java/com/streak/datastore/analysis/builtin/BuiltinDatastoreToBigqueryIngesterTask.java @@ -60,23 +60,25 @@ @SuppressWarnings("serial") public class BuiltinDatastoreToBigqueryIngesterTask extends HttpServlet { - private static final int MILLIS_TO_ENQUEUE = 60000; + private static final int MILLIS_TO_ENQUEUE = 180000; // 3 min private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); private static final JsonFactory JSON_FACTORY = new JacksonFactory(); - + private static final String BUILTIN_DATASTORE_TO_BIGQUERY_INGESTOR_TASK_PATH = "/builtinDatastoreToBigqueryIngestorTask"; private static final Logger log = Logger.getLogger("bqlogging"); - + + private static final long MAX_AGE_BACKUP_NOT_FOUND_MS = 900000; // 15 min + public static void enqueueTask(String baseUrl, BuiltinDatastoreExportConfiguration exporterConfig, long timestamp) { enqueueTask(baseUrl, exporterConfig, timestamp, 0); - } - + } + private static void enqueueTask(String baseUrl, BuiltinDatastoreExportConfiguration exporterConfig, long timestamp, long countdownMillis) { TaskOptions t = TaskOptions.Builder.withUrl(baseUrl + BUILTIN_DATASTORE_TO_BIGQUERY_INGESTOR_TASK_PATH); t.param(AnalysisConstants.TIMESTAMP_PARAM, Long.toString(timestamp)); t.param(AnalysisConstants.BUILTIN_DATASTORE_EXPORT_CONFIG, exporterConfig.getClass().getName()); - + t.method(Method.GET); if (countdownMillis > 0) { t.countdownMillis(countdownMillis); @@ -90,9 +92,10 @@ private static void enqueueTask(String baseUrl, BuiltinDatastoreExportConfigurat } queue.add(t); } - - - public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + + + @Override + public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { resp.setContentType("application/json"); String timestampStr = req.getParameter(AnalysisConstants.TIMESTAMP_PARAM); @@ -106,31 +109,39 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc } } if (timestamp == 0) { - log.warning("Missing required param: " + AnalysisConstants.TIMESTAMP_PARAM); + log.severe("Missing required param: " + AnalysisConstants.TIMESTAMP_PARAM); resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); return; } - + String builtinDatastoreExportConfig = req.getParameter(AnalysisConstants.BUILTIN_DATASTORE_EXPORT_CONFIG); if (!AnalysisUtility.areParametersValid(builtinDatastoreExportConfig)) { - log.warning("Missing required param: " + AnalysisConstants.BUILTIN_DATASTORE_EXPORT_CONFIG); + log.severe("Missing required param: " + AnalysisConstants.BUILTIN_DATASTORE_EXPORT_CONFIG); resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); return; } - - // Instantiate the export config - BuiltinDatastoreExportConfiguration exporterConfig = AnalysisUtility.instantiateDatastoreExportConfig(builtinDatastoreExportConfig); - - String gsHandleOfBackup = checkAndGetCompletedBackupGSHandle(AnalysisUtility.getPreBackupName(timestamp, exporterConfig.getBackupNamePrefix())); + + // Instantiate the export config + BuiltinDatastoreExportConfiguration exporterConfig = 
AnalysisUtility.instantiateDatastoreExportConfig(builtinDatastoreExportConfig); + + if (System.currentTimeMillis() - timestamp > MAX_AGE_BACKUP_NOT_FOUND_MS) { + log.severe("Cannot find backup after retrying for 15 minutes: "+exporterConfig.getBucketName()+"; builtinDatastoreExportConfig: "+builtinDatastoreExportConfig); + resp.setStatus(HttpServletResponse.SC_NOT_FOUND); + return; + } + + String gsHandleOfBackup = checkAndGetCompletedBackupGSHandle(AnalysisUtility.getPreBackupName(timestamp, exporterConfig.getBackupNamePrefix())); if (gsHandleOfBackup == null) { - log.warning("gsHandleOfBackup: null"); + String baseUrl = AnalysisUtility.getRequestBaseName(req); + log.severe("gsHandleOfBackup: null; expected: "+exporterConfig.getBucketName()+"; builtinDatastoreExportConfig: "+builtinDatastoreExportConfig); + log.info("backup incomplete, retrying "+baseUrl+" in " + MILLIS_TO_ENQUEUE + " millis"); resp.getWriter().println(AnalysisUtility.successJson("backup incomplete, retrying in " + MILLIS_TO_ENQUEUE + " millis")); - enqueueTask(AnalysisUtility.getRequestBaseName(req), exporterConfig, timestamp, MILLIS_TO_ENQUEUE); + enqueueTask(baseUrl, exporterConfig, timestamp, MILLIS_TO_ENQUEUE); return; } - log.warning("backup complete, starting bigquery ingestion"); - log.warning("gsHandleOfBackup: " + gsHandleOfBackup); - + log.info("backup complete, starting bigquery ingestion"); + log.info("gsHandleOfBackup: " + gsHandleOfBackup); + AppIdentityCredential credential = new AppIdentityCredential(AnalysisConstants.SCOPES); Bigquery bigquery = new Bigquery.Builder(HTTP_TRANSPORT, JSON_FACTORY, credential).setApplicationName("Streak Logs").build(); @@ -144,8 +155,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc } if (!exporterConfig.appendTimestampToDatatables()) { - // we aren't appending the timestamps so delete the old tables if - // they exist + // we aren't appending the timestamps so delete the old tables if they exist for (String kind : exporterConfig.getEntityKindsToExport()) { boolean found = true; try { @@ -157,6 +167,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc } if (found) { + log.info("Deleting old BigQuery table for kind: " + kind); bigquery.tables().delete(exporterConfig.getBigqueryProjectId(), exporterConfig.getBigqueryDatasetId(), kind).execute(); } } @@ -165,8 +176,8 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc // now create the ingestion for (String kind : exporterConfig.getEntityKindsToExport()) { String gsUrl = convertHandleToUrl(gsHandleOfBackup, kind); - log.warning("gsUrl: " + gsUrl); - + log.info("Ingest into BigQuery, gsUrl: " + gsUrl + ", kind: " + kind); + Job job = new Job(); JobConfiguration config = new JobConfiguration(); JobConfigurationLoad loadConfig = new JobConfigurationLoad(); @@ -186,7 +197,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOExc Insert insert = bigquery.jobs().insert(exporterConfig.getBigqueryProjectId(), job); JobReference jr = insert.execute().getJobReference(); - log.warning("Uri: " + gsUrl + ", JobId: " + jr.getJobId()); + log.info("Ingest job for BigQuery, gsUrl: " + gsUrl + ", kind: " + kind + ", JobId: " + jr.getJobId()); } } @@ -197,55 +208,69 @@ private String convertHandleToUrl(String gsHandleOfBackup, String kind) { } private String checkAndGetCompletedBackupGSHandle(String backupName) throws IOException { - log.warning("backupName: " + backupName); - + 
log.info("checkAndGetCompletedBackupGSHandle, backupName: " + backupName); + DatastoreService datastore = DatastoreServiceFactory.getDatastoreService(); - + Query q = new Query("_AE_Backup_Information"); - - // for some reason the datastore admin code appends the date to the backup name even when creating programatically, - // so test for greater than or equal to and then take the first result + + // For some reason the datastore admin code appends the date to the backup name even when creating programatically, + // so test for greater than or equal to and then take the first result. FilterPredicate greater = new FilterPredicate("name", FilterOperator.GREATER_THAN_OR_EQUAL, backupName); FilterPredicate less = new FilterPredicate("name", FilterOperator.LESS_THAN_OR_EQUAL, backupName + "Z"); List filters = new ArrayList(); filters.add(greater); filters.add(less); - CompositeFilter comp = new CompositeFilter(CompositeFilterOperator.AND, filters); + CompositeFilter comp = new CompositeFilter(CompositeFilterOperator.AND, filters); q.setFilter(comp); - - PreparedQuery pq = datastore.prepare(q); - List results = pq.asList(FetchOptions.Builder.withLimit(1)); - if (results.size() != 1 || !results.get(0).getProperty("name").toString().contains(backupName)) { - System.err.println("BuiltinDatatoreToBigqueryIngesterTask: can't find backupName: " + backupName); - } - Entity result = results.get(0); - - Object completion = result.getProperty("complete_time"); - Object gs_handle_obj = result.getProperty("gs_handle"); - if (gs_handle_obj == null) { - return null; - } - log.warning("gs handle object: " + gs_handle_obj.toString()); - log.warning("gs handle obj type: " + gs_handle_obj.getClass().toString()); - - String gs_handle = null; - if (gs_handle_obj instanceof String) { - gs_handle = (String) gs_handle_obj; - } - else if (gs_handle_obj instanceof Text) { - gs_handle = ((Text)gs_handle_obj).getValue(); - } - String keyResult = null; - if (completion != null) { - keyResult = KeyFactory.keyToString(result.getKey()); + try { + PreparedQuery pq = datastore.prepare(q); + List results = pq.asList(FetchOptions.Builder.withLimit(1)); + if (results.size() != 1 || !results.get(0).getProperty("name").toString().contains(backupName)) { + String message = "BuiltinDatatoreToBigqueryIngesterTask: can't find backupName: " + backupName; + String resultsNames = "Results names: ["; + for (int i = 0; i < results.size(); ++i) { + resultsNames += results.get(i).getProperty("name").toString(); + } + resultsNames += "]"; + log.severe(message); + log.severe(resultsNames); + return null; + } + Entity result = results.get(0); + + Object completion = result.getProperty("complete_time"); + Object gs_handle_obj = result.getProperty("gs_handle"); + if (gs_handle_obj == null) { + log.severe("Backup has no gs handle: "+result.toString()); + return null; + } + log.info("gs handle object: " + gs_handle_obj.toString()); + log.info("gs handle obj type: " + gs_handle_obj.getClass().toString()); + + String gs_handle = null; + if (gs_handle_obj instanceof String) { + gs_handle = (String) gs_handle_obj; + } + else if (gs_handle_obj instanceof Text) { + gs_handle = ((Text)gs_handle_obj).getValue(); + } + + String keyResult = null; + if (completion != null) { + keyResult = KeyFactory.keyToString(result.getKey()); + } + + log.info("checkAndGetCompletedBackupGSHandle, result: " + result); + log.info("checkAndGetCompletedBackupGSHandle, complete_time: " + completion); + log.info("checkAndGetCompletedBackupGSHandle, keyResult: " + keyResult); + 
log.info("checkAndGetCompletedBackupGSHandle, gs_handle: " + gs_handle); + + return gs_handle; + } catch (Exception ex) { + log.severe("checkAndGetCompletedBackupGSHandle encountered a "+ex.getClass().getName()+" for "+backupName+": "+ex.getMessage()); + return null; } - - log.warning("result: " + result); - log.warning("complete_time: " + completion); - log.warning("keyResult: " + keyResult); - log.warning("gs_handle: " + gs_handle); - - return gs_handle; } }