38 Commits

Author SHA1 Message Date
eyedeekay
a6f90149b4 update module 2025-07-18 17:36:10 -04:00
eyedeekay
c3e2fa1124 update limiter 2025-07-18 17:32:50 -04:00
eyedeekay
35b9bebe12 update limiter 2025-07-18 17:29:30 -04:00
eyedeekay
00de9c987f update go-limit 2025-07-18 17:28:36 -04:00
eyedeekay
b0ea3e21f8 remove keys 2025-07-18 17:26:44 -04:00
eyedeekay
e49691ef9d Fix data race in Mirror map access with mutex synchronization 2025-07-18 17:25:57 -04:00
eyedeekay
525e371349 gitignore 2025-07-18 17:14:52 -04:00
eyedeekay
6059157b55 Fix shutdown race condition by closing channel before clearing map 2025-07-18 17:04:20 -04:00
eyedeekay
b6a47b286c Fix goroutine leak in header processing with context-aware copying 2025-07-18 16:45:24 -04:00
eyedeekay
3c74637f8a Fix header processing goroutine leak with timeout and error handling 2025-07-18 16:00:13 -04:00
eyedeekay
d22808794e Fix manageListeners goroutine leak by adding WaitGroup tracking 2025-07-18 13:03:51 -04:00
eyedeekay
b2f9a33f18 Fix critical deadlock in MetaListener.Close() method 2025-07-18 12:57:17 -04:00
eyedeekay
0c400f3a6a .gitignore 2025-07-18 12:35:47 -04:00
eyedeekay
3c964c12dd fix: implement connection pooling to prevent goroutine leaks in metaproxy 2025-07-18 12:19:31 -04:00
eyedeekay
f5adfd0c84 resolve locking issues 2025-07-18 11:46:26 -04:00
eyedeekay
979c70a3d3 Fix race condition in Accept method using atomic operations 2025-07-17 21:44:57 -04:00
eyedeekay
0363467080 Fix data race in listener removal using channel-based coordination 2025-07-17 21:32:06 -04:00
eyedeekay
837850ac99 Fix gitignore 2025-05-26 18:36:13 -04:00
eyedeekay
1604ecab98 Revise how the listener config is exposed 2025-05-26 18:35:35 -04:00
eyedeekay
6fac659cd6 Fix dishonored envars 2025-05-26 18:18:10 -04:00
eyedeekay
ef7b7e14aa this seems better 2025-05-26 18:09:07 -04:00
eyedeekay
f0354c20eb that approach feels doomed 2025-05-26 17:59:36 -04:00
eyedeekay
fbc444f39c that approach feels doomed 2025-05-26 17:59:09 -04:00
eyedeekay
f26e39e56d work on tcp lib 2025-05-26 17:57:23 -04:00
eyedeekay
c2c53862a4 work on tcp lib 2025-05-26 17:56:21 -04:00
eyedeekay
7775a19ffa work on tcp lib 2025-05-26 17:54:53 -04:00
eyedeekay
905fef98db Create a TCP socket manager based on the one from gitea 2025-05-26 17:45:24 -04:00
eyedeekay
37c4fdbba9 Split up metalistener, fix deadlock in accept routine, I think 2025-05-26 17:05:48 -04:00
eyedeekay
d3aa11b9d2 Use my structured logger instead of just bare log 2025-05-26 16:53:02 -04:00
eyedeekay
e2d0906577 Use my structured logger instead of just bare log 2025-05-26 16:50:08 -04:00
eyedeekay
d5f2de44a3 Use my structured logger instead of just bare log 2025-05-26 16:48:50 -04:00
eyedeekay
a588de13fd add ability to disable I2P and Tor listeners granularly 2025-05-25 23:47:08 -04:00
eyedeekay
ec0f4e5e50 add ability to disable I2P and Tor listeners granularly 2025-05-25 23:41:25 -04:00
eyedeekay
41600bbdf0 i dunno, it's late, feed it to an llm and see if it sees anything I did not already get today 2025-05-21 23:29:53 -04:00
eyedeekay
cff9eef383 upgrade mirrorListener 2025-05-21 15:56:07 -04:00
eyedeekay
58490c00e7 refactor metalistener 2025-05-21 15:21:43 -04:00
eyedeekay
273f4a33fb use a buffer to fix the header rewriting 2025-05-21 14:53:34 -04:00
eyedeekay
2058a43096 Fix incoming host header 2025-05-21 13:01:31 -04:00
20 changed files with 3862 additions and 281 deletions

9
.gitignore vendored
View File

