1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 package com.reuters.msgtest;
36
37 import com.tibco.tibrv.Tibrv;
38 import com.tibco.tibrv.TibrvDispatcher;
39 import com.tibco.tibrv.TibrvException;
40 import com.tibco.tibrv.TibrvListener;
41 import com.tibco.tibrv.TibrvMsg;
42 import com.tibco.tibrv.TibrvMsgCallback;
43 import com.tibco.tibrv.TibrvQueue;
44 import com.tibco.tibrv.TibrvRvdTransport;
45 import com.tibco.tibrv.TibrvTransport;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49
50 /***
51 * This mimics a simple server component that enriches the incoming message with
52 * additional data. By default the server listens on the subject MSGTEST.INPUT.
53 * Other subjects can be specified as command line arguments. The two tibrvfields
54 * that are added to the incoming message are ("OrderID",3.14) and
55 * (TradingGroup,"FX"). The message is then sent on the subject MSGTEST.OUTPUT.
56 * The Stimuli and Responses are setup to expect this processing flow.
57 *
58 * @author <a href="mailto:mpollack@MEME"> </a>
59 * @version
60 */
61 public class EnrichmentServer implements TibrvMsgCallback {
62 private Log log = LogFactory.getLog(EnrichmentServer.class);
63 private boolean started;
64 private TibrvDispatcher dispatcher;
65 private String[] subjects;
66
67 public EnrichmentServer() throws Exception {
68 this(new String[0]);
69 }
70
71 public EnrichmentServer(String[] subjects) throws Exception {
72 this.subjects = subjects;
73
74 startup();
75 }
76
77 /***
78 * @throws TibrvException
79 */
80 public void startup() throws TibrvException {
81 Tibrv.open();
82
83 TibrvTransport rvTransport = new TibrvRvdTransport();
84 TibrvQueue rvQueue = new TibrvQueue();
85 dispatcher = new TibrvDispatcher(rvQueue);
86
87 if (this.subjects.length > 0) {
88 for (int i = 0; i < subjects.length; i++) {
89 new TibrvListener(rvQueue, this, rvTransport, subjects[i], null);
90 log.debug("Listening on " + subjects[i]);
91 }
92 } else {
93 log.debug("Listening on MSGTEST.INPUT");
94 new TibrvListener(rvQueue, this, rvTransport, "MSGTEST.INPUT", null);
95 }
96
97 setStarted(true);
98 }
99
100 public void shutdown() {
101 dispatcher.interrupt();
102 setStarted(false);
103 }
104
105 /***
106 * Add on the additional tibrvfields ("OrderID",3.14) and (TradingGroup,"FX") and
107 * publish on the subject MSGTEST.OUTPUT
108 *
109 * @param listener
110 * a <code>TibrvListener</code> value
111 * @param msg
112 * a <code>TibrvMsg</code> value
113 */
114 public void onMsg(TibrvListener listener, TibrvMsg msg) {
115 try {
116 TibrvMsg reply = new TibrvMsg(msg);
117
118 if (msg.getReplySubject() != null) {
119 reply.setSendSubject(msg.getReplySubject());
120 } else {
121 reply.setSendSubject("MSGTEST.OUTPUT");
122 }
123
124 reply.add("OrderId", 3.14F);
125 reply.add("TradingGroup", "FX");
126 listener.getTransport().send(reply);
127 } catch (TibrvException e) {
128 log.error("error in enrinchment server", e);
129 }
130 }
131
132 /***
133 * Run the server.
134 *
135 * @param args
136 * Subjects that the server should listen on. The default subject
137 * is MSGTEST.INPUT
138 */
139 public static void main(String[] args) throws Exception {
140 new EnrichmentServer(args);
141 }
142
143 public boolean isStarted() {
144 return started;
145 }
146
147 public void setStarted(boolean started) {
148 this.started = started;
149 }
150 }