功能:将同一个事件流分发到其它的事件流中。
格式:
1 | [context context_name] |
2 | on event_type[(filter_criteria)] [as stream_name] |
3 | insert into insert_into_def select select_list [where condition] |
4 | [insert into insert_into_def select select_list [where condition]] |
5 | [insert into...] |
6 | [output first | all] |
格式说明: 1、event_type:触发分发的事件类型,filter_criteria为过滤条件,stream_name为事件别名; 2、insert子句:多个子句之间用空格分隔; 3、output first(默认模式): 基础事件按从第一个到最后一个的顺序,依次与各insert子句的where条件匹配,只要满足了其中某一个,就插入到对应的事件流,此后这个基础事件不再与后面的where条件匹配,即不会再进入之后的事件流; 4、output all: 基础事件与所有insert子句的where条件逐一匹配,凡是满足条件的都会进入对应的事件流,不像first那样限制基础事件只能进入某一个流; 5、select子句和where子句中不能使用聚合函数。
示例:
文件名:orderMainTest.java
1 | public class orderMainTest { |
2 | |
3 | public static void main(String[] args) throws InterruptedException { |
4 | |
5 | // Register the package path so that event type names referenced later are automatically prefixed with this package name |
6 | Configuration config = new Configuration(); |
7 | config.addEventTypeAutoName("cn.chenx.esper.split"); |
8 | |
9 | // Obtain the default service provider built from this configuration, and its administrator |
10 | EPServiceProvider epServiceProvider = EPServiceProviderManager |
11 | .getDefaultProvider(config); |
12 | EPAdministrator epAdmin = epServiceProvider.getEPAdministrator(); |
13 | |
14 | // Create the named windows: myWindow1/myWindow2 hold (name String), myWindow4/myWindow5 hold (size int) |
15 | String nwsql1 = "create window myWindow1.win:keepall() as (name String)"; |
16 | String nwsql2 = "create window myWindow2.win:keepall() as (name String)"; |
17 | String nwsql4 = "create window myWindow4.win:keepall() as (size int)"; |
18 | String nwsql5 = "create window myWindow5.win:keepall() as (size int)"; |
19 | // first: no output clause is appended, so the default (first) mode applies; the conditional insert (myWindow2) is placed before the unconditional one (myWindow1) |
20 | String insql1 = "insert into myWindow1 select key as name "; |
21 | String insql2 = "insert into myWindow2 select key as name where value=111 "; |
22 | String insql3 = "on orderBean "+ insql2 + insql1 + " "; |
23 | // all: "output all" is appended; here the unconditional insert (myWindow4) is placed before the conditional one (myWindow5) |
24 | String insql4 = "insert into myWindow4 select value as size "; |
25 | String insql5 = "insert into myWindow5 select value as size where value=111 "; |
26 | String insql6 = "on orderBean "+ insql4 + insql5 + " output all "; |
27 | // On-demand queries used at the end to dump the contents of each window |
28 | String stsql1 = "select * from myWindow1"; |
29 | String stsql2 = "select * from myWindow2"; |
30 | String stsql4 = "select * from myWindow4"; |
31 | String stsql5 = "select * from myWindow5"; |
32 | |
33 | System.out.println("first: " +insql3); |
34 | System.out.println("output: "+insql6); |
35 | // Register the statements: the four window definitions, then the two split-stream statements |
36 | epAdmin.createEPL(nwsql1); |
37 | epAdmin.createEPL(nwsql2); |
38 | epAdmin.createEPL(nwsql4); |
39 | epAdmin.createEPL(nwsql5); |
40 | |
41 | epAdmin.createEPL(insql3); |
42 | epAdmin.createEPL(insql6); |
43 | // Runtime used below to send events and execute the on-demand queries |
44 | EPRuntime epRuntime = epServiceProvider.getEPRuntime(); |
45 | // Send 10 events: i=0..4 are built with a random value in 1..100, i=5..9 with 111, and the i=9 event then has setValue(110) called on it |
46 | for (int i = 0; i < 10; i++) { |
47 | int seed = (int) (Math.random() * 100); |
48 | if (i > 4) |
49 | seed = 111; |
50 | else |
51 | seed = seed+1; |
52 | |
53 | orderBean b = new orderBean("hello World"+i,seed); |
54 | if (9 == i){ |
55 | b.setValue(110); |
56 | } |
57 | |
58 | System.out.println("key:"+b.getKey()+",value:"+b.getValue()); |
59 | epRuntime.sendEvent(b); |
60 | } |
61 | |
62 | |
63 | System.out.println("start Name Window select"); |
64 | EPOnDemandQueryResult sResult1=epRuntime.executeQuery(stsql1); |
65 | printResult(stsql1,sResult1); |
66 | EPOnDemandQueryResult sResult2=epRuntime.executeQuery(stsql2); |
67 | printResult(stsql2,sResult2); |
68 | EPOnDemandQueryResult sResult4=epRuntime.executeQuery(stsql4); |
69 | printResult(stsql4,sResult4); |
70 | EPOnDemandQueryResult sResult5=epRuntime.executeQuery(stsql5); |
71 | printResult(stsql5,sResult5); |
72 | } |
73 | |
74 | static public void printResult(String content,EPOnDemandQueryResult result){ |
75 | System.out.println("\n" + content); |
76 | EventBean[] events = result.getArray(); |
77 | for (int i = 0; i < events.length; i++) { |
78 | EventBean event = events[i]; |
79 | System.out.println(event.getUnderlying()); |
80 | } |
81 | } |
82 | } |