mirror of
https://github.com/fatedier/frp.git
synced 2026-06-04 11:34:23 +08:00
feat: bridge mixed wire protocol SUDP payloads (#5347)
SUDP payload codec follows transport wireProtocol; same-protocol v1/v1 and v2/v2 keep raw join; only mixed proxy/visitor protocols use message-aware bridge; no new capability/selection field.
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
## Features
|
||||
|
||||
* When `transport.wireProtocol = "v2"` is enabled, ordinary UDP proxy work connection payloads now use wire protocol v2 message framing. This keeps UDP message payloads aligned with the negotiated frpc/frps wire protocol.
|
||||
* SUDP proxy payloads now also follow the connection wire protocol. SUDP v2 endpoints use wire protocol v2 message framing, while v1/default endpoints continue to use the legacy message codec. When the SUDP proxy frpc and visitor frpc use mixed v1/v2 wire protocols, frps bridges UDPPacket messages between the two codecs.
|
||||
|
||||
## Compatibility Notes
|
||||
|
||||
* The default/empty `transport.wireProtocol` and `transport.wireProtocol = "v1"` continue to use the legacy message codec for ordinary UDP proxy payloads.
|
||||
* Raw stream proxy paths such as TCP, HTTP, and STCP remain unframed and are not affected by the UDP payload framing change.
|
||||
* SUDP and XTCP keep their existing legacy behavior in this release and will be considered separately in a future phase.
|
||||
* `transport.wireProtocol = "v2"` requires both frpc and frps to use versions that support the same wire v2 semantics. Mixing a newer peer that sends v2-framed ordinary UDP payloads with an older v2-capable peer that still expects the legacy UDP payload codec can break ordinary UDP proxy traffic.
|
||||
* The default/empty `transport.wireProtocol` and `transport.wireProtocol = "v1"` continue to use the legacy message codec for ordinary UDP and SUDP proxy payloads.
|
||||
* Raw stream proxy paths such as TCP, HTTP, and STCP remain unframed and are not affected by the UDP/SUDP payload framing change.
|
||||
* Direct NAT hole UDP sid probing packets are not changed by this release.
|
||||
* `transport.wireProtocol = "v2"` requires peers to use versions that support the same wire v2 payload semantics. Mixing a newer peer that sends v2-framed UDP or SUDP payloads with an older v2-capable peer that still expects the legacy payload codec can break that proxy traffic. During rolling upgrades, upgrade both SUDP proxy and visitor frpc instances before enabling `transport.wireProtocol = "v2"` for SUDP, or keep those clients on `transport.wireProtocol = "v1"` until both sides are upgraded.
|
||||
|
||||
@@ -87,6 +87,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
}
|
||||
|
||||
workConn := netpkg.WrapReadWriteCloserToConn(remote, conn)
|
||||
payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, pxy.clientCfg.Transport.WireProtocol))
|
||||
readCh := make(chan *msg.UDPPacket, 1024)
|
||||
sendCh := make(chan msg.Message, 1024)
|
||||
isClose := false
|
||||
@@ -109,7 +110,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
}
|
||||
|
||||
// udp service <- frpc <- frps <- frpc visitor <- user
|
||||
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
|
||||
workConnReaderFn := func(payloadConn *msg.Conn, readCh chan *msg.UDPPacket) {
|
||||
defer closeFn()
|
||||
|
||||
for {
|
||||
@@ -122,7 +123,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
}
|
||||
|
||||
var udpMsg msg.UDPPacket
|
||||
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
|
||||
if errRet := payloadConn.ReadMsgInto(&udpMsg); errRet != nil {
|
||||
xl.Warnf("read from workConn for sudp error: %v", errRet)
|
||||
return
|
||||
}
|
||||
@@ -137,7 +138,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
}
|
||||
|
||||
// udp service -> frpc -> frps -> frpc visitor -> user
|
||||
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
|
||||
workConnSenderFn := func(payloadConn *msg.Conn, sendCh chan msg.Message) {
|
||||
defer func() {
|
||||
closeFn()
|
||||
xl.Infof("writer goroutine for sudp work connection closed")
|
||||
@@ -148,12 +149,12 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
switch m := rawMsg.(type) {
|
||||
case *msg.UDPPacket:
|
||||
xl.Tracef("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
|
||||
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
|
||||
m.LocalAddr.String(), m.RemoteAddr.String(), payloadConn.LocalAddr().String(), payloadConn.RemoteAddr().String())
|
||||
case *msg.Ping:
|
||||
xl.Tracef("frpc send ping message to frpc visitor")
|
||||
}
|
||||
|
||||
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
|
||||
if errRet = payloadConn.WriteMsg(rawMsg); errRet != nil {
|
||||
xl.Errorf("sudp work write error: %v", errRet)
|
||||
return
|
||||
}
|
||||
@@ -184,8 +185,8 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
|
||||
}
|
||||
}
|
||||
|
||||
go workConnSenderFn(workConn, sendCh)
|
||||
go workConnReaderFn(workConn, readCh)
|
||||
go workConnSenderFn(payloadConn, sendCh)
|
||||
go workConnReaderFn(payloadConn, readCh)
|
||||
go heartbeatFn(sendCh)
|
||||
|
||||
udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize), pxy.cfg.Transport.ProxyProtocolVersion)
|
||||
|
||||
@@ -113,15 +113,16 @@ func (sv *SUDPVisitor) dispatcher() {
|
||||
func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
xl := xlog.FromContextSafe(sv.ctx)
|
||||
xl.Debugf("starting sudp proxy worker")
|
||||
payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, sv.clientCfg.Transport.WireProtocol))
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
closeCh := make(chan struct{})
|
||||
|
||||
// udp service -> frpc -> frps -> frpc visitor -> user
|
||||
workConnReaderFn := func(conn net.Conn) {
|
||||
workConnReaderFn := func(payloadConn *msg.Conn) {
|
||||
defer func() {
|
||||
conn.Close()
|
||||
payloadConn.Close()
|
||||
close(closeCh)
|
||||
wg.Done()
|
||||
}()
|
||||
@@ -133,13 +134,13 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
)
|
||||
|
||||
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
|
||||
_ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
|
||||
_ = payloadConn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil {
|
||||
xl.Warnf("read from workconn for user udp conn error: %v", errRet)
|
||||
return
|
||||
}
|
||||
|
||||
_ = conn.SetReadDeadline(time.Time{})
|
||||
_ = payloadConn.SetReadDeadline(time.Time{})
|
||||
switch m := rawMsg.(type) {
|
||||
case *msg.Ping:
|
||||
xl.Debugf("frpc visitor get ping message from frpc")
|
||||
@@ -157,15 +158,15 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
}
|
||||
|
||||
// udp service <- frpc <- frps <- frpc visitor <- user
|
||||
workConnSenderFn := func(conn net.Conn) {
|
||||
workConnSenderFn := func(payloadConn *msg.Conn) {
|
||||
defer func() {
|
||||
conn.Close()
|
||||
payloadConn.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
var errRet error
|
||||
if firstPacket != nil {
|
||||
if errRet = msg.WriteMsg(conn, firstPacket); errRet != nil {
|
||||
if errRet = payloadConn.WriteMsg(firstPacket); errRet != nil {
|
||||
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
|
||||
return
|
||||
}
|
||||
@@ -180,7 +181,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
return
|
||||
}
|
||||
|
||||
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
|
||||
if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil {
|
||||
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
|
||||
return
|
||||
}
|
||||
@@ -191,8 +192,8 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
|
||||
}
|
||||
}
|
||||
|
||||
go workConnReaderFn(workConn)
|
||||
go workConnSenderFn(workConn)
|
||||
go workConnReaderFn(payloadConn)
|
||||
go workConnSenderFn(payloadConn)
|
||||
|
||||
wg.Wait()
|
||||
xl.Infof("sudp worker is closed")
|
||||
|
||||
@@ -16,6 +16,7 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
plugin "github.com/fatedier/frp/pkg/plugin/server"
|
||||
"github.com/fatedier/frp/pkg/proto/wire"
|
||||
"github.com/fatedier/frp/pkg/util/limit"
|
||||
netpkg "github.com/fatedier/frp/pkg/util/net"
|
||||
"github.com/fatedier/frp/pkg/util/xlog"
|
||||
@@ -316,13 +318,147 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
|
||||
name := pxy.GetName()
|
||||
proxyType := cfg.Type
|
||||
metrics.Server.OpenConnection(name, proxyType)
|
||||
inCount, outCount, _ := libio.Join(local, userConn)
|
||||
inCount, outCount, _ := pxy.joinUserConnection(local, userConn, proxyType, xl)
|
||||
metrics.Server.CloseConnection(name, proxyType)
|
||||
metrics.Server.AddTrafficIn(name, proxyType, inCount)
|
||||
metrics.Server.AddTrafficOut(name, proxyType, outCount)
|
||||
xl.Debugf("join connections closed")
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) joinUserConnection(local io.ReadWriteCloser, userConn net.Conn, proxyType string, xl *xlog.Logger) (int64, int64, []error) {
|
||||
visitorWireProtocol := wireProtocolFromConn(userConn)
|
||||
if proxyType == string(v1.ProxyTypeSUDP) && isMixedWireProtocol(pxy.wireProtocol, visitorWireProtocol) {
|
||||
xl.Infof("bridge mixed SUDP payload codecs, proxy wireProtocol [%s], visitor wireProtocol [%s]",
|
||||
normalizeWireProtocol(pxy.wireProtocol), normalizeWireProtocol(visitorWireProtocol))
|
||||
return joinSUDPMessageBridge(local, userConn, pxy.wireProtocol, visitorWireProtocol, xl)
|
||||
}
|
||||
return libio.Join(local, userConn)
|
||||
}
|
||||
|
||||
type wireProtocolGetter interface {
|
||||
WireProtocol() string
|
||||
}
|
||||
|
||||
func wireProtocolFromConn(conn net.Conn) string {
|
||||
if getter, ok := conn.(wireProtocolGetter); ok {
|
||||
return getter.WireProtocol()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func isMixedWireProtocol(left, right string) bool {
|
||||
return normalizeWireProtocol(left) != normalizeWireProtocol(right)
|
||||
}
|
||||
|
||||
func normalizeWireProtocol(wireProtocol string) string {
|
||||
if wireProtocol == wire.ProtocolV2 {
|
||||
return wire.ProtocolV2
|
||||
}
|
||||
return wire.ProtocolV1
|
||||
}
|
||||
|
||||
func joinSUDPMessageBridge(
|
||||
proxyConn io.ReadWriteCloser,
|
||||
visitorConn io.ReadWriteCloser,
|
||||
proxyWireProtocol string,
|
||||
visitorWireProtocol string,
|
||||
xl *xlog.Logger,
|
||||
) (inCount int64, outCount int64, errs []error) {
|
||||
// The mixed bridge decodes and re-encodes messages, so raw framed byte counts
|
||||
// are not available. Count UDP payload bytes and ignore heartbeat traffic.
|
||||
proxyRW := msg.NewReadWriter(proxyConn, proxyWireProtocol)
|
||||
visitorRW := msg.NewReadWriter(visitorConn, visitorWireProtocol)
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
wait sync.WaitGroup
|
||||
recordErrs = make([]error, 2)
|
||||
)
|
||||
closeBoth := func() {
|
||||
_ = proxyConn.Close()
|
||||
_ = visitorConn.Close()
|
||||
}
|
||||
|
||||
wait.Add(2)
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
defer once.Do(closeBoth)
|
||||
recordErrs[0] = bridgeSUDPProxyToVisitor(proxyRW, visitorRW, &outCount, xl)
|
||||
}()
|
||||
go func() {
|
||||
defer wait.Done()
|
||||
defer once.Do(closeBoth)
|
||||
recordErrs[1] = bridgeSUDPVisitorToProxy(visitorRW, proxyRW, &inCount, xl)
|
||||
}()
|
||||
wait.Wait()
|
||||
|
||||
for _, err := range recordErrs {
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func bridgeSUDPProxyToVisitor(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error {
|
||||
for {
|
||||
rawMsg, err := from.ReadMsg()
|
||||
if err != nil {
|
||||
return normalizeSUDPBridgeError(err)
|
||||
}
|
||||
|
||||
switch m := rawMsg.(type) {
|
||||
case *msg.UDPPacket:
|
||||
if err := to.WriteMsg(m); err != nil {
|
||||
return normalizeSUDPBridgeError(err)
|
||||
}
|
||||
*count += int64(len(m.Content))
|
||||
case *msg.Ping:
|
||||
traceSUDPBridge(xl, "bridge SUDP ping from proxy to visitor")
|
||||
if err := to.WriteMsg(m); err != nil {
|
||||
return normalizeSUDPBridgeError(err)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unexpected SUDP proxy message %T", rawMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func bridgeSUDPVisitorToProxy(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error {
|
||||
for {
|
||||
rawMsg, err := from.ReadMsg()
|
||||
if err != nil {
|
||||
return normalizeSUDPBridgeError(err)
|
||||
}
|
||||
|
||||
switch m := rawMsg.(type) {
|
||||
case *msg.UDPPacket:
|
||||
if err := to.WriteMsg(m); err != nil {
|
||||
return normalizeSUDPBridgeError(err)
|
||||
}
|
||||
*count += int64(len(m.Content))
|
||||
case *msg.Ping:
|
||||
traceSUDPBridge(xl, "drop SUDP ping from visitor to proxy")
|
||||
continue
|
||||
default:
|
||||
return fmt.Errorf("unexpected SUDP visitor message %T", rawMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeSUDPBridgeError(err error) error {
|
||||
if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func traceSUDPBridge(xl *xlog.Logger, format string, args ...any) {
|
||||
if xl != nil {
|
||||
xl.Tracef(format, args...)
|
||||
}
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
UserInfo plugin.UserInfo
|
||||
LoginMsg *msg.Login
|
||||
|
||||
141
server/proxy/sudp_test.go
Normal file
141
server/proxy/sudp_test.go
Normal file
@@ -0,0 +1,141 @@
|
||||
// Copyright 2026 The frp Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
"github.com/fatedier/frp/pkg/proto/wire"
|
||||
)
|
||||
|
||||
func TestSUDPBridgeTranscodesProxyV1ToVisitorV2(t *testing.T) {
|
||||
var in, out bytes.Buffer
|
||||
writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.UDPPacket{Content: []byte("proxy-to-visitor")})
|
||||
|
||||
var count int64
|
||||
err := bridgeSUDPProxyToVisitor(
|
||||
msg.NewReadWriter(&in, wire.ProtocolV1),
|
||||
msg.NewReadWriter(&out, wire.ProtocolV2),
|
||||
&count,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(len("proxy-to-visitor")), count)
|
||||
|
||||
frame, err := wire.NewConn(&out).ReadFrame()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wire.FrameTypeMessage, frame.Type)
|
||||
require.GreaterOrEqual(t, len(frame.Payload), 2)
|
||||
require.Equal(t, msg.V2TypeUDPPacket, binary.BigEndian.Uint16(frame.Payload[:2]))
|
||||
|
||||
var got msg.UDPPacket
|
||||
require.NoError(t, msg.DecodeV2MessageFrameInto(frame, &got))
|
||||
require.Equal(t, []byte("proxy-to-visitor"), got.Content)
|
||||
}
|
||||
|
||||
func TestSUDPBridgeTranscodesVisitorV2ToProxyV1(t *testing.T) {
|
||||
var in, out bytes.Buffer
|
||||
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.UDPPacket{Content: []byte("visitor-to-proxy")})
|
||||
|
||||
var count int64
|
||||
err := bridgeSUDPVisitorToProxy(
|
||||
msg.NewReadWriter(&in, wire.ProtocolV2),
|
||||
msg.NewReadWriter(&out, wire.ProtocolV1),
|
||||
&count,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(len("visitor-to-proxy")), count)
|
||||
|
||||
reader := bufio.NewReader(&out)
|
||||
typeByte, err := reader.ReadByte()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg.TypeUDPPacket, typeByte)
|
||||
require.NoError(t, reader.UnreadByte())
|
||||
|
||||
var got msg.UDPPacket
|
||||
require.NoError(t, msg.ReadMsgInto(reader, &got))
|
||||
require.Equal(t, []byte("visitor-to-proxy"), got.Content)
|
||||
}
|
||||
|
||||
func TestSUDPBridgeForwardsProxyPing(t *testing.T) {
|
||||
var in, out bytes.Buffer
|
||||
writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.Ping{})
|
||||
|
||||
var count int64
|
||||
err := bridgeSUDPProxyToVisitor(
|
||||
msg.NewReadWriter(&in, wire.ProtocolV1),
|
||||
msg.NewReadWriter(&out, wire.ProtocolV2),
|
||||
&count,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, count)
|
||||
|
||||
rawMsg, err := msg.NewReadWriter(&out, wire.ProtocolV2).ReadMsg()
|
||||
require.NoError(t, err)
|
||||
require.IsType(t, &msg.Ping{}, rawMsg)
|
||||
}
|
||||
|
||||
func TestSUDPBridgeDropsVisitorPing(t *testing.T) {
|
||||
var in, out bytes.Buffer
|
||||
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Ping{})
|
||||
|
||||
var count int64
|
||||
err := bridgeSUDPVisitorToProxy(
|
||||
msg.NewReadWriter(&in, wire.ProtocolV2),
|
||||
msg.NewReadWriter(&out, wire.ProtocolV1),
|
||||
&count,
|
||||
nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Zero(t, count)
|
||||
require.Empty(t, out.Bytes())
|
||||
}
|
||||
|
||||
func TestSUDPBridgeRejectsUnknownVisitorMessage(t *testing.T) {
|
||||
var in, out bytes.Buffer
|
||||
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Pong{})
|
||||
|
||||
var count int64
|
||||
err := bridgeSUDPVisitorToProxy(
|
||||
msg.NewReadWriter(&in, wire.ProtocolV2),
|
||||
msg.NewReadWriter(&out, wire.ProtocolV1),
|
||||
&count,
|
||||
nil,
|
||||
)
|
||||
require.ErrorContains(t, err, "unexpected SUDP visitor message *msg.Pong")
|
||||
require.Zero(t, count)
|
||||
require.Empty(t, out.Bytes())
|
||||
}
|
||||
|
||||
func TestSUDPBridgeDetectsMixedWireProtocol(t *testing.T) {
|
||||
require.False(t, isMixedWireProtocol("", wire.ProtocolV1))
|
||||
require.False(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV2))
|
||||
require.True(t, isMixedWireProtocol("", wire.ProtocolV2))
|
||||
require.True(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV1))
|
||||
}
|
||||
|
||||
func writeSUDPBridgeMsg(t *testing.T, buf *bytes.Buffer, wireProtocol string, m msg.Message) {
|
||||
t.Helper()
|
||||
|
||||
require.NoError(t, msg.NewReadWriter(buf, wireProtocol).WriteMsg(m))
|
||||
}
|
||||
@@ -508,7 +508,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, interna
|
||||
conn.Close()
|
||||
}
|
||||
case *msg.NewVisitorConn:
|
||||
if err = svr.RegisterVisitorConn(conn, m); err != nil {
|
||||
if err = svr.RegisterVisitorConn(conn, m, acceptedConn.wireProtocol); err != nil {
|
||||
xl.Warnf("register visitor conn error: %v", err)
|
||||
_ = acceptedConn.conn.WriteMsg(&msg.NewVisitorConnResp{
|
||||
ProxyName: m.ProxyName,
|
||||
@@ -833,7 +833,7 @@ func (svr *Service) RegisterWorkConn(workConn *msg.Conn, newMsg *msg.NewWorkConn
|
||||
return ctl.RegisterWorkConn(proxy.NewWorkConn(workConn))
|
||||
}
|
||||
|
||||
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
|
||||
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn, wireProtocol string) error {
|
||||
visitorUser := ""
|
||||
// TODO(deprecation): Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID.
|
||||
// If runID is required, it is not compatible with versions prior to v0.50.0.
|
||||
@@ -845,5 +845,5 @@ func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVis
|
||||
visitorUser = ctl.sessionCtx.LoginMsg.User
|
||||
}
|
||||
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
|
||||
newMsg.UseEncryption, newMsg.UseCompression, visitorUser)
|
||||
newMsg.UseEncryption, newMsg.UseCompression, visitorUser, wireProtocol)
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ func (vm *Manager) Listen(name string, sk string, allowUsers []string) (*netpkg.
|
||||
|
||||
func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey string,
|
||||
useEncryption bool, useCompression bool, visitorUser string,
|
||||
wireProtocol string,
|
||||
) (err error) {
|
||||
vm.mu.RLock()
|
||||
defer vm.mu.RUnlock()
|
||||
@@ -90,7 +91,11 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey
|
||||
if useCompression {
|
||||
rwc = libio.WithCompression(rwc)
|
||||
}
|
||||
err = l.l.PutConn(netpkg.WrapReadWriteCloserToConn(rwc, conn))
|
||||
visitorConn := netpkg.WrapReadWriteCloserToConn(rwc, conn)
|
||||
err = l.l.PutConn(&wireProtocolConn{
|
||||
Conn: visitorConn,
|
||||
wireProtocol: wireProtocol,
|
||||
})
|
||||
} else {
|
||||
err = fmt.Errorf("custom listener for [%s] doesn't exist", name)
|
||||
return
|
||||
@@ -98,6 +103,15 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey
|
||||
return
|
||||
}
|
||||
|
||||
type wireProtocolConn struct {
|
||||
net.Conn
|
||||
wireProtocol string
|
||||
}
|
||||
|
||||
func (c *wireProtocolConn) WireProtocol() string {
|
||||
return c.wireProtocol
|
||||
}
|
||||
|
||||
func (vm *Manager) CloseListener(name string) {
|
||||
vm.mu.Lock()
|
||||
defer vm.mu.Unlock()
|
||||
|
||||
61
server/visitor/visitor_test.go
Normal file
61
server/visitor/visitor_test.go
Normal file
@@ -0,0 +1,61 @@
|
||||
// Copyright 2026 The frp Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package visitor
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/fatedier/frp/pkg/proto/wire"
|
||||
"github.com/fatedier/frp/pkg/util/util"
|
||||
)
|
||||
|
||||
func TestManagerNewConnCarriesWireProtocol(t *testing.T) {
|
||||
vm := NewManager()
|
||||
listener, err := vm.Listen("sudp", "secret", []string{"*"})
|
||||
require.NoError(t, err)
|
||||
defer listener.Close()
|
||||
|
||||
client, server := net.Pipe()
|
||||
defer client.Close()
|
||||
defer server.Close()
|
||||
|
||||
now := time.Now().Unix()
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- vm.NewConn(
|
||||
"sudp",
|
||||
server,
|
||||
now,
|
||||
util.GetAuthKey("secret", now),
|
||||
false,
|
||||
false,
|
||||
"user",
|
||||
wire.ProtocolV2,
|
||||
)
|
||||
}()
|
||||
|
||||
acceptedConn, err := listener.Accept()
|
||||
require.NoError(t, err)
|
||||
defer acceptedConn.Close()
|
||||
|
||||
getter, ok := acceptedConn.(interface{ WireProtocol() string })
|
||||
require.True(t, ok)
|
||||
require.Equal(t, wire.ProtocolV2, getter.WireProtocol())
|
||||
require.NoError(t, <-errCh)
|
||||
}
|
||||
@@ -98,6 +98,73 @@ var _ = ginkgo.Describe("[Feature: WireProtocol]", func() {
|
||||
framework.NewRequestExpect(f).PortName(bindPortName).Ensure()
|
||||
})
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
proxyWireConfig string
|
||||
visitorWireConfig string
|
||||
extraProxyConfig string
|
||||
extraVisitorConfig string
|
||||
}{
|
||||
{
|
||||
name: "default sudp visitor",
|
||||
},
|
||||
{
|
||||
name: "v2 sudp visitor",
|
||||
proxyWireConfig: `transport.wireProtocol = "v2"`,
|
||||
visitorWireConfig: `transport.wireProtocol = "v2"`,
|
||||
},
|
||||
{
|
||||
name: "mixed sudp proxy v1 visitor v2",
|
||||
proxyWireConfig: `transport.wireProtocol = "v1"`,
|
||||
visitorWireConfig: `transport.wireProtocol = "v2"`,
|
||||
extraProxyConfig: `
|
||||
transport.useEncryption = true
|
||||
transport.useCompression = true
|
||||
`,
|
||||
extraVisitorConfig: `
|
||||
transport.useEncryption = true
|
||||
transport.useCompression = true
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "mixed sudp proxy v2 visitor v1",
|
||||
proxyWireConfig: `transport.wireProtocol = "v2"`,
|
||||
visitorWireConfig: `transport.wireProtocol = "v1"`,
|
||||
},
|
||||
} {
|
||||
ginkgo.It(tc.name, func() {
|
||||
serverConf := consts.DefaultServerConfig
|
||||
bindPortName := port.GenName("WireSUDP")
|
||||
clientServerConf := consts.DefaultClientConfig + fmt.Sprintf(`
|
||||
user = "user1"
|
||||
%s
|
||||
|
||||
[[proxies]]
|
||||
name = "sudp"
|
||||
type = "sudp"
|
||||
secretKey = "abc"
|
||||
localPort = {{ .%s }}
|
||||
%s
|
||||
`, tc.proxyWireConfig, framework.UDPEchoServerPort, tc.extraProxyConfig)
|
||||
clientVisitorConf := consts.DefaultClientConfig + fmt.Sprintf(`
|
||||
user = "user1"
|
||||
%s
|
||||
|
||||
[[visitors]]
|
||||
name = "sudp-visitor"
|
||||
type = "sudp"
|
||||
serverName = "sudp"
|
||||
secretKey = "abc"
|
||||
bindPort = {{ .%s }}
|
||||
%s
|
||||
`, tc.visitorWireConfig, bindPortName, tc.extraVisitorConfig)
|
||||
|
||||
f.RunProcesses(serverConf, []string{clientServerConf, clientVisitorConf})
|
||||
|
||||
framework.NewRequestExpect(f).Protocol("udp").PortName(bindPortName).Ensure()
|
||||
})
|
||||
}
|
||||
|
||||
ginkgo.It("reports client wire protocol", func() {
|
||||
webPort := f.AllocPort()
|
||||
serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
|
||||
|
||||
Reference in New Issue
Block a user