Instead of writing a pipe to one huge file, I want to segment the stream into chunks on signal USR1. I think I have the basics working, but the app just hangs and nothing happens. Any clues or best practices for handling an uncontrollable input stream with byte-perfect segmentation?

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "os/signal"
    "syscall"
    "time"
)

var done bool

func handle(c chan os.Signal) {
    for {
        sig := <-c
        switch sig {
        case syscall.SIGUSR1:
            fmt.Println("###Sink temporarily_closed###")
            done = true
        case syscall.SIGUSR2:
            fmt.Println("###Sink closed###")
            done = true
        case syscall.SIGHUP:
            fmt.Println("###Sink running###")
        }
        
    }
}

func check(e error) {
    if e != nil {
        panic(e)
    }
}



func main() {
    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGHUP)
    go handle(c)
    
    reader := bufio.NewReaderSize(os.Stdin,1024*10)

    for true {
        if done {
            file, err := os.Create("./temp.file")
            check(err)
            writer := bufio.NewWriter(file)
            written, err := io.Copy(writer,reader)
            check(err)
            fmt.Println(written)
            writer.Flush()
            file.Close()
            reader.Reset(os.Stdin)
            done = false
        }
        time.Sleep(time.Millisecond)
    }
}

4 Comments

  • There is no guarantee that the main loop will ever see done become true, because done is written and read by different goroutines (possibly on different CPUs) without synchronization. done should be atomic, guarded by a mutex, or replaced by a value received from a channel. For example, you can create a separate channel on which the handler tells the main loop to rotate. Commented Jan 20, 2022 at 2:59
  • The file is open for writing, so you should check the error returned by file.Close(). It calls the close() system call internally, which can fail for multiple reasons. Commented Jan 20, 2022 at 3:08
  • Hmm, I have had no problem with done so far, but I think the reader blocks somehow. I changed the io.Copy call to written, err := io.CopyN(writer, reader, int64(reader.Buffered())) to make sure it does not block, but it made no difference. If you are right, could you show me how it is done correctly? Commented Jan 20, 2022 at 19:16
  • When I test with cat /dev/urandom | app and send a USR1 signal to the PID of app, the signal is trapped and the loop runs, but io.CopyN always reports 0 bytes copied. Commented Jan 20, 2022 at 19:24

1 Answer

You need to call io.CopyN(dst, src, 4096) in a loop and rotate the file once in a while. See the example below. I rotate by size, but it is easy to add signal handling.

package main

import (
    "fmt"
    "io"
    "log"
    "os"
    "time"
)

var count int
var f *os.File

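// rotate closes the current output file (if any) and opens the next one.
func rotate() *os.File {
    if f != nil {
        if err := f.Close(); err != nil {
            log.Fatal(err)
        }
    }

    fname := fmt.Sprintf("./dump-%d.bin", count)
    count++
    // Use a distinct local name so the package-level f is not shadowed.
    next, err := os.Create(fname)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("rotated:", fname)

    return next
}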

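func main() {
    var written int
    reader := os.Stdin

    for {
        // Start a new file at startup and after every 40 KiB.
        if written == 0 || written >= 4096*10 {
            f = rotate()
            written = 0
        }

        n, err := io.CopyN(f, reader, 4096)
        written += int(n) // io.CopyN returns an int64
        if err == io.EOF {
            break // input closed; the last chunk may be short
        }
        if err != nil {
            log.Fatal(err)
        }
        log.Println("written:", written)
        time.Sleep(time.Millisecond * 500) // throttle for the demo only
    }

    // Close the last file and report any write-back error.
    if err := f.Close(); err != nil {
        log.Fatal(err)
    }
}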
2 Comments

Thanks for the example. Nice trick with the file handle, I have to remember this. Unfortunately I had to type-convert written += n to written += int(n) to make it work. But if I use it with the application that emits the pipe stream, the app crashes with a broken pipe. I tried using reader := bufio.NewReaderSize(os.Stdin, 1024*10) instead of reader := os.Stdin, but with no success either.
I tried with cat /dev/urandom | ./app and it works (on OS X). What command did you run, and on what OS?
