4 Commits

Author SHA1 Message Date
eyedeekay
1f83a33823 Start integrating I2NP Message Processing with actual NetDB 2025-08-19 19:27:40 -04:00
eyedeekay
7f89641358 refactor complex DLM function 2025-08-19 18:13:12 -04:00
eyedeekay
1580a192ed Refactor router constructor 2025-08-19 18:07:08 -04:00
eyedeekay
804468cbb8 refactor readDeliveryInstructions 2025-08-19 18:00:56 -04:00
8 changed files with 314 additions and 103 deletions

View File

@@ -12,9 +12,9 @@ please keep up with these changes, as they will not be backward compatible and r
### Implemented Features
- Clients
- [ ] Datagrams
- [ ] I2CP
- [ ] Message routing
- [ ] Datagrams
- [ ] Streaming
- [Cryptographic primitives(see also: https://github.com/go-i2p/crypto)](https://github.com/go-i2p/crypto)
- Signing

View File

@@ -416,7 +416,7 @@ DatabaseManager demonstrates database-related interface usage
#### func NewDatabaseManager
```go
func NewDatabaseManager() *DatabaseManager
func NewDatabaseManager(netdb NetDBStore) *DatabaseManager
```
NewDatabaseManager creates a new database manager

View File

@@ -203,71 +203,110 @@ func ReadDatabaseLookup(data []byte) (DatabaseLookup, error) {
log.Debug("Reading DatabaseLookup")
databaseLookup := DatabaseLookup{}
if err := parseBasicFields(&databaseLookup, data); err != nil {
return databaseLookup, err
}
if err := parseVariableFields(&databaseLookup, data); err != nil {
return databaseLookup, err
}
if err := parseEncryptionFields(&databaseLookup, data); err != nil {
return databaseLookup, err
}
log.Debug("DatabaseLookup read successfully")
return databaseLookup, nil
}
// parseBasicFields extracts the fixed-size basic fields from the database lookup data.
func parseBasicFields(databaseLookup *DatabaseLookup, data []byte) error {
length, key, err := readDatabaseLookupKey(data)
if err != nil {
log.WithError(err).Error("Failed to read Key")
return databaseLookup, err
return err
}
databaseLookup.Key = key
length, from, err := readDatabaseLookupFrom(length, data)
if err != nil {
log.WithError(err).Error("Failed to read From")
return databaseLookup, err
return err
}
databaseLookup.From = from
length, flags, err := readDatabaseLookupFlags(length, data)
if err != nil {
log.WithError(err).Error("Failed to read Flags")
return databaseLookup, err
return err
}
databaseLookup.Flags = flags
length, replyTunnelID, err := readDatabaseLookupReplyTunnelID(flags, length, data)
_, replyTunnelID, err := readDatabaseLookupReplyTunnelID(flags, length, data)
if err != nil {
log.WithError(err).Error("Failed to read ReplyTunnelID")
return databaseLookup, err
return err
}
databaseLookup.ReplyTunnelID = replyTunnelID
length, size, err := readDatabaseLookupSize(length, data)
return nil
}
// parseVariableFields extracts the variable-size fields including excluded peers.
func parseVariableFields(databaseLookup *DatabaseLookup, data []byte) error {
// Calculate length offset after basic fields
length := 32 + 32 + 1 // Key + From + Flags
if databaseLookup.Flags&1 == 1 {
length += 4 // ReplyTunnelID
}
lengthAfter, size, err := readDatabaseLookupSize(length, data)
if err != nil {
log.WithError(err).Error("Failed to read Size")
return databaseLookup, err
return err
}
databaseLookup.Size = size
length, excludedPeers, err := readDatabaseLookupExcludedPeers(length, data, size)
_, excludedPeers, err := readDatabaseLookupExcludedPeers(lengthAfter, data, size)
if err != nil {
log.WithError(err).Error("Failed to read ExcludedPeers")
return databaseLookup, err
return err
}
databaseLookup.ExcludedPeers = excludedPeers
length, reply_key, err := readDatabaseLookupReplyKey(length, data)
return nil
}
// parseEncryptionFields extracts the encryption-related fields from the database lookup data.
func parseEncryptionFields(databaseLookup *DatabaseLookup, data []byte) error {
// Calculate length offset after basic and variable fields
length := 32 + 32 + 1 + 2 + (databaseLookup.Size * 32) // Key + From + Flags + Size + ExcludedPeers
if databaseLookup.Flags&1 == 1 {
length += 4 // ReplyTunnelID
}
lengthAfter, replyKey, err := readDatabaseLookupReplyKey(length, data)
if err != nil {
log.WithError(err).Error("Failed to read ReplyKey")
return databaseLookup, err
return err
}
databaseLookup.ReplyKey = reply_key
databaseLookup.ReplyKey = replyKey
length, tags, err := readDatabaseLookupTags(length, data)
lengthAfter, tags, err := readDatabaseLookupTags(lengthAfter, data)
if err != nil {
log.WithError(err).Error("Failed to read Tags")
return databaseLookup, err
return err
}
databaseLookup.Tags = tags
length, reply_tags, err := readDatabaseLookupReplyTags(length, data, tags)
_, replyTags, err := readDatabaseLookupReplyTags(lengthAfter, data, tags)
if err != nil {
log.WithError(err).Error("Failed to read ReplyTags")
return databaseLookup, err
return err
}
databaseLookup.ReplyTags = reply_tags
databaseLookup.ReplyTags = replyTags
log.Debug("DatabaseLookup read successfully")
return databaseLookup, nil
return nil
}
func readDatabaseLookupKey(data []byte) (int, common.Hash, error) {

View File

@@ -112,7 +112,7 @@ func TestTunnelManager(t *testing.T) {
}
func TestDatabaseManager(t *testing.T) {
manager := NewDatabaseManager()
manager := NewDatabaseManager(nil)
// Test database lookup
key := common.Hash{}
@@ -141,7 +141,7 @@ func TestDatabaseManager(t *testing.T) {
}
err = manager.StoreData(store)
assert.NoError(t, err)
assert.Error(t, err) // Expect error since NetDB is nil
assert.Equal(t, key, store.GetStoreKey())
assert.Equal(t, storeData, store.GetStoreData())
assert.Equal(t, byte(0x00), store.GetStoreType())

View File

@@ -99,11 +99,20 @@ func (tm *TunnelManager) ProcessTunnelReply(handler TunnelReplyHandler) error {
}
// DatabaseManager demonstrates database-related interface usage
type DatabaseManager struct{}
type DatabaseManager struct {
netdb NetDBStore
}
// NewDatabaseManager creates a new database manager
func NewDatabaseManager() *DatabaseManager {
return &DatabaseManager{}
// NetDBStore defines the interface for storing RouterInfo entries
type NetDBStore interface {
StoreRouterInfo(key common.Hash, data []byte, dataType byte) error
}
// NewDatabaseManager creates a new database manager with NetDB integration
func NewDatabaseManager(netdb NetDBStore) *DatabaseManager {
return &DatabaseManager{
netdb: netdb,
}
}
// PerformLookup performs a database lookup using DatabaseReader interface
@@ -118,7 +127,7 @@ func (dm *DatabaseManager) PerformLookup(reader DatabaseReader) error {
return nil
}
// StoreData stores data using DatabaseWriter interface
// StoreData stores data using DatabaseWriter interface and NetDB integration
func (dm *DatabaseManager) StoreData(writer DatabaseWriter) error {
key := writer.GetStoreKey()
data := writer.GetStoreData()
@@ -127,7 +136,11 @@ func (dm *DatabaseManager) StoreData(writer DatabaseWriter) error {
fmt.Printf("Storing %d bytes of type %d for key %x\n",
len(data), dataType, key[:8])
return nil
if dm.netdb != nil {
return dm.netdb.StoreRouterInfo(key, data, dataType)
}
return fmt.Errorf("no NetDB available for storage")
}
// SessionManager demonstrates session-related interface usage
@@ -189,12 +202,17 @@ func NewMessageRouter(config MessageRouterConfig) *MessageRouter {
return &MessageRouter{
config: config,
processor: NewMessageProcessor(),
dbManager: NewDatabaseManager(),
dbManager: NewDatabaseManager(nil), // Will be set later via SetNetDB
tunnelMgr: NewTunnelManager(),
sessionMgr: NewSessionManager(),
}
}
// SetNetDB sets the NetDB store for database operations
func (mr *MessageRouter) SetNetDB(netdb NetDBStore) {
mr.dbManager = NewDatabaseManager(netdb)
}
// RouteMessage routes messages based on their interfaces
func (mr *MessageRouter) RouteMessage(msg I2NPMessage) error {
// Log message if enabled

View File

@@ -432,6 +432,65 @@ func (db *StdNetDB) updateCacheAfterReseed() error {
return nil
}
// StoreRouterInfo stores a RouterInfo entry in the database from I2NP DatabaseStore message
func (db *StdNetDB) StoreRouterInfo(key common.Hash, data []byte, dataType byte) error {
log.WithField("hash", key).Debug("Storing RouterInfo from DatabaseStore message")
// Validate data type - should be RouterInfo (type 0)
if dataType != 0 {
log.WithField("type", dataType).Warn("Invalid data type for RouterInfo, expected 0")
return fmt.Errorf("invalid data type for RouterInfo: expected 0, got %d", dataType)
}
// Parse the RouterInfo from the data
ri, _, err := router_info.ReadRouterInfo(data)
if err != nil {
log.WithError(err).Error("Failed to parse RouterInfo from DatabaseStore data")
return fmt.Errorf("failed to parse RouterInfo: %w", err)
}
// Verify the key matches the RouterInfo identity hash
expectedHash := ri.IdentHash()
if key != expectedHash {
log.WithFields(logrus.Fields{
"expected_hash": expectedHash,
"provided_key": key,
}).Error("RouterInfo hash mismatch")
return fmt.Errorf("RouterInfo hash mismatch: expected %x, got %x", expectedHash, key)
}
// Check if we already have this RouterInfo in memory
db.riMutex.Lock()
if _, exists := db.RouterInfos[key]; exists {
db.riMutex.Unlock()
log.WithField("hash", key).Debug("RouterInfo already exists in memory, skipping")
return nil
}
// Add to in-memory cache
db.RouterInfos[key] = Entry{
RouterInfo: &ri,
}
db.riMutex.Unlock()
// Save to filesystem
entry := &Entry{
RouterInfo: &ri,
}
if err := db.SaveEntry(entry); err != nil {
log.WithError(err).Error("Failed to save RouterInfo to filesystem")
// Remove from memory if filesystem save failed
db.riMutex.Lock()
delete(db.RouterInfos, key)
db.riMutex.Unlock()
return fmt.Errorf("failed to save RouterInfo to filesystem: %w", err)
}
log.WithField("hash", key).Debug("Successfully stored RouterInfo")
return nil
}
// ensure that the network database exists
func (db *StdNetDB) Ensure() (err error) {
if !db.Exists() {

View File

@@ -7,7 +7,9 @@ import (
"time"
"github.com/go-i2p/common/base32"
"github.com/go-i2p/common/router_info"
"github.com/go-i2p/go-i2p/lib/bootstrap"
"github.com/go-i2p/go-i2p/lib/i2np"
ntcp "github.com/go-i2p/go-i2p/lib/transport/ntcp2"
"github.com/go-i2p/logger"
@@ -29,6 +31,8 @@ type Router struct {
*transport.TransportMuxer
// netdb
*netdb.StdNetDB
// message router for processing I2NP messages
messageRouter *i2np.MessageRouter
// router configuration
cfg *config.RouterConfig
// close channel
@@ -40,76 +44,112 @@ type Router struct {
// CreateRouter creates a router with the provided configuration
func CreateRouter(cfg *config.RouterConfig) (*Router, error) {
log.Debug("Creating router with provided configuration")
r, err := FromConfig(cfg)
if err != nil {
log.WithError(err).Error("Failed to create router from configuration")
return nil, err
} else {
log.Debug("Router created successfully with provided configuration")
}
r.RouterInfoKeystore, err = keys.NewRouterInfoKeystore(cfg.WorkingDir, "localRouter")
log.Debug("Router created successfully with provided configuration")
if err := initializeRouterKeystore(r, cfg); err != nil {
return nil, err
}
if err := validateRouterKeys(r); err != nil {
return nil, err
}
ri, err := constructRouterInfo(r)
if err != nil {
return nil, err
}
if err := setupNTCP2Transport(r, ri); err != nil {
return nil, err
}
return r, nil
}
// initializeRouterKeystore creates and stores the router keystore
func initializeRouterKeystore(r *Router, cfg *config.RouterConfig) error {
log.Debug("Working directory is:", cfg.WorkingDir)
keystore, err := keys.NewRouterInfoKeystore(cfg.WorkingDir, "localRouter")
if err != nil {
log.WithError(err).Error("Failed to create RouterInfoKeystore")
return nil, err
} else {
log.Debug("RouterInfoKeystore created successfully")
if err = r.RouterInfoKeystore.StoreKeys(); err != nil {
log.WithError(err).Error("Failed to store RouterInfoKeystore")
return nil, err
} else {
log.Debug("RouterInfoKeystore stored successfully")
}
return err
}
log.Debug("RouterInfoKeystore created successfully")
if err = keystore.StoreKeys(); err != nil {
log.WithError(err).Error("Failed to store RouterInfoKeystore")
return err
}
log.Debug("RouterInfoKeystore stored successfully")
r.RouterInfoKeystore = keystore
return nil
}
// validateRouterKeys extracts and validates the router's public key
func validateRouterKeys(r *Router) error {
pub, _, err := r.RouterInfoKeystore.GetKeys()
if err != nil {
log.WithError(err).Error("Failed to get keys from RouterInfoKeystore")
return nil, err
} else {
// sha256 hash of public key
pubHash := sha256.Sum256(pub.Bytes())
b32PubHash := base32.EncodeToString(pubHash[:])
log.Debug("Router public key hash:", b32PubHash)
return err
}
// sha256 hash of public key
pubHash := sha256.Sum256(pub.Bytes())
b32PubHash := base32.EncodeToString(pubHash[:])
log.Debug("Router public key hash:", b32PubHash)
return nil
}
// constructRouterInfo builds the router info from the keystore
func constructRouterInfo(r *Router) (*router_info.RouterInfo, error) {
ri, err := r.RouterInfoKeystore.ConstructRouterInfo(nil)
if err != nil {
log.WithError(err).Error("Failed to construct RouterInfo")
return nil, err
} else {
log.Debug("RouterInfo constructed successfully")
log.Debug("RouterInfo:", ri)
}
// we have our keystore and our routerInfo,, so now let's set up transports
log.Debug("RouterInfo constructed successfully")
log.Debug("RouterInfo:", ri)
return ri, nil
}
// setupNTCP2Transport configures and initializes the NTCP2 transport layer
func setupNTCP2Transport(r *Router, ri *router_info.RouterInfo) error {
// add NTCP2 transport
ntcp2Config, err := ntcp.NewConfig(":0") // Use port 0 for automatic assignment
if err != nil {
log.WithError(err).Error("Failed to create NTCP2 config")
return nil, err
return err
}
ntcp2, err := ntcp.NewNTCP2Transport(*ri, ntcp2Config)
ntcp2Transport, err := ntcp.NewNTCP2Transport(*ri, ntcp2Config)
if err != nil {
log.WithError(err).Error("Failed to create NTCP2 transport")
return nil, err
return err
}
log.Debug("NTCP2 transport created successfully")
r.TransportMuxer = transport.Mux(ntcp2)
ntcpaddr := ntcp2.Addr()
r.TransportMuxer = transport.Mux(ntcp2Transport)
ntcpaddr := ntcp2Transport.Addr()
if ntcpaddr == nil {
log.Error("Failed to get NTCP2 address")
return nil, errors.New("failed to get NTCP2 address")
return errors.New("failed to get NTCP2 address")
}
log.Debug("NTCP2 address:", ntcpaddr)
// TODO: Add the NTCP2 address to RouterInfo once RouterAddress conversion is implemented
// ri.AddAddress(ntcpaddr)
// create a transport address
return r, err
return nil
}
// create router from configuration
@@ -137,7 +177,7 @@ func (r *Router) Stop() {
log.Debug("Router stop signal sent")
}
// Close closes any internal state and finallizes router resources so that nothing can start up again
// Close closes any internal state and finalizes router resources so that nothing can start up again
func (r *Router) Close() error {
log.Warn("Closing router not implemented(?)")
return nil
@@ -162,6 +202,17 @@ func (r *Router) mainloop() {
log.Debug("Entering router mainloop")
r.StdNetDB = netdb.NewStdNetDB(r.cfg.NetDb.Path)
log.WithField("netdb_path", r.cfg.NetDb.Path).Debug("Created StdNetDB")
// Initialize message router and connect to NetDB
messageConfig := i2np.MessageRouterConfig{
MaxRetries: 3,
DefaultTimeout: 30 * time.Second,
EnableLogging: true,
}
r.messageRouter = i2np.NewMessageRouter(messageConfig)
r.messageRouter.SetNetDB(r.StdNetDB)
log.Debug("Message router initialized with NetDB integration")
// make sure the netdb is ready
var e error
if err := r.StdNetDB.Ensure(); err != nil {
@@ -189,8 +240,8 @@ func (r *Router) mainloop() {
// netdb ready
log.WithFields(logrus.Fields{
"at": "(Router) mainloop",
}).Debug("Router ready")
for e == nil {
}).Debug("Router ready with database message processing enabled")
for r.running && e == nil {
time.Sleep(time.Second)
}
} else {

View File

@@ -895,61 +895,105 @@ func maybeAppendSize(di_flag DeliveryInstructions, di_type int, data, current []
return
}
func readDeliveryInstructions(data []byte) (instructions DeliveryInstructions, remainder []byte, err error) {
log.Debug("Reading DeliveryInstructions")
// validateDeliveryInstructionInput checks if the provided data is valid for processing.
func validateDeliveryInstructionInput(data []byte) error {
if len(data) < 1 {
log.Error("No data provided")
err = oops.Errorf("no data provided")
return
return oops.Errorf("no data provided")
}
return nil
}
// initializeDeliveryInstructionData creates initial data structure for delivery instructions processing.
func initializeDeliveryInstructionData(data []byte) (DeliveryInstructions, int, []byte) {
di_flag := DeliveryInstructions(data[:1])
di_type, _ := di_flag.Type()
di_data := make([]byte, 0)
di_data = append(di_data, data[0])
return di_flag, di_type, di_data
}
// processFirstFragment handles the processing of FIRST_FRAGMENT delivery instruction type.
func processFirstFragment(di_flag DeliveryInstructions, data []byte, di_data []byte) ([]byte, error) {
log.Debug("Processing FIRST_FRAGMENT")
var err error
di_data, err = maybeAppendTunnelID(data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append TunnelID")
return nil, err
}
di_data, err = maybeAppendHash(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Hash")
return nil, err
}
di_data, err = maybeAppendDelay(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Delay")
return nil, err
}
di_data, err = maybeAppendMessageID(di_flag, FIRST_FRAGMENT, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append MessageID")
return nil, err
}
di_data, err = maybeAppendExtendedOptions(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append ExtendedOptions")
return nil, err
}
di_data, err = maybeAppendSize(di_flag, FIRST_FRAGMENT, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Size")
return nil, err
}
return di_data, nil
}
// processFollowOnFragment handles the processing of FOLLOW_ON_FRAGMENT delivery instruction type.
func processFollowOnFragment(di_flag DeliveryInstructions, data []byte, di_data []byte) ([]byte, error) {
var err error
di_data, err = maybeAppendMessageID(di_flag, FOLLOW_ON_FRAGMENT, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append MessageID")
return nil, err
}
di_data, err = maybeAppendSize(di_flag, FOLLOW_ON_FRAGMENT, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Size")
return nil, err
}
return di_data, nil
}
func readDeliveryInstructions(data []byte) (instructions DeliveryInstructions, remainder []byte, err error) {
log.Debug("Reading DeliveryInstructions")
if err = validateDeliveryInstructionInput(data); err != nil {
return
}
di_flag, di_type, di_data := initializeDeliveryInstructionData(data)
if di_type == FIRST_FRAGMENT {
log.Debug("Processing FIRST_FRAGMENT")
di_data, err = maybeAppendTunnelID(data, di_data)
di_data, err = processFirstFragment(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append TunnelID")
return
}
di_data, err = maybeAppendHash(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Hash")
return
}
di_data, err = maybeAppendDelay(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Delay")
return
}
di_data, err = maybeAppendMessageID(di_flag, di_type, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append MessageID")
return
}
di_data, err = maybeAppendExtendedOptions(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append ExtendedOptions")
return
}
di_data, err = maybeAppendSize(di_flag, di_type, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Size")
return
}
} else if di_type == FOLLOW_ON_FRAGMENT {
di_data, err = maybeAppendMessageID(di_flag, di_type, data, di_data)
di_data, err = processFollowOnFragment(di_flag, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append MessageID")
return
}
di_data, err = maybeAppendSize(di_flag, di_type, data, di_data)
if err != nil {
log.WithError(err).Error("Failed to append Size")
return
}
}