利志分享
fast_forward
view_headline
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
kafka入门
kafka安装使用教程
kafka的架构设计
Kafka 消费组 Rebalance机制
全网最通俗易懂的Kafka图解入门
深入kafka的幂等性和事务
kafka深入
聊聊kafka的生成和消费的问题
go的kafka生产和消费
全网最通俗易懂的Kafka图解新建Topic,写入消息的原理
关于Kafka,你必须要知道的offset知识。
Kafka的Producer实现原理剖析
一次线上kafka磁盘扩容引发的事故分析
目录
kafka入门
kafka安装使用教程
kafka的架构设计
Kafka 消费组 Rebalance机制
全网最通俗易懂的Kafka图解入门
深入kafka的幂等性和事务
kafka深入
聊聊kafka的生成和消费的问题
go的kafka生产和消费
全网最通俗易懂的Kafka图解新建Topic,写入消息的原理
关于Kafka,你必须要知道的offset知识。
Kafka的Producer实现原理剖析
一次线上kafka磁盘扩容引发的事故分析
go的kafka生产和消费
阅读:1564
分享次数:0
之前有一篇文件聊了聊如何生产不丢失数据,消费不丢失数据。这一篇我们来看下go如何通过参数配置来处理生产和消费的。 关于go的client,官方推荐有个如下几个: https://github.com/Shopify/sarama https://github.com/stealthly/go_kafka_client https://github.com/stealthly/siesta https://github.com/optiopay/kafka https://github.com/nuance/kafka 我这里使用的就是官方推荐的第一个sarama,目前star的量7.2 go的生产端:关于生产端上篇文章也说过最核心的参数是: // 发送完数据需要leader和follow都确认 mqConfig.Producer.RequiredAcks = sarama.WaitForAll 关于go的生产端核心就是确保写入的数据都到leader和follow。代码参考如下: package main import ( "fmt" "github.com/Shopify/sarama" "strings" "time" ) var KafkaProducer sarama.SyncProducer func InitKafkaProducer(addressList string) { var err error mqConfig := sarama.NewConfig() // 设置producer // 发送完数据需要leader和follow都确认 mqConfig.Producer.RequiredAcks = sarama.WaitForAll // Partition选择随机 mqConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 成功交付的消息将在success channel返回 mqConfig.Producer.Return.Successes = true // 配置版本 mqConfig.Version = sarama.V0_10_2_1 kafkaClient, err := sarama.NewClient(strings.Split(addressList, ","), mqConfig) if err != nil { panic(err) } // 客户端生产者 KafkaProducer, err = sarama.NewSyncProducerFromClient(kafkaClient) if err != nil { panic(err) } } func sendKafkaMessage(content, topic string) { msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(content), Timestamp: time.Now(), } partition, offset, err := KafkaProducer.SendMessage(msg) if err != nil { return } fmt.Printf("sendKafkaMessage", topic, partition, offset) } func main() { addressList := "127.0.0.1:9092" InitKafkaProducer(addressList) content := "sdfsdfrer" topic := "test" sendKafkaMessage(content, topic) } 关于go的消费端就是手动提交offset,我们要保证写入数据库,保存成功之后提交offset,这样数据肯定是不会存在丢失的情况。下面我们来看下实现的代码: package main import ( "fmt" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "strings" "time" ) var consumerClient *cluster.Consumer // 初始化kafka配置消费信息 func InitKafkaConsumer(addressList string, consumerGroup string, topicList []string) { var err error clusterCfg := cluster.NewConfig() clusterCfg.Consumer.Return.Errors = true clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest clusterCfg.Consumer.Offsets.AutoCommit.Enable = false clusterCfg.Group.Return.Notifications = true //这行代码是因为github.com/bsm/sarama-cluster不维护了,库会有bug,我这里只是写个demo,将就着用 clusterCfg.Consumer.Offsets.CommitInterval = 1 * time.Second clusterCfg.Version = sarama.V0_10_2_1 // kafka消费端 if err = clusterCfg.Validate(); err != nil { msg := fmt.Sprintf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err) panic(msg) } consumerClient, err = cluster.NewConsumer(strings.Split(addressList, ","), consumerGroup, topicList, clusterCfg) if err != nil { msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) panic(msg) } } func main() { addressList := "127.0.0.1:9092" consumerGroup := "my_go" topicList := []string{"test"} InitKafkaConsumer(addressList, consumerGroup, topicList) go func() { for { select { case msg, ok := <-consumerClient.Messages(): if ok { //这里目前我是打印数据,线上可能是写入数据到db,写入成功之后我们手动提交offset fmt.Println(string(msg.Value)) //这里是临时保存offset consumerClient.MarkOffset(msg, "") //手动提交offset consumerClient.CommitOffsets() } case err, more := <-consumerClient.Errors(): if more { fmt.Printf("consumerClient error %v\n", err) } case ntf, more := <-consumerClient.Notifications(): if more { fmt.Printf("consumerClient Notifications %v \n", ntf) } } } }() time.Sleep(10 * time.Hour) }
感觉本站内容不错,读后有收获?
attach_money
我要小额打赏,鼓励作者写出更好的教程
扫码关注公众号:talk_lizhi