Brent McMillan
Posted November 3, 2020

I'm trying to write some Java code in BW 5.13 to consume a Kafka topic, as there isn't a plugin for BW5. I had it working, but somehow I've broken it. The code below triggers the exception:

"Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)"

The Kafka jars in C:\Program Files\tibco\tpcl\5.10\lib are:

kafka-clients-2.5.0.jar
kafka-json-serializer-5.5.2.jar

And here's the code:

package ProcessDefinition;

import java.util.*;
import java.io.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProcessDefinitionJavaCode {
/****** START SET/GET METHOD, DO NOT MODIFY *****/
    protected String groupid = "";
    protected String topic = "";
    protected String bootstrap = "";
    protected String keystore = "";
    protected String keystorepassword = "";
    protected String keypassword = "";
    protected String out_var_1 = "";
    public String getgroupid() {
        return groupid;
    }
    public void setgroupid(String val) {
        groupid = val;
    }
    public String gettopic() {
        return topic;
    }
    public void settopic(String val) {
        topic = val;
    }
    public String getbootstrap() {
        return bootstrap;
    }
    public void setbootstrap(String val) {
        bootstrap = val;
    }
    public String getkeystore() {
        return keystore;
    }
    public void setkeystore(String val) {
        keystore = val;
    }
    public String getkeystorepassword() {
        return keystorepassword;
    }
    public void setkeystorepassword(String val) {
        keystorepassword = val;
    }
    public String getkeypassword() {
        return keypassword;
    }
    public void setkeypassword(String val) {
        keypassword = val;
    }
    public String getout_var_1() {
        return out_var_1;
    }
    public void setout_var_1(String val) {
        out_var_1 = val;
    }
/****** END SET/GET METHOD, DO NOT MODIFY *****/
    public ProcessDefinitionJavaCode() {
    }
    public void invoke() throws Exception {
/* Available Variables: DO NOT MODIFY
    In  : String groupid
    In  : String topic
    In  : String bootstrap
    In  : String keystore
    In  : String keystorepassword
    In  : String keypassword
    Out : String out_var_1
* Available Variables: DO NOT MODIFY *****/
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // SSL settings: the same file is used as both truststore and keystore.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", keystore);
        props.put("ssl.keystore.location", keystore);
        props.put("ssl.truststore.password", keystorepassword);
        props.put("ssl.keystore.password", keystorepassword);
        props.put("ssl.key.password", keypassword);

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));

        // Poll until at least one batch of records has been received.
        int temp = 0;
        while (temp == 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                out_var_1 += value;
                temp = 1;
            }
        }
        consumer.close();
    }
}
Brent McMillan
Posted November 4, 2020

Now this gets stranger... The same code works on a colleague's machine and is able to connect to Kafka, but I am still unable to connect. In case there was some kind of persistence, I restarted my machine, but still no luck.

If anyone has an idea, I'd appreciate it.

Thanks
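
For anyone hitting the same message: in kafka-clients, "Failed to construct kafka consumer" is the message of the KafkaException thrown from the KafkaConsumer constructor, and the underlying reason is normally attached as its cause. Since the same code works on one machine and not the other, that inner cause is probably the most useful thing to look at. Below is a minimal sketch of a helper that flattens the whole cause chain into text. The class name CauseChain and the simulated exception in main are hypothetical and only there so the snippet runs without a broker or the Kafka jars.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka or BW: flattens an exception's cause chain.
final class CauseChain {

    /** Returns one "class: message" entry per throwable, outermost first. */
    static List<String> lines(Throwable t) {
        List<String> out = new ArrayList<>();
        int depth = 0;
        // Cap the depth so a self-referencing cause chain cannot loop forever.
        while (t != null && depth < 20) {
            out.add(t.getClass().getName() + ": " + t.getMessage());
            t = t.getCause();
            depth++;
        }
        return out;
    }

    /** Joins the chain into a single string suitable for a log line or an output variable. */
    static String describe(Throwable t) {
        return String.join(" <- caused by <- ", lines(t));
    }

    public static void main(String[] args) {
        // Simulated stand-in for the real exception; the inner cause here is made up.
        Throwable simulated = new RuntimeException(
                "Failed to construct kafka consumer",
                new IllegalStateException("hypothetical inner failure"));
        System.out.println(describe(simulated));
    }
}
```

In the Java Code activity, one way to use this would be to wrap the new KafkaConsumer<String, String>(props) call in a try/catch, write CauseChain.describe(e) to out_var_1 or the log, and then rethrow. The inner entries of the chain should show what actually differs between the two machines.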