diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcAsyncService.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcAsyncService.java | 60 |
1 files changed, 40 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java index a8f581e..06e7eee 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -1,19 +1,27 @@ package io.trygvis.queue; -import org.quartz.*; -import org.slf4j.*; -import org.springframework.beans.factory.annotation.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.annotation.*; -import org.springframework.transaction.support.*; - -import java.util.*; -import java.util.concurrent.*; - -import static java.util.Arrays.*; -import static java.util.concurrent.TimeUnit.*; -import static org.springframework.transaction.annotation.Propagation.*; -import static org.springframework.transaction.support.TransactionSynchronizationManager.*; +import org.quartz.SchedulerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; @Component public class JdbcAsyncService implements AsyncService { @@ -96,12 +104,6 @@ public class JdbcAsyncService implements AsyncService { long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); log.info("Created task = {}", task); -// queues.get(queue.name).ping(); -// try { -// Thread.sleep(500); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { @@ -114,6 +116,24 @@ public class JdbcAsyncService implements AsyncService { return task; } + @Transactional + public Task await(Task task, long timeout) { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(task); + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + @Transactional(readOnly = true) public Task update(Task ref) { return taskDao.findById(ref.id); |