diff --git a/Release.md b/Release.md index 72efbbc7..cc9e83e4 100644 --- a/Release.md +++ b/Release.md @@ -1,10 +1,11 @@ ## Features * When `transport.wireProtocol = "v2"` is enabled, ordinary UDP proxy work connection payloads now use wire protocol v2 message framing. This keeps UDP message payloads aligned with the negotiated frpc/frps wire protocol. +* SUDP proxy payloads now also follow the connection wire protocol. SUDP v2 endpoints use wire protocol v2 message framing, while v1/default endpoints continue to use the legacy message codec. When the SUDP proxy frpc and visitor frpc use mixed v1/v2 wire protocols, frps bridges UDPPacket messages between the two codecs. ## Compatibility Notes -* The default/empty `transport.wireProtocol` and `transport.wireProtocol = "v1"` continue to use the legacy message codec for ordinary UDP proxy payloads. -* Raw stream proxy paths such as TCP, HTTP, and STCP remain unframed and are not affected by the UDP payload framing change. -* SUDP and XTCP keep their existing legacy behavior in this release and will be considered separately in a future phase. -* `transport.wireProtocol = "v2"` requires both frpc and frps to use versions that support the same wire v2 semantics. Mixing a newer peer that sends v2-framed ordinary UDP payloads with an older v2-capable peer that still expects the legacy UDP payload codec can break ordinary UDP proxy traffic. +* The default/empty `transport.wireProtocol` and `transport.wireProtocol = "v1"` continue to use the legacy message codec for ordinary UDP and SUDP proxy payloads. +* Raw stream proxy paths such as TCP, HTTP, and STCP remain unframed and are not affected by the UDP/SUDP payload framing change. +* Direct NAT hole UDP sid probing packets are not changed by this release. +* `transport.wireProtocol = "v2"` requires peers to use versions that support the same wire v2 payload semantics. Mixing a newer peer that sends v2-framed UDP or SUDP payloads with an older v2-capable peer that still expects the legacy payload codec can break that proxy traffic. During rolling upgrades, upgrade both SUDP proxy and visitor frpc instances before enabling `transport.wireProtocol = "v2"` for SUDP, or keep those clients on `transport.wireProtocol = "v1"` until both sides are upgraded. diff --git a/client/proxy/sudp.go b/client/proxy/sudp.go index 21e34a41..7db6d991 100644 --- a/client/proxy/sudp.go +++ b/client/proxy/sudp.go @@ -87,6 +87,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } workConn := netpkg.WrapReadWriteCloserToConn(remote, conn) + payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, pxy.clientCfg.Transport.WireProtocol)) readCh := make(chan *msg.UDPPacket, 1024) sendCh := make(chan msg.Message, 1024) isClose := false @@ -109,7 +110,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } // udp service <- frpc <- frps <- frpc visitor <- user - workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) { + workConnReaderFn := func(payloadConn *msg.Conn, readCh chan *msg.UDPPacket) { defer closeFn() for { @@ -122,7 +123,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } var udpMsg msg.UDPPacket - if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { + if errRet := payloadConn.ReadMsgInto(&udpMsg); errRet != nil { xl.Warnf("read from workConn for sudp error: %v", errRet) return } @@ -137,7 +138,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } // udp service -> frpc -> frps -> frpc visitor -> user - workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { + workConnSenderFn := func(payloadConn *msg.Conn, sendCh chan msg.Message) { defer func() { closeFn() xl.Infof("writer goroutine for sudp work connection closed") @@ -148,12 +149,12 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { switch m := rawMsg.(type) { case *msg.UDPPacket: xl.Tracef("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]", - m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String()) + m.LocalAddr.String(), m.RemoteAddr.String(), payloadConn.LocalAddr().String(), payloadConn.RemoteAddr().String()) case *msg.Ping: xl.Tracef("frpc send ping message to frpc visitor") } - if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { + if errRet = payloadConn.WriteMsg(rawMsg); errRet != nil { xl.Errorf("sudp work write error: %v", errRet) return } @@ -184,8 +185,8 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } } - go workConnSenderFn(workConn, sendCh) - go workConnReaderFn(workConn, readCh) + go workConnSenderFn(payloadConn, sendCh) + go workConnReaderFn(payloadConn, readCh) go heartbeatFn(sendCh) udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize), pxy.cfg.Transport.ProxyProtocolVersion) diff --git a/client/visitor/sudp.go b/client/visitor/sudp.go index 6014161c..91d57da6 100644 --- a/client/visitor/sudp.go +++ b/client/visitor/sudp.go @@ -113,15 +113,16 @@ func (sv *SUDPVisitor) dispatcher() { func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) { xl := xlog.FromContextSafe(sv.ctx) xl.Debugf("starting sudp proxy worker") + payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, sv.clientCfg.Transport.WireProtocol)) wg := &sync.WaitGroup{} wg.Add(2) closeCh := make(chan struct{}) // udp service -> frpc -> frps -> frpc visitor -> user - workConnReaderFn := func(conn net.Conn) { + workConnReaderFn := func(payloadConn *msg.Conn) { defer func() { - conn.Close() + payloadConn.Close() close(closeCh) wg.Done() }() @@ -133,13 +134,13 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) { ) // frpc will send heartbeat in workConn to frpc visitor for keeping alive - _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { + _ = payloadConn.SetReadDeadline(time.Now().Add(60 * time.Second)) + if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil { xl.Warnf("read from workconn for user udp conn error: %v", errRet) return } - _ = conn.SetReadDeadline(time.Time{}) + _ = payloadConn.SetReadDeadline(time.Time{}) switch m := rawMsg.(type) { case *msg.Ping: xl.Debugf("frpc visitor get ping message from frpc") @@ -157,15 +158,15 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) { } // udp service <- frpc <- frps <- frpc visitor <- user - workConnSenderFn := func(conn net.Conn) { + workConnSenderFn := func(payloadConn *msg.Conn) { defer func() { - conn.Close() + payloadConn.Close() wg.Done() }() var errRet error if firstPacket != nil { - if errRet = msg.WriteMsg(conn, firstPacket); errRet != nil { + if errRet = payloadConn.WriteMsg(firstPacket); errRet != nil { xl.Warnf("sender goroutine for udp work connection closed: %v", errRet) return } @@ -180,7 +181,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) { return } - if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { + if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil { xl.Warnf("sender goroutine for udp work connection closed: %v", errRet) return } @@ -191,8 +192,8 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) { } } - go workConnReaderFn(workConn) - go workConnSenderFn(workConn) + go workConnReaderFn(payloadConn) + go workConnSenderFn(payloadConn) wg.Wait() xl.Infof("sudp worker is closed") diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 7945b91b..e9ace046 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -16,6 +16,7 @@ package proxy import ( "context" + "errors" "fmt" "io" "net" @@ -31,6 +32,7 @@ import ( v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" plugin "github.com/fatedier/frp/pkg/plugin/server" + "github.com/fatedier/frp/pkg/proto/wire" "github.com/fatedier/frp/pkg/util/limit" netpkg "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/xlog" @@ -316,13 +318,147 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { name := pxy.GetName() proxyType := cfg.Type metrics.Server.OpenConnection(name, proxyType) - inCount, outCount, _ := libio.Join(local, userConn) + inCount, outCount, _ := pxy.joinUserConnection(local, userConn, proxyType, xl) metrics.Server.CloseConnection(name, proxyType) metrics.Server.AddTrafficIn(name, proxyType, inCount) metrics.Server.AddTrafficOut(name, proxyType, outCount) xl.Debugf("join connections closed") } +func (pxy *BaseProxy) joinUserConnection(local io.ReadWriteCloser, userConn net.Conn, proxyType string, xl *xlog.Logger) (int64, int64, []error) { + visitorWireProtocol := wireProtocolFromConn(userConn) + if proxyType == string(v1.ProxyTypeSUDP) && isMixedWireProtocol(pxy.wireProtocol, visitorWireProtocol) { + xl.Infof("bridge mixed SUDP payload codecs, proxy wireProtocol [%s], visitor wireProtocol [%s]", + normalizeWireProtocol(pxy.wireProtocol), normalizeWireProtocol(visitorWireProtocol)) + return joinSUDPMessageBridge(local, userConn, pxy.wireProtocol, visitorWireProtocol, xl) + } + return libio.Join(local, userConn) +} + +type wireProtocolGetter interface { + WireProtocol() string +} + +func wireProtocolFromConn(conn net.Conn) string { + if getter, ok := conn.(wireProtocolGetter); ok { + return getter.WireProtocol() + } + return "" +} + +func isMixedWireProtocol(left, right string) bool { + return normalizeWireProtocol(left) != normalizeWireProtocol(right) +} + +func normalizeWireProtocol(wireProtocol string) string { + if wireProtocol == wire.ProtocolV2 { + return wire.ProtocolV2 + } + return wire.ProtocolV1 +} + +func joinSUDPMessageBridge( + proxyConn io.ReadWriteCloser, + visitorConn io.ReadWriteCloser, + proxyWireProtocol string, + visitorWireProtocol string, + xl *xlog.Logger, +) (inCount int64, outCount int64, errs []error) { + // The mixed bridge decodes and re-encodes messages, so raw framed byte counts + // are not available. Count UDP payload bytes and ignore heartbeat traffic. + proxyRW := msg.NewReadWriter(proxyConn, proxyWireProtocol) + visitorRW := msg.NewReadWriter(visitorConn, visitorWireProtocol) + + var ( + once sync.Once + wait sync.WaitGroup + recordErrs = make([]error, 2) + ) + closeBoth := func() { + _ = proxyConn.Close() + _ = visitorConn.Close() + } + + wait.Add(2) + go func() { + defer wait.Done() + defer once.Do(closeBoth) + recordErrs[0] = bridgeSUDPProxyToVisitor(proxyRW, visitorRW, &outCount, xl) + }() + go func() { + defer wait.Done() + defer once.Do(closeBoth) + recordErrs[1] = bridgeSUDPVisitorToProxy(visitorRW, proxyRW, &inCount, xl) + }() + wait.Wait() + + for _, err := range recordErrs { + if err != nil { + errs = append(errs, err) + } + } + return +} + +func bridgeSUDPProxyToVisitor(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error { + for { + rawMsg, err := from.ReadMsg() + if err != nil { + return normalizeSUDPBridgeError(err) + } + + switch m := rawMsg.(type) { + case *msg.UDPPacket: + if err := to.WriteMsg(m); err != nil { + return normalizeSUDPBridgeError(err) + } + *count += int64(len(m.Content)) + case *msg.Ping: + traceSUDPBridge(xl, "bridge SUDP ping from proxy to visitor") + if err := to.WriteMsg(m); err != nil { + return normalizeSUDPBridgeError(err) + } + default: + return fmt.Errorf("unexpected SUDP proxy message %T", rawMsg) + } + } +} + +func bridgeSUDPVisitorToProxy(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error { + for { + rawMsg, err := from.ReadMsg() + if err != nil { + return normalizeSUDPBridgeError(err) + } + + switch m := rawMsg.(type) { + case *msg.UDPPacket: + if err := to.WriteMsg(m); err != nil { + return normalizeSUDPBridgeError(err) + } + *count += int64(len(m.Content)) + case *msg.Ping: + traceSUDPBridge(xl, "drop SUDP ping from visitor to proxy") + continue + default: + return fmt.Errorf("unexpected SUDP visitor message %T", rawMsg) + } + } +} + +func normalizeSUDPBridgeError(err error) error { + if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { + return nil + } + return err +} + +func traceSUDPBridge(xl *xlog.Logger, format string, args ...any) { + if xl != nil { + xl.Tracef(format, args...) + } +} + type Options struct { UserInfo plugin.UserInfo LoginMsg *msg.Login diff --git a/server/proxy/sudp_test.go b/server/proxy/sudp_test.go new file mode 100644 index 00000000..e2986eed --- /dev/null +++ b/server/proxy/sudp_test.go @@ -0,0 +1,141 @@ +// Copyright 2026 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "bufio" + "bytes" + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/fatedier/frp/pkg/msg" + "github.com/fatedier/frp/pkg/proto/wire" +) + +func TestSUDPBridgeTranscodesProxyV1ToVisitorV2(t *testing.T) { + var in, out bytes.Buffer + writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.UDPPacket{Content: []byte("proxy-to-visitor")}) + + var count int64 + err := bridgeSUDPProxyToVisitor( + msg.NewReadWriter(&in, wire.ProtocolV1), + msg.NewReadWriter(&out, wire.ProtocolV2), + &count, + nil, + ) + require.NoError(t, err) + require.Equal(t, int64(len("proxy-to-visitor")), count) + + frame, err := wire.NewConn(&out).ReadFrame() + require.NoError(t, err) + require.Equal(t, wire.FrameTypeMessage, frame.Type) + require.GreaterOrEqual(t, len(frame.Payload), 2) + require.Equal(t, msg.V2TypeUDPPacket, binary.BigEndian.Uint16(frame.Payload[:2])) + + var got msg.UDPPacket + require.NoError(t, msg.DecodeV2MessageFrameInto(frame, &got)) + require.Equal(t, []byte("proxy-to-visitor"), got.Content) +} + +func TestSUDPBridgeTranscodesVisitorV2ToProxyV1(t *testing.T) { + var in, out bytes.Buffer + writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.UDPPacket{Content: []byte("visitor-to-proxy")}) + + var count int64 + err := bridgeSUDPVisitorToProxy( + msg.NewReadWriter(&in, wire.ProtocolV2), + msg.NewReadWriter(&out, wire.ProtocolV1), + &count, + nil, + ) + require.NoError(t, err) + require.Equal(t, int64(len("visitor-to-proxy")), count) + + reader := bufio.NewReader(&out) + typeByte, err := reader.ReadByte() + require.NoError(t, err) + require.Equal(t, msg.TypeUDPPacket, typeByte) + require.NoError(t, reader.UnreadByte()) + + var got msg.UDPPacket + require.NoError(t, msg.ReadMsgInto(reader, &got)) + require.Equal(t, []byte("visitor-to-proxy"), got.Content) +} + +func TestSUDPBridgeForwardsProxyPing(t *testing.T) { + var in, out bytes.Buffer + writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.Ping{}) + + var count int64 + err := bridgeSUDPProxyToVisitor( + msg.NewReadWriter(&in, wire.ProtocolV1), + msg.NewReadWriter(&out, wire.ProtocolV2), + &count, + nil, + ) + require.NoError(t, err) + require.Zero(t, count) + + rawMsg, err := msg.NewReadWriter(&out, wire.ProtocolV2).ReadMsg() + require.NoError(t, err) + require.IsType(t, &msg.Ping{}, rawMsg) +} + +func TestSUDPBridgeDropsVisitorPing(t *testing.T) { + var in, out bytes.Buffer + writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Ping{}) + + var count int64 + err := bridgeSUDPVisitorToProxy( + msg.NewReadWriter(&in, wire.ProtocolV2), + msg.NewReadWriter(&out, wire.ProtocolV1), + &count, + nil, + ) + require.NoError(t, err) + require.Zero(t, count) + require.Empty(t, out.Bytes()) +} + +func TestSUDPBridgeRejectsUnknownVisitorMessage(t *testing.T) { + var in, out bytes.Buffer + writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Pong{}) + + var count int64 + err := bridgeSUDPVisitorToProxy( + msg.NewReadWriter(&in, wire.ProtocolV2), + msg.NewReadWriter(&out, wire.ProtocolV1), + &count, + nil, + ) + require.ErrorContains(t, err, "unexpected SUDP visitor message *msg.Pong") + require.Zero(t, count) + require.Empty(t, out.Bytes()) +} + +func TestSUDPBridgeDetectsMixedWireProtocol(t *testing.T) { + require.False(t, isMixedWireProtocol("", wire.ProtocolV1)) + require.False(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV2)) + require.True(t, isMixedWireProtocol("", wire.ProtocolV2)) + require.True(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV1)) +} + +func writeSUDPBridgeMsg(t *testing.T, buf *bytes.Buffer, wireProtocol string, m msg.Message) { + t.Helper() + + require.NoError(t, msg.NewReadWriter(buf, wireProtocol).WriteMsg(m)) +} diff --git a/server/service.go b/server/service.go index bc70e803..f3f9e323 100644 --- a/server/service.go +++ b/server/service.go @@ -508,7 +508,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, interna conn.Close() } case *msg.NewVisitorConn: - if err = svr.RegisterVisitorConn(conn, m); err != nil { + if err = svr.RegisterVisitorConn(conn, m, acceptedConn.wireProtocol); err != nil { xl.Warnf("register visitor conn error: %v", err) _ = acceptedConn.conn.WriteMsg(&msg.NewVisitorConnResp{ ProxyName: m.ProxyName, @@ -833,7 +833,7 @@ func (svr *Service) RegisterWorkConn(workConn *msg.Conn, newMsg *msg.NewWorkConn return ctl.RegisterWorkConn(proxy.NewWorkConn(workConn)) } -func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error { +func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn, wireProtocol string) error { visitorUser := "" // TODO(deprecation): Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID. // If runID is required, it is not compatible with versions prior to v0.50.0. @@ -845,5 +845,5 @@ func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVis visitorUser = ctl.sessionCtx.LoginMsg.User } return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey, - newMsg.UseEncryption, newMsg.UseCompression, visitorUser) + newMsg.UseEncryption, newMsg.UseCompression, visitorUser, wireProtocol) } diff --git a/server/visitor/visitor.go b/server/visitor/visitor.go index 57194d98..11a44367 100644 --- a/server/visitor/visitor.go +++ b/server/visitor/visitor.go @@ -65,6 +65,7 @@ func (vm *Manager) Listen(name string, sk string, allowUsers []string) (*netpkg. func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey string, useEncryption bool, useCompression bool, visitorUser string, + wireProtocol string, ) (err error) { vm.mu.RLock() defer vm.mu.RUnlock() @@ -90,7 +91,11 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey if useCompression { rwc = libio.WithCompression(rwc) } - err = l.l.PutConn(netpkg.WrapReadWriteCloserToConn(rwc, conn)) + visitorConn := netpkg.WrapReadWriteCloserToConn(rwc, conn) + err = l.l.PutConn(&wireProtocolConn{ + Conn: visitorConn, + wireProtocol: wireProtocol, + }) } else { err = fmt.Errorf("custom listener for [%s] doesn't exist", name) return @@ -98,6 +103,15 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey return } +type wireProtocolConn struct { + net.Conn + wireProtocol string +} + +func (c *wireProtocolConn) WireProtocol() string { + return c.wireProtocol +} + func (vm *Manager) CloseListener(name string) { vm.mu.Lock() defer vm.mu.Unlock() diff --git a/server/visitor/visitor_test.go b/server/visitor/visitor_test.go new file mode 100644 index 00000000..67b907fd --- /dev/null +++ b/server/visitor/visitor_test.go @@ -0,0 +1,61 @@ +// Copyright 2026 The frp Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package visitor + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/fatedier/frp/pkg/proto/wire" + "github.com/fatedier/frp/pkg/util/util" +) + +func TestManagerNewConnCarriesWireProtocol(t *testing.T) { + vm := NewManager() + listener, err := vm.Listen("sudp", "secret", []string{"*"}) + require.NoError(t, err) + defer listener.Close() + + client, server := net.Pipe() + defer client.Close() + defer server.Close() + + now := time.Now().Unix() + errCh := make(chan error, 1) + go func() { + errCh <- vm.NewConn( + "sudp", + server, + now, + util.GetAuthKey("secret", now), + false, + false, + "user", + wire.ProtocolV2, + ) + }() + + acceptedConn, err := listener.Accept() + require.NoError(t, err) + defer acceptedConn.Close() + + getter, ok := acceptedConn.(interface{ WireProtocol() string }) + require.True(t, ok) + require.Equal(t, wire.ProtocolV2, getter.WireProtocol()) + require.NoError(t, <-errCh) +} diff --git a/test/e2e/v1/basic/wire.go b/test/e2e/v1/basic/wire.go index bbae9199..051e6a6c 100644 --- a/test/e2e/v1/basic/wire.go +++ b/test/e2e/v1/basic/wire.go @@ -98,6 +98,73 @@ var _ = ginkgo.Describe("[Feature: WireProtocol]", func() { framework.NewRequestExpect(f).PortName(bindPortName).Ensure() }) + for _, tc := range []struct { + name string + proxyWireConfig string + visitorWireConfig string + extraProxyConfig string + extraVisitorConfig string + }{ + { + name: "default sudp visitor", + }, + { + name: "v2 sudp visitor", + proxyWireConfig: `transport.wireProtocol = "v2"`, + visitorWireConfig: `transport.wireProtocol = "v2"`, + }, + { + name: "mixed sudp proxy v1 visitor v2", + proxyWireConfig: `transport.wireProtocol = "v1"`, + visitorWireConfig: `transport.wireProtocol = "v2"`, + extraProxyConfig: ` + transport.useEncryption = true + transport.useCompression = true + `, + extraVisitorConfig: ` + transport.useEncryption = true + transport.useCompression = true + `, + }, + { + name: "mixed sudp proxy v2 visitor v1", + proxyWireConfig: `transport.wireProtocol = "v2"`, + visitorWireConfig: `transport.wireProtocol = "v1"`, + }, + } { + ginkgo.It(tc.name, func() { + serverConf := consts.DefaultServerConfig + bindPortName := port.GenName("WireSUDP") + clientServerConf := consts.DefaultClientConfig + fmt.Sprintf(` + user = "user1" + %s + + [[proxies]] + name = "sudp" + type = "sudp" + secretKey = "abc" + localPort = {{ .%s }} + %s + `, tc.proxyWireConfig, framework.UDPEchoServerPort, tc.extraProxyConfig) + clientVisitorConf := consts.DefaultClientConfig + fmt.Sprintf(` + user = "user1" + %s + + [[visitors]] + name = "sudp-visitor" + type = "sudp" + serverName = "sudp" + secretKey = "abc" + bindPort = {{ .%s }} + %s + `, tc.visitorWireConfig, bindPortName, tc.extraVisitorConfig) + + f.RunProcesses(serverConf, []string{clientServerConf, clientVisitorConf}) + + framework.NewRequestExpect(f).Protocol("udp").PortName(bindPortName).Ensure() + }) + } + ginkgo.It("reports client wire protocol", func() { webPort := f.AllocPort() serverConf := consts.DefaultServerConfig + fmt.Sprintf(`