Commit 63da1a87 authored by A.P.R.C. Abeyrathna's avatar A.P.R.C. Abeyrathna

Application added

parent 4530358f
# Miscellaneous
*.class
*.log
*.pyc
*.swp
.DS_Store
.atom/
.buildlog/
.history
.svn/
# IntelliJ related
*.iml
*.ipr
*.iws
.idea/
# The .vscode folder contains launch configuration and tasks you configure in
# VS Code which you may wish to be included in version control, so this line
# is commented out by default.
#.vscode/
# Flutter/Dart/Pub related
**/doc/api/
**/ios/Flutter/.last_build_id
.dart_tool/
.flutter-plugins
.flutter-plugins-dependencies
.packages
.pub-cache/
.pub/
/build/
# Web related
lib/generated_plugin_registrant.dart
# Symbolication related
app.*.symbols
# Obfuscation related
app.*.map.json
# Android Studio will place build artifacts here
/android/app/debug
/android/app/profile
/android/app/release
.gradle
/local.properties
/.idea/caches
/.idea/libraries
/.idea/modules.xml
/.idea/workspace.xml
/.idea/navEditor.xml
/.idea/assetWizardSettings.xml
.DS_Store
/build
/captures
.externalNativeBuild
.cxx
local.properties
.idea
# This file tracks properties of this Flutter project.
# Used by Flutter tool to assess capabilities and perform upgrades etc.
#
# This file should be version controlled and should not be manually edited.
version:
revision: f4abaa0735eba4dfd8f33f73363911d63931fe03
channel: stable
project_type: app
This diff is collapsed.
To execute MQTT CLI simply open the Windows Command Prompt with ⊞Win + R and execute cmd.
Navigate into the extracted MQTT CLI folder and execute the mqtt-cli.exe command.
To quick start a MQTT CLI shell simply double-click the mqtt-cli-shell.cmd file.
\ No newline at end of file
@ECHO off
start /B %0\..\ mqtt-cli.exe shell
# MQTT-Encryption-Payload
Publish and Subscribe Payload Encryption Script (MQTT).
point 1– First we create an encryption key – cipher_key = Fernet.generate_key(). This key is used to encrypt and decrypt and we would need to use this same key on the receiving client. In our example the sender and receiver are the same client.
point 2-The message to be encrypted must be in bytes.
point 3: We need to create a UTF-8 encoded string to pass as the message payload to the MQTT publish method.
point 4– The received message is already in bytes and so we pass it straight to the decrypt function.
point 5: We then convert the decrypted byte message to a UTF-8 string as normal.
import time
import paho.mqtt.client as paho
from cryptography.fernet import Fernet
broker="broker.hivemq.com"
#broker="192.168.1.3"
#define callback
def on_log(client, userdata):
def on_message(client, userdata, message):
#time.sleep(1)
print("receive payload ",message.payload)
decrypted_message = cipher.decrypt(message.payload) #decrypted_message = cipher.decrypt(encrypted_message)
print("\nreceived message =",str(decrypted_message.decode("utf-8")))
client= paho.Client("client-pub")
client.on_log=on_log
client.on_message=on_message
#####encryption
#cipher_key = Fernet.generate_key()
cipher_key=b'WDrevvK8ZrPn8gmiNFjcOp2xovBr40TCwJlZOyI94IY='
cipher = Fernet(cipher_key)
message = b'on33'
#message = b'the quick brown fox jumps over the lazy dog'
encrypted_message = cipher.encrypt(message)
out_message=encrypted_message.decode()# turn it into a string to send
##
print("connecting to broker ",broker)
client.connect(broker)#connect
print("publishing encrypted message ",encrypted_message)
out_message="25 C"
client.publish("home/temp",out_message)#publish
time.sleep(4)
client.disconnect() #disconnect
client.loop_stop() #stop loop
import time
import paho.mqtt.client as paho
from cryptography.fernet import Fernet
broker="broker.hivemq.com"
#broker="192.168.1.3"
#define callback
def on_log(client, userdata):
def on_message(client, userdata, message):
#time.sleep(1)
print("receive payload ",message.payload)
decrypted_message = cipher.decrypt(message.payload) #decrypted_message = cipher.decrypt(encrypted_message)
print("\nreceived message =",str(decrypted_message.decode("utf-8")))
client= paho.Client("client-001")
client.on_log=on_log
######
client.on_message=on_message
#####encryption
cipher_key =b'WDrevvK8ZrPn8gmiNFjcOp2xovBr40TCwJlZOyI94IY='
cipher = Fernet(cipher_key)
print("connecting to broker ",broker)
client.connect(broker)#connect
client.loop_start() #start loop to process received messages
print("subscribing ")
client.subscribe("home/temp")#subscribe
count=0
while count <60:
time.sleep(1)
count+=1
client.disconnect() #disconnect
client.loop_stop() #stop loop
= HiveMQ Community Contributing Guidelines
== Introduction
Anyone is welcome to participate and contribute to the HiveMQ open source community. We encourage people to use the Github issue tracker and create pull requests.
We ask that contributions are focused on improving the current implementation and project scope.
We will not accept contributions that would add features that expand the scope of the project. It is HiveMQ’s intention to offer commercial products that will add features suited for enterprise deployments of HiveMQ. If you would like to suggest a new feature, please contact the project team (support@hivemq.com) to discuss if it fits within the project scope. If you wish to contribute, we will preferably accept Bug Fixes, Typos, Documentation, JavaDoc and Logging.
== Licensing
HiveMQ Open Source Projects are released under the Apache 2.0 license to allow you the use of the software as you please. By contributing your code, you agree to license your contribution under the terms of the Apache License, Version 2.0.
Please fill out our Contributor License Agreement (CLA), otherwise we cannot accept your contribution.
All files must contain the license header from the link:HEADER[header file].
== Contributor License Agreement
If you wish to contribute to one of our Open Source projects, please download, fill out and sign the https://www.hivemq.com/downloads/Contributor_License_Agreement.pdf[Contributor License Agreement]. Send the signed agreement to (contributing@hivemq.com) with the subject `CLA for Project: <Project Name>`. Please read this document carefully before signing it and keep a copy for your own records. Once we've received your signed CLA, you will be added to the list of contributors, and our HiveMQ Development Team will review your contributions.
In case the rights to all intellectual property created by you are maintained by your employer, including your work on HiveMQ Open Source Projects, you must ensure you have retained all rights to grant the license, or that your employer has agreed to a separate https://www.hivemq.com/downloads/Corporate_Contributor_License_Agreement.pdf[Corporate CLA].
== Contribution Guidelines
=== Issues
==== Bug Fixes and Minor Features
Please always open an issue in the issue tracker before sending a pull request and wait until someone from the HiveMQ Development Team approves it, before you start working on it. This prevents you from working on issues that we will ultimately not accept. When you start working on an approved issue, please make it known in the comments that you are going to work on it, so a simultaneous contribution by someone else can be avoided.
Upon commitment, your contribution will be reviewed by the HiveMQ Development Team. We reserve the right to release only reviewed and approved implementations.
==== Security Issues
Please do not report any security issue to the public issue tracker. Please send security issues to security@hivemq.com.
=== Branching model
- `master`: release branch, protected
- `develop` is merged into `master` by creating a merge commit if a new version is released
- The release is tagged with the version `vX.Y.Z`
- `develop`: snapshot branch, protected
- Contains features for the next release
- Feature/bugfix/... branches are merged into `develop` by rebasing and merging
- Every feature/bugfix/... will have its own branch
- Branched off from `develop`
- Pull request targeting the `develop` branch
- Mandatory code review of the pull request
- `gh-pages`: documentation branch, protected
=== Branching guidelines
- Branch types: feature, bugfix, improvement, cleanup (same as the label of a corresponding GitHub Issue)
- Branch names:
- Starting with type: `feature/`, `bugfix/`, `improvement/`, `cleanup/`
- \+ task: lower case, spaces replaced with `-`
=== Commit guidelines
- Commits should be as atomic as possible.
- Commit messages should describe the changes clearly.
- Commit messages should start with a capital letter for consistency.
- Commit messages should avoid exceeding the line length limit. Instead use multiple lines, each describing one specific
change.
==== Questions
Please refer all your questions about the HiveMQ Open Source Projects to the https://community.hivemq.com[HiveMQ Community Forum].
Issues consisting of questions or suggestions will be closed immediately.
=== Testing
The HiveMQ Open Source projects only accept pull requests that contain unit tests and have sufficient unit test coverage.
=== Pull Requests
As soon as your code is ready for a https://help.github.com/en/articles/creating-a-pull-request[pull request], please link the specific issue that you want to resolve. Once the continuous integration is successful and at least one member of the HiveMQ Development Team has approved the changes, you will be asked to rebase and squash before the pull request can be merged.
We greatly appreciate your involvement and contribution.
# MQTT.dart Client Example
## Overview
This is an MQTT client example project that showcases how you can use HiveMQ Cloud with the Dart Client. The example project covers the basic MQTT functionality: Connecting MQTT clients to your HiveMQ Cloud cluster, subscribing to topics and publishing data (sending and receiving messages using the MQTT protocol).
The Dart client used here supports MQTT v3.1 and v3.1.1.
You can find documentation for this client library here: https://www.hivemq.com/blog/mqtt-client-library-mqtt-dart/.
This example repository is easily and clearly structured, so you can start quickly:
This readme file is your starting point. Here we will describe what you have to do step-by-step to get started with this example.
[``main.dart``](bin/main.dart) in the ``bin`` directory is a simple implementation that demonstrates the core functionality of an MQTT Client.
Follow the instructions in the following paragraphs to get started yourself.
## HiveMQ Cloud
[HiveMQ](https://www.hivemq.com/) is the industry leader for enterprise-ready, beautifully scalable, large-scale IoT deployments with MQTT. We help companies connect things to the Internet. Our MQTT-based messaging platform ensures fast, reliable, and secure movement of data to and from connected IoT devices for companies around the world. HiveMQ Cloud is our fully-managed, cloud-native IoT messaging platform that makes trustworthy and scalable IoT device connectivity simple. You can learn more about HiveMQ Cloud on our [website](https://www.hivemq.com/mqtt-cloud-broker/), [documentation](https://www.hivemq.com/docs/hivemq-cloud/introduction.html) and our [blog posts](https://www.hivemq.com/tags/cloud/).
## Getting started
[By signing up](https://console.hivemq.cloud) for HiveMQ Cloud you will automatically get access to a HiveMQ Cloud Basic cluster. HiveMQ Cloud Basic is our smallest package that allows you to connect up to 100 MQTT clients for free and test the full MQTT functionality.
The [HiveMQ Cloud Quick Start Guide](https://www.hivemq.com/docs/hivemq-cloud/introduction.html#guide) gives you step-by-step instructions on how to set up your HiveMQ Cloud account, create clusters, and connect MQTT clients.
### Prerequisites
After signing up, you have a running HiveMQ Cloud cluster, that you can use in this example.
Now clone this repository into your local IDE.
For using the code examples, you need to install the necessary client library.
The correct dependencies are listed in the ``pubspec.yaml``.
Execute this command in the terminal of your IDE to get the right dependencies.
```sh
dart pub get
```
### Broker credentials
To define the HiveMQ Cloud cluster which should be targeted, you need to fill the placeholders in the code with your host name, username and password.
The <b>host name</b> can be found in the <b>Details</b> section of the <b>Overview</b> tab of your cluster.
![cluster overview](/img/hivemq-cloud-cluster-overview.png)
After the cluster is created, add a set of credentials that you can use in this example or future implementations.
Use any secure username and password you desire.
The <b>username</b> and <b>password</b> are the values used as <b>Client Credentials</b> that you just created.
![credentials](/img/hivemq-cloud-credentials.png)
### Code Examples
This example project covers the core functionality of an MQTT client interacting with HiveMQ Cloud.
To securely connect the MQTT client with HiveMQ Cloud you need to enable TLS.
Use your username and password, to authenticate your MQTT client at HiveMQ Cloud.
To connect the client, use the port 8883 that is standard for secure MQTT communication.
``'<your_name>'`` is the name you give your client, you can choose any name for this.
```dart
await client.connect('<your_username>', '<your_password>');
```
```dart
client = MqttServerClient.withPort('<your_host>', '<your_name>', <your_port>);
```
The code located inside [``main.dart``](bin/main.dart) connects to the configured HiveMQ Cloud Broker in a simple way.
This is a ready-set example that can simply be run after inputting your credentials.
Navigate to the ``bin`` directory and run [``main.dart``](bin/main.dart) in your terminal.
```sh
cd bin
dart main.dart
```
The different processes are all defined in their respective functions.
These get all called in the ``prepareMqttClient()`` function at the start, which in turn gets called by the ``main`` function with ``newclient.prepareMqttClient()``.
After connecting the client, the code first subscribes to the topic filter ``"Dart/Mqtt_client/testtopic"``.
That means the MQTT client receives all messages that are published to this [topic filter](https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/).
```dart
void _subscribeToTopic(String topicName) {
print('Subscribing to the $topicName topic');
client.subscribe(topicName, MqttQos.atMostOnce);
// print the message when it is received
client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final MqttPublishMessage recMess = c[0].payload;
var message = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print('YOU GOT A NEW MESSAGE:');
print(message);
});
}
```
The ``_onSubscribed`` callback acts as a reassurance that the subscription worked.
Then the code publishes a message to the same topic with ``_publishMessage``.
The message gets printed to the terminal by the ``_subscribeToTopic`` method, that listens for incoming messages.
```dart
void _publishMessage(String message) {
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString(message);
print('Publishing message "$message" to topic ${'Dart/Mqtt_client/testtopic'}');
client.publishMessage('Dart/Mqtt_client/testtopic', MqttQos.exactlyOnce, builder.payload);
}
```
## Learn more
If you want to learn more about MQTT, visit the [MQTT Essentials](https://www.hivemq.com/mqtt-essentials/) guide, that explains the core of MQTT concepts, its features and other essential information. Learn about Dart, a client-optimized programming language for fast apps on any platform, on their [website](https://dart.dev/). Also have a look at the [client library](https://pub.dev/packages/mqtt_client).
## Contributing
Please see our [contributing guidelines](./CONTRIBUTING.adoc) and [code of conduct](./code-of-conduct.md).
## License
[Apache 2.0](./LICENSE).
/*
* Copyright 2021 HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
main() {
MQTTClientWrapper newclient = new MQTTClientWrapper();
newclient.prepareMqttClient();
}
// connection states for easy identification
enum MqttCurrentConnectionState {
IDLE,
CONNECTING,
CONNECTED,
DISCONNECTED,
ERROR_WHEN_CONNECTING
}
enum MqttSubscriptionState {
IDLE,
SUBSCRIBED
}
class MQTTClientWrapper {
MqttServerClient client;
MqttCurrentConnectionState connectionState = MqttCurrentConnectionState.IDLE;
MqttSubscriptionState subscriptionState = MqttSubscriptionState.IDLE;
// using async tasks, so the connection won't hinder the code flow
void prepareMqttClient() async {
_setupMqttClient();
await _connectClient();
_subscribeToTopic('Dart/Mqtt_client/testtopic');
_publishMessage('Hello');
}
// waiting for the connection, if an error occurs, print it and disconnect
Future<void> _connectClient() async {
try {
print('client connecting....');
connectionState = MqttCurrentConnectionState.CONNECTING;
await client.connect('<your_username>', '<your_password>');
} on Exception catch (e) {
print('client exception - $e');
connectionState = MqttCurrentConnectionState.ERROR_WHEN_CONNECTING;
client.disconnect();
}
// when connected, print a confirmation, else print an error
if (client.connectionStatus.state == MqttConnectionState.connected) {
connectionState = MqttCurrentConnectionState.CONNECTED;
print('client connected');
} else {
print(
'ERROR client connection failed - disconnecting, status is ${client.connectionStatus}');
connectionState = MqttCurrentConnectionState.ERROR_WHEN_CONNECTING;
client.disconnect();
}
}
void _setupMqttClient() {
client = MqttServerClient.withPort('<your_host>', '<your_name>', <your_port>);
// the next 2 lines are necessary to connect with tls, which is used by HiveMQ Cloud
client.secure = true;
client.securityContext = SecurityContext.defaultContext;
client.keepAlivePeriod = 20;
client.onDisconnected = _onDisconnected;
client.onConnected = _onConnected;
client.onSubscribed = _onSubscribed;
}
void _subscribeToTopic(String topicName) {
print('Subscribing to the $topicName topic');
client.subscribe(topicName, MqttQos.atMostOnce);
// print the message when it is received
client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final MqttPublishMessage recMess = c[0].payload;
var message = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
print('YOU GOT A NEW MESSAGE:');
print(message);
});
}
void _publishMessage(String message) {
final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
builder.addString(message);
print('Publishing message "$message" to topic ${'Dart/Mqtt_client/testtopic'}');
client.publishMessage('Dart/Mqtt_client/testtopic', MqttQos.exactlyOnce, builder.payload);
}
// callbacks for different events
void _onSubscribed(String topic) {
print('Subscription confirmed for topic $topic');
subscriptionState = MqttSubscriptionState.SUBSCRIBED;
}
void _onDisconnected() {
print('OnDisconnected client callback - Client disconnection');
connectionState = MqttCurrentConnectionState.DISCONNECTED;
}
void _onConnected() {
connectionState = MqttCurrentConnectionState.CONNECTED;
print('OnConnected client callback - Client connection was sucessful');
}
}
# Contributor Covenant Code of Conduct
Please refer to our HiveMQ [Code of Conduct](https://github.com/hivemq/hivemq-community/blob/master/code-of-conduct.md).
name: mqtt
version: 1.0.1
environment:
sdk: '>=2.1.0 <3.0.0'
dependencies:
mqtt_client: ^7.2.1
ini: '>=1.0.0'
test: '>=0.12.0'
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
## Expected behavior
## Actual behavior
## To Reproduce
### Steps
### Reproducer code
## Details
- Affected HiveMQ MQTT Client version(s):
- Used JVM version:
- Used OS (name and version):
- Used MQTT version:
- Used MQTT broker (name and version):
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
## Problem or use case
## Preferred solution or suggestions
name: CI Check
on: [ push ]
concurrency:
group: ${{ github.ref }}
cancel-in-progress: true
jobs:
check:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v2
with:
distribution: 'adopt'
java-version: '8'
- name: Check
run: ./gradlew check javadoc
\ No newline at end of file
name: Publish to Maven Central
on:
release:
types: [ published ]
jobs:
publish:
environment: mavenCentralPublish
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v2
with:
distribution: 'adopt'
java-version: '8'
- name: Publish to Maven Central
env:
ORG_GRADLE_PROJECT_signKey: ${{ secrets.SIGN_KEY }}
ORG_GRADLE_PROJECT_signKeyPass: ${{ secrets.SIGN_KEY_PASS }}
ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_USERNAME }}
ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_PASSWORD }}
run: ./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository
# Gradle
.gradle
build/
# IntelliJ
out/
*.iml
.idea/*
!.idea/codeStyles
!.idea/inspectionProfiles
!.idea/runConfigurations
.java-version
.DS_Store
# HiveMQ Code of Conduct
Please refer to our [HiveMQ Code of Conduct](https://github.com/hivemq/hivemq-community/blob/master/code-of-conduct.md).
# Contributing
## Contributing to the HiveMQ Community Projects
Welcome to the HiveMQ Community! Glad to see your interest in contributing to HiveMQ MQTT Client.
Please checkout our [Contribution Guide](https://github.com/hivemq/hivemq-community/blob/master/CONTRIBUTING.adoc) to
make sure your contribution will be accepted by the HiveMQ team.
For information on how the HiveMQ Community is organized and how contributions will be accepted please have a look at
our [HiveMQ Community Repo](https://github.com/hivemq/hivemq-community).
## Contributing to HiveMQ MQTT Client
### External contributors
If you would like to contribute code, do the following:
- Fork the repository on GitHub
- Open a pull request targeting the `develop` branch
### License
By contributing your code, you agree to license your contribution under the terms of the
[Apache License, Version 2.0](https://github.com/hivemq/hivemq-mqtt-client/blob/develop/LICENSE).
All files must contain the license header from the
[HEADER](https://github.com/hivemq/hivemq-mqtt-client/blob/develop/HEADER) file.
### Branching model
- `master`: release branch, protected
- `develop` is merged into `master` by creating a merge commit if a new version is released
- The release is tagged with the version `vX.Y.Z`
- `develop`: snapshot branch, protected
- Contains features for the next release
- Feature/bugfix/... branches are merged into `develop` by rebasing and merging
- Every feature/bugfix/... will have its own branch
- Branched off from `develop`
- Pull request targeting the `develop` branch
- Mandatory code review of the pull request
- `gh-pages`: documentation branch, protected
### Branching guidelines
- Branch types: feature, bugfix, improvement, cleanup (same as the label of a corresponding GitHub Issue)
- Branch names:
- Starting with type: `feature/`, `bugfix/`, `improvement/`, `cleanup/`
- \+ task: lower case, spaces replaced with `-`
### Commit guidelines
- Commits should be as atomic as possible.
- Commit messages should describe the changes clearly.
- Commit messages should start with a capital letter for consistency.
- Commit messages should avoid exceeding the line length limit. Instead use multiple lines, each describing one specific
change.
### Code style guidelines
- The project uses Nullability annotations to avoid NullPointerExceptions: `@NotNull`, `@Nullable`.
Every non-primitive parameter/return type/field should be annotated with one of them.
- For IntelliJ IDEA the codeStyleConfig and the inspectionProfile are provided in the `.idea` folder.
Copyright 2018-present HiveMQ and the HiveMQ Community
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
plugins {
id("java-platform")
}
/* ******************** metadata ******************** */
description = "Adds dependencies for the HiveMQ MQTT Client epoll module"
metadata {
moduleName.set("com.hivemq.client.mqtt.epoll")
readableName.set("HiveMQ MQTT Client epoll module")
}
/* ******************** dependencies ******************** */
javaPlatform {
allowDependencies()
}
dependencies {
api(rootProject)
}
configurations.runtime {
extendsFrom(rootProject.configurations["epollImplementation"])
}
plugins {
id("java")
}
/* ******************** metadata ******************** */
description = "Examples using the HiveMQ MQTT Client"
metadata {
moduleName.set("com.hivemq.client.mqtt.examples")
readableName.set("HiveMQ MQTT Client examples")
}
/* ******************** dependencies ******************** */
dependencies {
implementation(rootProject)
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.mqtt.examples;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import java.util.concurrent.TimeUnit;
/**
* Small completely asynchronous example.
*
* @author Silvio Giebl
*/
public class AsyncDemo {
public static void main(final String[] args) throws InterruptedException {
final Mqtt5AsyncClient client = Mqtt5Client.builder().serverHost("broker.hivemq.com").buildAsync();
client.connect()
.thenAccept(connAck -> System.out.println("connected " + connAck))
.thenCompose(v -> client.publishWith().topic("demo/topic/b").qos(MqttQos.EXACTLY_ONCE).send())
.thenAccept(publishResult -> System.out.println("published " + publishResult))
.thenCompose(v -> client.disconnect())
.thenAccept(v -> System.out.println("disconnected"));
System.out.println("see that everything above is async");
for (int i = 0; i < 5; i++) {
TimeUnit.MILLISECONDS.sleep(50);
System.out.println("...");
}
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.mqtt.examples;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5DisconnectReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5RetainHandling;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import java.util.concurrent.CountDownLatch;
/**
* Shows MQTT 5 features like session expiry, message expiry, user properties, topic aliases, flow control.
*
* @author Silvio Giebl
*/
// @formatter:off
public class Mqtt5Features {
public static void main(final String[] args)throws InterruptedException {
final Mqtt5AsyncClient client = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.automaticReconnectWithDefaultConfig()
.buildAsync();
final Mqtt5ConnAck connAck = client.toBlocking().connectWith()
.cleanStart(false) // resume a previous session
.sessionExpiryInterval(30) // keep session state for 30s
.restrictions()
.receiveMaximum(10) // receive max. 10 concurrent messages
.sendMaximum(10) // send max. 10 concurrent messages
.maximumPacketSize(10_240) // receive messages with max size of 10KB
.sendMaximumPacketSize(10_240) // send messages with max size of 10KB
.topicAliasMaximum(0) // the server should not use topic aliases
.sendTopicAliasMaximum(8) // use up to 8 aliases for the most used topics (automatically traced)
.applyRestrictions()
.willPublish()
.topic("demo/topic/will")
.qos(MqttQos.EXACTLY_ONCE)
.payload("rip".getBytes())
.contentType("text/plain") // our payload is text
.messageExpiryInterval(120) // not so important, expire message after 2min if can not be delivered
.delayInterval(30) // delay sending out the will message so we can try to reconnect immediately
.userProperties() // add some user properties to the will message
.add("sender", "demo-sender-1")
.add("receiver", "you")
.applyUserProperties()
.applyWillPublish()
.send();
System.out.println("connected " + connAck);
final Mqtt5SubAck subAck = client.subscribeWith()
.topicFilter("demo/topic/a")
.noLocal(true) // we do not want to receive our own message
.retainHandling(Mqtt5RetainHandling.DO_NOT_SEND) // do not send retained messages
.retainAsPublished(true) // keep the retained flag as it was published
.callback(publish -> System.out.println("received message: " + publish))
.send().join();
System.out.println("subscribed " + subAck);
client.toBlocking().publishWith()
.topic("demo/topic/a")
.qos(MqttQos.EXACTLY_ONCE)
.payload("payload".getBytes())
.retain(true)
.contentType("text/plain") // our payload is text
.messageExpiryInterval(120) // not so important, expire message after 2min if can not be delivered
.userProperties() // add some user properties to the message
.add("sender", "demo-sender-1")
.add("receiver", "you")
.applyUserProperties()
.send();
System.out.println("published: we do not receive our own messages");
// setup a latch to wait for 1 message
final CountDownLatch countDownLatch = new CountDownLatch(1);
client.publishes(MqttGlobalPublishFilter.ALL, publish -> countDownLatch.countDown());
final Mqtt5BlockingClient client2 = Mqtt5Client.builder().serverHost("broker.hivemq.com").buildBlocking();
client2.connect();
client2.publishWith()
.topic("demo/topic/a")
.retain(true)
.userProperties()
.add("sender", "demo-sender-2")
.add("receiver", "you")
.applyUserProperties()
.send();
client2.disconnect();
System.out.println("client2 published: waiting for message to be received");
countDownLatch.await();
System.out.println("received message from client2: see the user property sender, also see that retain=true as requested");
client.toBlocking().disconnectWith()
.reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE) // send the will message
.sessionExpiryInterval(0) // we want to clear the session
.send();
System.out.println("disconnected");
System.exit(0);
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.mqtt.examples;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.lifecycle.Mqtt5ClientDisconnectedContext;
import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Disable internet connection to see automatic reconnect in action.
*
* @author Silvio Giebl
*/
// @formatter:off
public class ReconnectStrategy {
public static void main(final String[] args) throws InterruptedException {
// defaultReconnect();
// customizedReconnect();
completelyCustom();
}
public static void defaultReconnect() {
final Mqtt5BlockingClient client1 = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.automaticReconnectWithDefaultConfig() // exponential backoff, 1s initial, doubled up to 2min, random delays +-25%
.buildBlocking();
}
public static void customizedReconnect() throws InterruptedException {
final Mqtt5BlockingClient client2 = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.automaticReconnect()
.initialDelay(3, TimeUnit.SECONDS)
.maxDelay(10, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.addConnectedListener(context -> System.out.println("connected " + LocalTime.now()))
.addDisconnectedListener(context -> System.out.println("disconnected " + LocalTime.now()))
.buildBlocking();
client2.connectWith().keepAlive(2).send(); // short keep alive value so disabling internet connection triggers connection lost
TimeUnit.MINUTES.sleep(3);
client2.toAsync().disconnect();
}
private static void completelyCustom() {
final Mqtt5BlockingClient client3 = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
// custom reconnect strategy is just a DisconnectedListener
.addDisconnectedListener(context -> {
context.getReconnector()
.reconnect(true) // always reconnect (includes calling disconnect)
.delay(2L * context.getReconnector().getAttempts(), TimeUnit.SECONDS); // linear scaling delay
})
// multiple DisconnectedListener can form a reconnect strategy
.addDisconnectedListener(context -> {
final Mqtt5ClientDisconnectedContext context5 = (Mqtt5ClientDisconnectedContext) context;
context5.getReconnector()
.reconnectWhen(getOAuthToken(), (token, throwable) -> { // first reconnect would be delayed 2s but OAuth server needs more time
if (token != null) {
context5.getReconnector().connectWith()
.simpleAuth().password(token).applySimpleAuth() // set OAuth token as password
.applyConnect();
} else {
context5.getReconnector().reconnect(false); // cancel reconnect if OAuth query failed
}
});
})
.addConnectedListener(context -> System.out.println("connected " + LocalTime.now()))
.addDisconnectedListener(context -> System.out.println("disconnected " + LocalTime.now()))
.buildBlocking();
client3.connect();
client3.disconnect();
}
private static CompletableFuture<byte[]> getOAuthToken() {
return CompletableFuture.supplyAsync(() -> {
try {
for(int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println("OAuth server is slow to respond ...");
}
} catch (final InterruptedException e) {
e.printStackTrace();
}
return new byte[] {1, 2, 3};
});
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.mqtt.examples;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
/**
* Shows how to implement a request/response pattern using response topic and correlation data.
*
* @author Silvio Giebl
*/
public class RequestResponse {
public static void main(final String[] args) {
final Mqtt5Client requester = Mqtt5Client.builder().serverHost("broker.hivemq.com").build();
final Mqtt5Client responder = Mqtt5Client.builder().serverHost("broker.hivemq.com").build();
requester.toBlocking().connect();
responder.toBlocking().connect();
responder.toRx()
.publish(responder.toRx()
.subscribePublishesWith()
.topicFilter("request/topic")
.applySubscribe()
.map(requestPublish -> Mqtt5Publish.builder()
.topic(requestPublish.getResponseTopic().get())
.qos(requestPublish.getQos())
.payload("response".getBytes())
.correlationData(requestPublish.getCorrelationData().orElse(null))
.build()))
.subscribe(); // this call is a reactive streams subscribe call, not an MQTT subscribe
requester.toAsync()
.subscribeWith()
.topicFilter("response/topic")
.callback(responsePublish -> System.out.println("received response"))
.send()
.thenCompose(subAck -> requester.toAsync()
.publishWith()
.topic("request/topic")
.responseTopic("response/topic")
.correlationData("1234".getBytes())
.qos(MqttQos.EXACTLY_ONCE)
.payload("request".getBytes())
.send());
}
}
version=1.3.0
prevVersion=1.2.2
#
# main dependencies
#
rxjava.version=2.2.19
reactive-streams.version=1.0.3
netty.version=4.1.48.Final
jctools.version=2.1.2
annotations.version=16.0.3
dagger.version=2.27
slf4j.version=1.7.30
reactor.version=3.3.4.RELEASE
reactor-adapter.version=3.3.3.RELEASE
#
# test dependencies
#
junit-jupiter.version=5.5.1
equalsverifier.version=3.1.7
mockito.version=2.18.3
guava.version=24.1-jre
bouncycastle.version=1.59
paho.version=1.2.0
#
# integration test dependencies
#
hivemq-testcontainer.version=2.0.0
hivemq-extension-sdk.version=4.7.2
awaitility.version=4.1.1
#
# plugins
#
plugin.shadow.version=5.2.0
plugin.bnd.version=5.3.0
plugin.nexus-publish.version=1.0.0
plugin.license.version=0.15.0
plugin.utf8.version=0.1.0
plugin.metadata.version=0.2.0
plugin.javadoc-links.version=0.3.0
#
# options
#
org.gradle.caching=true
#Fri Jun 23 08:50:38 CEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-all.zip
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
install:
./gradlew publishToMavenLocal -x signBasePublication -x signShadedPublication
\ No newline at end of file
plugins {
id("java-platform")
}
/* ******************** metadata ******************** */
description = "Adds dependencies for the HiveMQ MQTT Client proxy module"
metadata {
moduleName.set("com.hivemq.client.mqtt.proxy")
readableName.set("HiveMQ MQTT Client proxy module")
}
/* ******************** dependencies ******************** */
javaPlatform {
allowDependencies()
}
dependencies {
api(rootProject)
}
configurations.runtime {
extendsFrom(rootProject.configurations["proxyImplementation"])
}
plugins {
id("java-library")
}
/* ******************** metadata ******************** */
description = "Reactor API for the HiveMQ MQTT Client"
metadata {
moduleName.set("com.hivemq.client.mqtt.reactor")
readableName.set("HiveMQ MQTT Client reactor module")
}
/* ******************** dependencies ******************** */
dependencies {
api(rootProject)
api("io.projectreactor:reactor-core:${property("reactor.version")}")
implementation("io.projectreactor.addons:reactor-adapter:${property("reactor-adapter.version")}")
implementation("org.jetbrains:annotations:${property("annotations.version")}")
}
/* ******************** test ******************** */
dependencies {
testImplementation("io.projectreactor:reactor-test:${property("reactor.version")}")
testImplementation("com.google.guava:guava:${property("guava.version")}")
}
/* ******************** jars ******************** */
tasks.jar {
withConvention(aQute.bnd.gradle.BundleTaskConvention::class) {
bnd("Export-Package: " +
"com.hivemq.client.mqtt.mqtt3.reactor," +
"com.hivemq.client.mqtt.mqtt5.reactor," +
"com.hivemq.client.rx.reactor")
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.mqtt.mqtt3.reactor;
import com.hivemq.client.internal.mqtt.message.connect.mqtt3.Mqtt3ConnectView;
import com.hivemq.client.internal.mqtt.message.connect.mqtt3.Mqtt3ConnectViewBuilder;
import com.hivemq.client.internal.mqtt.message.subscribe.mqtt3.Mqtt3SubscribeViewBuilder;
import com.hivemq.client.internal.mqtt.message.unsubscribe.mqtt3.Mqtt3UnsubscribeViewBuilder;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientConfig;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3Connect;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishResult;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck;
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import io.reactivex.Flowable;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.adapter.rxjava.RxJava2Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Silvio Giebl
*/
public class Mqtt3ReactorClientView implements Mqtt3ReactorClient {
private final @NotNull Mqtt3RxClient delegate;
public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {
this.delegate = delegate;
}
@Override
public @NotNull Mono<Mqtt3ConnAck> connect() {
return connect(Mqtt3ConnectView.DEFAULT);
}
@Override
public @NotNull Mono<Mqtt3ConnAck> connect(final @NotNull Mqtt3Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
}
@Override
public @NotNull Mqtt3ConnectViewBuilder.Nested<Mono<Mqtt3ConnAck>> connectWith() {
return new Mqtt3ConnectViewBuilder.Nested<>(this::connect);
}
@Override
public @NotNull Mono<Mqtt3SubAck> subscribe(final @NotNull Mqtt3Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
}
@Override
public @NotNull Mqtt3SubscribeViewBuilder.Nested<Mono<Mqtt3SubAck>> subscribeWith() {
return new Mqtt3SubscribeViewBuilder.Nested<>(this::subscribe);
}
@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> subscribePublishes(
final @NotNull Mqtt3Subscribe subscribe) {
return subscribePublishes(subscribe, false);
}
@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> subscribePublishes(
final @NotNull Mqtt3Subscribe subscribe, final boolean manualAcknowledgement) {
return FluxWithSingle.from(delegate.subscribePublishes(subscribe, manualAcknowledgement));
}
@Override
public @NotNull Mqtt3SubscribeViewPublishesBuilder subscribePublishesWith() {
return new Mqtt3SubscribeViewPublishesBuilder();
}
@Override
public @NotNull Flux<Mqtt3Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
return publishes(filter, false);
}
@Override
public @NotNull Flux<Mqtt3Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {
return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}
@Override
public @NotNull Mono<Void> unsubscribe(final @NotNull Mqtt3Unsubscribe unsubscribe) {
return RxJava2Adapter.completableToMono(delegate.unsubscribe(unsubscribe));
}
@Override
public @NotNull Mqtt3UnsubscribeViewBuilder.Nested<Mono<Void>> unsubscribeWith() {
return new Mqtt3UnsubscribeViewBuilder.Nested<>(this::unsubscribe);
}
@Override
public @NotNull Flux<Mqtt3PublishResult> publish(final @NotNull Publisher<Mqtt3Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}
@Override
public @NotNull Mono<Void> disconnect() {
return RxJava2Adapter.completableToMono(delegate.disconnect());
}
@Override
public @NotNull Mqtt3ClientConfig getConfig() {
return delegate.getConfig();
}
@Override
public @NotNull Mqtt3RxClient toRx() {
return delegate;
}
@Override
public @NotNull Mqtt3AsyncClient toAsync() {
return delegate.toAsync();
}
@Override
public @NotNull Mqtt3BlockingClient toBlocking() {
return delegate.toBlocking();
}
private class Mqtt3SubscribeViewPublishesBuilder
extends Mqtt3SubscribeViewBuilder.Publishes<FluxWithSingle<Mqtt3Publish, Mqtt3SubAck>> {
@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> applySubscribe() {
return subscribePublishes(build(), manualAcknowledgement);
}
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.mqtt.reactor;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnectBuilder;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnectBuilder;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribeBuilder;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribeBuilder;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientConfig;
import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5Disconnect;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import io.reactivex.Flowable;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.adapter.rxjava.RxJava2Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Silvio Giebl
*/
public class MqttReactorClient implements Mqtt5ReactorClient {
private final @NotNull Mqtt5RxClient delegate;
public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
this.delegate = delegate;
}
@Override
public @NotNull Mono<Mqtt5ConnAck> connect() {
return connect(MqttConnect.DEFAULT);
}
@Override
public @NotNull Mono<Mqtt5ConnAck> connect(final @NotNull Mqtt5Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
}
@Override
public @NotNull MqttConnectBuilder.Nested<Mono<Mqtt5ConnAck>> connectWith() {
return new MqttConnectBuilder.Nested<>(this::connect);
}
@Override
public @NotNull Mono<Mqtt5SubAck> subscribe(final @NotNull Mqtt5Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
}
@Override
public @NotNull MqttSubscribeBuilder.Nested<Mono<Mqtt5SubAck>> subscribeWith() {
return new MqttSubscribeBuilder.Nested<>(this::subscribe);
}
@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribePublishes(
final @NotNull Mqtt5Subscribe subscribe) {
return subscribePublishes(subscribe, false);
}
@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribePublishes(
final @NotNull Mqtt5Subscribe subscribe, final boolean manualAcknowledgement) {
return FluxWithSingle.from(delegate.subscribePublishes(subscribe, manualAcknowledgement));
}
@Override
public @NotNull MqttSubscribePublishesBuilder subscribePublishesWith() {
return new MqttSubscribePublishesBuilder();
}
@Override
public @NotNull Flux<Mqtt5Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
return publishes(filter, false);
}
@Override
public @NotNull Flux<Mqtt5Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {
return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}
@Override
public @NotNull Mono<Mqtt5UnsubAck> unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) {
return RxJava2Adapter.singleToMono(delegate.unsubscribe(unsubscribe));
}
@Override
public @NotNull MqttUnsubscribeBuilder.Nested<Mono<Mqtt5UnsubAck>> unsubscribeWith() {
return new MqttUnsubscribeBuilder.Nested<>(this::unsubscribe);
}
@Override
public @NotNull Flux<Mqtt5PublishResult> publish(final @NotNull Publisher<Mqtt5Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}
@Override
public @NotNull Mono<Void> reauth() {
return RxJava2Adapter.completableToMono(delegate.reauth());
}
@Override
public @NotNull Mono<Void> disconnect() {
return disconnect(MqttDisconnect.DEFAULT);
}
@Override
public @NotNull Mono<Void> disconnect(final @NotNull Mqtt5Disconnect disconnect) {
return RxJava2Adapter.completableToMono(delegate.disconnect(disconnect));
}
@Override
public @NotNull MqttDisconnectBuilder.Nested<Mono<Void>> disconnectWith() {
return new MqttDisconnectBuilder.Nested<>(this::disconnect);
}
@Override
public @NotNull Mqtt5ClientConfig getConfig() {
return delegate.getConfig();
}
@Override
public @NotNull Mqtt5RxClient toRx() {
return delegate;
}
@Override
public @NotNull Mqtt5AsyncClient toAsync() {
return delegate.toAsync();
}
@Override
public @NotNull Mqtt5BlockingClient toBlocking() {
return delegate.toBlocking();
}
private class MqttSubscribePublishesBuilder
extends MqttSubscribeBuilder.Publishes<FluxWithSingle<Mqtt5Publish, Mqtt5SubAck>> {
@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> applySubscribe() {
return subscribePublishes(build(), manualAcknowledgement);
}
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import reactor.core.Fuseable;
/**
* @author Silvio Giebl
*/
public interface CoreWithSingleConditionalSubscriber<F, S>
extends CoreWithSingleSubscriber<F, S>, Fuseable.ConditionalSubscriber<F> {}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor;
import com.hivemq.client.internal.rx.WithSingleStrictSubscriber;
import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import org.jetbrains.annotations.NotNull;
/**
* @author Silvio Giebl
*/
public class CoreWithSingleStrictSubscriber<F, S> extends WithSingleStrictSubscriber<F, S>
implements CoreWithSingleSubscriber<F, S> {
public CoreWithSingleStrictSubscriber(final @NotNull WithSingleSubscriber<F, S> subscriber) {
super(subscriber);
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor.operators;
import com.hivemq.client.internal.rx.reactor.CoreWithSingleConditionalSubscriber;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* @author Silvio Giebl
*/
class FluxWithSingleCombine<F, S> extends Flux<Object> {
private final @NotNull FluxWithSingle<F, S> source;
FluxWithSingleCombine(final @NotNull FluxWithSingle<F, S> source) {
this.source = source;
}
@Override
public void subscribe(final @NotNull CoreSubscriber<? super Object> subscriber) {
source.subscribeBoth(new CombineSubscriber<>(subscriber));
}
private static class CombineSubscriber<F, S> implements CoreWithSingleSubscriber<F, S>, Subscription {
private static final @NotNull Object COMPLETE = new Object();
@SuppressWarnings("rawtypes")
private static final @NotNull AtomicLongFieldUpdater<CombineSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(CombineSubscriber.class, "requested");
private final @NotNull CoreSubscriber<? super Object> subscriber;
private @Nullable Subscription subscription;
private volatile long requested;
private @Nullable Object queued;
private @Nullable Object done;
CombineSubscriber(final @NotNull CoreSubscriber<? super Object> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(final @NotNull Subscription subscription) {
this.subscription = subscription;
subscriber.onSubscribe(this);
}
@Override
public void onSingle(final @NotNull S s) {
next(new SingleElement(s));
}
@Override
public void onNext(final @NotNull F f) {
next(f);
}
private void next(final @NotNull Object next) {
if (REQUESTED.get(this) == 0) {
synchronized (this) {
if (REQUESTED.get(this) == 0) {
queued = next;
return;
}
}
}
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(next);
}
@Override
public void onComplete() {
synchronized (this) {
if (queued != null) {
done = COMPLETE;
} else {
subscriber.onComplete();
}
}
}
@Override
public void onError(final @NotNull Throwable error) {
synchronized (this) {
if (queued != null) {
done = error;
} else {
subscriber.onError(error);
}
}
}
@Override
public void request(long n) {
assert subscription != null;
if (n > 0) {
if (Operators.addCap(REQUESTED, this, n) == 0) {
synchronized (this) {
final Object queued = this.queued;
if (queued != null) {
this.queued = null;
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(queued);
n--;
final Object done = this.done;
if (done != null) {
this.done = null;
if (done instanceof Throwable) {
subscriber.onError((Throwable) done);
} else {
subscriber.onComplete();
}
return;
}
}
if (n > 0) {
subscription.request(n);
}
}
} else {
subscription.request(n);
}
}
}
@Override
public void cancel() {
assert subscription != null;
subscription.cancel();
}
@Override
public @NotNull Context currentContext() {
return subscriber.currentContext();
}
}
static <F, S> void split(
final @NotNull Flux<Object> source,
final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
if (subscriber instanceof CoreWithSingleConditionalSubscriber) {
//noinspection unchecked
source.subscribe(new SplitSubscriber.Conditional<>(
(CoreWithSingleConditionalSubscriber<? super F, ? super S>) subscriber));
} else {
source.subscribe(new SplitSubscriber.Default<>(subscriber));
}
}
private static abstract class SplitSubscriber<F, S, T extends CoreWithSingleSubscriber<? super F, ? super S>>
implements Fuseable.ConditionalSubscriber<Object>, Subscription {
final @NotNull T subscriber;
private @Nullable Subscription subscription;
SplitSubscriber(final @NotNull T subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSubscribe(final @NotNull Subscription subscription) {
this.subscription = subscription;
subscriber.onSubscribe(this);
}
@Override
public void onNext(final @NotNull Object o) {
if (!tryOnNext(o)) {
assert subscription != null;
subscription.request(1);
}
}
@Override
public boolean tryOnNext(final @NotNull Object o) {
if (o instanceof SingleElement) {
//noinspection unchecked
subscriber.onSingle((S) ((SingleElement) o).element);
return false;
}
//noinspection unchecked
return tryOnNextActual((F) o);
}
abstract boolean tryOnNextActual(final @NotNull F f);
@Override
public void onError(final @NotNull Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void request(final long n) {
assert subscription != null;
subscription.request(n);
}
@Override
public void cancel() {
assert subscription != null;
subscription.cancel();
}
@Override
public @NotNull Context currentContext() {
return subscriber.currentContext();
}
private static class Default<F, S>
extends SplitSubscriber<F, S, CoreWithSingleSubscriber<? super F, ? super S>> {
Default(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
super(subscriber);
}
@Override
boolean tryOnNextActual(final @NotNull F f) {
subscriber.onNext(f);
return true;
}
}
private static class Conditional<F, S>
extends SplitSubscriber<F, S, CoreWithSingleConditionalSubscriber<? super F, ? super S>> {
Conditional(final @NotNull CoreWithSingleConditionalSubscriber<? super F, ? super S> subscriber) {
super(subscriber);
}
@Override
boolean tryOnNextActual(final @NotNull F f) {
return subscriber.tryOnNext(f);
}
}
}
private static class SingleElement {
final @NotNull Object element;
SingleElement(final @NotNull Object element) {
this.element = element;
}
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor.operators;
import com.hivemq.client.rx.reactivestreams.PublisherWithSingle;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import org.jetbrains.annotations.NotNull;
import reactor.core.CoreSubscriber;
/**
* @author Silvio Giebl
*/
public class FluxWithSingleFrom<F, S> extends FluxWithSingle<F, S> {
private final @NotNull PublisherWithSingle<? extends F, ? extends S> source;
public FluxWithSingleFrom(final @NotNull PublisherWithSingle<? extends F, ? extends S> source) {
this.source = source;
}
@Override
public void subscribe(final @NotNull CoreSubscriber<? super F> subscriber) {
source.subscribe(subscriber);
}
@Override
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
source.subscribeBoth(subscriber);
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor.operators;
import com.hivemq.client.internal.rx.reactor.CoreWithSingleConditionalSubscriber;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.context.Context;
import java.util.function.Function;
/**
* @author Silvio Giebl
*/
public class FluxWithSingleMap<F, S, FM, SM> extends FluxWithSingleOperator<F, S, FM, SM> {
public static <F, S, FM, SM> @NotNull FluxWithSingleMap<F, S, FM, SM> mapBoth(
final @NotNull FluxWithSingle<F, S> source,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
return new FluxWithSingleMap<>(source, fluxMapper, singleMapper);
}
public static <F, S, SM> @NotNull FluxWithSingleMap<F, S, F, SM> mapSingle(
final @NotNull FluxWithSingle<F, S> source, final @NotNull Function<? super S, ? extends SM> singleMapper) {
return new FluxWithSingleMap<>(source, null, singleMapper);
}
private final @Nullable Function<? super F, ? extends FM> fluxMapper;
private final @NotNull Function<? super S, ? extends SM> singleMapper;
private FluxWithSingleMap(
final @NotNull FluxWithSingle<F, S> source,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
super(source);
this.fluxMapper = fluxMapper;
this.singleMapper = singleMapper;
}
@Override
public void subscribe(final @NotNull CoreSubscriber<? super FM> subscriber) {
if (subscriber instanceof Fuseable.ConditionalSubscriber) {
//noinspection unchecked
final Fuseable.ConditionalSubscriber<? super FM> conditional =
(Fuseable.ConditionalSubscriber<? super FM>) subscriber;
source.subscribeBoth(new MapSubscriber.Conditional<>(conditional, fluxMapper, singleMapper));
} else {
source.subscribeBoth(new MapSubscriber<>(subscriber, fluxMapper, singleMapper));
}
}
@Override
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super FM, ? super SM> subscriber) {
if (subscriber instanceof CoreWithSingleConditionalSubscriber) {
//noinspection unchecked
final CoreWithSingleConditionalSubscriber<? super FM, ? super SM> conditional =
(CoreWithSingleConditionalSubscriber<? super FM, ? super SM>) subscriber;
source.subscribeBoth(new WithSingleMapSubscriber.Conditional<>(conditional, fluxMapper, singleMapper));
} else {
source.subscribeBoth(new WithSingleMapSubscriber<>(subscriber, fluxMapper, singleMapper));
}
}
private static class MapSubscriber<F, S, FM, SM, T extends CoreSubscriber<? super FM>>
implements CoreWithSingleSubscriber<F, S>, Subscription {
final @NotNull T subscriber;
final @Nullable Function<? super F, ? extends FM> fluxMapper;
private final @NotNull Function<? super S, ? extends SM> singleMapper;
private @Nullable Subscription subscription;
MapSubscriber(
final @NotNull T subscriber,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
this.subscriber = subscriber;
this.fluxMapper = fluxMapper;
this.singleMapper = singleMapper;
}
@Override
public void onSubscribe(final @NotNull Subscription subscription) {
this.subscription = subscription;
subscriber.onSubscribe(this);
}
@Override
public void onSingle(final @NotNull S s) {
final SM sm;
try {
sm = Checks.notNull(singleMapper.apply(s), "Mapped single value");
} catch (final Throwable throwable) {
fail(throwable);
return;
}
onSingleMapped(sm);
}
void onSingleMapped(final @NotNull SM sm) {}
@Override
public void onNext(final @NotNull F f) {
if (fluxMapper == null) {
//noinspection unchecked
subscriber.onNext((FM) f);
} else {
final FM fm;
try {
fm = Checks.notNull(fluxMapper.apply(f), "Mapped value");
} catch (final Throwable throwable) {
fail(throwable);
return;
}
subscriber.onNext(fm);
}
}
void fail(final @NotNull Throwable throwable) {
assert subscription != null;
Exceptions.throwIfFatal(throwable);
subscription.cancel();
onError(throwable);
}
@Override
public void onError(final @NotNull Throwable error) {
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void request(final long n) {
assert subscription != null;
subscription.request(n);
}
@Override
public void cancel() {
assert subscription != null;
subscription.cancel();
}
@Override
public @NotNull Context currentContext() {
return subscriber.currentContext();
}
private static class Conditional<F, S, FM, SM, T extends Fuseable.ConditionalSubscriber<? super FM>>
extends FluxWithSingleMap.MapSubscriber<F, S, FM, SM, T>
implements CoreWithSingleConditionalSubscriber<F, S> {
Conditional(
final @NotNull T subscriber,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
super(subscriber, fluxMapper, singleMapper);
}
@Override
public boolean tryOnNext(final @NotNull F f) {
if (fluxMapper == null) {
//noinspection unchecked
return subscriber.tryOnNext((FM) f);
} else {
final FM fm;
try {
fm = Checks.notNull(fluxMapper.apply(f), "Mapped value");
} catch (final Throwable throwable) {
fail(throwable);
return false;
}
return subscriber.tryOnNext(fm);
}
}
}
}
private static class WithSingleMapSubscriber<F, S, FM, SM>
extends MapSubscriber<F, S, FM, SM, CoreWithSingleSubscriber<? super FM, ? super SM>> {
WithSingleMapSubscriber(
final @NotNull CoreWithSingleSubscriber<? super FM, ? super SM> subscriber,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
super(subscriber, fluxMapper, singleMapper);
}
@Override
void onSingleMapped(final @NotNull SM sm) {
subscriber.onSingle(sm);
}
private static class Conditional<F, S, FM, SM> extends
MapSubscriber.Conditional<F, S, FM, SM, CoreWithSingleConditionalSubscriber<? super FM, ? super SM>> {
Conditional(
final @NotNull CoreWithSingleConditionalSubscriber<? super FM, ? super SM> subscriber,
final @Nullable Function<? super F, ? extends FM> fluxMapper,
final @NotNull Function<? super S, ? extends SM> singleMapper) {
super(subscriber, fluxMapper, singleMapper);
}
@Override
void onSingleMapped(final @NotNull SM sm) {
subscriber.onSingle(sm);
}
}
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor.operators;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import org.jetbrains.annotations.NotNull;
/**
* @author Silvio Giebl
*/
abstract class FluxWithSingleOperator<FU, SU, F, S> extends FluxWithSingle<F, S> {
final @NotNull FluxWithSingle<FU, SU> source;
FluxWithSingleOperator(final @NotNull FluxWithSingle<FU, SU> source) {
this.source = source;
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.internal.rx.reactor.operators;
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import org.jetbrains.annotations.NotNull;
import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
/**
* @author Silvio Giebl
*/
public class FluxWithSinglePublishOn<F, S> extends FluxWithSingleOperator<F, S, F, S> {
private final @NotNull Scheduler scheduler;
private final boolean delayError;
private final int prefetch;
public FluxWithSinglePublishOn(
final @NotNull FluxWithSingle<F, S> source,
final @NotNull Scheduler scheduler,
final boolean delayError,
final int prefetch) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.prefetch = prefetch;
}
@Override
public void subscribe(final @NotNull CoreSubscriber<? super F> subscriber) {
source.publishOn(scheduler, delayError, prefetch).subscribe(subscriber);
}
@Override
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
FluxWithSingleCombine.split(
new FluxWithSingleCombine<>(source).publishOn(scheduler, delayError, prefetch), subscriber);
}
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.rx.reactor;
import com.hivemq.client.rx.reactivestreams.PublisherWithSingle;
import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
/**
* A {@link CoreWithSingleSubscriber} aware {@link PublisherWithSingle}.
* <p>
* {@inheritDoc}
*
* @author Silvio Giebl
* @see PublisherWithSingle
* @see CorePublisher
* @since 1.2
*/
public interface CorePublisherWithSingle<T, S> extends PublisherWithSingle<T, S>, CorePublisher<T> {
/**
* {@link PublisherWithSingle#subscribeBoth(WithSingleSubscriber) Subscribes} to this {@link
* CorePublisherWithSingle}.
* <p>
* In addition to behave as expected by {@link Publisher#subscribe(Subscriber)} in a controlled manner, it supports
* direct subscribe-time {@link reactor.util.context.Context Context} passing.
*
* @param subscriber the {@link Subscriber} interested into the published sequence.
* @see PublisherWithSingle#subscribeBoth(WithSingleSubscriber)
* @see CorePublisher#subscribe(CoreSubscriber)
*/
void subscribeBoth(@NotNull CoreWithSingleSubscriber<? super T, ? super S> subscriber);
}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.rx.reactor;
import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber;
import reactor.core.CoreSubscriber;
/**
* A {@link reactor.util.context.Context Context} aware {@link WithSingleSubscriber} which has relaxed rules for §1.3
* and §3.9 compared to the original {@link org.reactivestreams.Subscriber} from Reactive Streams.
*
* @author Silvio Giebl
* @see WithSingleSubscriber
* @see CoreSubscriber
* @since 1.2
*/
public interface CoreWithSingleSubscriber<T, S> extends WithSingleSubscriber<T, S>, CoreSubscriber<T> {}
/*
* Copyright 2018-present HiveMQ and the HiveMQ Community
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.client.rx.reactor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
/**
* @author Silvio Giebl
*/
public class FluxWithSingleItem<F, S> extends FluxWithSingle<F, S> {
private final @NotNull Flux<F> source;
private final @NotNull S single;
private final int index;
public FluxWithSingleItem(final @NotNull Flux<F> source, final @NotNull S single, final int index) {
this.source = source;
this.single = single;
this.index = index;
}
@Override
public void subscribe(final @NotNull CoreSubscriber<? super F> subscriber) {
source.subscribe(subscriber);
}
@Override
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
source.subscribe(new SingleItemSubscriber<>(subscriber, single, index));
}
private static class SingleItemSubscriber<F, S> implements CoreSubscriber<F>, Subscription {
private final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber;
private final @NotNull S single;
private int index;
private int currentIndex;
private @Nullable Subscription subscription;
SingleItemSubscriber(
final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber,
final @NotNull S single,
final int index) {
this.subscriber = subscriber;
this.single = single;
this.index = index;
}
@Override
public void onSubscribe(final @NotNull Subscription subscription) {
this.subscription = subscription;
subscriber.onSubscribe(this);
}
@Override
public void onNext(final @NotNull F f) {
subscriber.onNext(f);
if (index == ++currentIndex) {
index = -1;
subscriber.onSingle(single);
}
}
@Override
public void onError(final @NotNull Throwable error) {
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
@Override
public void request(final long n) {
assert subscription != null;
if (index == 0) {
index = -1;
subscriber.onSingle(single);
}
subscription.request(n);
}
@Override
public void cancel() {
assert subscription != null;
subscription.cancel();
}
}
}
rootProject.name = "hivemq-mqtt-client"
pluginManagement {
repositories {
gradlePluginPortal()
mavenCentral()
}
plugins {
id("com.github.johnrengelman.shadow") version "${extra["plugin.shadow.version"]}"
id("biz.aQute.bnd.builder") version "${extra["plugin.bnd.version"]}"
id("io.github.gradle-nexus.publish-plugin") version "${extra["plugin.nexus-publish.version"]}"
id("com.github.hierynomus.license") version "${extra["plugin.license.version"]}"
id("com.github.sgtsilvio.gradle.utf8") version "${extra["plugin.utf8.version"]}"
id("com.github.sgtsilvio.gradle.metadata") version "${extra["plugin.metadata.version"]}"
id("com.github.sgtsilvio.gradle.javadoc-links") version "${extra["plugin.javadoc-links.version"]}"
}
}
dependencyResolutionManagement {
repositories {
mavenCentral()
}
}
for (module in listOf("websocket", "proxy", "epoll", "reactor", "examples")) {
include("${rootProject.name}-$module")
project(":${rootProject.name}-$module").projectDir = file(module)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment