From 76c859ab6061e80a46f10daef9d93f7b811808ef Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 31 Oct 2023 16:11:29 -0700 Subject: [PATCH 01/13] Ehcache disk tier integration Signed-off-by: Sagar Upadhyaya --- buildSrc/version.properties | 5 +- .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 .../licenses/slf4j-api-1.7.36.jar.sha1 | 1 - .../licenses/slf4j-api-LICENSE.txt | 21 - .../licenses/slf4j-api-NOTICE.txt | 0 server/build.gradle | 4 + server/licenses/ehcache-3.10.8.jar.sha1 | 1 + server/licenses/ehcache-LICENSE.txt | 201 +++++++++ server/licenses/ehcache-NOTICE.txt | 5 + .../licenses/slf4j-api-1.7.36.jar.sha1 | 0 .../licenses/slf4j-api-LICENSE.txt | 0 .../licenses/slf4j-api-NOTICE.txt | 0 .../common/cache/tier/DiskCachingTier.java | 2 +- .../cache/tier/EhCacheDiskCachingTier.java | 406 ++++++++++++++++++ .../org/opensearch/bootstrap/security.policy | 4 + .../tier/EhCacheDiskCachingTierTests.java | 226 ++++++++++ ...redCacheSpilloverStrategyServiceTests.java | 3 + 31 files changed, 855 insertions(+), 134 deletions(-) delete mode 100644 plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt delete mode 100644 plugins/discovery-ec2/licenses/slf4j-api-NOTICE.txt delete mode 100644 plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt delete 
mode 100644 plugins/identity-shiro/licenses/slf4j-api-NOTICE.txt delete mode 100644 plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/ingest-attachment/licenses/slf4j-api-LICENSE.txt delete mode 100644 plugins/ingest-attachment/licenses/slf4j-api-NOTICE.txt delete mode 100644 plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/repository-azure/licenses/slf4j-api-LICENSE.txt delete mode 100644 plugins/repository-azure/licenses/slf4j-api-NOTICE.txt delete mode 100644 plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/repository-hdfs/licenses/slf4j-api-LICENSE.txt delete mode 100644 plugins/repository-hdfs/licenses/slf4j-api-NOTICE.txt delete mode 100644 plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1 delete mode 100644 plugins/repository-s3/licenses/slf4j-api-LICENSE.txt delete mode 100644 plugins/repository-s3/licenses/slf4j-api-NOTICE.txt create mode 100644 server/licenses/ehcache-3.10.8.jar.sha1 create mode 100644 server/licenses/ehcache-LICENSE.txt create mode 100644 server/licenses/ehcache-NOTICE.txt rename {plugins/crypto-kms => server}/licenses/slf4j-api-1.7.36.jar.sha1 (100%) rename {plugins/crypto-kms => server}/licenses/slf4j-api-LICENSE.txt (100%) rename {plugins/crypto-kms => server}/licenses/slf4j-api-NOTICE.txt (100%) create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java create mode 100644 server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java diff --git a/buildSrc/version.properties b/buildSrc/version.properties index e54a5a1089a93..15628d6b88c78 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -68,4 +68,7 @@ jzlib = 1.1.3 resteasy = 6.2.4.Final # opentelemetry dependencies -opentelemetry = 1.30.1 +opentelemetry = 1.31.0 +opentelemetrysemconv = 1.21.0-alpha + +ehcache = 3.10.8 diff --git 
a/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/discovery-ec2/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt b/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 2be7689435062..0000000000000 --- a/plugins/discovery-ec2/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2022 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
\ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/slf4j-api-NOTICE.txt b/plugins/discovery-ec2/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/identity-shiro/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt b/plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 8fda22f4d72f6..0000000000000 --- a/plugins/identity-shiro/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2014 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/plugins/identity-shiro/licenses/slf4j-api-NOTICE.txt b/plugins/identity-shiro/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/ingest-attachment/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/ingest-attachment/licenses/slf4j-api-LICENSE.txt b/plugins/ingest-attachment/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 8fda22f4d72f6..0000000000000 --- a/plugins/ingest-attachment/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2014 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/plugins/ingest-attachment/licenses/slf4j-api-NOTICE.txt b/plugins/ingest-attachment/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/repository-azure/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/slf4j-api-LICENSE.txt b/plugins/repository-azure/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 8fda22f4d72f6..0000000000000 --- a/plugins/repository-azure/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2014 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/plugins/repository-azure/licenses/slf4j-api-NOTICE.txt b/plugins/repository-azure/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/repository-hdfs/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/slf4j-api-LICENSE.txt b/plugins/repository-hdfs/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 8fda22f4d72f6..0000000000000 --- a/plugins/repository-hdfs/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2014 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/plugins/repository-hdfs/licenses/slf4j-api-NOTICE.txt b/plugins/repository-hdfs/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1 b/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1 deleted file mode 100644 index 77b9917528382..0000000000000 --- a/plugins/repository-s3/licenses/slf4j-api-1.7.36.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6c62681a2f655b49963a5983b8b0950a6120ae14 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/slf4j-api-LICENSE.txt b/plugins/repository-s3/licenses/slf4j-api-LICENSE.txt deleted file mode 100644 index 8fda22f4d72f6..0000000000000 --- a/plugins/repository-s3/licenses/slf4j-api-LICENSE.txt +++ /dev/null @@ -1,21 +0,0 @@ -Copyright (c) 2004-2014 QOS.ch -All rights reserved. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/plugins/repository-s3/licenses/slf4j-api-NOTICE.txt b/plugins/repository-s3/licenses/slf4j-api-NOTICE.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/server/build.gradle b/server/build.gradle index f6db3d53a0dcc..2c1b247d46a0e 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -158,6 +158,10 @@ dependencies { api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" + api "org.ehcache:ehcache:${versions.ehcache}" + api "org.slf4j:slf4j-api:${versions.slf4j}" + + testImplementation(project(":test:framework")) { // tests use the locally compiled version of server exclude group: 'org.opensearch', module: 'server' diff --git a/server/licenses/ehcache-3.10.8.jar.sha1 b/server/licenses/ehcache-3.10.8.jar.sha1 new file mode 100644 index 0000000000000..dee07e9238ebf --- /dev/null +++ b/server/licenses/ehcache-3.10.8.jar.sha1 @@ -0,0 +1 @@ +f0d50ede46609db78413ca7f4250d348a597b101 \ No newline at end of file diff --git a/server/licenses/ehcache-LICENSE.txt b/server/licenses/ehcache-LICENSE.txt new file mode 100644 index 0000000000000..8dada3edaf50d --- /dev/null +++ b/server/licenses/ehcache-LICENSE.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/server/licenses/ehcache-NOTICE.txt b/server/licenses/ehcache-NOTICE.txt new file mode 100644 index 0000000000000..1dbd38242cc98 --- /dev/null +++ b/server/licenses/ehcache-NOTICE.txt @@ -0,0 +1,5 @@ +Ehcache V3 +Copyright 2014-2023 Terracotta, Inc. + +The product includes software from the Apache Commons Lang project, +under the Apache License 2.0 (see: org.ehcache.impl.internal.classes.commonslang) diff --git a/plugins/crypto-kms/licenses/slf4j-api-1.7.36.jar.sha1 b/server/licenses/slf4j-api-1.7.36.jar.sha1 similarity index 100% rename from plugins/crypto-kms/licenses/slf4j-api-1.7.36.jar.sha1 rename to server/licenses/slf4j-api-1.7.36.jar.sha1 diff --git a/plugins/crypto-kms/licenses/slf4j-api-LICENSE.txt b/server/licenses/slf4j-api-LICENSE.txt similarity index 100% rename from plugins/crypto-kms/licenses/slf4j-api-LICENSE.txt rename to server/licenses/slf4j-api-LICENSE.txt diff --git a/plugins/crypto-kms/licenses/slf4j-api-NOTICE.txt b/server/licenses/slf4j-api-NOTICE.txt similarity index 100% rename from plugins/crypto-kms/licenses/slf4j-api-NOTICE.txt rename to server/licenses/slf4j-api-NOTICE.txt diff --git a/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java index 4db71b6378a02..6af6a33010c49 100644 --- 
a/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/DiskCachingTier.java @@ -14,5 +14,5 @@ * @param Type of value */ public interface DiskCachingTier extends CachingTier { - + void close(); } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java new file mode 100644 index 0000000000000..e7e52c7fbccbf --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -0,0 +1,406 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +import java.io.File; +import java.time.Duration; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +import org.ehcache.Cache; +import org.ehcache.CachePersistenceException; +import org.ehcache.PersistentCacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import 
org.ehcache.config.units.MemoryUnit; +import org.ehcache.event.CacheEvent; +import org.ehcache.event.CacheEventListener; +import org.ehcache.event.EventType; +import org.ehcache.expiry.ExpiryPolicy; +import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; + +public class EhCacheDiskCachingTier implements DiskCachingTier { + + // A Cache manager can create many caches. + private final PersistentCacheManager cacheManager; + + // Disk cache + private Cache cache; + private final long maxWeightInBytes; + private final String storagePath; + + private final Class keyType; + + private final Class valueType; + + private final TimeValue expireAfterAccess; + + private final EhCacheEventListener ehCacheEventListener; + + private final String threadPoolAlias; + + private final Settings settings; + + private CounterMetric count = new CounterMetric(); + + private final static String DISK_CACHE_ALIAS = "ehDiskCache"; + + private final static String THREAD_POOL_ALIAS_PREFIX = "ehcachePool"; + + private final static int MINIMUM_MAX_SIZE_IN_BYTES = 1024 * 100; // 100KB + + // Ehcache disk write minimum threads for its pool + public final Setting DISK_WRITE_MINIMUM_THREADS; + + // Ehcache disk write maximum threads for its pool + public final Setting DISK_WRITE_MAXIMUM_THREADS; + + // Not be to confused with number of disk segments, this is different. Defines + // distinct write queues created for disk store where a group of segments share a write queue. This is + // implemented with ehcache using a partitioned thread pool exectutor By default all segments share a single write + // queue ie write concurrency is 1. Check OffHeapDiskStoreConfiguration and DiskWriteThreadPool. + public final Setting DISK_WRITE_CONCURRENCY; + + // Defines how many segments the disk cache is separated into. Higher number achieves greater concurrency but + // will hold that many file pointers. 
+ public final Setting DISK_SEGMENTS; + + private EhCacheDiskCachingTier(Builder builder) { + this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); + this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null"); + this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null"); + this.ehCacheEventListener = new EhCacheEventListener(); + this.maxWeightInBytes = builder.maxWeightInBytes; + this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null"); + if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) { + this.threadPoolAlias = THREAD_POOL_ALIAS_PREFIX + "DiskWrite"; + } else { + this.threadPoolAlias = builder.threadPoolAlias; + } + this.settings = Objects.requireNonNull(builder.settings, "Settings objects shouldn't be null"); + Objects.requireNonNull(builder.settingPrefix, "Setting prefix shouldn't be null"); + this.DISK_WRITE_MINIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.min_threads", 2, 1, 5); + this.DISK_WRITE_MAXIMUM_THREADS = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.max_threads", 2, 1, 20); + // Default value is 1 within EhCache. + this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.concurrency", 2, 1, 3); + // Default value is 16 within Ehcache. + this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32); + this.cacheManager = buildCacheManager(); + this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); + } + + private PersistentCacheManager buildCacheManager() { + // In case we use multiple ehCaches, we can define this cache manager at a global level. 
+ return CacheManagerBuilder.newCacheManagerBuilder() + .with(CacheManagerBuilder.persistence(new File(storagePath))) + .using( + PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() + .defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default", 1, 3) // Default pool used for other tasks like + // event listeners + .pool(this.threadPoolAlias, DISK_WRITE_MINIMUM_THREADS.get(settings), DISK_WRITE_MAXIMUM_THREADS.get(settings)) + .build() + ) + .build(true); + } + + private Cache buildCache(Duration expireAfterAccess, Builder builder) { + return this.cacheManager.createCache( + DISK_CACHE_ALIAS, + CacheConfigurationBuilder.newCacheConfigurationBuilder( + this.keyType, + this.valueType, + ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) + ).withExpiry(new ExpiryPolicy() { + @Override + public Duration getExpiryForCreation(K key, V value) { + return INFINITE; + } + + @Override + public Duration getExpiryForAccess(K key, Supplier value) { + return expireAfterAccess; + } + + @Override + public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + return INFINITE; + } + }) + .withService(getListenerConfiguration(builder)) + .withService( + new OffHeapDiskStoreConfiguration( + this.threadPoolAlias, + DISK_WRITE_CONCURRENCY.get(settings), + DISK_SEGMENTS.get(settings) + ) + ) + ); + } + + private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder builder) { + CacheEventListenerConfigurationBuilder configurationBuilder = CacheEventListenerConfigurationBuilder.newEventListenerConfiguration( + this.ehCacheEventListener, + EventType.EVICTED, + EventType.EXPIRED, + EventType.REMOVED, + EventType.UPDATED, + EventType.CREATED + ).unordered(); + if (builder.isEventListenerModeSync) { + return configurationBuilder.synchronous(); + } else { + return configurationBuilder.asynchronous(); + } + } + + @Override + public V get(K key) { + // Optimize it by adding key store. 
+ return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception { + // Ehcache doesn't offer any such function. Will have to implement our own if needed later on. + throw new UnsupportedOperationException(); + } + + @Override + public void invalidate(K key) { + // There seems to be an thread leak issue while calling this and then closing cache. + cache.remove(key); + } + + @Override + public V compute(K key, TieredCacheLoader loader) throws Exception { + // Ehcache doesn't offer any such function. Will have to implement our own if needed later on. + throw new UnsupportedOperationException(); + } + + @Override + public void setRemovalListener(RemovalListener removalListener) { + ehCacheEventListener.setRemovalListener(removalListener); + } + + @Override + public void invalidateAll() { + // Clear up files. + } + + @Override + public Iterable keys() { + return () -> new EhCacheKeyIterator<>(cache.iterator()); + } + + @Override + public int count() { + return (int) count.count(); + } + + @Override + public TierType getTierType() { + return TierType.DISK; + } + + @Override + public void close() { + cacheManager.removeCache(DISK_CACHE_ALIAS); + cacheManager.close(); + try { + cacheManager.destroyCache(DISK_CACHE_ALIAS); + } catch (CachePersistenceException e) { + throw new OpenSearchException("Exception occurred while destroying ehcache and associated data", e); + } + } + + /** + * Wrapper over Ehcache original listener to listen to desired events and notify desired subscribers. 
+ * @param Type of key + * @param Type of value + */ + class EhCacheEventListener implements CacheEventListener { + + private Optional> removalListener; + + EhCacheEventListener() {} + + public void setRemovalListener(RemovalListener removalListener) { + this.removalListener = Optional.ofNullable(removalListener); + } + + @Override + public void onEvent(CacheEvent event) { + switch (event.getType()) { + case CREATED: + count.inc(); + assert event.getOldValue() == null; + break; + case EVICTED: + this.removalListener.ifPresent( + listener -> listener.onRemoval( + new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED) + ) + ); + count.dec(); + assert event.getNewValue() == null; + break; + case REMOVED: + count.dec(); + assert event.getNewValue() == null; + break; + case EXPIRED: + this.removalListener.ifPresent( + listener -> listener.onRemoval( + new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED) + ) + ); + count.dec(); + assert event.getNewValue() == null; + break; + case UPDATED: + break; + default: + break; + } + } + } + + /** + * This iterator wraps ehCache iterator and only iterates over its keys. + * @param Type of key + */ + class EhCacheKeyIterator implements Iterator { + + Iterator> iterator; + + EhCacheKeyIterator(Iterator> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public K next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return iterator.next().getKey(); + } + } + + /** + * Builder object to build Ehcache disk tier. 
+ * @param Type of key + * @param Type of value + */ + public static class Builder { + private long maxWeightInBytes; + private TimeValue expireAfterAcess; + + private Class keyType; + + private Class valueType; + + private String storagePath; + + private String threadPoolAlias; + + private Settings settings; + + private String diskCacheAlias; + + private String settingPrefix; + + // Provides capability to make ehCache event listener to run in sync mode. Used for testing too. + private boolean isEventListenerModeSync; + + public Builder() {} + + public EhCacheDiskCachingTier.Builder setMaximumWeightInBytes(long sizeInBytes) { + if (sizeInBytes <= MINIMUM_MAX_SIZE_IN_BYTES) { + throw new IllegalArgumentException("Ehcache Disk tier cache size should be greater than " + MINIMUM_MAX_SIZE_IN_BYTES); + } + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public EhCacheDiskCachingTier.Builder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public EhCacheDiskCachingTier.Builder setKeyType(Class keyType) { + this.keyType = keyType; + return this; + } + + public EhCacheDiskCachingTier.Builder setValueType(Class valueType) { + this.valueType = valueType; + return this; + } + + public EhCacheDiskCachingTier.Builder setStoragePath(String storagePath) { + this.storagePath = storagePath; + return this; + } + + public EhCacheDiskCachingTier.Builder setThreadPoolAlias(String threadPoolAlias) { + this.threadPoolAlias = threadPoolAlias; + return this; + } + + public EhCacheDiskCachingTier.Builder setSettings(Settings settings) { + this.settings = settings; + return this; + } + + public EhCacheDiskCachingTier.Builder setDiskCacheAlias(String diskCacheAlias) { + this.diskCacheAlias = diskCacheAlias; + return this; + } + + public EhCacheDiskCachingTier.Builder setSettingPrefix(String settingPrefix) { + // TODO: Do some basic validation. So that it doesn't end with "." etc. 
+ this.settingPrefix = settingPrefix; + return this; + } + + public EhCacheDiskCachingTier.Builder setIsEventListenerModeSync(boolean isEventListenerModeSync) { + this.isEventListenerModeSync = isEventListenerModeSync; + return this; + } + + public EhCacheDiskCachingTier build() { + return new EhCacheDiskCachingTier<>(this); + } + } +} diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index 77cd0ab05278e..db5ee445f413a 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -188,4 +188,8 @@ grant { permission java.io.FilePermission "/sys/fs/cgroup/memory", "read"; permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read"; + // For ehcache + permission java.lang.RuntimePermission "createClassLoader"; + permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; + }; diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java new file mode 100644 index 0000000000000..804f236264daa --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -0,0 +1,226 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; + +public class EhCacheDiskCachingTierTests extends OpenSearchSingleNodeTestCase { + + private static final int CACHE_SIZE_IN_BYTES = 1024 * 101; + private static final String SETTING_PREFIX = "indices.request.cache"; + + public void testBasicGetAndPut() throws IOException { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTierNew = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(String.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setSettingPrefix(SETTING_PREFIX) + .build(); + int randomKeys = randomIntBetween(10, 100); + Map keyValueMap = new HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTierNew.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + String value = ehCacheDiskCachingTierNew.get(entry.getKey()); + assertEquals(entry.getValue(), value); + } + ehCacheDiskCachingTierNew.close(); + } + } + + public void testConcurrentPut() throws Exception { + Settings settings = 
Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTierNew = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(String.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setSettingPrefix(SETTING_PREFIX) + .build(); + int randomKeys = randomIntBetween(20, 100); + Thread[] threads = new Thread[randomKeys]; + Phaser phaser = new Phaser(randomKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(randomKeys); + Map keyValueMap = new HashMap<>(); + int j = 0; + for (int i = 0; i < randomKeys; i++) { + keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + threads[j] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + ehCacheDiskCachingTierNew.put(entry.getKey(), entry.getValue()); + countDownLatch.countDown(); + }); + threads[j].start(); + j++; + } + phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. 
+ countDownLatch.await(); // Wait for all threads to finish + for (Map.Entry entry : keyValueMap.entrySet()) { + String value = ehCacheDiskCachingTierNew.get(entry.getKey()); + assertEquals(entry.getValue(), value); + } + ehCacheDiskCachingTierNew.close(); + } + } + + public void testEhcacheParallelGets() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTierNew = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(String.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setSettingPrefix(SETTING_PREFIX) + .setIsEventListenerModeSync(true) // For accurate count + .build(); + ehCacheDiskCachingTierNew.setRemovalListener(removalListener(new AtomicInteger())); + int randomKeys = randomIntBetween(20, 100); + Thread[] threads = new Thread[randomKeys]; + Phaser phaser = new Phaser(randomKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(randomKeys); + Map keyValueMap = new HashMap<>(); + int j = 0; + for (int i = 0; i < randomKeys; i++) { + keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTierNew.put(entry.getKey(), entry.getValue()); + } + assertEquals(keyValueMap.size(), ehCacheDiskCachingTierNew.count()); + for (Map.Entry entry : keyValueMap.entrySet()) { + threads[j] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + assertEquals(entry.getValue(), ehCacheDiskCachingTierNew.get(entry.getKey())); + countDownLatch.countDown(); + }); + threads[j].start(); + j++; + } + phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. 
+ countDownLatch.await(); // Wait for all threads to finish + ehCacheDiskCachingTierNew.close(); + } + } + + public void testEhcacheKeyIterator() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTierNew = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(String.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setSettingPrefix(SETTING_PREFIX) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .build(); + + int randomKeys = randomIntBetween(2, 2); + Map keyValueMap = new HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTierNew.put(entry.getKey(), entry.getValue()); + } + Iterator keys = ehCacheDiskCachingTierNew.keys().iterator(); + int keysCount = 0; + while (keys.hasNext()) { + String key = keys.next(); + keysCount++; + assertNotNull(ehCacheDiskCachingTierNew.get(key)); + } + assertEquals(keysCount, randomKeys); + ehCacheDiskCachingTierNew.close(); + } + } + + public void testCompute() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTierNew = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(String.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setSettingPrefix(SETTING_PREFIX) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .build(); + // For now it is unsupported. 
+ assertThrows( + UnsupportedOperationException.class, + () -> ehCacheDiskCachingTierNew.compute("dummy", new TieredCacheLoader() { + @Override + public String load(String key) throws Exception { + return "dummy"; + } + + @Override + public boolean isLoaded() { + return false; + } + }) + ); + assertThrows( + UnsupportedOperationException.class, + () -> ehCacheDiskCachingTierNew.computeIfAbsent("dummy", new TieredCacheLoader<>() { + @Override + public String load(String key) { + return "dummy"; + } + + @Override + public boolean isLoaded() { + return false; + } + }) + ); + } + } + + private RemovalListener removalListener(AtomicInteger counter) { + return notification -> counter.incrementAndGet(); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java index 3cd08df649f72..bb7a22cc26037 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredCacheSpilloverStrategyServiceTests.java @@ -454,5 +454,8 @@ public TierType getTierType() { public void onRemoval(RemovalNotification notification) { this.removalListener.onRemoval(notification); } + + @Override + public void close() {} } } From 1d8d21f010c9e8620be6779468a7e3de4f45f34d Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 1 Nov 2023 16:59:52 -0700 Subject: [PATCH 02/13] Adds Serializer interface for use in ehcache disk tier and elsewhere; modifies existing disk tier impl to use it in a generic way Signed-off-by: Peter Alfonsi Fixed byte[] key implementation to use ByteBuffer wrapper passed directly to ehcache Signed-off-by: Peter Alfonsi Added tests for BytesReference serializer, and ehcache disk tier using BytesReference as a value --- .../cache/tier/BytesReferenceSerializer.java | 35 +++++ 
.../cache/tier/EhCacheDiskCachingTier.java | 128 ++++++++++++++---- .../common/cache/tier/Serializer.java | 39 ++++++ .../tier/BytesReferenceSerializerTests.java | 61 +++++++++ .../tier/EhCacheDiskCachingTierTests.java | 70 ++++++++++ 5 files changed, 309 insertions(+), 24 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/Serializer.java create mode 100644 server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java diff --git a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java new file mode 100644 index 0000000000000..55ffe22c2a339 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; +import java.util.Arrays; + +public class BytesReferenceSerializer implements Serializer { + // This class does not get passed to ehcache itself, so it's not required that classes match after deserialization. 
+ + public BytesReferenceSerializer() {} + @Override + public byte[] serialize(BytesReference object) { + return BytesReference.toBytes(object); + } + + @Override + public BytesReference deserialize(byte[] bytes) { + return new BytesArray(bytes); + } + + @Override + public boolean equals(BytesReference object, byte[] bytes) { + return Arrays.equals(serialize(object), bytes); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index e7e52c7fbccbf..4adfcd731c6ef 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -8,6 +8,8 @@ package org.opensearch.common.cache.tier; +import org.ehcache.core.spi.service.FileBasedPersistenceContext; +import org.ehcache.spi.serialization.SerializerException; import org.opensearch.OpenSearchException; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; @@ -18,6 +20,7 @@ import org.opensearch.common.unit.TimeValue; import java.io.File; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Iterator; import java.util.NoSuchElementException; @@ -40,13 +43,25 @@ import org.ehcache.expiry.ExpiryPolicy; import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; +/** + * This ehcache disk caching tier uses its value serializer outside ehcache. + * Values are transformed to byte[] outside ehcache and then ehcache uses its bundled byte[] serializer. + * The key serializer you pass to this class produces a byte[]. This serializer is passed to a wrapper which + * implements Ehcache's serializer implementation and produces a BytesBuffer. The wrapper instance is then passed to ehcache. 
+ * This is done because to get keys on a disk tier, ehcache internally checks the equals() method of the serializer, + * but ALSO requires newKey.equals(storedKey) (this isn't documented), which is the case for ByteBuffer but not byte[]. + * This limitation means that the key serializer must preserve the class of the key before/after serialization, + * but the value serializer does not have to do this. + * @param The key type of cache entries + * @param The value type of cache entries + */ public class EhCacheDiskCachingTier implements DiskCachingTier { // A Cache manager can create many caches. private final PersistentCacheManager cacheManager; // Disk cache - private Cache cache; + private Cache cache; private final long maxWeightInBytes; private final String storagePath; @@ -86,11 +101,16 @@ public class EhCacheDiskCachingTier implements DiskCachingTier { // will hold that many file pointers. public final Setting DISK_SEGMENTS; + private final Serializer keySerializer; + private final Serializer valueSerializer; + private EhCacheDiskCachingTier(Builder builder) { this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null"); this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null"); - this.ehCacheEventListener = new EhCacheEventListener(); + this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null"); + this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); + this.ehCacheEventListener = new EhCacheEventListener(this.valueSerializer); this.maxWeightInBytes = builder.maxWeightInBytes; this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null"); if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) { @@ -124,37 +144,38 @@ private 
PersistentCacheManager buildCacheManager() { .build(true); } - private Cache buildCache(Duration expireAfterAccess, Builder builder) { + private Cache buildCache(Duration expireAfterAccess, Builder builder) { return this.cacheManager.createCache( DISK_CACHE_ALIAS, CacheConfigurationBuilder.newCacheConfigurationBuilder( - this.keyType, - this.valueType, + keyType, + byte[].class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) - ).withExpiry(new ExpiryPolicy() { + ).withExpiry(new ExpiryPolicy() { @Override - public Duration getExpiryForCreation(K key, V value) { + public Duration getExpiryForCreation(K key, byte[] value) { return INFINITE; } @Override - public Duration getExpiryForAccess(K key, Supplier value) { + public Duration getExpiryForAccess(K key, Supplier value) { return expireAfterAccess; } @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + public Duration getExpiryForUpdate(K key, Supplier oldValue, byte[] newValue) { return INFINITE; } }) .withService(getListenerConfiguration(builder)) .withService( - new OffHeapDiskStoreConfiguration( - this.threadPoolAlias, - DISK_WRITE_CONCURRENCY.get(settings), - DISK_SEGMENTS.get(settings) - ) + new OffHeapDiskStoreConfiguration( + this.threadPoolAlias, + DISK_WRITE_CONCURRENCY.get(settings), + DISK_SEGMENTS.get(settings) ) + ) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) ); } @@ -177,12 +198,12 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< @Override public V get(K key) { // Optimize it by adding key store. 
- return cache.get(key); + return valueSerializer.deserialize(cache.get(key)); } @Override public void put(K key, V value) { - cache.put(key, value); + cache.put(key, valueSerializer.serialize(value)); } @Override @@ -193,7 +214,7 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception @Override public void invalidate(K key) { - // There seems to be an thread leak issue while calling this and then closing cache. + // There seems to be a thread leak issue while calling this and then closing cache. cache.remove(key); } @@ -244,18 +265,23 @@ public void close() { * @param Type of key * @param Type of value */ - class EhCacheEventListener implements CacheEventListener { + class EhCacheEventListener implements CacheEventListener { private Optional> removalListener; + // We need to pass the value serializer to this listener, as the removal listener is expecting + // values of type K, V, not K, byte[] + private Serializer valueSerializer; - EhCacheEventListener() {} + EhCacheEventListener(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + } public void setRemovalListener(RemovalListener removalListener) { this.removalListener = Optional.ofNullable(removalListener); } @Override - public void onEvent(CacheEvent event) { + public void onEvent(CacheEvent event) { switch (event.getType()) { case CREATED: count.inc(); @@ -264,7 +290,11 @@ public void onEvent(CacheEvent event) { case EVICTED: this.removalListener.ifPresent( listener -> listener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED) + new RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.EVICTED + ) ) ); count.dec(); @@ -277,7 +307,11 @@ public void onEvent(CacheEvent event) { case EXPIRED: this.removalListener.ifPresent( listener -> listener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED) + new 
RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.INVALIDATED + ) ) ); count.dec(); @@ -297,9 +331,9 @@ public void onEvent(CacheEvent event) { */ class EhCacheKeyIterator implements Iterator { - Iterator> iterator; + Iterator> iterator; - EhCacheKeyIterator(Iterator> iterator) { + EhCacheKeyIterator(Iterator> iterator) { this.iterator = iterator; } @@ -317,6 +351,40 @@ public K next() { } } + /** + * The wrapper for the key serializer which is passed directly to Ehcache. + */ + private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { + public Serializer serializer; + public KeySerializerWrapper(Serializer serializer) { + this.serializer = serializer; + } + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(K object) throws SerializerException { + return ByteBuffer.wrap(serializer.serialize(object)); + } + + @Override + public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.deserialize(arr); + } + + @Override + public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.equals(object, arr); + } + } + /** * Builder object to build Ehcache disk tier. * @param Type of key @@ -342,6 +410,8 @@ public static class Builder { // Provides capability to make ehCache event listener to run in sync mode. Used for testing too. 
private boolean isEventListenerModeSync; + private Serializer keySerializer; + private Serializer valueSerializer; public Builder() {} @@ -399,6 +469,16 @@ public EhCacheDiskCachingTier.Builder setIsEventListenerModeSync(boolean i return this; } + public EhCacheDiskCachingTier.Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public EhCacheDiskCachingTier.Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + public EhCacheDiskCachingTier build() { return new EhCacheDiskCachingTier<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java b/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java new file mode 100644 index 0000000000000..74c256d188682 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import java.io.IOException; + +/** + * An interface for serializers, to be used in disk caching tier and elsewhere. + * T is the class of the original object, and U is the serialized class. + */ +public interface Serializer { + /** + * Serializes an object. + * @param object A non-serialized object. + * @return The serialized representation of the object. + */ + U serialize(T object); + + /** + * Deserializes bytes into an object. + * @param bytes The serialized representation. + * @return The original object. + */ + T deserialize(U bytes); + + /** + * Compares an object to a serialized representation of an object. 
+ * @param object A non-serialized object + * @param bytes Serialized representation of an object + * @return true if representing the same object, false if not + */ + boolean equals(T object, U bytes); +} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java new file mode 100644 index 0000000000000..2fc9c7cbb2756 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.Randomness; +import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; +import org.opensearch.core.common.util.ByteArray; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Random; + +public class BytesReferenceSerializerTests extends OpenSearchTestCase { + public void testEquality() throws Exception { + BytesReferenceSerializer ser = new BytesReferenceSerializer(); + // Test that values are equal before and after serialization, for each implementation of BytesReference. 
+ byte[] bytesValue = new byte[1000]; + Random rand = Randomness.get(); + rand.nextBytes(bytesValue); + + BytesReference ba = new BytesArray(bytesValue); + byte[] serialized = ser.serialize(ba); + assertTrue(ser.equals(ba, serialized)); + BytesReference deserialized = ser.deserialize(serialized); + assertEquals(ba, deserialized); + + BytesReference cbr = CompositeBytesReference.of(new BytesArray(bytesValue), new BytesArray(bytesValue)); + serialized = ser.serialize(cbr); + assertTrue(ser.equals(cbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(cbr, deserialized); + + // We need the PagedBytesReference to be larger than the page size (16 KB) in order to actually create it + byte[] pbrValue = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES * 2]; + rand.nextBytes(pbrValue); + ByteArray arr = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(pbrValue.length); + arr.set(0L, pbrValue, 0, pbrValue.length); + assert !arr.hasArray(); + BytesReference pbr = BytesReference.fromByteArray(arr, pbrValue.length); + serialized = ser.serialize(pbr); + assertTrue(ser.equals(pbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(pbr, deserialized); + + BytesReference rbr = new ReleasableBytesReference(new BytesArray(bytesValue), ReleasableBytesReference.NO_OP); + serialized = ser.serialize(rbr); + assertTrue(ser.equals(rbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(rbr, deserialized); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index 804f236264daa..c873fc075c4cc 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -8,16 +8,22 @@ package org.opensearch.common.cache.tier; +import org.opensearch.common.Randomness; 
import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.env.NodeEnvironment; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -40,6 +46,8 @@ public void testBasicGetAndPut() throws IOException { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(10, 100); Map keyValueMap = new HashMap<>(); @@ -57,6 +65,41 @@ public void testBasicGetAndPut() throws IOException { } } + public void testBasicGetAndPutBytesReference() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTier = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(BytesReference.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new BytesReferenceSerializer()) + .build(); + int randomKeys = randomIntBetween(10, 100); + int valueLength = 1000; + Random rand = Randomness.get(); + Map keyValueMap = new 
HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + byte[] valueBytes = new byte[valueLength]; + rand.nextBytes(valueBytes); + keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes)); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()); + assertEquals(entry.getValue(), value); + } + ehCacheDiskCachingTier.close(); + } + } + public void testConcurrentPut() throws Exception { Settings settings = Settings.builder().build(); try (NodeEnvironment env = newNodeEnvironment(settings)) { @@ -69,6 +112,8 @@ public void testConcurrentPut() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(20, 100); Thread[] threads = new Thread[randomKeys]; @@ -111,6 +156,8 @@ public void testEhcacheParallelGets() throws Exception { .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) .setIsEventListenerModeSync(true) // For accurate count + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); ehCacheDiskCachingTierNew.setRemovalListener(removalListener(new AtomicInteger())); int randomKeys = randomIntBetween(20, 100); @@ -153,6 +200,8 @@ public void testEhcacheKeyIterator() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setSettingPrefix(SETTING_PREFIX) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(2, 2); @@ -187,6 +236,8 @@ public void 
testCompute() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setSettingPrefix(SETTING_PREFIX) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); // For now it is unsupported. assertThrows( @@ -223,4 +274,23 @@ public boolean isLoaded() { private RemovalListener removalListener(AtomicInteger counter) { return notification -> counter.incrementAndGet(); } + + private static class StringSerializer implements Serializer { + + private final Charset charset = StandardCharsets.UTF_8; + @Override + public byte[] serialize(String object) { + return object.getBytes(charset); + } + + @Override + public String deserialize(byte[] bytes) { + return new String(bytes, charset); + } + + @Override + public boolean equals(String object, byte[] bytes) { + return object.equals(deserialize(bytes)); + } + } } From 9d0cfeaca28027373af3d0d1f1d94e3273132a62 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 13 Nov 2023 10:14:14 -0800 Subject: [PATCH 03/13] addressed sagar's comments Signed-off-by: Peter Alfonsi --- .../common/cache/tier/EhCacheDiskCachingTier.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index 4adfcd731c6ef..ff42ac510231b 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -44,14 +44,6 @@ import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; /** - * This ehcache disk caching tier uses its value serializer outside ehcache. - * Values are transformed to byte[] outside ehcache and then ehcache uses its bundled byte[] serializer. 
- * The key serializer you pass to this class produces a byte[]. This serializer is passed to a wrapper which - * implements Ehcache's serializer implementation and produces a BytesBuffer. The wrapper instance is then passed to ehcache. - * This is done because to get keys on a disk tier, ehcache internally checks the equals() method of the serializer, - * but ALSO requires newKey.equals(storedKey) (this isn't documented), which is the case for ByteBuffer but not byte[]. - * This limitation means that the key serializer must preserve the class of the key before/after serialization, - * but the value serializer does not have to do this. * @param The key type of cache entries * @param The value type of cache entries */ @@ -293,7 +285,8 @@ public void onEvent(CacheEvent event) { new RemovalNotification<>( event.getKey(), valueSerializer.deserialize(event.getOldValue()), - RemovalReason.EVICTED + RemovalReason.EVICTED, + TierType.DISK ) ) ); @@ -310,7 +303,8 @@ public void onEvent(CacheEvent event) { new RemovalNotification<>( event.getKey(), valueSerializer.deserialize(event.getOldValue()), - RemovalReason.INVALIDATED + RemovalReason.INVALIDATED, + TierType.DISK ) ) ); From 498b5cb596b3291e872a2cbd8478eb89bd2536ed Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 13 Nov 2023 11:40:59 -0800 Subject: [PATCH 04/13] Fixed broken test Signed-off-by: Peter Alfonsi --- .../common/cache/tier/EhCacheDiskCachingTierTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index c873fc075c4cc..e6222a9065f94 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -74,7 +74,7 @@ public void testBasicGetAndPutBytesReference() throws Exception { 
.setExpireAfterAccess(TimeValue.MAX_VALUE) .setSettings(settings) .setThreadPoolAlias("ehcacheTest") - .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 2) // bigger so no evictions happen .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) .setKeySerializer(new StringSerializer()) From a65c649622d80f5c52eec052f7e91b3de65a4cbb Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 14 Nov 2023 12:34:45 -0800 Subject: [PATCH 05/13] Fixed ehcache init issue in test cases Signed-off-by: Peter Alfonsi --- .../IndicesRequestCacheDiskTierIT.java | 103 ++++++++++++++++++ .../indices/IndicesRequestCacheIT.java | 1 + .../cache/tier/EhCacheDiskCachingTier.java | 15 ++- .../indices/IndicesRequestCache.java | 26 +++++ .../opensearch/indices/IndicesService.java | 4 + .../org/opensearch/bootstrap/security.policy | 2 + .../indices/IndicesRequestCacheTests.java | 3 +- 7 files changed, 149 insertions(+), 5 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java new file mode 100644 index 0000000000000..b1db9b8e624aa --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.tier.TierType; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +// This is a separate file from IndicesRequestCacheIT because we only want to run our test +// on a node with a maximum request cache size that we set. 
+ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { + public void testDiskTierStats() throws Exception { + int heapSizeBytes = 4729; + String node = internalCluster().startNode( + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + System.out.println(requestSize); + assertTrue(heapSizeBytes > requestSize); + // If this fails, increase heapSizeBytes! 
We can't adjust it after getting the size of one query + // as the cache size setting is not dynamic + + int numOnDisk = 5; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + for (int i = 1; i < numRequests; i++) { + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + } + // the first request, for "hello0", should have been evicted to the disk tier + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false); + } + + private long getCacheSizeBytes(Client client, String index, TierType tierType) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + return requestCacheStats.getMemorySizeInBytes(tierType); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index a1815d9be2daf..163f9afc0c103 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -40,6 +40,7 @@ import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.tier.TierType; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; 
import org.opensearch.common.util.FeatureFlags; diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index ff42ac510231b..c8a217558d982 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -50,7 +50,7 @@ public class EhCacheDiskCachingTier implements DiskCachingTier { // A Cache manager can create many caches. - private final PersistentCacheManager cacheManager; + private static PersistentCacheManager cacheManager = null; // Disk cache private Cache cache; @@ -118,8 +118,15 @@ private EhCacheDiskCachingTier(Builder builder) { this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + ".tiered.disk.ehcache.concurrency", 2, 1, 3); // Default value is 16 within Ehcache. this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32); - this.cacheManager = buildCacheManager(); - this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); + if (cacheManager == null) { + cacheManager = buildCacheManager(); + this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); + } + + // IndicesRequestCache gets 1%, of which we allocate 5% to the keystore = 0.05% + // TODO: how do we change this automatically based on INDICES_CACHE_QUERY_SIZE setting? 
+ Setting keystoreSizeSetting = Setting.memorySizeSetting(builder.settingPrefix + ".tiered.disk.keystore_size", "0.05%"); + this.keystore = new RBMIntKeyLookupStore(keystoreSizeSetting.get(this.settings).getBytes()); } private PersistentCacheManager buildCacheManager() { @@ -137,7 +144,7 @@ private PersistentCacheManager buildCacheManager() { } private Cache buildCache(Duration expireAfterAccess, Builder builder) { - return this.cacheManager.createCache( + return cacheManager.createCache( DISK_CACHE_ALIAS, CacheConfigurationBuilder.newCacheConfigurationBuilder( keyType, diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index fe75e8ffb5f39..08c191eddcd96 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -127,6 +127,32 @@ public final class IndicesRequestCache implements TieredCacheEventListener k.ramBytesUsed() + v.ramBytesUsed() ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); + Function transformationFunction = (data) -> { + try { + return convertBytesReferenceToQSR(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + // enabling this for testing purposes. Remove/tweak!! 
+ long CACHE_SIZE_IN_BYTES = 1000000L; + String SETTING_PREFIX = "indices.request.cache"; + String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache"; + System.out.println("Node paths length = " + indicesService.getNodePaths().length); + + EhCacheDiskCachingTier ehcacheDiskTier = new EhCacheDiskCachingTier.Builder() + .setKeyType(Key.class) + .setValueType(BytesReference.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setStoragePath(STORAGE_PATH) + .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new IRCKeyWriteableSerializer(this)) + .setValueSerializer(new BytesReferenceSerializer()) + .build(); + // Initialize tiered cache service. TODO: Enable Disk tier when tiered support is turned on. tieredCacheService = new TieredCacheSpilloverStrategyService.Builder().setOnHeapCachingTier( openSearchOnHeapCache diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f5e71327b6e7b..d1f39c9a567e5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -1939,6 +1939,10 @@ public boolean allPendingDanglingIndicesWritten() { || (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0); } + public NodeEnvironment.NodePath[] getNodePaths() { + return nodeEnv.nodePaths(); + } + /** * Validates the cluster default index refresh interval. 
* diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index db5ee445f413a..5d588bb9bf1fd 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -192,4 +192,6 @@ grant { permission java.lang.RuntimePermission "createClassLoader"; permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; }; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 18ec013711f22..ad6e968743fa9 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -250,7 +250,8 @@ public void testEviction() throws Exception { } IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - null + getInstanceFromNode(IndicesService.class), + dummyClusterSettings ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); From caa125ec829f23bd6646638060e9ac67d58295fe Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 14 Nov 2023 13:55:17 -0800 Subject: [PATCH 06/13] cleanup, fixed commented line Signed-off-by: Peter Alfonsi --- .../opensearch/indices/IndicesRequestCache.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 08c191eddcd96..123fa69241157 100644 --- 
a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -114,8 +114,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener tieredCacheService; - - private final TieredCacheHandler tieredCacheHandler; + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); @@ -138,7 +137,6 @@ public final class IndicesRequestCache implements TieredCacheEventListener keystoreSizeSetting = Setting.memorySizeSetting(builder.settingPrefix + ".tiered.disk.keystore_size", "0.05%"); + //this.keystore = new RBMIntKeyLookupStore(keystoreSizeSetting.get(this.settings).getBytes()); + long keystoreMaxWeight = builder.keystoreMaxWeightInBytes; + this.keystore = new RBMIntKeyLookupStore(keystoreMaxWeight); } private PersistentCacheManager buildCacheManager() { @@ -413,6 +415,7 @@ public static class Builder { private boolean isEventListenerModeSync; private Serializer keySerializer; private Serializer valueSerializer; + private long keystoreMaxWeightInBytes = 0; public Builder() {} @@ -480,6 +483,11 @@ public EhCacheDiskCachingTier.Builder setValueSerializer(Serializer setKeyStoreMaxWeightInBytes(long weight) { + this.keystoreMaxWeightInBytes = weight; + return this; + } + public EhCacheDiskCachingTier build() { return new EhCacheDiskCachingTier<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java new file mode 100644 index 0000000000000..3527dae885fde --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.cache.tier; + +/** + * A class created by on-heap tier implementations containing on-heap-specific stats for a single request. + */ +public class OnHeapTierRequestStats implements TierRequestStats { + @Override + public TierType getTierType() { + return TierType.ON_HEAP; + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 123fa69241157..a1c44ab5d5699 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -114,7 +114,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener tieredCacheService; - + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); @@ -138,6 +138,9 @@ public final class IndicesRequestCache implements TieredCacheEventListener ehcacheDiskTier = new EhCacheDiskCachingTier.Builder() .setKeyType(Key.class) .setValueType(BytesReference.class) @@ -149,6 +152,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener Date: Wed, 22 Nov 2023 14:48:56 -0800 Subject: [PATCH 08/13] removed more dead code Signed-off-by: Peter Alfonsi --- .../common/cache/tier/EhCacheDiskCachingTier.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index ef2036edccf9e..368447f29f7bf 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -118,15 +118,13 @@ private EhCacheDiskCachingTier(Builder builder) { this.DISK_WRITE_CONCURRENCY = Setting.intSetting(builder.settingPrefix + 
".tiered.disk.ehcache.concurrency", 2, 1, 3); // Default value is 16 within Ehcache. this.DISK_SEGMENTS = Setting.intSetting(builder.settingPrefix + ".ehcache.disk.segments", 16, 1, 32); - if (cacheManager == null) { - cacheManager = buildCacheManager(); - this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); - } - // IndicesRequestCache gets 1%, of which we allocate 5% to the keystore = 0.05% - // TODO: how do we change this automatically based on INDICES_CACHE_QUERY_SIZE setting? - //Setting keystoreSizeSetting = Setting.memorySizeSetting(builder.settingPrefix + ".tiered.disk.keystore_size", "0.05%"); - //this.keystore = new RBMIntKeyLookupStore(keystoreSizeSetting.get(this.settings).getBytes()); + // In test cases, there might be leftover cache managers and caches hanging around, from nodes created in the test case setup + // Destroy them before recreating them + close(); + cacheManager = buildCacheManager(); + this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); + long keystoreMaxWeight = builder.keystoreMaxWeightInBytes; this.keystore = new RBMIntKeyLookupStore(keystoreMaxWeight); } From f37daf3d30a3674f0c5320a39509b6ca34bba7f6 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 1 Dec 2023 09:26:59 -0800 Subject: [PATCH 09/13] Added notification for REMOVED case in ehcache event listener --- .../common/cache/tier/EhCacheDiskCachingTier.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index 368447f29f7bf..0cfc26fa0d0dc 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -301,6 +301,16 @@ public void onEvent(CacheEvent event) { assert event.getNewValue() == null; break; case REMOVED: + 
this.removalListener.ifPresent( + listener -> listener.onRemoval( + new RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.INVALIDATED, + TierType.DISK + ) + ) + ); count.dec(); assert event.getNewValue() == null; break; From ff1e90ce6dca0faf2a7758bbbb8aa4bef34e0598 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 21 Dec 2023 12:04:23 -0800 Subject: [PATCH 10/13] Fixed bug with how disk tier memory was tracked Signed-off-by: Peter Alfonsi --- .../org/opensearch/indices/IndicesRequestCacheIT.java | 2 +- .../index/cache/request/ShardRequestCache.java | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 163f9afc0c103..5dcd5a9f44c7a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -690,5 +690,5 @@ private static void assertCacheState(Client client, String index, long expectedH ); } - + } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index efad437804bef..3f80e92afcea0 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -72,8 +72,15 @@ public void onMiss(TierType tierType) { statsHolder.get(tierType).missCount.inc(); } - public void onCached(Accountable key, BytesReference value, TierType tierType) { - statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + public void onCached(Accountable key, BytesReference value, CacheStoreType cacheStoreType) { + long valueByteSize; + if 
(cacheStoreType == CacheStoreType.DISK) { + valueByteSize = value.length(); // Ehcache trims trailing zeros from incoming byte[] + } else { + valueByteSize = value.ramBytesUsed(); + } + defaultStatsHolder.get(cacheStoreType).totalMetric.inc(key.ramBytesUsed() + valueByteSize); + defaultStatsHolder.get(cacheStoreType).entries.inc(); } public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { From 5f4ca9d065aef1db95f9cfba1a9ffcd04c6b532e Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 2 Jan 2024 10:07:57 -0800 Subject: [PATCH 11/13] Cleanup commit part 1 --- .../cache/tier/EhCacheDiskCachingTier.java | 18 +++------- .../indices/IndicesRequestCache.java | 35 +------------------ 2 files changed, 5 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index 0cfc26fa0d0dc..d45eaf1324b0c 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -99,7 +99,7 @@ public class EhCacheDiskCachingTier implements DiskCachingTier { private EhCacheDiskCachingTier(Builder builder) { this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null"); - this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null"); + this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAccess, "ExpireAfterAccess value shouldn't " + "be null"); this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null"); this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); this.ehCacheEventListener = new 
EhCacheEventListener(this.valueSerializer); @@ -124,9 +124,6 @@ private EhCacheDiskCachingTier(Builder builder) { close(); cacheManager = buildCacheManager(); this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); - - long keystoreMaxWeight = builder.keystoreMaxWeightInBytes; - this.keystore = new RBMIntKeyLookupStore(keystoreMaxWeight); } private PersistentCacheManager buildCacheManager() { @@ -196,7 +193,6 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< @Override public V get(K key) { - // Optimize it by adding key store. return valueSerializer.deserialize(cache.get(key)); } @@ -403,7 +399,7 @@ public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException */ public static class Builder { private long maxWeightInBytes; - private TimeValue expireAfterAcess; + private TimeValue expireAfterAccess; private Class keyType; @@ -423,7 +419,6 @@ public static class Builder { private boolean isEventListenerModeSync; private Serializer keySerializer; private Serializer valueSerializer; - private long keystoreMaxWeightInBytes = 0; public Builder() {} @@ -435,8 +430,8 @@ public EhCacheDiskCachingTier.Builder setMaximumWeightInBytes(long sizeInB return this; } - public EhCacheDiskCachingTier.Builder setExpireAfterAccess(TimeValue expireAfterAcess) { - this.expireAfterAcess = expireAfterAcess; + public EhCacheDiskCachingTier.Builder setExpireAfterAccess(TimeValue expireAfterAccess) { + this.expireAfterAccess = expireAfterAccess; return this; } @@ -491,11 +486,6 @@ public EhCacheDiskCachingTier.Builder setValueSerializer(Serializer setKeyStoreMaxWeightInBytes(long weight) { - this.keystoreMaxWeightInBytes = weight; - return this; - } - public EhCacheDiskCachingTier build() { return new EhCacheDiskCachingTier<>(this); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 
15567358a449f..d8f62d55491f1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -55,7 +55,6 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; @@ -127,44 +126,12 @@ public final class IndicesRequestCache implements TieredCacheEventListener k.ramBytesUsed() + v.ramBytesUsed() ).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build(); - Function transformationFunction = (data) -> { - try { - return convertBytesReferenceToQSR(data); - } catch (IOException e) { - throw new RuntimeException(e); - } - }; - // enabling this for testing purposes. Remove/tweak!! - long CACHE_SIZE_IN_BYTES = 1000000L; - String SETTING_PREFIX = "indices.request.cache"; - String STORAGE_PATH = indicesService.getNodePaths()[0].indicesPath.toString() + "/request_cache"; - - double diskTierKeystoreWeightFraction = 0.05; // Allocate 5% of the on-heap weight to the disk tier's keystore - long keystoreMaxWeight = (long) (diskTierKeystoreWeightFraction * INDICES_CACHE_QUERY_SIZE.get(settings).getBytes()); - - EhCacheDiskCachingTier ehcacheDiskTier = new EhCacheDiskCachingTier.Builder() - .setKeyType(Key.class) - .setValueType(BytesReference.class) - .setExpireAfterAccess(TimeValue.MAX_VALUE) - .setSettings(settings) - .setThreadPoolAlias("ehcacheTest") - .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setStoragePath(STORAGE_PATH) - .setSettingPrefix(SETTING_PREFIX) - .setKeySerializer(new IRCKeyWriteableSerializer(this)) - .setValueSerializer(new BytesReferenceSerializer()) - .setKeyStoreMaxWeightInBytes(keystoreMaxWeight) - .build(); - // Initialize tiered cache service. 
TODO: Enable Disk tier when tiered support is turned on. - tieredCacheService = new TieredCacheSpilloverStrategyService.Builder() + tieredCacheService = new TieredCacheSpilloverStrategyService.Builder() .setOnHeapCachingTier(openSearchOnHeapCache) - .setOnDiskCachingTier(ehcacheDiskTier) .setTieredCacheEventListener(this) - .withPreDiskCachingPolicyFunction(transformationFunction) .build(); - this.indicesService = indicesService; } @Override From ad25390a4e1208199e0a08a90ad65471e37d3a94 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Tue, 2 Jan 2024 10:54:33 -0800 Subject: [PATCH 12/13] cleanup commit Signed-off-by: Peter Alfonsi --- .../IndicesRequestCacheDiskTierIT.java | 103 ------------------ .../cache/tier/EhCacheDiskCachingTier.java | 6 +- .../cache/tier/OnHeapTierRequestStats.java | 19 ---- .../indices/IndicesRequestCacheTests.java | 3 +- 4 files changed, 4 insertions(+), 127 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java delete mode 100644 server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java deleted file mode 100644 index b1db9b8e624aa..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. 
Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.indices; - -import org.opensearch.action.search.SearchResponse; -import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.cache.tier.TierType; -import org.opensearch.common.settings.Settings; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.index.cache.request.RequestCacheStats; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.test.OpenSearchIntegTestCase; - -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; - -// This is a separate file from IndicesRequestCacheIT because we only want to run our test -// on a node with a maximum request cache size that we set. 
- -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { - public void testDiskTierStats() throws Exception { - int heapSizeBytes = 4729; - String node = internalCluster().startNode( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) - ); - Client client = client(node); - - Settings.Builder indicesSettingBuilder = Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); - - assertAcked( - client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() - ); - indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); - ensureSearchable("index"); - SearchResponse resp; - - resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); - int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); - System.out.println(requestSize); - assertTrue(heapSizeBytes > requestSize); - // If this fails, increase heapSizeBytes! 
We can't adjust it after getting the size of one query - // as the cache size setting is not dynamic - - int numOnDisk = 5; - int numRequests = heapSizeBytes / requestSize + numOnDisk; - for (int i = 1; i < numRequests; i++) { - resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); - assertSearchResponse(resp); - IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); - IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); - } - // the first request, for "hello0", should have been evicted to the disk tier - resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); - IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false); - IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false); - } - - private long getCacheSizeBytes(Client client, String index, TierType tierType) { - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats(index) - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); - return requestCacheStats.getMemorySizeInBytes(tierType); - } -} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index d45eaf1324b0c..fad0c5b1f8552 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -246,13 +246,13 @@ public TierType getTierType() { @Override public void close() { - cacheManager.removeCache(DISK_CACHE_ALIAS); - cacheManager.close(); try { cacheManager.destroyCache(DISK_CACHE_ALIAS); + cacheManager.close(); + cacheManager = null; } catch (CachePersistenceException e) { throw new 
OpenSearchException("Exception occurred while destroying ehcache and associated data", e); - } + } catch (NullPointerException ignored) {} // Another test node has already destroyed the cache manager } /** diff --git a/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java b/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java deleted file mode 100644 index 3527dae885fde..0000000000000 --- a/server/src/main/java/org/opensearch/common/cache/tier/OnHeapTierRequestStats.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.tier; - -/** - * A class created by on-heap tier implementations containing on-heap-specific stats for a single request. - */ -public class OnHeapTierRequestStats implements TierRequestStats { - @Override - public TierType getTierType() { - return TierType.ON_HEAP; - } -} diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 416f66244808b..5fbffe6906d56 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -250,8 +250,7 @@ public void testEviction() throws Exception { } IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - getInstanceFromNode(IndicesService.class), - dummyClusterSettings + getInstanceFromNode(IndicesService.class) ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); From 00877f7276508eed60c17f37fe3c79dee9d57c3f Mon Sep 17 00:00:00 2001 From: Peter 
Alfonsi Date: Thu, 4 Jan 2024 09:45:38 -0800 Subject: [PATCH 13/13] re-removed bytesreference serializer Signed-off-by: Peter Alfonsi --- .../cache/tier/BytesReferenceSerializer.java | 35 ----------- .../tier/BytesReferenceSerializerTests.java | 61 ------------------- .../tier/EhCacheDiskCachingTierTests.java | 35 ----------- 3 files changed, 131 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java delete mode 100644 server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java diff --git a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java deleted file mode 100644 index 55ffe22c2a339..0000000000000 --- a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.tier; - -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; - -import java.io.IOException; -import java.util.Arrays; - -public class BytesReferenceSerializer implements Serializer { - // This class does not get passed to ehcache itself, so it's not required that classes match after deserialization. 
- - public BytesReferenceSerializer() {} - @Override - public byte[] serialize(BytesReference object) { - return BytesReference.toBytes(object); - } - - @Override - public BytesReference deserialize(byte[] bytes) { - return new BytesArray(bytes); - } - - @Override - public boolean equals(BytesReference object, byte[] bytes) { - return Arrays.equals(serialize(object), bytes); - } -} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java deleted file mode 100644 index 2fc9c7cbb2756..0000000000000 --- a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.cache.tier; - -import org.opensearch.common.Randomness; -import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.util.BigArrays; -import org.opensearch.common.util.PageCacheRecycler; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.bytes.CompositeBytesReference; -import org.opensearch.core.common.util.ByteArray; -import org.opensearch.test.OpenSearchTestCase; - -import java.util.Random; - -public class BytesReferenceSerializerTests extends OpenSearchTestCase { - public void testEquality() throws Exception { - BytesReferenceSerializer ser = new BytesReferenceSerializer(); - // Test that values are equal before and after serialization, for each implementation of BytesReference. 
- byte[] bytesValue = new byte[1000]; - Random rand = Randomness.get(); - rand.nextBytes(bytesValue); - - BytesReference ba = new BytesArray(bytesValue); - byte[] serialized = ser.serialize(ba); - assertTrue(ser.equals(ba, serialized)); - BytesReference deserialized = ser.deserialize(serialized); - assertEquals(ba, deserialized); - - BytesReference cbr = CompositeBytesReference.of(new BytesArray(bytesValue), new BytesArray(bytesValue)); - serialized = ser.serialize(cbr); - assertTrue(ser.equals(cbr, serialized)); - deserialized = ser.deserialize(serialized); - assertEquals(cbr, deserialized); - - // We need the PagedBytesReference to be larger than the page size (16 KB) in order to actually create it - byte[] pbrValue = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES * 2]; - rand.nextBytes(pbrValue); - ByteArray arr = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(pbrValue.length); - arr.set(0L, pbrValue, 0, pbrValue.length); - assert !arr.hasArray(); - BytesReference pbr = BytesReference.fromByteArray(arr, pbrValue.length); - serialized = ser.serialize(pbr); - assertTrue(ser.equals(pbr, serialized)); - deserialized = ser.deserialize(serialized); - assertEquals(pbr, deserialized); - - BytesReference rbr = new ReleasableBytesReference(new BytesArray(bytesValue), ReleasableBytesReference.NO_OP); - serialized = ser.serialize(rbr); - assertTrue(ser.equals(rbr, serialized)); - deserialized = ser.deserialize(serialized); - assertEquals(rbr, deserialized); - } -} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index e6222a9065f94..0f7bf51b65546 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -65,41 +65,6 @@ public void testBasicGetAndPut() throws IOException { } } - public void 
testBasicGetAndPutBytesReference() throws Exception { - Settings settings = Settings.builder().build(); - try (NodeEnvironment env = newNodeEnvironment(settings)) { - EhCacheDiskCachingTier ehCacheDiskCachingTier = new EhCacheDiskCachingTier.Builder() - .setKeyType(String.class) - .setValueType(BytesReference.class) - .setExpireAfterAccess(TimeValue.MAX_VALUE) - .setSettings(settings) - .setThreadPoolAlias("ehcacheTest") - .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 2) // bigger so no evictions happen - .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") - .setSettingPrefix(SETTING_PREFIX) - .setKeySerializer(new StringSerializer()) - .setValueSerializer(new BytesReferenceSerializer()) - .build(); - int randomKeys = randomIntBetween(10, 100); - int valueLength = 1000; - Random rand = Randomness.get(); - Map keyValueMap = new HashMap<>(); - for (int i = 0; i < randomKeys; i++) { - byte[] valueBytes = new byte[valueLength]; - rand.nextBytes(valueBytes); - keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes)); - } - for (Map.Entry entry : keyValueMap.entrySet()) { - ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue()); - } - for (Map.Entry entry : keyValueMap.entrySet()) { - BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()); - assertEquals(entry.getValue(), value); - } - ehCacheDiskCachingTier.close(); - } - } - public void testConcurrentPut() throws Exception { Settings settings = Settings.builder().build(); try (NodeEnvironment env = newNodeEnvironment(settings)) {