Switch to sse

This commit is contained in:
Lars Westermann 2019-06-15 12:40:02 +02:00
parent e044203efc
commit 2ccc19208a
Signed by: lars.westermann
GPG key ID: 9D417FA5BB9D5E1D
3 changed files with 145 additions and 38 deletions

View file

@ -2,6 +2,7 @@ package de.kif.common
import com.soywiz.klock.*
import com.soywiz.klock.locale.german
import kotlin.math.ceil
fun formatDateTime(unix: Long, timezone: Long) =
DateFormat("EEEE, d. MMMM y HH:mm")
@ -24,10 +25,8 @@ fun formatTime(unix: Long, timezone: Long) =
.format(DateTimeTz.utc(DateTime(unix), TimezoneOffset(timezone.toDouble())))
fun formatTimeDiff(time: Long, now: Long, timezone: Long): String {
var dt = (time - now) / 1000
var dt = ceil(((time - now) / 1000) / 60.0).toLong()
val seconds = dt % 60
dt /= 60
val minutes = dt % 60
dt /= 60
val hours = dt % 24
@ -35,12 +34,8 @@ fun formatTimeDiff(time: Long, now: Long, timezone: Long): String {
val days = dt
return when {
days > 1L -> {
"in $days Tagen"
}
days > 0L -> {
"morgen"
}
days > 1L -> "in $days Tagen"
days > 0L -> "morgen"
hours > 1L -> {
val nowHour = DateFormat("HH")
.withLocale(KlockLocale.german)
@ -54,18 +49,11 @@ fun formatTimeDiff(time: Long, now: Long, timezone: Long): String {
"morgen"
} else "um $ht"
}
hours > 0L -> {
"in " + hours.toString().padStart(2, '0') + ":" + (minutes + if (seconds > 0) 1 else 0).toString().padStart(
2,
'0'
)
}
minutes > 0L -> {
"in 00:" + (minutes + if (seconds > 0) 1 else 0).toString().padStart(2, '0')
}
seconds > 0L -> {
"in > 1 Minute"
}
else -> "---"
hours > 0L -> "in " + hours.toTimeString() + ":" + minutes.toTimeString()
minutes > 1L -> "in 00:" + minutes.toTimeString()
else -> "in > 1 Minute"
}
}
@Suppress("NOTHING_TO_INLINE")
private inline fun Number.toTimeString() = toString().padStart(2, '0')

View file

@ -3,11 +3,15 @@ package de.kif.frontend
import de.kif.common.MessageBox
import de.kif.common.MessageType
import de.kif.common.RepositoryType
import de.kif.common.Serialization
import de.kif.frontend.repository.*
import de.westermann.kwebview.clearInterval
import de.westermann.kwebview.createHtmlView
import de.westermann.kwebview.interval
import kotlinx.serialization.DynamicObjectParser
import org.w3c.dom.EventSource
import org.w3c.dom.MessageEvent
import org.w3c.dom.events.EventListener
import org.w3c.dom.get
import org.w3c.xhr.XMLHttpRequest
import kotlin.browser.document
@ -15,7 +19,8 @@ import kotlin.browser.window
class PushServiceClient {
private val prefix = js("prefix")
private val url = "$prefix/api/updates"
private val pollingUrl = "$prefix/api/updates"
private val eventUrl = "$prefix/api/events"
private val body = document.body ?: createHtmlView()
private val parser = DynamicObjectParser()
@ -55,16 +60,17 @@ class PushServiceClient {
}
}
private fun onError(code: Int) {
private fun onError(code: Int): Boolean {
if (errorTimeout > 0) {
errorTimeout--
return
return false
}
if (!body.classList.contains("offline")) {
console.log("Offline reason: $code")
}
body.classList.add("offline")
return true
}
private fun request() {
@ -82,21 +88,57 @@ class PushServiceClient {
} else {
onError(-1)
}
} else {
onError(xmlHttpRequest.status.toInt())
}
}
Unit
} catch (e: Exception) {
console.error(e)
onError(-2)
}
}
xmlHttpRequest.open("GET", "$url?timestamp=$timestamp", true)
xmlHttpRequest.open("GET", "$pollingUrl?timestamp=$timestamp", true)
xmlHttpRequest.overrideMimeType("application/json")
xmlHttpRequest.send()
}
private fun initEventSource() {
val eventSource = EventSource(eventUrl)
var timeout = 3
eventSource.addEventListener("update", EventListener {
val event = it as? MessageEvent ?: return@EventListener
val message = event.data as? String ?: return@EventListener
onMessage(Serialization.parse(MessageBox.serializer(), message))
})
eventSource.addEventListener("ping", EventListener {
timeout = 3
val event = it as? MessageEvent ?: return@EventListener
val s = event.data as? String ?: return@EventListener
if (s != signature) {
reload()
}
})
intervalId = interval(500) {
timeout -= 1
if (timeout <= 0) {
if (onError(-1)) {
val id = intervalId
if (id != null) {
clearInterval(id)
intervalId = null
}
intervalId = interval(500) {
request()
}
}
}
}
}
private val messageHandlers: List<MessageHandler> = listOf(
RoomRepository.handler,
ScheduleRepository.handler,
@ -108,8 +150,19 @@ class PushServiceClient {
)
init {
intervalId = interval(500) {
request()
try {
initEventSource()
} catch (e: Exception) {
console.log("Cannot connect to event source, use polling fallback!")
val id = intervalId
if (id != null) {
clearInterval(id)
intervalId = null
}
intervalId = interval(500) {
request()
}
}
}

View file

@ -3,18 +3,21 @@ package de.kif.backend.util
import de.kif.backend.repository.*
import de.kif.backend.route.api.error
import de.kif.backend.route.api.success
import de.kif.common.Message
import de.kif.common.MessageBox
import de.kif.common.MessageType
import de.kif.common.RepositoryType
import de.kif.common.*
import de.kif.common.model.Post
import io.ktor.application.call
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.response.cacheControl
import io.ktor.response.respondTextWriter
import io.ktor.routing.Route
import io.ktor.routing.get
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.broadcast
import kotlinx.coroutines.runBlocking
import kotlinx.html.currentTimeMillis
import kotlin.concurrent.thread
import kotlin.math.abs
import kotlin.math.max
object PushService {
@ -24,16 +27,30 @@ object PushService {
private val messages: MutableList<Pair<Long, Message>> = mutableListOf()
private val channel = Channel<SSE>()
val broadcast = channel.broadcast()
/**
* Save the message with the current timestamp
*/
fun notify(type: MessageType, repository: RepositoryType, id: Long) {
suspend fun notify(type: MessageType, repository: RepositoryType, id: Long) {
val timestamp = System.currentTimeMillis()
val message = Message(type, repository, id)
synchronized(messages) {
messages += Pair(timestamp, message)
}
channel.send(
SSE.Message(
MessageBox(
System.currentTimeMillis(),
signature,
true,
listOf(message)
)
)
)
}
private fun getIndexOfTimestamp(timestamp: Long): Int {
@ -93,6 +110,17 @@ object PushService {
}
}
}
fun ping() {
runBlocking {
channel.send(SSE.Ping)
}
}
}
sealed class SSE {
class Message(val messageBox: MessageBox) : SSE()
object Ping: SSE()
}
fun Route.pushService() {
@ -104,12 +132,42 @@ fun Route.pushService() {
val messageBox = PushService.getMessages(timestamp)
call.response.cacheControl(CacheControl.NoCache(null))
call.success(messageBox)
} catch (_: Exception) {
call.error(HttpStatusCode.InternalServerError)
}
}
get("/api/events") {
val events = PushService.broadcast.openSubscription()
try {
call.response.cacheControl(CacheControl.NoCache(null))
call.respondTextWriter(contentType = ContentType.Text.EventStream) {
write("retry: 1000\n")
write("\n")
for (event in events) {
write("id: 0\n")
when (event) {
is SSE.Ping -> {
write("event: ping\n")
write("data: ${PushService.signature}\n")
}
is SSE.Message -> {
write("event: update\n")
write("data: ${Serialization.stringify(MessageBox.serializer(), event.messageBox)}\n")
}
}
write("\n")
flush()
}
}
} finally {
events.cancel()
}
}
RoomRepository.registerPushService()
ScheduleRepository.registerPushService()
TrackRepository.registerPushService()
@ -121,11 +179,19 @@ fun Route.pushService() {
thread(
start = true,
isDaemon = true,
name = "PushServiceGC"
name = "push-service"
) {
var gc = currentTimeMillis() + 1000 * 60 * 10
while (true) {
PushService.gc(currentTimeMillis() - 1000 * 60)
Thread.sleep(1000 * 60 * 5)
Thread.sleep(500)
PushService.ping()
val now = currentTimeMillis()
if (gc < now) {
gc = now + 1000 * 60 * 10
PushService.gc(now - 1000 * 60)
}
}
}
}