@@ -23,3 +23,12 @@ go.work.sum
# env file
.env
/*keys
/data-dir*
mp
metaproxy
test*
ex
tlskeys
onionkeys
i2pkeys

2254
eval.md Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -27,7 +27,7 @@ func main() {
if err := metaListener.AddListener("tcp", tcpListener); err != nil {
log.Fatalf("Failed to add TCP listener: %v", err)
}
log.Println("Added TCP listener on 127.0.0.1:8080")
log.Println("Added TCP listener on 127.0.0.1:8082")
// Create and add a Unix socket listener (on Unix systems)
socketPath := "/tmp/example.sock"
@@ -42,6 +42,7 @@ func main() {
log.Println("Added Unix socket listener on", socketPath)
}
}
log.Println("Starting http server...")
// Create a simple HTTP server using the meta listener
server := &http.Server{
@@ -49,6 +50,7 @@ func main() {
fmt.Fprintf(w, "Hello from MetaListener! You connected via: %s\n", r.Proto)
}),
}
log.Println("Server is ready to accept connections...")
// Handle server shutdown gracefully
stop := make(chan os.Signal, 1)

20
go.mod
View File

@@ -3,18 +3,26 @@ module github.com/go-i2p/go-meta-listener
go 1.23.5
require (
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c
github.com/go-i2p/onramp v0.33.92
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624
github.com/samber/oops v1.19.0
)
require (
github.com/cretz/bine v0.2.0 // indirect
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708 // indirect
github.com/go-i2p/sam3 v0.33.9 // indirect
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 // indirect
github.com/go-i2p/i2pkeys v0.33.92 // indirect
github.com/go-i2p/sam3 v0.33.92 // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/samber/lo v1.51.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.12.0 // indirect
)

56
go.sum
View File

@@ -3,44 +3,90 @@ github.com/cretz/bine v0.2.0/go.mod h1:WU4o9QR9wWp8AVKtTM1XD5vUHkEqnf2vVSo6dBqbe
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3 h1:czQe9f4Y+UFwqMKvMVsWDG/HWKdIN6uPelIkfoCjAsk=
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014 h1:USgEZEPweXNRwXJjbsIvGz9q+ADSIdo+rMxzyuqbHbQ=
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 h1:eKJ5X3wWYg/ln8C2G9Ibp3bUn8Qg8sYe28Uthypmb9g=
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/i2pkeys v0.0.0-20241108200332-e4f5ccdff8c4/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708 h1:Tiy9IBwi21maNpK74yCdHursJJMkyH7w87tX1nXGWzg=
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
github.com/go-i2p/i2pkeys v0.33.92 h1:e2vx3vf7tNesaJ8HmAlGPOcfiGM86jzeIGxh27I9J2Y=
github.com/go-i2p/i2pkeys v0.33.92/go.mod h1:BRURQ/twxV0WKjZlFSKki93ivBi+MirZPWudfwTzMpE=
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c h1:VTiECn3dFEmUlZjto+wOwJ7SSJTHPLyNprQMR5HzIMI=
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c/go.mod h1:te7Zj3g3oMeIl8uBXAgO62UKmZ6m6kHRNg1Mm+X8Hzk=
github.com/go-i2p/onramp v0.33.92 h1:Dk3A0SGpdEw829rSjW2LqN8o16pUvuhiN0vn36z7Gpc=
github.com/go-i2p/onramp v0.33.92/go.mod h1:5sfB8H2xk05gAS2K7XAUZ7ekOfwGJu3tWF0fqdXzJG4=
github.com/go-i2p/sam3 v0.33.9 h1:3a+gunx75DFc6jxloUZTAVJbdP6736VU1dy2i7I9fKA=
github.com/go-i2p/sam3 v0.33.9/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
github.com/go-i2p/sam3 v0.33.92 h1:TVpi4GH7Yc7nZBiE1QxLjcZfnC4fI/80zxQz1Rk36BA=
github.com/go-i2p/sam3 v0.33.92/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624 h1:FXCTQV93+31Yj46zpYbd41es+EYgT7qi4RK6KSVrGQM=
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624/go.mod h1:ftKSvvGC9FnxZeuL3B4MB6q/DOzVSV0kET08YUyDwbM=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.50.0 h1:XrG0xOeHs+4FQ8gJR97zDz5uOFMW7OwFWiFVzqopKgY=
github.com/samber/lo v1.50.0/go.mod h1:RjZyNk6WSnUFRKK6EyOhsRJMqft3G+pg7dCWHQCWvsc=
github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI=
github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/samber/oops v1.18.0 h1:NnoCdxlOg/ajFos8HIC0+dV8S6cZRcrjW1WrfZe+GOc=
github.com/samber/oops v1.18.0/go.mod h1:DcZbba2s+PzSx14vY6HjvhV1FDsGOZ1TJg7T/ZZARBQ=
github.com/samber/oops v1.19.0 h1:sfZAwC8MmTXBRRyNc4Z1utuTPBx+hFKF5fJ9DEQRZfw=
github.com/samber/oops v1.19.0/go.mod h1:+f+61dbiMxEMQ8gw/zTxW2pk+YGobaDM4glEHQtPOww=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

82
handler.go Normal file
View File

@@ -0,0 +1,82 @@
package meta
import (
"net"
"sync/atomic"
"time"
)
// handleListener runs in a separate goroutine for each added listener
// and forwards accepted connections to the connCh channel.
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
defer func() {
// Recover from any panic to ensure WaitGroup.Done() is always called
if r := recover(); r != nil {
log.Printf("PANIC in listener goroutine for %s: %v", id, r)
}
log.Printf("Listener goroutine for %s exiting", id)
ml.listenerWg.Done()
}()
for {
// First check if the MetaListener is closed
select {
case <-ml.closeCh:
log.Printf("MetaListener closed, stopping %s listener", id)
return
default:
}
// Set a deadline for Accept to prevent blocking indefinitely
if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(1 * time.Second))
}
conn, err := listener.Accept()
if err != nil {
// Check if this is a timeout error (which we expect due to our deadline)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
// Check if this is any other temporary error
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Printf("Temporary error in %s listener: %v, retrying in 100ms", id, err)
time.Sleep(100 * time.Millisecond)
continue
}
// Check if the listener was closed (expected during shutdown)
if atomic.LoadInt64(&ml.isClosed) != 0 {
log.Printf("Listener %s closed during shutdown", id)
return
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
select {
case ml.removeListenerCh <- id:
// Successfully signaled for removal
case <-ml.closeCh:
// MetaListener is closing, no need to signal removal
}
return
}
// If we reach here, we have a valid connection
log.Printf("Listener %s accepted connection from %s", id, conn.RemoteAddr())
// Try to forward the connection, but don't block indefinitely
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id)
case <-ml.closeCh:
log.Printf("MetaListener closing while forwarding connection, closing connection")
conn.Close()
return
case <-time.After(5 * time.Second):
// If we can't forward within 5 seconds, something is seriously wrong
log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr())
conn.Close()
}
}
}

104
listener.go Normal file
View File

@@ -0,0 +1,104 @@
package meta
import (
"fmt"
"net"
"sync/atomic"
"time"
)
// Accept implements the net.Listener Accept method.
// It returns the next connection from any of the managed listeners.
func (ml *MetaListener) Accept() (net.Conn, error) {
// Check if already closed before entering the select loop
if atomic.LoadInt64(&ml.isClosed) != 0 {
return nil, ErrListenerClosed
}
for {
select {
case result, ok := <-ml.connCh:
if !ok {
return nil, ErrListenerClosed
}
// Access RemoteAddr() directly on the connection
return result, nil
case <-ml.closeCh:
// Double-check the closed state using atomic operation
if atomic.LoadInt64(&ml.isClosed) != 0 {
return nil, ErrListenerClosed
}
continue
}
}
}
// Close implements the net.Listener Close method.
// It closes all managed listeners and releases resources.
func (ml *MetaListener) Close() error {
// Use atomic compare-and-swap to ensure we only close once
if !atomic.CompareAndSwapInt64(&ml.isClosed, 0, 1) {
return nil
}
ml.mu.Lock()
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
// Signal all goroutines to stop first, before clearing listeners map
close(ml.closeCh)
// Close all listeners first to stop accepting new connections
var errs []error
for id, listener := range ml.listeners {
if err := listener.Close(); err != nil {
log.Printf("Error closing %s listener: %v", id, err)
errs = append(errs, err)
}
}
// Clear the listeners map since they're all closed
ml.listeners = make(map[string]net.Listener)
ml.mu.Unlock()
// Allow a brief grace period for handlers to finish processing current connections
gracePeriod := 100 * time.Millisecond
done := make(chan struct{})
go func() {
ml.listenerWg.Wait()
close(done)
}()
// Wait for either all goroutines to finish or grace period to expire
select {
case <-done:
log.Printf("All listener goroutines exited gracefully")
case <-time.After(gracePeriod):
log.Printf("Grace period expired, waiting for remaining goroutines")
// Wait for all listener goroutines to exit
ml.listenerWg.Wait()
}
log.Printf("All listener goroutines have exited")
// Return combined errors if any
if len(errs) > 0 {
return fmt.Errorf("errors closing listeners: %v", errs)
}
return nil
}
// Addr implements the net.Listener Addr method.
// It returns a MetaAddr representing all managed listeners.
func (ml *MetaListener) Addr() net.Addr {
ml.mu.RLock()
defer ml.mu.RUnlock()
addresses := make([]net.Addr, 0, len(ml.listeners))
for _, listener := range ml.listeners {
addresses = append(addresses, listener.Addr())
}
return &MetaAddr{addresses: addresses}
}

5
log Normal file
View File

@@ -0,0 +1,5 @@
? github.com/go-i2p/go-meta-listener [no test files]
? github.com/go-i2p/go-meta-listener/example [no test files]
? github.com/go-i2p/go-meta-listener/mirror [no test files]
? github.com/go-i2p/go-meta-listener/mirror/metaproxy [no test files]
ok github.com/go-i2p/go-meta-listener/tcp 0.079s

7
log.go Normal file
View File

@@ -0,0 +1,7 @@
package meta
import (
"github.com/go-i2p/logger"
)
var log = logger.GetGoI2PLogger()

31
metaaddr.go Normal file
View File

@@ -0,0 +1,31 @@
package meta
import "net"
// MetaAddr implements the net.Addr interface for a meta listener.
type MetaAddr struct {
addresses []net.Addr
}
// Network returns the name of the network.
func (ma *MetaAddr) Network() string {
return "meta"
}
// String returns a string representation of all managed addresses.
func (ma *MetaAddr) String() string {
if len(ma.addresses) == 0 {
return "meta(empty)"
}
result := "meta("
for i, addr := range ma.addresses {
if i > 0 {
result += ", "
}
result += addr.String()
}
result += ")"
return result
}

View File

@@ -6,16 +6,16 @@ import (
"fmt"
"net"
"sync"
"time"
"sync/atomic"
"github.com/samber/oops"
)
var (
// ErrListenerClosed is returned when attempting to accept on a closed listener
ErrListenerClosed = errors.New("listener is closed")
ErrListenerClosed = oops.Errorf("listener is closed")
// ErrNoListeners is returned when the meta listener has no active listeners
ErrNoListeners = errors.New("no active listeners")
// ErrInternalListenerFailure is returned when an internal listener fails
ErrInternalListenerFailure = errors.New("Internal listener error, shutting down metalistener for restart")
ErrNoListeners = oops.Errorf("no active listeners")
)
// MetaListener implements the net.Listener interface and manages multiple
@@ -27,12 +27,14 @@ type MetaListener struct {
listenerWg sync.WaitGroup
// connCh is used to receive connections from all managed listeners
connCh chan ConnResult
// errCh is used to receive errors from all managed listeners
errCh chan error
// closeCh signals all goroutines to stop
closeCh chan struct{}
// isClosed indicates whether the meta listener has been closed
isClosed bool
// removeListenerCh is used to signal listener removal from handlers
removeListenerCh chan string
// isClosed indicates whether the meta listener has been closed (atomic)
isClosed int64
// isShuttingDown indicates whether WaitForShutdown has been called (atomic)
isShuttingDown int64
// mu protects concurrent access to the listener's state
mu sync.RWMutex
}
@@ -45,12 +47,18 @@ type ConnResult struct {
// NewMetaListener creates a new MetaListener instance ready to manage multiple listeners.
func NewMetaListener() *MetaListener {
return &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult),
errCh: make(chan error, 1), // Buffered to prevent blocking
closeCh: make(chan struct{}),
ml := &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
closeCh: make(chan struct{}),
removeListenerCh: make(chan string, 10), // Buffer for listener removal signals
}
// Start the listener management goroutine and track it
ml.listenerWg.Add(1)
go ml.manageListeners()
return ml
}
// AddListener adds a new listener with the specified ID.
@@ -64,17 +72,22 @@ func (ml *MetaListener) AddListener(id string, listener net.Listener) error {
ml.mu.Lock()
defer ml.mu.Unlock()
if ml.isClosed {
if atomic.LoadInt64(&ml.isClosed) != 0 {
return ErrListenerClosed
}
// Check if we're in shutdown mode (WaitForShutdown has been called)
if atomic.LoadInt64(&ml.isShuttingDown) != 0 {
return fmt.Errorf("cannot add listener during shutdown")
}
if _, exists := ml.listeners[id]; exists {
return fmt.Errorf("listener with ID '%s' already exists", id)
}
ml.listeners[id] = listener
// Start a goroutine to handle connections from this listener
// Add to WaitGroup immediately before starting goroutine to prevent race
ml.listenerWg.Add(1)
go ml.handleListener(id, listener)
@@ -99,141 +112,6 @@ func (ml *MetaListener) RemoveListener(id string) error {
return err
}
// handleListener runs in a separate goroutine for each added listener
// and forwards accepted connections to the connCh channel.
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
defer ml.listenerWg.Done()
for {
conn, err := listener.Accept()
select {
case <-ml.closeCh:
// Meta listener is being closed, exit
return
default:
// Continue processing
}
if err != nil {
// Check if this is a temporary error
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
// For temporary errors, wait a bit and try again
time.Sleep(100 * time.Millisecond)
continue
}
// For non-temporary errors, check if listener was closed
ml.mu.RLock()
_, stillExists := ml.listeners[id]
ml.mu.RUnlock()
if stillExists {
// Create a combined error with both the standard message and original error details
combinedErr := fmt.Errorf("%w: listener %s error - %v",
ErrInternalListenerFailure, id, err)
// Send the combined error to notify Accept() calls
select {
case ml.errCh <- combinedErr:
default:
// Don't block if no one is reading errors
}
// Then close all listeners
go ml.Close()
}
return
}
// Send the accepted connection to the connection channel
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
// Connection forwarded successfully
case <-ml.closeCh:
// If we're closing and got a connection, close it
conn.Close()
return
}
}
}
// Accept implements the net.Listener Accept method.
// It waits for and returns the next connection from any of the managed listeners.
func (ml *MetaListener) Accept() (net.Conn, error) {
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
return nil, ErrListenerClosed
}
if len(ml.listeners) == 0 {
ml.mu.RUnlock()
return nil, ErrNoListeners
}
ml.mu.RUnlock()
// Wait for either a connection, an error, or close signal
select {
case result := <-ml.connCh:
return result.Conn, nil
case err := <-ml.errCh:
return nil, err
case <-ml.closeCh:
return nil, ErrListenerClosed
}
}
// Close implements the net.Listener Close method.
// It closes all managed listeners and releases resources.
func (ml *MetaListener) Close() error {
ml.mu.Lock()
if ml.isClosed {
ml.mu.Unlock()
return nil
}
ml.isClosed = true
// Signal all goroutines to stop
close(ml.closeCh)
// Close all listeners
var errs []error
for id, listener := range ml.listeners {
if err := listener.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close listener %s: %w", id, err))
}
}
ml.mu.Unlock()
// Wait for all listener goroutines to exit
ml.listenerWg.Wait()
// Return combined errors if any
if len(errs) > 0 {
return fmt.Errorf("errors closing listeners: %v", errs)
}
return nil
}
// Addr implements the net.Listener Addr method.
// It returns a MetaAddr representing all managed listeners.
func (ml *MetaListener) Addr() net.Addr {
ml.mu.RLock()
defer ml.mu.RUnlock()
addresses := make([]net.Addr, 0, len(ml.listeners))
for _, listener := range ml.listeners {
addresses = append(addresses, listener.Addr())
}
return &MetaAddr{addresses: addresses}
}
// ListenerIDs returns the IDs of all active listeners.
func (ml *MetaListener) ListenerIDs() []string {
ml.mu.RLock()
@@ -258,6 +136,9 @@ func (ml *MetaListener) Count() int {
// WaitForShutdown blocks until all listener goroutines have exited.
// This is useful for ensuring clean shutdown in server applications.
func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
// Set shutdown flag to prevent new listeners from being added
atomic.StoreInt64(&ml.isShuttingDown, 1)
done := make(chan struct{})
go func() {
@@ -273,30 +154,25 @@ func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
}
}
// MetaAddr implements the net.Addr interface for a meta listener.
type MetaAddr struct {
addresses []net.Addr
}
// manageListeners handles listener removal signals from handler goroutines
func (ml *MetaListener) manageListeners() {
defer func() {
log.Printf("manageListeners goroutine exiting")
ml.listenerWg.Done()
}()
// Network returns the name of the network.
func (ma *MetaAddr) Network() string {
return "meta"
}
// String returns a string representation of all managed addresses.
func (ma *MetaAddr) String() string {
if len(ma.addresses) == 0 {
return "meta(empty)"
}
result := "meta("
for i, addr := range ma.addresses {
if i > 0 {
result += ", "
for {
select {
case <-ml.closeCh:
return
case id := <-ml.removeListenerCh:
ml.mu.Lock()
if listener, exists := ml.listeners[id]; exists {
listener.Close()
delete(ml.listeners, id)
log.Printf("Listener %s removed due to permanent error", id)
}
ml.mu.Unlock()
}
result += addr.String()
}
result += ")"
return result
}

351
metalistener_test.go Normal file
View File

@@ -0,0 +1,351 @@
package meta
import (
"context"
"fmt"
"io"
"net"
"strings"
"sync"
"testing"
"time"
)
// mockListener is a test listener that can simulate errors
type mockListener struct {
addr net.Addr
connCh chan net.Conn
closeCh chan struct{}
closed bool
mu sync.Mutex
errorMode bool
}
func newMockListener(addr string) *mockListener {
return &mockListener{
addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080},
connCh: make(chan net.Conn, 1),
closeCh: make(chan struct{}),
}
}
func (m *mockListener) Accept() (net.Conn, error) {
m.mu.Lock()
errorMode := m.errorMode
m.mu.Unlock()
if errorMode {
return nil, fmt.Errorf("permanent error")
}
select {
case conn := <-m.connCh:
return conn, nil
case <-m.closeCh:
return nil, fmt.Errorf("listener closed")
}
}
func (m *mockListener) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return nil
}
m.closed = true
close(m.closeCh)
return nil
}
func (m *mockListener) Addr() net.Addr {
return m.addr
}
func (m *mockListener) setErrorMode(errorMode bool) {
m.mu.Lock()
m.errorMode = errorMode
m.mu.Unlock()
}
// TestListenerRemovalRace tests that listener removal doesn't cause race conditions
func TestListenerRemovalRace(t *testing.T) {
ml := NewMetaListener()
defer ml.Close()
// Add multiple listeners
listener1 := newMockListener("127.0.0.1:8080")
listener2 := newMockListener("127.0.0.1:8081")
err := ml.AddListener("test1", listener1)
if err != nil {
t.Fatalf("Failed to add listener1: %v", err)
}
err = ml.AddListener("test2", listener2)
if err != nil {
t.Fatalf("Failed to add listener2: %v", err)
}
// Verify both listeners are present
if ml.Count() != 2 {
t.Errorf("Expected 2 listeners, got %d", ml.Count())
}
// Simulate permanent error in listener1
listener1.setErrorMode(true)
// Wait for listener to be removed due to error
time.Sleep(100 * time.Millisecond)
// Verify listener1 was removed
if ml.Count() != 1 {
t.Errorf("Expected 1 listener after error, got %d", ml.Count())
}
// Verify we can still use RemoveListener on the remaining listener
err = ml.RemoveListener("test2")
if err != nil {
t.Errorf("Failed to remove listener2: %v", err)
}
if ml.Count() != 0 {
t.Errorf("Expected 0 listeners after removal, got %d", ml.Count())
}
}
// TestConcurrentListenerAccess tests concurrent access to listener map
func TestConcurrentListenerAccess(t *testing.T) {
ml := NewMetaListener()
defer ml.Close()
var wg sync.WaitGroup
// Add multiple listeners concurrently
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
listener := newMockListener(fmt.Sprintf("127.0.0.1:%d", 8080+id))
err := ml.AddListener(fmt.Sprintf("test%d", id), listener)
if err != nil {
t.Errorf("Failed to add listener%d: %v", id, err)
}
}(i)
}
// Concurrently check listener count
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ml.Count()
ml.ListenerIDs()
}()
}
wg.Wait()
if ml.Count() != 10 {
t.Errorf("Expected 10 listeners, got %d", ml.Count())
}
}
// TestAcceptRaceCondition tests that Accept() method doesn't have race conditions
func TestAcceptRaceCondition(t *testing.T) {
ml := NewMetaListener()
// Add a listener
listener := newMockListener("127.0.0.1:8080")
err := ml.AddListener("test", listener)
if err != nil {
t.Fatalf("Failed to add listener: %v", err)
}
var wg sync.WaitGroup
// Start multiple goroutines calling Accept()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := ml.Accept()
// We expect either a valid connection or ErrListenerClosed
if err != nil && err.Error() != ErrListenerClosed.Error() {
t.Errorf("Unexpected error from Accept(): %v", err)
}
}()
}
// Concurrently close the listener
go func() {
time.Sleep(10 * time.Millisecond)
ml.Close()
}()
wg.Wait()
}
// TestShutdownRaceCondition closes the MetaListener while many goroutines are
// blocked in Accept(), checking that every one of them unblocks with either a
// connection or ErrListenerClosed and that the listener map ends up empty.
func TestShutdownRaceCondition(t *testing.T) {
	ml := NewMetaListener()

	// Register several listeners up front.
	for i := 0; i < 5; i++ {
		mock := newMockListener(fmt.Sprintf("127.0.0.1:%d", 8080+i))
		if err := ml.AddListener(fmt.Sprintf("test%d", i), mock); err != nil {
			t.Fatalf("Failed to add listener%d: %v", i, err)
		}
	}

	const acceptors = 10
	var wg sync.WaitGroup
	for i := 0; i < acceptors; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			conn, acceptErr := ml.Accept()
			if acceptErr != nil {
				// During shutdown, we expect ErrListenerClosed
				if acceptErr.Error() != ErrListenerClosed.Error() {
					t.Errorf("Goroutine %d: unexpected error: %v", id, acceptErr)
				}
				return
			}
			if conn != nil {
				conn.Close()
			}
		}(i)
	}

	// Let the acceptors spin up, then pull the rug out.
	time.Sleep(10 * time.Millisecond)
	if err := ml.Close(); err != nil {
		t.Errorf("Error closing MetaListener: %v", err)
	}
	wg.Wait()

	if got := ml.Count(); got != 0 {
		t.Errorf("Expected 0 listeners after close, got %d", got)
	}
}
// TestWaitGroupSynchronization tests that WaitGroup is properly synchronized.
//
// It verifies that a listener can be registered normally, and that
// AddListener is rejected once shutdown has begun.
func TestWaitGroupSynchronization(t *testing.T) {
	ml := NewMetaListener()
	defer ml.Close()
	// Test 1: Verify panic recovery in handleListener doesn't break WaitGroup
	listener := newMockListener("127.0.0.1:8080")
	// Add a listener
	err := ml.AddListener("panic-test", listener)
	if err != nil {
		t.Fatalf("Failed to add listener: %v", err)
	}
	// Test 2: Verify we can't add listeners during shutdown
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	// Start WaitForShutdown in background
	go func() {
		ml.WaitForShutdown(ctx)
	}()
	// Give WaitForShutdown a moment to set the shutdown flag
	time.Sleep(10 * time.Millisecond)
	// Try to add another listener - should fail
	listener2 := newMockListener("127.0.0.1:8081")
	err = ml.AddListener("shutdown-test", listener2)
	if err == nil {
		// BUG FIX: the original fell through after this t.Error and called
		// err.Error() on a nil error, panicking the test instead of failing
		// it cleanly. The message check now only runs when err is non-nil.
		t.Error("Expected error when adding listener during shutdown, got nil")
	} else if !strings.Contains(err.Error(), "shutdown") {
		t.Errorf("Expected shutdown error, got: %v", err)
	}
	// Close the MetaListener to finish the test cleanly
	ml.Close()
}
// TestWaitGroupPanicRecovery tests that panics in handleListener are recovered
// without corrupting the MetaListener's internal accounting: after a panic,
// new listeners can still be added and Close still succeeds.
func TestWaitGroupPanicRecovery(t *testing.T) {
	ml := NewMetaListener()
	defer ml.Close()
	// Create a custom listener that will cause a panic
	panicListener := &panicMockListener{
		mockListener: newMockListener("127.0.0.1:8080"),
	}
	// Add the panic-inducing listener
	err := ml.AddListener("panic-test", panicListener)
	if err != nil {
		t.Fatalf("Failed to add listener: %v", err)
	}
	// Trigger the panic by sending a connection
	// NOTE(review): assumes connCh is drained by the handleListener accept
	// loop (or is buffered); otherwise this send could block — confirm
	// against newMockListener's definition.
	conn := &mockConn{}
	panicListener.connCh <- conn
	// Wait a bit to let the panic occur and be recovered
	// (fixed sleep rather than explicit synchronization; mildly timing-sensitive)
	time.Sleep(100 * time.Millisecond)
	// The MetaListener should still be functional
	// Add another listener to verify WaitGroup is not broken
	normalListener := newMockListener("127.0.0.1:8081")
	err = ml.AddListener("normal-test", normalListener)
	if err != nil {
		t.Fatalf("Failed to add normal listener after panic: %v", err)
	}
	// Verify we can still close cleanly
	err = ml.Close()
	if err != nil {
		t.Errorf("Error closing MetaListener: %v", err)
	}
}
// panicMockListener is a mock listener that panics when Accept is called,
// used to verify that handleListener's recover path keeps the MetaListener
// functional after a misbehaving listener.
type panicMockListener struct {
	*mockListener
}

// Accept drains one connection from the embedded mock, then panics
// deliberately so tests can exercise panic recovery.
func (p *panicMockListener) Accept() (net.Conn, error) {
	if _, err := p.mockListener.Accept(); err != nil {
		return nil, err
	}
	panic("test panic in handleListener")
}
// mockConn is a minimal net.Conn stand-in: reads hit EOF immediately,
// writes are swallowed, and addresses are fixed loopback values.
type mockConn struct{}

// Read reports immediate end-of-stream with no data.
func (c *mockConn) Read(b []byte) (int, error) {
	return 0, io.EOF
}

// Write accepts and discards every byte.
func (c *mockConn) Write(b []byte) (int, error) {
	return len(b), nil
}

// Close is a no-op.
func (c *mockConn) Close() error {
	return nil
}

// LocalAddr reports a fixed loopback address on port 8080.
func (c *mockConn) LocalAddr() net.Addr {
	return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080}
}

// RemoteAddr reports a fixed loopback peer on port 8081.
func (c *mockConn) RemoteAddr() net.Addr {
	return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8081}
}

// SetDeadline is a no-op.
func (c *mockConn) SetDeadline(t time.Time) error { return nil }

// SetReadDeadline is a no-op.
func (c *mockConn) SetReadDeadline(t time.Time) error { return nil }

// SetWriteDeadline is a no-op.
func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil }

111
mirror/fixed_race_test.go Normal file
View File

@@ -0,0 +1,111 @@
package mirror
import (
"os"
"sync"
"testing"
"time"
)
// TestFixedConcurrentListenDataRace tests that the data race fix works:
// concurrent Listen() calls must not trip the race detector on the
// Onions/Garlics maps now that Mirror guards them with a mutex.
func TestFixedConcurrentListenDataRace(t *testing.T) {
	// Disable actual network listeners to focus on testing the map race condition
	os.Setenv("DISABLE_TOR", "true")
	os.Setenv("DISABLE_I2P", "true")
	defer func() {
		os.Unsetenv("DISABLE_TOR")
		os.Unsetenv("DISABLE_I2P")
	}()
	// Create a Mirror instance
	mirror, err := NewMirror("test-fixed:3003")
	if err != nil {
		t.Fatalf("Failed to create mirror: %v", err)
	}
	defer mirror.Close()
	const numGoroutines = 10
	var wg sync.WaitGroup
	// Buffered so a failing goroutine never blocks on the error report.
	errorCh := make(chan error, numGoroutines)
	// Launch multiple goroutines that call Listen() concurrently
	// With the fix, this should NOT trigger data races
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Use different addresses to avoid network conflicts
			// (rune math yields hostnames test-fixed-a .. test-fixed-j;
			// only valid while numGoroutines <= 26)
			addr := "test-fixed-" + string(rune('a'+id)) + ":5000"
			// This should now be safe thanks to mutex protection
			listener, err := mirror.Listen(addr, "")
			if err != nil {
				errorCh <- err
				return
			}
			if listener != nil {
				// Clean close if successful
				listener.Close()
			}
		}(i)
	}
	// Wait for all goroutines to complete
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	// Wait with timeout so a deadlock fails the test instead of hanging the suite
	select {
	case <-done:
		// Check for any errors
		close(errorCh)
		for err := range errorCh {
			if err != nil {
				t.Errorf("Error in concurrent Listen(): %v", err)
			}
		}
		t.Log("Test passed - no data races detected with fix")
	case <-time.After(30 * time.Second):
		t.Fatal("Test timed out - possible deadlock or hang")
	}
}
// TestSequentialOperationsStillWork verifies sequential operations work after
// the fix: two back-to-back Listen calls on the same Mirror must both succeed.
func TestSequentialOperationsStillWork(t *testing.T) {
	// Keep the test hermetic: no real Tor/I2P endpoints.
	os.Setenv("DISABLE_TOR", "true")
	os.Setenv("DISABLE_I2P", "true")
	defer func() {
		os.Unsetenv("DISABLE_TOR")
		os.Unsetenv("DISABLE_I2P")
	}()

	mirror, err := NewMirror("test-sequential:3004")
	if err != nil {
		t.Fatalf("Failed to create mirror: %v", err)
	}
	defer mirror.Close()

	// Run the two sequential Listen calls through a table so the flow is
	// identical for both; the failure messages match the originals exactly.
	addrs := []string{"test-seq-1:3004", "test-seq-2:3004"}
	failMsgs := []string{"First Listen() failed: %v", "Second Listen() failed: %v"}
	for i, a := range addrs {
		l, listenErr := mirror.Listen(a, "")
		if listenErr != nil {
			t.Fatalf(failMsgs[i], listenErr)
		}
		if l != nil {
			l.Close()
		}
	}
	t.Log("Sequential operations work correctly after fix")
}

View File

@@ -2,41 +2,131 @@ package mirror
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"log"
"io"
"net"
"net/http"
"time"
)
// copyWithContextCancel copies data from src to dst with context cancellation support.
// Returns when context is cancelled, src is exhausted, or an error occurs.
func copyWithContextCancel(ctx context.Context, dst io.Writer, src io.Reader) error {
buf := make([]byte, 32*1024)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Set a short read timeout for responsiveness to context cancellation
if conn, ok := src.(net.Conn); ok {
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
}
n, err := src.Read(buf)
if n > 0 {
if _, writeErr := dst.Write(buf[:n]); writeErr != nil {
return writeErr
}
}
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue // Retry on timeout to check context
}
if err != io.EOF {
return err
}
return nil // EOF reached, copy complete
}
}
}
// AddHeaders adds headers to the connection.
// It takes a net.Conn and a map of headers as input.
// It only adds headers if the connection is an HTTP connection.
// It returns a net.Conn with the headers added.
func AddHeaders(conn net.Conn, headers map[string]string) net.Conn {
// read a request from the connection
// if the request is an HTTP request, add the headers
// if the request is not an HTTP request, return the connection as is
req, err := http.ReadRequest(bufio.NewReader(conn))
// Create a buffer to store the original request
var buf bytes.Buffer
teeReader := io.TeeReader(conn, &buf)
// Try to read the request, but also save it to our buffer
req, err := http.ReadRequest(bufio.NewReader(teeReader))
if err != nil {
log.Println("Error reading request:", err)
// if the request is not an HTTP request, return the connection as is
// Not an HTTP request or couldn't parse, return original connection
return conn
}
log.Println("Adding headers to connection:", req.Method, req.URL)
// Add our headers
for key, value := range headers {
req.Header.Add(key, value)
log.Println("Added header:", key, value)
}
// write the request back to the connection
if err := req.Write(conn); err != nil {
log.Println("Error writing request:", err)
// if there is an error writing the request, return the connection as is
return conn
// Create a pipe to connect our modified request with the output
pr, pw := io.Pipe()
// Write the modified request to one end of the pipe with timeout protection
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC in header processing goroutine: %v", r)
}
pw.Close()
}()
// Create context with timeout for the entire operation
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Set a deadline for the connection to prevent indefinite blocking
if deadline, ok := conn.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(30 * time.Second))
}
// Write the modified request
if err := req.Write(pw); err != nil {
log.Printf("Error writing modified request: %v", err)
return
}
// Copy remaining data with context-aware copying to prevent goroutine leak
err := copyWithContextCancel(ctx, pw, conn)
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
log.Printf("Error copying connection data: %v", err)
}
if ctx.Err() != nil {
log.Printf("Header processing goroutine timed out after 30 seconds")
}
}()
// Return a ReadWriter that reads from our pipe and writes to the original connection
return &readWriteConn{
Reader: pr,
Writer: conn,
conn: conn,
}
// If all goes well, return the connection with the headers added
return conn
}
// readWriteConn implements net.Conn
// by pairing an arbitrary Reader (the pipe carrying the rewritten request)
// with a separate Writer (the original connection), while keeping the raw
// connection around for lifecycle and address calls.
type readWriteConn struct {
	io.Reader // source of inbound bytes (header-injection pipe)
	io.Writer // sink for outbound bytes (the original connection)
	conn net.Conn // underlying connection for Close/addr/deadline delegation
}
// Implement the rest of net.Conn interface by delegating to the original connection
func (rwc *readWriteConn) Close() error { return rwc.conn.Close() }
func (rwc *readWriteConn) LocalAddr() net.Addr { return rwc.conn.LocalAddr() }
func (rwc *readWriteConn) RemoteAddr() net.Addr { return rwc.conn.RemoteAddr() }
func (rwc *readWriteConn) SetDeadline(t time.Time) error { return rwc.conn.SetDeadline(t) }
func (rwc *readWriteConn) SetReadDeadline(t time.Time) error { return rwc.conn.SetReadDeadline(t) }
func (rwc *readWriteConn) SetWriteDeadline(t time.Time) error { return rwc.conn.SetWriteDeadline(t) }
// Accept accepts a connection from the listener.
// It takes a net.Listener as input and returns a net.Conn with the headers added.
// It is used to accept connections from the meta listener and add headers to them.
@@ -56,11 +146,11 @@ func (ml *Mirror) Accept() (net.Conn, error) {
return nil, err
}
// If the handshake is successful, get the underlying connection
conn = tlsConn.NetConn()
// conn = tlsConn.NetConn()
}
host := map[string]string{
"Host": ml.MetaListener.Addr().String(),
"Host": conn.LocalAddr().String(),
"X-Forwarded-For": conn.RemoteAddr().String(),
"X-Forwarded-Proto": "http",
}

View File

@@ -2,11 +2,13 @@ package mirror
import (
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"github.com/go-i2p/go-meta-listener"
"github.com/go-i2p/go-meta-listener/tcp"
"github.com/go-i2p/onramp"
wileedot "github.com/opd-ai/wileedot"
@@ -14,6 +16,7 @@ import (
type Mirror struct {
*meta.MetaListener
mu sync.RWMutex // protects Onions and Garlics maps
Onions map[string]*onramp.Onion
Garlics map[string]*onramp.Garlic
}
@@ -21,56 +24,77 @@ type Mirror struct {
var _ net.Listener = &Mirror{}
// Close shuts down the MetaListener first, then every Onion and Garlic
// manager held by the Mirror. Individual close failures are only logged,
// not propagated; Close always returns nil.
func (m *Mirror) Close() error {
	log.Println("Closing Mirror")
	// Stop accepting before tearing down the hidden-service managers.
	if err := m.MetaListener.Close(); err != nil {
		log.Println("Error closing MetaListener:", err)
	} else {
		log.Println("MetaListener closed")
	}
	// Guard the maps against concurrent mutation by Listen().
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, onion := range m.Onions {
		if err := onion.Close(); err != nil {
			log.Println("Error closing Onion:", err)
		} else {
			log.Println("Onion closed")
		}
	}
	for _, garlic := range m.Garlics {
		if err := garlic.Close(); err != nil {
			log.Println("Error closing Garlic:", err)
		} else {
			log.Println("Garlic closed")
		}
	}
	log.Println("Mirror closed")
	return nil
}
// NewMirror builds a Mirror around a fresh MetaListener. The name (optionally
// host:port; port defaults to "3000") seeds the onramp service identifiers.
// Onion and Garlic managers are created only when not disabled via the
// DISABLE_TOR / DISABLE_I2P environment variables.
//
// BUG FIX: this block contained merged diff residue — an OLD unconditional
// NewOnion/NewGarlic pair alongside the NEW DisableTor/DisableI2P-guarded
// creation, plus stale map assignments. That defeated the disable switches
// (Tor/I2P were always contacted) and created each manager twice. Only the
// guarded creation is kept.
func NewMirror(name string) (*Mirror, error) {
	log.Println("Creating new Mirror")
	inner := meta.NewMetaListener()
	// Normalize the name: no surrounding or embedded spaces, never empty.
	name = strings.TrimSpace(name)
	name = strings.ReplaceAll(name, " ", "")
	if name == "" {
		name = "mirror"
	}
	log.Printf("Creating new MetaListener with name: '%s'\n", name)
	_, port, err := net.SplitHostPort(name)
	if err != nil {
		// No port in the name: fall back to the default.
		port = "3000"
	}
	onions := make(map[string]*onramp.Onion)
	if !DisableTor() {
		onion, err := onramp.NewOnion("metalistener-" + name)
		if err != nil {
			return nil, err
		}
		log.Println("Created new Onion manager")
		onions[port] = onion
	}
	garlics := make(map[string]*onramp.Garlic)
	if !DisableI2P() {
		garlic, err := onramp.NewGarlic("metalistener-"+name, "127.0.0.1:7656", onramp.OPT_WIDE)
		if err != nil {
			return nil, err
		}
		log.Println("Created new Garlic manager")
		garlics[port] = garlic
	}
	ml := &Mirror{
		MetaListener: inner,
		Onions:       onions,
		Garlics:      garlics,
	}
	log.Printf("Mirror created with name: '%s' and port: '%s', '%s'\n", name, port, ml.MetaListener.Addr().String())
	return ml, nil
}
func (ml Mirror) Listen(name, addr, certdir string, hiddenTls bool) (net.Listener, error) {
func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
log.Println("Starting Mirror Listener")
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certdir, hiddenTls)
// get the port:
_, port, err := net.SplitHostPort(name)
if err != nil {
@@ -80,93 +104,127 @@ func (ml Mirror) Listen(name, addr, certdir string, hiddenTls bool) (net.Listene
}
port = "3000"
}
if strings.HasSuffix(port, "22") {
log.Println("Port ends with 22, setting hiddenTls to true")
log.Println("This is a workaround for the fact that the default port for SSH is 22")
log.Println("This is so self-configuring SSH servers can be used without TLS, which would make connecting to them wierd")
hiddenTls = false
}
hiddenTls := hiddenTls(port)
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certDir(), hiddenTls)
localAddr := net.JoinHostPort("127.0.0.1", port)
// Listen on plain HTTP
tcpListener, err := net.Listen("tcp", localAddr)
listener, err := net.Listen("tcp", localAddr)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create TCP listener on %s: %w", localAddr, err)
}
if err := ml.AddListener(port, tcpListener); err != nil {
tcpListener := listener.(*net.TCPListener)
hardenedListener, err := tcp.Config(*tcpListener)
if err != nil {
log.Fatal(err)
}
log.Printf("TCP listener created on %s\n", localAddr)
if err := ml.AddListener(port, hardenedListener); err != nil {
return nil, err
}
log.Printf("HTTP Local listener added http://%s\n", tcpListener.Addr())
log.Println("Checking for existing onion and garlic listeners")
listenerId := fmt.Sprintf("metalistener-%s-%s", name, port)
log.Println("Listener ID:", listenerId)
// Protect map access with mutex to prevent data race
ml.mu.Lock()
// Check if onion and garlic listeners already exist
if ml.Onions[port] == nil {
if ml.Onions[port] == nil && !DisableTor() {
// make a new onion listener
// and add it to the map
log.Println("Creating new onion listener")
onion, err := onramp.NewOnion(listenerId)
if err != nil {
ml.mu.Unlock()
return nil, err
}
log.Println("Onion listener created for port", port)
ml.Onions[port] = onion
}
if ml.Garlics[port] == nil {
if ml.Garlics[port] == nil && !DisableI2P() {
// make a new garlic listener
// and add it to the map
log.Println("Creating new garlic listener")
garlic, err := onramp.NewGarlic(listenerId, "127.0.0.1:7656", onramp.OPT_WIDE)
if err != nil {
ml.mu.Unlock()
return nil, err
}
log.Println("Garlic listener created for port", port)
ml.Garlics[port] = garlic
}
ml.mu.Unlock()
if hiddenTls {
// make sure an onion and a garlic listener exist at ml.Onions[port] and ml.Garlics[port]
// and listen on them, check existence first
onionListener, err := ml.Onions[port].ListenTLS()
if err != nil {
return nil, err
if !DisableTor() {
ml.mu.RLock()
onionInstance := ml.Onions[port]
ml.mu.RUnlock()
onionListener, err := onionInstance.ListenTLS()
if err != nil {
return nil, err
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
}
log.Printf("OnionTLS listener added https://%s\n", onionListener.Addr())
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
if !DisableI2P() {
ml.mu.RLock()
garlicInstance := ml.Garlics[port]
ml.mu.RUnlock()
garlicListener, err := garlicInstance.ListenTLS()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("GarlicTLS listener added https://%s\n", garlicListener.Addr())
}
log.Printf("OnionTLS listener added https://%s\n", onionListener.Addr())
garlicListener, err := ml.Garlics[port].ListenTLS()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("GarlicTLS listener added https://%s\n", garlicListener.Addr())
} else {
onionListener, err := ml.Onions[port].Listen()
if err != nil {
return nil, err
if !DisableTor() {
ml.mu.RLock()
onionInstance := ml.Onions[port]
ml.mu.RUnlock()
onionListener, err := onionInstance.Listen()
if err != nil {
return nil, err
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
}
log.Printf("Onion listener added http://%s\n", onionListener.Addr())
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
if !DisableI2P() {
ml.mu.RLock()
garlicInstance := ml.Garlics[port]
ml.mu.RUnlock()
garlicListener, err := garlicInstance.Listen()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("Garlic listener added http://%s\n", garlicListener.Addr())
}
log.Printf("Onion listener added http://%s\n", onionListener.Addr())
garlicListener, err := ml.Garlics[port].Listen()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("Garlic listener added http://%s\n", garlicListener.Addr())
}
if addr != "" {
cfg := wileedot.Config{
Domain: name,
AllowedDomains: []string{name},
CertDir: certdir,
CertDir: certDir(),
Email: addr,
}
tlsListener, err := wileedot.New(cfg)
@@ -187,10 +245,67 @@ func (ml Mirror) Listen(name, addr, certdir string, hiddenTls bool) (net.Listene
// name is the domain name used for the TLS listener, required for Let's Encrypt.
// addr is the email address used for Let's Encrypt registration.
// It is recommended to use a valid email address for production use.
func Listen(name, addr, certdir string, hiddenTls bool) (net.Listener, error) {
func Listen(name, addr string) (net.Listener, error) {
ml, err := NewMirror(name)
if err != nil {
return nil, err
}
return ml.Listen(name, addr, certdir, hiddenTls)
return ml.Listen(name, addr)
}
func DisableTor() bool {
val := os.Getenv("DISABLE_TOR")
if val == "1" || strings.ToLower(val) == "true" {
log.Println("Tor is disabled by environment variable DISABLE_TOR")
return true
}
return false
}
func DisableI2P() bool {
val := os.Getenv("DISABLE_I2P")
if val == "1" || strings.ToLower(val) == "true" {
log.Println("I2P is disabled by environment variable DISABLE_I2P")
return true
}
return false
}
// HIDDEN_TLS is the package-wide default for whether hidden-service
// listeners use TLS. It starts true and may be overridden from Go code;
// hiddenTls consults it for every port that does not end in "22".
var HIDDEN_TLS = true

// hiddenTls decides whether hidden TLS should be used for the given port.
// Ports ending in "22" (the SSH convention) always opt out so
// self-configuring SSH servers can connect without TLS; every other port
// follows the HIDDEN_TLS default.
func hiddenTls(port string) bool {
	if !strings.HasSuffix(port, "22") {
		return HIDDEN_TLS
	}
	log.Println("Port ends with 22, setting hiddenTls to false")
	return false
}
var default_CERT_DIR = "./certs"
// CERT_DIR is the directory where certificates are stored.
// It can be overridden by setting the CERT_DIR environment variable.
// if CERT_DIR is not set, it defaults to "./certs".
// if CERT_DIR is set from Go code, it will always return the value set in the code.
// if CERT_DIR is set from the environment, it will return the value from the environment unless overridden by Go code.
var CERT_DIR = default_CERT_DIR
func certDir() string {
// Default certificate directory
certDir := CERT_DIR
if certDir != default_CERT_DIR {
// if the default directory is not used, always return it
return certDir
}
if dir := os.Getenv("CERT_DIR"); dir != "" {
certDir = dir
}
log.Printf("Using certificate directory: %s\n", certDir)
return certDir
}

View File

@@ -0,0 +1,94 @@
package mirror
import (
	"strconv"
	"sync"
	"testing"
	"time"
)
// TestConcurrentListenDataRace tests for data race in concurrent Listen() calls.
// Run with `go test -race` so the detector can observe unsynchronized map
// access on Mirror.Onions / Mirror.Garlics if the fix ever regresses.
func TestConcurrentListenDataRace(t *testing.T) {
	mirror, err := NewMirror("test-mirror:3001")
	if err != nil {
		t.Fatalf("Failed to create mirror: %v", err)
	}
	defer mirror.Close()
	const numGoroutines = 10
	var wg sync.WaitGroup
	// Buffered so failing goroutines never block on reporting.
	errorCh := make(chan error, numGoroutines)
	// Launch multiple goroutines that call Listen() concurrently.
	// The race being tested lives in the map access, not the network
	// binding, but each goroutine still gets a unique port to avoid
	// "address already in use" noise.
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			port := 4000 + id
			// BUG FIX: the original built the port with
			// string(rune('0'+port%10)), yielding a single digit "0".."9"
			// rather than the intended 4000-4009 range. strconv.Itoa
			// produces the full port number.
			addr := "test-" + string(rune('a'+id)) + ":" + strconv.Itoa(port)
			// This should cause concurrent map access on ml.Onions and ml.Garlics
			listener, err := mirror.Listen(addr, "")
			if err != nil {
				errorCh <- err
				return
			}
			if listener != nil {
				// Clean close if successful
				listener.Close()
			}
		}(i)
	}
	// Wait for all goroutines to complete, bounded by a timeout so a
	// deadlock fails the test instead of hanging the suite.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		// Check for any errors
		close(errorCh)
		for err := range errorCh {
			if err != nil {
				t.Errorf("Error in concurrent Listen(): %v", err)
			}
		}
	case <-time.After(30 * time.Second):
		t.Fatal("Test timed out - possible deadlock or hang")
	}
}
// TestSequentialListenWorks verifies that sequential Listen calls work correctly
// (i.e. the locking added for the concurrent case does not break the
// ordinary one-caller-at-a-time path).
func TestSequentialListenWorks(t *testing.T) {
	mirror, err := NewMirror("test-sequential:3002")
	if err != nil {
		t.Fatalf("Failed to create mirror: %v", err)
	}
	defer mirror.Close()
	// Sequential calls should work fine
	listener1, err := mirror.Listen("test-seq-1:3002", "")
	if err != nil {
		t.Fatalf("First Listen() failed: %v", err)
	}
	if listener1 != nil {
		listener1.Close()
	}
	// Second call on the same Mirror must also succeed after the first
	// listener has been closed.
	listener2, err := mirror.Listen("test-seq-2:3002", "")
	if err != nil {
		t.Fatalf("Second Listen() failed: %v", err)
	}
	if listener2 != nil {
		listener2.Close()
	}
}

7
mirror/log.go Normal file
View File

@@ -0,0 +1,7 @@
package mirror
import (
"github.com/go-i2p/logger"
)
var log = logger.GetGoI2PLogger()

60
mirror/map_race_test.go Normal file
View File

@@ -0,0 +1,60 @@
package mirror
import (
"sync"
"testing"
"time"
"github.com/go-i2p/onramp"
)
// TestConcurrentMapAccessDataRace tests for data race in concurrent map access.
// The maps are deliberately touched WITHOUT the Mirror mutex below: the point
// is to give `go test -race` something to flag if the unsynchronized
// read-check-write pattern ever returns to Listen().
func TestConcurrentMapAccessDataRace(t *testing.T) {
	// This test directly reproduces the data race on the maps
	// Create a Mirror with empty maps like in Listen()
	mirror := &Mirror{
		Onions:  make(map[string]*onramp.Onion),
		Garlics: make(map[string]*onramp.Garlic),
	}
	const numGoroutines = 20
	var wg sync.WaitGroup
	// Simulate the exact race condition from Listen() method
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Use the same port to force the race condition
			port := "8080"
			// This replicates the exact problematic code from Listen():
			// Simulate the read-check-write pattern from Listen()
			if mirror.Onions[port] == nil && !DisableTor() {
				onion, _ := onramp.NewOnion("test-onion") // error intentionally ignored; only the map access matters here
				mirror.Onions[port] = onion               // Concurrent write to map
			}
			if mirror.Garlics[port] == nil && !DisableI2P() {
				garlic, _ := onramp.NewGarlic("test-garlic", "127.0.0.1:7656", onramp.OPT_WIDE)
				mirror.Garlics[port] = garlic // Concurrent write to map
			}
		}(i)
	}
	// Wait for all goroutines to complete
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	// Wait with timeout
	select {
	case <-done:
		// Test completed - race detector should catch any issues
	case <-time.After(10 * time.Second):
		t.Fatal("Test timed out")
	}
}

View File

@@ -1,45 +1,255 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-i2p/go-meta-listener/mirror"
)
const (
maxConcurrentConnections = 100 // Limit concurrent connections
connectionTimeout = 30 * time.Second
shutdownTimeout = 5 * time.Second
)
// connectionPool bounds the number of in-flight proxied connections and
// tracks them so shutdown can wait for every one to finish.
type connectionPool struct {
	semaphore   chan struct{}      // capacity == max concurrent connections
	activeConns sync.WaitGroup     // one count per live connection goroutine
	ctx         context.Context    // cancelled when the pool shuts down
	cancel      context.CancelFunc // triggers ctx cancellation
}

// newConnectionPool builds a pool that admits at most maxConns
// simultaneous connections.
func newConnectionPool(maxConns int) *connectionPool {
	poolCtx, stop := context.WithCancel(context.Background())
	pool := &connectionPool{
		semaphore: make(chan struct{}, maxConns),
		ctx:       poolCtx,
		cancel:    stop,
	}
	return pool
}
// handleConnection admits clientConn into the pool (blocking until a
// semaphore slot frees up or the pool begins shutdown, in which case the
// connection is closed and dropped) and then proxies bytes between the
// client and targetHost:targetPort in a background goroutine. The caller
// does not wait for completion; shutdown() does, via activeConns.
func (cp *connectionPool) handleConnection(clientConn net.Conn, targetHost string, targetPort int) {
	// Acquire semaphore slot or block
	select {
	case cp.semaphore <- struct{}{}:
		// Got slot, continue
	case <-cp.ctx.Done():
		// Pool is shutting down: reject the connection outright.
		clientConn.Close()
		return
	}
	// Track active connection
	cp.activeConns.Add(1)
	// Handle connection in separate goroutine
	go func() {
		defer func() {
			<-cp.semaphore // Release semaphore slot
			cp.activeConns.Done()
			clientConn.Close()
		}()
		// Set connection timeout (absolute deadline for the whole session)
		clientConn.SetDeadline(time.Now().Add(connectionTimeout))
		// Connect to target with timeout
		serverConn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", targetHost, targetPort), 10*time.Second)
		if err != nil {
			log.Printf("Failed to connect to target %s:%d: %v", targetHost, targetPort, err)
			return
		}
		defer serverConn.Close()
		// Set timeout on server connection
		serverConn.SetDeadline(time.Now().Add(connectionTimeout))
		// Create context for this connection, child of the pool context so
		// pool shutdown cancels in-flight copies too.
		connCtx, connCancel := context.WithCancel(cp.ctx)
		defer connCancel()
		// Forward data bidirectionally with proper error handling
		var wg sync.WaitGroup
		wg.Add(2)
		// Client to server
		go func() {
			defer wg.Done()
			if _, err := copyWithContext(connCtx, serverConn, clientConn); err != nil && err != io.EOF {
				log.Printf("Error copying client to server: %v", err)
			}
			// Close server write side to signal completion (half-close)
			if tcpConn, ok := serverConn.(*net.TCPConn); ok {
				tcpConn.CloseWrite()
			}
		}()
		// Server to client
		go func() {
			defer wg.Done()
			if _, err := copyWithContext(connCtx, clientConn, serverConn); err != nil && err != io.EOF {
				log.Printf("Error copying server to client: %v", err)
			}
			// Close client write side to signal completion (half-close)
			if tcpConn, ok := clientConn.(*net.TCPConn); ok {
				tcpConn.CloseWrite()
			}
		}()
		// Wait for either copy operation to complete or context cancellation
		done := make(chan struct{})
		go func() {
			wg.Wait()
			close(done)
		}()
		select {
		case <-done:
			// Normal completion
		case <-connCtx.Done():
			// Context cancelled, connections will be closed by defers
		}
	}()
}
// shutdown cancels the pool context (unblocking admission waits and any
// in-flight copies) and then blocks until every tracked connection
// goroutine has finished.
func (cp *connectionPool) shutdown() {
	cp.cancel()
	cp.activeConns.Wait()
}
// copyWithContext copies data between connections with context cancellation support.
// It mirrors io.Copy's write-accounting but re-checks ctx at least every
// 100ms by setting short read deadlines on src; deadline expirations are
// retried rather than treated as errors. Returns the bytes written and nil
// on clean EOF, ctx.Err() on cancellation, or the first I/O error.
func copyWithContext(ctx context.Context, dst, src net.Conn) (int64, error) {
	// Use a small buffer for responsive cancellation
	buf := make([]byte, 32*1024)
	var written int64
	for {
		// Non-blocking context poll before each read.
		select {
		case <-ctx.Done():
			return written, ctx.Err()
		default:
		}
		// Set short read timeout for responsiveness
		src.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
		nr, er := src.Read(buf)
		if nr > 0 {
			nw, ew := dst.Write(buf[0:nr])
			if nw < 0 || nr < nw {
				// Defensive guard (as in io.Copy): a Writer must not report
				// more bytes written than it was handed.
				nw = 0
				if ew == nil {
					ew = fmt.Errorf("invalid write count")
				}
			}
			written += int64(nw)
			if ew != nil {
				return written, ew
			}
			if nr != nw {
				return written, io.ErrShortWrite
			}
		}
		if er != nil {
			if netErr, ok := er.(net.Error); ok && netErr.Timeout() {
				continue // Retry on timeout; the loop re-checks ctx
			}
			if er != io.EOF {
				return written, er
			}
			break
		}
	}
	return written, nil
}
// main function sets up a meta listener that forwards connections to a specified host and port.
// It listens for incoming connections and forwards them to the specified destination.
func main() {
host := flag.String("host", "localhost", "Host to forward connections to")
port := flag.Int("port", 8080, "Port to forward connections to")
listenPort := flag.Int("listen-port", 3002, "Port to listen for incoming connections")
domain := flag.String("domain", "i2pgit.org", "Domain name for TLS listener")
email := flag.String("email", "example@example.com", "Email address for Let's Encrypt registration")
email := flag.String("email", "", "Email address for Let's Encrypt registration")
certDir := flag.String("certdir", "./certs", "Directory for storing certificates")
hiddenTls := flag.Bool("hidden-tls", false, "Enable hidden TLS")
maxConns := flag.Int("max-conns", maxConcurrentConnections, "Maximum concurrent connections")
flag.Parse()
mirror.CERT_DIR = *certDir
mirror.HIDDEN_TLS = *hiddenTls
addr := net.JoinHostPort(*domain, fmt.Sprintf("%d", *listenPort))
// Create connection pool with specified limits
pool := newConnectionPool(*maxConns)
defer pool.shutdown()
// Create a new meta listener
metaListener, err := mirror.Listen(*domain, *email, *certDir, *hiddenTls)
metaListener, err := mirror.Listen(addr, *email)
if err != nil {
panic(err)
log.Fatalf("Failed to create meta listener: %v", err)
}
defer metaListener.Close()
// forward all connections recieved on the meta listener to a local host:port
for {
conn, err := metaListener.Accept()
if err != nil {
panic(err)
}
go func() {
defer conn.Close()
localConn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", *host, *port))
// Set up graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
log.Printf("Proxy server starting, forwarding to %s:%d (max concurrent connections: %d)", *host, *port, *maxConns)
// Start accepting connections in a separate goroutine
go func() {
for {
conn, err := metaListener.Accept()
if err != nil {
panic(err)
// Check if this is due to shutdown
select {
case <-pool.ctx.Done():
log.Println("Shutting down connection accept loop")
return
default:
log.Printf("Error accepting connection: %v", err)
continue
}
}
defer localConn.Close()
go io.Copy(localConn, conn)
io.Copy(conn, localConn)
}()
log.Printf("Accepted connection from %s", conn.RemoteAddr())
pool.handleConnection(conn, *host, *port)
}
}()
// Wait for shutdown signal
<-sigCh
log.Println("Shutdown signal received, stopping proxy...")
// Close listener to stop accepting new connections
metaListener.Close()
// Shutdown connection pool with timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer shutdownCancel()
done := make(chan struct{})
go func() {
pool.shutdown()
close(done)
}()
select {
case <-done:
log.Println("All connections closed gracefully")
case <-shutdownCtx.Done():
log.Println("Shutdown timeout exceeded, forcing exit")
}
log.Println("Proxy server stopped")
}

119
tcp/config.go Normal file
View File

@@ -0,0 +1,119 @@
// Package tcp provides production hardening for net.TCPListener with minimal overhead.
//
// This package wraps standard Go TCP listeners with essential TCP socket configuration
// for internet-facing services. It applies conservative defaults without requiring
// configuration, making it safe to use in production environments.
//
// Example usage:
//
// listener, err := net.Listen("tcp", ":8080")
// if err != nil {
// log.Fatal(err)
// }
// tcpListener := listener.(*net.TCPListener)
//
// hardenedListener, err := tcp.Config(*tcpListener)
// if err != nil {
// log.Fatal(err)
// }
// defer hardenedListener.Close()
//
// for {
// conn, err := hardenedListener.Accept()
// if err != nil {
// log.Printf("Accept error: %v", err)
// continue
// }
// go handleConnection(conn)
// }
package tcp
import (
"net"
"time"
)
const (
// keepAliveInterval sets TCP keep-alive probe interval to 15 seconds.
// This provides reasonable connection health detection without excessive overhead.
keepAliveInterval = 15 * time.Second
// socketBufferSize sets both read and write socket buffers to 64KB.
// This balances memory usage with throughput for typical web applications.
socketBufferSize = 64 * 1024
)
// hardenedListener wraps net.TCPListener with production hardening features.
type hardenedListener struct {
listener net.TCPListener
}
// Config wraps a net.TCPListener with production hardening features.
//
// The wrapped listener applies the following enhancements:
// - TCP keep-alive with 15-second intervals
// - TCP_NODELAY for reduced latency
// - 64KB socket buffer sizes for optimal throughput
//
// These settings are chosen to provide reliability and performance improvements
// while maintaining compatibility with standard net.Listener interface.
func Config(listener net.TCPListener) (net.Listener, error) {
return &hardenedListener{
listener: listener,
}, nil
}
// Accept waits for and returns the next connection with hardening applied.
func (hl *hardenedListener) Accept() (net.Conn, error) {
conn, err := hl.listener.AcceptTCP()
if err != nil {
return nil, err
}
// Apply TCP hardening settings
if err := hl.hardenConnection(conn); err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
// hardenConnection applies security and performance settings to a TCP connection.
func (hl *hardenedListener) hardenConnection(conn *net.TCPConn) error {
// Enable TCP keep-alive to detect dead connections
if err := conn.SetKeepAlive(true); err != nil {
return err
}
// Set keep-alive interval for timely detection of connection issues
if err := conn.SetKeepAlivePeriod(keepAliveInterval); err != nil {
return err
}
// Disable Nagle's algorithm for lower latency
if err := conn.SetNoDelay(true); err != nil {
return err
}
// Set socket buffer sizes for optimal throughput
if err := conn.SetReadBuffer(socketBufferSize); err != nil {
return err
}
if err := conn.SetWriteBuffer(socketBufferSize); err != nil {
return err
}
return nil
}
// Close stops the listener and prevents new connections.
func (hl *hardenedListener) Close() error {
return hl.listener.Close()
}
// Addr returns the listener's network address.
func (hl *hardenedListener) Addr() net.Addr {
return hl.listener.Addr()
}