package io.trygvis.queue; import com.jolbox.bonecp.*; import io.trygvis.queue.spring.*; import org.junit.*; import org.junit.runner.*; import org.slf4j.*; import org.slf4j.bridge.*; import org.springframework.beans.factory.annotation.*; import org.springframework.context.annotation.*; import org.springframework.context.support.*; import org.springframework.jdbc.core.*; import org.springframework.jdbc.datasource.*; import org.springframework.test.context.*; import org.springframework.test.context.junit4.*; import org.springframework.transaction.*; import org.springframework.transaction.support.*; import javax.sql.*; import java.util.*; import static java.lang.System.*; import static org.junit.Assert.*; import static org.springframework.transaction.TransactionDefinition.*; @SuppressWarnings("SpringJavaAutowiringInspection") @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = TestConfig.class) public class QueueTest { @Autowired private TransactionTemplate transactionTemplate; @Autowired private JdbcTemplate jdbcTemplate; @Autowired private AsyncService asyncService; private static Logger log; @BeforeClass public static void beforeClass() { SLF4JBridgeHandler.install(); log = LoggerFactory.getLogger(QueueTest.class); String username = getProperty("user.name"); setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); setProperty("database.username", username); setProperty("database.password", username); } public int counter; AsyncService.AsyncCallable callback = new AsyncService.AsyncCallable() { public void run(List args) throws Exception { System.out.println("QueueTest.run"); counter++; } }; Queue queue; @Before public void before() { queue = transactionTemplate.execute(new TransactionCallback() { public Queue doInTransaction(TransactionStatus status) { return asyncService.registerQueue("wat", 10, callback); } }); int count = jdbcTemplate.update("DELETE FROM task"); log.info("Deleted {} tasks", count); } public void after() { asyncService.stopQueue(queue); } // @Test // public void testQueuePolling() { // long start = currentTimeMillis(); // Task abc = transactionTemplate.execute(new TransactionCallback() { // public Task doInTransaction(TransactionStatus status) { // return asyncService.schedule(queue, "abc").getTask(); // } // }); // // while(!abc.isDone()) { // abc = asyncService.update(abc); // } // // long end = currentTimeMillis(); // log.info("Completed in {}ms", end - start); // // assertEquals(1, counter); // } @Test public void testQueueWaiting() throws Exception { long start = currentTimeMillis(); TaskRef abc = transactionTemplate.execute(new TransactionCallback() { public TaskRef doInTransaction(TransactionStatus status) { return asyncService.schedule(queue, "abc"); } }); log.info("Waiting for signal"); abc.waitFor(1000); long end = currentTimeMillis(); log.info("Completed in {}ms", end - start); assertEquals(1, counter); } } @Configuration @Import(value = QueueSpringConfig.class) class TestConfig { @Bean public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { return new PropertySourcesPlaceholderConfigurer() {{ setProperties(System.getProperties()); setLocalOverride(true); }}; } @Bean public JdbcTemplate jdbcTemplate(DataSource dataSource) { return new JdbcTemplate(dataSource); } @Bean public DataSource dataSource(@Value("${database.url}") String jdbcUrl, @Value("${database.username}") String username, @Value("${database.password}") String password) { BoneCPDataSource ds = new BoneCPDataSource(); ds.setLogStatementsEnabled(true); ds.setJdbcUrl(jdbcUrl); ds.setUsername(username); ds.setPassword(password); ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); ds.setMaxConnectionsPerPartition(40); ds.setMinConnectionsPerPartition(0); ds.setPartitionCount(1); ds.setAcquireIncrement(1); ds.setStatementsCacheSize(1000); ds.setReleaseHelperThreads(3); ds.setStatisticsEnabled(true); return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); } @Bean public PlatformTransactionManager transactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { DefaultTransactionDefinition td = new DefaultTransactionDefinition(); td.setPropagationBehavior(PROPAGATION_REQUIRED); td.setIsolationLevel(ISOLATION_READ_COMMITTED); return new TransactionTemplate(platformTransactionManager, td); } }