| 
 | 1 | +/*  | 
 | 2 | + * Copyright 2022 New Relic Corporation. All rights reserved.  | 
 | 3 | + * SPDX-License-Identifier: Apache-2.0  | 
 | 4 | + */  | 
 | 5 | + | 
 | 6 | +package com.newrelic.logging.forwarder;  | 
 | 7 | + | 
 | 8 | +import com.newrelic.api.agent.Agent;  | 
 | 9 | +import com.newrelic.api.agent.NewRelic;  | 
 | 10 | +import com.newrelic.telemetry.Attributes;  | 
 | 11 | +import com.newrelic.telemetry.LogBatchSenderFactory;  | 
 | 12 | +import com.newrelic.telemetry.OkHttpPoster;  | 
 | 13 | +import com.newrelic.telemetry.SenderConfiguration;  | 
 | 14 | +import com.newrelic.telemetry.TelemetryClient;  | 
 | 15 | +import com.newrelic.telemetry.logs.Log;  | 
 | 16 | +import com.newrelic.telemetry.logs.LogBatch;  | 
 | 17 | +import com.newrelic.telemetry.logs.LogBatchSender;  | 
 | 18 | + | 
 | 19 | +import java.net.MalformedURLException;  | 
 | 20 | +import java.net.URL;  | 
 | 21 | +import java.util.ArrayList;  | 
 | 22 | +import java.util.Collection;  | 
 | 23 | +import java.util.List;  | 
 | 24 | +import java.util.concurrent.BlockingQueue;  | 
 | 25 | +import java.util.concurrent.LinkedBlockingQueue;  | 
 | 26 | +import java.util.concurrent.ScheduledThreadPoolExecutor;  | 
 | 27 | +import java.util.concurrent.TimeUnit;  | 
 | 28 | +import java.util.function.Supplier;  | 
 | 29 | + | 
 | 30 | +/**  | 
 | 31 | + * A LogForwarder that will forward the logs using the NewRelic Telemetry SDK.  | 
 | 32 | + *  | 
 | 33 | + * This logic is in a separate class, so it can be reused by multiple appender  | 
 | 34 | + * implementations across different logging libraries.  | 
 | 35 | + */  | 
 | 36 | +public class LogForwarder {  | 
 | 37 | + | 
 | 38 | +    private static final String LICENSE_KEY_CONFIG_FIELD = "license_key";  | 
 | 39 | +    private static final boolean USE_DAEMON_THREADS = true;  | 
 | 40 | +    private static final String PLUGIN_TYPE_KEY = "plugin.type";  | 
 | 41 | + | 
 | 42 | +    private final String pluginType;  | 
 | 43 | +    private final BlockingQueue<Log> logs;  | 
 | 44 | +    private final LogForwarderConfiguration configuration;  | 
 | 45 | +    private final ScheduledThreadPoolExecutor executor;  | 
 | 46 | +    private final LogForwarderNotificationHandler notificationHandler;  | 
 | 47 | +    private TelemetryClient client;  | 
 | 48 | + | 
 | 49 | +    /**  | 
 | 50 | +     * Initialize {@link TelemetryClient} with a {@link LogBatchSender} that will forward logs to newrelic each second  | 
 | 51 | +     * and also manage the retry logic if the requests are failing.  | 
 | 52 | +     *  | 
 | 53 | +     * LogBatches will be sent to the TelemetryClient each second or each time the limit defined by  | 
 | 54 | +     * {@link LogForwarderConfiguration#getMaxLogsPerBatch()} is reached.  | 
 | 55 | +     *  | 
 | 56 | +     * LogBatches will be forwarded to NewRelic each second by the TelemetryClient.  | 
 | 57 | +     *  | 
 | 58 | +     * Logs will be dropped when {@link LogForwarderConfiguration#getMaxQueuedLogs()} is reached.  | 
 | 59 | +     *  | 
 | 60 | +     * @param givenPluginType the logging library using the forwarder.  | 
 | 61 | +     * @param givenConfiguration the log forwarder configuration.  | 
 | 62 | +     */  | 
 | 63 | +    public LogForwarder(String givenPluginType, LogForwarderConfiguration givenConfiguration) {  | 
 | 64 | +        if (agentSupplier.get() == null) throw new RuntimeException("NewRelic java-log-extensions requires the NewRelic Java Agent installed and set to work.");  | 
 | 65 | +        pluginType = givenPluginType;  | 
 | 66 | +        configuration = givenConfiguration;  | 
 | 67 | +        logs = new LinkedBlockingQueue<>(configuration.getMaxLogsPerBatch());  | 
 | 68 | +        executor = new ScheduledThreadPoolExecutor(1, Threads.daemonNamedThreadFactory("log-batcher-scheduler"));  | 
 | 69 | +        notificationHandler = new LogForwarderNotificationHandler(pluginType);  | 
 | 70 | +    }  | 
 | 71 | + | 
 | 72 | +    /**  | 
 | 73 | +     * Start the scheduled tasks that:  | 
 | 74 | +     * - Create log batches.  | 
 | 75 | +     * - Notify about dropped logs.  | 
 | 76 | +     *  | 
 | 77 | +     * Start the {@link TelemetryClient} that will send the logs to the NewRelic Log API.  | 
 | 78 | +     */  | 
 | 79 | +    public void start() {  | 
 | 80 | +        executor.scheduleAtFixedRate(this::addBatchWithCurrentLogs, configuration.getFlushIntervalSeconds(), configuration.getFlushIntervalSeconds(), TimeUnit.SECONDS);  | 
 | 81 | +        client = createTelemetryClient(generateSenderConfiguration());  | 
 | 82 | +        client.withNotificationHandler(notificationHandler);  | 
 | 83 | +    }  | 
 | 84 | + | 
 | 85 | +    /**  | 
 | 86 | +     * Schedule the log to be appended to a LogBatch.  | 
 | 87 | +     *  | 
 | 88 | +     * If the current queue size is bigger than the maxLogsPerBatch we drop the log. We use  | 
 | 89 | +     * the maxLogsPerBatch instead a custom configurations because we only want to limit  | 
 | 90 | +     * the queue somehow to prevent the log forwarder taking so much memory and affecting  | 
 | 91 | +     * the application.  | 
 | 92 | +     *  | 
 | 93 | +     * @param log the log to be appended.  | 
 | 94 | +     */  | 
 | 95 | +    public void append(Log log) {  | 
 | 96 | +        scheduleLog(new RetryableLog(log));  | 
 | 97 | +    }  | 
 | 98 | + | 
 | 99 | +    /**  | 
 | 100 | +     * Shutdowns the telemetry sdk client and the executor  | 
 | 101 | +     */  | 
 | 102 | +    public void shutdown() {  | 
 | 103 | +        addBatchWithCurrentLogs();  | 
 | 104 | +        if (client != null) {  | 
 | 105 | +            client.shutdown();  | 
 | 106 | +        }  | 
 | 107 | +        if (executor != null) {  | 
 | 108 | +            executor.shutdown();  | 
 | 109 | +        }  | 
 | 110 | +        if (notificationHandler != null) {  | 
 | 111 | +            notificationHandler.shutdown();  | 
 | 112 | +        }  | 
 | 113 | +    }  | 
 | 114 | + | 
 | 115 | +    /**  | 
 | 116 | +     * Generate a default configuration for the Telemetry SDK.  | 
 | 117 | +     *  | 
 | 118 | +     * @return the sender default configuration.  | 
 | 119 | +     */  | 
 | 120 | +    private SenderConfiguration generateSenderConfiguration() {  | 
 | 121 | +        return LogBatchSenderFactory  | 
 | 122 | +                .fromHttpImplementation(OkHttpPoster::new)  | 
 | 123 | +                .configureWith(getLicense())  | 
 | 124 | +                .endpoint(getEndpoint())  | 
 | 125 | +                .build();  | 
 | 126 | +    }  | 
 | 127 | + | 
 | 128 | +    /**  | 
 | 129 | +     * Create a telemetry client using the information from {@link LogForwarderConfiguration}.  | 
 | 130 | +     *  | 
 | 131 | +     * We only set the LogBatchSender since is the only one we're going to use.  | 
 | 132 | +     *  | 
 | 133 | +     * @param senderConfiguration the sender configuration.  | 
 | 134 | +     * @return the telemetry client.  | 
 | 135 | +     */  | 
 | 136 | +    protected TelemetryClient createTelemetryClient(SenderConfiguration senderConfiguration) {  | 
 | 137 | +        return new TelemetryClient(  | 
 | 138 | +                null,  | 
 | 139 | +                null,  | 
 | 140 | +                null,  | 
 | 141 | +                LogBatchSender.create(senderConfiguration),  | 
 | 142 | +                configuration.getMaxTerminationTimeSeconds(),  | 
 | 143 | +                USE_DAEMON_THREADS,  | 
 | 144 | +                configuration.getMaxQueuedLogs()  | 
 | 145 | +        );  | 
 | 146 | +    }  | 
 | 147 | + | 
 | 148 | +    /**  | 
 | 149 | +     * Add a log to the executor queue that will append the given log to the current batch.  | 
 | 150 | +     *  | 
 | 151 | +     * @param retryableLog a log wrapped in a RetryableLog to manage the retrying if needed.  | 
 | 152 | +     */  | 
 | 153 | +    private void scheduleLog(RetryableLog retryableLog) {  | 
 | 154 | +        if (executor.getQueue().size() >= configuration.getMaxScheduledLogsToBeAppended()) {  | 
 | 155 | +            droppedLog();  | 
 | 156 | +        } else {  | 
 | 157 | +            executor.execute(() -> appendWithRetry(retryableLog));  | 
 | 158 | +        }  | 
 | 159 | +    }  | 
 | 160 | + | 
 | 161 | +    /**  | 
 | 162 | +     * Schedule adding a log to the queue with the back off time provided by the RetryableLog.  | 
 | 163 | +     *  | 
 | 164 | +     * If max retries is reached or the maxQueuedLogAppend is reached, the log will be dropped.  | 
 | 165 | +     *  | 
 | 166 | +     * @param retryableLog the log to be appended.  | 
 | 167 | +     */  | 
 | 168 | +    private void scheduleLogWithDelay(RetryableLog retryableLog) {  | 
 | 169 | +        long waitTime = retryableLog.retryBackOffTime();  | 
 | 170 | +        if (waitTime == -1) {  | 
 | 171 | +            droppedLog();  | 
 | 172 | +            return;  | 
 | 173 | +        }  | 
 | 174 | + | 
 | 175 | +        if (executor.getQueue().size() >= configuration.getMaxScheduledLogsToBeAppended()) {  | 
 | 176 | +            droppedLog();  | 
 | 177 | +        } else {  | 
 | 178 | +            executor.schedule(() -> scheduleLog(retryableLog), waitTime, TimeUnit.MILLISECONDS);  | 
 | 179 | +        }  | 
 | 180 | +    }  | 
 | 181 | + | 
 | 182 | +    /**  | 
 | 183 | +     * To avoid blocking the main thread this method should be executed by  | 
 | 184 | +     * a scheduled action.  | 
 | 185 | +     *  | 
 | 186 | +     * If the max of log per batch is raised, then we will create a batch with current  | 
 | 187 | +     * queued logs and send it to the telemetry SDK.  | 
 | 188 | +     *  | 
 | 189 | +     * If the logs queue refuses to add a new log, we schedule a retry..  | 
 | 190 | +     *  | 
 | 191 | +     * @param retryableLog the log to be appended.  | 
 | 192 | +     */  | 
 | 193 | +    private void appendWithRetry(RetryableLog retryableLog) {  | 
 | 194 | +        if (logs.size() >= configuration.getMaxLogsPerBatch()) {  | 
 | 195 | +            addBatchWithCurrentLogs();  | 
 | 196 | +        }  | 
 | 197 | + | 
 | 198 | +        if (!logs.offer(retryableLog.getLog())) {  | 
 | 199 | +            scheduleLogWithDelay(retryableLog);  | 
 | 200 | +        }  | 
 | 201 | +    }  | 
 | 202 | + | 
 | 203 | +    /**  | 
 | 204 | +     * Create a batch adding the plugin type attribute for given collection of logs.  | 
 | 205 | +     *  | 
 | 206 | +     * @param logsToBeAdded the logs to be added on the batch.  | 
 | 207 | +     * @return the telemetry log batch object.  | 
 | 208 | +     */  | 
 | 209 | +    private LogBatch createBatch(Collection<Log> logsToBeAdded) {  | 
 | 210 | +        final Attributes attributes = new Attributes();  | 
 | 211 | +        attributes.put(PLUGIN_TYPE_KEY, pluginType);  | 
 | 212 | +        return new LogBatch(logsToBeAdded, attributes);  | 
 | 213 | +    }  | 
 | 214 | + | 
 | 215 | +    /**  | 
 | 216 | +     * Drain the current logs queue and create a batch with those logs if not empty.  | 
 | 217 | +     *  | 
 | 218 | +     * Empty could occur if the application is not emitting logs and the scheduled action to send logs  | 
 | 219 | +     * is triggered (each getFlushIntervalSeconds).  | 
 | 220 | +     */  | 
 | 221 | +    private void addBatchWithCurrentLogs() {  | 
 | 222 | +        final List<Log> logsToBeAdded = new ArrayList<>();  | 
 | 223 | +        logs.drainTo(logsToBeAdded, configuration.getMaxLogsPerBatch());  | 
 | 224 | +        if (logsToBeAdded.size() > 0) {  | 
 | 225 | +            client.sendBatch(createBatch(logsToBeAdded));  | 
 | 226 | +        }  | 
 | 227 | +    }  | 
 | 228 | + | 
 | 229 | +    private void droppedLog() {  | 
 | 230 | +        notificationHandler.noticeDroppedLog();  | 
 | 231 | +    }  | 
 | 232 | + | 
 | 233 | +    /**  | 
 | 234 | +     * Get the endpoint from the configuration.  | 
 | 235 | +     *  | 
 | 236 | +     * The endpoint defaults to the us-production if the customer doesn't provide a custom one in the configuration.  | 
 | 237 | +     *  | 
 | 238 | +     * @return the logs API URL.  | 
 | 239 | +     */  | 
 | 240 | +    private URL getEndpoint() {  | 
 | 241 | +        try {  | 
 | 242 | +            return new URL(configuration.getEndpoint());  | 
 | 243 | +        } catch (MalformedURLException e) {  | 
 | 244 | +            throw new RuntimeException("Invalid newrelic log endpoint.", e);  | 
 | 245 | +        }  | 
 | 246 | +    }  | 
 | 247 | + | 
 | 248 | +    /**  | 
 | 249 | +     * Get the license form the configuration or takes the one given in the agent.  | 
 | 250 | +     *  | 
 | 251 | +     * @return the provided license or fallback to the agent one.  | 
 | 252 | +     */  | 
 | 253 | +    private String getLicense() {  | 
 | 254 | +        if (!configuration.getLicense().isEmpty()) {  | 
 | 255 | +            return configuration.getLicense();  | 
 | 256 | +        }  | 
 | 257 | +        return getAgentLicense();  | 
 | 258 | +    }  | 
 | 259 | + | 
 | 260 | +    private String getAgentLicense() {  | 
 | 261 | +        return agentSupplier.get().getConfig().getValue(LICENSE_KEY_CONFIG_FIELD);  | 
 | 262 | +    }  | 
 | 263 | + | 
 | 264 | +    // visible for testing  | 
 | 265 | +    public static Supplier<Agent> agentSupplier = NewRelic::getAgent;  | 
 | 266 | +}  | 
0 commit comments