Commit | Line | Data |
---|---|---|
594bd3a0 GB |
1 | From 5803aadd3f209eba1ffbb2cf7bf16778019dbee1 Mon Sep 17 00:00:00 2001 |
2 | From: fredoboulo <fredoboulo@users.noreply.github.com> | |
3 | Date: Fri, 23 Feb 2018 23:55:57 +0100 | |
4 | Subject: [PATCH] Fix #524 : V1 and V2 protocol downgrades handle received data | |
5 | in handshake buffer | |
6 | ||
7 | This patch is upstream pull request, see: | |
8 | https://gihub.com/zeromq/jeromq/pull/527. | |
9 | ||
10 | It is merged on commit c2afa9c, and we can drop it on the | |
11 | 0.4.4 release. | |
12 | ||
13 | --- | |
14 | src/main/java/zmq/io/StreamEngine.java | 21 ++++++++++-- | |
15 | src/test/java/zmq/io/AbstractProtocolVersion.java | 41 +++++++++++++---------- | |
16 | src/test/java/zmq/io/V0ProtocolTest.java | 12 +++++++ | |
17 | src/test/java/zmq/io/V1ProtocolTest.java | 16 +++++++-- | |
18 | src/test/java/zmq/io/V2ProtocolTest.java | 16 +++++++-- | |
19 | 5 files changed, 81 insertions(+), 25 deletions(-) | |
20 | ||
21 | diff --git a/src/main/java/zmq/io/StreamEngine.java b/src/main/java/zmq/io/StreamEngine.java | |
22 | index b8933c92..fe2f2d8d 100644 | |
23 | --- a/src/main/java/zmq/io/StreamEngine.java | |
24 | +++ b/src/main/java/zmq/io/StreamEngine.java | |
25 | @@ -816,9 +816,7 @@ private boolean handshake() | |
26 | assert (bufferSize == headerSize); | |
27 | ||
28 | // Make sure the decoder sees the data we have already received. | |
29 | - greetingRecv.flip(); | |
30 | - inpos = greetingRecv; | |
31 | - insize = greetingRecv.limit(); | |
32 | + decodeDataAfterHandshake(0); | |
33 | ||
34 | // To allow for interoperability with peers that do not forward | |
35 | // their subscriptions, we inject a phantom subscription message | |
36 | @@ -846,6 +844,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V1.revision) { | |
37 | } | |
38 | encoder = new V1Encoder(errno, Config.OUT_BATCH_SIZE.getValue()); | |
39 | decoder = new V1Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator); | |
40 | + | |
41 | + decodeDataAfterHandshake(V2_GREETING_SIZE); | |
42 | } | |
43 | else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { | |
44 | // ZMTP/2.0 framing. | |
45 | @@ -859,6 +859,8 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { | |
46 | } | |
47 | encoder = new V2Encoder(errno, Config.OUT_BATCH_SIZE.getValue()); | |
48 | decoder = new V2Decoder(errno, Config.IN_BATCH_SIZE.getValue(), options.maxMsgSize, options.allocator); | |
49 | + | |
50 | + decodeDataAfterHandshake(V2_GREETING_SIZE); | |
51 | } | |
52 | else { | |
53 | zmtpVersion = Protocol.V3; | |
54 | @@ -904,6 +906,19 @@ else if (greetingRecv.get(revisionPos) == Protocol.V2.revision) { | |
55 | return true; | |
56 | } | |
57 | ||
58 | + private void decodeDataAfterHandshake(int greetingSize) | |
59 | + { | |
60 | + final int pos = greetingRecv.position(); | |
61 | + if (pos > greetingSize) { | |
62 | + // data is present after handshake | |
63 | + greetingRecv.position(greetingSize).limit(pos); | |
64 | + | |
65 | + // Make sure the decoder sees this extra data. | |
66 | + inpos = greetingRecv; | |
67 | + insize = greetingRecv.remaining(); | |
68 | + } | |
69 | + } | |
70 | + | |
71 | private Msg identityMsg() | |
72 | { | |
73 | Msg msg = new Msg(options.identitySize); | |
74 | diff --git a/src/test/java/zmq/io/AbstractProtocolVersion.java b/src/test/java/zmq/io/AbstractProtocolVersion.java | |
75 | index e60db403..aa06b4a7 100644 | |
76 | --- a/src/test/java/zmq/io/AbstractProtocolVersion.java | |
77 | +++ b/src/test/java/zmq/io/AbstractProtocolVersion.java | |
78 | @@ -18,15 +18,18 @@ | |
79 | import zmq.SocketBase; | |
80 | import zmq.ZError; | |
81 | import zmq.ZMQ; | |
82 | +import zmq.ZMQ.Event; | |
83 | import zmq.util.Utils; | |
84 | ||
85 | public abstract class AbstractProtocolVersion | |
86 | { | |
87 | + protected static final int REPETITIONS = 1000; | |
88 | + | |
89 | static class SocketMonitor extends Thread | |
90 | { | |
91 | - private final Ctx ctx; | |
92 | - private final String monitorAddr; | |
93 | - private final List<ZMQ.Event> events = new ArrayList<>(); | |
94 | + private final Ctx ctx; | |
95 | + private final String monitorAddr; | |
96 | + private final ZMQ.Event[] events = new ZMQ.Event[1]; | |
97 | ||
98 | public SocketMonitor(Ctx ctx, String monitorAddr) | |
99 | { | |
100 | @@ -41,15 +44,15 @@ public void run() | |
101 | boolean rc = s.connect(monitorAddr); | |
102 | assertThat(rc, is(true)); | |
103 | // Only some of the exceptional events could fire | |
104 | - while (true) { | |
105 | - ZMQ.Event event = ZMQ.Event.read(s); | |
106 | - if (event == null && s.errno() == ZError.ETERM) { | |
107 | - break; | |
108 | - } | |
109 | - assertThat(event, notNullValue()); | |
110 | - | |
111 | - events.add(event); | |
112 | + | |
113 | + ZMQ.Event event = ZMQ.Event.read(s); | |
114 | + if (event == null && s.errno() == ZError.ETERM) { | |
115 | + s.close(); | |
116 | + return; | |
117 | } | |
118 | + assertThat(event, notNullValue()); | |
119 | + | |
120 | + events[0] = event; | |
121 | s.close(); | |
122 | } | |
123 | } | |
124 | @@ -69,11 +72,12 @@ public void run() | |
125 | boolean rc = ZMQ.setSocketOption(receiver, ZMQ.ZMQ_LINGER, 0); | |
126 | assertThat(rc, is(true)); | |
127 | ||
128 | - SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor"); | |
129 | - monitor.start(); | |
130 | rc = ZMQ.monitorSocket(receiver, "inproc://monitor", ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL); | |
131 | assertThat(rc, is(true)); | |
132 | ||
133 | + SocketMonitor monitor = new SocketMonitor(ctx, "inproc://monitor"); | |
134 | + monitor.start(); | |
135 | + | |
136 | rc = ZMQ.bind(receiver, host); | |
137 | assertThat(rc, is(true)); | |
138 | ||
139 | @@ -81,17 +85,18 @@ public void run() | |
140 | OutputStream out = sender.getOutputStream(); | |
141 | for (ByteBuffer raw : raws) { | |
142 | out.write(raw.array()); | |
143 | - ZMQ.msleep(100); | |
144 | } | |
145 | ||
146 | Msg msg = ZMQ.recv(receiver, 0); | |
147 | assertThat(msg, notNullValue()); | |
148 | assertThat(new String(msg.data(), ZMQ.CHARSET), is(payload)); | |
149 | ||
150 | - ZMQ.msleep(500); | |
151 | - assertThat(monitor.events.size(), is(1)); | |
152 | - assertThat(monitor.events.get(0).event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL)); | |
153 | - assertThat((Integer) monitor.events.get(0).arg, is(version)); | |
154 | + monitor.join(); | |
155 | + | |
156 | + final Event event = monitor.events[0]; | |
157 | + assertThat(event, notNullValue()); | |
158 | + assertThat(event.event, is(ZMQ.ZMQ_EVENT_HANDSHAKE_PROTOCOL)); | |
159 | + assertThat((Integer) event.arg, is(version)); | |
160 | ||
161 | InputStream in = sender.getInputStream(); | |
162 | byte[] data = new byte[255]; | |
163 | diff --git a/src/test/java/zmq/io/V0ProtocolTest.java b/src/test/java/zmq/io/V0ProtocolTest.java | |
164 | index bd547d23..1a5b7aef 100644 | |
165 | --- a/src/test/java/zmq/io/V0ProtocolTest.java | |
166 | +++ b/src/test/java/zmq/io/V0ProtocolTest.java | |
167 | @@ -10,6 +10,18 @@ | |
168 | ||
169 | public class V0ProtocolTest extends AbstractProtocolVersion | |
170 | { | |
171 | + @Test | |
172 | + public void testFixIssue524() throws IOException, InterruptedException | |
173 | + { | |
174 | + for (int idx = 0; idx < REPETITIONS; ++idx) { | |
175 | + if (idx % 100 == 0) { | |
176 | + System.out.print(idx + " "); | |
177 | + } | |
178 | + testProtocolVersion0short(); | |
179 | + } | |
180 | + System.out.println(); | |
181 | + } | |
182 | + | |
183 | @Test(timeout = 2000) | |
184 | public void testProtocolVersion0short() throws IOException, InterruptedException | |
185 | { | |
186 | diff --git a/src/test/java/zmq/io/V1ProtocolTest.java b/src/test/java/zmq/io/V1ProtocolTest.java | |
187 | index e1045f34..764159d0 100644 | |
188 | --- a/src/test/java/zmq/io/V1ProtocolTest.java | |
189 | +++ b/src/test/java/zmq/io/V1ProtocolTest.java | |
190 | @@ -10,7 +10,19 @@ | |
191 | ||
192 | public class V1ProtocolTest extends AbstractProtocolVersion | |
193 | { | |
194 | - @Test(timeout = 2000) | |
195 | + @Test | |
196 | + public void testFixIssue524() throws IOException, InterruptedException | |
197 | + { | |
198 | + for (int idx = 0; idx < REPETITIONS; ++idx) { | |
199 | + if (idx % 100 == 0) { | |
200 | + System.out.print(idx + " "); | |
201 | + } | |
202 | + testProtocolVersion1short(); | |
203 | + } | |
204 | + System.out.println(); | |
205 | + } | |
206 | + | |
207 | + @Test | |
208 | public void testProtocolVersion1short() throws IOException, InterruptedException | |
209 | { | |
210 | List<ByteBuffer> raws = raws(0); | |
211 | @@ -25,7 +37,7 @@ public void testProtocolVersion1short() throws IOException, InterruptedException | |
212 | assertProtocolVersion(1, raws, "abcdefg"); | |
213 | } | |
214 | ||
215 | - @Test(timeout = 2000) | |
216 | + @Test | |
217 | public void testProtocolVersion1long() throws IOException, InterruptedException | |
218 | { | |
219 | List<ByteBuffer> raws = raws(0); | |
220 | diff --git a/src/test/java/zmq/io/V2ProtocolTest.java b/src/test/java/zmq/io/V2ProtocolTest.java | |
221 | index d5e64bce..7fda31bc 100644 | |
222 | --- a/src/test/java/zmq/io/V2ProtocolTest.java | |
223 | +++ b/src/test/java/zmq/io/V2ProtocolTest.java | |
224 | @@ -21,7 +21,19 @@ protected ByteBuffer identity() | |
225 | .put((byte) 0); | |
226 | } | |
227 | ||
228 | - @Test(timeout = 2000) | |
229 | + @Test | |
230 | + public void testFixIssue524() throws IOException, InterruptedException | |
231 | + { | |
232 | + for (int idx = 0; idx < REPETITIONS; ++idx) { | |
233 | + if (idx % 100 == 0) { | |
234 | + System.out.print(idx + " "); | |
235 | + } | |
236 | + testProtocolVersion2short(); | |
237 | + } | |
238 | + System.out.println(); | |
239 | + } | |
240 | + | |
241 | + @Test | |
242 | public void testProtocolVersion2short() throws IOException, InterruptedException | |
243 | { | |
244 | List<ByteBuffer> raws = raws(1); | |
245 | @@ -38,7 +50,7 @@ public void testProtocolVersion2short() throws IOException, InterruptedException | |
246 | assertProtocolVersion(2, raws, "abcdefg"); | |
247 | } | |
248 | ||
249 | - @Test(timeout = 2000) | |
250 | + @Test | |
251 | public void testProtocolVersion2long() throws IOException, InterruptedException | |
252 | { | |
253 | List<ByteBuffer> raws = raws(1); |