Vertx EventBus
local eventbus
依赖 implementation ‘io.vertx:vertx-core:4.2.7’ 即可
package io.github.viakiba.hinx.eventbus.vertx;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
public class VertxEventBus {
// https://vertx.io/docs/vertx-core/java/#_the_event_bus_api
public static void main(String[] args) {
EventBus eventBus = Vertx.vertx().eventBus();
eventBus.localConsumer("test", msg -> {
System.out.println("receive msg: " + msg.body());
});
eventBus.localConsumer("test", msg -> {
System.out.println("receive msg1: " + msg.body());
});
// 1 对多 发布消息
eventBus.publish("test", "hello world1");
// 一对一 发布消息
eventBus.send("test", "hello world");
}
}
remote eventbus
依赖
远端 eventbus 依赖 vertx 的集群实现,接下来的用力以 zookeeper 为集群注册中心为例。增加下述依赖:
implementation 'io.vertx:vertx-zookeeper:4.2.7' exclude group: 'org.apache.zookeeper', module: 'zookeeper'
implementation 'org.apache.zookeeper:zookeeper:3.5.8'
并增加 logback 支持,并使用默认配置。
implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.11'
zookeeper server
https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/
启动 zkServer.sh start xxxx配置(conf)
这里的server 与上面依赖的包 对应。
创建集群
文件层级关系
.
├── config
│ └── zookeeper.json
└── src
├── main
│ ├── kotlin
│ │ ├── Main.kt
│ │ ├── MainEventBusSend.kt
│ │ ├── MainEventBusSub.kt
│ │ └── verticle
│ │ ├── Verticle.kt
│ │ ├── VerticleSend.kt
│ │ └── VerticleSub.kt
│ └── resources
│ └── logback.xml
基础的集群实现
// kotlin代码
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager
// jvm启动参数 -Dvertx.zookeeper.config=./config/zookeeper.json
fun main() {
val mgr: ClusterManager = ZookeeperClusterManager()
val options = VertxOptions().setClusterManager(mgr)
var clusteredVertx = Vertx.clusteredVertx(options)
clusteredVertx.onSuccess {
val vertx = it
// 全路径的包名+类名
vertx.deployVerticle("verticle.Verticle")
}.onFailure {
println("Failed to deploy verticle")
}
}
eventbus 生产者 verticle
// VerticleSend.kt 文件
package verticle
import io.vertx.core.*
import io.vertx.core.Verticle
class VerticleSend : AbstractVerticle() {
override fun start() {
var eventBus = vertx.eventBus()
var myCodec = MyCodec()
eventBus.registerCodec( myCodec)
vertx.createHttpServer().requestHandler { req ->
//只发给一个订阅者
eventBus.send("verticle.publish", "send Hello from VerticleSub")
// 所有订阅者都会处理
eventBus.publish("verticle.publish", "publish Hello from VerticleSub")
eventBus.request<String>("verticle.publish", "send Hello from VerticleSub"){
if (it.succeeded()) {
println(it.result().body())
} else {
println(it.cause().message)
}
}
// 指定 codec
val options = DeliveryOptions().setCodecName(myCodec.name())
eventBus.send("verticle.publish.codec", "codec send Hello from VerticleSub", options)
req.response().end("Hello from Vert.x-Web!")
}.listen(8080)
}
}
class MyCodec : io.vertx.core.eventbus.MessageCodec<String, String> {
override fun encodeToWire(buffer: Buffer, s: String) {
buffer.appendString(s)
}
override fun decodeFromWire(pos: Int, buffer: Buffer): String {
return buffer.getString(pos, buffer.length())
}
override fun transform(s: String): String {
return s
}
override fun name(): String {
return "myCodec"
}
override fun systemCodecID(): Byte {
return -1
}
}
eventbus 消费者 verticle
// VerticleSub.kt 文件
package verticle
import io.vertx.core.*
import io.vertx.core.Verticle
class VerticleSub : AbstractVerticle() {
override fun start() {
var eventBus = vertx.eventBus()
eventBus.consumer<String>("verticle.publish") { message ->
println("Received message: ${message.body()}")
message.reply("Ok Received message: ${message.body()}")
}
var myCodec = MyCodec()
eventBus.registerCodec(myCodec)
eventBus.consumer<String>("verticle.publish.codec") { message ->
println("1111111111111111111 Received message: ${message.body()}")
}
}
}
两个进程分开部署生产者与消费之 verticle
// MainEventBusSend.kt 文件
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager
fun main() {
val mgr: ClusterManager = ZookeeperClusterManager()
val options = VertxOptions().setClusterManager(mgr)
var clusteredVertx = Vertx.clusteredVertx(options)
clusteredVertx.onSuccess {
val vertx = it
vertx.deployVerticle("verticle.VerticleSend")
}.onFailure {
println("Failed to deploy verticle")
}
}
// MainEventBusSub.kt 文件
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.spi.cluster.ClusterManager
import io.vertx.spi.cluster.zookeeper.ZookeeperClusterManager
fun main() {
val mgr: ClusterManager = ZookeeperClusterManager()
val options = VertxOptions().setClusterManager(mgr)
var clusteredVertx = Vertx.clusteredVertx(options)
clusteredVertx.onSuccess {
val vertx = it
vertx.deployVerticle("verticle.VerticleSub")
}.onFailure {
println("Failed to deploy verticle")
}
}
启动上述两个main方法。访问 127.0.0.1:8080 即可访问触发。
Guava EventBus
手动注册或者扫描包注册
package io.github.viakiba.hinx.eventbus.guava;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.Executors;
class LoginEvent {
private String username;
private String age;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
}
class LoginEventHandler {
public LoginEventHandler(){}
@Subscribe
public void LoginEventHandler(LoginEvent msg) {
System.out.println("String msg: " + msg.getUsername());
}
}
class LoginEventHandler1 {
public LoginEventHandler1(){}
@Subscribe
public void LoginEventHandler(LoginEvent msg) {
System.out.println("1String msg: " + msg.getUsername());
}
}
public class GuavaEventBusTest {
private static EventBus eventBus = new EventBus();
private static EventBus asyncEventBus = new AsyncEventBus("xxx", Executors.newSingleThreadExecutor());
public static void main(String[] args) throws Exception {
test1();
test2();
System.exit(0);
}
private static void test1() {
eventBus.register(new LoginEventHandler());
eventBus.register(new LoginEventHandler1());
asyncEventBus.register(new LoginEventHandler());
asyncEventBus.register(new LoginEventHandler1());
LoginEvent loginEvent = new LoginEvent();
loginEvent.setAge("12");
loginEvent.setUsername("viakiba");
asyncEventBus.post(loginEvent);
eventBus.post(loginEvent);
}
private static void test2() throws Exception {
org.reflections.Reflections reflections = new org.reflections.Reflections(
new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage("io.github.viakiba.hinx.eventbus.guava"))
.addScanners(Scanners.MethodsAnnotated));
Set<Method> annotatedWith = reflections.getMethodsAnnotatedWith(Subscribe.class);
for (Method p : annotatedWith) {
Object o = p.getDeclaringClass().getConstructor().newInstance();
eventBus.register(o);
asyncEventBus.register(o);
}
LoginEvent loginEvent = new LoginEvent();
loginEvent.setAge("12");
loginEvent.setUsername("viakiba");
asyncEventBus.post(loginEvent);
eventBus.post(loginEvent);
}
}
Go 手动实现
package observer_test
import (
"fmt"
"testing"
)
func TestObserver(t *testing.T) {
observer.ObserverInstance.Register("test", func(args observer.Event) {
t.Log("test", args)
fmt.Println("xxxxxxxx")
})
observer.ObserverInstance.Register("test", func(args observer.Event) {
t.Log("test", args)
fmt.Println("YYYYYYYY")
})
event := observer.LoginEvent{EventNameStr: "test", UserIdStr: "xxxx"}
observer.ObserverInstance.Notify(event)
}
实现
package observer
// event 定义
type Event interface {
EventName() string
UserId() string
}
// event 例子
type LoginEvent struct {
UserIdStr string
EventNameStr string
}
func (loginEvent LoginEvent) EventName() string {
return loginEvent.EventNameStr
}
func (loginEvent LoginEvent) UserId() string {
return loginEvent.UserIdStr
}
// 事件监听 接口定义
type Observer interface {
Register(string, ExecuteFunction)
Notify(Event)
}
// 定义方法集合
type ExecuteFunction func(event Event)
type ExecuteCollection struct {
Collection []ExecuteFunction
}
type ObserverImpl struct {
observers map[string]*ExecuteCollection
}
func (o ObserverImpl) Register(eventName string, executeFunction ExecuteFunction) {
collection, ok := o.observers[eventName]
if !ok {
collection = &ExecuteCollection{}
collection.Collection = append(collection.Collection, executeFunction)
o.observers[eventName] = collection
} else {
collection.Collection = append(collection.Collection, executeFunction)
}
}
func (o ObserverImpl) Notify(event Event) {
collection, ok := o.observers[event.EventName()]
if !ok {
return
}
for _, function := range collection.Collection {
function(event)
}
}
var ObserverInstance ObserverImpl = ObserverImpl{
observers: make(map[string]*ExecuteCollection),
}