I'm building a P2P file transfer system in Go that fetches chunks of data from peers in parallel. Sequential requests work fine, but parallel requests using goroutines behave inconsistently and produce errors such as a negative WaitGroup counter or incomplete data transfers.

Code Overview:

  • main.go: Initializes and starts two peer servers. It simulates file requests from one peer to another using goroutines.
  • peer.go: Defines the PeerServer structure and its methods for handling peer connections and file requests.
  • tcp_transport.go: Implements the transport layer, handling connection and data transmission.
  • transport.go: Defines the transport interface.
  • encoding.go: Provides encoding and decoding functionality for messages.

Key Issues:

  1. Negative WaitGroup Counter: Occurs sometimes during parallel chunk requests.
  2. Incomplete Data Transfer: Some chunks are not transferred when using goroutines.

main.go (simplified):

func main() {
    // Initialize and start peer servers
    peerServer1 := NewPeerServer(opts1, "localhost:5000")
    peerServer2 := NewPeerServer(opts2, "localhost:5001")

    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); peerServer1.Start() }()
    go func() { defer wg.Done(); peerServer2.Start() }()

    time.Sleep(2 * time.Second) // Wait for servers to start

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            peerServer1.TestFileRequest(i, "127.0.0.1:5001")
        }(i)
    }

    wg.Wait()
    log.Println("Servers shut down successfully.")
}

peer.go (simplified):

type PeerServer struct {
    peerLock  sync.Mutex
    peers     map[string]Peer
    Transport Transport
}

func (p *PeerServer) TestFileRequest(i int, ipAddr string) error {
    err := p.Transport.Dial(ipAddr, "peer-server")
    if err != nil {
        return err
    }

    dataMessage := DataMessage{Payload: TestSend{ChunkId: i, ChunkName: fmt.Sprintf("chunk_%d.chunk", i)}}
    var buf bytes.Buffer
    encoder := gob.NewEncoder(&buf)
    if err := encoder.Encode(dataMessage); err != nil {
        return fmt.Errorf("Failed to encode data: %v", err)
    }

    p.peerLock.Lock()
    peer, exists := p.peers[ipAddr]
    p.peerLock.Unlock()
    if !exists {
        return fmt.Errorf("peer with address %s not found", ipAddr)
    }

    if err := peer.Send(buf.Bytes()); err != nil {
        return fmt.Errorf("Failed to send data: %v", err)
    }

    time.Sleep(time.Millisecond * 500)

    var fileSize int64
    if err := binary.Read(peer, binary.LittleEndian, &fileSize); err != nil {
        return fmt.Errorf("Failed to read file size: %v", err)
    }

    file, err := os.Create(fmt.Sprintf("chunk_%d.chunk", i))
    if err != nil {
        return fmt.Errorf("Failed to create file: %v", err)
    }
    defer file.Close()

    if _, err := io.CopyN(file, peer, fileSize); err != nil {
        return fmt.Errorf("Failed to copy file: %v", err)
    }

    fmt.Printf("Successfully received and saved chunk_%d.chunk\n", i)
    peer.CloseStream(i)

    return nil
}

tcp_transport.go (simplified):

type TCPPeer struct {
    net.Conn
    outbound bool
    wg       *sync.WaitGroup
}

func (p *TCPPeer) CloseStream(i int) {
    p.wg.Done()
}

func (p *TCPPeer) Send(b []byte) error {
    _, err := p.Write(b)
    return err
}

type TCPTransport struct {
    TransportOpts
    listener net.Listener
    msgch    chan msg
}

func (t *TCPTransport) Dial(addr string, connType string) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return err
    }
    go t.handleConn(conn, true, connType)
    return nil
}

func (t *TCPTransport) handleConn(conn net.Conn, outbound bool, connType string) {
    peer := NewTCPPeer(conn, outbound)
    if t.OnPeer != nil {
        if err := t.OnPeer(peer, connType); err != nil {
            return
        }
    }
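    for {
        msg := msg{}
        // Decode the next message; the decoder sets msg.Stream to true
        // when the peer is about to send a raw stream.
        err := t.Decoder.Reader(conn, &msg)
        if err != nil {
            return
        }
        msg.From = conn.RemoteAddr().String()
        if msg.Stream {
            // Pause this read loop until CloseStream calls wg.Done(),
            // so the stream can be read directly in peer.go.
            peer.wg.Add(1)
            peer.wg.Wait()
            continue
        }
        t.msgch <- msg
    }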
}

Possible Causes:

  1. Synchronization Issues: Potential race conditions with sync.WaitGroup.
  2. Network Latency/Timing: Delays or order of operations may be causing issues in parallel execution.
  3. WaitGroup in the peer: When I run the goroutines in the for loop, some of them call peer.CloseStream before handleConn in tcp_transport.go has reached wg.Wait().
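
To make cause 3 concrete, here is a minimal sketch of that ordering, separate from the project code. The names closeBeforeAdd and orderSafeHandoff are hypothetical and exist only for this illustration. The first function calls Done on a WaitGroup whose counter is still zero, which is what happens if CloseStream runs before the read loop has called Add(1). The second shows one possible alternative, a buffered channel, which tolerates the signal arriving early. It is a sketch under those assumptions, not a drop-in fix for the transport.

```go
package main

import (
	"fmt"
	"sync"
)

// closeBeforeAdd (hypothetical name) mimics the suspected ordering: the
// requester's wg.Done() runs before the read loop has called wg.Add(1).
// It returns the recovered panic value as text, or "" if nothing panicked.
func closeBeforeAdd() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Done() // counter would go from 0 to -1, so this panics
	wg.Add(1) // never reached
	return ""
}

// orderSafeHandoff (hypothetical name) uses a buffered channel instead:
// the signal can be sent before the receiver starts waiting.
func orderSafeHandoff() bool {
	done := make(chan struct{}, 1)
	done <- struct{}{} // the "CloseStream" side signals early
	<-done             // the "read loop" side picks the signal up later
	return true
}

func main() {
	fmt.Println(closeBeforeAdd())
	fmt.Println(orderSafeHandoff())
}
```

The panic in the first function is recovered only so the message can be printed. In the real program it would crash the process, which matches the intermittent failure described above.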

Request:

Can someone help identify the issue and suggest a solution to make the parallel chunk requests work correctly? Any insights into improving the concurrency handling or debugging techniques would be appreciated.

Additional Information:

  • The code works perfectly when the requests are made sequentially without using goroutines in main.go.
  • I have attempted adding/removing locks and adjusting sleep durations without success.

Thank you in advance for your help!

Expected behavior: the chunk data is fetched concurrently from the peer server.

  • I did that, and it does not show any data race. In main.go, when I remove the goroutines, it works well by fetching sequentially. If you want to check the entire code, see github.com/tarun-kavipurapu/temp-request-chunks Commented Jul 13, 2024 at 12:52
  • What wg.Add and wg.Wait do is pause the read loop so that it does not send any messages to the channel, which lets me handle receiving the streams in peer.go. After I close the stream in peer.go, the loop goes back to sending to the message channel. Commented Jul 13, 2024 at 12:56
  • Can you suggest something other than the Go race detector that can help in finding the races? I will let you know after finding them. Other than this, do you see any other error here? And can you suggest a better way to implement the chunk data streaming and communication? Commented Jul 13, 2024 at 13:15
  • Open multiple TCP connections rather than trying to use just one. This is a time-tested and proven method that has been used by web browsers and servers until more recent HTTP versions. Commented Jul 13, 2024 at 14:03
