CloudWatch Appender для Logback - Налаштування та Особливості

Vadym Kykalo · April 14, 2024

Вступ

CloudWatchAppender

Amazon CloudWatch надає платформу моніторингу та спостереження в AWS. У той час як Logback є широко розповсюдженою бібліотекою для логування в Java-додатках. Кастомний CloudWatchAppender (com.vkykalo.loggerappender.CloudWatchAppender) поєднує можливості обох інструментів, дозволяючи спрямовувати логи з вашого Java-додатка прямо в CloudWatch.

Налаштування

Щоб почати використовувати CloudWatchAppender, додайте наступний блок до вашого Logback конфігураційного файлу logging.xml:

<appender name="CloudWatch" class="com.vkykalo.loggerappender.CloudWatchAppender">
    <logGroupName>logGroupName</logGroupName>
    <logStreamName>logStreamName</logStreamName>
    <region>eu-central-1</region>
    <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <providers>
            <timestamp/>
            <logLevel/>
            <threadName/>
            <loggerName/>
            <message/>
                <pattern>
                    <pattern>
                        {
                            "node": "server_id", <!-- write your server id -->
                            "line": "%line"
                        }
                    </pattern>
                </pattern>
            <stackTrace/>
        </providers>
    </encoder>
</appender>

<root level="INFO">
    <!-- Інші аппендери -->
    <appender-ref ref="CloudWatch"/>
</root>

Тут level=INFO вказує на рівень журналювання, який ви хочете застосувати. Всі логи на рівні INFO та вище будуть передані до CloudWatchAppender (а також до будь-яких інших аппендерів, які ви підключите).

