Trying to consume Kafka messages in 5.13 Java Code component

Brent McMillan

I'm trying to write some Java code in BW 5.13 to consume a Kafka topic, as there isn't a Kafka plugin for BW5. I had it working, but somehow I've broken it. The code below throws the exception "Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)".
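"Failed to construct kafka consumer" is the message of the KafkaException that the KafkaConsumer constructor throws when something inside it fails; the underlying reason is normally attached as the cause. A minimal sketch of how the whole chain could be printed from a catch block is shown here. The class name CauseChain and the sample exceptions in main are made up for illustration, and only the JDK is used so it runs without the Kafka jars.

```java
import java.util.ArrayList;
import java.util.List;

public class CauseChain {

    /** Returns one line per throwable in the chain, outermost first. */
    public static List<String> describe(Throwable t) {
        List<String> lines = new ArrayList<>();
        // Cap the depth in case a chain ever loops back on itself.
        for (int depth = 0; t != null && depth < 20; depth++) {
            lines.add(t.getClass().getName() + ": " + t.getMessage());
            t = t.getCause();
        }
        return lines;
    }

    public static void main(String[] args) {
        // Stand-in for the real failure. The inner exception and its
        // message are invented for the demo, not taken from the post.
        Exception inner = new java.io.FileNotFoundException("hypothetical.jks");
        Exception outer = new RuntimeException("Failed to construct kafka consumer", inner);
        for (String line : describe(outer)) {
            System.out.println(line);
        }
    }
}
```

Inside invoke(), the same idea would be to wrap the `new KafkaConsumer<>(props)` call in a try/catch and write `CauseChain.describe(e)` to out_var_1 or the log, so the innermost line shows what actually failed.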


The Kafka jars in C:\Program Files\tibco\tpcl\5.10\lib are:




and here's the code:

package ProcessDefinition;

import java.util.*;
import java.io.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProcessDefinitionJavaCode {

    // Input and output fields generated by the Java Code activity.
    protected String groupid = "";
    protected String topic = "";
    protected String bootstrap = "";
    protected String keystore = "";
    protected String keystorepassword = "";
    protected String keypassword = "";
    protected String out_var_1 = "";

    public String getgroupid() {
        return groupid;
    }

    public void setgroupid(String val) {
        groupid = val;
    }

    public String gettopic() {
        return topic;
    }

    public void settopic(String val) {
        topic = val;
    }

    public String getbootstrap() {
        return bootstrap;
    }

    public void setbootstrap(String val) {
        bootstrap = val;
    }

    public String getkeystore() {
        return keystore;
    }

    public void setkeystore(String val) {
        keystore = val;
    }

    public String getkeystorepassword() {
        return keystorepassword;
    }

    public void setkeystorepassword(String val) {
        keystorepassword = val;
    }

    public String getkeypassword() {
        return keypassword;
    }

    public void setkeypassword(String val) {
        keypassword = val;
    }

    public String getout_var_1() {
        return out_var_1;
    }

    public void setout_var_1(String val) {
        out_var_1 = val;
    }

    public ProcessDefinitionJavaCode() {
    }

    public void invoke() throws Exception {
        /* Available Variables: DO NOT MODIFY
            In  : String groupid
            In  : String topic
            In  : String bootstrap
            In  : String keystore
            In  : String keystorepassword
            In  : String keypassword
            Out : String out_var_1
        * Available Variables: DO NOT MODIFY *****/

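        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // SSL settings: the same file serves as both truststore and keystore.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", keystore);
        props.put("ssl.keystore.location", keystore);
        props.put("ssl.truststore.password", keystorepassword);
        props.put("ssl.keystore.password", keystorepassword);
        props.put("ssl.key.password", keypassword);

        // This constructor call is where "Failed to construct kafka consumer" is thrown.
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // Not in the code as posted: poll() throws IllegalStateException unless the
        // consumer has subscribed (or been assigned partitions) first.
        consumer.subscribe(Collections.singletonList(topic));

        // temp is never changed, so this loop keeps polling until an exception is thrown.
        int temp = 0;
        while (temp == 0) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                out_var_1 += value;
            }
        }
    }
}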

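The properties in the code point ssl.keystore.location and ssl.truststore.location at the same file, so one thing that may be worth ruling out is whether that file can be opened at all with the given password on the machine in question. The following is a rough standalone sketch using only the JDK's KeyStore API. The class name KeystoreCheck is made up, and the default type of JKS is an assumption based on Kafka's default for ssl.keystore.type; pass PKCS12 instead if that is what the file is.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;

public class KeystoreCheck {

    /** Returns "OK (n entries)" or a short description of what went wrong. */
    public static String check(String path, String password, String type) {
        File file = new File(path);
        if (!file.isFile()) {
            // Covers a missing file, a directory, or a mistyped path.
            return "not a readable file: " + file.getAbsolutePath();
        }
        try (InputStream in = new FileInputStream(file)) {
            KeyStore store = KeyStore.getInstance(type);
            // load() fails on a wrong password or an unexpected store format.
            store.load(in, password.toCharArray());
            return "OK (" + store.size() + " entries)";
        } catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: java KeystoreCheck <path> <password> [type]");
            return;
        }
        // Assumption: JKS unless a third argument says otherwise.
        String type = args.length > 2 ? args[2] : "JKS";
        System.out.println(check(args[0], args[1], type));
    }
}
```

Running it with the exact values passed to the keystore and keystorepassword inputs on both machines would show whether the two environments see the same file. This only tests the store password, not the separate ssl.key.password.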

Now this gets stranger.


The same code works on a colleague's machine and connects to Kafka, but on mine it still fails to connect. In case some state was being persisted, I restarted my machine, but still no luck. If anyone has an idea, I'd appreciate it.
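When identical code behaves differently on two machines, one possible difference is which jar each class is actually loaded from. A small sketch for comparing that is shown here; the class name ClassOrigin is invented for illustration, and the lookup is done by name so it compiles without the Kafka jars. org.slf4j.Logger is included because kafka-clients depends on slf4j-api. Note that the result depends on the classloader in use, so calling these methods from inside invoke() is likely to be more telling than a standalone run.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, if known. */
    public static String locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader have no code source.
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Same as locate(Class), but looks the class up by name first. */
    public static String locateByName(String className) {
        try {
            // initialize=false: find the class without running its static initializers.
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            return locate(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            "org.slf4j.Logger"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locateByName(name));
        }
    }
}
```

If the two machines print different locations or versions for the same class, that would point at the classpath rather than the code.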


