Merge pull request #5350 from fatedier/dev

Release v0.69.1
This commit is contained in:
fatedier
2026-06-01 18:02:22 +08:00
committed by GitHub
20 changed files with 784 additions and 67 deletions

View File

@@ -1,21 +1,9 @@
## Compatibility Policy
Starting with v0.69.0, each minor release is supported until there are nine newer minor releases. For example, v0.69.0 will be supported until v0.78.0 is released. Within this window, frpc v0.69.0 is guaranteed to work with any frps from v0.61.0 to v0.77.0, and vice versa. Patch releases within the same minor are always compatible. Versions outside the support window may continue to work on a best-effort basis, but compatibility is no longer guaranteed.
For mixed-version deployments, upgrade frps first, then upgrade frpc. This keeps the server side ready for newer client-side protocol behavior before clients start using it.
## Notes
This release introduces wire protocol v2 as a transition path for future frpc/frps protocol changes. The existing wire protocol is difficult to extend without compatibility risk, and upcoming changes, including replacing deprecated stream encryption methods, require a versioned protocol.
**The default value of `transport.wireProtocol` remains `v1` in this release.** Users can keep the default for now. To test v2 early, upgrade both frpc and frps to versions that support it, then set `transport.wireProtocol = "v2"` in frpc. A v2-enabled frpc cannot connect to an older frps.
When `transport.wireProtocol = "v2"` is enabled, the control channel uses negotiated AEAD encryption after the login handshake. Both frpc and frps must be upgraded to this release to use v2.
v1 will be deprecated when v2 becomes the default in a future release. It will continue to be supported until v0.78.0 is released, and may be removed in v0.78.0 or later.
## Features
* Added `transport.wireProtocol` for frpc to select the internal message protocol used between frpc and frps. Supported values are `v1` and `v2`.
* Added client protocol visibility in the frps dashboard and `/api/clients` API. Online clients now report their negotiated protocol as `v1` or `v2`.
* Wire protocol v2 now negotiates AEAD control-channel encryption. Supported algorithms are `xchacha20-poly1305` and `aes-256-gcm`; frpc advertises its preferred order based on local AES-GCM hardware support, and frps selects the first supported algorithm from that list.
* `transport.wireProtocol = "v2"` now also applies to UDP-based proxy payloads, including ordinary UDP and SUDP, so their payload framing is consistent with the selected wire protocol.
* Improved SUDP compatibility during mixed `transport.wireProtocol` deployments, allowing frps to bridge payloads between v1/default and v2 SUDP clients.
* XTCP work connection `NatHoleSid` messages now follow the selected `transport.wireProtocol`.
## Compatibility Notes
* When enabling `transport.wireProtocol = "v2"` for SUDP, upgrade both the proxy and visitor frpc instances first, or keep them on `v1` until both sides are upgraded.

View File

@@ -87,6 +87,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
workConn := netpkg.WrapReadWriteCloserToConn(remote, conn)
payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, pxy.clientCfg.Transport.WireProtocol))
readCh := make(chan *msg.UDPPacket, 1024)
sendCh := make(chan msg.Message, 1024)
isClose := false
@@ -109,7 +110,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
// udp service <- frpc <- frps <- frpc visitor <- user
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
workConnReaderFn := func(payloadConn *msg.Conn, readCh chan *msg.UDPPacket) {
defer closeFn()
for {
@@ -122,7 +123,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
var udpMsg msg.UDPPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
if errRet := payloadConn.ReadMsgInto(&udpMsg); errRet != nil {
xl.Warnf("read from workConn for sudp error: %v", errRet)
return
}
@@ -137,7 +138,7 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
// udp service -> frpc -> frps -> frpc visitor -> user
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
workConnSenderFn := func(payloadConn *msg.Conn, sendCh chan msg.Message) {
defer func() {
closeFn()
xl.Infof("writer goroutine for sudp work connection closed")
@@ -148,12 +149,12 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
switch m := rawMsg.(type) {
case *msg.UDPPacket:
xl.Tracef("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
m.LocalAddr.String(), m.RemoteAddr.String(), payloadConn.LocalAddr().String(), payloadConn.RemoteAddr().String())
case *msg.Ping:
xl.Tracef("frpc send ping message to frpc visitor")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
if errRet = payloadConn.WriteMsg(rawMsg); errRet != nil {
xl.Errorf("sudp work write error: %v", errRet)
return
}
@@ -184,8 +185,8 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
}
go workConnSenderFn(workConn, sendCh)
go workConnReaderFn(workConn, readCh)
go workConnSenderFn(payloadConn, sendCh)
go workConnReaderFn(payloadConn, readCh)
go heartbeatFn(sendCh)
udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize), pxy.cfg.Transport.ProxyProtocolVersion)

View File

@@ -99,15 +99,17 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
pxy.mu.Lock()
pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn)
// Plain UDP payload follows the configured wire protocol for message framing.
payloadRW := msg.NewReadWriter(pxy.workConn, pxy.clientCfg.Transport.WireProtocol)
pxy.readCh = make(chan *msg.UDPPacket, 1024)
pxy.sendCh = make(chan msg.Message, 1024)
pxy.closed = false
pxy.mu.Unlock()
workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
workConnReaderFn := func(rw msg.ReadWriter, readCh chan *msg.UDPPacket) {
for {
var udpMsg msg.UDPPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
if errRet := rw.ReadMsgInto(&udpMsg); errRet != nil {
xl.Warnf("read from workConn for udp error: %v", errRet)
return
}
@@ -120,7 +122,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
}
}
workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
workConnSenderFn := func(rw msg.ReadWriter, sendCh chan msg.Message) {
defer func() {
xl.Infof("writer goroutine for udp work connection closed")
}()
@@ -132,7 +134,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
case *msg.Ping:
xl.Tracef("send ping message to udp workConn")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
if errRet = rw.WriteMsg(rawMsg); errRet != nil {
xl.Errorf("udp work write error: %v", errRet)
return
}
@@ -151,8 +153,8 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) {
}
}
go workConnSenderFn(pxy.workConn, pxy.sendCh)
go workConnReaderFn(pxy.workConn, pxy.readCh)
go workConnSenderFn(payloadRW, pxy.sendCh)
go workConnReaderFn(payloadRW, pxy.readCh)
go heartbeatFn(pxy.sendCh)
// Call Forwarder with proxy protocol version (empty string means no proxy protocol)