Клас CloudWatchAppender
package com.vkykalo.loggerappender;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.encoder.Encoder;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CloudWatchAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private static final int BUFFER_SIZE = 50;
    private static final int MAX_QUEUE_SIZE = 10_000;
    private static final int MAX_RETRIES = 3;

    private final ConcurrentLinkedQueue<InputLogEvent> eventQueue = new ConcurrentLinkedQueue<>();

    private String logGroupName;
    private String logStreamName;
    private String region;

    private CloudWatchLogsClient client;
    private Encoder<ILoggingEvent> encoder;

    public void setEncoder(Encoder<ILoggingEvent> encoder) {
        this.encoder = encoder;
    }

    public void setLogGroupName(String logGroupName) {
        this.logGroupName = logGroupName;
    }

    public void setLogStreamName(String logStreamName) {
        this.logStreamName = logStreamName;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    @Override
    protected void append(ILoggingEvent event) {
        if (null == encoder) {
            addError("No encoder set for the appender named [" + name + "].");
            return;
        }

        byte[] encodedEvent = encoder.encode(event);

        String formattedMessage = new String(encodedEvent, StandardCharsets.UTF_8);
        InputLogEvent logEvent = InputLogEvent.builder()
                .message(formattedMessage)
                .timestamp(event.getTimeStamp())
                .build();

        // Add the log event to the queue
        eventQueue.add(logEvent);

        // If the queue exceeds the max size, remove the oldest log event
        if (eventQueue.size() > MAX_QUEUE_SIZE) {
            eventQueue.poll();
        }

        // If the buffer size is reached, send the log events to CloudWatch
        if (eventQueue.size() >= BUFFER_SIZE) {
            flushEvents();
        }
    }

    private void flushEvents() {
        int retries = 0;
        boolean success = false;

        // Try to send the logs until we reach the max retries or achieve a successful send
        while (retries < MAX_RETRIES && !success) {
            try {
                DescribeLogStreamsResponse describeLogStreamsResponse = client.describeLogStreams(DescribeLogStreamsRequest.builder()
                        .logGroupName(logGroupName)
                        .logStreamNamePrefix(logStreamName)
                        .build());

                String sequenceToken = null;
                if (describeLogStreamsResponse.logStreams().isEmpty()) {
                    // If the stream doesn't exist, create it
                    client.createLogStream(builder -> builder
                            .logGroupName(logGroupName)
                            .logStreamName(logStreamName));
                } else {
                    sequenceToken = describeLogStreamsResponse.logStreams().get(0).uploadSequenceToken();
                }

                // Prepare a batch of logs to send to CloudWatch
                List<InputLogEvent> logEventsBatch = new ArrayList<>(BUFFER_SIZE);
                while (!eventQueue.isEmpty() && logEventsBatch.size() < BUFFER_SIZE) {
                    logEventsBatch.add(eventQueue.poll());
                }

                if (!logEventsBatch.isEmpty()) {
                    PutLogEventsRequest.Builder putLogEventsRequestBuilder = PutLogEventsRequest.builder()
                            .logGroupName(logGroupName)
                            .logStreamName(logStreamName)
                            .logEvents(logEventsBatch);

                    if (null != sequenceToken) {
                        putLogEventsRequestBuilder.sequenceToken(sequenceToken);
                    }

                    // Send the batch of logs to CloudWatch
                    client.putLogEvents(putLogEventsRequestBuilder.build());
                }
                success = true;
            } catch (Exception e) {
                retries++;
                // If an error occurs and we haven't reached max retries, we will wait before the next attempt
                if (retries < MAX_RETRIES) {
                    try {
                        Thread.sleep(1000L * retries); // 1 + 2 + 3 + 4 + 5 = 15 max
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    // If we reach the max retries and still can't send the logs, we log the error
                    addError("Failed to send logs to CloudWatch after " + retries + " attempts: " + e.getMessage(), e);
                }
            }
        }
    }

    @Override
    public void start() {
        client = CloudWatchLogsClient.builder()
                .region(Region.of(region))
                .build();
        super.start();
    }

    @Override
    public void stop() {
        flushEvents();
        if (null != client) {
            client.close();
        }
        super.stop();
    }
}

Наслідування від UnsynchronizedAppenderBase: це базовий клас для апендерів Logback, який не гарантує синхронізацію, тому його слід використовувати у середовищах, де контекст потоку не вимагає синхронізованого доступу до апендера.

Використання ConcurrentLinkedQueue: для зберігання логів перед їхнім відправленням у CloudWatch. Ця структура даних добре підходить для многопоточних середовищ, де можливий одночасний доступ до черги з декількох потоків.

Буферизація логів: логи акумулюються у черзі до досягнення певної кількості (BUFFER_SIZE), перед тим як їх буде відправлено. Це зменшує кількість запитів до AWS API і допомагає оптимізувати використання мережі.

Обмеження розміру черги: є максимальний розмір черги (MAX_QUEUE_SIZE), що запобігає неконтрольованому споживанню пам’яті у разі проблем із відправкою логів.

Повторні спроби відправлення: якщо відправлення логів не вдається, апендер спробує повторити відправлення обмежену кількість разів (MAX_RETRIES). Це забезпечує додаткову надійність у разі мережевих проблем або проблем із AWS CloudWatch.

Життєвий цикл

Коли у вашому коді викликається logger.info("test test"), Logback визначає, які апендери конфігуровані для використання з даного логера. CloudWatchAppender має бути конфігурований в logging.xml, щоб він обробляв такі логи.

  • Вхід в апендер: Виклик append(ILoggingEvent event): Подія ILoggingEvent містить усі дані логу, включаючи рівень логування (info), повідомлення, часову мітку та інші контекстні дані. Цей метод спочатку перевіряє, чи існує енкодер. Якщо енкодер не налаштований, він реєструє помилку та повертається.

  • Кодування логу: Використовується encoder.encode(event) для перетворення ILoggingEvent у байтовий масив, який після цього конвертується у строку UTF-8.
  • Створення InputLogEvent: Формується новий об’єкт InputLogEvent з закодованим повідомленням і часовою міткою події.
  • Додавання до черги: Створений InputLogEvent додається до eventQueue. Якщо черга перевищує максимально дозволений розмір (MAX_QUEUE_SIZE), найстаріші події видаляються.
  • Перевірка на відправлення: Якщо кількість логів у черзі досягла BUFFER_SIZE, викликається flushEvents() для відправлення логів.

Використовуючи CloudWatchLogsClient, клас викликає describeLogStreams() для отримання поточного sequenceToken, який необхідний для наступного запиту на відправлення логів.

  • Перевірка наявності потоку: Якщо потрібний потік не існує, він створюється за допомогою createLogStream().
  • Формування пакета логів: Збирається пакет логів (logEventsBatch) з черги для відправлення.
  • Відправлення пакета логів: Виконується putLogEvents(), де в переданий запит включаються логи, назва групи логів, назва потоку логів та sequenceToken.

  • Обробка помилок і повторні спроби Якщо відправлення не вдається, метод робить до MAX_RETRIES повторних спроб, збільшуючи час очікування між спробами.
  • Завершення роботи Під час зупинки апендера метод stop() забезпечує відправлення усіх залишених у черзі логів та закриває з’єднання з CloudWatch.

Автентифікація і AWS CloudWatchLogsClient

CloudWatchLogsClient - це клієнтська бібліотека, що забезпечує можливість взаємодії з Amazon CloudWatch Logs. Щодо автентифікації, CloudWatchLogsClient намагається отримати облікові дані з декількох джерел:

  • Файли AWS Configuration і Credential: Якщо на вашому сервері або робочій станції налаштовано AWS CLI, CloudWatchLogsClient може автоматично використовувати файли конфігурації і облікові дані, які зазвичай знаходяться в папці ~/.aws/.
  • Змінні оточення: Клієнт також може використовувати AWS_ACCESS_KEY_ID та AWS_SECRET_ACCESS_KEY, якщо вони встановлені як змінні оточення.
  • IAM ролі: Якщо ваш додаток запущено в середовищі AWS, наприклад на EC2 або Lambda, CloudWatchLogsClient може використовувати AWS Identity and Access Management (IAM) ролі для делегування прав на ваш додаток.
Залежності

Для роботи CloudWatchAppender, вам потрібно включити декілька залежностей в ваш проект. Ось приклади для Maven:

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>cloudwatchlogs</artifactId>
    <version>2.20.26</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.0</version> <!-- compatible version -->
</dependency>
Необхідні права IAM

Для того, щоб CloudWatchAppender міг правильно відправляти логи в CloudWatch, IAM користувач або роль, яку використовує ваш додаток, має мати дозволи на такі дії:

  • logs:CreateLogGroup
  • logs:CreateLogStream
  • logs:PutLogEvents
  • logs:DescribeLogStreams

Приклад політики IAM:

{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Action":[
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "logs:DescribeLogStreams"
         ],
         "Effect":"Allow",
         "Resource":[
            "arn:aws:logs:*:*:*"
         ]
      }
   ]
}

Ця політика дозволяє відправляти логи в будь-яку групу журналів і будь-який потік журналів в CloudWatch.

Використання

Після налаштування, CloudWatchAppender буде автоматично відправляти логи з вашого додатку в CloudWatch. Це надає можливість легко переглядати, аналізувати та алертувати на проблеми в реальному часі, використовуючи масштабовані можливості AWS.

Twitter, Facebook