From db068714e64bcb1b0e78279287ea891d29f687f7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 9 Dec 2012 19:07:41 +0100 Subject: o Some more Esper experiments. --- .../esper/testing/esper/GenericListener.java | 24 +++++ .../java/io/trygvis/esper/testing/esper/Main.java | 108 -------------------- .../java/io/trygvis/esper/testing/esper/Main2.java | 77 --------------- .../java/io/trygvis/esper/testing/esper/Test1.java | 110 +++++++++++++++++++++ .../java/io/trygvis/esper/testing/esper/Test2.java | 70 +++++++++++++ 5 files changed, 204 insertions(+), 185 deletions(-) create mode 100644 src/main/java/io/trygvis/esper/testing/esper/GenericListener.java delete mode 100755 src/main/java/io/trygvis/esper/testing/esper/Main.java delete mode 100755 src/main/java/io/trygvis/esper/testing/esper/Main2.java create mode 100755 src/main/java/io/trygvis/esper/testing/esper/Test1.java create mode 100755 src/main/java/io/trygvis/esper/testing/esper/Test2.java diff --git a/src/main/java/io/trygvis/esper/testing/esper/GenericListener.java b/src/main/java/io/trygvis/esper/testing/esper/GenericListener.java new file mode 100644 index 0000000..c7922e0 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/esper/GenericListener.java @@ -0,0 +1,24 @@ +package io.trygvis.esper.testing.esper; + +import com.espertech.esper.client.*; +import org.slf4j.*; + +public class GenericListener implements UpdateListener { + + private final Logger logger; + + public GenericListener(Logger logger) { + this.logger = logger; + } + + public void update(EventBean[] newEvents, EventBean[] oldEvents) { + for (EventBean event : newEvents) { + logger.info("new event"); + EventType eventType = event.getEventType(); + + for (String name : eventType.getPropertyNames()) { + System.out.println(name + " = " + event.get(name)); + } + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main.java b/src/main/java/io/trygvis/esper/testing/esper/Main.java deleted file mode 100755 index 66b88df..0000000 --- a/src/main/java/io/trygvis/esper/testing/esper/Main.java +++ /dev/null @@ -1,108 +0,0 @@ -package io.trygvis.esper.testing.esper; - -import com.espertech.esper.client.*; -import io.trygvis.esper.testing.*; -import org.slf4j.*; - -import java.util.*; - -import static java.lang.Thread.sleep; - -public class Main { - private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; -// private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1"; - - Logger logger; - EPRuntime runtime; - - public static void main(String[] args) throws Exception { - Config.loadFromDisk(); - Main main = new Main(); - main.work(); - } - - private void work() throws Exception { - logger = LoggerFactory.getLogger("app"); - - Configuration config = new Configuration(); - - ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); - configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", ""); - configurationDBRef.setConnectionAutoCommit(false); - config.addDatabaseReference("db1", configurationDBRef); - config.addEventTypeAutoName(getClass().getPackage().getName()); - EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); - runtime = epService.getEPRuntime(); - EPAdministrator administrator = epService.getEPAdministrator(); - -// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; -// String expression = "select * from pattern [every b=NewBuildEvent() -> (timer:interval(3 seconds) and not NewBuildEvent(uuid = b.uuid))]"; - - String expression = "insert into BuildLastHour " + - "select uuid, " + - "sum(case result when 'SUCCESS' then 1 else 0 end) as successes, " + - "sum(case result when 'FAILURE' then 1 else 0 end) as failures " + - "from NewBuildEvent.win:time_batch(3 sec) group by uuid"; - - EPStatement statement = administrator.createEPL(expression); - - MyListener listener = new MyListener(); - statement.addListener(listener); - - UUID job1 = UUID.fromString("11111111-1111-1111-1111-111111111111"); - UUID job2 = UUID.fromString("22222222-2222-2222-2222-222222222222"); - - send(job1, "SUCCESS"); sleep(100); - send(job2, "SUCCESS"); sleep(100); - send(job2, "FAILURE"); sleep(100); - send(job2, "SUCCESS"); sleep(100); - - Timer timer = new Timer(); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - logger.info("tick"); - } - }, 0, 1000); - - while(true) { - sleep(1000); - } - } - - public void send(UUID job, String result) { - logger.info("Inserting " + result); - runtime.sendEvent(new NewBuildEvent(job, result)); - } - - class MyListener implements UpdateListener { - public void update(EventBean[] newEvents, EventBean[] oldEvents) { - for (EventBean event : newEvents) { - logger.info("new event"); - EventType eventType = event.getEventType(); - - for (String name : eventType.getPropertyNames()) { - System.out.println(name + " = " + event.get(name)); - } - } - } - } -} - -class NewBuildEvent { - private UUID uuid; - private String result; - - NewBuildEvent(UUID uuid, String result) { - this.uuid = uuid; - this.result = result; - } - - public UUID getUuid() { - return uuid; - } - - public String getResult() { - return result; - } -} diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main2.java b/src/main/java/io/trygvis/esper/testing/esper/Main2.java deleted file mode 100755 index 9c53a8f..0000000 --- a/src/main/java/io/trygvis/esper/testing/esper/Main2.java +++ /dev/null @@ -1,77 +0,0 @@ -package io.trygvis.esper.testing.esper; - -import com.espertech.esper.client.*; -import io.trygvis.esper.testing.*; - -import javax.sql.*; -import java.sql.*; -import java.util.*; - -public class Main2 { - private static Config config; - - @SuppressWarnings("UnusedDeclaration") - public static DataSource createDataSource(Properties p) throws SQLException { - return config.createBoneCp(); - } - - public static void main(String[] args) throws Exception { - config = Config.loadFromDisk(); - - Configuration c = new Configuration(); - - ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); - configurationDBRef.setDataSourceFactory(new Properties(), Main2.class.getName()); - configurationDBRef.setConnectionAutoCommit(true); - c.addDatabaseReference("db1", configurationDBRef); - - c.addVariable("VarLastTimestamp", Long.class, Long.valueOf(0)); - -// c.addEventTypeAutoName(getClass().getPackage().getName()); - - EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(c); - -// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; - - EPAdministrator administrator = epService.getEPAdministrator(); - - EPStatement statement = administrator.createEPL("insert into JenkinsBuild " + - "select uuid, job, result from pattern [every timer:interval(1 sec)], " + - "sql:db1 ['SELECT uuid, job, result FROM jenkins_build WHERE extract(epoch from created_date) > ${VarLastTimestamp}']"); - - JenkinsBuildListener listener = new JenkinsBuildListener(); - statement.addListener(listener); - - administrator.createEPL("on JenkinsBuild set VarLastTimestamp = current_timestamp()"); - - administrator.createEPL("every build=JenkinsBuild(result='FAILURE') -> ((JenkinsBuild(job=build.job, result='FAILURE') and not Sample(sensor=sample.sensor, temp <= 50)) -> (Sample(sensor=sample.sensor, temp > 50) and not Sample(sensor=sample.sensor, temp <= 50))) where timer:within(90 seconds))\n"); - - while(true) { - Thread.sleep(10000); - } - } -} - -class JenkinsBuildListener implements UpdateListener { - public void update(EventBean[] newEvents, EventBean[] oldEvents) { - for (EventBean event : newEvents) { - System.out.println("uuid=" + event.get("uuid")); - } - } -} - -/* -every build=JenkinsBuild(result='FAILURE') -> - ( - ( - Sample(sensor=sample.sensor, temp > 50) and not - Sample(sensor=sample.sensor, temp <= 50) - ) -> - ( - Sample(sensor=sample.sensor, temp > 50) and not - Sample(sensor=sample.sensor, temp <= 50) - ) - ) -where timer:within(90 seconds)) - -*/ diff --git a/src/main/java/io/trygvis/esper/testing/esper/Test1.java b/src/main/java/io/trygvis/esper/testing/esper/Test1.java new file mode 100755 index 0000000..842dcb3 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/esper/Test1.java @@ -0,0 +1,110 @@ +package io.trygvis.esper.testing.esper; + +import com.espertech.esper.client.*; +import io.trygvis.esper.testing.*; +import org.slf4j.*; + +import java.util.*; + +import static java.lang.Thread.sleep; + +public class Test1 { + private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; +// private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1"; + + Logger logger; + EPRuntime runtime; + + public static void main(String[] args) throws Exception { + Config.loadFromDisk(); + new Test1().work(); + } + + private void work() throws Exception { + logger = LoggerFactory.getLogger("app"); + + Configuration config = new Configuration(); + + ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); + configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", ""); + configurationDBRef.setConnectionAutoCommit(false); + config.addDatabaseReference("db1", configurationDBRef); + config.addEventTypeAutoName(getClass().getPackage().getName()); + EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); + runtime = epService.getEPRuntime(); + EPAdministrator administrator = epService.getEPAdministrator(); + +// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; +// String expression = "select * from pattern [every b=NewBuildEvent() -> (timer:interval(3 seconds) and not NewBuildEvent(uuid = b.uuid))]"; + +// String expression = "insert into BuildLastHour " + +// "select job, " + +// "sum(case result when 'SUCCESS' then 1 else 0 end) as successes, " + +// "sum(case result when 'FAILURE' then 1 else 0 end) as failures " + +// "from NewBuildEvent.win:time_batch(3 sec) group by job"; + + String expression = "insert into BuildLastHour " + + "select job, " + + "sum(case result when 'SUCCESS' then 1 else 0 end) as successes, " + + "sum(case result when 'FAILURE' then 1 else 0 end) as failures " + + "from NewBuildEvent.win:length(10) " + + "group by job " + + "having " + + "sum(case result when 'SUCCESS' then 1 else 0 end) > 1 AND " + + "count(result) > 3"; + + EPStatement statement = administrator.createEPL(expression); + + statement.addListener(new GenericListener(logger)); + + UUID job1 = UUID.fromString("11111111-1111-1111-1111-111111111111"); + UUID job2 = UUID.fromString("22222222-2222-2222-2222-222222222222"); + + send(job1, "SUCCESS"); sleep(100); + send(job2, "SUCCESS"); sleep(100); + send(job2, "FAILURE"); sleep(100); + send(job2, "SUCCESS"); sleep(100); + send(job2, "SUCCESS"); sleep(100); + + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + logger.info("tick"); + } + }, 0, 1000); + + while(true) { + sleep(1000); + } + } + + public void send(UUID job, String result) { + logger.info("Inserting " + result); + runtime.sendEvent(new NewBuildEvent(job, UUID.randomUUID(), result)); + } + + class NewBuildEvent { + private UUID job; + private UUID uuid; + private String result; + + NewBuildEvent(UUID job, UUID uuid, String result) { + this.job = job; + this.uuid = uuid; + this.result = result; + } + + public UUID getJob() { + return job; + } + + public UUID getUuid() { + return uuid; + } + + public String getResult() { + return result; + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/esper/Test2.java b/src/main/java/io/trygvis/esper/testing/esper/Test2.java new file mode 100755 index 0000000..ecd276a --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/esper/Test2.java @@ -0,0 +1,70 @@ +package io.trygvis.esper.testing.esper; + +import com.espertech.esper.client.*; +import io.trygvis.esper.testing.*; +import org.slf4j.*; + +import javax.sql.*; +import java.sql.*; +import java.util.*; + +public class Test2 { + private static Config config; + + @SuppressWarnings("UnusedDeclaration") + public static DataSource createDataSource(Properties p) throws SQLException { + return config.createBoneCp(); + } + + public static void main(String[] args) throws Exception { + config = Config.loadFromDisk(); + + Configuration c = new Configuration(); + + ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); + configurationDBRef.setDataSourceFactory(new Properties(), Test2.class.getName()); + configurationDBRef.setConnectionAutoCommit(true); + c.addDatabaseReference("db1", configurationDBRef); + + c.addVariable("VarLastTimestamp", Long.class, Long.valueOf(0)); + +// c.addEventTypeAutoName(getClass().getPackage().getName()); + + EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(c); + +// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; + + EPAdministrator administrator = epService.getEPAdministrator(); + + EPStatement statement = administrator.createEPL("insert into JenkinsBuild " + + "select uuid, job, result from pattern [every timer:interval(1 sec)], " + + "sql:db1 ['SELECT uuid, job, result FROM jenkins_build WHERE extract(epoch from created_date) > ${VarLastTimestamp}']"); + + Logger logger = LoggerFactory.getLogger("app"); + + statement.addListener(new GenericListener(logger)); + + administrator.createEPL("on JenkinsBuild set VarLastTimestamp = current_timestamp()"); + + administrator.createEPL("every build=JenkinsBuild(result='FAILURE') -> ((JenkinsBuild(job=build.job, result='FAILURE') and not Sample(sensor=sample.sensor, temp <= 50)) -> (Sample(sensor=sample.sensor, temp > 50) and not Sample(sensor=sample.sensor, temp <= 50))) where timer:within(90 seconds))\n"); + + while(true) { + Thread.sleep(10000); + } + } +} + +/* +every build=JenkinsBuild(result='FAILURE') -> + ( + ( + Sample(sensor=sample.sensor, temp > 50) and not + Sample(sensor=sample.sensor, temp <= 50) + ) -> + ( + Sample(sensor=sample.sensor, temp > 50) and not + Sample(sensor=sample.sensor, temp <= 50) + ) + ) +where timer:within(90 seconds)) +*/ -- cgit v1.2.3