I tried to connect to streamers' channels via the IRC server using Java sockets, but the connections open randomly

What I want to do is collect Twitch chat from multiple streamers’ live channels (about 2000, I guess).

As a test, I tried 6 channels and used 3 Java socket instances, each joining 2 channels.

The problem is that, after running my program several times, I noticed that the connections open randomly.

On the first run, all 6 channels sent their chat logs fine, but on the second run I could collect chat from only 4 channels; the other two did not open.

Here is some of my code.

CreateSocketOutputStream.java

@RequiredArgsConstructor
@Builder
public class CreateSocketOutputStream {
    private final Logger logger;

    public SocketOutputStreamDto createSocketOutputStream() throws IOException {

        OutputStream outputStream;

        Socket socket = new Socket("irc.chat.twitch.tv", 6667);
        outputStream = socket.getOutputStream();
        outputStream.write(("PASS " + "{my twitch username oauth token}" + "\n").getBytes(StandardCharsets.UTF_8));
        outputStream.write(("NICK " + "{my twitch username}" + "\n").getBytes(StandardCharsets.UTF_8));
        return SocketOutputStreamDto.builder().socket(socket).outputStream(outputStream).build();

    }

}

TwitchIrcConnection.java

@RequiredArgsConstructor
@Builder
public class TwitchIrcConnection implements Runnable {
    private final List<String> channels;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;
    private final Logger logger;

    private final SocketOutputStreamDto streamDto;

    public void run() {
        try {
            for (String channel : channels) {
                streamDto.getOutputStream().write(("JOIN " + channel + "\n").getBytes(StandardCharsets.UTF_8));
            }
            readMessageFromOutputStream();
        } catch (IOException e) {
            logger.info(e.getMessage());
        } finally {
            try {
                streamDto.getSocket().close();
                streamDto.getOutputStream().close();
            } catch (IOException e) {
                logger.info(e.getMessage());
            }

        }
    }

    public void kafkaOrIrcMultiplexer(@NotNull String line) throws IOException {
        if (line.split(" ")[0].equals("PING")) {
            streamDto.getOutputStream().write(("PONG\n").getBytes(StandardCharsets.UTF_8));
        } else {
            String key = line.split(" ")[2];
            kafkaTemplate.send(topic, key, line);
        }
    }

    public void readMessageFromOutputStream() throws IOException {
        String line;
        while ((line = new BufferedReader(new InputStreamReader(streamDto.getSocket().getInputStream(), StandardCharsets.UTF_8)).readLine()) != null) {
            catchUnexpectedMessage(line);
        }
    }

    public void catchUnexpectedMessage(String line) throws IOException {
        try {
            kafkaOrIrcMultiplexer(line);
        } catch (ArrayIndexOutOfBoundsException e) {
            logger.info("I Caught exception " + e.getMessage() + " which is : " + line);
        }
    }

}

Producer.java

@Component
@RequiredArgsConstructor
class Producer {

    private static final String TOPIC = "twitch";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());
    @Autowired
    private final KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void twitch_channels() throws IOException {

        List<String> channels_01 = new ArrayList<>();
        channels_01.add("#amouranth");
        channels_01.add("#zoodasa");

        List<String> channels_02 = new ArrayList<>();
        channels_02.add("#rkdthdus930");
        channels_02.add("#rkdwl12");

        List<String> channels_03 = new ArrayList<>();
        channels_03.add("#nokduro");
        channels_03.add("#erenjjing");

        List<List<String>> channels = new ArrayList<>();
        channels.add(channels_01);
        channels.add(channels_02);
        channels.add(channels_03);

        for (List<String> list: channels){
            new Thread(TwitchIrcConnection
                    .builder()
                    .channels(list)
                    .kafkaTemplate(kafkaTemplate)
                    .topic(TOPIC)
                    .logger(LOGGER)
                    .streamDto(CreateSocketOutputStream
                            .builder()
                            .logger(LOGGER)
                            .build()
                            .createSocketOutputStream())
                    .build()).start();
        }
    }
}

I used the same username and Twitch IRC OAuth token to create 3 SocketOutputStreamDto instances (each holding a socket and its output stream).

I then created 3 threads, one per instance, so that each connects to the IRC server over its own socket (all authenticated with the same OAuth token and Twitch username, as mentioned above) and joins 2 channels.

But every time I run my program, the number of times the message “:tmi.twitch.tv 001 davi21xxi :Welcome, GLHF!” appears is different (sometimes 3, sometimes 2).

What I expect is that “Welcome, GLHF!” always appears in the logs 3 times, but the connection is random.

Can I get some advice on what the problem is?
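
One thing that may be worth checking (a guess, not a confirmed diagnosis): readMessageFromOutputStream() builds a new BufferedReader on every loop iteration. A BufferedReader can pull more than one line from the socket into its internal buffer, and anything it buffered beyond the first line is thrown away together with the reader. How many lines get lost would then depend on how much data had already arrived when the read happened, which could look random. Below is a minimal sketch of that effect, using an in-memory stream instead of a real socket. The class ReaderReuseDemo, its method names, and the PRIVMSG sample line are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class ReaderReuseDemo {

    // Three IRC-style lines standing in for socket data.
    // The PRIVMSG line is invented sample data, not real chat.
    static InputStream sample() {
        String data = ":tmi.twitch.tv 001 davi21xxi :Welcome, GLHF!\r\n"
                + "PING :tmi.twitch.tv\r\n"
                + ":someuser!someuser@someuser.tmi.twitch.tv PRIVMSG #nokduro :hello\r\n";
        return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
    }

    // Same shape as the loop in the post: a fresh BufferedReader for every line.
    // Each reader may consume more than one line from the stream and then be discarded.
    static List<String> readWithNewReaderPerLine(InputStream in) {
        List<String> lines = new ArrayList<>();
        try {
            String line;
            while ((line = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8)).readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // One BufferedReader for the whole stream, so buffered lines are never dropped.
    static List<String> readWithSingleReader(InputStream in) {
        List<String> lines = new ArrayList<>();
        try {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println("new reader per line: " + readWithNewReaderPerLine(sample()));
        System.out.println("single reader:       " + readWithSingleReader(sample()));
    }
}
```

If this is what is happening in TwitchIrcConnection, creating the BufferedReader once before the while loop and reusing it for every readLine() call would be the change to try. The sockets themselves may be connecting fine, with the welcome line simply being dropped before it reaches the log.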
