Opening new connection after Connection Draining. Google Cloud Messaging
I am somewhat new to Google Cloud Messaging. We have been working with it for a couple of months but just recently we have been getting "Connection Draining" messages. When this happens all communication stops.
Google says: https://developer.android.com/google/gcm/ccs.html#response
When you receive a CONNECTION_DRAINING message, you should immediately begin sending messages to another CCS connection, opening a new connection if necessary. You should, however, keep the original connection open and continue receiving messages that may come over the connection (and ACKing them)—CCS will handle initiating a connection close when it is ready.
My question is:
I am surprised this isn't already handled in their example code, since it seems like something pretty much everyone needs. Is it already done for me in the code and I am missing it?
I don't have a main method in my code; I use servlets as triggers instead. My connection is initialized like this:
@PostConstruct
public void init() throws Exception {
    try {
        smackCcsClient.connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
    } catch (IOException e) {
        e.printStackTrace();
    } catch (SmackException e) {
        e.printStackTrace();
    } catch (XMPPException e) {
        e.printStackTrace();
    }
}
However, after this I never touch the connection again. Am I handling this wrong? Is the connection something I should be touching more frequently, or something I need to keep track of?
_______________________ADDED AFTER THE QUESTION_________________________
I added a connection inside of their example code to try to reinitialize a connection. It looks like this:
if ("CONNECTION_DRAINING".equals(controlType)) {
    connectionDraining = true;
    // Open a new connection because the old connection will be closing or is already closed.
    try {
        connect(Long.parseLong(env.getProperty("gcm.api")), env.getProperty("gcm.key"));
    } catch (XMPPException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (SmackException e) {
        e.printStackTrace();
    }
} else {
    logger.log(Level.INFO, "Unrecognized control type: %s. This could happen if new features are " + "added to the CCS protocol.",
            controlType);
}
I have written code that handles this case (basically by diverting new downstream messages to a new connection). It has not been thoroughly tested.
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.net.ssl.SSLSocketFactory;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.PacketInterceptor;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.DefaultPacketExtension;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.provider.PacketExtensionProvider;
import org.jivesoftware.smack.provider.ProviderManager;
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
import org.jivesoftware.smack.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;
import com.fasterxml.jackson.core.type.TypeReference;
/**
 * Based on https://developer.android.com/google/gcm/ccs.html#smack
 *
 * @author Abhinav.Dwivedi
 *
 */
public class SmackCcsClient implements CcsClient {
    private static final Logger logger = LoggerFactory.getLogger(SmackCcsClient.class);
    private static final String GCM_SERVER = "gcm.googleapis.com";
    private static final int GCM_PORT = 5235;
    private static final String GCM_ELEMENT_NAME = "gcm";
    private static final String GCM_NAMESPACE = "google:mobile:data";
    private static volatile SmackCcsClient instance;

    static {
        ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE, new PacketExtensionProvider() {
            @Override
            public PacketExtension parseExtension(XmlPullParser parser) throws Exception {
                String json = parser.nextText();
                return new GcmPacketExtension(json);
            }
        });
    }

    // Open connections, newest first. Only the head is used for sending.
    private final Deque<Channel> channels;

    public static SmackCcsClient instance() {
        if (instance == null) {
            synchronized (SmackCcsClient.class) {
                if (instance == null) {
                    instance = new SmackCcsClient();
                }
            }
        }
        return instance;
    }

    private SmackCcsClient() {
        channels = new ConcurrentLinkedDeque<Channel>();
        channels.addFirst(connect());
    }

    private class Channel {
        private XMPPConnection connection;
        /**
         * Indicates whether the connection is in draining state, which means that it will not accept any new downstream
         * messages.
         */
        private volatile boolean connectionDraining = false;

        /**
         * Sends a packet with contents provided.
         */
        private void send(String jsonRequest) throws NotConnectedException {
            Packet request = new GcmPacketExtension(jsonRequest).toPacket();
            connection.sendPacket(request);
        }

        private void handleControlMessage(Map<String, Object> jsonObject) {
            logger.debug("handleControlMessage(): {}", jsonObject);
            String controlType = (String) jsonObject.get("control_type");
            if ("CONNECTION_DRAINING".equals(controlType)) {
                connectionDraining = true;
            } else {
                logger.info("Unrecognized control type: {}. This could happen if new features are "
                        + "added to the CCS protocol.", controlType);
            }
        }
    }

    /**
     * Sends a downstream message to GCM.
     *
     */
    @Override
    public void sendDownstreamMessage(String message) throws Exception {
        Channel channel = channels.peekFirst();
        if (channel.connectionDraining) {
            synchronized (channels) {
                // Re-check under the lock so only one thread opens the replacement connection.
                channel = channels.peekFirst();
                if (channel.connectionDraining) {
                    channels.addFirst(connect());
                    channel = channels.peekFirst();
                }
            }
        }
        channel.send(message);
        logger.debug("Message Sent via CCS: ({})", message);
    }

    /**
     * Handles an upstream data message from a device application.
     *
     */
    protected void handleUpstreamMessage(Map<String, Object> jsonObject) {
        // PackageName of the application that sent this message.
        String category = (String) jsonObject.get("category");
        String from = (String) jsonObject.get("from");
        @SuppressWarnings("unchecked")
        Map<String, String> payload = (Map<String, String>) jsonObject.get("data");
        logger.info("Message received from device: category ({}), from ({}), payload: ({})", category, from,
                JsonUtil.toJson(payload));
    }

    /**
     * Handles an ACK.
     *
     * <p>
     * Logs a DEBUG message, but subclasses could override it to properly handle ACKs.
     */
    public void handleAckReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleAckReceipt() from: {}, messageId: {}", from, messageId);
    }

    /**
     * Handles a NACK.
     *
     * <p>
     * Logs a DEBUG message, but subclasses could override it to properly handle NACKs.
     */
    protected void handleNackReceipt(Map<String, Object> jsonObject) {
        String messageId = (String) jsonObject.get("message_id");
        String from = (String) jsonObject.get("from");
        logger.debug("handleNackReceipt() from: {}, messageId: {}", from, messageId);
    }

    /**
     * Creates a JSON encoded ACK message for an upstream message received from an application.
     *
     * @param to
     *            RegistrationId of the device who sent the upstream message.
     * @param messageId
     *            messageId of the upstream message to be acknowledged to CCS.
     * @return JSON encoded ack.
     */
    protected static String createJsonAck(String to, String messageId) {
        Map<String, Object> message = new HashMap<String, Object>();
        message.put("message_type", "ack");
        message.put("to", to);
        message.put("message_id", messageId);
        return JsonUtil.toJson(message);
    }

    /**
     * Connects to GCM Cloud Connection Server using the supplied credentials.
     *
     * @return the connected and authenticated channel
     */
    @Override
    public Channel connect() {
        try {
            // final so the anonymous PacketListener below can capture it
            final Channel channel = new Channel();
            ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
            config.setSecurityMode(SecurityMode.enabled);
            config.setReconnectionAllowed(true);
            config.setRosterLoadedAtLogin(false);
            config.setSendPresence(false);
            config.setSocketFactory(SSLSocketFactory.getDefault());
            channel.connection = new XMPPTCPConnection(config);
            channel.connection.connect();
            channel.connection.addConnectionListener(new LoggingConnectionListener());
            // Handle incoming packets
            channel.connection.addPacketListener(new PacketListener() {
                @Override
                public void processPacket(Packet packet) {
                    logger.debug("Received: ({})", packet.toXML());
                    Message incomingMessage = (Message) packet;
                    GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
                    String json = gcmPacket.getJson();
                    try {
                        Map<String, Object> jsonObject = JacksonUtil.DEFAULT.mapper().readValue(json,
                                new TypeReference<Map<String, Object>>() {});
                        // present for ack, nack and control, null otherwise
                        Object messageType = jsonObject.get("message_type");
                        if (messageType == null) {
                            // Normal upstream data message
                            handleUpstreamMessage(jsonObject);
                            // Send ACK to CCS on the same channel the message arrived on
                            String messageId = (String) jsonObject.get("message_id");
                            String from = (String) jsonObject.get("from");
                            String ack = createJsonAck(from, messageId);
                            channel.send(ack);
                        } else if ("ack".equals(messageType.toString())) {
                            // Process Ack
                            handleAckReceipt(jsonObject);
                        } else if ("nack".equals(messageType.toString())) {
                            // Process Nack
                            handleNackReceipt(jsonObject);
                        } else if ("control".equals(messageType.toString())) {
                            // Process control message
                            channel.handleControlMessage(jsonObject);
                        } else {
                            logger.error("Unrecognized message type ({})", messageType.toString());
                        }
                    } catch (Exception e) {
                        logger.error("Failed to process packet ({})", packet.toXML(), e);
                    }
                }
            }, new PacketTypeFilter(Message.class));
            // Log all outgoing packets
            channel.connection.addPacketInterceptor(new PacketInterceptor() {
                @Override
                public void interceptPacket(Packet packet) {
                    logger.debug("Sent: {}", packet.toXML());
                }
            }, new PacketTypeFilter(Message.class));
            channel.connection.login(ExternalConfig.gcmSenderId() + "@gcm.googleapis.com", ExternalConfig.gcmApiKey());
            return channel;
        } catch (Exception e) {
            logger.error(Logging.FATAL, "Error in creating channel for GCM communication", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * XMPP Packet Extension for GCM Cloud Connection Server.
     */
    private static final class GcmPacketExtension extends DefaultPacketExtension {
        private final String json;

        public GcmPacketExtension(String json) {
            super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
            this.json = json;
        }

        public String getJson() {
            return json;
        }

        @Override
        public String toXML() {
            return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
                    StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
        }

        public Packet toPacket() {
            Message message = new Message();
            message.addExtension(this);
            return message;
        }
    }

    private static final class LoggingConnectionListener implements ConnectionListener {
        @Override
        public void connected(XMPPConnection xmppConnection) {
            logger.info("Connected.");
        }

        @Override
        public void authenticated(XMPPConnection xmppConnection) {
            logger.info("Authenticated.");
        }

        @Override
        public void reconnectionSuccessful() {
            logger.info("Reconnection successful.");
        }

        @Override
        public void reconnectionFailed(Exception e) {
            logger.error("Reconnection failed.. ", e);
        }

        @Override
        public void reconnectingIn(int seconds) {
            logger.info("Reconnecting in {} secs", seconds);
        }

        @Override
        public void connectionClosedOnError(Exception e) {
            // Pass the exception so the cause of the close shows up in the log
            logger.info("Connection closed on error.", e);
        }

        @Override
        public void connectionClosed() {
            logger.info("Connection closed.");
        }
    }
}
I'm also new to GCM and facing the same problem. I solved it by creating a new SmackCcsClient() on the CONNECTION_DRAINING message. The older connection should still exist and keep receiving messages, but it no longer sends because:
protected volatile boolean connectionDraining = true;
Google says that the connection will be closed by CCS:
CCS will handle initiating a connection close when it is ready.
Until the connection is closed by CCS you will be able to receive messages on both connections, but you can send messages only on the new one. When the old connection is closed it should be destroyed. I'm not sure whether the garbage collector reclaims it or not; I'm still trying to solve this issue.
PS: I'm not 100% sure about this answer, but maybe it will open up more room for discussion.
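One way to handle the cleanup described above is to keep every open connection in a small pool and drop the reference once CCS closes it, so nothing holds on to the old connection. The following is only a minimal sketch under that assumption, not code from Google's sample: CcsChannel, ChannelPool and the connector factory are hypothetical names, and a real channel would wrap a Smack XMPPConnection and call onClosed from its ConnectionListener.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Supplier;

/** Hypothetical stand-in for one CCS connection; a real one would wrap an XMPPConnection. */
class CcsChannel {
    volatile boolean draining = false; // set when CONNECTION_DRAINING arrives on this channel
    volatile boolean closed = false;   // set when the connection has been closed
}

/** Sends go to the newest usable channel; closed channels are forgotten. */
class ChannelPool {
    private final Deque<CcsChannel> channels = new ConcurrentLinkedDeque<>();
    private final Supplier<CcsChannel> connector; // hypothetical factory that opens a new connection

    ChannelPool(Supplier<CcsChannel> connector) {
        this.connector = connector;
        channels.addFirst(connector.get());
    }

    /** Returns the channel to use for the next downstream message, opening one if needed. */
    synchronized CcsChannel channelForSending() {
        CcsChannel head = channels.peekFirst();
        if (head == null || head.draining || head.closed) {
            head = connector.get();
            channels.addFirst(head);
        }
        return head;
    }

    /** Call from the control-message handler of the channel that received CONNECTION_DRAINING. */
    void onConnectionDraining(CcsChannel channel) {
        // Keep the channel open: it may still deliver upstream messages and ACKs.
        channel.draining = true;
    }

    /** Call when the connection is closed, for example from connectionClosed(). */
    synchronized void onClosed(CcsChannel channel) {
        channel.closed = true;
        channels.remove(channel); // drop the reference so the channel can be garbage collected
    }

    int openChannels() {
        return channels.size();
    }
}
```

With this layout the draining connection stays in the pool, and keeps receiving, until the close callback fires. After that the pool no longer references it, so it becomes eligible for garbage collection as long as no listener or other object still holds it.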
I just pushed the code for the FCM Connection Draining to my FCM XMPP Server example.
Project: XMPP Connection Server for FCM using the latest version of the Smack library (4.2.2) + Connection Draining Implementation.
GitHub link: https://github.com/carlosCharz/fcmxmppserverv2
Youtube link: https://youtu.be/KVKEj6PeLTc
If you run into problems, check the troubleshooting section. I hope you find it useful. Greetings!