永发信息网

kafka 报错

答案:2  悬赏:0  手机版
解决时间 2021-03-14 15:09
  • 提问者网友:做自己de王妃
  • 2021-03-14 04:03
Auto offset commit failed for group bro: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
用的python 2.7
最佳答案
  • 五星知识达人网友:逐風
  • 2021-03-14 05:36
启动kafka报错,应该怎么解决
内网IP只能在内网局域网访问连接,在外网是不能认识内网IP不能访问的。
如有路由权限,且路由有固定公网IP,可以通过路由的端口映射,实现外网访问内网。如无路由,或路由无公网IP,需要借助第三方开放的nat123端口映射网络辅助实现外网访问内网。
必须使用base 包中官方的build.prop,但是必须在build.prop 任意位置加入如下几行(对比了百度和联想官方的build.prop,发现百度修改和添加了一下prop,移植时如果base的build.prop有这个属性,替换,没有则增加即可):
ro.baidu.build.hardware=c8813(以c8813为例,可选择自己适配的机型)
ro.baidu.build.hardware.version=1.0
ro.baidu.build.software=yi_3.0
ro.baidu.build.version.release=2.1
ro.product.manufacturer=Baidu
persist.sys.emmc=/mnt/sdcard2
ro.config.notification_sound=Ding.mp3
ro.config.ringtone=Echo.mp3
ro.config.alarm_alert=alarm.mp3
全部回答
  • 1楼网友:佘樂
  • 2021-03-14 06:39
kafka是由linkedin设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与kafka集成,其中spark streaming作为后端流引擎配合kafka作为前端消息系统正成为当前流处理系统的主流架构之一。 然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下transwarp在kafka安全性上所做的工作及其使用方法。 kafka架构与安全 首先,我们来了解下有关kafka的几个基本概念: topic:kafka把接收的消息按种类划分,每个种类都称之为topic,由唯一的topic name标识。 producer:向topic发布消息的进程称为producer。 consumer:从topic订阅消息的进程称为consumer。 broker:kafka集群包含一个或多个服务器,这种服务器被称为broker。 kafka的整体架构如下图所示,典型的kafka集群包含一组发布消息的producer,一组管理topic的broker,和一组订阅消息的consumer。topic可以有多个分区,每个分区只存储于一个broker。producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的hash值指定分区。broker需要通过zookeeper记录集群的所有broker、选举分区的leader,记录consumer的消费消息的偏移量,以及在consumer group发生变化时进行relalance. broker接收和发送消息是被动的:由producer主动发送消息,consumer主动拉取消息。 然而,分析kafka框架,我们会发现以下严重的安全问题: 1.网络中的任何一台主机,都可以通过启动broker进程而加入kafka集群,能够接收producer的消息,能够篡改消息并发送给consumer。 2.网络中的任何一台主机,都可以启动恶意的producer/consumer连接到broker,发送非法消息或拉取隐私消息数据。 3.broker不支持连接到启用kerberos认证的zookeeper集群,没有对存放在zookeeper上的数据设置权限。任意用户都能够直接访问zookeeper集群,对这些数据进行修改或删除。 4.kafka中的topic不支持设置访问控制列表,任意连接到kafka集群的consumer(或producer)都能对任意topic读取(或发送)消息。 随着kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸弹,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破broker所在的服务器。 kafka安全设计 基于上述分析,transwarp从以下两个方面增强kafka的安全性: 身份认证(authentication):设计并实现了基于kerberos和基于ip的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于ip地址可信的网络环境,相比于前者部署更为简便。 权限控制(authorization):设计并实现了topic级别的权限模型。topic的权限分为read(从topic拉取数据)、write(向topic中生产数据)、create(创建topic)和delete(删除topic)。 基于kerberos的身份机制如下图所示: broker启动时,需要使用配置文件中的身份和密钥文件向kdc(kerberos服务器)认证,认证通过则加入kafka集群,否则报错退出。 producer(或consumer)启动后需要经过如下步骤与broker建立安全的socket连接: 1.producer向kdc认证身份,通过则得到tgt(票证请求票证),否则报错退出 2.producer使用tgt向kdc请求kafka服务,kdc验证tgt并向producer返回sessionkey(会话密钥)和serviceticket(服务票证) 3.producer使用sessionkey和serviceticket与broker建立连接,broker使用自身的密钥解密serviceticket,获得与producer通信的sessionkey,然后使用sessionkey验证producer的身份,通过则建立连接,否则拒绝连接。 zookeeper需要启用kerberos认证模式,保证broker或consumer与其的连接是安全的。 topic的访问控制列表(acl)存储于zookeeper中,存储节点的路径为/acl//,节点数据为r(ead)、w(rite)、c(reate)、d(elete)权限的集合,如/acl/transaction/jack节点的数据为rw,则表示用户jack能够对transaction这个topic进行读和写。 另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,acl相关的zookeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。 构建安全的kafka服务 首先,我们为broker启用kerberos认证模式,配置文件为/etc/kafka/conf/server.properties,安全相关的参数如下所示: 其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab. 认证模式为ipaddress时,producer和consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用api进行登录,样例代码如下所示: public class secureproducer extends thread { private final kafka.javaapi.producer.producer producer; private final string topic; private final properties props = new properties(); public secureproducer(string topic) { authenticationmanager.setauthmethod(“kerberos”); authenticationmanager.login(“producer1″, “/etc/producer1.keytab”); props.put(“serializer.class”, “kafka.serializer.stringencoder”); props.put(“metadata.broker.list”, “172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092″); // use random partitioner. don’t need the key type. just set it to integer. // the message is of type string. producer = new kafka.javaapi.producer.producer( new producerconfig(props)); this.topic = topic;
我要举报
如以上回答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
点此我要举报以上问答信息
大家都在看
推荐资讯