Stop a Twitter stream and return the list of statuses with twitter4j

Using the code example provided by Twitter4j, I want to stop the stream once 1000 statuses have been collected and return that list. How can I do this?

public class Stream {

    public List<Status> execute() throws TwitterException {
        List<Status> statuses = new ArrayList<>();

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        cb.setOAuthConsumerKey("bbb");
        cb.setOAuthConsumerSecret("bbb");
        cb.setOAuthAccessToken("bbb");
        cb.setOAuthAccessTokenSecret("bbb");

        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                statuses.add(status);
                if (statuses.size() > 1000) {
                    // return statuses. Obviously that's not the correct place for a return statement...
                }
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
            }

            public void onScrubGeo(long userId, long upToStatusId) {
                System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
            }

            public void onException(Exception ex) {
                ex.printStackTrace();
            }
        };

        FilterQuery fq = new FilterQuery();
        String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);
    }
}

Answer:

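Forcing asynchronous code into a synchronous mode is not a good idea. See https://stackoverflow.com/a/5934656/276263 and check whether you can rework your logic.

That said, the code below does what you asked.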

import java.util.ArrayList;
import java.util.List;

import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;

public class Stream {

    public static void main(String[] args) throws TwitterException {
        Stream stream = new Stream();
        stream.execute();
    }

    private final Object lock = new Object();

    public List<Status> execute() throws TwitterException {
        final List<Status> statuses = new ArrayList<>();

        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        cb.setOAuthConsumerKey("bbb");
        cb.setOAuthConsumerSecret("bbb");
        cb.setOAuthAccessToken("bbb");
        cb.setOAuthAccessTokenSecret("bbb");

        TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
                .getInstance();

        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                // Called on the stream's thread, so the shared list is guarded by the lock.
                synchronized (lock) {
                    statuses.add(status);
                    System.out.println(statuses.size() + ":" + status.getText());
                    // Threshold used in this answer; raise it to 1000 to match the question.
                    if (statuses.size() > 100) {
                        lock.notify();
                        System.out.println("unlocked");
                    }
                }
            }

            public void onDeletionNotice(
                    StatusDeletionNotice statusDeletionNotice) {
                System.out.println("Got a status deletion notice id:"
                        + statusDeletionNotice.getStatusId());
            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                System.out.println("Got track limitation notice:"
                        + numberOfLimitedStatuses);
            }

            public void onScrubGeo(long userId, long upToStatusId) {
                System.out.println("Got scrub_geo event userId:" + userId
                        + " upToStatusId:" + upToStatusId);
            }

            public void onException(Exception ex) {
                ex.printStackTrace();
            }

            @Override
            public void onStallWarning(StallWarning sw) {
                System.out.println(sw.getMessage());
            }
        };

        FilterQuery fq = new FilterQuery();
        String keywords[] = { "federer", "nadal", "#Salute" };
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);

        List<Status> result;
        synchronized (lock) {
            try {
                // Re-check the condition in a loop to guard against spurious wakeups.
                while (statuses.size() <= 100) {
                    lock.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            // Copy under the lock so later callbacks cannot modify the returned list.
            result = new ArrayList<>(statuses);
        }

        System.out.println("returning statuses");
        twitterStream.shutdown();
        return result;
    }
}
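
For comparison, here is a minimal sketch of the same "block until N callbacks have arrived" idea using `java.util.concurrent.CountDownLatch` instead of `wait`/`notify`. This is a swapped-in technique, not what the answer above uses. It does not touch twitter4j at all: `LatchCollector`, `onItem` and `collectFromFakeStream` are hypothetical names, and a plain thread stands in for the Twitter stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the listener side of the stream (not a twitter4j type).
class LatchCollector {

    private final int target;
    private final List<String> items = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);

    LatchCollector(int target) {
        this.target = target;
    }

    // Callback invoked on the producer thread, in the role of onStatus().
    void onItem(String item) {
        synchronized (items) {
            if (items.size() >= target) {
                return; // ignore anything that arrives after the target is reached
            }
            items.add(item);
            if (items.size() == target) {
                done.countDown(); // release the waiting caller exactly once
            }
        }
    }

    // Blocks until the target is reached or the timeout elapses.
    boolean awaitTarget(long timeoutMillis) throws InterruptedException {
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Returns a copy so the caller never sees concurrent modification.
    List<String> snapshot() {
        synchronized (items) {
            return new ArrayList<>(items);
        }
    }

    // Hypothetical driver: a plain thread plays the part of the Twitter stream.
    static List<String> collectFromFakeStream(int target, int produced, long timeoutMillis) {
        LatchCollector collector = new LatchCollector(target);
        Thread fakeStream = new Thread(() -> {
            for (int i = 0; i < produced; i++) {
                collector.onItem("status " + i);
            }
        });
        fakeStream.start();
        try {
            collector.awaitTarget(timeoutMillis);
            fakeStream.join(); // stand-in for shutting the stream down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return collector.snapshot();
    }

    public static void main(String[] args) {
        List<String> result = collectFromFakeStream(1000, 5000, 10_000);
        System.out.println("collected " + result.size());
    }
}
```

A latch cannot miss a signal that fires before the caller starts waiting, and the timeout keeps the caller from blocking forever if the stream goes quiet.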

Answer:

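There are two ways:

If you only need to finish consuming the stream (that is, stop the thread that calls your listener), you can call twitterStream.cleanUp();. This stops the thread gracefully. You may want a boolean stopped variable in your status listener so that you can ignore any calls you receive after this point.

You can also shut down the stream-consuming thread together with its dispatcher thread (which is a daemon thread) by calling twitterStream.shutdown();. However, this is a more brutal way of ending communication with Twitter's API. Although it works if you call it from inside the listener, I would consider using a BlockingQueue as an intermediary, with the listener adding Status objects to it, rather than the approach suggested by @krishnakumarp.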
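
The stopped flag mentioned in the first option could look roughly like the sketch below. It is only an illustration under stated assumptions: `StoppableListener` is a hypothetical class, not a twitter4j type, it takes plain strings instead of `Status` objects, and the `stopAction` passed in is where a real listener might call `twitterStream.cleanUp()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener that only mirrors the shape of StatusListener.onStatus,
// so the stop flag can be shown without a twitter4j dependency.
class StoppableListener {

    private final int limit;
    private final Runnable stopAction; // assumption: e.g. () -> twitterStream.cleanUp()
    private final List<String> collected = new ArrayList<>();
    private boolean stopped = false;

    StoppableListener(int limit, Runnable stopAction) {
        this.limit = limit;
        this.stopAction = stopAction;
    }

    // Called by the stream thread for every incoming status.
    synchronized void onStatus(String status) {
        if (stopped) {
            return; // ignore anything delivered after the stop was requested
        }
        collected.add(status);
        if (collected.size() >= limit) {
            stopped = true;   // set the flag first so the stop action runs only once
            stopAction.run();
        }
    }

    synchronized boolean isStopped() {
        return stopped;
    }

    // Returns a copy of what was collected before the stop.
    synchronized List<String> snapshot() {
        return new ArrayList<>(collected);
    }

    public static void main(String[] args) {
        StoppableListener listener =
                new StoppableListener(3, () -> System.out.println("stop requested"));
        for (int i = 0; i < 10; i++) {
            listener.onStatus("status " + i); // calls after the third one are dropped
        }
        System.out.println("kept " + listener.snapshot().size());
    }
}
```

Because the flag is checked and set inside the same synchronized method, statuses that are still in flight when the stop is requested cannot push the list past the limit.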

Answer:

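Once the stream has started, you can begin taking Status objects off the queue until you have the thousand you need.

As a starting point, it would look something like this: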

public class Stream {

    private static final int TOTAL_TWEETS = 1000;

    public List<Status> execute() throws TwitterException, InterruptedException {
        // skipped for brevity...

        // TODO: You may have to tweak the capacity of the queue, depending on the filter query
        final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000);

        final StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                statuses.offer(status); // Add received status to the queue
            }

            // etc...
        };

        final FilterQuery fq = new FilterQuery();
        final String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);

        // Collect the 1000 statuses
        final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
        while (collected.size() < TOTAL_TWEETS) {
            // poll() can throw InterruptedException; here it propagates to the caller
            final Status status = statuses.poll(10, TimeUnit.SECONDS);
            if (status == null) {
                // TODO: Hitting this too often could indicate that no further Tweets are coming
                continue;
            }
            collected.add(status);
        }

        twitterStream.shutdown();
        return collected;
    }
}
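
The two TODOs in the collection loop (interruption, and repeated empty polls) could be handled along the lines of the sketch below. It is a stand-alone illustration, not part of the answer's code: `QueueCollector`, `collect` and the `maxEmptyPolls` parameter are hypothetical, and a plain thread filling the queue with strings stands in for the twitter4j listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueCollector {

    // Takes up to `total` items from the queue. Gives up early after
    // `maxEmptyPolls` consecutive timeouts or if the thread is interrupted.
    static <T> List<T> collect(BlockingQueue<T> queue, int total,
                               long pollMillis, int maxEmptyPolls) {
        List<T> collected = new ArrayList<>(total);
        int emptyPolls = 0;
        while (collected.size() < total && emptyPolls < maxEmptyPolls) {
            T item;
            try {
                item = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                break;
            }
            if (item == null) {
                emptyPolls++;   // nothing arrived within the timeout
            } else {
                emptyPolls = 0; // the stream is alive again, reset the counter
                collected.add(item);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        // Hypothetical producer playing the role of the listener's onStatus().
        Thread fakeStream = new Thread(() -> {
            for (int i = 0; i < 1500; i++) {
                queue.offer("status " + i);
            }
        });
        fakeStream.start();
        List<String> result = collect(queue, 1000, 200, 3);
        System.out.println("collected " + result.size());
    }
}
```

Returning a partial list when the stream dries up is a design choice; a caller that needs exactly 1000 items could check the size and treat a short list as an error.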
