Introducing the all-new TIBCO Community site!

For current users, please click "Sign In" to reset your password and access the enhanced features. If you're a first-time visitor, we extend a warm welcome—click "Sign Up" to become a part of the TIBCO Community!

If you're seeking alternative community sites, explore ibi, Jaspersoft, and Spotfire.

Jump to content

Trying to consume Kafka messages in 5.13 Java Code component


Brent McMillan

Recommended Posts

I'm trying to write some java code in 5.13 to consume a Kafka topic, as there isn't a plugin for BW5. I had it working but somehow I've broken it. The below code triggers the exception "Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825)"

 

The Kafka jars inC:Program Filestibcotpcl5.10lib are:

kafka-clients-2.5.0.jar

kafka-json-serializer-5.5.2.jar

 

and here's the code:

package ProcessDefinition;

import java.util.*;

import java.io.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import org.apache.kafka.common.serialization.StringDeserializer;

public class ProcessDefinitionJavaCode{

/****** START SET/GET METHOD, DO NOT MODIFY *****/

protected String groupid = "";

protected String topic = "";

protected String bootstrap = "";

protected String keystore = "";

protected String keystorepassword = "";

protected String keypassword = "";

protected String out_var_1 = "";

public String getgroupid() {

return groupid;

}

public void setgroupid(String val) {

groupid = val;

}

public String gettopic() {

return topic;

}

public void settopic(String val) {

topic = val;

}

public String getbootstrap() {

return bootstrap;

}

public void setbootstrap(String val) {

bootstrap = val;

}

public String getkeystore() {

return keystore;

}

public void setkeystore(String val) {

keystore = val;

}

public String getkeystorepassword() {

return keystorepassword;

}

public void setkeystorepassword(String val) {

keystorepassword = val;

}

public String getkeypassword() {

return keypassword;

}

public void setkeypassword(String val) {

keypassword = val;

}

public String getout_var_1() {

return out_var_1;

}

public void setout_var_1(String val) {

out_var_1 = val;

}

/****** END SET/GET METHOD, DO NOT MODIFY *****/

public ProcessDefinitionJavaCode() {

}

public void invoke() throws Exception {

/* Available Variables: DO NOT MODIFY

In : String groupid

In : String topic

In : String bootstrap

In : String keystore

In : String keystorepassword

In : String keypassword

Out : String out_var_1

* Available Variables: DO NOT MODIFY *****/

Properties props = new Properties();

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put("security.protocol", "SSL");

props.put("ssl.truststore.location", keystore);

props.put("ssl.keystore.location", keystore);

props.put("ssl.truststore.password", keystorepassword);

props.put("ssl.keystore.password" ,keystorepassword);

props.put("ssl.key.password", keypassword);

Consumer consumer = new KafkaConsumer (props);

consumer.subscribe(Arrays.asList(topic));

int temp=0;

while(temp==0) {

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord record : records) {

String key = record.key();

String value = record.value();

out_var_1 += value;

temp=1;

}

}

consumer.close();}

}

Link to comment
Share on other sites

Now this gets stranger..

 

The same code, on a colleagues machine, works and is able to connect to Kafka, and I am still unable to connect. In case there was some kind of persistence, I restarted my machine, but still no luck. If anyone has an idea, I'd appreciate it

 

Thanks

Link to comment
Share on other sites

×
×
  • Create New...