From bddca73748f7eace28cdd76282fcfd33971e9995 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Thu, 8 Nov 2012 00:55:32 +0100 Subject: wip --- .../io/trygvis/esper/testing/AtomImporter.java | 18 +++++ src/main/java/io/trygvis/esper/testing/DbMain.java | 33 +++++++++ src/main/java/io/trygvis/esper/testing/Main.java | 83 ++++++++++++++++++++++ 3 files changed, 134 insertions(+) create mode 100644 src/main/java/io/trygvis/esper/testing/AtomImporter.java create mode 100644 src/main/java/io/trygvis/esper/testing/DbMain.java create mode 100644 src/main/java/io/trygvis/esper/testing/Main.java (limited to 'src/main/java/io') diff --git a/src/main/java/io/trygvis/esper/testing/AtomImporter.java b/src/main/java/io/trygvis/esper/testing/AtomImporter.java new file mode 100644 index 0000000..7e70715 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/AtomImporter.java @@ -0,0 +1,18 @@ +package io.trygvis.esper.testing; + +import org.apache.abdera.*; +import org.apache.abdera.protocol.client.*; +import org.apache.abdera.protocol.client.cache.*; + +public class AtomImporter { + public static void main(String[] args) { + Abdera abdera = new Abdera(); + AbderaClient abderaClient = new AbderaClient(abdera, new LRUCache(abdera, 1000)); + + while(true) { + ClientResponse response = abderaClient.get("http://gitorious.org/qt.atom"); + +// response. + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/DbMain.java b/src/main/java/io/trygvis/esper/testing/DbMain.java new file mode 100644 index 0000000..635e3cc --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/DbMain.java @@ -0,0 +1,33 @@ +package io.trygvis.esper.testing; + +import org.h2.tools.*; + +import java.sql.*; + +public class DbMain { + private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; + + public static void main(String[] args) throws Exception { + Server server = Server.createTcpServer(args).start(); + + System.out.println("server.getURL() = " + server.getURL()); + + Connection connection = DriverManager.getConnection(JDBC_URL, "", ""); + connection.setAutoCommit(false); + Statement s = connection.createStatement(); + s.execute("create table subscription(" + + "itemName varchar(100) not null," + + "subscriber varchar(100) not null" + + ");"); + + s.execute("insert into subscription values('shirt', 'sub a');"); + s.execute("insert into subscription values('shirt', 'sub b');"); + s.execute("insert into subscription values('pants', 'sub b');"); + s.execute("insert into subscription values('pants', 'sub c');"); + connection.commit(); + + while(true) { + Thread.sleep(1000); + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/Main.java b/src/main/java/io/trygvis/esper/testing/Main.java new file mode 100644 index 0000000..25b3fad --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/Main.java @@ -0,0 +1,83 @@ +package io.trygvis.esper.testing; + +import com.espertech.esper.client.*; +import org.apache.log4j.*; + +import java.util.*; + +public class Main { +// private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; + private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1"; + + public Main() throws Exception { + Properties properties = new Properties(); + properties.setProperty("log4j.rootLogger", "DEBUG, A1"); + properties.setProperty("log4j.appender.A1", "org.apache.log4j.ConsoleAppender"); + properties.setProperty("log4j.appender.A1.layout", "org.apache.log4j.PatternLayout"); + properties.setProperty("log4j.appender.A1.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n"); + PropertyConfigurator.configure(properties); + } + + public static void main(String[] args) throws Exception { + Main main = new Main(); + main.work(); + } + + private void work() throws Exception { + Configuration config = new Configuration(); + + ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); + configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", ""); + configurationDBRef.setConnectionAutoCommit(false); + config.addDatabaseReference("db1", configurationDBRef); + config.addEventTypeAutoName(getClass().getPackage().getName()); + EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); + +// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; + + String expression = "select price, SUBSCRIBER from OrderEvent.win:time(30 sec)," + + "sql:db1 ['select subscriber from subscription where itemName=${itemName}']"; + + EPStatement statement = epService.getEPAdministrator().createEPL(expression); + + MyListener listener = new MyListener(); + statement.addListener(listener); + + System.out.println("Inserting events"); + epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 72)); + epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 73)); + epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 74)); + + System.out.println("Sleeping"); + Thread.sleep(1000); + System.out.println("Done.."); + } +} + +class OrderEvent { + private String itemName; + private double price; + + public OrderEvent(String itemName, double price) { + this.itemName = itemName; + this.price = price; + } + + public String getItemName() { + return itemName; + } + + public double getPrice() { + return price; + } +} + +class MyListener implements UpdateListener { + public void update(EventBean[] newEvents, EventBean[] oldEvents) { + for (EventBean event : newEvents) { + System.out.println("event.getEventType() = " + event.getEventType()); + System.out.println("event.getUnderlying() = " + event.getUnderlying()); + System.out.println("avg=" + event.get("price")); + } + } +} -- cgit v1.2.3