I am very new to concurrent programming in general and want to know if my implementation is thread-safe.
I'm currently working on implementing a TCP client in Go. The service listens on a port and accepts HTTP requests. In parallel, the service also establishes a connection to an outside TCP server, to which it sends binary data.
While I got this to work, I'm unsure whether this is the correct approach or whether I should be adding something more.
The sequence of steps is this:
I start my web server, which first tries to establish a connection with a TCP server. On a successful connection, I start listening for incoming HTTP requests.
Note: a successful connection to the TCP server involves sending a login packet to this TCP server, which authenticates me.
// TCPconnector holds the state of the client side of the outbound TCP
// connection: the connection itself, the channel packets are queued on, and
// the reconnect bookkeeping.
type TCPconnector struct {
	// Conn is the network connection used for reads and writes.
	Conn net.Conn
	// DataChannel carries raw packets that are waiting to be written to Conn.
	DataChannel chan []byte

	// reconnectAttempt counts reconnect attempts made so far;
	// reconnectMaxRetries is the limit it is compared against.
	reconnectAttempt, reconnectMaxRetries int
	// reconnectMaxDelay is presumably the upper bound on the wait between
	// reconnect attempts — TODO confirm; it is not read in the visible code.
	reconnectMaxDelay time.Duration
	// autoReconnect enables retrying after a failed connection attempt.
	autoReconnect bool

	// lastOrderSent is presumably the time the most recent order was sent —
	// TODO confirm; it is not read in the visible code.
	lastOrderSent time.Time
}
// Defaults used when building a new connector.
const (
	// addr is presumably the (redacted) address of the remote TCP server —
	// TODO confirm; it is not referenced in the visible code.
	addr string = "xx.xx.xx.xx"

	// defaultReconnectMaxAttempts is the default reconnect retry limit.
	defaultReconnectMaxAttempts int = 10

	// defaultReconnectMaxDelay is the default reconnect delay limit (one minute).
	defaultReconnectMaxDelay time.Duration = 60 * time.Second
)
// NewTCPConnector returns a connector with an unbuffered DataChannel, the
// default reconnect limits, a zero attempt counter and auto-reconnect turned
// on. The returned error is always nil.
func NewTCPConnector() (*TCPconnector, error) {
	connector := new(TCPconnector) // reconnectAttempt starts at its zero value
	connector.DataChannel = make(chan []byte)
	connector.reconnectMaxRetries = defaultReconnectMaxAttempts
	connector.reconnectMaxDelay = defaultReconnectMaxDelay
	connector.autoReconnect = true
	return connector, nil
}
// Server builds the TCP connector, connects it, and then serves HTTP on
// :8080 with a POST /placeOrder route that is handed the connector.
//
// It returns early on any setup failure instead of carrying on with a nil
// connector or a nil connection, and it reports the error that makes
// http.ListenAndServe return.
func Server() {
	// Named connector rather than TCPconnector so the type name is not shadowed.
	connector, err := NewTCPConnector()
	if err != nil {
		fmt.Println("error creating TCP connector:", err)
		return
	}

	// NOTE(review): the Connect method shown elsewhere in this file has no
	// return value and blocks while the session is alive; this call assumes a
	// Connect that returns an error once the connection is up — confirm.
	err = connector.Connect()
	if err != nil {
		fmt.Println("error connecting to TCP server:", err)
		return
	}
	// Guarded because calling Close on a nil net.Conn interface panics.
	if connector.Conn != nil {
		defer connector.Conn.Close()
	}

	r := chi.NewRouter()
	r.Post("/placeOrder", PlaceOrder(connector))

	// ListenAndServe only returns on failure, so its error is always worth reporting.
	if err := http.ListenAndServe(":8080", r); err != nil {
		fmt.Println("http server stopped:", err)
	}
}
The Connect method tries to establish a connection to the TCP server and is responsible for spawning the goroutines that write packets, send heartbeat packets and read responses from the server.
// Connect dials the TCP server at t.Url and runs a session on the resulting
// connection, redialling after failures. It blocks until it gives up, which
// happens when reconnectAttempt exceeds reconnectMaxRetries (onNoReconnect is
// called first) or when a failure occurs while autoReconnect is false.
//
// Each established connection is handed to runSession so that the
// per-connection cleanup runs when that session ends; deferring it inside
// this loop would postpone every cleanup until Connect itself returns.
func (t *TcpConnector) Connect() {
	for {
		if t.reconnectAttempt > t.reconnectMaxRetries {
			t.onNoReconnect(t.reconnectAttempt)
			return
		}
		if t.reconnectAttempt > 0 {
			t.onReconnect()
		}

		c, err := net.Dial("tcp", t.Url)
		if err != nil {
			t.AppState.Log.Errorf("Connection to tcp server failed with error : %+v", err)
			if !t.autoReconnect {
				return
			}
			t.reconnectAttempt++
			continue
		}
		t.Log.Infof("Connection established @ %s", time.Now())

		if err := t.runSession(c); err != nil {
			// The login packet could not be produced. Count this as a failed
			// attempt so a persistent failure is bounded by
			// reconnectMaxRetries instead of redialling in a tight loop.
			t.AppState.Log.Errorf("Login on new tcp connection failed with error : %+v", err)
			t.reconnectAttempt++
		}

		// The session is over (login failed or the writer hit an error).
		// Only redial when auto-reconnect is enabled.
		if !t.autoReconnect {
			return
		}
	}
}

