diff --git a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt index 33e53ccb..c424f640 100644 --- a/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt +++ b/app/src/main/java/com/futo/platformplayer/fragment/mainactivity/main/VideoDetailView.kt @@ -927,7 +927,7 @@ class VideoDetailView : ConstraintLayout { val device = devices.first(); UIDialogs.showConfirmationDialog(context, "Would you like to open\n[${videoToSend.name}]\non ${device.remotePublicKey}" , { fragment.lifecycleScope.launch(Dispatchers.IO) { - device.sendJson(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds/1000).toInt())); + device.sendJsonData(GJSyncOpcodes.sendToDevices, SendToDevicePackage(videoToSend.url, (lastPositionMilliseconds/1000).toInt())); } }) } diff --git a/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt b/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt index acd92a67..cf2c032c 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateHistory.kt @@ -89,7 +89,7 @@ class StateHistory { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { if(StateSync.instance.hasAtLeastOneOnlineDevice()) { Logger.i(TAG, "SyncHistory playback broadcasted (${liveObj.name}: ${position})"); - StateSync.instance.broadcastJson( + StateSync.instance.broadcastJsonData( GJSyncOpcodes.syncHistory, listOf(historyVideo) ); diff --git a/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt b/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt index 2826cb91..f5a033ab 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StatePlaylists.kt @@ -198,7 +198,7 @@ class StatePlaylists { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { if(StateSync.instance.hasAtLeastOneOnlineDevice()) { Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})"); - StateSync.instance.broadcastJson( + StateSync.instance.broadcastJsonData( GJSyncOpcodes.syncPlaylists, SyncPlaylistsPackage(listOf(playlist), mapOf()) ); @@ -217,7 +217,7 @@ class StatePlaylists { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { if(StateSync.instance.hasAtLeastOneOnlineDevice()) { Logger.i(StateSubscriptionGroups.TAG, "SyncPlaylist (${playlist.name})"); - StateSync.instance.broadcastJson( + StateSync.instance.broadcastJsonData( GJSyncOpcodes.syncPlaylists, SyncPlaylistsPackage(listOf(), mapOf(Pair(playlist.id, OffsetDateTime.now().toEpochSecond()))) ); diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptionGroups.kt b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptionGroups.kt index 2b4883da..5ca521ec 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptionGroups.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptionGroups.kt @@ -81,7 +81,7 @@ class StateSubscriptionGroups { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { if(StateSync.instance.hasAtLeastOneOnlineDevice()) { Logger.i(TAG, "SyncSubscriptionGroup (${subGroup.name})"); - StateSync.instance.broadcastJson( + StateSync.instance.broadcastJsonData( GJSyncOpcodes.syncSubscriptionGroups, SyncSubscriptionGroupsPackage(listOf(subGroup), mapOf()) ); @@ -100,7 +100,7 @@ class StateSubscriptionGroups { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { if(StateSync.instance.hasAtLeastOneOnlineDevice()) { Logger.i(TAG, "SyncSubscriptionGroup delete (${group.name})"); - StateSync.instance.broadcastJson( + StateSync.instance.broadcastJsonData( GJSyncOpcodes.syncSubscriptionGroups, SyncSubscriptionGroupsPackage(listOf(), mapOf(Pair(id, OffsetDateTime.now().toEpochSecond()))) ); diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt index df680fed..52fb9f2e 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSubscriptions.kt @@ -250,7 +250,7 @@ class StateSubscriptions { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { try { - StateSync.instance.broadcast( + StateSync.instance.broadcastData( GJSyncOpcodes.syncSubscriptions, Json.encodeToString( SyncSubscriptionsPackage( listOf(subObj), @@ -299,7 +299,7 @@ class StateSubscriptions { StateApp.instance.scopeOrNull?.launch(Dispatchers.IO) { try { - StateSync.instance.broadcast( + StateSync.instance.broadcastData( GJSyncOpcodes.syncSubscriptions, Json.encodeToString( SyncSubscriptionsPackage( listOf(), diff --git a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt index b6bf0ca8..4de1b41c 100644 --- a/app/src/main/java/com/futo/platformplayer/states/StateSync.kt +++ b/app/src/main/java/com/futo/platformplayer/states/StateSync.kt @@ -370,26 +370,29 @@ class StateSync { Logger.i(TAG, "Connection authorized for ${remotePublicKey} because initiator") } }, - onData = { s, opcode, data -> - session?.handlePacket(s, opcode, data) + onData = { s, opcode, subOpcode, data -> + session?.handlePacket(s, opcode, subOpcode, data) }) } - inline fun broadcastJson(opcode: UByte, data: T) { - broadcast(opcode, Json.encodeToString(data)); + inline fun broadcastJsonData(subOpcode: UByte, data: T) { + broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, Json.encodeToString(data)); } - fun broadcast(opcode: UByte, data: String) { - broadcast(opcode, data.toByteArray(Charsets.UTF_8)); + fun broadcastData(subOpcode: UByte, data: String) { + broadcast(SyncSocketSession.Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8)); } - fun broadcast(opcode: UByte, data: ByteArray) { + fun broadcast(opcode: UByte, subOpcode: UByte, data: String) { + broadcast(opcode, subOpcode, data.toByteArray(Charsets.UTF_8)); + } + fun broadcast(opcode: UByte, subOpcode: UByte, data: ByteArray) { for(session in getSessions()) { try { if (session.isAuthorized && session.connected) { - session.send(opcode, data); + session.send(opcode, subOpcode, data); } } catch(ex: Exception) { - Logger.w(TAG, "Failed to broadcast ${opcode} to ${session.remotePublicKey}: ${ex.message}}", ex); + Logger.w(TAG, "Failed to broadcast (opcode = ${opcode}, subOpcode = ${subOpcode}) to ${session.remotePublicKey}: ${ex.message}}", ex); } } } @@ -398,7 +401,7 @@ class StateSync { val time = measureTimeMillis { //val export = StateBackup.export(); //session.send(GJSyncOpcodes.syncExport, export.asZip()); - session.send(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey)); + session.sendData(GJSyncOpcodes.syncStateExchange, getSyncSessionDataString(session.remotePublicKey)); } Logger.i(TAG, "Generated and sent sync export in ${time}ms"); } diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt index 6c46f59d..fd457d52 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSession.kt @@ -45,6 +45,7 @@ class SyncSession : IAuthorizable { private val _onConnectedChanged: (session: SyncSession, connected: Boolean) -> Unit val remotePublicKey: String override val isAuthorized get() = _authorized && _remoteAuthorized + private var _wasAuthorized = false var connected: Boolean = false private set(v) { @@ -94,8 +95,10 @@ class SyncSession : IAuthorizable { } private fun checkAuthorized() { - if (isAuthorized) + if (!_wasAuthorized && isAuthorized) { + _wasAuthorized = true _onAuthorized.invoke(this) + } } fun removeSocketSession(socketSession: SyncSocketSession) { @@ -117,8 +120,8 @@ class SyncSession : IAuthorizable { _onClose.invoke(this) } - fun handlePacket(socketSession: SyncSocketSession, opcode: UByte, data: ByteBuffer) { - Logger.i(TAG, "Handle packet (opcode: ${opcode}, data.length: ${data.remaining()})") + fun handlePacket(socketSession: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) { + Logger.i(TAG, "Handle packet (opcode: ${opcode}, subOpcode: ${subOpcode}, data.length: ${data.remaining()})") when (opcode) { Opcode.NOTIFY_AUTHORIZED.value -> { @@ -136,10 +139,15 @@ class SyncSession : IAuthorizable { return } - Logger.i(TAG, "Received ${opcode} (${data.remaining()} bytes)") + if (opcode != Opcode.DATA.value) { + Logger.w(TAG, "Unknown opcode received: (opcode = ${opcode}, subOpcode = ${subOpcode})}") + return + } + + Logger.i(TAG, "Received (opcode = ${opcode}, subOpcode = ${subOpcode}) (${data.remaining()} bytes)") //TODO: Abstract this out try { - when (opcode) { + when (subOpcode) { GJSyncOpcodes.sendToDevices -> { StateApp.instance.scopeOrNull?.launch(Dispatchers.Main) { val context = StateApp.instance.contextOrNull; @@ -164,13 +172,13 @@ class SyncSession : IAuthorizable { Logger.i(TAG, "Received SyncSessionData from " + remotePublicKey); - send(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); - send(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()); - send(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString()) + sendData(GJSyncOpcodes.syncSubscriptions, StateSubscriptions.instance.getSyncSubscriptionsPackageString()); + sendData(GJSyncOpcodes.syncSubscriptionGroups, StateSubscriptionGroups.instance.getSyncSubscriptionGroupsPackageString()); + sendData(GJSyncOpcodes.syncPlaylists, StatePlaylists.instance.getSyncPlaylistsPackageString()) val recentHistory = StateHistory.instance.getRecentHistory(syncSessionData.lastHistory); if(recentHistory.size > 0) - sendJson(GJSyncOpcodes.syncHistory, recentHistory); + sendJsonData(GJSyncOpcodes.syncHistory, recentHistory); } GJSyncOpcodes.syncExport -> { @@ -338,16 +346,19 @@ class SyncSession : IAuthorizable { } - inline fun sendJson(opcode: UByte, data: T) { - send(opcode, Json.encodeToString(data)); + inline fun sendJsonData(subOpcode: UByte, data: T) { + send(Opcode.DATA.value, subOpcode, Json.encodeToString(data)); } - fun send(opcode: UByte, data: String) { - send(opcode, data.toByteArray(Charsets.UTF_8)); + fun sendData(subOpcode: UByte, data: String) { + send(Opcode.DATA.value, subOpcode, data.toByteArray(Charsets.UTF_8)); } - fun send(opcode: UByte, data: ByteArray) { + fun send(opcode: UByte, subOpcode: UByte, data: String) { + send(opcode, subOpcode, data.toByteArray(Charsets.UTF_8)); + } + fun send(opcode: UByte, subOpcode: UByte, data: ByteArray) { val sock = _socketSessions.firstOrNull(); if(sock != null){ - sock.send(opcode, ByteBuffer.wrap(data)); + sock.send(opcode, subOpcode, ByteBuffer.wrap(data)); } else throw IllegalStateException("Session has no active sockets"); diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt index b7b5ab79..6a5fd4a3 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncSocketSession.kt @@ -2,6 +2,7 @@ package com.futo.platformplayer.sync.internal import com.futo.platformplayer.LittleEndianDataInputStream import com.futo.platformplayer.LittleEndianDataOutputStream +import com.futo.platformplayer.ensureNotMainThread import com.futo.platformplayer.logging.Logger import com.futo.platformplayer.noise.protocol.CipherStatePair import com.futo.platformplayer.noise.protocol.DHState @@ -18,7 +19,8 @@ class SyncSocketSession { NOTIFY_UNAUTHORIZED(3u), STREAM_START(4u), STREAM_DATA(5u), - STREAM_END(6u) + STREAM_END(6u), + DATA(7u) } private val _inputStream: LittleEndianDataInputStream @@ -41,12 +43,12 @@ class SyncSocketSession { private val _localKeyPair: DHState private var _localPublicKey: String val localPublicKey: String get() = _localPublicKey - private val _onData: (session: SyncSocketSession, opcode: UByte, data: ByteBuffer) -> Unit + private val _onData: (session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit var authorizable: IAuthorizable? = null val remoteAddress: String - constructor(remoteAddress: String, localKeyPair: DHState, inputStream: LittleEndianDataInputStream, outputStream: LittleEndianDataOutputStream, onClose: (session: SyncSocketSession) -> Unit, onHandshakeComplete: (session: SyncSocketSession) -> Unit, onData: (session: SyncSocketSession, opcode: UByte, data: ByteBuffer) -> Unit) { + constructor(remoteAddress: String, localKeyPair: DHState, inputStream: LittleEndianDataInputStream, outputStream: LittleEndianDataOutputStream, onClose: (session: SyncSocketSession) -> Unit, onHandshakeComplete: (session: SyncSocketSession) -> Unit, onData: (session: SyncSocketSession, opcode: UByte, subOpcode: UByte, data: ByteBuffer) -> Unit) { _inputStream = inputStream _outputStream = outputStream _onClose = onClose @@ -159,10 +161,11 @@ class SyncSocketSession { } private fun performVersionCheck() { - _outputStream.writeInt(1) + val CURRENT_VERSION = 2 + _outputStream.writeInt(CURRENT_VERSION) val version = _inputStream.readInt() Logger.i(TAG, "performVersionCheck (version = $version)") - if (version != 1) + if (version != CURRENT_VERSION) throw Exception("Invalid version") } @@ -205,8 +208,9 @@ class SyncSocketSession { throw Exception("Handshake finished without completing") } + fun send(opcode: UByte, subOpcode: UByte, data: ByteBuffer) { + ensureNotMainThread() - fun send(opcode: UByte, data: ByteBuffer) { if (data.remaining() + HEADER_SIZE > MAXIMUM_PACKET_SIZE) { val segmentSize = MAXIMUM_PACKET_SIZE - HEADER_SIZE val segmentData = ByteArray(segmentSize) @@ -223,8 +227,8 @@ class SyncSocketSession { if (sendOffset == 0) { segmentOpcode = Opcode.STREAM_START.value - bytesToSend = segmentSize - 4 - 4 - 1 - segmentPacketSize = bytesToSend + 4 + 4 + 1 + bytesToSend = segmentSize - 4 - 4 - 1 - 1 + segmentPacketSize = bytesToSend + 4 + 4 + 1 + 1 } else { bytesToSend = minOf(segmentSize - 4 - 4, bytesRemaining) segmentOpcode = if (bytesToSend >= bytesRemaining) Opcode.STREAM_END.value else Opcode.STREAM_DATA.value @@ -236,18 +240,20 @@ class SyncSocketSession { putInt(if (segmentOpcode == Opcode.STREAM_START.value) data.remaining() else sendOffset) if (segmentOpcode == Opcode.STREAM_START.value) { put(opcode.toByte()) + put(subOpcode.toByte()) } put(data.array(), data.position() + sendOffset, bytesToSend) } - send(segmentOpcode, ByteBuffer.wrap(segmentData, 0, segmentPacketSize)) + send(segmentOpcode, 0u, ByteBuffer.wrap(segmentData, 0, segmentPacketSize)) sendOffset += bytesToSend } } else { synchronized(_sendLockObject) { ByteBuffer.wrap(_sendBuffer).order(ByteOrder.LITTLE_ENDIAN).apply { - putInt(data.remaining() + 1) + putInt(data.remaining() + 2) put(opcode.toByte()) + put(subOpcode.toByte()) put(data.array(), data.position(), data.remaining()) } @@ -260,10 +266,13 @@ class SyncSocketSession { } } - fun send(opcode: UByte) { + fun send(opcode: UByte, subOpcode: UByte = 0u) { + ensureNotMainThread() + synchronized(_sendLockObject) { - ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(1) + ByteBuffer.wrap(_sendBuffer, 0, 4).order(ByteOrder.LITTLE_ENDIAN).putInt(2) _sendBuffer.asUByteArray()[4] = opcode + _sendBuffer.asUByteArray()[5] = subOpcode //Logger.i(TAG, "Encrypting message (size = ${HEADER_SIZE})") @@ -277,19 +286,19 @@ class SyncSocketSession { private fun handleData(data: ByteArray, length: Int) { if (length < HEADER_SIZE) - throw Exception("Packet must be at least 5 bytes (header size)") + throw Exception("Packet must be at least 6 bytes (header size)") val size = ByteBuffer.wrap(data, 0, 4).order(ByteOrder.LITTLE_ENDIAN).int if (size != length - 4) throw Exception("Incomplete packet received") val opcode = data.asUByteArray()[4] - val packetData = ByteBuffer.wrap(data, HEADER_SIZE, size - 1) - - handlePacket(opcode, packetData.order(ByteOrder.LITTLE_ENDIAN)) + val subOpcode = data.asUByteArray()[5] + val packetData = ByteBuffer.wrap(data, HEADER_SIZE, size - 2) + handlePacket(opcode, subOpcode, packetData.order(ByteOrder.LITTLE_ENDIAN)) } - private fun handlePacket(opcode: UByte, data: ByteBuffer) { + private fun handlePacket(opcode: UByte, subOpcode: UByte, data: ByteBuffer) { when (opcode) { Opcode.PING.value -> { send(Opcode.PONG.value) @@ -302,7 +311,7 @@ class SyncSocketSession { } Opcode.NOTIFY_AUTHORIZED.value, Opcode.NOTIFY_UNAUTHORIZED.value -> { - _onData.invoke(this, opcode, data) + _onData.invoke(this, opcode, subOpcode, data) return } } @@ -316,8 +325,9 @@ class SyncSocketSession { val id = data.int val expectedSize = data.int val op = data.get().toUByte() + val subOp = data.get().toUByte() - val syncStream = SyncStream(expectedSize, op) + val syncStream = SyncStream(expectedSize, op, subOp) if (data.remaining() > 0) { syncStream.add(data.array(), data.position(), data.remaining()) } @@ -362,10 +372,13 @@ class SyncSocketSession { throw Exception("After sync stream end, the stream must be complete") } - handlePacket(syncStream.opcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }) + handlePacket(syncStream.opcode, syncStream.subOpcode, syncStream.getBytes().let { ByteBuffer.wrap(it).order(ByteOrder.LITTLE_ENDIAN) }) + } + Opcode.DATA.value -> { + _onData.invoke(this, opcode, subOpcode, data) } else -> { - _onData.invoke(this, opcode, data) + Logger.w(TAG, "Unknown opcode received (opcode = ${opcode}, subOpcode = ${subOpcode})") } } } @@ -374,6 +387,6 @@ class SyncSocketSession { private const val TAG = "SyncSocketSession" const val MAXIMUM_PACKET_SIZE = 65535 - 16 const val MAXIMUM_PACKET_SIZE_ENCRYPTED = MAXIMUM_PACKET_SIZE + 16 - const val HEADER_SIZE = 5 + const val HEADER_SIZE = 6 } } \ No newline at end of file diff --git a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt index 5a60e295..d558feef 100644 --- a/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt +++ b/app/src/main/java/com/futo/platformplayer/sync/internal/SyncStream.kt @@ -1,6 +1,6 @@ package com.futo.platformplayer.sync.internal -class SyncStream(expectedSize: Int, val opcode: UByte) { +class SyncStream(expectedSize: Int, val opcode: UByte, val subOpcode: UByte) { companion object { const val MAXIMUM_SIZE = 10_000_000 } diff --git a/app/src/test/java/com/futo/platformplayer/NoiseProtocolTests.kt b/app/src/test/java/com/futo/platformplayer/NoiseProtocolTests.kt index 1597fd64..33b640f9 100644 --- a/app/src/test/java/com/futo/platformplayer/NoiseProtocolTests.kt +++ b/app/src/test/java/com/futo/platformplayer/NoiseProtocolTests.kt @@ -524,8 +524,8 @@ class NoiseProtocolTest { println("Initiator handshake complete") handshakeLatch.countDown() // Handshake complete for initiator }, - onData = { session, opcode, data -> - println("Initiator received: Opcode $opcode, Data Length: ${data.remaining()}") + onData = { session, opcode, subOpcode, data -> + println("Initiator received: Opcode: $opcode, SubOpcode: $subOpcode, Data Length: ${data.remaining()}") when (data.remaining()) { randomBytesExactlyOnePacket.remaining() -> { @@ -556,8 +556,8 @@ class NoiseProtocolTest { println("Responder handshake complete") handshakeLatch.countDown() // Handshake complete for responder }, - onData = { session, opcode, data -> - println("Responder received: Opcode $opcode, Data Length: ${data.remaining()}") + onData = { session, opcode, subOpcode, data -> + println("Responder received: Opcode $opcode, SubOpcode $subOpcode, Data Length: ${data.remaining()}") when (data.remaining()) { randomBytesExactlyOnePacket.remaining() -> { @@ -590,12 +590,12 @@ class NoiseProtocolTest { responderSession.send(SyncSocketSession.Opcode.PONG.value) // Test data transfer - responderSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytesExactlyOnePacket) - initiatorSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytes) + responderSession.send(SyncSocketSession.Opcode.DATA.value, 0u, randomBytesExactlyOnePacket) + initiatorSession.send(SyncSocketSession.Opcode.DATA.value, 1u, randomBytes) // Send large data to test stream handling val start = System.currentTimeMillis() - responderSession.send(SyncSocketSession.Opcode.NOTIFY_AUTHORIZED.value, randomBytesBig) + responderSession.send(SyncSocketSession.Opcode.DATA.value, 0u, randomBytesBig) println("Sent 10MB in ${System.currentTimeMillis() - start}ms") // Wait for a brief period to simulate delay and allow communication