Skip to content

Commit

Permalink
添加 websocket 超时机制
Browse files Browse the repository at this point in the history
  • Loading branch information
czp3009 committed Mar 29, 2019
1 parent bf1cf9c commit 5faac14
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 58 deletions.
27 changes: 14 additions & 13 deletions src/main/kotlin/com/hiczp/bilibili/api/live/websocket/LiveClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.decodeString
import io.ktor.util.error
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import mu.KotlinLogging

private val logger = KotlinLogging.logger { }
Expand Down Expand Up @@ -82,9 +81,6 @@ class LiveClient(

//开启 websocket
HttpClient(CIO).config { install(WebSockets) }.wss(host = host, port = port, path = "/sub") {
pingIntervalMillis = 30_000
timeoutMillis = 10_000

//发送进房数据包
send(PresetPacket.enterRoomPacket(anchorUserId, roomId))
val enterRoomResponsePacket = incoming.receive().toPackets()[0]
Expand All @@ -107,8 +103,9 @@ class LiveClient(
launch {
val scale = bilibiliClient.billingClientProperties.scale
while (true) {
@Suppress("DeferredResultUnused")
liveAPI.userOnlineHeart(roomId, scale)
liveAPI.userOnlineHeart(roomId, scale).invokeOnCompletion {
if (it != null) logger.error(it)
}
delay(300_000)
}
}
Expand All @@ -132,8 +129,10 @@ class LiveClient(
}

try {
incoming.consumeEach { frame ->
frame.toPackets().forEach {
while (true) {
withTimeout(40_000) {
incoming.receive()
}.toPackets().forEach {
try {
@Suppress("NON_EXHAUSTIVE_WHEN")
when (it.packetType) {
Expand All @@ -151,12 +150,14 @@ class LiveClient(
}
}
}
} catch (e: TimeoutCancellationException) {
throw e
} catch (e: CancellationException) {
close()
} finally {
restHeartBeatJob?.cancel()
websocketHeartBeatJob.cancel()
launch(NonCancellable) {
launch {
val closeReason = closeReason.await()
try {
callback.onClose?.invoke(this@LiveClient, closeReason)
Expand All @@ -179,7 +180,7 @@ class LiveClientCallbackDSL {
/**
* 成功进入房间时触发
*/
var onConnect: (suspend (LiveClient) -> Unit)? = null
var onConnect: ((LiveClient) -> Unit)? = null

/**
* 抛出异常时触发
Expand All @@ -189,17 +190,17 @@ class LiveClientCallbackDSL {
/**
* 收到人气值数据包
*/
var onPopularityPacket: (suspend (LiveClient, Int) -> Unit)? = null
var onPopularityPacket: ((LiveClient, Int) -> Unit)? = null

/**
* 收到 command 数据包
*/
var onCommandPacket: (suspend (LiveClient, JsonObject) -> Unit)? = null
var onCommandPacket: ((LiveClient, JsonObject) -> Unit)? = null

/**
* 连接关闭时触发
*/
var onClose: (suspend (LiveClient, CloseReason?) -> Unit)? = null
var onClose: ((LiveClient, CloseReason?) -> Unit)? = null
}

/**
Expand Down
24 changes: 8 additions & 16 deletions src/main/kotlin/com/hiczp/bilibili/api/live/websocket/Packet.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.hiczp.bilibili.api.live.websocket

import com.hiczp.bilibili.api.toPrettyPrintString
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.WebSocketSession
import io.ktor.http.cio.websocket.readBytes
import java.nio.ByteBuffer

/**
Expand All @@ -12,14 +10,14 @@ import java.nio.ByteBuffer
* 数据包头部结构 00 00 00 65 00 10 00 01 00 00 00 07 00 00 00 01
* |数据包总长度| |头长| |tag| |数据包类型 | | tag |
*
* @param tagShort 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型
* @param shortTag 一种 tag, 如果是非 command 数据包则为 1, 否则为 0, short 类型
* @param packetType 数据包类型
* @param tag 同 tagShort, 但是为 int 类型
* @param content 正文内容
*/
@Suppress("MemberVisibilityCanBePrivate")
class Packet(
val tagShort: Short = 1,
data class Packet(
val shortTag: Short = 1,
val packetType: PacketType,
val tag: Int = 1,
val content: ByteBuffer
Expand All @@ -29,24 +27,18 @@ class Packet(

val headerLength: Short = 0x10

fun toByteBuffer() =
fun toFrame() = Frame.Binary(
true,
ByteBuffer.allocate(totalLength)
.putInt(totalLength)
.putShort(headerLength)
.putShort(tagShort)
.putShort(shortTag)
.putInt(packetType.value)
.putInt(tag)
.put(content).apply {
flip()
}!!

fun toFrame() = Frame.Binary(
true,
toByteBuffer()
)

//for debug
override fun toString() = toFrame().readBytes().toPrettyPrintString()
}

/**
Expand All @@ -59,14 +51,14 @@ internal fun Frame.toPackets(): List<Packet> {
val startPosition = buffer.position()
val totalLength = buffer.int
buffer.position(buffer.position() + 2) //skip headerLength
val tagShort = buffer.short
val shortTag = buffer.short
val packetType = PacketType.getByValue(buffer.int)
val tag = buffer.int
buffer.limit(startPosition + totalLength)
val content = buffer.slice()
buffer.position(buffer.limit())
buffer.limit(bufferLength)
list.add(Packet(tagShort, packetType, tag, content))
list.add(Packet(shortTag, packetType, tag, content))
}
return list
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object PresetPacket {

/**
* 心跳包
* 心跳包的正文内容可能是故意的, 为固定值 [object Object]
* 心跳包的正文内容可能是故意的, 为固定值 "[object Object]"
*/
fun heartbeatPacket(content: ByteBuffer = ByteBuffer.wrap("[object Object]".toByteArray())) = Packet(
packetType = PacketType.HEARTBEAT,
Expand Down
66 changes: 38 additions & 28 deletions src/test/kotlin/com/hiczp/bilibili/api/test/LiveClientTest.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package com.hiczp.bilibili.api.test

import com.github.salomonbrys.kotson.byString
import com.hiczp.bilibili.api.BilibiliClient
import com.hiczp.bilibili.api.isNotEmpty
import com.hiczp.bilibili.api.live.websocket.DanmakuMessage
import com.hiczp.bilibili.api.live.websocket.liveClient
import io.ktor.http.cio.websocket.CloseReason
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import okhttp3.logging.HttpLoggingInterceptor
import org.junit.jupiter.api.Test
import java.nio.file.Paths
import java.time.Instant

class LiveClientTest {
@Test
Expand All @@ -16,39 +20,45 @@ class LiveClientTest {
it.toFile().mkdirs()
}

val job = bilibiliClient.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected")
}

onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity")
}

onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
BilibiliClient(logLevel = HttpLoggingInterceptor.Level.BASIC)
.apply {
loginResponse = bilibiliClient.loginResponse
}
.liveClient(roomId = 3, sendUserOnlineHeart = true) {
onConnect = {
println("Connected ${Instant.now()}")
}

onPopularityPacket = { _, popularity ->
println("Current popularity: $popularity ${Instant.now()}")
}

println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
onCommandPacket = { _, jsonObject ->
val json = jsonObject.toString()
val cmd by jsonObject.byString
path.resolve("$cmd.json").toFile().run {
if (!exists()) writeText(json)
}
)
}

onClose = { _, closeReason ->
println(closeReason)
}
}.launch()
println(
if (cmd == "DANMU_MSG") {
with(DanmakuMessage(jsonObject)) {
"${if (fansMedalInfo.isNotEmpty()) "[$fansMedalName $fansMedalLevel] " else ""}[UL$userLevel] $nickname: $message"
}
} else {
json
}
)
}

onClose = { liveClient, closeReason ->
println("$closeReason ${Instant.now()}")
if (closeReason?.code != CloseReason.Codes.NORMAL.code) liveClient.launch()
}
}.launch()

runBlocking {
delay(9999999)
delay(99999999999999999)
}
}
}

0 comments on commit 5faac14

Please sign in to comment.