View File

@@ -57,8 +57,7 @@ func NewXTCPProxy(baseProxy *BaseProxy, cfg v1.ProxyConfigurer) Proxy {
func (pxy *XTCPProxy) InWorkConn(conn net.Conn, startWorkConnMsg *msg.StartWorkConn) {
xl := pxy.xl
defer conn.Close()
var natHoleSidMsg msg.NatHoleSid
err := msg.ReadMsgInto(conn, &natHoleSidMsg)
natHoleSidMsg, err := readNatHoleSid(conn, pxy.clientCfg.Transport.WireProtocol)
if err != nil {
xl.Errorf("xtcp read from workConn error: %v", err)
return
@@ -131,6 +130,15 @@ func (pxy *XTCPProxy) InWorkConn(conn net.Conn, startWorkConnMsg *msg.StartWorkC
pxy.listenByQUIC(listenConn, raddr, startWorkConnMsg)
}
func readNatHoleSid(conn net.Conn, wireProtocol string) (*msg.NatHoleSid, error) {
workMsgConn := msg.NewConn(conn, msg.NewReadWriter(conn, wireProtocol))
var natHoleSidMsg msg.NatHoleSid
if err := workMsgConn.ReadMsgInto(&natHoleSidMsg); err != nil {
return nil, err
}
return &natHoleSidMsg, nil
}
func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, startWorkConnMsg *msg.StartWorkConn) {
xl := pxy.xl
listenConn.Close()

66
client/proxy/xtcp_test.go Normal file
View File

@@ -0,0 +1,66 @@
// Copyright 2026 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !frps
package proxy
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/wire"
)
func TestReadNatHoleSidUsesSelectedWireProtocol(t *testing.T) {
for _, tc := range []struct {
name string
wireProtocol string
}{
{name: "v2", wireProtocol: wire.ProtocolV2},
{name: "v1", wireProtocol: wire.ProtocolV1},
{name: "default", wireProtocol: ""},
} {
t.Run(tc.name, func(t *testing.T) {
client, server := net.Pipe()
defer client.Close()
defer server.Close()
setPipeDeadline(t, client, server)
errCh := make(chan error, 1)
go func() {
writer := msg.NewConn(server, msg.NewReadWriter(server, tc.wireProtocol))
errCh <- writer.WriteMsg(&msg.NatHoleSid{Sid: "sid"})
}()
out, err := readNatHoleSid(client, tc.wireProtocol)
require.NoError(t, err)
require.Equal(t, "sid", out.Sid)
require.NoError(t, <-errCh)
})
}
}
func setPipeDeadline(t *testing.T, conns ...net.Conn) {
t.Helper()
deadline := time.Now().Add(time.Second)
for _, conn := range conns {
require.NoError(t, conn.SetDeadline(deadline))
}
}

View File

@@ -113,15 +113,16 @@ func (sv *SUDPVisitor) dispatcher() {
func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
xl := xlog.FromContextSafe(sv.ctx)
xl.Debugf("starting sudp proxy worker")
payloadConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, sv.clientCfg.Transport.WireProtocol))
wg := &sync.WaitGroup{}
wg.Add(2)
closeCh := make(chan struct{})
// udp service -> frpc -> frps -> frpc visitor -> user
workConnReaderFn := func(conn net.Conn) {
workConnReaderFn := func(payloadConn *msg.Conn) {
defer func() {
conn.Close()
payloadConn.Close()
close(closeCh)
wg.Done()
}()
@@ -133,13 +134,13 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
)
// frpc will send heartbeat in workConn to frpc visitor for keeping alive
_ = conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
_ = payloadConn.SetReadDeadline(time.Now().Add(60 * time.Second))
if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil {
xl.Warnf("read from workconn for user udp conn error: %v", errRet)
return
}
_ = conn.SetReadDeadline(time.Time{})
_ = payloadConn.SetReadDeadline(time.Time{})
switch m := rawMsg.(type) {
case *msg.Ping:
xl.Debugf("frpc visitor get ping message from frpc")
@@ -157,15 +158,15 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
}
// udp service <- frpc <- frps <- frpc visitor <- user
workConnSenderFn := func(conn net.Conn) {
workConnSenderFn := func(payloadConn *msg.Conn) {
defer func() {
conn.Close()
payloadConn.Close()
wg.Done()
}()
var errRet error
if firstPacket != nil {
if errRet = msg.WriteMsg(conn, firstPacket); errRet != nil {
if errRet = payloadConn.WriteMsg(firstPacket); errRet != nil {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
@@ -180,7 +181,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil {
xl.Warnf("sender goroutine for udp work connection closed: %v", errRet)
return
}
@@ -191,8 +192,8 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {
}
}
go workConnReaderFn(workConn)
go workConnSenderFn(workConn)
go workConnReaderFn(payloadConn)
go workConnSenderFn(payloadConn)
wg.Wait()
xl.Infof("sudp worker is closed")

View File

@@ -43,9 +43,28 @@ func TestV2ReadWriterRoundTrip(t *testing.T) {
func TestNewReadWriter(t *testing.T) {
require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, ""))
require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, wire.ProtocolV1))
require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, "unknown"))
require.IsType(t, &V2ReadWriter{}, NewReadWriter(&bytes.Buffer{}, wire.ProtocolV2))
}
func TestNewReadWriterEncoding(t *testing.T) {
for _, wireProtocol := range []string{"", wire.ProtocolV1} {
var legacy bytes.Buffer
legacyRW := NewReadWriter(&legacy, wireProtocol)
require.NoError(t, legacyRW.WriteMsg(&UDPPacket{Content: []byte("legacy")}))
require.NotEmpty(t, legacy.Bytes())
require.Equal(t, TypeUDPPacket, legacy.Bytes()[0])
}
var v2 bytes.Buffer
v2RW := NewReadWriter(&v2, wire.ProtocolV2)
require.NoError(t, v2RW.WriteMsg(&UDPPacket{Content: []byte("v2")}))
frame, err := wire.NewConn(&v2).ReadFrame()
require.NoError(t, err)
require.Equal(t, wire.FrameTypeMessage, frame.Type)
require.Equal(t, V2TypeUDPPacket, binary.BigEndian.Uint16(frame.Payload[:2]))
}
func TestV2MessageTypeIDsAreStable(t *testing.T) {
require.Equal(t, uint16(1), V2TypeLogin)
require.Equal(t, uint16(2), V2TypeLoginResp)

View File

@@ -14,7 +14,7 @@
package version
var version = "0.69.0"
var version = "0.69.1"
func Full() string {
return version

View File

@@ -112,6 +112,8 @@ type SessionContext struct {
ServerCfg *v1.ServerConfig
// client registry
ClientRegistry *registry.ClientRegistry
// negotiated wire protocol for this client session
WireProtocol string
}
type Control struct {
@@ -452,6 +454,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
Configurer: pxyConf,
ServerCfg: ctl.sessionCtx.ServerCfg,
EncryptionKey: ctl.sessionCtx.EncryptionKey,
WireProtocol: ctl.sessionCtx.WireProtocol,
})
if err != nil {
return remoteAddr, err

View File

@@ -16,6 +16,7 @@ package proxy
import (
"context"
"errors"
"fmt"
"io"
"net"
@@ -31,6 +32,7 @@ import (
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/proto/wire"
"github.com/fatedier/frp/pkg/util/limit"
netpkg "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
@@ -92,6 +94,7 @@ type BaseProxy struct {
userInfo plugin.UserInfo
loginMsg *msg.Login
configurer v1.ProxyConfigurer
wireProtocol string
mu sync.RWMutex
xl *xlog.Logger
@@ -315,13 +318,147 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
name := pxy.GetName()
proxyType := cfg.Type
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount, _ := libio.Join(local, userConn)
inCount, outCount, _ := pxy.joinUserConnection(local, userConn, proxyType, xl)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debugf("join connections closed")
}
func (pxy *BaseProxy) joinUserConnection(local io.ReadWriteCloser, userConn net.Conn, proxyType string, xl *xlog.Logger) (int64, int64, []error) {
visitorWireProtocol := wireProtocolFromConn(userConn)
if proxyType == string(v1.ProxyTypeSUDP) && isMixedWireProtocol(pxy.wireProtocol, visitorWireProtocol) {
xl.Infof("bridge mixed SUDP payload codecs, proxy wireProtocol [%s], visitor wireProtocol [%s]",
normalizeWireProtocol(pxy.wireProtocol), normalizeWireProtocol(visitorWireProtocol))
return joinSUDPMessageBridge(local, userConn, pxy.wireProtocol, visitorWireProtocol, xl)
}
return libio.Join(local, userConn)
}
type wireProtocolGetter interface {
WireProtocol() string
}
func wireProtocolFromConn(conn net.Conn) string {
if getter, ok := conn.(wireProtocolGetter); ok {
return getter.WireProtocol()
}
return ""
}
func isMixedWireProtocol(left, right string) bool {
return normalizeWireProtocol(left) != normalizeWireProtocol(right)
}
func normalizeWireProtocol(wireProtocol string) string {
if wireProtocol == wire.ProtocolV2 {
return wire.ProtocolV2
}
return wire.ProtocolV1
}
func joinSUDPMessageBridge(
proxyConn io.ReadWriteCloser,
visitorConn io.ReadWriteCloser,
proxyWireProtocol string,
visitorWireProtocol string,
xl *xlog.Logger,
) (inCount int64, outCount int64, errs []error) {
// The mixed bridge decodes and re-encodes messages, so raw framed byte counts
// are not available. Count UDP payload bytes and ignore heartbeat traffic.
proxyRW := msg.NewReadWriter(proxyConn, proxyWireProtocol)
visitorRW := msg.NewReadWriter(visitorConn, visitorWireProtocol)
var (
once sync.Once
wait sync.WaitGroup
recordErrs = make([]error, 2)
)
closeBoth := func() {
_ = proxyConn.Close()
_ = visitorConn.Close()
}
wait.Add(2)
go func() {
defer wait.Done()
defer once.Do(closeBoth)
recordErrs[0] = bridgeSUDPProxyToVisitor(proxyRW, visitorRW, &outCount, xl)
}()
go func() {
defer wait.Done()
defer once.Do(closeBoth)
recordErrs[1] = bridgeSUDPVisitorToProxy(visitorRW, proxyRW, &inCount, xl)
}()
wait.Wait()
for _, err := range recordErrs {
if err != nil {
errs = append(errs, err)
}
}
return
}
func bridgeSUDPProxyToVisitor(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error {
for {
rawMsg, err := from.ReadMsg()
if err != nil {
return normalizeSUDPBridgeError(err)
}
switch m := rawMsg.(type) {
case *msg.UDPPacket:
if err := to.WriteMsg(m); err != nil {
return normalizeSUDPBridgeError(err)
}
*count += int64(len(m.Content))
case *msg.Ping:
traceSUDPBridge(xl, "bridge SUDP ping from proxy to visitor")
if err := to.WriteMsg(m); err != nil {
return normalizeSUDPBridgeError(err)
}
default:
return fmt.Errorf("unexpected SUDP proxy message %T", rawMsg)
}
}
}
func bridgeSUDPVisitorToProxy(from msg.ReadWriter, to msg.ReadWriter, count *int64, xl *xlog.Logger) error {
for {
rawMsg, err := from.ReadMsg()
if err != nil {
return normalizeSUDPBridgeError(err)
}
switch m := rawMsg.(type) {
case *msg.UDPPacket:
if err := to.WriteMsg(m); err != nil {
return normalizeSUDPBridgeError(err)
}
*count += int64(len(m.Content))
case *msg.Ping:
traceSUDPBridge(xl, "drop SUDP ping from visitor to proxy")
continue
default:
return fmt.Errorf("unexpected SUDP visitor message %T", rawMsg)
}
}
}
func normalizeSUDPBridgeError(err error) error {
if err == nil || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
func traceSUDPBridge(xl *xlog.Logger, format string, args ...any) {
if xl != nil {
xl.Tracef(format, args...)
}
}
type Options struct {
UserInfo plugin.UserInfo
LoginMsg *msg.Login
@@ -331,6 +468,7 @@ type Options struct {
Configurer v1.ProxyConfigurer
ServerCfg *v1.ServerConfig
EncryptionKey []byte
WireProtocol string
}
func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) {
@@ -357,6 +495,7 @@ func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) {
userInfo: options.UserInfo,
loginMsg: options.LoginMsg,
configurer: configurer,
wireProtocol: options.WireProtocol,
}
factory := proxyFactoryRegistry[reflect.TypeOf(configurer)]

View File

@@ -15,12 +15,15 @@
package proxy
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
v1 "github.com/fatedier/frp/pkg/config/v1"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/wire"
)
func TestWorkConnStartWritesStartWorkConn(t *testing.T) {
@@ -51,3 +54,56 @@ func TestWorkConnStartWritesStartWorkConn(t *testing.T) {
require.NoError(t, result.err)
require.Same(t, serverMsgConn, result.conn)
}
func TestGetWorkConnFromPoolStartWorkConnUnchangedForUDPWireV2(t *testing.T) {
startMsg := getStartWorkConnFromPool(t, &v1.UDPProxyConfig{
ProxyBaseConfig: v1.ProxyBaseConfig{Name: "udp", Type: string(v1.ProxyTypeUDP)},
}, wire.ProtocolV2)
require.Equal(t, msg.StartWorkConn{ProxyName: "udp"}, startMsg)
}
func TestGetWorkConnFromPoolLeavesRawTCPPayloadUnframed(t *testing.T) {
startMsg := getStartWorkConnFromPool(t, &v1.TCPProxyConfig{
ProxyBaseConfig: v1.ProxyBaseConfig{Name: "tcp", Type: string(v1.ProxyTypeTCP)},
}, wire.ProtocolV2)
require.Equal(t, msg.StartWorkConn{ProxyName: "tcp"}, startMsg)
}
func getStartWorkConnFromPool(t *testing.T, cfg v1.ProxyConfigurer, wireProtocol string) msg.StartWorkConn {
t.Helper()
client, server := net.Pipe()
t.Cleanup(func() {
client.Close()
server.Close()
})
serverMsgConn := msg.NewConn(server, msg.NewV2ReadWriter(server))
clientMsgConn := msg.NewConn(client, msg.NewV2ReadWriter(client))
pxy := &BaseProxy{
name: cfg.GetBaseConfig().Name,
configurer: cfg,
poolCount: 0,
ctx: context.Background(),
wireProtocol: wireProtocol,
getWorkConnFn: func() (*WorkConn, error) {
return NewWorkConn(serverMsgConn), nil
},
}
errCh := make(chan error, 1)
go func() {
conn, err := pxy.GetWorkConnFromPool(nil, nil)
if conn != nil {
conn.Close()
}
errCh <- err
}()
var startMsg msg.StartWorkConn
require.NoError(t, clientMsgConn.ReadMsgInto(&startMsg))
require.NoError(t, <-errCh)
return startMsg
}

141
server/proxy/sudp_test.go Normal file
View File

@@ -0,0 +1,141 @@
// Copyright 2026 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"bufio"
"bytes"
"encoding/binary"
"testing"
"github.com/stretchr/testify/require"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/wire"
)
func TestSUDPBridgeTranscodesProxyV1ToVisitorV2(t *testing.T) {
var in, out bytes.Buffer
writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.UDPPacket{Content: []byte("proxy-to-visitor")})
var count int64
err := bridgeSUDPProxyToVisitor(
msg.NewReadWriter(&in, wire.ProtocolV1),
msg.NewReadWriter(&out, wire.ProtocolV2),
&count,
nil,
)
require.NoError(t, err)
require.Equal(t, int64(len("proxy-to-visitor")), count)
frame, err := wire.NewConn(&out).ReadFrame()
require.NoError(t, err)
require.Equal(t, wire.FrameTypeMessage, frame.Type)
require.GreaterOrEqual(t, len(frame.Payload), 2)
require.Equal(t, msg.V2TypeUDPPacket, binary.BigEndian.Uint16(frame.Payload[:2]))
var got msg.UDPPacket
require.NoError(t, msg.DecodeV2MessageFrameInto(frame, &got))
require.Equal(t, []byte("proxy-to-visitor"), got.Content)
}
func TestSUDPBridgeTranscodesVisitorV2ToProxyV1(t *testing.T) {
var in, out bytes.Buffer
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.UDPPacket{Content: []byte("visitor-to-proxy")})
var count int64
err := bridgeSUDPVisitorToProxy(
msg.NewReadWriter(&in, wire.ProtocolV2),
msg.NewReadWriter(&out, wire.ProtocolV1),
&count,
nil,
)
require.NoError(t, err)
require.Equal(t, int64(len("visitor-to-proxy")), count)
reader := bufio.NewReader(&out)
typeByte, err := reader.ReadByte()
require.NoError(t, err)
require.Equal(t, msg.TypeUDPPacket, typeByte)
require.NoError(t, reader.UnreadByte())
var got msg.UDPPacket
require.NoError(t, msg.ReadMsgInto(reader, &got))
require.Equal(t, []byte("visitor-to-proxy"), got.Content)
}
func TestSUDPBridgeForwardsProxyPing(t *testing.T) {
var in, out bytes.Buffer
writeSUDPBridgeMsg(t, &in, wire.ProtocolV1, &msg.Ping{})
var count int64
err := bridgeSUDPProxyToVisitor(
msg.NewReadWriter(&in, wire.ProtocolV1),
msg.NewReadWriter(&out, wire.ProtocolV2),
&count,
nil,
)
require.NoError(t, err)
require.Zero(t, count)
rawMsg, err := msg.NewReadWriter(&out, wire.ProtocolV2).ReadMsg()
require.NoError(t, err)
require.IsType(t, &msg.Ping{}, rawMsg)
}
func TestSUDPBridgeDropsVisitorPing(t *testing.T) {
var in, out bytes.Buffer
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Ping{})
var count int64
err := bridgeSUDPVisitorToProxy(
msg.NewReadWriter(&in, wire.ProtocolV2),
msg.NewReadWriter(&out, wire.ProtocolV1),
&count,
nil,
)
require.NoError(t, err)
require.Zero(t, count)
require.Empty(t, out.Bytes())
}
func TestSUDPBridgeRejectsUnknownVisitorMessage(t *testing.T) {
var in, out bytes.Buffer
writeSUDPBridgeMsg(t, &in, wire.ProtocolV2, &msg.Pong{})
var count int64
err := bridgeSUDPVisitorToProxy(
msg.NewReadWriter(&in, wire.ProtocolV2),
msg.NewReadWriter(&out, wire.ProtocolV1),
&count,
nil,
)
require.ErrorContains(t, err, "unexpected SUDP visitor message *msg.Pong")
require.Zero(t, count)
require.Empty(t, out.Bytes())
}
func TestSUDPBridgeDetectsMixedWireProtocol(t *testing.T) {
require.False(t, isMixedWireProtocol("", wire.ProtocolV1))
require.False(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV2))
require.True(t, isMixedWireProtocol("", wire.ProtocolV2))
require.True(t, isMixedWireProtocol(wire.ProtocolV2, wire.ProtocolV1))
}
func writeSUDPBridgeMsg(t *testing.T, buf *bytes.Buffer, wireProtocol string, m msg.Message) {
t.Helper()
require.NoError(t, msg.NewReadWriter(buf, wireProtocol).WriteMsg(m))
}

View File

@@ -108,7 +108,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) {
workConnReaderFn := func(payloadConn *msg.Conn) {
for {
var (
rawMsg msg.Message
@@ -116,10 +116,10 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
)
xl.Tracef("loop waiting message from udp workConn")
// client will send heartbeat in workConn for keeping alive
_ = conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
_ = payloadConn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil {
xl.Warnf("read from workConn for udp error: %v", errRet)
_ = conn.Close()
_ = payloadConn.Close()
// notify proxy to start a new work connection
// ignore error here, it means the proxy is closed
_ = errors.PanicToError(func() {
@@ -127,7 +127,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
})
return
}
if err := conn.SetReadDeadline(time.Time{}); err != nil {
if err := payloadConn.SetReadDeadline(time.Time{}); err != nil {
xl.Warnf("set read deadline error: %v", err)
}
switch m := rawMsg.(type) {
@@ -144,7 +144,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
int64(len(m.Content)),
)
}); errRet != nil {
conn.Close()
_ = payloadConn.Close()
xl.Infof("reader goroutine for udp work connection closed")
return
}
@@ -153,7 +153,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
}
// send message to workConn
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
workConnSenderFn := func(payloadConn *msg.Conn, ctx context.Context) {
var errRet error
for {
select {
@@ -162,9 +162,9 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
xl.Infof("sender goroutine for udp work connection closed")
return
}
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil {
xl.Infof("sender goroutine for udp work connection closed: %v", errRet)
conn.Close()
_ = payloadConn.Close()
return
}
xl.Tracef("send message to udp workConn, len: %d", len(udpMsg.Content))
@@ -223,9 +223,11 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
}
pxy.workConn = netpkg.WrapReadWriteCloserToConn(rwc, workConn)
// Plain UDP payload follows the negotiated wire protocol for message framing.
payloadConn := msg.NewConn(pxy.workConn, msg.NewReadWriter(pxy.workConn, pxy.wireProtocol))
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(pxy.workConn)
go workConnSenderFn(pxy.workConn, ctx)
go workConnReaderFn(payloadConn)
go workConnSenderFn(payloadConn, ctx)
_, ok := <-pxy.checkCloseCh
cancel()
if !ok {

View File

@@ -16,6 +16,7 @@ package proxy
import (
"fmt"
"net"
"reflect"
"sync"
@@ -73,10 +74,7 @@ func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
if errRet != nil {
continue
}
m := &msg.NatHoleSid{
Sid: sid,
}
errRet = msg.WriteMsg(workConn, m)
errRet = writeNatHoleSid(workConn, pxy.wireProtocol, sid)
if errRet != nil {
xl.Warnf("write nat hole sid package error, %v", errRet)
}
@@ -87,6 +85,13 @@ func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
return
}
func writeNatHoleSid(workConn net.Conn, wireProtocol string, sid string) error {
workMsgConn := msg.NewConn(workConn, msg.NewReadWriter(workConn, wireProtocol))
return workMsgConn.WriteMsg(&msg.NatHoleSid{
Sid: sid,
})
}
func (pxy *XTCPProxy) Close() {
pxy.closeOnce.Do(func() {
pxy.BaseProxy.Close()

93
server/proxy/xtcp_test.go Normal file
View File

@@ -0,0 +1,93 @@
// Copyright 2026 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package proxy
import (
"bufio"
"encoding/binary"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/wire"
)
func TestWriteNatHoleSidUsesWireV2MessageFrame(t *testing.T) {
client, server := net.Pipe()
defer client.Close()
defer server.Close()
setPipeDeadline(t, client, server)
errCh := make(chan error, 1)
go func() {
errCh <- writeNatHoleSid(server, wire.ProtocolV2, "sid-v2")
}()
frame, err := wire.NewConn(client).ReadFrame()
require.NoError(t, err)
require.Equal(t, wire.FrameTypeMessage, frame.Type)
require.GreaterOrEqual(t, len(frame.Payload), 2)
require.Equal(t, msg.V2TypeNatHoleSid, binary.BigEndian.Uint16(frame.Payload[:2]))
var out msg.NatHoleSid
require.NoError(t, msg.DecodeV2MessageFrameInto(frame, &out))
require.Equal(t, "sid-v2", out.Sid)
require.NoError(t, <-errCh)
}
func TestWriteNatHoleSidUsesLegacyCodecForWireV1AndDefault(t *testing.T) {
for _, tc := range []struct {
name string
wireProtocol string
}{
{name: "default", wireProtocol: ""},
{name: "v1", wireProtocol: wire.ProtocolV1},
} {
t.Run(tc.name, func(t *testing.T) {
client, server := net.Pipe()
defer client.Close()
defer server.Close()
setPipeDeadline(t, client, server)
errCh := make(chan error, 1)
go func() {
errCh <- writeNatHoleSid(server, tc.wireProtocol, "sid-legacy")
}()
reader := bufio.NewReader(client)
typeByte, err := reader.ReadByte()
require.NoError(t, err)
require.Equal(t, msg.TypeNatHoleSid, typeByte)
require.NoError(t, reader.UnreadByte())
var out msg.NatHoleSid
require.NoError(t, msg.ReadMsgInto(reader, &out))
require.Equal(t, "sid-legacy", out.Sid)
require.NoError(t, <-errCh)
})
}
}
func setPipeDeadline(t *testing.T, conns ...net.Conn) {
t.Helper()
deadline := time.Now().Add(time.Second)
for _, conn := range conns {
require.NoError(t, conn.SetDeadline(deadline))
}
}

View File

@@ -508,7 +508,7 @@ func (svr *Service) handleConnection(ctx context.Context, conn net.Conn, interna
conn.Close()
}
case *msg.NewVisitorConn:
if err = svr.RegisterVisitorConn(conn, m); err != nil {
if err = svr.RegisterVisitorConn(conn, m, acceptedConn.wireProtocol); err != nil {
xl.Warnf("register visitor conn error: %v", err)
_ = acceptedConn.conn.WriteMsg(&msg.NewVisitorConnResp{
ProxyName: m.ProxyName,
@@ -777,6 +777,7 @@ func (svr *Service) RegisterControl(
LoginMsg: loginMsg,
ServerCfg: svr.cfg,
ClientRegistry: svr.clientRegistry,
WireProtocol: wireProtocol,
})
if err != nil {
xl.Warnf("create new controller error: %v", err)
@@ -832,7 +833,7 @@ func (svr *Service) RegisterWorkConn(workConn *msg.Conn, newMsg *msg.NewWorkConn
return ctl.RegisterWorkConn(proxy.NewWorkConn(workConn))
}
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn, wireProtocol string) error {
visitorUser := ""
// TODO(deprecation): Compatible with old versions, can be without runID, user is empty. In later versions, it will be mandatory to include runID.
// If runID is required, it is not compatible with versions prior to v0.50.0.
@@ -844,5 +845,5 @@ func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVis
visitorUser = ctl.sessionCtx.LoginMsg.User
}
return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
newMsg.UseEncryption, newMsg.UseCompression, visitorUser)
newMsg.UseEncryption, newMsg.UseCompression, visitorUser, wireProtocol)
}

View File

@@ -65,6 +65,7 @@ func (vm *Manager) Listen(name string, sk string, allowUsers []string) (*netpkg.
func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey string,
useEncryption bool, useCompression bool, visitorUser string,
wireProtocol string,
) (err error) {
vm.mu.RLock()
defer vm.mu.RUnlock()
@@ -90,7 +91,11 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey
if useCompression {
rwc = libio.WithCompression(rwc)
}
err = l.l.PutConn(netpkg.WrapReadWriteCloserToConn(rwc, conn))
visitorConn := netpkg.WrapReadWriteCloserToConn(rwc, conn)
err = l.l.PutConn(&wireProtocolConn{
Conn: visitorConn,
wireProtocol: wireProtocol,
})
} else {
err = fmt.Errorf("custom listener for [%s] doesn't exist", name)
return
@@ -98,6 +103,15 @@ func (vm *Manager) NewConn(name string, conn net.Conn, timestamp int64, signKey
return
}
type wireProtocolConn struct {
net.Conn
wireProtocol string
}
func (c *wireProtocolConn) WireProtocol() string {
return c.wireProtocol
}
func (vm *Manager) CloseListener(name string) {
vm.mu.Lock()
defer vm.mu.Unlock()

View File

@@ -0,0 +1,61 @@
// Copyright 2026 The frp Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package visitor
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/fatedier/frp/pkg/proto/wire"
"github.com/fatedier/frp/pkg/util/util"
)
func TestManagerNewConnCarriesWireProtocol(t *testing.T) {
vm := NewManager()
listener, err := vm.Listen("sudp", "secret", []string{"*"})
require.NoError(t, err)
defer listener.Close()
client, server := net.Pipe()
defer client.Close()
defer server.Close()
now := time.Now().Unix()
errCh := make(chan error, 1)
go func() {
errCh <- vm.NewConn(
"sudp",
server,
now,
util.GetAuthKey("secret", now),
false,
false,
"user",
wire.ProtocolV2,
)
}()
acceptedConn, err := listener.Accept()
require.NoError(t, err)
defer acceptedConn.Close()
getter, ok := acceptedConn.(interface{ WireProtocol() string })
require.True(t, ok)
require.Equal(t, wire.ProtocolV2, getter.WireProtocol())
require.NoError(t, <-errCh)
}

View File

@@ -7,6 +7,8 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"
@@ -158,7 +160,7 @@ webServer.port = %d
framework.NewRequestExpect(f).PortName(portName).Ensure()
})
ginkgo.It("baseline frps rejects current frpc forced to v2", func() {
ginkgo.It("baseline frps handles current frpc forced to v2 according to baseline support", func() {
portName := port.GenName("CompatBaselineFRPSForcedV2")
clientConf := tcpClientConfig("tcp", portName, `
transport.wireProtocol = "v2"
@@ -170,6 +172,21 @@ transport.wireProtocol = "v2"
consts.DefaultServerConfig,
[]string{clientConf},
)
// frp v0.69.0 added control connection wireProtocol v2 support, so
// baseline frps v0.69.0 and newer should accept a current frpc forced
// to v2. Older known baselines still must reject the unsupported protocol.
// For custom baselines, the version is unknown and the binary may be either
// side of the support boundary, so this versioned expectation is skipped.
supportsV2, knownVersion := baselineSupportsControlWireProtocolV2(compatCtx.BaselineVersion)
if !knownVersion {
ginkgo.Skip(fmt.Sprintf("baseline version %q is not semver; skip versioned forced-v2 expectation", compatCtx.BaselineVersion))
}
if supportsV2 {
framework.NewRequestExpect(f).PortName(portName).Ensure()
return
}
expectProcessExit(clientProcesses[0], 5*time.Second)
framework.NewRequestExpect(f).PortName(portName).ExpectError(true).Ensure()
})
@@ -199,6 +216,39 @@ func expectProcessExit(p *process.Process, timeout time.Duration) {
}
}
func baselineSupportsControlWireProtocolV2(version string) (supports bool, known bool) {
version = strings.TrimPrefix(version, "v")
parts := strings.Split(version, ".")
if len(parts) != 3 {
return false, false
}
major, err := strconv.Atoi(parts[0])
if err != nil {
return false, false
}
minor, err := strconv.Atoi(parts[1])
if err != nil {
return false, false
}
patch, err := strconv.Atoi(parts[2])
if err != nil {
return false, false
}
return compareSemanticVersion(major, minor, patch, 0, 69, 0) >= 0, true
}
func compareSemanticVersion(major, minor, patch int, baseMajor, baseMinor, basePatch int) int {
if major != baseMajor {
return major - baseMajor
}
if minor != baseMinor {
return minor - baseMinor
}
return patch - basePatch
}
type wireClientInfo struct {
ClientID string `json:"clientID"`
WireProtocol string `json:"wireProtocol"`

View File

@@ -98,6 +98,73 @@ var _ = ginkgo.Describe("[Feature: WireProtocol]", func() {
framework.NewRequestExpect(f).PortName(bindPortName).Ensure()
})
for _, tc := range []struct {
name string
proxyWireConfig string
visitorWireConfig string
extraProxyConfig string
extraVisitorConfig string
}{
{
name: "default sudp visitor",
},
{
name: "v2 sudp visitor",
proxyWireConfig: `transport.wireProtocol = "v2"`,
visitorWireConfig: `transport.wireProtocol = "v2"`,
},
{
name: "mixed sudp proxy v1 visitor v2",
proxyWireConfig: `transport.wireProtocol = "v1"`,
visitorWireConfig: `transport.wireProtocol = "v2"`,
extraProxyConfig: `
transport.useEncryption = true
transport.useCompression = true
`,
extraVisitorConfig: `
transport.useEncryption = true
transport.useCompression = true
`,
},
{
name: "mixed sudp proxy v2 visitor v1",
proxyWireConfig: `transport.wireProtocol = "v2"`,
visitorWireConfig: `transport.wireProtocol = "v1"`,
},
} {
ginkgo.It(tc.name, func() {
serverConf := consts.DefaultServerConfig
bindPortName := port.GenName("WireSUDP")
clientServerConf := consts.DefaultClientConfig + fmt.Sprintf(`
user = "user1"
%s
[[proxies]]
name = "sudp"
type = "sudp"
secretKey = "abc"
localPort = {{ .%s }}
%s
`, tc.proxyWireConfig, framework.UDPEchoServerPort, tc.extraProxyConfig)
clientVisitorConf := consts.DefaultClientConfig + fmt.Sprintf(`
user = "user1"
%s
[[visitors]]
name = "sudp-visitor"
type = "sudp"
serverName = "sudp"
secretKey = "abc"
bindPort = {{ .%s }}
%s
`, tc.visitorWireConfig, bindPortName, tc.extraVisitorConfig)
f.RunProcesses(serverConf, []string{clientServerConf, clientVisitorConf})
framework.NewRequestExpect(f).Protocol("udp").PortName(bindPortName).Ensure()
})
}
ginkgo.It("reports client wire protocol", func() {
webPort := f.AllocPort()
serverConf := consts.DefaultServerConfig + fmt.Sprintf(`