Rich Lee
2021/8
rich.lee@cathayholdings.com
@rich04230
RICH0423
Publish–subscribe is a async messaging pattern where senders of messages
one to one
one to many
AWS SQS & SNS
AWS Kinesis
Google Cloud Pub/Sub
Azure Service Bus
ActiveMQ
Redis Pub/Sub
Kafka
Cloud Pub/Sub is a fully-managed real-time messaging service that allows you to send and receive messages independent applications.
Cloud Pub/Sub Core Concepts
publisher
subscriber
ACK
Publisher-subscriber relationships
Cloud Pub/Sub Topic:
gcloud pubsub topics create topic1
gcloud pubsub topics list
gcloud pubsub subscriptions create sub1 \
--topic topic1
Cloud Pub/Sub Subscription:
gcloud pubsub topics list-subscriptions topic1
gcloud pubsub topics publish topic1 \
--message "Hello pub/sub"
gcloud pubsub subscriptions pull sub1 --auto-ack
Install the client lib
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>20.9.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
</dependencies>
Google PubSub Client Lib - Publishing messages
public class PublishService {
private static final String PROJECT_ID = "sample1";
private static final String TOPIC_NAME = "topic1";
public void publishFeedback(Feedback feedback) throws Exception {
ObjectMapper mapper = new ObjectMapper();
String feedbackMessage = mapper.writeValueAsString(feedback);
TopicName topicName = TopicName.create(PROJECT_ID, TOPIC_NAME);
Publisher publisher = null;
ApiFuture<String> messageIdFuture = null;
try {
publisher = Publisher.defaultBuilder(topicName).build();
ByteString data = ByteString.copyFromUtf8(feedbackMessage);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
messageIdFuture = publisher.publish(pubsubMessage);
} finally {
String messageId = messageIdFuture.get();
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
}
}
}
}
Google PubSub Client Lib- Receiving messages
public static void subscribeAsyncExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
Publishing messages
public void publishMessage() {
this.pubSubTemplate.publish("topic", "your message payload",
ImmutableMap.of("key1", "val1"));
}
Receiving messages
Subscriber subscriber = this.pubSubTemplate.subscribe(subscriptionName, (message) -> {
log.info("Message received from {} subscription: {}",
subscriptionName, message.getPubsubMessage().getData().toStringUtf8());
message.ack();
});
Collection<AcknowledgeablePubsubMessage> messages = this.pubSubTemplate.pull(
subscriptionName, 10, true);
ListenableFuture<Void> ackFuture = this.pubSubTemplate.ack(messages);
ackFuture.get();
messages.stream().
map(AcknowledgeablePubsubMessage::getPubsubMessage).
forEach(m -> log.info("MessageId: {}, data: {}", m.getMessageId(),
m.getData().toStringUtf8()));