// runSession owns one connection: it starts the heartbeat, writer and reader
// goroutines, sends the login packet, and then waits until the writer signals
// that the connection has failed. On return the session context is cancelled
// and c is closed. It returns the error from SendLoginPacket, or nil once the
// writer has finished.
func (t *TcpConnector) runSession(c net.Conn) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	t.Conn = c
	// The writer may already have closed the connection; a second Close only
	// returns an error, which is deliberately ignored here.
	defer c.Close()

	var wg sync.WaitGroup
	wg.Add(1) // released by Write when a write fails
	go t.SendHeartbeat(ctx)
	go t.Write(&wg, cancel)
	go t.RecieveResponse()

	if err := t.SendLoginPacket(); err != nil {
		// NOTE(review): cancelling ctx and closing c stops the heartbeat and
		// unblocks the reader, but Write has no cancellation signal of its
		// own and stays parked on DataChannel until the next packet arrives.
		return err
	}

	// The connection is up and the login has been queued, so earlier failures
	// no longer count against the retry budget.
	t.reconnectAttempt = 0

	wg.Wait()
	return nil
}
SendLoginPacket is pretty straightforward: we form the packet and send it across the DataChannel, where it is then picked up by the Write goroutine.
// SendLoginPacket builds the login request (message code 1, VersionNo 24)
// and queues it on DataChannel for the writer goroutine.
//
// It returns the error from getFinalPacketToBeSent if the packet cannot be
// built. The send on DataChannel blocks until a receiver takes the packet.
func (t *TcpConnector) SendLoginPacket() error {
	loginRequestBody := dto.LoginRequest{
		VersionNo: 24,
	}
	finalPacketToBeSent, err := t.getFinalPacketToBeSent(1, loginRequestBody)
	if err != nil {
		return err
	}
	// Send the packet that was just built; the original referenced an
	// undeclared identifier here and left this variable unused.
	t.DataChannel <- finalPacketToBeSent
	return nil
}
The Write() goroutine works like this:
// Write drains DataChannel and writes each packet to t.Conn. On the first
// write error it logs the error, closes the connection, marks wg as done and
// calls cancel, then returns.
//
// wg is released exactly once, when Write returns. cancel is the cancel
// function supplied by the caller.
func (t *TcpConnector) Write(wg *sync.WaitGroup, cancel context.CancelFunc) {
	// Deferred in reverse so the order stays wg.Done then cancel, and so both
	// still run if the loop exits through a panic.
	defer cancel()
	defer wg.Done()

	for {
		// A plain receive replaces the former single-case select.
		data := <-t.DataChannel
		if _, err := t.Conn.Write(data); err != nil {
			t.AppState.Log.Errorf("Couldnt write to tcp connection : %+v", err)
			t.Conn.Close()
			return
		}
	}
}
The RecieveResponse goroutine is also a straightforward loop that returns on any error:
// RecieveResponse reads framed responses from the connection until an error
// occurs. Each frame is a 45-byte header followed by an encrypted body of
// MessageLength-45 bytes. The body is decrypted with t.Key and t.IV, and
// every decoded message is passed to handleMOSLResponse on its own goroutine.
//
// The connection is closed when the loop exits. After a read, framing or
// decrypt error the stream can no longer be trusted, and closing it makes the
// next write fail, which is the error path that the writer goroutine acts on.
func (t *TcpConnector) RecieveResponse() {
	// Pin the connection this reader was started for.
	conn := t.Conn
	defer conn.Close()

	// The header buffer is fully decoded before the next read, so one buffer
	// can be reused for every frame.
	headerBuff := make([]byte, 45)
	for {
		// Get the first 45 bytes of the response and check the messageLength for the remaining length of the message
		if err := readExactly(conn, headerBuff); err != nil {
			t.AppState.Log.Errorf("Couldnt read tcp header response from MOSL : %+v ", err)
			return
		}
		headerResponse := dto.MessageHeader{}
		err := binary.Read(bytes.NewReader(headerBuff), binary.LittleEndian, &headerResponse)
		if err != nil {
			t.AppState.Log.Errorf("Couldnt convert bytes header to struct : %+v ", err)
			return
		}

		// A length shorter than the header would make the body size negative
		// (or wrap around if the field is unsigned) and panic in make.
		if headerResponse.MessageLength < 45 {
			t.AppState.Log.Errorf("Invalid message length in tcp header response from MOSL : %+v ", headerResponse.MessageLength)
			return
		}

		// Get remaining bytes from server and decrypt it (remaining bytes = messageLength - 45 bytes for the header)
		bodyBuff := make([]byte, headerResponse.MessageLength-45)
		if err := readExactly(conn, bodyBuff); err != nil {
			t.AppState.Log.Errorf("Couldnt read tcp body response from MOSL : %+v ", err)
			return
		}
		decryptedResponseBody, err := DecryptAESCFB(bodyBuff, t.Key, t.IV)
		if err != nil {
			t.AppState.Log.Errorf("Couldnt decrypt body response from MOSL : %+v ", err)
			return
		}
		go t.handleMOSLResponse(headerResponse, decryptedResponseBody)
	}
}

