/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.endpoint.UserstreamEndpoint;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.BasicAuth;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.provider.TwitterStreamProcessor;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwitterStreamProvider
implements StreamsProvider,
Serializable,
DatumStatusCountable {
    public static final String STREAMS_ID = "TwitterStreamProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
    public static final int MAX_BATCH = 1000;
    private TwitterStreamConfiguration config;
    protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
    protected Hosts hosebirdHosts;
    protected Authentication auth;
    protected StreamingEndpoint endpoint;
    protected BasicClient client;
    protected AtomicBoolean running = new AtomicBoolean(false);
    protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this);
    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
    private DatumStatusCounter countersTotal = new DatumStatusCounter();

    public static void main(String[] args) {
        Preconditions.checkArgument((args.length >= 2 ? 1 : 0) != 0);
        String configfile = args[0];
        String outfile = args[1];
        Config reference = ConfigFactory.load();
        File conf_file = new File(configfile);
        assert (conf_file.exists());
        Config testResourceConfig = ConfigFactory.parseFileAnySyntax((File)conf_file, (ConfigParseOptions)ConfigParseOptions.defaults().setAllowMissing(false));
        Config typesafe = testResourceConfig.withFallback((ConfigMergeable)reference).resolve();
        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration((Config)typesafe);
        TwitterStreamConfiguration config = (TwitterStreamConfiguration)new ComponentConfigurator(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
        TwitterStreamProvider provider = new TwitterStreamProvider(config);
        StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance((List)Lists.newArrayList((Object[])new String[]{"EEE MMM dd HH:mm:ss Z yyyy"}));
        PrintStream outStream = null;
        try {
            outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
        }
        catch (FileNotFoundException e) {
            LOGGER.error("FileNotFoundException", (Throwable)e);
            return;
        }
        provider.prepare(config);
        provider.startStream();
        do {
            Uninterruptibles.sleepUninterruptibly((long)streamsConfiguration.getBatchFrequencyMs(), (TimeUnit)TimeUnit.MILLISECONDS);
            for (StreamsDatum datum : provider.readCurrent()) {
                try {
                    String json = mapper.writeValueAsString(datum.getDocument());
                    outStream.println(json);
                }
                catch (JsonProcessingException e) {
                    System.err.println(e.getMessage());
                }
            }
        } while (provider.isRunning());
        provider.cleanUp();
        outStream.flush();
    }

    public TwitterStreamConfiguration getConfig() {
        return this.config;
    }

    public void setConfig(TwitterStreamConfiguration config) {
        this.config = config;
    }

    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public TwitterStreamProvider() {
        this.config = (TwitterStreamConfiguration)new ComponentConfigurator(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter");
    }

    public TwitterStreamProvider(TwitterStreamConfiguration config) {
        this.config = config;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void startStream() {
        this.client.connect();
        this.running.set(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized StreamsResultSet readCurrent() {
        StreamsResultSet current;
        TwitterStreamProvider twitterStreamProvider = this;
        synchronized (twitterStreamProvider) {
            LinkedBlockingDeque drain = Queues.newLinkedBlockingDeque();
            this.drainTo(drain);
            current = new StreamsResultSet((Queue)drain);
            current.setCounter(new DatumStatusCounter());
            current.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
        }
        return current;
    }

    public StreamsResultSet readNew(BigInteger sequence) {
        throw new NotImplementedException();
    }

    public StreamsResultSet readRange(DateTime start, DateTime end) {
        throw new NotImplementedException();
    }

    public boolean isRunning() {
        return this.running.get() && !this.client.isDone();
    }

    public void prepare(Object o) {
        Preconditions.checkNotNull((Object)this.config.getEndpoint());
        if (this.config.getEndpoint().equals("userstream")) {
            this.hosebirdHosts = new HttpHosts("https://userstream.twitter.com");
            UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint();
            userstreamEndpoint.withFollowings(true);
            userstreamEndpoint.withUser(false);
            userstreamEndpoint.allReplies(false);
            this.endpoint = userstreamEndpoint;
        } else if (this.config.getEndpoint().equals("sample")) {
            boolean follow;
            this.hosebirdHosts = new HttpHosts("https://stream.twitter.com");
            boolean track = this.config.getTrack() != null && !this.config.getTrack().isEmpty();
            boolean bl = follow = this.config.getFollow() != null && !this.config.getFollow().isEmpty();
            if (track || follow) {
                LOGGER.debug("***\tPRESENT\t***");
                StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint();
                if (track) {
                    statusesFilterEndpoint.trackTerms(this.config.getTrack());
                }
                if (follow) {
                    statusesFilterEndpoint.followings(this.config.getFollow());
                }
                this.endpoint = statusesFilterEndpoint;
            } else {
                this.endpoint = new StatusesSampleEndpoint();
            }
        } else if (this.config.getEndpoint().endsWith("firehose")) {
            this.hosebirdHosts = new HttpHosts("https://stream.twitter.com");
            this.endpoint = new StatusesFirehoseEndpoint();
        } else {
            LOGGER.error("NO ENDPOINT RESOLVED");
            return;
        }
        if (this.config.getBasicauth() != null) {
            Preconditions.checkNotNull((Object)this.config.getBasicauth().getUsername());
            Preconditions.checkNotNull((Object)this.config.getBasicauth().getPassword());
            this.auth = new BasicAuth(this.config.getBasicauth().getUsername(), this.config.getBasicauth().getPassword());
        } else if (this.config.getOauth() != null) {
            Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerKey());
            Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerSecret());
            Preconditions.checkNotNull((Object)this.config.getOauth().getAccessToken());
            Preconditions.checkNotNull((Object)this.config.getOauth().getAccessTokenSecret());
            this.auth = new OAuth1(this.config.getOauth().getConsumerKey(), this.config.getOauth().getConsumerSecret(), this.config.getOauth().getAccessToken(), this.config.getOauth().getAccessTokenSecret());
        } else {
            LOGGER.error("NO AUTH RESOLVED");
            return;
        }
        LOGGER.debug("host={}\tendpoint={}\taut={}", new Object[]{this.hosebirdHosts, this.endpoint, this.auth});
        this.providerQueue = new LinkedBlockingQueue<Future<List<StreamsDatum>>>(1000);
        this.client = new ClientBuilder().name("apache/streams/streams-contrib/streams-provider-twitter").hosts(this.hosebirdHosts).endpoint(this.endpoint).authentication(this.auth).connectionTimeout(1200000).processor((HosebirdMessageProcessor)this.processor).build();
    }

    public void cleanUp() {
        this.client.stop();
        this.processor.cleanUp();
        this.running.set(false);
    }

    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }

    protected boolean addDatum(Future<List<StreamsDatum>> future) {
        try {
            ComponentUtils.offerUntilSuccess(future, this.providerQueue);
            this.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
            return true;
        }
        catch (Exception e) {
            this.countersCurrent.incrementStatus(DatumStatus.FAIL);
            LOGGER.warn("Unable to enqueue item from Twitter stream");
            return false;
        }
    }

    protected void drainTo(Queue<StreamsDatum> drain) {
        int count = 0;
        while (!this.providerQueue.isEmpty() && count <= 1000) {
            for (StreamsDatum datum : this.pollForDatum()) {
                ComponentUtils.offerUntilSuccess((Object)datum, drain);
                ++count;
            }
        }
    }

    protected List<StreamsDatum> pollForDatum() {
        try {
            return this.providerQueue.poll().get();
        }
        catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for future.  Initiate shutdown.");
            this.cleanUp();
            Thread.currentThread().interrupt();
            return new ArrayList<StreamsDatum>();
        }
        catch (ExecutionException e) {
            LOGGER.warn("Error getting tweet from future");
            return new ArrayList<StreamsDatum>();
        }
    }
}

