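Stop a Twitter stream and return a list of statuses with twitter4j
Using the code example provided by Twitter4j, I want to stop the stream after collecting a list of 1000 statuses, and return that list. How can I do this?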
public class Stream {
public List<Status> execute() throws TwitterException {
final List<Status> statuses = new ArrayList<Status>();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
if (statuses.size() >= 1000) {
//return statuses. Obviously that's not the correct place for a return statement...
}
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
}
}
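Answer:
Forcing asynchronous code into a synchronous mode is not a good idea. See https://stackoverflow.com/a/5934656/276263 and check whether you can rework your logic.
However, the code below does what you asked. Note that it stops once more than 100 statuses have arrived; raise the threshold in onStatus to 1000 for your case.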
public class Stream {
public static void main(String[] args) throws TwitterException {
Stream stream = new Stream();
stream.execute();
}
private final Object lock = new Object();
public List<Status> execute() throws TwitterException {
final List<Status> statuses = new ArrayList<Status>();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
.getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
System.out.println(statuses.size() + ":" + status.getText());
if (statuses.size() > 100) {
synchronized (lock) {
lock.notify();
}
System.out.println("unlocked");
}
}
public void onDeletionNotice(
StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:"
+ numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId
+ " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onStallWarning(StallWarning sw) {
System.out.println(sw.getMessage());
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = { "federer", "nadal", "#Salute" };
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can still see the interruption
Thread.currentThread().interrupt();
}
System.out.println("returning statuses");
twitterStream.shutdown();
return statuses;
}
}
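The wait()/notify() handshake above can miss a notification that fires before the main thread starts waiting, and wait() may also wake spuriously. A minimal sketch of the same idea with java.util.concurrent.CountDownLatch follows. It makes no twitter4j calls: startFakeStream is a hypothetical stand-in for TwitterStream and String stands in for Status, so the example runs on its own. The callback also ignores anything that arrives after the limit is reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LatchCollector {

    /** Hypothetical stand-in for TwitterStream: calls onStatus from a background thread. */
    static Thread startFakeStream(Consumer<String> onStatus) {
        Thread t = new Thread(() -> {
            int i = 0;
            while (!Thread.currentThread().isInterrupted()) {
                onStatus.accept("status " + i++);
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Blocks until `limit` statuses have arrived or the timeout elapses, then returns a copy. */
    static List<String> collect(int limit, long timeoutMillis) {
        List<String> statuses = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread stream = startFakeStream(status -> {
            synchronized (statuses) {
                if (statuses.size() >= limit) {
                    return; // ignore callbacks that arrive after we are done
                }
                statuses.add(status);
                if (statuses.size() == limit) {
                    done.countDown(); // release the waiting thread exactly once
                }
            }
        });

        try {
            // await() returns immediately if countDown() already happened,
            // so the signal cannot be lost.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stream.interrupt(); // with twitter4j this would be twitterStream.shutdown()
        }
        synchronized (statuses) {
            return new ArrayList<>(statuses);
        }
    }

    public static void main(String[] args) {
        List<String> result = collect(1000, 5_000);
        System.out.println("collected " + result.size() + " statuses");
    }
}
```

In the real listener, the body of the lambda would move into onStatus, and the timeout guards against a filter query that never produces enough tweets.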
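Answer:
There are two ways:
1. If you only need to stop the thread that consumes the stream (the thread that calls the listener), you can call twitterStream.cleanUp(); this stops that thread gracefully. You may want a boolean stopped variable in your status listener so that you ignore any calls that still arrive afterwards.
2. You can also call twitterStream.shutdown(); which shuts down the stream-consuming thread along with its dispatcher thread (a daemon thread). However, this is a more brutal way of ending communication with Twitter's API, although it does work if you call it from inside the listener.
Answer:
I would consider using a BlockingQueue as an intermediary, with the listener adding Status objects to it, rather than the approach suggested by @krishnakumarp.
Once the stream has started, you can take Statuses from the queue until you have the thousand you need.
As a starting point, it would look something like this: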
public class Stream {
private static final int TOTAL_TWEETS = 1000;
// poll() below can throw InterruptedException, so it is declared here
public List<Status> execute() throws TwitterException, InterruptedException {
// skipped for brevity...
// TODO: You may have to tweak the capacity of the queue, depends on the filter query
final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000);
final StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.offer(status); // Add received status to the queue
}
// etc...
};
final FilterQuery fq = new FilterQuery();
final String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
// Collect the 1000 statuses
final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
while (collected.size() < TOTAL_TWEETS) {
// TODO: Handle InterruptedException
final Status status = statuses.poll(10, TimeUnit.SECONDS);
if (status == null) {
// TODO: Hitting this timeout repeatedly could indicate that no further Tweets are arriving
continue;
}
collected.add(status);
}
twitterStream.shutdown();
return collected;
}
}
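The polling loop above retries forever if the stream goes quiet. One way to bound it is an overall deadline, sketched below as a small generic helper. QueueDrain and take are hypothetical names, not part of twitter4j, and the producer thread in main only simulates a listener that offers items to the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueDrain {

    /**
     * Takes up to `wanted` items from the queue, giving up once
     * `maxWaitMillis` has elapsed in total. Returns whatever was collected.
     */
    static <T> List<T> take(BlockingQueue<T> queue, int wanted, long maxWaitMillis) {
        List<T> collected = new ArrayList<>(wanted);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        try {
            while (collected.size() < wanted) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // overall deadline reached
                }
                T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (item == null) {
                    break; // nothing arrived before the deadline
                }
                collected.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return collected;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        // Simulated listener thread: in the real code, onStatus would call queue.offer(status)
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1500; i++) {
                queue.offer("status " + i);
            }
        });
        producer.start();
        List<String> got = take(queue, 1000, 2_000);
        System.out.println("collected " + got.size());
    }
}
```

With this helper, the caller can compare the size of the returned list against the requested count to tell a full batch from a timeout before shutting the stream down.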