aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-12-09 11:08:13 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2012-12-09 11:08:13 +0100
commit0f344aacc6e311aa581615ad8c1a7b01e41625da (patch)
tree38b86a91d0d1dc6c42374826ee4220883dbd2266
parent906019004f1cd50cdfe4f72c7aff234ea3577b8e (diff)
downloadesper-testing-0f344aacc6e311aa581615ad8c1a7b01e41625da.tar.gz
esper-testing-0f344aacc6e311aa581615ad8c1a7b01e41625da.tar.bz2
esper-testing-0f344aacc6e311aa581615ad8c1a7b01e41625da.tar.xz
esper-testing-0f344aacc6e311aa581615ad8c1a7b01e41625da.zip
o Some more Esper experiments.
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/Main.java72
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/esper/Main.java108
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/esper/Main2.java77
3 files changed, 185 insertions, 72 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/Main.java b/src/main/java/io/trygvis/esper/testing/Main.java
deleted file mode 100755
index 323e44f..0000000
--- a/src/main/java/io/trygvis/esper/testing/Main.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package io.trygvis.esper.testing;
-
-import com.espertech.esper.client.*;
-
-public class Main {
-// private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1";
- private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1";
-
- public static void main(String[] args) throws Exception {
- Config.loadFromDisk();
- Main main = new Main();
- main.work();
- }
-
- private void work() throws Exception {
- Configuration config = new Configuration();
-
- ConfigurationDBRef configurationDBRef = new ConfigurationDBRef();
- configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", "");
- configurationDBRef.setConnectionAutoCommit(false);
- config.addDatabaseReference("db1", configurationDBRef);
- config.addEventTypeAutoName(getClass().getPackage().getName());
- EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
-
-// String expression = "select avg(price) from OrderEvent.win:time(30 sec)";
-
- String expression = "select price, SUBSCRIBER from OrderEvent.win:time(30 sec)," +
- "sql:db1 ['select subscriber from subscription where itemName=${itemName}']";
-
- EPStatement statement = epService.getEPAdministrator().createEPL(expression);
-
- MyListener listener = new MyListener();
- statement.addListener(listener);
-
- System.out.println("Inserting events");
- epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 72));
- epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 73));
- epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 74));
-
- System.out.println("Sleeping");
- Thread.sleep(1000);
- System.out.println("Done..");
- }
-}
-
-class OrderEvent {
- private String itemName;
- private double price;
-
- public OrderEvent(String itemName, double price) {
- this.itemName = itemName;
- this.price = price;
- }
-
- public String getItemName() {
- return itemName;
- }
-
- public double getPrice() {
- return price;
- }
-}
-
-class MyListener implements UpdateListener {
- public void update(EventBean[] newEvents, EventBean[] oldEvents) {
- for (EventBean event : newEvents) {
- System.out.println("event.getEventType() = " + event.getEventType());
- System.out.println("event.getUnderlying() = " + event.getUnderlying());
- System.out.println("avg=" + event.get("price"));
- }
- }
-}
diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main.java b/src/main/java/io/trygvis/esper/testing/esper/Main.java
new file mode 100755
index 0000000..66b88df
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/esper/Main.java
@@ -0,0 +1,108 @@
+package io.trygvis.esper.testing.esper;
+
+import com.espertech.esper.client.*;
+import io.trygvis.esper.testing.*;
+import org.slf4j.*;
+
+import java.util.*;
+
+import static java.lang.Thread.sleep;
+
+public class Main {
+ private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1";
+// private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1";
+
+ Logger logger;
+ EPRuntime runtime;
+
+ public static void main(String[] args) throws Exception {
+ Config.loadFromDisk();
+ Main main = new Main();
+ main.work();
+ }
+
+ private void work() throws Exception {
+ logger = LoggerFactory.getLogger("app");
+
+ Configuration config = new Configuration();
+
+ ConfigurationDBRef configurationDBRef = new ConfigurationDBRef();
+ configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", "");
+ configurationDBRef.setConnectionAutoCommit(false);
+ config.addDatabaseReference("db1", configurationDBRef);
+ config.addEventTypeAutoName(getClass().getPackage().getName());
+ EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
+ runtime = epService.getEPRuntime();
+ EPAdministrator administrator = epService.getEPAdministrator();
+
+// String expression = "select avg(price) from OrderEvent.win:time(30 sec)";
+// String expression = "select * from pattern [every b=NewBuildEvent() -> (timer:interval(3 seconds) and not NewBuildEvent(uuid = b.uuid))]";
+
+ String expression = "insert into BuildLastHour " +
+ "select uuid, " +
+ "sum(case result when 'SUCCESS' then 1 else 0 end) as successes, " +
+ "sum(case result when 'FAILURE' then 1 else 0 end) as failures " +
+ "from NewBuildEvent.win:time_batch(3 sec) group by uuid";
+
+ EPStatement statement = administrator.createEPL(expression);
+
+ MyListener listener = new MyListener();
+ statement.addListener(listener);
+
+ UUID job1 = UUID.fromString("11111111-1111-1111-1111-111111111111");
+ UUID job2 = UUID.fromString("22222222-2222-2222-2222-222222222222");
+
+ send(job1, "SUCCESS"); sleep(100);
+ send(job2, "SUCCESS"); sleep(100);
+ send(job2, "FAILURE"); sleep(100);
+ send(job2, "SUCCESS"); sleep(100);
+
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ logger.info("tick");
+ }
+ }, 0, 1000);
+
+ while(true) {
+ sleep(1000);
+ }
+ }
+
+ public void send(UUID job, String result) {
+ logger.info("Inserting " + result);
+ runtime.sendEvent(new NewBuildEvent(job, result));
+ }
+
+ class MyListener implements UpdateListener {
+ public void update(EventBean[] newEvents, EventBean[] oldEvents) {
+ for (EventBean event : newEvents) {
+ logger.info("new event");
+ EventType eventType = event.getEventType();
+
+ for (String name : eventType.getPropertyNames()) {
+ System.out.println(name + " = " + event.get(name));
+ }
+ }
+ }
+ }
+}
+
+class NewBuildEvent {
+ private UUID uuid;
+ private String result;
+
+ NewBuildEvent(UUID uuid, String result) {
+ this.uuid = uuid;
+ this.result = result;
+ }
+
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ public String getResult() {
+ return result;
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main2.java b/src/main/java/io/trygvis/esper/testing/esper/Main2.java
new file mode 100755
index 0000000..9c53a8f
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/esper/Main2.java
@@ -0,0 +1,77 @@
+package io.trygvis.esper.testing.esper;
+
+import com.espertech.esper.client.*;
+import io.trygvis.esper.testing.*;
+
+import javax.sql.*;
+import java.sql.*;
+import java.util.*;
+
+public class Main2 {
+ private static Config config;
+
+ @SuppressWarnings("UnusedDeclaration")
+ public static DataSource createDataSource(Properties p) throws SQLException {
+ return config.createBoneCp();
+ }
+
+ public static void main(String[] args) throws Exception {
+ config = Config.loadFromDisk();
+
+ Configuration c = new Configuration();
+
+ ConfigurationDBRef configurationDBRef = new ConfigurationDBRef();
+ configurationDBRef.setDataSourceFactory(new Properties(), Main2.class.getName());
+ configurationDBRef.setConnectionAutoCommit(true);
+ c.addDatabaseReference("db1", configurationDBRef);
+
+ c.addVariable("VarLastTimestamp", Long.class, Long.valueOf(0));
+
+// c.addEventTypeAutoName(getClass().getPackage().getName());
+
+ EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(c);
+
+// String expression = "select avg(price) from OrderEvent.win:time(30 sec)";
+
+ EPAdministrator administrator = epService.getEPAdministrator();
+
+ EPStatement statement = administrator.createEPL("insert into JenkinsBuild " +
+ "select uuid, job, result from pattern [every timer:interval(1 sec)], " +
+ "sql:db1 ['SELECT uuid, job, result FROM jenkins_build WHERE extract(epoch from created_date) > ${VarLastTimestamp}']");
+
+ JenkinsBuildListener listener = new JenkinsBuildListener();
+ statement.addListener(listener);
+
+ administrator.createEPL("on JenkinsBuild set VarLastTimestamp = current_timestamp()");
+
+ administrator.createEPL("every build=JenkinsBuild(result='FAILURE') -> ((JenkinsBuild(job=build.job, result='FAILURE') and not Sample(sensor=sample.sensor, temp <= 50)) -> (Sample(sensor=sample.sensor, temp > 50) and not Sample(sensor=sample.sensor, temp <= 50))) where timer:within(90 seconds))\n");
+
+ while(true) {
+ Thread.sleep(10000);
+ }
+ }
+}
+
+class JenkinsBuildListener implements UpdateListener {
+ public void update(EventBean[] newEvents, EventBean[] oldEvents) {
+ for (EventBean event : newEvents) {
+ System.out.println("uuid=" + event.get("uuid"));
+ }
+ }
+}
+
+/*
+every build=JenkinsBuild(result='FAILURE') ->
+ (
+ (
+ Sample(sensor=sample.sensor, temp > 50) and not
+ Sample(sensor=sample.sensor, temp <= 50)
+ ) ->
+ (
+ Sample(sensor=sample.sensor, temp > 50) and not
+ Sample(sensor=sample.sensor, temp <= 50)
+ )
+ )
+where timer:within(90 seconds))
+
+*/