Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions app/assets/javascripts/turbo.js
Original file line number Diff line number Diff line change
Expand Up @@ -5386,25 +5386,44 @@ function walk(obj) {

class TurboCableStreamSourceElement extends HTMLElement {
static observedAttributes=[ "channel", "signed-stream-name" ];
constructor() {
super();
this.beforeTurboRender = this.beforeTurboRender.bind(this);
this.afterTurboRender = this.afterTurboRender.bind(this);
}
async connectedCallback() {
document.addEventListener("turbo:before-render", this.beforeTurboRender);
document.addEventListener("turbo:render", this.afterTurboRender);
if (!this.withinTurboRender) {
await this.subscribe();
}
}
async disconnectedCallback() {
document.removeEventListener("turbo:before-render", this.beforeTurboRender);
document.removeEventListener("turbo:render", this.afterTurboRender);
if (!this.withinTurboRender) {
this.unsubscribe();
}
}
async attributeChangedCallback() {
if (this.subscription) {
this.unsubscribe();
await this.subscribe();
}
}
async subscribe() {
connectStreamSource(this);
this.subscription = await subscribeTo(this.channel, {
received: this.dispatchMessageEvent.bind(this),
connected: this.subscriptionConnected.bind(this),
disconnected: this.subscriptionDisconnected.bind(this)
});
}
disconnectedCallback() {
unsubscribe() {
disconnectStreamSource(this);
if (this.subscription) this.subscription.unsubscribe();
this.subscriptionDisconnected();
}
attributeChangedCallback() {
if (this.subscription) {
this.disconnectedCallback();
this.connectedCallback();
}
}
dispatchMessageEvent(data) {
const event = new MessageEvent("message", {
data: data
Expand All @@ -5417,6 +5436,15 @@ class TurboCableStreamSourceElement extends HTMLElement {
subscriptionDisconnected() {
this.removeAttribute("connected");
}
beforeTurboRender() {
this.withinTurboRender = true;
}
afterTurboRender() {
if (this.withinTurboRender && !this.isConnected) {
this.unsubscribe();
}
this.withinTurboRender = false;
}
get channel() {
const channel = this.getAttribute("channel");
const signed_stream_name = this.getAttribute("signed-stream-name");
Expand Down
6 changes: 3 additions & 3 deletions app/assets/javascripts/turbo.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion app/assets/javascripts/turbo.min.js.map

Large diffs are not rendered by default.

57 changes: 46 additions & 11 deletions app/javascript/turbo/cable_stream_source_element.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
import { connectStreamSource, disconnectStreamSource } from "@hotwired/turbo"
import { subscribeTo } from "./cable"
import {connectStreamSource, disconnectStreamSource} from "@hotwired/turbo"
import {subscribeTo} from "./cable"
import snakeize from "./snakeize"

class TurboCableStreamSourceElement extends HTMLElement {
static observedAttributes = ["channel", "signed-stream-name"]

constructor() {
super()
this.beforeTurboRender = this.beforeTurboRender.bind(this)
this.afterTurboRender = this.afterTurboRender.bind(this)
}

async connectedCallback() {
document.addEventListener("turbo:before-render", this.beforeTurboRender)
document.addEventListener("turbo:render", this.afterTurboRender)

if (!this.withinTurboRender) {
await this.subscribe()
}
}

async disconnectedCallback() {
document.removeEventListener("turbo:before-render", this.beforeTurboRender)
document.removeEventListener("turbo:render", this.afterTurboRender)

if (!this.withinTurboRender) {
this.unsubscribe()
}
}

async attributeChangedCallback() {
if (this.subscription) {
this.unsubscribe()
await this.subscribe()
}
}

async subscribe() {
connectStreamSource(this)
this.subscription = await subscribeTo(this.channel, {
received: this.dispatchMessageEvent.bind(this),
Expand All @@ -14,19 +45,12 @@ class TurboCableStreamSourceElement extends HTMLElement {
})
}

disconnectedCallback() {
unsubscribe() {
disconnectStreamSource(this)
if (this.subscription) this.subscription.unsubscribe()
this.subscriptionDisconnected()
}

attributeChangedCallback() {
if (this.subscription) {
this.disconnectedCallback()
this.connectedCallback()
}
}

dispatchMessageEvent(data) {
const event = new MessageEvent("message", { data })
return this.dispatchEvent(event)
Expand All @@ -40,10 +64,21 @@ class TurboCableStreamSourceElement extends HTMLElement {
this.removeAttribute("connected")
}

beforeTurboRender() {
this.withinTurboRender = true;
}

afterTurboRender() {
if (this.withinTurboRender && !this.isConnected) {
this.unsubscribe();
}
this.withinTurboRender = false;
}

get channel() {
const channel = this.getAttribute("channel")
const signed_stream_name = this.getAttribute("signed-stream-name")
return { channel, signed_stream_name, ...snakeize({ ...this.dataset }) }
return {channel, signed_stream_name, ...snakeize({...this.dataset})}
}
}

Expand Down
4 changes: 4 additions & 0 deletions test/dummy/app/controllers/messages_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ def index
@messages = Message.all
end

def permanent
@messages = Message.all
end

def section
end

Expand Down
12 changes: 12 additions & 0 deletions test/dummy/app/views/messages/permanent.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<% navigations = params.fetch(:navigations, 0).to_i %>

<h1>Permanent Messages</h1>

<p>Navigations: <%= navigations %></p>
<%= link_to "Navigate", permanent_messages_path(navigations: navigations + 1) %>

<div id="messages-wrapper" data-turbo-permanent>
<%= turbo_stream_from "messages" %>
<div id="messages">
</div>
</div>
1 change: 1 addition & 0 deletions test/dummy/config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
resources :messages do
collection do
get :section
get :permanent
end
end
resources :trays
Expand Down
33 changes: 33 additions & 0 deletions test/system/broadcasts_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,39 @@ class BroadcastsTest < ApplicationSystemTestCase
assert_no_text original.content
end

test "the turbo-cable-stream-source does not unsubscribe+resubscribe within turbo-permanent" do
visit permanent_messages_path
assert_selector "turbo-cable-stream-source[connected]"

cable_stream_source = find("turbo-cable-stream-source")
cable_stream_source.execute_script <<~JS
const el = this;

let removedOnce = false;
el.reconnectedObserver = new MutationObserver((mutations) => {
mutations.forEach((m) => {
const isConnected = el.hasAttribute("connected");
if (m.oldValue === "" && !isConnected) {
removedOnce = true;
el.setAttribute("disconnected", "");
}
if (removedOnce && m.oldValue == null && isConnected) {
el.setAttribute("reconnected", "");
}
});
});
el.reconnectedObserver.observe(el, { attributes: true, attributeOldValue: true, attributeFilter: ["connected"] });
Comment on lines +131 to +144
Copy link
Author

@bensheldon bensheldon Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting this observer would be unnecessary if there was something like this: #498 (comment)

...as we could more simply add a listener to the page for a subscription-connected/subscription-disconnected event rather than have to set up a mutation observer.

JS

assert_text "Navigations: 0"
click_link "Navigate"
assert_text "Navigations: 1"

assert_selector "turbo-cable-stream-source[connected]"
assert_no_selector "turbo-cable-stream-source[disconnected]"
assert_no_selector "turbo-cable-stream-source[reconnected]"
end

private

def reconnect_cable_stream_source(from:, to:)
Expand Down