Trying to consume Kafka messages in 5.13 Java Code component

I'm trying to write some java code in 5.13 to consume a Kafka topic, as there isn't a plugin for BW5. I had it working but somehow I've broken it. The below code triggers the exception "Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)"

 

The Kafka jars in C:\Program Files\tibco\tpcl\5.10\lib are:

kafka-clients-2.5.0.jar

kafka-json-serializer-5.5.2.jar

 

and here's the code:

package ProcessDefinition;

import java.util.*;

import java.io.*;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

import org.apache.kafka.common.serialization.StringDeserializer;

public class ProcessDefinitionJavaCode{

/****** START SET/GET METHOD, DO NOT MODIFY *****/

    protected String groupid = "";

    protected String topic = "";

    protected String bootstrap = "";

    protected String keystore = "";

    protected String keystorepassword = "";

    protected String keypassword = "";

    protected String out_var_1 = "";

    public String getgroupid() {

        return groupid;

    }

    public void setgroupid(String val) {

        groupid = val;

    }

    public String gettopic() {

        return topic;

    }

    public void settopic(String val) {

        topic = val;

    }

    public String getbootstrap() {

        return bootstrap;

    }

    public void setbootstrap(String val) {

        bootstrap = val;

    }

    public String getkeystore() {

        return keystore;

    }

    public void setkeystore(String val) {

        keystore = val;

    }

    public String getkeystorepassword() {

        return keystorepassword;

    }

    public void setkeystorepassword(String val) {

        keystorepassword = val;

    }

    public String getkeypassword() {

        return keypassword;

    }

    public void setkeypassword(String val) {

        keypassword = val;

    }

    public String getout_var_1() {

        return out_var_1;

    }

    public void setout_var_1(String val) {

        out_var_1 = val;

    }

/****** END SET/GET METHOD, DO NOT MODIFY *****/

     public ProcessDefinitionJavaCode() {

    }

    public void invoke() throws Exception {

/* Available Variables: DO NOT MODIFY

    In  : String groupid

    In  : String topic

    In  : String bootstrap

    In  : String keystore

    In  : String keystorepassword

    In  : String keypassword

    Out : String out_var_1

* Available Variables: DO NOT MODIFY *****/

   Properties props = new Properties();

   props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);

   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

   props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

   props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

   props.put("security.protocol", "SSL");

   props.put("ssl.truststore.location", keystore);

   props.put("ssl.keystore.location", keystore);

   props.put("ssl.truststore.password", keystorepassword);

   props.put("ssl.keystore.password" ,keystorepassword);

   props.put("ssl.key.password", keypassword);

   Consumer <String, String> consumer = new KafkaConsumer <> (props);

   consumer.subscribe(Arrays.asList(topic));

    int temp=0;

     while(temp==0) {

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        for (ConsumerRecord<String, String> record : records) {

          String key = record.key();

          String value = record.value();

      out_var_1 += value;

      temp=1;

        }

   }

consumer.close();}

}

1 Comment

Now this gets stranger..

The same code, on a colleagues machine, works and is able to connect to Kafka, and I am still unable to connect. In case there was some kind of persistence, I restarted my machine, but still no luck. If anyone has an idea, I'd appreciate it

Thanks

brent.mcmillan - Nov 03, 2020 - 10:32pm
+ Add a Comment
Login