Hi folks! In this article, I would like to discuss multithreading: how it can be used in real-world cases and when it is necessary. As an example, we will create a server and a client that communicate over UDP. UDP is a simple transport-layer protocol that does not guarantee message delivery. However, it is a good solution if you need to send a lot of data with low overhead. For instance, torrent trackers use the UDP protocol. In our case, the server will send random numbers, one per packet, to a multicast group. The client will receive the messages, process the data, and show statistics. For this to work, the server and the client must use the same multicast address and port.
Server
Create a simple console application.
dotnet new console --name UdpMulticastServer
cd UdpMulticastServer
First of all, let's start with the configuration. Create an XML file named ServerConfig.xml like this:
<ServerConfig>
  <MulticastAddress>239.0.0.222</MulticastAddress>
  <Port>11000</Port>
  <MinValue>1</MinValue>
  <MaxValue>1000</MaxValue>
</ServerConfig>
Also, remember to add the file to the project file (.csproj) so that it is copied to the output directory, where the application will read it:
<ItemGroup>
  <None Include="ServerConfig.xml" CopyToOutputDirectory="Always" />
</ItemGroup>
I split the server logic into two parts. First, add this code to the Main method:
// Requires: using System.Xml.Linq;
var config = XDocument.Load("ServerConfig.xml");
var multicastAddress = config.Root?.Element("MulticastAddress")?.Value;
// Keep the TryParse results so that a legitimate value of 0 is not mistaken for an error.
bool portOk = int.TryParse(config.Root?.Element("Port")?.Value, out int port);
bool minOk = int.TryParse(config.Root?.Element("MinValue")?.Value, out int minValue);
bool maxOk = int.TryParse(config.Root?.Element("MaxValue")?.Value, out int maxValue);
if (string.IsNullOrEmpty(multicastAddress) || !portOk || !minOk || !maxOk || minValue >= maxValue)
{
    Console.WriteLine("Error: Check the contents of the ServerConfig.xml file.");
    return;
}
This code reads the settings from ServerConfig.xml and stops if any of them is missing or invalid.
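To look at the parsing and validation step in isolation, here is a minimal sketch that reads the same elements from an in-memory XML string instead of a file. The `TryReadConfig` helper, its parameter list, and the extra range checks are assumptions made for illustration; they are not part of the project.

```csharp
using System;
using System.Xml.Linq;

// Inline copy of the config so the sketch runs without a file on disk.
const string xml = @"<ServerConfig>
  <MulticastAddress>239.0.0.222</MulticastAddress>
  <Port>11000</Port>
  <MinValue>1</MinValue>
  <MaxValue>1000</MaxValue>
</ServerConfig>";

bool ok = TryReadConfig(xml, out string serverAddress, out int serverPort,
                        out int minValue, out int maxValue);
Console.WriteLine(ok
    ? $"{serverAddress}:{serverPort}, values from {minValue} to {maxValue}"
    : "Invalid config");

// Hypothetical helper: returns false when an element is missing
// or does not contain a usable number.
static bool TryReadConfig(string text, out string address, out int port,
                          out int min, out int max)
{
    address = string.Empty;
    port = 0; min = 0; max = 0;

    var root = XDocument.Parse(text).Root;
    address = root?.Element("MulticastAddress")?.Value ?? string.Empty;

    return address.Length > 0
        && int.TryParse(root?.Element("Port")?.Value, out port)
        && int.TryParse(root?.Element("MinValue")?.Value, out min)
        && int.TryParse(root?.Element("MaxValue")?.Value, out max)
        && port > 0 && port <= 65535
        && min < max;
}
```

Returning a single success flag keeps the caller's error handling in one place, which mirrors the single `if` check in the Main method.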
The second part creates the UDP client and continuously sends random data at 1-second intervals.
// Requires: using System.Net; using System.Net.Sockets;
using UdpClient udpClient = new UdpClient();
if (!IPAddress.TryParse(multicastAddress, out IPAddress? multicastIpAddress))
{
    Console.WriteLine("Error: Unable to parse the multicast address.");
    return;
}
try
{
    // Joining the group is only required for receiving; sending works without it.
    udpClient.JoinMulticastGroup(multicastIpAddress);
    IPEndPoint remoteEndPoint = new IPEndPoint(multicastIpAddress, port);
    Random random = new Random();
    Console.WriteLine($"The server is running. Sending data to {multicastAddress}:{port}");
    while (true)
    {
        // Next(min, max) excludes the upper bound, so MaxValue itself is never sent.
        int randomValue = random.Next(minValue, maxValue);
        // 4-byte payload in the machine's native byte order.
        byte[] data = BitConverter.GetBytes(randomValue);
        udpClient.Send(data, data.Length, remoteEndPoint);
        Thread.Sleep(1000);
    }
}
catch (Exception ex)
{
    Console.WriteLine($"Internal Server Error: {ex.Message} {ex.StackTrace}");
}
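Each datagram sent by the loop above carries a single 32-bit integer. The sketch below shows the encode and decode steps on their own, without a socket. One caveat worth flagging: `BitConverter` uses the byte order of the machine it runs on, so the round trip is only safe when the server and the client share the same endianness. The `BinaryPrimitives` lines show a fixed little-endian alternative; that part is an assumption for illustration and is not what the article's code uses.

```csharp
using System;
using System.Buffers.Binary;

int value = 742;

// What the server does: 4 bytes in the machine's native byte order.
byte[] payload = BitConverter.GetBytes(value);

// What the client does: read the integer back from offset 0.
int decoded = BitConverter.ToInt32(payload, 0);
Console.WriteLine($"{payload.Length} bytes, decoded = {decoded}");

// Alternative (not used in the article): pin the byte order explicitly,
// so machines with different endianness still agree on the value.
byte[] portable = new byte[sizeof(int)];
BinaryPrimitives.WriteInt32LittleEndian(portable, value);
int portableDecoded = BinaryPrimitives.ReadInt32LittleEndian(portable);
Console.WriteLine($"little-endian decoded = {portableDecoded}");
```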
Client
Create the client project:
dotnet new console --name UdpMulticastClient
cd UdpMulticastClient
Make the client config file:
<ClientConfig>
  <MulticastAddress>239.0.0.222</MulticastAddress>
  <Port>11000</Port>
</ClientConfig>
The client code is split into three threads. Why? Work in an application is usually either CPU-bound or I/O-bound. CPU-bound work spends its time computing on the CPU. As a rule, such work runs in the foreground, and you expect a fast response from it, which is why it gets its own thread. I/O-bound work spends most of its time waiting and usually runs in the background; it is preferable to move such tasks to the ThreadPool, which manages background threads and frees a thread while a lengthy operation is waiting.
var config = XDocument.Load("ClientConfig.xml");
var multicastAddress = config.Root?.Element("MulticastAddress")?.Value;
int.TryParse(config.Root?.Element("Port")?.Value, out var port);
Thread receiveThread = new Thread(() => ReceiveQuotes(multicastAddress, port));
Thread processThread = new Thread(ProcessData);
Thread processControlThread = new Thread(ProcessStatistics);
receiveThread.Start();
processThread.Start();
processControlThread.Start();
receiveThread.Join();
processThread.Join();
processControlThread.Join();
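The difference between a dedicated thread and the ThreadPool mentioned earlier can be sketched in a few lines. This is an illustration under assumptions, not part of the client: the summing loop stands in for CPU-bound work, and `Task.Delay` stands in for waiting on I/O.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Dedicated thread: suits a long-running loop, like the client's receiver.
long sum = 0;
var worker = new Thread(() =>
{
    for (int i = 1; i <= 1000; i++) sum += i;   // stand-in for CPU-bound work
});
worker.Start();

// ThreadPool (via Task.Run): suits short or waiting-heavy work.
Task<int> pooled = Task.Run(async () =>
{
    await Task.Delay(50);   // stand-in for I/O; no thread is blocked while waiting
    return 42;
});

worker.Join();               // block until the dedicated thread finishes
int result = await pooled;   // wait for the pooled task without blocking a thread
Console.WriteLine($"sum = {sum}, result = {result}");
```

The pooled task gives its thread back during the delay, which is the behavior the ThreadPool is preferred for; the dedicated thread stays occupied for as long as its loop runs.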
The first thread creates the UDP client and continuously receives data. As you can see, the packet counters are updated through a synchronization class. This is needed because different threads read and write the same fields. Since the counters are primitive values, the Interlocked class is a better fit than a lock.
private static void ReceiveQuotes(string? multicastAddress, int port)
{
    try
    {
        // Validate the address before touching the socket.
        if (!IPAddress.TryParse(multicastAddress, out IPAddress? multicastIpAddress))
        {
            Console.WriteLine("Error: Unable to parse the multicast address.");
            return;
        }
        using UdpClient udpClient = new UdpClient();
        IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
        // Allow several clients on one machine to bind the same port.
        udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
        udpClient.ExclusiveAddressUse = false;
        udpClient.Client.Bind(localEndPoint);
        Console.WriteLine("Joining multicast group...");
        udpClient.JoinMulticastGroup(multicastIpAddress);
        Console.WriteLine("Successfully joined multicast group!");
        // Receive() overwrites this with the sender's endpoint.
        IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Any, 0);
        while (true)
        {
            if (!_isReceiving)
            {
                Thread.Sleep(100);
                continue;
            }
            try
            {
                byte[] data = udpClient.Receive(ref remoteEndPoint);
                Console.WriteLine($"Received packet size: {data.Length} bytes");
                if (data.Length < sizeof(int)) continue; // too short to hold an Int32
                int message = BitConverter.ToInt32(data, 0);
                Console.WriteLine($"Received data: {message}");
                // Keep only the most recent MaxQuotesSize values.
                if (Quotes.Count >= MaxQuotesSize)
                {
                    Quotes.TryDequeue(out _);
                }
                Quotes.Enqueue(message);
                Interlocked.Increment(ref _totalReceivedPackets);
            }
            catch (SocketException ex)
            {
                // A socket error is counted as a lost packet; packets dropped
                // by the network itself cannot be detected here.
                Console.WriteLine($"SocketException: {ex.Message}");
                Interlocked.Increment(ref _lostPackets);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error in ReceiveQuotes: {ex.Message}");
    }
}
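The receiver relies on shared members that the listings do not show (`Quotes`, `MaxQuotesSize`, `_totalReceivedPackets`). The sketch below is a guess at how such state could look and behave: the types (`ConcurrentQueue<int>`, a `long` counter) and the cap of 5 are assumptions, and locals stand in for the static fields so that the sketch runs as a standalone program.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Assumed declarations; in the client these would be static fields.
const int MaxQuotesSize = 5;
var quotes = new ConcurrentQueue<int>();
long totalReceivedPackets = 0;

// Counter: four threads increment concurrently; Interlocked keeps every update.
var workers = new Thread[4];
for (int t = 0; t < workers.Length; t++)
{
    workers[t] = new Thread(() =>
    {
        for (int i = 0; i < 1000; i++)
            Interlocked.Increment(ref totalReceivedPackets);
    });
    workers[t].Start();
}
foreach (var w in workers) w.Join();

// Queue: a single writer (like the receive thread) keeps only the newest values.
for (int value = 1; value <= 8; value++)
{
    if (quotes.Count >= MaxQuotesSize) quotes.TryDequeue(out _);
    quotes.Enqueue(value);
}

Console.WriteLine($"Total: {Interlocked.Read(ref totalReceivedPackets)}");
Console.WriteLine($"Queued: {string.Join(", ", quotes)}");
```

The check-then-dequeue pattern keeps the queue at the cap only because one thread writes to it. With several writers the count check and the enqueue would no longer be atomic, and the queue could briefly grow past the cap.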
The second thread does no work yet; it only sleeps in 500 ms intervals.
private static void ProcessData()
{
    while (true)
    {
        Thread.Sleep(500);
    }
}
The third thread handles the data and computes statistics. Each time the Enter key is pressed, receiving toggles between suspended and resumed, and the current statistics are shown.
private static void ProcessStatistics()
{
    while (true)
    {
        var key = Console.ReadKey(true);
        if (key.Key == ConsoleKey.Enter)
        {
            // Clear first so that the status line below stays visible.
            Console.Clear();
            lock (LockObject)
            {
                // Toggle receiving on every Enter press.
                _isReceiving = !_isReceiving;
                Console.WriteLine(_isReceiving
                    ? "Resuming packet reception..."
                    : "Suspending packet reception...");
            }
            if (Quotes.Count > 0)
            {
                // Snapshot the queue so that all statistics use the same data.
                var quotesArray = Quotes.ToArray();
                double mean = quotesArray.Average();
                // Population standard deviation (divides by N).
                double stdDev = Math.Sqrt(quotesArray.Average(v => Math.Pow(v - mean, 2)));
                // Most frequent value; ties resolve to the value seen first.
                var mode = quotesArray.GroupBy(x => x)
                    .OrderByDescending(g => g.Count())
                    .Select(g => g.Key)
                    .FirstOrDefault();
                double median = GetMedian(quotesArray);
                Console.WriteLine($"Average: {mean}");
                Console.WriteLine($"Standard deviation: {stdDev}");
                Console.WriteLine($"Mode: {mode}");
                Console.WriteLine($"Median: {median}");
                Console.WriteLine($"Lost packets: {_lostPackets}");
                Console.WriteLine($"Total received packets: {_totalReceivedPackets}");
            }
            else
            {
                Console.WriteLine("No packets have been received yet!");
            }
        }
        Thread.Sleep(500);
    }
}
private static double GetMedian(int[] numbers)
{
    // Sorts the array in place; the caller passes a copy of the queue.
    Array.Sort(numbers);
    int count = numbers.Length;
    if (count % 2 == 0)
    {
        return (numbers[count / 2 - 1] + numbers[count / 2]) / 2.0;
    }
    else
    {
        return numbers[count / 2];
    }
}
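To check the statistics formulas by hand, here is a small worked example that uses the same expressions on a fixed sample instead of received packets. The sample values are made up for illustration. For this sample the mean is 5, the standard deviation is 2, the mode is 4, and the median is 4.5.

```csharp
using System;
using System.Linq;

int[] sample = { 2, 4, 4, 4, 5, 5, 7, 9 };

double mean = sample.Average();

// Population standard deviation: square root of the mean squared deviation.
double stdDev = Math.Sqrt(sample.Average(v => Math.Pow(v - mean, 2)));

// Mode: the most frequent value.
int mode = sample.GroupBy(x => x)
    .OrderByDescending(g => g.Count())
    .Select(g => g.Key)
    .First();

// Median: middle element, or the average of the two middle elements.
int[] sorted = sample.OrderBy(x => x).ToArray();
int n = sorted.Length;
double median = n % 2 == 0
    ? (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0
    : sorted[n / 2];

Console.WriteLine($"Average: {mean}");
Console.WriteLine($"Standard deviation: {stdDev}");
Console.WriteLine($"Mode: {mode}");
Console.WriteLine($"Median: {median}");
```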
We also call the Join() method on each thread. It blocks the calling thread (here, the main thread) until the thread it is called on finishes.
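A minimal sketch of that blocking behavior follows. The 200 ms sleep is a stand-in for real work, and the `Join` overload with a timeout is shown only to make the blocking visible; the client itself uses the parameterless version.

```csharp
using System;
using System.Threading;

bool finished = false;
var worker = new Thread(() =>
{
    Thread.Sleep(200);   // simulate some work
    finished = true;
});
worker.Start();

// Join with a timeout returns false if the worker is still running.
bool endedEarly = worker.Join(10);

// Plain Join blocks the calling (main) thread until the worker terminates.
worker.Join();

Console.WriteLine($"endedEarly = {endedEarly}, finished = {finished}, alive = {worker.IsAlive}");
```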
Testing
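Start the server and then the client, each in its own terminal, and press Enter in the client to suspend receiving and view the statistics.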
Conclusions
Multithreading allows you to perform several tasks at the same time and manage their state. However, the more threads you have, the more likely you are to run into synchronization problems such as deadlocks. For this reason, you should know how to use synchronization primitives and how to avoid deadlocks. This sample showed the potential issues that arise when threads share resources.
I hope this article was helpful. See you next week. Happy coding!
Source code HERE.