// readExactly fills buf completely from conn. A single Read on a stream
// connection may return fewer bytes than requested, so it keeps reading until
// buf is full or a read fails before that point.
func readExactly(conn net.Conn, buf []byte) error {
	for filled := 0; filled < len(buf); {
		n, err := conn.Read(buf[filled:])
		filled += n
		if err != nil && filled < len(buf) {
			return err
		}
	}
	return nil
}
And lastly, the heartbeat request, which is sent to the TCP server every 30 seconds:
// SendHeartbeat queues a heartbeat header (message code 129) on DataChannel
// every 30 seconds until ctx is cancelled. It also stops, after logging, if
// the heartbeat header cannot be built.
func (t *TcpConnector) SendHeartbeat(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case x := <-ticker.C:
			messageHeader, err := getMessageHeaderInBytes(129, 0, 0, t.AppState.Config.Username)
			if err != nil {
				t.AppState.Log.Errorf("Error Converting heartbeat request data to bytes: %+v, with seconds: %s", err, x)
				return
			}
			// The send must also watch ctx: if nothing is receiving from
			// DataChannel any more, a bare send would block this goroutine
			// past cancellation.
			select {
			case t.DataChannel <- messageHeader:
			case <-ctx.Done():
				fmt.Println("Exiting hearbeat goroutine")
				return
			}
		case <-ctx.Done():
			fmt.Println("Exiting hearbeat goroutine")
			return
		}
	}
}
My main concern with this implementation is whether I'm handling the error cases well:
- Are my goroutines being shut down properly in case the TCP server closes the connection?
- Is there a better way to start and stop the goroutines?