package com.appleframework.flume.ng.kafka.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appleframework/flume/ng/kafka/source/KafkaSource.class */
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;

    public PollableSource.Status process() throws EventDeliveryException {
        ArrayList arrayList = new ArrayList();
        try {
            if (this.it.hasNext()) {
                byte[] bArr = (byte[]) this.it.next().message();
                SimpleEvent simpleEvent = new SimpleEvent();
                HashMap hashMap = new HashMap();
                hashMap.put("timestamp", String.valueOf(System.currentTimeMillis()));
                log.debug("Message: {}", new String(bArr));
                simpleEvent.setBody(bArr);
                simpleEvent.setHeaders(hashMap);
                arrayList.add(simpleEvent);
            }
            getChannelProcessor().processEventBatch(arrayList);
            return PollableSource.Status.READY;
        } catch (Exception e) {
            log.error("KafkaSource EXCEPTION, {}", e.getMessage());
            return PollableSource.Status.BACKOFF;
        }
    }

    public void configure(Context context) {
        this.topic = context.getString("topic");
        if (this.topic == null) {
            throw new ConfigurationException("Kafka topic must be specified.");
        }
        try {
            this.consumer = KafkaSourceUtil.getConsumer(context);
        } catch (IOException e) {
            log.error("IOException occur, {}", e.getMessage());
        } catch (InterruptedException e2) {
            log.error("InterruptedException occur, {}", e2.getMessage());
        }
        HashMap hashMap = new HashMap();
        hashMap.put(this.topic, new Integer(1));
        Map createMessageStreams = this.consumer.createMessageStreams(hashMap);
        if (createMessageStreams == null) {
            throw new ConfigurationException("topicCountMap is null");
        }
        List list = (List) createMessageStreams.get(this.topic);
        if (list == null || list.isEmpty()) {
            throw new ConfigurationException("topicList is null or empty");
        }
        this.it = ((KafkaStream) list.get(0)).iterator();
    }

    public synchronized void stop() {
        this.consumer.shutdown();
        super.stop();
    }
}
