diff --git a/src/commonMain/kotlin/de/kif/common/DateFormat.kt b/src/commonMain/kotlin/de/kif/common/DateFormat.kt index d336d8b..af1c8a2 100644 --- a/src/commonMain/kotlin/de/kif/common/DateFormat.kt +++ b/src/commonMain/kotlin/de/kif/common/DateFormat.kt @@ -2,6 +2,7 @@ package de.kif.common import com.soywiz.klock.* import com.soywiz.klock.locale.german +import kotlin.math.ceil fun formatDateTime(unix: Long, timezone: Long) = DateFormat("EEEE, d. MMMM y HH:mm") @@ -24,10 +25,8 @@ fun formatTime(unix: Long, timezone: Long) = .format(DateTimeTz.utc(DateTime(unix), TimezoneOffset(timezone.toDouble()))) fun formatTimeDiff(time: Long, now: Long, timezone: Long): String { - var dt = (time - now) / 1000 + var dt = ceil(((time - now) / 1000) / 60.0).toLong() - val seconds = dt % 60 - dt /= 60 val minutes = dt % 60 dt /= 60 val hours = dt % 24 @@ -35,12 +34,8 @@ fun formatTimeDiff(time: Long, now: Long, timezone: Long): String { val days = dt return when { - days > 1L -> { - "in $days Tagen" - } - days > 0L -> { - "morgen" - } + days > 1L -> "in $days Tagen" + days > 0L -> "morgen" hours > 1L -> { val nowHour = DateFormat("HH") .withLocale(KlockLocale.german) @@ -54,18 +49,11 @@ fun formatTimeDiff(time: Long, now: Long, timezone: Long): String { "morgen" } else "um $ht" } - hours > 0L -> { - "in " + hours.toString().padStart(2, '0') + ":" + (minutes + if (seconds > 0) 1 else 0).toString().padStart( - 2, - '0' - ) - } - minutes > 0L -> { - "in 00:" + (minutes + if (seconds > 0) 1 else 0).toString().padStart(2, '0') - } - seconds > 0L -> { - "in > 1 Minute" - } - else -> "---" + hours > 0L -> "in " + hours.toTimeString() + ":" + minutes.toTimeString() + minutes > 1L -> "in 00:" + minutes.toTimeString() + else -> "in > 1 Minute" } } + +@Suppress("NOTHING_TO_INLINE") +private inline fun Number.toTimeString() = toString().padStart(2, '0') diff --git a/src/jsMain/kotlin/de/kif/frontend/PushServiceClient.kt b/src/jsMain/kotlin/de/kif/frontend/PushServiceClient.kt index 100321f..8e4863f 100644 --- a/src/jsMain/kotlin/de/kif/frontend/PushServiceClient.kt +++ b/src/jsMain/kotlin/de/kif/frontend/PushServiceClient.kt @@ -3,11 +3,15 @@ package de.kif.frontend import de.kif.common.MessageBox import de.kif.common.MessageType import de.kif.common.RepositoryType +import de.kif.common.Serialization import de.kif.frontend.repository.* import de.westermann.kwebview.clearInterval import de.westermann.kwebview.createHtmlView import de.westermann.kwebview.interval import kotlinx.serialization.DynamicObjectParser +import org.w3c.dom.EventSource +import org.w3c.dom.MessageEvent +import org.w3c.dom.events.EventListener import org.w3c.dom.get import org.w3c.xhr.XMLHttpRequest import kotlin.browser.document @@ -15,7 +19,8 @@ import kotlin.browser.window class PushServiceClient { private val prefix = js("prefix") - private val url = "$prefix/api/updates" + private val pollingUrl = "$prefix/api/updates" + private val eventUrl = "$prefix/api/events" private val body = document.body ?: createHtmlView() private val parser = DynamicObjectParser() @@ -55,16 +60,17 @@ class PushServiceClient { } } - private fun onError(code: Int) { + private fun onError(code: Int): Boolean { if (errorTimeout > 0) { errorTimeout-- - return + return false } if (!body.classList.contains("offline")) { console.log("Offline reason: $code") } body.classList.add("offline") + return true } private fun request() { @@ -82,21 +88,57 @@ class PushServiceClient { } else { onError(-1) } - } else { onError(xmlHttpRequest.status.toInt()) } } + Unit } catch (e: Exception) { console.error(e) onError(-2) } } - xmlHttpRequest.open("GET", "$url?timestamp=$timestamp", true) + xmlHttpRequest.open("GET", "$pollingUrl?timestamp=$timestamp", true) xmlHttpRequest.overrideMimeType("application/json") xmlHttpRequest.send() } + private fun initEventSource() { + val eventSource = EventSource(eventUrl) + var timeout = 3 + + eventSource.addEventListener("update", EventListener { + val event = it as? MessageEvent ?: return@EventListener + val message = event.data as? String ?: return@EventListener + onMessage(Serialization.parse(MessageBox.serializer(), message)) + }) + eventSource.addEventListener("ping", EventListener { + timeout = 3 + val event = it as? MessageEvent ?: return@EventListener + val s = event.data as? String ?: return@EventListener + if (s != signature) { + reload() + } + }) + + intervalId = interval(500) { + timeout -= 1 + if (timeout <= 0) { + if (onError(-1)) { + val id = intervalId + if (id != null) { + clearInterval(id) + intervalId = null + } + + intervalId = interval(500) { + request() + } + } + } + } + } + private val messageHandlers: List = listOf( RoomRepository.handler, ScheduleRepository.handler, @@ -108,8 +150,19 @@ class PushServiceClient { ) init { - intervalId = interval(500) { - request() + try { + initEventSource() + } catch (e: Exception) { + console.log("Cannot connect to event source, use polling fallback!") + val id = intervalId + if (id != null) { + clearInterval(id) + intervalId = null + } + + intervalId = interval(500) { + request() + } } } diff --git a/src/jvmMain/kotlin/de/kif/backend/util/PushService.kt b/src/jvmMain/kotlin/de/kif/backend/util/PushService.kt index d3d4353..59e3099 100644 --- a/src/jvmMain/kotlin/de/kif/backend/util/PushService.kt +++ b/src/jvmMain/kotlin/de/kif/backend/util/PushService.kt @@ -3,18 +3,21 @@ package de.kif.backend.util import de.kif.backend.repository.* import de.kif.backend.route.api.error import de.kif.backend.route.api.success -import de.kif.common.Message -import de.kif.common.MessageBox -import de.kif.common.MessageType -import de.kif.common.RepositoryType +import de.kif.common.* import de.kif.common.model.Post import io.ktor.application.call +import io.ktor.http.CacheControl +import io.ktor.http.ContentType import io.ktor.http.HttpStatusCode +import io.ktor.response.cacheControl +import io.ktor.response.respondTextWriter import io.ktor.routing.Route import io.ktor.routing.get +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.broadcast +import kotlinx.coroutines.runBlocking import kotlinx.html.currentTimeMillis import kotlin.concurrent.thread -import kotlin.math.abs import kotlin.math.max object PushService { @@ -24,16 +27,30 @@ object PushService { private val messages: MutableList> = mutableListOf() + private val channel = Channel() + val broadcast = channel.broadcast() + /** * Save the message with the current timestamp */ - fun notify(type: MessageType, repository: RepositoryType, id: Long) { + suspend fun notify(type: MessageType, repository: RepositoryType, id: Long) { val timestamp = System.currentTimeMillis() val message = Message(type, repository, id) synchronized(messages) { messages += Pair(timestamp, message) } + + channel.send( + SSE.Message( + MessageBox( + System.currentTimeMillis(), + signature, + true, + listOf(message) + ) + ) + ) } private fun getIndexOfTimestamp(timestamp: Long): Int { @@ -93,6 +110,17 @@ object PushService { } } } + + fun ping() { + runBlocking { + channel.send(SSE.Ping) + } + } +} + +sealed class SSE { + class Message(val messageBox: MessageBox) : SSE() + object Ping: SSE() } fun Route.pushService() { @@ -104,12 +132,42 @@ fun Route.pushService() { val messageBox = PushService.getMessages(timestamp) + call.response.cacheControl(CacheControl.NoCache(null)) call.success(messageBox) } catch (_: Exception) { call.error(HttpStatusCode.InternalServerError) } } + get("/api/events") { + val events = PushService.broadcast.openSubscription() + try { + call.response.cacheControl(CacheControl.NoCache(null)) + call.respondTextWriter(contentType = ContentType.Text.EventStream) { + write("retry: 1000\n") + write("\n") + + for (event in events) { + write("id: 0\n") + when (event) { + is SSE.Ping -> { + write("event: ping\n") + write("data: ${PushService.signature}\n") + } + is SSE.Message -> { + write("event: update\n") + write("data: ${Serialization.stringify(MessageBox.serializer(), event.messageBox)}\n") + } + } + write("\n") + flush() + } + } + } finally { + events.cancel() + } + } + RoomRepository.registerPushService() ScheduleRepository.registerPushService() TrackRepository.registerPushService() @@ -121,11 +179,19 @@ fun Route.pushService() { thread( start = true, isDaemon = true, - name = "PushServiceGC" + name = "push-service" ) { + var gc = currentTimeMillis() + 1000 * 60 * 10 while (true) { - PushService.gc(currentTimeMillis() - 1000 * 60) - Thread.sleep(1000 * 60 * 5) + Thread.sleep(500) + PushService.ping() + + val now = currentTimeMillis() + if (gc < now) { + gc = now + 1000 * 60 * 10 + PushService.gc(now - 1000 * 60) + } } + } }