/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.apache.streams.twitter.provider.TwitterStreamProvider;
import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message processor that pulls delimited string messages from the underlying
 * {@link StringDelimitedProcessor} and hands each one to a fixed-size thread
 * pool for deserialization. The pending result of every deserialization task
 * is passed to the owning {@link TwitterStreamProvider}.
 */
public class TwitterStreamProcessor
extends StringDelimitedProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class);
    /** Number of deserialization threads used when the caller does not specify one. */
    private static final int DEFAULT_POOL_SIZE = 5;
    /** Pause, in milliseconds, between polls when no message is available yet. */
    private static final long POLL_INTERVAL_MS = 10L;
    private final TwitterStreamProvider provider;
    private final ExecutorService service;

    /**
     * Creates a processor backed by a pool of {@link #DEFAULT_POOL_SIZE} threads.
     *
     * @param provider provider that receives the submitted deserialization tasks
     */
    public TwitterStreamProcessor(TwitterStreamProvider provider) {
        this(provider, DEFAULT_POOL_SIZE);
    }

    /**
     * Creates a processor backed by a fixed thread pool of the given size.
     *
     * @param provider provider that receives the submitted deserialization tasks
     * @param poolSize number of threads in the deserialization pool
     */
    public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) {
        // No queue is handed to the parent; messages are forwarded to the provider instead.
        super(null);
        this.service = Executors.newFixedThreadPool(poolSize);
        this.provider = provider;
    }

    /**
     * Waits for the next message, polling every {@value #POLL_INTERVAL_MS} ms
     * while none is available, then submits it for deserialization and passes
     * the resulting future to the provider.
     *
     * @return whatever {@code TwitterStreamProvider.addDatum} returns for the submitted task
     * @throws IOException if reading the next message fails
     * @throws InterruptedException if the thread is interrupted while waiting for a message
     */
    public boolean process() throws IOException, InterruptedException {
        String msg = this.processNextMessage();
        while (msg == null) {
            Thread.sleep(POLL_INTERVAL_MS);
            msg = this.processNextMessage();
        }
        return this.provider.addDatum(this.service.submit(new StreamDeserializer(msg)));
    }

    /**
     * Shuts down the deserialization thread pool via
     * {@code ComponentUtils.shutdownExecutor}.
     */
    public void cleanUp() {
        // NOTE(review): 1 and 30 are presumably the initial and secondary wait
        // times for termination - confirm against ComponentUtils.
        ComponentUtils.shutdownExecutor(this.service, 1, 30);
    }

    /**
     * Task that turns one raw message string into a list of {@link StreamsDatum}.
     */
    protected static class StreamDeserializer
    implements Callable<List<StreamsDatum>> {
        /** Shared mapper obtained from {@link StreamsJacksonMapper#getInstance()}. */
        protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
        /** Raw message to deserialize; may be {@code null}. */
        protected String item;

        /**
         * @param item raw message to deserialize; may be {@code null}
         */
        public StreamDeserializer(String item) {
            this.item = item;
        }

        /**
         * Detects the target class for the message with
         * {@link TwitterEventClassifier#detectClass}, deserializes the message
         * into that class and wraps the result in a {@link StreamsDatum}.
         *
         * @return a mutable list holding the single resulting datum, or an
         *         empty mutable list when the item is {@code null}
         * @throws Exception if class detection or deserialization fails
         */
        @Override
        public List<StreamsDatum> call() throws Exception {
            if (this.item == null) {
                return new ArrayList<StreamsDatum>();
            }
            Class<?> itemClass = TwitterEventClassifier.detectClass(this.item);
            Object document = mapper.readValue(this.item, itemClass);
            StreamsDatum rawDatum = new StreamsDatum(document);
            // Pass the element directly so the list is typed as List<StreamsDatum>.
            return Lists.newArrayList(rawDatum);
        }
    }
}

