@@ -61,6 +61,11 @@ class TaskQueueService
6161 */
6262 private $ taskRepo ;
6363
64+ /**
65+ * @var boolean
66+ */
67+ private $ databaseDisabled = false ;
68+
6469 /**
6570 * @param EntityManager $entityManager
6671 * @param PheanstalkConnection $beanstalk
@@ -82,6 +87,16 @@ public function __construct(
8287 $ this ->logWorkerOutputOnFailure = $ params ['log_worker_output_on_failure ' ];
8388 }
8489
90+ /**
91+ * Disables the use of the task entity
92+ *
93+ * @return void
94+ */
95+ public function disableDatabase ()
96+ {
97+ $ this ->databaseDisabled = true ;
98+ }
99+
85100 /**
86101 * Returns the default tube name
87102 *
@@ -121,11 +136,13 @@ public function queueTask(TaskDescriptionInterface $task, $tube = null)
121136 }
122137 $ stringVersion = get_class ($ task ) . ':: ' . $ this ->serializer ->serialize ($ task , 'json ' );
123138 $ taskEntity = new Task ($ task , $ stringVersion , $ tube );
124- $ this ->entityManager ->persist ($ taskEntity );
125- $ this ->entityManager ->flush ($ taskEntity );
126- $ task ->setTaskIdentifier ($ taskEntity ->getId ());
127- //regenerate it now we have an identifier
128- $ stringVersion = get_class ($ task ) . ':: ' . $ this ->serializer ->serialize ($ task , 'json ' );
139+ if (!$ this ->databaseDisabled ) {
140+ $ this ->entityManager ->persist ($ taskEntity );
141+ $ this ->entityManager ->flush ($ taskEntity );
142+ $ task ->setTaskIdentifier ($ taskEntity ->getId ());
143+ //regenerate it now we have an identifier
144+ $ stringVersion = get_class ($ task ) . ':: ' . $ this ->serializer ->serialize ($ task , 'json ' );
145+ }
129146 $ this ->beanstalk
130147 ->useTube ($ tube )
131148 ->put ($ stringVersion );
@@ -228,7 +245,13 @@ public function reserveTask($tube = null)
228245 $ this ->beanstalk ->delete ($ inTask );
229246 throw new TaskQueueServiceException ("Invalid data in TaskQueue {$ tube }" );
230247 }
231- $ taskEntity = $ this ->taskRepo ->find ($ taskObject ->getTaskIdentifier ());
248+ if (!$ this ->databaseDisabled ) {
249+ $ taskEntity = $ this ->taskRepo ->find ($ taskObject ->getTaskIdentifier ());
250+ } else {
251+ //remake the task entity
252+ $ taskEntity = new Task ($ taskObject , '' , $ tube );
253+ }
254+
232255 if (!($ taskEntity instanceof Task)) {
233256 $ this ->beanstalk ->delete ($ inTask );
234257 throw new TaskQueueServiceException (
@@ -287,6 +310,9 @@ public function markFailed(WorkPackage $task, $log)
287310 */
288311 private function updateTaskLog (WorkPackage $ task , $ log )
289312 {
313+ if ($ this ->databaseDisabled ) {
314+ return ;
315+ }
290316 $ taskEntity = $ task ->getTaskEntity ();
291317 if ($ taskEntity instanceof Task) {
292318 $ taskEntity ->setLog ($ log );
@@ -307,6 +333,9 @@ private function updateTaskLog(WorkPackage $task, $log)
307333 */
308334 private function updateTaskStatus (WorkPackage $ task , $ status )
309335 {
336+ if ($ this ->databaseDisabled ) {
337+ return ;
338+ }
310339 $ taskEntity = $ task ->getTaskEntity ();
311340 if ($ taskEntity instanceof Task) {
312341 $ taskEntity ->setStatus ($ status );
0 commit comments