永发信息网

storm整合kafka需要哪些包

答案:2  悬赏:20  手机版
解决时间 2021-02-05 20:41
  • 提问者网友:我们很暧昧
  • 2021-02-04 21:59
storm整合kafka需要哪些包
最佳答案
  • 五星知识达人网友:玩世
  • 2021-02-04 23:17
答:这个是scala的编译库喝运行时库(kafka是用sbt管理依赖的),所以建议你使用sbt,会自动下载所有依赖
全部回答
  • 1楼网友:鸽屿
  • 2021-02-04 23:49
public partitionmanager(dynamicpartitionconnections connections, string topologyinstanceid, zkstate state, map stormconf, spoutconfig spoutconfig, partition id) { _partition = id; _connections = connections; _spoutconfig = spoutconfig; _topologyinstanceid = topologyinstanceid; _consumer = connections.register(id.host, id.partition); //注册partition到connections,并生成simpleconsumer _state = state; _stormconf = stormconf; string jsontopologyid = null; long jsonoffset = null; string path = committedpath(); try { map json = _state.readjson(path); log.info("read partition information from: " + path + " --> " + json ); if (json != null) { jsontopologyid = (string) ((map) json.get("topology")).get("id"); jsonoffset = (long) json.get("offset"); // 从zk中读出commited offset } } catch (throwable e) { log.warn("error reading and/or parsing at zknode: " + path, e); } if (jsontopologyid == null || jsonoffset == null) { // zk中没有记录,那么根据spoutconfig.startoffsettime设置offset,earliest或latest _committedto = kafkautils.getoffset(_consumer, spoutconfig.topic, id.partition, spoutconfig); log.info("no partition information found, using configuration to determine offset"); } else if (!topologyinstanceid.equals(jsontopologyid) && spoutconfig.forcefromstart) { _committedto = kafkautils.getoffset(_consumer, spoutconfig.topic, id.partition, spoutconfig.startoffsettime); log.info("topology change detected and reset from start forced, using configuration to determine offset"); } else { _committedto = jsonoffset; } _emittedtooffset = _committedto; // 初始化时,中间状态都是一致的 }
我要举报
如以上回答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
点此我要举报以上问答信息
大家都在看
推荐资讯