Documentation

Everything you need to integrate NotiFlow into your Java application.

Installation

NotiFlow is available on Maven Central. Choose the artifact that fits your setup.

Spring Boot Starter

For Spring Boot applications, the starter provides auto-configuration with sensible defaults and zero boilerplate. notiflow-core is a transitive dependency — one declaration is enough.

pom.xml
<dependency>
    <groupId>pl.notiflow</groupId>
    <artifactId>notiflow-spring-boot-starter</artifactId>
    <version>1.1.2</version>
</dependency>
<!-- notiflow-core pulled in transitively -->

Core Library (Standalone)

The core library has zero Spring dependencies. Use it with any Java framework or Jakarta WebSocket container (Tyrus, Jetty, Undertow).

pom.xml
<dependency>
    <groupId>pl.notiflow</groupId>
    <artifactId>notiflow-core</artifactId>
    <version>1.1.2</version>
</dependency>

Spring Boot Setup

Add @EnableNotiflow to your main application class. A WebSocket endpoint is immediately available at /notifications.

MyApplication.java
@SpringBootApplication
@EnableNotiflow
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

All components are auto-configured with @ConditionalOnMissingBean, so you can override any bean by declaring your own.


Standalone Setup

Without Spring, wire the components manually. Every class uses constructor injection with no hidden dependencies.

Setup.java
// Create components
SessionRegistry sessionRegistry = new InMemorySessionRegistry();
TopicService topicService = new InMemoryTopicService();
NotificationQueue queue = new InMemoryNotificationQueue(
        Duration.ofMinutes(5), 100);        // TTL, max queued notifications per user
DeliveryTracker tracker = new InMemoryDeliveryTracker(
        Duration.ofSeconds(30), 3, Duration.ofSeconds(10), null);
        // ackTimeout, maxRetries, retryInterval, failureHandler (none here)
MessageCodec codec = new JacksonMessageCodec();

// Create notification service
DefaultNotificationService notificationService = new DefaultNotificationService(
        sessionRegistry, topicService, queue, tracker, codec, true);

// Create WebSocket handler
NotificationWebSocketHandler handler = new NotificationWebSocketHandler(
        new DefaultUserService(),
        new NoOpAuthenticator(),
        sessionRegistry,
        tracker,
        codec,
        notificationService,
        Duration.ofSeconds(30), // pingInterval
        Duration.ofSeconds(60), // pingTimeout
        true,                   // ackEnabled
        true                    // heartbeatEnabled
);

// Use handler.onOpen/onMessage/onClose/onError
// Use notificationService.sendToUser/sendToTopic/broadcast

// Clean up on shutdown
handler.shutdown();
queue.close();

Sending Notifications

Build notifications using the builder pattern, then use NotificationService to deliver them.

NotificationService.java
Notification notification = Notification.builder()
        .type("alert")
        .title("New message")
        .message("You have a new message")
        .priority(Priority.HIGH)
        .build();

// Send to a specific user (queues if offline)
notificationService.sendToUser("user-123", notification);

// Send to all subscribers of a topic
notificationService.sendToTopic("sports", notification);

// Broadcast to all connected clients
notificationService.broadcast(notification);

When a user is offline, sendToUser automatically queues the notification. When the user reconnects, queued notifications are delivered immediately with the QUEUED envelope type.


Topics

Topics are server-managed and bound to user IDs (not sessions). This means subscriptions persist across reconnections — an offline user remains subscribed and will receive queued topic notifications.

TopicService.java
// Subscribe users to topics
topicService.subscribe("sports", "user-123");
topicService.subscribe("sports", "user-456");
topicService.subscribe("news", "user-123");
// Unsubscribe
topicService.unsubscribe("sports", "user-456");
// Get all subscribers
Set<String> subscribers = topicService.getSubscribers("sports");
// Send to topic - reaches all subscribers
notificationService.sendToTopic("sports", notification);

Note: Clients do not subscribe to topics themselves. Topic management is entirely server-side, controlled by your application logic.
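
To illustrate why subscriptions survive reconnects, here is a minimal sketch of a topic store keyed by user ID rather than by session. TopicStoreSketch is a hypothetical stand-in written for this page, not the InMemoryTopicService source; only the method names mirror the calls shown above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an in-memory topic store; not NotiFlow's actual class.
final class TopicStoreSketch {
    // topic -> user IDs. Nothing here references a live session,
    // so a disconnect leaves the subscription untouched.
    private final Map<String, Set<String>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, String userId) {
        subscribers.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet()).add(userId);
    }

    void unsubscribe(String topic, String userId) {
        Set<String> users = subscribers.get(topic);
        if (users != null) {
            users.remove(userId);
        }
    }

    Set<String> getSubscribers(String topic) {
        // Immutable snapshot so callers cannot mutate internal state
        return Set.copyOf(subscribers.getOrDefault(topic, Set.of()));
    }
}
```

Because the map holds user IDs only, sending to a topic can look up each subscriber and then decide per user whether to deliver live or to queue.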

Offline Queueing

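When a user is offline, notifications are stored in the NotificationQueue. Each entry is kept for at most notiflow.queue.ttl (5m by default), each user can hold up to notiflow.queue.max-per-user entries (100 by default), and on reconnect the queued notifications are delivered with the QUEUED envelope type.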

Tip: The QUEUED type lets clients distinguish between live notifications and previously queued ones, enabling different UI treatments (e.g., batch display vs. toast).
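
A per-user queue with a TTL and a capacity limit could be sketched roughly as follows. OfflineQueueSketch is hypothetical and stores only notification IDs to stay short. Dropping the oldest entry when the queue is full is an assumption made for illustration; this page does not state what the default implementation does on overflow.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an offline queue; the overflow policy is an assumption.
final class OfflineQueueSketch {
    private record Entry(String notificationId, Instant enqueuedAt) {}

    private final Duration ttl;
    private final int maxPerUser;
    private final Map<String, Deque<Entry>> queues = new HashMap<>();

    OfflineQueueSketch(Duration ttl, int maxPerUser) {
        this.ttl = ttl;
        this.maxPerUser = maxPerUser;
    }

    synchronized void enqueue(String userId, String notificationId, Instant now) {
        Deque<Entry> queue = queues.computeIfAbsent(userId, id -> new ArrayDeque<>());
        if (queue.size() >= maxPerUser) {
            queue.pollFirst(); // assumption: drop the oldest entry when full
        }
        queue.addLast(new Entry(notificationId, now));
    }

    // Removes the user's queue and returns the unexpired IDs, oldest first.
    synchronized List<String> dequeue(String userId, Instant now) {
        Deque<Entry> queue = queues.remove(userId);
        List<String> result = new ArrayList<>();
        if (queue == null) {
            return result;
        }
        for (Entry entry : queue) {
            if (Duration.between(entry.enqueuedAt(), now).compareTo(ttl) <= 0) {
                result.add(entry.notificationId());
            }
        }
        return result;
    }
}
```

The clock is passed in as a parameter so expiry can be exercised without waiting for real time to pass.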

Delivery & ACK

When ACK-based delivery is enabled, every sent notification is tracked until the client confirms receipt.

Server                                Client
  |                                      |
  |─── NOTIFICATION (id=notif-1) ───────>|
  |                                      |
  |<── ACK { notificationId=notif-1 } ───|
  |                                      |
  |         ✓ Delivery confirmed         |

If the client does not acknowledge within the ack timeout:

Server                                Client
  |                                      |
  |─── NOTIFICATION (id=notif-1) ───────>|  (no ACK)
  |                                      |
  |     ... ack-timeout elapses ...      |
  |                                      |
  |─── NOTIFICATION (id=notif-1) ───────>|  (retry 1)
  |                                      |
  |<── ACK { notificationId=notif-1 } ───|
  |                                      |
  |         ✓ Delivery confirmed         |

After exhausting max-retries, the notification is passed to the DeliveryFailureHandler.
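
The retry flow above can be read as a small state machine. The sketch below is a hypothetical model, not the InMemoryDeliveryTracker source: AckRetrySketch, its Action enum, and the (notificationId, attempts) failure callback are names invented for illustration. Timers are left out, so the interplay of ack-timeout and retry-interval is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of ACK tracking; not NotiFlow's DeliveryTracker.
final class AckRetrySketch {
    enum Action { RESEND, FAILED, NONE }

    private final int maxRetries;
    private final BiConsumer<String, Integer> failureHandler; // (notificationId, attempts)
    private final Map<String, Integer> retriesSoFar = new HashMap<>();

    AckRetrySketch(int maxRetries, BiConsumer<String, Integer> failureHandler) {
        this.maxRetries = maxRetries;
        this.failureHandler = failureHandler;
    }

    // Called right after the first send.
    synchronized void track(String notificationId) {
        retriesSoFar.put(notificationId, 0);
    }

    // Called when the client's ACK arrives; false if the ID is not pending.
    synchronized boolean acknowledge(String notificationId) {
        return retriesSoFar.remove(notificationId) != null;
    }

    // Called by a timer each time the wait for an ACK runs out.
    synchronized Action onAckTimeout(String notificationId) {
        Integer retries = retriesSoFar.get(notificationId);
        if (retries == null) {
            return Action.NONE; // already acknowledged or already failed
        }
        if (retries < maxRetries) {
            retriesSoFar.put(notificationId, retries + 1);
            return Action.RESEND;
        }
        retriesSoFar.remove(notificationId);
        failureHandler.accept(notificationId, retries + 1); // first send plus retries
        return Action.FAILED;
    }
}
```

With maxRetries set to 3, a notification that is never acknowledged is sent four times in this model (one initial send and three retries) before the failure callback runs.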


Heartbeat

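When heartbeat is enabled, the server sends a PING every ping-interval (30s by default) to detect stale connections, and the client answers with a PONG. A session with no activity for longer than ping-timeout (60s by default) is closed.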
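
Stale-connection detection of this kind usually comes down to remembering when each session was last heard from. The following is a minimal sketch under that assumption; HeartbeatSketch and its methods are hypothetical names, and the real handler may track activity differently.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of heartbeat bookkeeping; not NotiFlow's handler code.
final class HeartbeatSketch {
    private final Duration pingTimeout;
    private final Map<String, Instant> lastActivity = new ConcurrentHashMap<>();

    HeartbeatSketch(Duration pingTimeout) {
        this.pingTimeout = pingTimeout;
    }

    // Record any inbound frame (PONG, ACK, ...) as activity for the session.
    void touch(String sessionId, Instant now) {
        lastActivity.put(sessionId, now);
    }

    // Returns and forgets the sessions idle for longer than pingTimeout;
    // the caller would close them.
    List<String> sweepStale(Instant now) {
        List<String> stale = new ArrayList<>();
        lastActivity.forEach((sessionId, seen) -> {
            if (Duration.between(seen, now).compareTo(pingTimeout) > 0) {
                stale.add(sessionId);
            }
        });
        stale.forEach(lastActivity::remove);
        return stale;
    }
}
```

A scheduled task would call sweepStale once per ping interval, right after sending the PING frames.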


Message Types

Type          Direction        Description
NOTIFICATION  Server → Client  New live notification
QUEUED        Server → Client  Previously queued notification delivered on reconnect
ACK           Client → Server  Client confirms receipt of a notification
PING          Server → Client  Heartbeat check
PONG          Client → Server  Heartbeat response

Envelope Format

All messages are wrapped in an Envelope with a consistent structure:

envelope.json
{
  "type": "NOTIFICATION",
  "id": "notif-456",
  "payload": {
    "id": "notif-456",
    "type": "alert",
    "title": "New message",
    "message": "You have a new message",
    "timestamp": "2024-01-15T10:30:00Z",
    "priority": "HIGH"
  },
  "metadata": {}
}

Client Integration

Connect via standard WebSocket and handle the protocol messages:

client.js
const ws = new WebSocket('ws://localhost:8080/notifications?token=nfu_XXXXX');

ws.onmessage = (event) => {
  const envelope = JSON.parse(event.data);
  switch (envelope.type) {
    case 'NOTIFICATION':
    case 'QUEUED':
      console.log('Received:', envelope.payload);
      // Send ACK to confirm receipt
      ws.send(JSON.stringify({
        type: 'ACK',
        payload: { notificationId: envelope.payload.id }
      }));
      break;
    case 'PING':
      ws.send(JSON.stringify({ type: 'PONG' }));
      break;
  }
};
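
For JVM clients, the same reply rules can be expressed as a small pure function. This is only a sketch: ClientProtocolSketch and replyFor are hypothetical names, the transport (for example java.net.http.WebSocket) and JSON parsing are left out, and the notification ID is assumed to contain no characters that need JSON escaping.

```java
import java.util.Optional;

// Hypothetical helper mirroring the JavaScript switch above; not part of NotiFlow.
final class ClientProtocolSketch {
    // Returns the frame the client should send back for an incoming envelope, if any.
    static Optional<String> replyFor(String envelopeType, String notificationId) {
        switch (envelopeType) {
            case "NOTIFICATION":
            case "QUEUED":
                // Assumes the ID needs no JSON escaping
                return Optional.of("{\"type\":\"ACK\",\"payload\":{\"notificationId\":\""
                        + notificationId + "\"}}");
            case "PING":
                return Optional.of("{\"type\":\"PONG\"}");
            default:
                return Optional.empty();
        }
    }
}
```

Keeping the decision separate from the socket makes the protocol rules easy to unit test before wiring them into a WebSocket listener.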

Pluggable Components

Every component follows the interface + default implementation pattern. The Spring Boot starter registers each bean with @ConditionalOnMissingBean, so defining your own bean replaces the default.

Interface               Default implementation     Purpose
Authenticator           NoOpAuthenticator          Authenticate WebSocket connections
UserService             DefaultUserService         Extract user ID from session
SessionRegistry         InMemorySessionRegistry    Track active WebSocket sessions
TopicService            InMemoryTopicService       Manage topic subscriptions
NotificationQueue       InMemoryNotificationQueue  Queue notifications for offline users
DeliveryTracker         InMemoryDeliveryTracker    Track delivery status and retries
DeliveryFailureHandler  Logging handler            Handle permanently failed deliveries
MessageCodec            JacksonMessageCodec        Serialize/deserialize messages

Authenticator

Important: The default NoOpAuthenticator accepts all connections. Always replace it in production.

Annotate your implementation with @Component. Because the default bean is declared with @ConditionalOnMissingBean, it is then skipped automatically. To pass the authenticated identity to UserService, store it in session.getUserProperties() under the key "notiflow.userId". DefaultUserService reads this key, so no custom UserService is needed unless you use a different key.

WebSocketAuthenticator.java
@Component
public class WebSocketAuthenticator implements Authenticator {

    private final UserRepository userRepository;

    public WebSocketAuthenticator(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @Override
    public boolean authenticate(Session session) {
        List<String> tokens = session.getRequestParameterMap().get("token");
        if (tokens == null || tokens.isEmpty()) return false;
        return userRepository.findByToken(tokens.get(0)).map(user -> {
            // Store identity; read by DefaultUserService (or your UserService)
            session.getUserProperties().put("notiflow.userId", user.getExternalId());
            return true;
        }).orElse(false);
    }
}

UserService

Extracts the stable user ID from a WebSocket session after authentication. DefaultUserService resolves the user ID from the following sources, in order:

  1. Session property notiflow.userId set by Authenticator via session.getUserProperties()
  2. Raw WebSocket session ID as fallback — never returns null

If your Authenticator stores the identity under "notiflow.userId", the default works without any changes. Only replace UserService when you use a different key or need custom resolution logic.

WebSocketUserService.java
@Component
public class WebSocketUserService implements UserService {

    @Override
    public String getUserId(Session session) {
        // Reads the identity stored by Authenticator during handshake
        Object userId = session.getUserProperties().get("notiflow.userId");
        // Fall back to the raw session ID, as the default does, instead of returning null
        return userId != null ? userId.toString() : session.getId();
    }
}

SessionRegistry

The default InMemorySessionRegistry stores sessions in a ConcurrentHashMap. It enforces a single session per user: connecting with the same user ID replaces the previous session.
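
The single-session rule can be sketched with an atomic swap on a ConcurrentHashMap. SingleSessionRegistrySketch and ClosableSession are hypothetical types for illustration. Closing the replaced session is an assumption; this page only states that the previous session is replaced.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a one-session-per-user registry; not NotiFlow's class.
final class SingleSessionRegistrySketch {
    // Minimal session type invented for this example
    interface ClosableSession {
        String id();
        void close();
    }

    private final Map<String, ClosableSession> byUserId = new ConcurrentHashMap<>();

    void register(String userId, ClosableSession session) {
        ClosableSession previous = byUserId.put(userId, session); // atomic swap
        if (previous != null && previous != session) {
            previous.close(); // assumption: the replaced session is closed
        }
    }

    void unregister(String userId, ClosableSession session) {
        // Two-argument remove: only removes if this session is still current,
        // so a late close of the replaced session cannot evict its successor
        byUserId.remove(userId, session);
    }

    Optional<ClosableSession> getByUserId(String userId) {
        return Optional.ofNullable(byUserId.get(userId));
    }
}
```

The conditional remove matters in practice because the close callback of the old session typically fires after the new session has already been registered.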

Replace for distributed deployments:

RedisSessionRegistry.java
@Bean
public SessionRegistry sessionRegistry(RedisTemplate<String, UserSession> redis) {
    return new RedisSessionRegistry(redis);
}

TopicService

Server-managed topic subscriptions bound to user IDs. The default InMemoryTopicService keeps subscriptions in memory, so they are lost on restart. For production, implement persistence using JPA or any other store. Annotate your class with @Component and the default is skipped.

JpaTopicService.java
@Component
public class JpaTopicService implements TopicService {

    private final TopicSubscriptionRepository repository;

    // Constructor injection; the final field must be initialized here
    public JpaTopicService(TopicSubscriptionRepository repository) {
        this.repository = repository;
    }

    @Override
    @Transactional
    public void subscribe(String topic, String userId) {
        if (!repository.existsByTopicAndUserId(topic, userId)) {
            repository.save(new TopicSubscription(topic, userId));
        }
    }

    @Override
    public Set<String> getSubscribers(String topic) {
        return repository.findByTopic(topic).stream()
                .map(TopicSubscription::getUserId)
                .collect(Collectors.toSet());
    }

    // ... implement remaining interface methods
}

NotificationQueue

Stores notifications for offline users with configurable TTL and per-user capacity. The default InMemoryNotificationQueue keeps its contents in memory, so queued notifications are lost on restart. For production, persist with JPA: implement enqueue / dequeue / remove / peek / size / clear backed by a database entity, then annotate with @Component.

JpaNotificationQueue.java
@Component
public class JpaNotificationQueue implements NotificationQueue {

    private final QueuedNotificationRepository repository;

    // Constructor injection; the final field must be initialized here
    public JpaNotificationQueue(QueuedNotificationRepository repository) {
        this.repository = repository;
    }

    @Override
    @Transactional
    public void enqueue(String userId, Notification notification) {
        repository.save(toEntity(userId, notification));
    }

    @Override
    @Transactional
    public List<Notification> dequeue(String userId) {
        // Read oldest first, then delete in the same transaction
        List<Notification> result = repository.findByUserIdOrderByCreatedAtAsc(userId)
                .stream().map(this::toNotification).toList();
        repository.deleteByUserId(userId);
        return result;
    }

    // ... implement remaining interface methods and the
    //     toEntity / toNotification mapping helpers
}

DeliveryTracker

Tracks unacknowledged notifications. When ACK is enabled, the tracker monitors for acknowledgment and retries up to max-retries times. For production, use the decorator pattern: wrap InMemoryDeliveryTracker to keep retry scheduling, and add persistence as a layer on top. This avoids reimplementing the retry clock from scratch.

NotiflowDeliveryConfig.java
@Configuration
public class NotiflowDeliveryConfig {

    @Bean
    public DeliveryTracker deliveryTracker(
            NotiflowProperties props,
            DeliveryFailureHandler failureHandler,
            NotificationAuditRepository auditRepo) {
        // Keep retry scheduling in the default in-memory implementation
        InMemoryDeliveryTracker delegate = new InMemoryDeliveryTracker(
                props.getDelivery().getAckTimeout(),
                props.getDelivery().getMaxRetries(),
                props.getDelivery().getRetryInterval(),
                failureHandler);
        // Wrap with auditing; persists sent/acked/failed timestamps
        return new AuditingDeliveryTracker(delegate, auditRepo);
    }
}

Pattern: AuditingDeliveryTracker delegates all retry/scheduling calls to InMemoryDeliveryTracker and adds a notification_audit row on track(), stamps ackedAt on acknowledge(), and failedAt on markFailed(). Declare the @Bean with return type DeliveryTracker; this satisfies the @ConditionalOnMissingBean check and prevents the auto-configured default from being created.
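
The decorator itself is not shown on this page, so the following is a sketch of its shape only. TrackerSketch is a deliberately simplified, hypothetical contract (the real DeliveryTracker signatures may differ), and AuditSink stands in for NotificationAuditRepository.

```java
import java.time.Instant;

// Hypothetical, simplified tracker contract; NotiFlow's DeliveryTracker may differ.
interface TrackerSketch {
    void track(String userId, String notificationId);
    void acknowledge(String userId, String notificationId);
    void markFailed(String userId, String notificationId);
}

// Hypothetical audit sink standing in for NotificationAuditRepository.
interface AuditSink {
    void record(String event, String userId, String notificationId, Instant at);
}

// Decorator: every call goes to the delegate first, then leaves an audit record.
final class AuditingTrackerSketch implements TrackerSketch {
    private final TrackerSketch delegate;
    private final AuditSink audit;

    AuditingTrackerSketch(TrackerSketch delegate, AuditSink audit) {
        this.delegate = delegate;
        this.audit = audit;
    }

    @Override
    public void track(String userId, String notificationId) {
        delegate.track(userId, notificationId); // retry scheduling stays in the delegate
        audit.record("SENT", userId, notificationId, Instant.now());
    }

    @Override
    public void acknowledge(String userId, String notificationId) {
        delegate.acknowledge(userId, notificationId);
        audit.record("ACKED", userId, notificationId, Instant.now());
    }

    @Override
    public void markFailed(String userId, String notificationId) {
        delegate.markFailed(userId, notificationId);
        audit.record("FAILED", userId, notificationId, Instant.now());
    }
}
```

Calling the delegate before writing the audit record means a failure inside the tracker never leaves behind an audit row for an action that did not happen. Whether that ordering suits your audit requirements is a design choice.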

DeliveryFailureHandler

Called when a notification permanently fails after all retry attempts:

FailureHandler.java
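// Assumes a logger field named log (for example SLF4J) in the enclosing @Configuration class
@Bean
public DeliveryFailureHandler deliveryFailureHandler(NotificationQueue queue) {
    return (userId, notification, attemptCount) -> {
        log.error("Delivery failed for user {} after {} attempts: {}",
                userId, attemptCount, notification.getId());
        // Put the notification back in the offline queue for the next reconnect
        queue.enqueue(userId, notification);
    };
}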

MessageCodec

The default JacksonMessageCodec uses Jackson with ISO-8601 date formatting, unknown property tolerance, and null exclusion. To customize:

CodecConfig.java
@Bean
public MessageCodec messageCodec(ObjectMapper existingMapper) {
    return new JacksonMessageCodec(existingMapper);
}

Custom Message Handlers

Implement MessageHandler to process any incoming WebSocket message type. The auto-configuration collects all MessageHandler beans via beanFactory.getBeansOfType(MessageHandler.class) — the Spring bean name is used as the message type string. Custom handlers are registered after the built-in ones, so a bean named "ACK" or "PONG" replaces the built-in handler for that type.

Adding a new handler

Annotate with @Component("TYPE_NAME") where TYPE_NAME matches the type field the client sends in the Envelope:

TypingIndicatorHandler.java
@Component("TYPING") // handles { "type": "TYPING", ... } frames
public class TypingIndicatorHandler implements MessageHandler {

    private final SessionRegistry sessionRegistry;

    // Constructor injection; the final field must be initialized here
    public TypingIndicatorHandler(SessionRegistry sessionRegistry) {
        this.sessionRegistry = sessionRegistry;
    }

    @Override
    public void handle(UserSession session, Envelope<?> envelope) {
        String targetUserId = (String) ((Map<?, ?>) envelope.getPayload()).get("targetUserId");
        sessionRegistry.getByUserId(targetUserId).ifPresent(target -> {
            // forward typing event to the target user's session
        });
    }
}

The client sends:

client.js
ws.send(JSON.stringify({
  type: 'TYPING',
  payload: { targetUserId: 'user-456' }
}));

Replacing the built-in ACK handler

The built-in ACK handler calls DeliveryTracker.acknowledge(userId, notificationId). To extend it — for example to add custom audit logging on top — declare a @Bean named "ACK". It is registered after the built-in, so it takes precedence:

CustomAckConfig.java
@Configuration
public class CustomAckConfig {

    @Bean("ACK") // overrides the built-in AckMessageHandler
    public MessageHandler ackHandler(DeliveryTracker deliveryTracker,
                                     AuditService auditService) {
        return (session, envelope) -> {
            @SuppressWarnings("unchecked")
            Map<String, Object> payload = (Map<String, Object>) envelope.getPayload();
            String notificationId = (String) payload.get("notificationId");
            deliveryTracker.acknowledge(session.getUserId(), notificationId);
            auditService.recordAck(session.getUserId(), notificationId);
        };
    }
}

Built-in type constants: Envelope.TYPE_ACK = "ACK" and Envelope.TYPE_PONG = "PONG". Any bean named with one of these strings will replace the corresponding built-in handler. All other names add new types without affecting existing handlers.
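
The override rule follows from registration order: whatever is put into the type-to-handler map last wins. A minimal sketch of that idea, using a hypothetical HandlerRegistrySketch with plain Consumer<String> handlers instead of NotiFlow's MessageHandler:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of name-based handler registration; not NotiFlow's code.
final class HandlerRegistrySketch {
    private final Map<String, Consumer<String>> handlers = new LinkedHashMap<>();

    HandlerRegistrySketch(Map<String, Consumer<String>> builtIns,
                          Map<String, Consumer<String>> customBeans) {
        handlers.putAll(builtIns);    // built-in ACK / PONG first
        handlers.putAll(customBeans); // custom beans last: same name overwrites, new names add types
    }

    // Routes a payload by message type; returns false if no handler is registered.
    boolean dispatch(String type, String payload) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

In the Spring setup the keys of the custom map would be the bean names, which is why the bean name has to match the envelope type exactly.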

Configuration Reference

All properties live under the notiflow.* prefix and are bound through Spring Boot's type-safe configuration binding.

application.properties
# WebSocket endpoint
notiflow.web-socket.enabled=true
notiflow.web-socket.path=/notifications
# Offline notification queue
notiflow.queue.ttl=5m
notiflow.queue.max-per-user=100
# Delivery tracking and retries
notiflow.delivery.ack-enabled=true
notiflow.delivery.ack-timeout=30s
notiflow.delivery.max-retries=3
notiflow.delivery.retry-interval=10s
# Heartbeat (ping/pong)
notiflow.heartbeat.enabled=true
notiflow.heartbeat.ping-interval=30s
notiflow.heartbeat.ping-timeout=60s

Property Reference

Property                          Type      Default         Description
notiflow.web-socket.enabled       boolean   true            Enable/disable the WebSocket endpoint
notiflow.web-socket.path          String    /notifications  WebSocket endpoint path
notiflow.queue.ttl                Duration  5m              How long queued notifications are kept
notiflow.queue.max-per-user       int       100             Max queued notifications per user
notiflow.delivery.ack-enabled     boolean   true            Enable ACK-based delivery confirmation
notiflow.delivery.ack-timeout     Duration  30s             Time to wait for client ACK
notiflow.delivery.max-retries     int       3               Max delivery attempts before failure
notiflow.delivery.retry-interval  Duration  10s             Delay between retry attempts
notiflow.heartbeat.enabled        boolean   true            Enable ping/pong heartbeat
notiflow.heartbeat.ping-interval  Duration  30s             How often the server sends PING
notiflow.heartbeat.ping-timeout   Duration  60s             Time without activity before closing session

Concurrency Model

All default implementations are thread-safe.

The NotificationWebSocketHandler manages two scheduled tasks.

Both executors use daemon threads (notiflow-pool, notiflow-queue-cleanup) so they don't prevent JVM shutdown.
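
Creating a scheduled executor backed by named daemon threads takes only a small ThreadFactory. The sketch below uses standard java.util.concurrent APIs; DaemonExecutorsSketch is a hypothetical helper and the numeric suffix on thread names is an assumption, not NotiFlow's actual naming.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; shows the general daemon-thread pattern only.
final class DaemonExecutorsSketch {
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };
    }

    static ScheduledExecutorService newScheduler(String prefix) {
        return Executors.newSingleThreadScheduledExecutor(daemonFactory(prefix));
    }
}
```

Daemon threads let the JVM exit, but they do not flush pending work, so calling handler.shutdown() and queue.close() on shutdown is still the cleaner path.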


Requirements

Component          Version
Java               21+
Spring Boot        3.2+ (starter only)
Jakarta WebSocket  2.2+
Jackson            2.15+


NotiFlow App — Standalone Service

Don't want to integrate a library? notiflow-app is a fully deployed notification microservice with a REST API, JWT + API key auth, PostgreSQL persistence, and Swagger UI. Call it from any backend — no Java required on the caller side.

Full documentation lives on the dedicated NotiFlow App page, which covers the REST API reference, authentication flows, WebSocket client guide, database schema, configuration reference, and Docker deployment.