Go 初学者想要 V 友帮忙看一下代码

2017-07-17 21:26:51 +08:00
 yuanfnadi

主要目的是为了把 fabio 的日志写入到 kafka。 用 cmd 执行 fabio 的启动日志,读取 stdout 和 stderr。然后从控制台输出,还有就是写入到 kafka。

package main

import (
	"bufio"
	"fmt"
	"io"
	"os/exec"
	"strings"
	"github.com/Shopify/sarama"
)



func main() {
	execCommand()
}

func execCommand() {
	cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
	fmt.Println(cmd.Args)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		fmt.Println(err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		fmt.Println(err)
	}
	cmd.Start()
	go printLog(stdout)
	go printLog(stderr)
	cmd.Wait()
}

func printLog(readCloser io.ReadCloser)  {
	reader := bufio.NewReader(readCloser)
	for {
		line, err2 := reader.ReadString('\n')
		if err2 != nil || io.EOF == err2 {
			break
		}
		fmt.Print(line)
	}
}

作为 JAVA 程序员,第一次写 go 语言,就 go 有些不是很了解。 这样在并发高的时候会不会有问题,消耗内存会不会过多。 有没有更好的写法?输出到 kafka 的内容变成一个函数 写在 printLog 里面如何?

1888 次点击
所在节点    Go 编程语言
11 条回复
yuanfnadi
2017-07-17 21:34:40 +08:00
另外 kafka 的对象,我是 java 里面一般是定义一个 private 的对象,然后用 getProducer 来实现的。golang 是否是这样?
yuanfnadi
2017-07-17 22:21:24 +08:00
````golang
package main

import (
"bufio"
"fmt"
"io"
"os/exec"
"strings"
"github.com/Shopify/sarama"
)


var (
asyncProducer *sarama.AsyncProducer
)



func main() {
fmt.Println(gerProducer())
fmt.Println(gerProducer())

execCommand()
}

func execCommand() {
cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
fmt.Println(cmd.Args)
stdout, err := cmd.StdoutPipe()
if err != nil {
fmt.Println(err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
fmt.Println(err)
}
cmd.Start()
go printLog(stdout)
go printLog(stderr)
cmd.Wait()
}

func printLog(readCloser io.ReadCloser) {
reader := bufio.NewReader(readCloser)
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
fmt.Print(line)
go sendMessage(line)
}
}

func gerProducer() sarama.AsyncProducer {
if asyncProducer != nil {
fmt.Println("Sd")
return *asyncProducer
}else {
fmt.Println("diyici")
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config)
if err != nil {
fmt.Println("kafka failed")
}
asyncProducer = &p
return *asyncProducer
}
}

func sendMessage(message string) {
msg := &sarama.ProducerMessage{
Topic: "test",
Value: sarama.ByteEncoder(message),
}
p:=*asyncProducer
p.Input() <- msg
}
````
rrfeng
2017-07-17 22:23:18 +08:00
不是很了解 Java
高并发是指 fabio 输出很多很多 log ?那可能这里 fmt.Print() 是性能瓶颈。可以参考一下各种日志库。

感觉你的需求可以直接用 filebeat
如果是练手那当我没说
yuanfnadi
2017-07-17 22:25:18 +08:00
@rrfeng fmt 到时候应该会用配置来关闭,fmt.print()的代码会替换成 func sendMessage(message string)

主要是第一次写 go 完全不知道自己代码有没有结构性错误,用的都是 java 的思想来写,所以想请一个擅长 go 的帮我指导一下。
yuanfnadi
2017-07-17 22:33:35 +08:00
yuanfnadi
2017-07-17 22:37:02 +08:00
hcymk2
2017-07-17 23:43:32 +08:00
还不如自己先测试下。
akrf
2017-07-18 01:43:01 +08:00
作为 Java 程序员,肯定写啥都像 Java,别挣扎了,老老实实用吧
jarlyyn
2017-07-18 02:13:14 +08:00
看得我一愣一愣的。特别是这么多 go。

对你用的工具不熟悉。

但是不明白你为什么起这么多协程。

如果是我的话,首先不会写这个 gerProducer,初始化的时候直接初始化不就好了么。没看出来它啥时候会变 nil。

其次感觉你是为了尽可能快的异步读日志再转写?

那感觉开一个 message 的 chan,设一个恰当的缓存大小。

然后 fabio 那里 go 两个协程出来写入 chan

sendMessage 那里 go 个写成出来读 chan,感觉更符合调理一些。

个人 YY,对你这个业务不熟。

另外,按你这个写法是不是要维护个链接池?

不然连接数容易炸吧?
jarlyyn
2017-07-18 03:05:20 +08:00
如果我写可能是这样?

var asyncProducer *sarama.AsyncProducer
var cmd *exec.cmd
var quit make (chan int)
var stdout io.ReadCloser
var stderr io.ReadCloser
var outputs make(chan string,10)
func main(){
initProducer()
defer asyncProducer.Close()
initCmd()
initStdout()
defer stdout.close()
pipeMessage(stdout,outputs)
initStderr()
defer stderr.close()
pipeMessage(stderr,outputs)
for{
select{
case quit:
return
case newline:= <- outputs :
asyncProducer.Input() <-&sarama.ProducerMessage{
Topic: "test",
Value: sarama.ByteEncoder(newline),
}
case err := <-asyncProducer.Errors():
fmt.Println("kafka error")
case <-asyncProducer.Successes():
}
}

}
func initProducer(){
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = sarama.WaitForAll
p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config)
if err != nil {
fmt.Println("kafka failed")
panic(err)
}
asyncProducer = &p

}
func initCmd(){
cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
fmt.Println(cmd.Args)
go func(){
defer close(quit)
err:=cmd.Run()
if err!=nil{
panic(err)
}
}
}

func initStdout(){
stdout, err := cmd.StdoutPipe()
if err != nil {
panic(err)
}
}
func initStderr(){
stderr, err := cmd.StderrPipe()
if err != nil {
panic(err)
}
}
func pipeMessage(reader io.ReadCloser,output chan string){
reader := bufio.NewReader(readCloser)
go func(){
for {
line, err2 := reader.ReadString('\n')
if err2 != nil || io.EOF == err2 {
break
}
fmt.Print(line)
output <- line
}
}
}
jarlyyn
2017-07-18 03:10:34 +08:00

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/376001

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX