主要目的是为了把 fabio 的日志写入到 kafka。 用 cmd 执行 fabio 的启动日志,读取 stdout 和 stderr。然后从控制台输出,还有就是写入到 kafka。
package main
import (
"bufio"
"fmt"
"io"
"os/exec"
"strings"
"github.com/Shopify/sarama"
)
func main() {
execCommand()
}
func execCommand() {
cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
fmt.Println(cmd.Args)
stdout, err := cmd.StdoutPipe()
if err != nil {
fmt.Println(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
fmt.Println(err)
}
cmd.Start()
go printLog(stdout)
go printLog(stderr)
cmd.Wait()
}
func printLog(readCloser io.ReadCloser) {
reader := bufio.NewReader(readCloser)
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
fmt.Print(line)
}
}
作为 JAVA 程序员,第一次写 go 语言,就 go 有些不是很了解。 这样在并发高的时候会不会有问题,消耗内存会不会过多。 有没有更好的写法?输出到 kafka 的内容变成一个函数 写在 printLog 里面如何?
1
yuanfnadi OP 另外 kafka 的对象,我是 java 里面一般是定义一个 private 的对象,然后用 getProducer 来实现的。golang 是否是这样?
|
2
yuanfnadi OP ````golang
package main import ( "bufio" "fmt" "io" "os/exec" "strings" "github.com/Shopify/sarama" ) var ( asyncProducer *sarama.AsyncProducer ) func main() { fmt.Println(gerProducer()) fmt.Println(gerProducer()) execCommand() } func execCommand() { cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties") fmt.Println(cmd.Args) stdout, err := cmd.StdoutPipe() if err != nil { fmt.Println(err) } stderr, err := cmd.StderrPipe() if err != nil { fmt.Println(err) } cmd.Start() go printLog(stdout) go printLog(stderr) cmd.Wait() } func printLog(readCloser io.ReadCloser) { reader := bufio.NewReader(readCloser) for { line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Print(line) go sendMessage(line) } } func gerProducer() sarama.AsyncProducer { if asyncProducer != nil { fmt.Println("Sd") return *asyncProducer }else { fmt.Println("diyici") config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.RequiredAcks = sarama.WaitForAll p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config) if err != nil { fmt.Println("kafka failed") } asyncProducer = &p return *asyncProducer } } func sendMessage(message string) { msg := &sarama.ProducerMessage{ Topic: "test", Value: sarama.ByteEncoder(message), } p:=*asyncProducer p.Input() <- msg } ```` |
3
rrfeng 2017-07-17 22:23:18 +08:00
不是很了解 Java
高并发是指 fabio 输出很多很多 log ?那可能这里 fmt.Print() 是性能瓶颈。可以参考一下各种日志库。 感觉你的需求可以直接用 filebeat 如果是练手那当我没说 |
4
yuanfnadi OP @rrfeng fmt 到时候应该会用配置来关闭,fmt.print()的代码会替换成 func sendMessage(message string)
主要是第一次写 go 完全不知道自己代码有没有结构性错误,用的都是 java 的思想来写,所以想请一个擅长 go 的帮我指导一下。 |
5
yuanfnadi OP |
6
yuanfnadi OP |
7
hcymk2 2017-07-17 23:43:32 +08:00
还不如自己先测试下。
|
8
akrf 2017-07-18 01:43:01 +08:00
作为 Java 程序员,肯定写啥都像 Java,别挣扎了,老老实实用吧
|
9
jarlyyn 2017-07-18 02:13:14 +08:00 1
看得我一愣一愣的。特别是这么多 go。
对你用的工具不熟悉。 但是不明白你为什么起这么多协程。 如果是我的话,首先不会写这个 gerProducer,初始化的时候直接初始化不就好了么。没看出来它啥时候会变 nil。 其次感觉你是为了尽可能快的异步读日志再转写? 那感觉开一个 message 的 chan,设一个恰当的缓存大小。 然后 fabio 那里 go 两个协程出来写入 chan sendMessage 那里 go 个写成出来读 chan,感觉更符合调理一些。 个人 YY,对你这个业务不熟。 另外,按你这个写法是不是要维护个链接池? 不然连接数容易炸吧? |
10
jarlyyn 2017-07-18 03:05:20 +08:00
如果我写可能是这样?
var asyncProducer *sarama.AsyncProducer var cmd *exec.cmd var quit make (chan int) var stdout io.ReadCloser var stderr io.ReadCloser var outputs make(chan string,10) func main(){ initProducer() defer asyncProducer.Close() initCmd() initStdout() defer stdout.close() pipeMessage(stdout,outputs) initStderr() defer stderr.close() pipeMessage(stderr,outputs) for{ select{ case quit: return case newline:= <- outputs : asyncProducer.Input() <-&sarama.ProducerMessage{ Topic: "test", Value: sarama.ByteEncoder(newline), } case err := <-asyncProducer.Errors(): fmt.Println("kafka error") case <-asyncProducer.Successes(): } } } func initProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.RequiredAcks = sarama.WaitForAll p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config) if err != nil { fmt.Println("kafka failed") panic(err) } asyncProducer = &p } func initCmd(){ cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties") fmt.Println(cmd.Args) go func(){ defer close(quit) err:=cmd.Run() if err!=nil{ panic(err) } } } func initStdout(){ stdout, err := cmd.StdoutPipe() if err != nil { panic(err) } } func initStderr(){ stderr, err := cmd.StderrPipe() if err != nil { panic(err) } } func pipeMessage(reader io.ReadCloser,output chan string){ reader := bufio.NewReader(readCloser) go func(){ for { line, err2 := reader.ReadString('\n') if err2 != nil || io.EOF == err2 { break } fmt.Print(line) output <- line } } } |
11
jarlyyn 2017-07-18 03:10:34 +08:00
|