I'm building a P2P file transfer system in Go that fetches chunks of data from peers in parallel. Sequential requests work fine, but parallel requests using goroutines behave inconsistently and produce errors such as a negative WaitGroup counter panic or incomplete data transfers.
Code Overview:
- main.go: Initializes and starts two peer servers. It simulates file requests from one peer to another using goroutines.
- peer.go: Defines the `PeerServer` structure and its methods for handling peer connections and file requests.
- tcp_transport.go: Implements the transport layer, handling connections and data transmission.
- transport.go: Defines the transport interface.
- encoding.go: Provides encoding and decoding functionality for messages.
Key Issues:
- Negative WaitGroup Counter: Occurs sometimes during parallel chunk requests.
- Incomplete Data Transfer: Some chunks are not transferred when using goroutines.
main.go (simplified):
```go
func main() {
	// Initialize and start peer servers
	peerServer1 := NewPeerServer(opts1, "localhost:5000")
	peerServer2 := NewPeerServer(opts2, "localhost:5001")

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); peerServer1.Start() }()
	go func() { defer wg.Done(); peerServer2.Start() }()

	time.Sleep(2 * time.Second) // Wait for servers to start

	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			peerServer1.TestFileRequest(i, "127.0.0.1:5001")
		}(i)
	}

	wg.Wait()
	log.Println("Servers shut down successfully.")
}
```
peer.go (simplified):
```go
type PeerServer struct {
	peerLock  sync.Mutex
	peers     map[string]Peer
	Transport Transport
}

func (p *PeerServer) TestFileRequest(i int, ipAddr string) error {
	// Open a connection; Dial starts handleConn in its own goroutine.
	err := p.Transport.Dial(ipAddr, "peer-server")
	if err != nil {
		return err
	}

	// Encode the chunk request.
	dataMessage := DataMessage{Payload: TestSend{ChunkId: i, ChunkName: fmt.Sprintf("chunk_%d.chunk", i)}}
	var buf bytes.Buffer
	encoder := gob.NewEncoder(&buf)
	if err := encoder.Encode(dataMessage); err != nil {
		return fmt.Errorf("Failed to encode data: %v", err)
	}

	// Look up the peer stored under ipAddr.
	p.peerLock.Lock()
	peer, exists := p.peers[ipAddr]
	p.peerLock.Unlock()
	if !exists {
		return fmt.Errorf("peer with address %s not found", ipAddr)
	}

	if err := peer.Send(buf.Bytes()); err != nil {
		return fmt.Errorf("Failed to send data: %v", err)
	}

	time.Sleep(time.Millisecond * 500)

	// Read the size prefix, then exactly fileSize bytes of chunk data.
	var fileSize int64
	if err := binary.Read(peer, binary.LittleEndian, &fileSize); err != nil {
		return fmt.Errorf("Failed to read file size: %v", err)
	}
	file, err := os.Create(fmt.Sprintf("chunk_%d.chunk", i))
	if err != nil {
		return fmt.Errorf("Failed to create file: %v", err)
	}
	defer file.Close()
	if _, err := io.CopyN(file, peer, fileSize); err != nil {
		return fmt.Errorf("Failed to copy file: %v", err)
	}
	fmt.Printf("Successfully received and saved chunk_%d.chunk\n", i)

	// Signal handleConn that the stream has been consumed.
	peer.CloseStream(i)
	return nil
}
```
tcp_transport.go (simplified):
```go
type TCPPeer struct {
	net.Conn
	outbound bool
	wg       *sync.WaitGroup
}

// CloseStream is meant to release the handleConn read loop waiting in peer.wg.Wait().
func (p *TCPPeer) CloseStream(i int) {
	p.wg.Done()
}

func (p *TCPPeer) Send(b []byte) error {
	_, err := p.Write(b)
	return err
}

type TCPTransport struct {
	TransportOpts
	listener net.Listener
	msgch    chan msg
}

func (t *TCPTransport) Dial(addr string, connType string) error {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return err
	}
	go t.handleConn(conn, true, connType)
	return nil
}

func (t *TCPTransport) handleConn(conn net.Conn, outbound bool, connType string) {
	peer := NewTCPPeer(conn, outbound)
	if t.OnPeer != nil {
		if err := t.OnPeer(peer, connType); err != nil {
			return
		}
	}
	for {
		msg := msg{}
		// Reads the next message; sets msg.Stream to true if a stream follows.
		err := t.Decoder.Reader(conn, &msg)
		if err != nil {
			return
		}
		msg.From = conn.RemoteAddr().String()
		if msg.Stream {
			// Pause the read loop until CloseStream calls wg.Done().
			peer.wg.Add(1)
			peer.wg.Wait()
			continue
		}
		t.msgch <- msg
	}
}
```
Possible Causes:
- Synchronization Issues: Potential race conditions with `sync.WaitGroup`.
- Network Latency/Timing: Delays or the order of operations may be causing issues in parallel execution.
- WaitGroup in the peer: When I run the goroutines in the for loop, some goroutines call `peer.CloseStream` (and therefore `wg.Done()`) before `handleConn` in tcp_transport.go has reached `peer.wg.Add(1)` and `peer.wg.Wait()`.
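One way to make that handoff independent of ordering, sketched under assumptions: replace the `*sync.WaitGroup` in `TCPPeer` with a channel that has a buffer of one. The names `streamPeer`, `newStreamPeer`, `waitStream`, and `run` are hypothetical. If `CloseStream` runs first, its send lands in the buffer and the read loop's later receive returns immediately; if the loop waits first, it blocks until the send. This only removes the negative-counter panic. If several goroutines still share one peer from `p.peers`, a second `CloseStream` before the loop receives would block, and their reads on the same connection can still interleave.

```go
package main

import (
	"fmt"
	"sync"
)

// streamPeer is a hypothetical stand-in for TCPPeer in which the
// *sync.WaitGroup has been replaced by a channel with a buffer of one.
type streamPeer struct {
	streamDone chan struct{}
}

func newStreamPeer() *streamPeer {
	return &streamPeer{streamDone: make(chan struct{}, 1)}
}

// CloseStream signals that the requester has finished reading the stream.
// The buffered send succeeds even if the read loop is not waiting yet.
func (p *streamPeer) CloseStream() {
	p.streamDone <- struct{}{}
}

// waitStream is what handleConn would call after it sees msg.Stream.
// It returns once a CloseStream signal exists, sent before or after this call.
func (p *streamPeer) waitStream() {
	<-p.streamDone
}

// run exercises one ordering; it returns only if the read loop resumed.
func run(closeFirst bool) string {
	p := newStreamPeer()
	if closeFirst {
		// The ordering that panics with sync.WaitGroup. With an unbuffered
		// channel this single goroutine would deadlock instead.
		p.CloseStream()
		p.waitStream()
		return "resumed"
	}
	var wg sync.WaitGroup
	wg.Add(1) // Add before starting the goroutine
	go func() {
		defer wg.Done()
		p.waitStream() // may start waiting before or after CloseStream runs
	}()
	p.CloseStream()
	wg.Wait()
	return "resumed"
}

func main() {
	fmt.Println(run(true), run(false)) // prints: resumed resumed
}
```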
Request:
Can someone help identify the issue and suggest a solution to make the parallel chunk requests work correctly? Any insights into improving the concurrency handling or debugging techniques would be appreciated.
Additional Information:
- The code works perfectly when the requests are made sequentially without using goroutines in main.go.
- I have attempted adding/removing locks and adjusting sleep durations without success.
Thank you in advance for your help!
Expected behavior: fetching the chunk data concurrently from the peer server.