30
30
import io .netty .channel .ChannelOutboundHandlerAdapter ;
31
31
import io .netty .channel .ChannelPipeline ;
32
32
import io .netty .channel .ChannelPromise ;
33
- import io .netty .handler .codec .EncoderException ;
34
- import io .netty .util .Recycler ;
35
33
import io .netty .util .ReferenceCountUtil ;
36
- import io .netty .util .concurrent .PromiseCombiner ;
37
34
import net .md_5 .bungee .api .connection .ProxiedPlayer ;
38
35
39
36
import java .lang .reflect .InvocationTargetException ;
40
- import java .util .ArrayList ;
41
37
import java .util .List ;
42
38
43
39
// Thanks to ViaVersion for the compression method.
44
40
@ ChannelHandler .Sharable
45
41
public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter {
46
42
47
- private static final Recycler <OutList > OUT_LIST_RECYCLER = new Recycler <OutList >() {
48
- @ Override
49
- protected OutList newObject (Handle <OutList > handle ) {
50
- return new OutList (handle );
51
- }
52
- };
53
-
54
43
public ProxiedPlayer player ;
55
44
public User user ;
56
45
public boolean handledCompression ;
@@ -59,8 +48,8 @@ public PacketEventsEncoder(User user) {
59
48
this .user = user ;
60
49
}
61
50
62
- public void read (ChannelHandlerContext ctx , ByteBuf buffer , ChannelPromise promise ) throws Exception {
63
- boolean doCompression = handleCompressionOrder ( ctx , buffer );
51
+ public void read (ChannelHandlerContext originalCtx , ByteBuf buffer , ChannelPromise promise ) {
52
+ ChannelHandlerContext ctx = this . tryFixCompressorOrder ( originalCtx , buffer );
64
53
int firstReaderIndex = buffer .readerIndex ();
65
54
PacketSendEvent packetSendEvent = EventCreationUtil .createSendEvent (ctx .channel (), user , player ,
66
55
buffer , false );
@@ -74,11 +63,7 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promi
74
63
} else {
75
64
buffer .readerIndex (firstReaderIndex );
76
65
}
77
- if (doCompression ) {
78
- this .recompress (ctx , buffer , promise );
79
- } else {
80
- ctx .write (buffer , promise );
81
- }
66
+ ctx .write (buffer , promise );
82
67
} else {
83
68
ReferenceCountUtil .release (packetSendEvent .getByteBuf ());
84
69
}
@@ -103,79 +88,52 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
103
88
}
104
89
}
105
90
106
- private boolean handleCompressionOrder (ChannelHandlerContext ctx , ByteBuf buffer ) {
91
+ private ChannelHandlerContext tryFixCompressorOrder (ChannelHandlerContext ctx , ByteBuf buffer ) {
92
+ if (this .handledCompression ) {
93
+ return ctx ;
94
+ }
107
95
ChannelPipeline pipe = ctx .pipeline ();
108
- if (handledCompression ) {
109
- return false ;
96
+ List <String > pipeNames = pipe .names ();
97
+ if (pipeNames .contains ("frame-prepender-compress" )) {
98
+ // "modern" version, no need to handle this here
99
+ this .handledCompression = true ;
100
+ return ctx ;
110
101
}
111
- int encoderIndex = pipe . names () .indexOf ("compress" );
112
- if (encoderIndex == -1 ) {
113
- return false ;
102
+ int compressorIndex = pipeNames .indexOf ("compress" );
103
+ if (compressorIndex == -1 ) {
104
+ return ctx ;
114
105
}
115
- if (encoderIndex > pipe .names ().indexOf (PacketEvents .ENCODER_NAME )) {
116
- // Need to decompress this packet due to bad order
117
- ChannelHandler decompressor = pipe .get ("decompress" );
118
- try {
119
- ByteBuf decompressed = (ByteBuf ) CustomPipelineUtil .callPacketDecodeByteBuf (decompressor , ctx , buffer ).get (0 );
120
- if (buffer != decompressed ) {
121
- try {
122
- buffer .clear ().writeBytes (decompressed );
123
- } finally {
124
- decompressed .release ();
125
- }
126
- }
127
- //Relocate handlers
128
- PacketEventsDecoder decoder = (PacketEventsDecoder ) pipe .remove (PacketEvents .DECODER_NAME );
129
- PacketEventsEncoder encoder = (PacketEventsEncoder ) pipe .remove (PacketEvents .ENCODER_NAME );
130
- pipe .addAfter ("decompress" , PacketEvents .DECODER_NAME , decoder );
131
- pipe .addAfter ("compress" , PacketEvents .ENCODER_NAME , encoder );
132
- //System.out.println("Pipe: " + ChannelHelper.pipelineHandlerNamesAsString(ctx.channel()));
133
- handledCompression = true ;
134
- return true ;
135
- } catch (InvocationTargetException e ) {
136
- e .printStackTrace ();
137
- }
106
+ this .handledCompression = true ;
107
+ if (compressorIndex <= pipeNames .indexOf (PacketEvents .ENCODER_NAME )) {
108
+ return ctx ; // order already seems to be correct
138
109
}
139
- return false ;
110
+ // relocate handlers
111
+ PacketEventsDecoder decoder = (PacketEventsDecoder ) pipe .remove (PacketEvents .DECODER_NAME );
112
+ PacketEventsEncoder encoder = (PacketEventsEncoder ) pipe .remove (PacketEvents .ENCODER_NAME );
113
+ pipe .addAfter ("decompress" , PacketEvents .DECODER_NAME , decoder );
114
+ pipe .addAfter ("compress" , PacketEvents .ENCODER_NAME , encoder );
115
+
116
+ // manually decompress packet and update context,
117
+ // so we don't need to additionally manually re-compress the packet
118
+ this .decompress (pipe , buffer );
119
+ return pipe .context (PacketEvents .ENCODER_NAME );
140
120
}
141
121
142
- private void recompress (ChannelHandlerContext ctx , ByteBuf buffer , ChannelPromise promise ) {
143
- OutList outWrapper = OUT_LIST_RECYCLER .get ();
144
- List <Object > out = outWrapper .list ;
145
- try {
146
- ChannelHandler compressor = ctx .pipeline ().get ("compress" );
147
- CustomPipelineUtil .callPacketEncodeByteBuf (compressor , ctx , buffer , out );
122
+ private void decompress (ChannelPipeline pipe , ByteBuf buffer ) {
123
+ ChannelHandler decompressor = pipe .get ("decompress" );
124
+ ChannelHandlerContext decompressorCtx = pipe .context ("decompress" );
148
125
149
- int len = out .size ();
150
- if (len == 1 ) {
151
- // should be the only case which
152
- // happens on vanilla bungeecord
153
- ctx .write (out .get (0 ), promise );
154
- } else {
155
- // copied from MessageToMessageEncoder#writePromiseCombiner
156
- PromiseCombiner combiner = new PromiseCombiner (ctx .executor ());
157
- for (int i = 0 ; i < len ; i ++) {
158
- combiner .add (ctx .write (out .get (i )));
159
- }
160
- combiner .finish (promise );
126
+ ByteBuf decompressed = null ;
127
+ try {
128
+ decompressed = (ByteBuf ) CustomPipelineUtil .callPacketDecodeByteBuf (
129
+ decompressor , decompressorCtx , buffer ).get (0 );
130
+ if (buffer != decompressed ) {
131
+ buffer .clear ().writeBytes (decompressed );
161
132
}
162
133
} catch (InvocationTargetException exception ) {
163
- throw new EncoderException ( "Error while recompressing bytebuf " + buffer . readableBytes (), exception );
134
+ throw new RuntimeException ( exception );
164
135
} finally {
165
- out .clear ();
166
- outWrapper .handle .recycle (outWrapper );
167
- buffer .release ();
168
- }
169
- }
170
-
171
- private static final class OutList {
172
-
173
- // the default bungee compressor only produces one output bytebuf
174
- private final List <Object > list = new ArrayList <>(1 );
175
- private final Recycler .Handle <OutList > handle ;
176
-
177
- public OutList (Recycler .Handle <OutList > handle ) {
178
- this .handle = handle ;
136
+ ReferenceCountUtil .release (decompressed );
179
137
}
180
138
}
181
139
}
0 commit comments