From 0f344aacc6e311aa581615ad8c1a7b01e41625da Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 9 Dec 2012 11:08:13 +0100 Subject: o Some more Esper experiments. --- src/main/java/io/trygvis/esper/testing/Main.java | 72 -------------- .../java/io/trygvis/esper/testing/esper/Main.java | 108 +++++++++++++++++++++ .../java/io/trygvis/esper/testing/esper/Main2.java | 77 +++++++++++++++ 3 files changed, 185 insertions(+), 72 deletions(-) delete mode 100755 src/main/java/io/trygvis/esper/testing/Main.java create mode 100755 src/main/java/io/trygvis/esper/testing/esper/Main.java create mode 100755 src/main/java/io/trygvis/esper/testing/esper/Main2.java (limited to 'src/main/java/io/trygvis') diff --git a/src/main/java/io/trygvis/esper/testing/Main.java b/src/main/java/io/trygvis/esper/testing/Main.java deleted file mode 100755 index 323e44f..0000000 --- a/src/main/java/io/trygvis/esper/testing/Main.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.trygvis.esper.testing; - -import com.espertech.esper.client.*; - -public class Main { -// private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; - private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1"; - - public static void main(String[] args) throws Exception { - Config.loadFromDisk(); - Main main = new Main(); - main.work(); - } - - private void work() throws Exception { - Configuration config = new Configuration(); - - ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); - configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", ""); - configurationDBRef.setConnectionAutoCommit(false); - config.addDatabaseReference("db1", configurationDBRef); - config.addEventTypeAutoName(getClass().getPackage().getName()); - EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); - -// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; - - String expression = "select price, SUBSCRIBER from OrderEvent.win:time(30 sec)," + - "sql:db1 ['select subscriber from subscription where itemName=${itemName}']"; - - EPStatement statement = epService.getEPAdministrator().createEPL(expression); - - MyListener listener = new MyListener(); - statement.addListener(listener); - - System.out.println("Inserting events"); - epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 72)); - epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 73)); - epService.getEPRuntime().sendEvent(new OrderEvent("shirt", 74)); - - System.out.println("Sleeping"); - Thread.sleep(1000); - System.out.println("Done.."); - } -} - -class OrderEvent { - private String itemName; - private double price; - - public OrderEvent(String itemName, double price) { - this.itemName = itemName; - this.price = price; - } - - public String getItemName() { - return itemName; - } - - public double getPrice() { - return price; - } -} - -class MyListener implements UpdateListener { - public void update(EventBean[] newEvents, EventBean[] oldEvents) { - for (EventBean event : newEvents) { - System.out.println("event.getEventType() = " + event.getEventType()); - System.out.println("event.getUnderlying() = " + event.getUnderlying()); - System.out.println("avg=" + event.get("price")); - } - } -} diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main.java b/src/main/java/io/trygvis/esper/testing/esper/Main.java new file mode 100755 index 0000000..66b88df --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/esper/Main.java @@ -0,0 +1,108 @@ +package io.trygvis.esper.testing.esper; + +import com.espertech.esper.client.*; +import io.trygvis.esper.testing.*; +import org.slf4j.*; + +import java.util.*; + +import static java.lang.Thread.sleep; + +public class Main { + private static final String JDBC_URL = "jdbc:h2:mem:esper;DB_CLOSE_DELAY=-1"; +// private static final String JDBC_URL = "jdbc:h2:tcp://127.0.0.1/esper;DB_CLOSE_DELAY=-1"; + + Logger logger; + EPRuntime runtime; + + public static void main(String[] args) throws Exception { + Config.loadFromDisk(); + Main main = new Main(); + main.work(); + } + + private void work() throws Exception { + logger = LoggerFactory.getLogger("app"); + + Configuration config = new Configuration(); + + ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); + configurationDBRef.setDriverManagerConnection("org.h2.Driver", JDBC_URL, "", ""); + configurationDBRef.setConnectionAutoCommit(false); + config.addDatabaseReference("db1", configurationDBRef); + config.addEventTypeAutoName(getClass().getPackage().getName()); + EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); + runtime = epService.getEPRuntime(); + EPAdministrator administrator = epService.getEPAdministrator(); + +// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; +// String expression = "select * from pattern [every b=NewBuildEvent() -> (timer:interval(3 seconds) and not NewBuildEvent(uuid = b.uuid))]"; + + String expression = "insert into BuildLastHour " + + "select uuid, " + + "sum(case result when 'SUCCESS' then 1 else 0 end) as successes, " + + "sum(case result when 'FAILURE' then 1 else 0 end) as failures " + + "from NewBuildEvent.win:time_batch(3 sec) group by uuid"; + + EPStatement statement = administrator.createEPL(expression); + + MyListener listener = new MyListener(); + statement.addListener(listener); + + UUID job1 = UUID.fromString("11111111-1111-1111-1111-111111111111"); + UUID job2 = UUID.fromString("22222222-2222-2222-2222-222222222222"); + + send(job1, "SUCCESS"); sleep(100); + send(job2, "SUCCESS"); sleep(100); + send(job2, "FAILURE"); sleep(100); + send(job2, "SUCCESS"); sleep(100); + + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + logger.info("tick"); + } + }, 0, 1000); + + while(true) { + sleep(1000); + } + } + + public void send(UUID job, String result) { + logger.info("Inserting " + result); + runtime.sendEvent(new NewBuildEvent(job, result)); + } + + class MyListener implements UpdateListener { + public void update(EventBean[] newEvents, EventBean[] oldEvents) { + for (EventBean event : newEvents) { + logger.info("new event"); + EventType eventType = event.getEventType(); + + for (String name : eventType.getPropertyNames()) { + System.out.println(name + " = " + event.get(name)); + } + } + } + } +} + +class NewBuildEvent { + private UUID uuid; + private String result; + + NewBuildEvent(UUID uuid, String result) { + this.uuid = uuid; + this.result = result; + } + + public UUID getUuid() { + return uuid; + } + + public String getResult() { + return result; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/esper/Main2.java b/src/main/java/io/trygvis/esper/testing/esper/Main2.java new file mode 100755 index 0000000..9c53a8f --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/esper/Main2.java @@ -0,0 +1,77 @@ +package io.trygvis.esper.testing.esper; + +import com.espertech.esper.client.*; +import io.trygvis.esper.testing.*; + +import javax.sql.*; +import java.sql.*; +import java.util.*; + +public class Main2 { + private static Config config; + + @SuppressWarnings("UnusedDeclaration") + public static DataSource createDataSource(Properties p) throws SQLException { + return config.createBoneCp(); + } + + public static void main(String[] args) throws Exception { + config = Config.loadFromDisk(); + + Configuration c = new Configuration(); + + ConfigurationDBRef configurationDBRef = new ConfigurationDBRef(); + configurationDBRef.setDataSourceFactory(new Properties(), Main2.class.getName()); + configurationDBRef.setConnectionAutoCommit(true); + c.addDatabaseReference("db1", configurationDBRef); + + c.addVariable("VarLastTimestamp", Long.class, Long.valueOf(0)); + +// c.addEventTypeAutoName(getClass().getPackage().getName()); + + EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(c); + +// String expression = "select avg(price) from OrderEvent.win:time(30 sec)"; + + EPAdministrator administrator = epService.getEPAdministrator(); + + EPStatement statement = administrator.createEPL("insert into JenkinsBuild " + + "select uuid, job, result from pattern [every timer:interval(1 sec)], " + + "sql:db1 ['SELECT uuid, job, result FROM jenkins_build WHERE extract(epoch from created_date) > ${VarLastTimestamp}']"); + + JenkinsBuildListener listener = new JenkinsBuildListener(); + statement.addListener(listener); + + administrator.createEPL("on JenkinsBuild set VarLastTimestamp = current_timestamp()"); + + administrator.createEPL("every build=JenkinsBuild(result='FAILURE') -> ((JenkinsBuild(job=build.job, result='FAILURE') and not Sample(sensor=sample.sensor, temp <= 50)) -> (Sample(sensor=sample.sensor, temp > 50) and not Sample(sensor=sample.sensor, temp <= 50))) where timer:within(90 seconds))\n"); + + while(true) { + Thread.sleep(10000); + } + } +} + +class JenkinsBuildListener implements UpdateListener { + public void update(EventBean[] newEvents, EventBean[] oldEvents) { + for (EventBean event : newEvents) { + System.out.println("uuid=" + event.get("uuid")); + } + } +} + +/* +every build=JenkinsBuild(result='FAILURE') -> + ( + ( + Sample(sensor=sample.sensor, temp > 50) and not + Sample(sensor=sample.sensor, temp <= 50) + ) -> + ( + Sample(sensor=sample.sensor, temp > 50) and not + Sample(sensor=sample.sensor, temp <= 50) + ) + ) +where timer:within(90 seconds)) + +*/ -- cgit v1.2.3