mirror of
https://github.com/go-i2p/go-meta-listener.git
synced 2025-08-29 15:47:44 -04:00
Compare commits
15 Commits
979c70a3d3
...
a6f90149b4
Author | SHA1 | Date | |
---|---|---|---|
![]() |
a6f90149b4 | ||
![]() |
c3e2fa1124 | ||
![]() |
35b9bebe12 | ||
![]() |
00de9c987f | ||
![]() |
b0ea3e21f8 | ||
![]() |
e49691ef9d | ||
![]() |
525e371349 | ||
![]() |
6059157b55 | ||
![]() |
b6a47b286c | ||
![]() |
3c74637f8a | ||
![]() |
d22808794e | ||
![]() |
b2f9a33f18 | ||
![]() |
0c400f3a6a | ||
![]() |
3c964c12dd | ||
![]() |
f5adfd0c84 |
7
.gitignore
vendored
7
.gitignore
vendored
@@ -25,3 +25,10 @@ go.work.sum
|
||||
.env
|
||||
/*keys
|
||||
/data-dir*
|
||||
mp
|
||||
metaproxy
|
||||
test*
|
||||
ex
|
||||
tlskeys
|
||||
onionkeys
|
||||
i2pkeys
|
24
go.mod
24
go.mod
@@ -6,21 +6,23 @@ require (
|
||||
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c
|
||||
github.com/go-i2p/onramp v0.33.92
|
||||
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624
|
||||
github.com/samber/oops v1.18.0
|
||||
github.com/samber/oops v1.19.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cretz/bine v0.2.0 // indirect
|
||||
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708 // indirect
|
||||
github.com/go-i2p/sam3 v0.33.9 // indirect
|
||||
github.com/oklog/ulid/v2 v2.1.0 // indirect
|
||||
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 // indirect
|
||||
github.com/go-i2p/i2pkeys v0.33.92 // indirect
|
||||
github.com/go-i2p/sam3 v0.33.92 // indirect
|
||||
github.com/oklog/ulid/v2 v2.1.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/samber/lo v1.50.0 // indirect
|
||||
github.com/samber/lo v1.51.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
go.opentelemetry.io/otel v1.29.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.29.0 // indirect
|
||||
golang.org/x/crypto v0.31.0 // indirect
|
||||
golang.org/x/net v0.31.0 // indirect
|
||||
golang.org/x/sys v0.28.0 // indirect
|
||||
golang.org/x/text v0.22.0 // indirect
|
||||
go.opentelemetry.io/otel v1.37.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.37.0 // indirect
|
||||
golang.org/x/crypto v0.40.0 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
golang.org/x/text v0.27.0 // indirect
|
||||
golang.org/x/time v0.12.0 // indirect
|
||||
)
|
||||
|
30
go.sum
30
go.sum
@@ -3,19 +3,31 @@ github.com/cretz/bine v0.2.0/go.mod h1:WU4o9QR9wWp8AVKtTM1XD5vUHkEqnf2vVSo6dBqbe
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3 h1:czQe9f4Y+UFwqMKvMVsWDG/HWKdIN6uPelIkfoCjAsk=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014 h1:USgEZEPweXNRwXJjbsIvGz9q+ADSIdo+rMxzyuqbHbQ=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 h1:eKJ5X3wWYg/ln8C2G9Ibp3bUn8Qg8sYe28Uthypmb9g=
|
||||
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
|
||||
github.com/go-i2p/i2pkeys v0.0.0-20241108200332-e4f5ccdff8c4/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
|
||||
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708 h1:Tiy9IBwi21maNpK74yCdHursJJMkyH7w87tX1nXGWzg=
|
||||
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
|
||||
github.com/go-i2p/i2pkeys v0.33.92 h1:e2vx3vf7tNesaJ8HmAlGPOcfiGM86jzeIGxh27I9J2Y=
|
||||
github.com/go-i2p/i2pkeys v0.33.92/go.mod h1:BRURQ/twxV0WKjZlFSKki93ivBi+MirZPWudfwTzMpE=
|
||||
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c h1:VTiECn3dFEmUlZjto+wOwJ7SSJTHPLyNprQMR5HzIMI=
|
||||
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c/go.mod h1:te7Zj3g3oMeIl8uBXAgO62UKmZ6m6kHRNg1Mm+X8Hzk=
|
||||
github.com/go-i2p/onramp v0.33.92 h1:Dk3A0SGpdEw829rSjW2LqN8o16pUvuhiN0vn36z7Gpc=
|
||||
github.com/go-i2p/onramp v0.33.92/go.mod h1:5sfB8H2xk05gAS2K7XAUZ7ekOfwGJu3tWF0fqdXzJG4=
|
||||
github.com/go-i2p/sam3 v0.33.9 h1:3a+gunx75DFc6jxloUZTAVJbdP6736VU1dy2i7I9fKA=
|
||||
github.com/go-i2p/sam3 v0.33.9/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
|
||||
github.com/go-i2p/sam3 v0.33.92 h1:TVpi4GH7Yc7nZBiE1QxLjcZfnC4fI/80zxQz1Rk36BA=
|
||||
github.com/go-i2p/sam3 v0.33.92/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
|
||||
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
||||
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
|
||||
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
||||
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624 h1:FXCTQV93+31Yj46zpYbd41es+EYgT7qi4RK6KSVrGQM=
|
||||
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624/go.mod h1:ftKSvvGC9FnxZeuL3B4MB6q/DOzVSV0kET08YUyDwbM=
|
||||
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
|
||||
@@ -25,8 +37,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/samber/lo v1.50.0 h1:XrG0xOeHs+4FQ8gJR97zDz5uOFMW7OwFWiFVzqopKgY=
|
||||
github.com/samber/lo v1.50.0/go.mod h1:RjZyNk6WSnUFRKK6EyOhsRJMqft3G+pg7dCWHQCWvsc=
|
||||
github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI=
|
||||
github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
|
||||
github.com/samber/oops v1.18.0 h1:NnoCdxlOg/ajFos8HIC0+dV8S6cZRcrjW1WrfZe+GOc=
|
||||
github.com/samber/oops v1.18.0/go.mod h1:DcZbba2s+PzSx14vY6HjvhV1FDsGOZ1TJg7T/ZZARBQ=
|
||||
github.com/samber/oops v1.19.0 h1:sfZAwC8MmTXBRRyNc4Z1utuTPBx+hFKF5fJ9DEQRZfw=
|
||||
github.com/samber/oops v1.19.0/go.mod h1:+f+61dbiMxEMQ8gw/zTxW2pk+YGobaDM4glEHQtPOww=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@@ -35,26 +51,40 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
|
||||
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
|
||||
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
|
||||
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
|
||||
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
|
||||
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
|
||||
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
|
||||
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
|
||||
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
|
||||
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
||||
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
|
||||
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
|
||||
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
|
||||
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
|
||||
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
|
||||
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
|
||||
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
|
||||
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
|
||||
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
11
handler.go
11
handler.go
@@ -2,6 +2,7 @@ package meta
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -9,6 +10,10 @@ import (
|
||||
// and forwards accepted connections to the connCh channel.
|
||||
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
|
||||
defer func() {
|
||||
// Recover from any panic to ensure WaitGroup.Done() is always called
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC in listener goroutine for %s: %v", id, r)
|
||||
}
|
||||
log.Printf("Listener goroutine for %s exiting", id)
|
||||
ml.listenerWg.Done()
|
||||
}()
|
||||
@@ -41,6 +46,12 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the listener was closed (expected during shutdown)
|
||||
if atomic.LoadInt64(&ml.isClosed) != 0 {
|
||||
log.Printf("Listener %s closed during shutdown", id)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
|
||||
select {
|
||||
case ml.removeListenerCh <- id:
|
||||
|
28
listener.go
28
listener.go
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Accept implements the net.Listener Accept method.
|
||||
@@ -43,10 +44,10 @@ func (ml *MetaListener) Close() error {
|
||||
ml.mu.Lock()
|
||||
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
|
||||
|
||||
// Signal all goroutines to stop
|
||||
// Signal all goroutines to stop first, before clearing listeners map
|
||||
close(ml.closeCh)
|
||||
|
||||
// Close all listeners
|
||||
// Close all listeners first to stop accepting new connections
|
||||
var errs []error
|
||||
for id, listener := range ml.listeners {
|
||||
if err := listener.Close(); err != nil {
|
||||
@@ -55,10 +56,29 @@ func (ml *MetaListener) Close() error {
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the listeners map since they're all closed
|
||||
ml.listeners = make(map[string]net.Listener)
|
||||
|
||||
ml.mu.Unlock()
|
||||
|
||||
// Wait for all listener goroutines to exit
|
||||
ml.listenerWg.Wait()
|
||||
// Allow a brief grace period for handlers to finish processing current connections
|
||||
gracePeriod := 100 * time.Millisecond
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ml.listenerWg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait for either all goroutines to finish or grace period to expire
|
||||
select {
|
||||
case <-done:
|
||||
log.Printf("All listener goroutines exited gracefully")
|
||||
case <-time.After(gracePeriod):
|
||||
log.Printf("Grace period expired, waiting for remaining goroutines")
|
||||
// Wait for all listener goroutines to exit
|
||||
ml.listenerWg.Wait()
|
||||
}
|
||||
log.Printf("All listener goroutines have exited")
|
||||
|
||||
// Return combined errors if any
|
||||
|
@@ -33,6 +33,8 @@ type MetaListener struct {
|
||||
removeListenerCh chan string
|
||||
// isClosed indicates whether the meta listener has been closed (atomic)
|
||||
isClosed int64
|
||||
// isShuttingDown indicates whether WaitForShutdown has been called (atomic)
|
||||
isShuttingDown int64
|
||||
// mu protects concurrent access to the listener's state
|
||||
mu sync.RWMutex
|
||||
}
|
||||
@@ -52,7 +54,8 @@ func NewMetaListener() *MetaListener {
|
||||
removeListenerCh: make(chan string, 10), // Buffer for listener removal signals
|
||||
}
|
||||
|
||||
// Start the listener management goroutine
|
||||
// Start the listener management goroutine and track it
|
||||
ml.listenerWg.Add(1)
|
||||
go ml.manageListeners()
|
||||
|
||||
return ml
|
||||
@@ -73,13 +76,18 @@ func (ml *MetaListener) AddListener(id string, listener net.Listener) error {
|
||||
return ErrListenerClosed
|
||||
}
|
||||
|
||||
// Check if we're in shutdown mode (WaitForShutdown has been called)
|
||||
if atomic.LoadInt64(&ml.isShuttingDown) != 0 {
|
||||
return fmt.Errorf("cannot add listener during shutdown")
|
||||
}
|
||||
|
||||
if _, exists := ml.listeners[id]; exists {
|
||||
return fmt.Errorf("listener with ID '%s' already exists", id)
|
||||
}
|
||||
|
||||
ml.listeners[id] = listener
|
||||
|
||||
// Start a goroutine to handle connections from this listener
|
||||
// Add to WaitGroup immediately before starting goroutine to prevent race
|
||||
ml.listenerWg.Add(1)
|
||||
go ml.handleListener(id, listener)
|
||||
|
||||
@@ -128,6 +136,9 @@ func (ml *MetaListener) Count() int {
|
||||
// WaitForShutdown blocks until all listener goroutines have exited.
|
||||
// This is useful for ensuring clean shutdown in server applications.
|
||||
func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
|
||||
// Set shutdown flag to prevent new listeners from being added
|
||||
atomic.StoreInt64(&ml.isShuttingDown, 1)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
@@ -145,6 +156,11 @@ func (ml *MetaListener) WaitForShutdown(ctx context.Context) error {
|
||||
|
||||
// manageListeners handles listener removal signals from handler goroutines
|
||||
func (ml *MetaListener) manageListeners() {
|
||||
defer func() {
|
||||
log.Printf("manageListeners goroutine exiting")
|
||||
ml.listenerWg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ml.closeCh:
|
||||
|
@@ -1,8 +1,11 @@
|
||||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -183,3 +186,166 @@ func TestAcceptRaceCondition(t *testing.T) {
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestShutdownRaceCondition tests that shutdown doesn't cause race conditions
|
||||
func TestShutdownRaceCondition(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
|
||||
// Add multiple listeners
|
||||
for i := 0; i < 5; i++ {
|
||||
listener := newMockListener(fmt.Sprintf("127.0.0.1:%d", 8080+i))
|
||||
err := ml.AddListener(fmt.Sprintf("test%d", i), listener)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener%d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start multiple goroutines that will try to accept connections
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
conn, err := ml.Accept()
|
||||
if err != nil {
|
||||
// During shutdown, we expect ErrListenerClosed
|
||||
if err.Error() != ErrListenerClosed.Error() {
|
||||
t.Errorf("Goroutine %d: unexpected error: %v", id, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Allow some time for goroutines to start
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Close the listener
|
||||
err := ml.Close()
|
||||
if err != nil {
|
||||
t.Errorf("Error closing MetaListener: %v", err)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
wg.Wait()
|
||||
|
||||
// Verify the listener is properly closed
|
||||
if ml.Count() != 0 {
|
||||
t.Errorf("Expected 0 listeners after close, got %d", ml.Count())
|
||||
}
|
||||
}
|
||||
|
||||
// TestWaitGroupSynchronization tests that WaitGroup is properly synchronized
|
||||
func TestWaitGroupSynchronization(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
defer ml.Close()
|
||||
|
||||
// Test 1: Verify panic recovery in handleListener doesn't break WaitGroup
|
||||
listener := newMockListener("127.0.0.1:8080")
|
||||
|
||||
// Add a listener
|
||||
err := ml.AddListener("panic-test", listener)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener: %v", err)
|
||||
}
|
||||
|
||||
// Test 2: Verify we can't add listeners during shutdown
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
// Start WaitForShutdown in background
|
||||
go func() {
|
||||
ml.WaitForShutdown(ctx)
|
||||
}()
|
||||
|
||||
// Give WaitForShutdown a moment to set the shutdown flag
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Try to add another listener - should fail
|
||||
listener2 := newMockListener("127.0.0.1:8081")
|
||||
err = ml.AddListener("shutdown-test", listener2)
|
||||
if err == nil {
|
||||
t.Error("Expected error when adding listener during shutdown, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "shutdown") {
|
||||
t.Errorf("Expected shutdown error, got: %v", err)
|
||||
}
|
||||
|
||||
// Close the MetaListener to finish the test cleanly
|
||||
ml.Close()
|
||||
}
|
||||
|
||||
// TestWaitGroupPanicRecovery tests that panics in handleListener are recovered
|
||||
func TestWaitGroupPanicRecovery(t *testing.T) {
|
||||
ml := NewMetaListener()
|
||||
defer ml.Close()
|
||||
|
||||
// Create a custom listener that will cause a panic
|
||||
panicListener := &panicMockListener{
|
||||
mockListener: newMockListener("127.0.0.1:8080"),
|
||||
}
|
||||
|
||||
// Add the panic-inducing listener
|
||||
err := ml.AddListener("panic-test", panicListener)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add listener: %v", err)
|
||||
}
|
||||
|
||||
// Trigger the panic by sending a connection
|
||||
conn := &mockConn{}
|
||||
panicListener.connCh <- conn
|
||||
|
||||
// Wait a bit to let the panic occur and be recovered
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// The MetaListener should still be functional
|
||||
// Add another listener to verify WaitGroup is not broken
|
||||
normalListener := newMockListener("127.0.0.1:8081")
|
||||
err = ml.AddListener("normal-test", normalListener)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to add normal listener after panic: %v", err)
|
||||
}
|
||||
|
||||
// Verify we can still close cleanly
|
||||
err = ml.Close()
|
||||
if err != nil {
|
||||
t.Errorf("Error closing MetaListener after panic: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// panicMockListener is a mock listener that panics when Accept is called
|
||||
type panicMockListener struct {
|
||||
*mockListener
|
||||
}
|
||||
|
||||
func (p *panicMockListener) Accept() (net.Conn, error) {
|
||||
// First call the normal Accept to get a connection
|
||||
_, err := p.mockListener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Then panic to test panic recovery
|
||||
panic("test panic in handleListener")
|
||||
}
|
||||
|
||||
// mockConn is a minimal implementation of net.Conn for testing
|
||||
type mockConn struct{}
|
||||
|
||||
func (m *mockConn) Read(b []byte) (n int, err error) { return 0, io.EOF }
|
||||
func (m *mockConn) Write(b []byte) (n int, err error) { return len(b), nil }
|
||||
func (m *mockConn) Close() error { return nil }
|
||||
func (m *mockConn) LocalAddr() net.Addr {
|
||||
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8080}
|
||||
}
|
||||
|
||||
func (m *mockConn) RemoteAddr() net.Addr {
|
||||
return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8081}
|
||||
}
|
||||
func (m *mockConn) SetDeadline(t time.Time) error { return nil }
|
||||
func (m *mockConn) SetReadDeadline(t time.Time) error { return nil }
|
||||
func (m *mockConn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
|
111
mirror/fixed_race_test.go
Normal file
111
mirror/fixed_race_test.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package mirror
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestFixedConcurrentListenDataRace tests that the data race fix works
|
||||
func TestFixedConcurrentListenDataRace(t *testing.T) {
|
||||
// Disable actual network listeners to focus on testing the map race condition
|
||||
os.Setenv("DISABLE_TOR", "true")
|
||||
os.Setenv("DISABLE_I2P", "true")
|
||||
defer func() {
|
||||
os.Unsetenv("DISABLE_TOR")
|
||||
os.Unsetenv("DISABLE_I2P")
|
||||
}()
|
||||
|
||||
// Create a Mirror instance
|
||||
mirror, err := NewMirror("test-fixed:3003")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create mirror: %v", err)
|
||||
}
|
||||
defer mirror.Close()
|
||||
|
||||
const numGoroutines = 10
|
||||
var wg sync.WaitGroup
|
||||
errorCh := make(chan error, numGoroutines)
|
||||
|
||||
// Launch multiple goroutines that call Listen() concurrently
|
||||
// With the fix, this should NOT trigger data races
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
|
||||
// Use different addresses to avoid network conflicts
|
||||
addr := "test-fixed-" + string(rune('a'+id)) + ":5000"
|
||||
|
||||
// This should now be safe thanks to mutex protection
|
||||
listener, err := mirror.Listen(addr, "")
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
if listener != nil {
|
||||
// Clean close if successful
|
||||
listener.Close()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait with timeout
|
||||
select {
|
||||
case <-done:
|
||||
// Check for any errors
|
||||
close(errorCh)
|
||||
for err := range errorCh {
|
||||
if err != nil {
|
||||
t.Errorf("Error in concurrent Listen(): %v", err)
|
||||
}
|
||||
}
|
||||
t.Log("Test passed - no data races detected with fix")
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("Test timed out - possible deadlock or hang")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSequentialOperationsStillWork verifies sequential operations work after the fix
|
||||
func TestSequentialOperationsStillWork(t *testing.T) {
|
||||
// Disable actual network listeners
|
||||
os.Setenv("DISABLE_TOR", "true")
|
||||
os.Setenv("DISABLE_I2P", "true")
|
||||
defer func() {
|
||||
os.Unsetenv("DISABLE_TOR")
|
||||
os.Unsetenv("DISABLE_I2P")
|
||||
}()
|
||||
|
||||
mirror, err := NewMirror("test-sequential:3004")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create mirror: %v", err)
|
||||
}
|
||||
defer mirror.Close()
|
||||
|
||||
// Sequential calls should work fine
|
||||
listener1, err := mirror.Listen("test-seq-1:3004", "")
|
||||
if err != nil {
|
||||
t.Fatalf("First Listen() failed: %v", err)
|
||||
}
|
||||
if listener1 != nil {
|
||||
listener1.Close()
|
||||
}
|
||||
|
||||
listener2, err := mirror.Listen("test-seq-2:3004", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Second Listen() failed: %v", err)
|
||||
}
|
||||
if listener2 != nil {
|
||||
listener2.Close()
|
||||
}
|
||||
|
||||
t.Log("Sequential operations work correctly after fix")
|
||||
}
|
@@ -3,6 +3,7 @@ package mirror
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"io"
|
||||
"net"
|
||||
@@ -10,6 +11,41 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// copyWithContextCancel copies data from src to dst with context cancellation support.
|
||||
// Returns when context is cancelled, src is exhausted, or an error occurs.
|
||||
func copyWithContextCancel(ctx context.Context, dst io.Writer, src io.Reader) error {
|
||||
buf := make([]byte, 32*1024)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Set a short read timeout for responsiveness to context cancellation
|
||||
if conn, ok := src.(net.Conn); ok {
|
||||
conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
}
|
||||
|
||||
n, err := src.Read(buf)
|
||||
if n > 0 {
|
||||
if _, writeErr := dst.Write(buf[:n]); writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
continue // Retry on timeout to check context
|
||||
}
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
return nil // EOF reached, copy complete
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddHeaders adds headers to the connection.
|
||||
// It takes a net.Conn and a map of headers as input.
|
||||
// It only adds headers if the connection is an HTTP connection.
|
||||
@@ -34,12 +70,38 @@ func AddHeaders(conn net.Conn, headers map[string]string) net.Conn {
|
||||
// Create a pipe to connect our modified request with the output
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
// Write the modified request to one end of the pipe
|
||||
// Write the modified request to one end of the pipe with timeout protection
|
||||
go func() {
|
||||
req.Write(pw)
|
||||
// Then copy the rest of the original connection
|
||||
io.Copy(pw, conn)
|
||||
pw.Close()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("PANIC in header processing goroutine: %v", r)
|
||||
}
|
||||
pw.Close()
|
||||
}()
|
||||
|
||||
// Create context with timeout for the entire operation
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Set a deadline for the connection to prevent indefinite blocking
|
||||
if deadline, ok := conn.(interface{ SetDeadline(time.Time) error }); ok {
|
||||
deadline.SetDeadline(time.Now().Add(30 * time.Second))
|
||||
}
|
||||
|
||||
// Write the modified request
|
||||
if err := req.Write(pw); err != nil {
|
||||
log.Printf("Error writing modified request: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Copy remaining data with context-aware copying to prevent goroutine leak
|
||||
err := copyWithContextCancel(ctx, pw, conn)
|
||||
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
|
||||
log.Printf("Error copying connection data: %v", err)
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
log.Printf("Header processing goroutine timed out after 30 seconds")
|
||||
}
|
||||
}()
|
||||
|
||||
// Return a ReadWriter that reads from our pipe and writes to the original connection
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-i2p/go-meta-listener"
|
||||
"github.com/go-i2p/go-meta-listener/tcp"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
|
||||
type Mirror struct {
|
||||
*meta.MetaListener
|
||||
mu sync.RWMutex // protects Onions and Garlics maps
|
||||
Onions map[string]*onramp.Onion
|
||||
Garlics map[string]*onramp.Garlic
|
||||
}
|
||||
@@ -28,6 +30,10 @@ func (m *Mirror) Close() error {
|
||||
} else {
|
||||
log.Println("MetaListener closed")
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
for _, onion := range m.Onions {
|
||||
if err := onion.Close(); err != nil {
|
||||
log.Println("Error closing Onion:", err)
|
||||
@@ -89,7 +95,6 @@ func NewMirror(name string) (*Mirror, error) {
|
||||
|
||||
func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Println("Starting Mirror Listener")
|
||||
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certDir(), hiddenTls)
|
||||
// get the port:
|
||||
_, port, err := net.SplitHostPort(name)
|
||||
if err != nil {
|
||||
@@ -100,6 +105,7 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
port = "3000"
|
||||
}
|
||||
hiddenTls := hiddenTls(port)
|
||||
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certDir(), hiddenTls)
|
||||
localAddr := net.JoinHostPort("127.0.0.1", port)
|
||||
listener, err := net.Listen("tcp", localAddr)
|
||||
if err != nil {
|
||||
@@ -118,6 +124,10 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Println("Checking for existing onion and garlic listeners")
|
||||
listenerId := fmt.Sprintf("metalistener-%s-%s", name, port)
|
||||
log.Println("Listener ID:", listenerId)
|
||||
|
||||
// Protect map access with mutex to prevent data race
|
||||
ml.mu.Lock()
|
||||
|
||||
// Check if onion and garlic listeners already exist
|
||||
if ml.Onions[port] == nil && !DisableTor() {
|
||||
// make a new onion listener
|
||||
@@ -125,6 +135,7 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Println("Creating new onion listener")
|
||||
onion, err := onramp.NewOnion(listenerId)
|
||||
if err != nil {
|
||||
ml.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
log.Println("Onion listener created for port", port)
|
||||
@@ -136,16 +147,23 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Println("Creating new garlic listener")
|
||||
garlic, err := onramp.NewGarlic(listenerId, "127.0.0.1:7656", onramp.OPT_WIDE)
|
||||
if err != nil {
|
||||
ml.mu.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
log.Println("Garlic listener created for port", port)
|
||||
ml.Garlics[port] = garlic
|
||||
}
|
||||
|
||||
ml.mu.Unlock()
|
||||
if hiddenTls {
|
||||
// make sure an onion and a garlic listener exist at ml.Onions[port] and ml.Garlics[port]
|
||||
// and listen on them, check existence first
|
||||
if !DisableTor() {
|
||||
onionListener, err := ml.Onions[port].ListenTLS()
|
||||
ml.mu.RLock()
|
||||
onionInstance := ml.Onions[port]
|
||||
ml.mu.RUnlock()
|
||||
|
||||
onionListener, err := onionInstance.ListenTLS()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -156,7 +174,11 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Printf("OnionTLS listener added https://%s\n", onionListener.Addr())
|
||||
}
|
||||
if !DisableI2P() {
|
||||
garlicListener, err := ml.Garlics[port].ListenTLS()
|
||||
ml.mu.RLock()
|
||||
garlicInstance := ml.Garlics[port]
|
||||
ml.mu.RUnlock()
|
||||
|
||||
garlicListener, err := garlicInstance.ListenTLS()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -168,7 +190,11 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
}
|
||||
} else {
|
||||
if !DisableTor() {
|
||||
onionListener, err := ml.Onions[port].Listen()
|
||||
ml.mu.RLock()
|
||||
onionInstance := ml.Onions[port]
|
||||
ml.mu.RUnlock()
|
||||
|
||||
onionListener, err := onionInstance.Listen()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -179,7 +205,11 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
|
||||
log.Printf("Onion listener added http://%s\n", onionListener.Addr())
|
||||
}
|
||||
if !DisableI2P() {
|
||||
garlicListener, err := ml.Garlics[port].Listen()
|
||||
ml.mu.RLock()
|
||||
garlicInstance := ml.Garlics[port]
|
||||
ml.mu.RUnlock()
|
||||
|
||||
garlicListener, err := garlicInstance.Listen()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
94
mirror/listener_race_test.go
Normal file
94
mirror/listener_race_test.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package mirror
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestConcurrentListenDataRace tests for data race in concurrent Listen() calls
|
||||
func TestConcurrentListenDataRace(t *testing.T) {
|
||||
// Enable race detector with go test -race
|
||||
// This test reproduces the data race in Mirror.Listen()
|
||||
|
||||
mirror, err := NewMirror("test-mirror:3001")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create mirror: %v", err)
|
||||
}
|
||||
defer mirror.Close()
|
||||
|
||||
const numGoroutines = 10
|
||||
var wg sync.WaitGroup
|
||||
errorCh := make(chan error, numGoroutines)
|
||||
// Launch multiple goroutines that call Listen() concurrently
|
||||
// This should trigger the data race on ml.Onions and ml.Garlics maps
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
|
||||
// Use different ports to avoid "address already in use" errors
|
||||
// The race condition occurs on the map access, not the network binding
|
||||
port := 4000 + id
|
||||
addr := "test-" + string(rune('a'+id)) + ":" + string(rune('0'+port%10))
|
||||
|
||||
// This should cause concurrent map access on ml.Onions and ml.Garlics
|
||||
listener, err := mirror.Listen(addr, "")
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
if listener != nil {
|
||||
// Clean close if successful
|
||||
listener.Close()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait with timeout
|
||||
select {
|
||||
case <-done:
|
||||
// Check for any errors
|
||||
close(errorCh)
|
||||
for err := range errorCh {
|
||||
if err != nil {
|
||||
t.Errorf("Error in concurrent Listen(): %v", err)
|
||||
}
|
||||
}
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("Test timed out - possible deadlock or hang")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSequentialListenWorks verifies that sequential Listen calls work correctly
|
||||
func TestSequentialListenWorks(t *testing.T) {
|
||||
mirror, err := NewMirror("test-sequential:3002")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create mirror: %v", err)
|
||||
}
|
||||
defer mirror.Close()
|
||||
|
||||
// Sequential calls should work fine
|
||||
listener1, err := mirror.Listen("test-seq-1:3002", "")
|
||||
if err != nil {
|
||||
t.Fatalf("First Listen() failed: %v", err)
|
||||
}
|
||||
if listener1 != nil {
|
||||
listener1.Close()
|
||||
}
|
||||
|
||||
listener2, err := mirror.Listen("test-seq-2:3002", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Second Listen() failed: %v", err)
|
||||
}
|
||||
if listener2 != nil {
|
||||
listener2.Close()
|
||||
}
|
||||
}
|
60
mirror/map_race_test.go
Normal file
60
mirror/map_race_test.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package mirror
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/onramp"
|
||||
)
|
||||
|
||||
// TestConcurrentMapAccessDataRace tests for data race in concurrent map access
|
||||
func TestConcurrentMapAccessDataRace(t *testing.T) {
|
||||
// This test directly reproduces the data race on the maps
|
||||
// Create a Mirror with empty maps like in Listen()
|
||||
mirror := &Mirror{
|
||||
Onions: make(map[string]*onramp.Onion),
|
||||
Garlics: make(map[string]*onramp.Garlic),
|
||||
}
|
||||
|
||||
const numGoroutines = 20
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Simulate the exact race condition from Listen() method
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
|
||||
// Use the same port to force the race condition
|
||||
port := "8080"
|
||||
|
||||
// This replicates the exact problematic code from Listen():
|
||||
// Simulate the read-check-write pattern from Listen()
|
||||
if mirror.Onions[port] == nil && !DisableTor() {
|
||||
onion, _ := onramp.NewOnion("test-onion")
|
||||
mirror.Onions[port] = onion // Concurrent write to map
|
||||
}
|
||||
|
||||
if mirror.Garlics[port] == nil && !DisableI2P() {
|
||||
garlic, _ := onramp.NewGarlic("test-garlic", "127.0.0.1:7656", onramp.OPT_WIDE)
|
||||
mirror.Garlics[port] = garlic // Concurrent write to map
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait with timeout
|
||||
select {
|
||||
case <-done:
|
||||
// Test completed - race detector should catch any issues
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("Test timed out")
|
||||
}
|
||||
}
|
@@ -1,14 +1,177 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/go-meta-listener/mirror"
|
||||
)
|
||||
|
||||
const (
|
||||
maxConcurrentConnections = 100 // Limit concurrent connections
|
||||
connectionTimeout = 30 * time.Second
|
||||
shutdownTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// connectionPool manages concurrent connections with proper lifecycle
|
||||
type connectionPool struct {
|
||||
semaphore chan struct{}
|
||||
activeConns sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newConnectionPool(maxConns int) *connectionPool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &connectionPool{
|
||||
semaphore: make(chan struct{}, maxConns),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (cp *connectionPool) handleConnection(clientConn net.Conn, targetHost string, targetPort int) {
|
||||
// Acquire semaphore slot or block
|
||||
select {
|
||||
case cp.semaphore <- struct{}{}:
|
||||
// Got slot, continue
|
||||
case <-cp.ctx.Done():
|
||||
clientConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Track active connection
|
||||
cp.activeConns.Add(1)
|
||||
|
||||
// Handle connection in separate goroutine
|
||||
go func() {
|
||||
defer func() {
|
||||
<-cp.semaphore // Release semaphore slot
|
||||
cp.activeConns.Done()
|
||||
clientConn.Close()
|
||||
}()
|
||||
|
||||
// Set connection timeout
|
||||
clientConn.SetDeadline(time.Now().Add(connectionTimeout))
|
||||
|
||||
// Connect to target with timeout
|
||||
serverConn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", targetHost, targetPort), 10*time.Second)
|
||||
if err != nil {
|
||||
log.Printf("Failed to connect to target %s:%d: %v", targetHost, targetPort, err)
|
||||
return
|
||||
}
|
||||
defer serverConn.Close()
|
||||
|
||||
// Set timeout on server connection
|
||||
serverConn.SetDeadline(time.Now().Add(connectionTimeout))
|
||||
|
||||
// Create context for this connection
|
||||
connCtx, connCancel := context.WithCancel(cp.ctx)
|
||||
defer connCancel()
|
||||
|
||||
// Forward data bidirectionally with proper error handling
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
// Client to server
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if _, err := copyWithContext(connCtx, serverConn, clientConn); err != nil && err != io.EOF {
|
||||
log.Printf("Error copying client to server: %v", err)
|
||||
}
|
||||
// Close server write side to signal completion
|
||||
if tcpConn, ok := serverConn.(*net.TCPConn); ok {
|
||||
tcpConn.CloseWrite()
|
||||
}
|
||||
}()
|
||||
|
||||
// Server to client
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if _, err := copyWithContext(connCtx, clientConn, serverConn); err != nil && err != io.EOF {
|
||||
log.Printf("Error copying server to client: %v", err)
|
||||
}
|
||||
// Close client write side to signal completion
|
||||
if tcpConn, ok := clientConn.(*net.TCPConn); ok {
|
||||
tcpConn.CloseWrite()
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for either copy operation to complete or context cancellation
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Normal completion
|
||||
case <-connCtx.Done():
|
||||
// Context cancelled, connections will be closed by defers
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (cp *connectionPool) shutdown() {
|
||||
cp.cancel()
|
||||
cp.activeConns.Wait()
|
||||
}
|
||||
|
||||
// copyWithContext copies data between connections with context cancellation support
|
||||
func copyWithContext(ctx context.Context, dst, src net.Conn) (int64, error) {
|
||||
// Use a small buffer for responsive cancellation
|
||||
buf := make([]byte, 32*1024)
|
||||
var written int64
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return written, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Set short read timeout for responsiveness
|
||||
src.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
|
||||
nr, er := src.Read(buf)
|
||||
if nr > 0 {
|
||||
nw, ew := dst.Write(buf[0:nr])
|
||||
if nw < 0 || nr < nw {
|
||||
nw = 0
|
||||
if ew == nil {
|
||||
ew = fmt.Errorf("invalid write count")
|
||||
}
|
||||
}
|
||||
written += int64(nw)
|
||||
if ew != nil {
|
||||
return written, ew
|
||||
}
|
||||
if nr != nw {
|
||||
return written, io.ErrShortWrite
|
||||
}
|
||||
}
|
||||
if er != nil {
|
||||
if netErr, ok := er.(net.Error); ok && netErr.Timeout() {
|
||||
continue // Retry on timeout
|
||||
}
|
||||
if er != io.EOF {
|
||||
return written, er
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
return written, nil
|
||||
}
|
||||
|
||||
// main function sets up a meta listener that forwards connections to a specified host and port.
|
||||
// It listens for incoming connections and forwards them to the specified destination.
|
||||
func main() {
|
||||
@@ -19,31 +182,74 @@ func main() {
|
||||
email := flag.String("email", "", "Email address for Let's Encrypt registration")
|
||||
certDir := flag.String("certdir", "./certs", "Directory for storing certificates")
|
||||
hiddenTls := flag.Bool("hidden-tls", false, "Enable hidden TLS")
|
||||
maxConns := flag.Int("max-conns", maxConcurrentConnections, "Maximum concurrent connections")
|
||||
flag.Parse()
|
||||
|
||||
mirror.CERT_DIR = *certDir
|
||||
mirror.HIDDEN_TLS = *hiddenTls
|
||||
addr := net.JoinHostPort(*domain, fmt.Sprintf("%d", *listenPort))
|
||||
|
||||
// Create connection pool with specified limits
|
||||
pool := newConnectionPool(*maxConns)
|
||||
defer pool.shutdown()
|
||||
|
||||
// Create a new meta listener
|
||||
metaListener, err := mirror.Listen(addr, *email)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Fatalf("Failed to create meta listener: %v", err)
|
||||
}
|
||||
defer metaListener.Close()
|
||||
// forward all connections recieved on the meta listener to a local host:port
|
||||
for {
|
||||
conn, err := metaListener.Accept()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
localConn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", *host, *port))
|
||||
|
||||
// Set up graceful shutdown
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
log.Printf("Proxy server starting, forwarding to %s:%d (max concurrent connections: %d)", *host, *port, *maxConns)
|
||||
|
||||
// Start accepting connections in a separate goroutine
|
||||
go func() {
|
||||
for {
|
||||
conn, err := metaListener.Accept()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
// Check if this is due to shutdown
|
||||
select {
|
||||
case <-pool.ctx.Done():
|
||||
log.Println("Shutting down connection accept loop")
|
||||
return
|
||||
default:
|
||||
log.Printf("Error accepting connection: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
defer localConn.Close()
|
||||
go io.Copy(localConn, conn)
|
||||
io.Copy(conn, localConn)
|
||||
}()
|
||||
|
||||
log.Printf("Accepted connection from %s", conn.RemoteAddr())
|
||||
pool.handleConnection(conn, *host, *port)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-sigCh
|
||||
log.Println("Shutdown signal received, stopping proxy...")
|
||||
|
||||
// Close listener to stop accepting new connections
|
||||
metaListener.Close()
|
||||
|
||||
// Shutdown connection pool with timeout
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
defer shutdownCancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
pool.shutdown()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
log.Println("All connections closed gracefully")
|
||||
case <-shutdownCtx.Done():
|
||||
log.Println("Shutdown timeout exceeded, forcing exit")
|
||||
}
|
||||
|
||||
log.Println("Proxy server stopped")
|
||||
}
|
||||
|
Reference in New Issue
Block a user