package org.apache.flink.table.planner.runtime.stream.sql;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractTableSink;
import org.apache.flink.table.planner.runtime.utils.TestingUpsertTableSink;
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Array$;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.math.Ordering$String$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;

/* compiled from: GroupWindowITCase.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0005Ug\u0001B\u0001\u0003\u0001M\u0011\u0011c\u0012:pkB<\u0016N\u001c3po&#6)Y:f\u0015\t\u0019A!A\u0002tc2T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\r\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/\u001a\u0005\t7\u0001\u0011\t\u0011)A\u00059\u0005!Qn\u001c3f!\ti\u0012G\u0004\u0002\u001f_9\u0011qD\f\b\u0003A5r!!\t\u0017\u000f\u0005\tZcBA\u0012+\u001d\t!\u0013F\u0004\u0002&Q5\taE\u0003\u0002(%\u00051AH]8pizJ\u0011!E\u0005\u0003\u001fAI!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0018\r%\u0011\u0001GF\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003eM\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005A2\u0002\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\u0002\u001fU\u001cX\rV5nKN$\u0018-\u001c9Mij\u0004\"a\u000e\u001e\u000e\u0003aR\u0011!O\u0001\u0006g\u000e\fG.Y\u0005\u0003wa\u0012qAQ8pY\u0016\fg\u000eC\u0003>\u0001\u0011\u0005a(\u0001\u0004=S:LGO\u0010\u000b\u0004\u007f\u0005\u0013\u0005C\u0001!\u0001\u001b\u0005\u0011\u0001\"B\u000e=\u0001\u0004a\u0002\"B\u001b=\u0001\u00041\u0004b\u0002#\u0001\u0005\u0004%\t!R\u0001\u000e'\"\u000bej\u0012%B\u0013~SvJT#\u0016\u0003\u0019\u0003\"a\u0012'\u000e\u0003!S!!\u0013&\u0002\tQLW.\u001a\u0006\u0002\u0017\u0006!!.\u0019<b\u0013\ti\u0005J\u0001\u0004[_:,\u0017\n\u001a\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002$\u0002\u001dMC\u0015IT$I\u0003&{&l\u0014(FA!9\u0011\u000b\u0001b\u0001\n\u0003\u0011\u0016\u0001G;qg\u0016\u0014HoU8ve\u000e,7)\u001e:sK:\u001c\u0017\u0010R1uCV\t1\u000bE\u0002U3nk\u0011!\u0016\u0006\u0003-^\u000b\u0011\"[7nkR\f'\r\\3\
u000b\u0005aC\u0014AC2pY2,7\r^5p]&\u0011!,\u0016\u0002\u0005\u0019&\u001cH\u000f\u0005\u0002]?6\tQL\u0003\u0002_\u0019\u0005)A/\u001f9fg&\u0011\u0001-\u0018\u0002\u0004%><\bB\u00022\u0001A\u0003%1+A\rvaN,'\u000f^*pkJ\u001cWmQ;se\u0016t7-\u001f#bi\u0006\u0004\u0003\"\u00023\u0001\t\u0003*\u0017A\u00022fM>\u0014X\rF\u0001g!\t9t-\u0003\u0002iq\t!QK\\5u\u0011\u0015Q\u0007\u0001\"\u0001f\u0003i!Xm\u001d;Fm\u0016tG\u000fV5nKNc\u0017\u000eZ5oO^Kg\u000eZ8xQ\tIG\u000e\u0005\u0002na6\taN\u0003\u0002p!\u0005)!.\u001e8ji&\u0011\u0011O\u001c\u0002\u0005)\u0016\u001cH\u000fC\u0003t\u0001\u0011\u0005Q-A\ruKN$8)Y:dC\u0012Lgn\u001a+v[\ndWmV5oI><\bF\u0001:m\u0011\u00151\b\u0001\"\u0001f\u0003q!Xm\u001d;NS:l\u0015\r_,ji\"$V/\u001c2mS:<w+\u001b8e_^D#!\u001e7\t\u000be\u0004A\u0011A3\u0002EQ,7\u000f^,j]\u0012|w/Q4he\u0016<\u0017\r^3P]\u000e{gn\u001d;b]R4\u0016\r\\;fQ\tAH\u000eC\u0003}\u0001\u0011\u0005Q0\u0001\u000fuKN$\bK]8di&lWmQ1tG\u0006$WmV5oI><\u0018iZ4\u0016\u0003\u0019D#a\u001f7\t\r\u0005\u0005\u0001\u0001\"\u0001f\u0003i!Xm\u001d;Fm\u0016tG\u000fV5nKN+7o]5p]^Kg\u000eZ8xQ\tyH\u000e\u0003\u0004\u0002\b\u0001!\t!Z\u0001-i\u0016\u001cH/\u0012<f]R$\u0016.\\3Uk6\u0014G.\u001b8h/&tGm\\<XSRD\u0017\t\u001c7po2\u000bG/\u001a8fgND3!!\u0002m\u0011\u0019\ti\u0001\u0001C\u0001K\u0006\tC/Z:u/&tGm\\<BO\u001e\u0014XmZ1uK>sW\u000b]:feR\u001cv.\u001e:dK\"\u001a\u00111\u00027\t\r\u0005M\u0001\u0001\"\u0001f\u0003I\"Xm\u001d;XS:$wn^!hOJ,w-\u0019;f\u001f:,\u0006o]3siN{WO]2f/&$\b.\u00117m_^d\u0015\r^3oKN\u001c\bfAA\tY\"1\u0011\u0011\u0004\u0001\u0005\u0002\u0015\f!\u0007^3ti^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016|e.\u00169tKJ$8k\\;sG\u0016\u0004Vo\u001d5e_^tw+\u0019;fe6\f'o\u001b\u0015\u0004\u0003/a\u0007BBA\u0010\u0001\u0011\u0005Q-\u0001\u0012uKN$x+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00148SKR\u0014\u0018m\u0019;TiJ,\u0017-\u001c\u0015\u0004\u0003;a\u0007BBA\u0013\u0001\u0011\u0005Q-A\u001buKN$H)[:uS:\u001cG/Q4h/&$\b.T3sO\u0016|e.\u0012<f]R$\u0016.\\3TKN\u001c\u0018n\u001c8He>,\boV5oI><\bfAA\u0012Y\"9\u00
111\u0006\u0001\u0005\n\u00055\u0012!E<ji\"d\u0015\r^3GSJ,G)\u001a7bsR)a-a\f\u0002@!A\u0011\u0011GA\u0015\u0001\u0004\t\u0019$A\u0006uC\ndWmQ8oM&<\u0007\u0003BA\u001b\u0003wi!!a\u000e\u000b\u0007\u0005e\"\"A\u0002ba&LA!!\u0010\u00028\tYA+\u00192mK\u000e{gNZ5h\u0011!\t\t%!\u000bA\u0002\u0005\r\u0013\u0001C5oi\u0016\u0014h/\u00197\u0011\t\u0005\u0015\u0013qJ\u0007\u0003\u0003\u000fR1!SA%\u0015\u0011\tY%!\u0014\u0002\r\r|W.\\8o\u0015\r\tI\u0004D\u0005\u0005\u0003#\n9E\u0001\u0003US6,\u0007bBA+\u0001\u0011%\u0011qK\u0001\u000eY>\u001c\u0017\r\u001c#bi\u0016$\u0016.\\3\u0015\t\u0005e\u0013q\f\t\u0004\u000f\u0006m\u0013bAA/\u0011\niAj\\2bY\u0012\u000bG/\u001a+j[\u0016D\u0001\"!\u0019\u0002T\u0001\u0007\u00111M\u0001\fKB|7\r[*fG>tG\rE\u00028\u0003KJ1!a\u001a9\u0005\u0011auN\\4)\u000f\u0001\tY'a\u001e\u0002zA!\u0011QNA:\u001b\t\tyGC\u0002\u0002r9\faA];o]\u0016\u0014\u0018\u0002BA;\u0003_\u0012qAU;o/&$\b.A\u0003wC2,Xm\t\u0002\u0002|A!\u0011QPAB\u001b\t\tyHC\u0002\u0002\u0002:\fqA];o]\u0016\u00148/\u0003\u0003\u0002\u0006\u0006}$!\u0004)be\u0006lW\r^3sSj,GmB\u0004\u0002\n\nA\t!a#\u0002#\u001d\u0013x.\u001e9XS:$wn^%U\u0007\u0006\u001cX\rE\u0002A\u0003\u001b3a!\u0001\u0002\t\u0002\u0005=5\u0003BAG\u0003#\u00032aNAJ\u0013\r\t)\n\u000f\u0002\u0007\u0003:L(+\u001a4\t\u000fu\ni\t\"\u0001\u0002\u001aR\u0011\u00111\u0012\u0005\t\u0003;\u000bi\t\"\u0001\u0002 \u0006Q\u0001/\u0019:b[\u0016$XM]:\u0015\u0005\u0005\u0005\u0006CBAR\u0003S\u000bi+\u0004\u0002\u0002&*\u0019\u0011q\u0015&\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003W\u000b)K\u0001\u0006D_2dWm\u0019;j_:\u0004RaNAX\u0003gK1!!-9\u0005\u0015\t%O]1z!\u0011\t),a/\u000e\u0005\u0005]&bAA]\u0015\u0006!A.\u00198h\u0013\u0011\ti,a.\u0003\r=\u0013'.Z2uQ!\tY*!1\u0002P\u0006E\u0007\u0003BAb\u0003\u0013tA!! 
\u0002F&!\u0011qYA@\u00035\u0001\u0016M]1nKR,'/\u001b>fI&!\u00111ZAg\u0005)\u0001\u0016M]1nKR,'o\u001d\u0006\u0005\u0003\u000f\fy(\u0001\u0003oC6,\u0017EAAj\u0003\u001d\u001aF/\u0019;f\u0005\u0006\u001c7.\u001a8e{m\u0004T\u0010\f\u0011Vg\u0016$\u0016.\\3ti\u0006l\u0007\u000f\u0014;{Au\u000230M?")
/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.class */
public class GroupWindowITCase extends StreamingWithStateTestBase {
    // When true, before() declares `ts` as BIGINT with a TO_TIMESTAMP_LTZ rowtime and the tests
    // compare against the LTZ flavoured expected rows; when false the STRING/TO_TIMESTAMP variant is used.
    private final boolean useTimestampLtz;
    // Zone installed as the session-local time zone in before(); exposed through SHANGHAI_ZONE().
    // NOTE(review): the assigned value is set in the constructor, which is outside this view.
    private final ZoneId SHANGHAI_ZONE;
    // Rows registered with TestValuesTableFactory as the data of the `upsert_currency` table.
    private final List<Row> upsertSourceCurrencyData;

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner by delegating to the
     * Scala companion object.
     *
     * @return whatever {@code GroupWindowITCase$.MODULE$.parameters()} produces
     */
    @Parameterized.Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}")
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> parameterSets = GroupWindowITCase$.MODULE$.parameters();
        return parameterSets;
    }

    /**
     * Scala-style accessor for the {@code SHANGHAI_ZONE} field.
     *
     * @return the zone id stored in this test instance
     */
    public ZoneId SHANGHAI_ZONE() {
        final ZoneId zone = this.SHANGHAI_ZONE;
        return zone;
    }

    /**
     * Scala-style accessor for the {@code upsertSourceCurrencyData} field.
     *
     * @return the rows stored in this test instance for the upsert-source tests
     */
    public List<Row> upsertSourceCurrencyData() {
        final List<Row> rows = this.upsertSourceCurrencyData;
        return rows;
    }

    /**
     * Per-test setup: runs the parent setup, registers both timestamp data sets with
     * {@link TestValuesTableFactory}, sets the session-local time zone to {@link #SHANGHAI_ZONE()}
     * and creates {@code testTable} on the {@code values} connector.
     *
     * <p>With {@code useTimestampLtz} set, {@code ts} is declared as {@code BIGINT}, the rowtime is
     * computed with {@code TO_TIMESTAMP_LTZ(`ts`, 3)} and the LTZ data id is used; otherwise
     * {@code ts} is {@code STRING}, the rowtime is {@code TO_TIMESTAMP(`ts`)} and the plain
     * timestamp data id is used.
     */
    @Override // org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase, org.apache.flink.table.planner.runtime.utils.StreamingTestBase
    public void before() {
        super.before();

        // Both data sets are always registered, in this order; only one id ends up in the DDL.
        final String timestampDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampData());
        final String timestampLtzDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampLtzData());
        tEnv().getConfig().setLocalTimeZone(SHANGHAI_ZONE());

        final String tsColumnType;
        final String rowtimeExpression;
        final String dataId;
        if (this.useTimestampLtz) {
            tsColumnType = "BIGINT";
            rowtimeExpression = "TO_TIMESTAMP_LTZ(`ts`, 3)";
            dataId = timestampLtzDataId;
        } else {
            tsColumnType = "STRING";
            rowtimeExpression = "TO_TIMESTAMP(`ts`)";
            dataId = timestampDataId;
        }

        final StringBuilder ddl = new StringBuilder(505);
        ddl.append(
                "\n"
                        + "         |CREATE TABLE testTable (\n"
                        + "         | `ts` ");
        ddl.append(tsColumnType);
        ddl.append(
                ",\n"
                        + "         | `int` INT,\n"
                        + "         | `double` DOUBLE,\n"
                        + "         | `float` FLOAT,\n"
                        + "         | `bigdec` DECIMAL(10, 2),\n"
                        + "         | `string` STRING,\n"
                        + "         | `name` STRING,\n"
                        + "         | `rowtime` AS\n"
                        + "         | ");
        ddl.append(rowtimeExpression);
        ddl.append(
                ",\n"
                        + "         | proctime as PROCTIME(),\n"
                        + "         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '0.01' SECOND\n"
                        + "         |) WITH (\n"
                        + "         | 'connector' = 'values',\n"
                        + "         | 'data-id' = '");
        ddl.append(dataId);
        ddl.append(
                "',\n"
                        + "         | 'failing-source' = 'true'\n"
                        + "         |)\n"
                        + "         |");

        final String createTable = new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin();
        tEnv().executeSql(createTable);
    }

    /**
     * Aggregates {@code testTable} over a HOP window (0.004 s / 0.005 s intervals) grouped by
     * {@code string}, using the registered {@code concat_distinct_agg} function, and compares the
     * sorted append-sink output with the expected rows.
     *
     * <p>The expected rows differ only in the {@code HOP_ROWTIME} column: in LTZ mode it is
     * rendered as an instant with a trailing {@code Z}, otherwise as a local timestamp.
     */
    @Test
    public void testEventTimeSlidingWindow() {
        tEnv().registerFunction(
                "concat_distinct_agg",
                new JavaUserDefinedAggFunctions.ConcatDistinctAggFunction(),
                BasicTypeInfo.getInfoFor(String.class),
                TypeExtractor.createTypeInfo(JavaUserDefinedAggFunctions.ConcatAcc.class));

        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT\n"
                        + "        |  `string`,\n"
                        + "        |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n"
                        + "        |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n"
                        + "        |  COUNT(1),\n"
                        + "        |  SUM(1),\n"
                        + "        |  COUNT(`int`),\n"
                        + "        |  COUNT(DISTINCT `float`),\n"
                        + "        |  concat_distinct_agg(name)\n"
                        + "        |FROM testTable\n"
                        + "        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)\n"
                        + "      ")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        final String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "Hallo,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.008,1969-12-31T16:00:00.012Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.012,1969-12-31T16:00:00.016Z,1,1,1,1,b",
                "Hello world,1970-01-01T00:00:00.016,1969-12-31T16:00:00.020Z,1,1,1,1,b",
                "Hello,1970-01-01T00:00,1969-12-31T16:00:00.004Z,2,2,2,2,a",
                "Hello,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,3,3,3,2,a|b",
                "Hi,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a",
                "null,1970-01-01T00:00:00.028,1969-12-31T16:00:00.032Z,1,1,1,1,null",
                "null,1970-01-01T00:00:00.032,1969-12-31T16:00:00.036Z,1,1,1,1,null"
            };
        } else {
            expectedRows = new String[]{
                "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.008,1970-01-01T00:00:00.012,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.012,1970-01-01T00:00:00.016,1,1,1,1,b",
                "Hello world,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,1,b",
                "Hello,1970-01-01T00:00,1970-01-01T00:00:00.004,2,2,2,2,a",
                "Hello,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,3,3,3,2,a|b",
                "Hi,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "null,1970-01-01T00:00:00.028,1970-01-01T00:00:00.032,1,1,1,1,null",
                "null,1970-01-01T00:00:00.032,1970-01-01T00:00:00.036,1,1,1,1,null"
            };
        }

        // Both sides are sorted so the comparison does not depend on emission order.
        final Object expectedSorted = ((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows))).sorted(Ordering$String$.MODULE$);
        final Object actualSorted = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Feeds the {@code TUMBLE_ROWTIME} of an inner 10-second tumbling aggregation into an outer
     * 10-second tumbling window and expects the single summed count {@code "9"}.
     */
    @Test
    public void testCascadingTumbleWindow() {
        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT SUM(cnt)\n"
                        + "        |FROM (\n"
                        + "        |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS ts\n"
                        + "        |  FROM testTable\n"
                        + "        |  GROUP BY `int`, `string`, TUMBLE(rowtime, INTERVAL '10' SECOND)\n"
                        + "        |)\n"
                        + "        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n"
                        + "        |")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        final String[] expectedRows = {"9"};
        final Object expectedSorted = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actualSorted = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Enables early firing (delay {@code "1000 ms"}) and computes MAX/MIN of {@code rowtime} per
     * 10-second tumbling window, then aggregates those per {@code string} in an outer non-windowed
     * query whose results are collected through a retract sink.
     *
     * <p>Expected timestamps carry a trailing {@code Z} in LTZ mode and are local timestamps
     * otherwise.
     */
    @Test
    public void testMinMaxWithTumblingWindow() {
        tEnv().getConfig().set("table.exec.emit.early-fire.enabled", "true");
        tEnv().getConfig().set("table.exec.emit.early-fire.delay", "1000 ms");

        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT\n"
                        + "        | MAX(max_ts),\n"
                        + "        | MIN(min_ts),\n"
                        + "        | `string`\n"
                        + "        |FROM(\n"
                        + "        | SELECT\n"
                        + "        | `string`,\n"
                        + "        | `int`,\n"
                        + "        | MAX(rowtime) as max_ts,\n"
                        + "        | MIN(rowtime) as min_ts\n"
                        + "        | FROM testTable\n"
                        + "        | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))\n"
                        + "        |GROUP BY `string`\n"
                        + "      ")).stripMargin();

        final TestingRetractSink sink = new TestingRetractSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        final String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "1969-12-31T16:00:00.001Z,1969-12-31T16:00:00.001Z,Hi",
                "1969-12-31T16:00:00.002Z,1969-12-31T16:00:00.002Z,Hallo",
                "1969-12-31T16:00:00.007Z,1969-12-31T16:00:00.003Z,Hello",
                "1969-12-31T16:00:00.016Z,1969-12-31T16:00:00.008Z,Hello world",
                "1969-12-31T16:00:00.032Z,1969-12-31T16:00:00.032Z,null"
            };
        } else {
            expectedRows = new String[]{
                "1970-01-01T00:00:00.001,1970-01-01T00:00:00.001,Hi",
                "1970-01-01T00:00:00.002,1970-01-01T00:00:00.002,Hallo",
                "1970-01-01T00:00:00.007,1970-01-01T00:00:00.003,Hello",
                "1970-01-01T00:00:00.016,1970-01-01T00:00:00.008,Hello world",
                "1970-01-01T00:00:00.032,1970-01-01T00:00:00.032,null"
            };
        }

        final Object expectedSorted = ((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows))).sorted(Ordering$String$.MODULE$);
        final Object actualSorted = sink.getRetractResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Groups {@code testTable} by the constant {@code 'a'} plus a 0.003-second tumbling window and
     * checks the {@code TUMBLE_END}/{@code COUNT(name)} pairs collected by an append sink.
     */
    @Test
    public void testWindowAggregateOnConstantValue() {
        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(name)\n"
                        + "        |FROM testTable\n"
                        + "        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)\n"
                        + "      ")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        final String[] expectedRows = {
            "1970-01-01T00:00:00.003,2",
            "1970-01-01T00:00:00.006,2",
            "1970-01-01T00:00:00.009,3",
            "1970-01-01T00:00:00.018,1",
            "1970-01-01T00:00:00.033,0"
        };
        final Object expectedSorted = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actualSorted = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Checks only the resolved schema of a cascaded processing-time tumbling aggregation; the
     * query is planned through {@code sqlQuery} but never executed here.
     *
     * <p>The query aliases two output columns as {@code window_start}; the expected schema lists
     * the second one as {@code window_start0}.
     */
    @Test
    public void testProctimeCascadeWindowAgg() {
        final String expectedSchema = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "         |(\n"
                        + "         |  `key` BIGINT NOT NULL,\n"
                        + "         |  `window_start` TIMESTAMP(3) NOT NULL,\n"
                        + "         |  `window_start0` TIMESTAMP(3) NOT NULL,\n"
                        + "         |  `window_proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,\n"
                        + "         |  `v1` TIMESTAMP(3) NOT NULL,\n"
                        + "         |  `v2` TIMESTAMP(3) NOT NULL\n"
                        + "         |)\n"
                        + "         ")).stripMargin().trim();

        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        | SELECT\n"
                        + "        |  cnt AS key,\n"
                        + "        |  TUMBLE_START(pt1, INTERVAL '0.01' SECOND) AS window_start,\n"
                        + "        |  TUMBLE_END(pt1, INTERVAL '0.01' SECOND) AS window_start,\n"
                        + "        |  TUMBLE_PROCTIME(pt1, INTERVAL '0.01' SECOND) as window_proctime,\n"
                        + "        |  MAX(s1) AS v1,\n"
                        + "        |  MAX(e1) AS v2\n"
                        + "        | FROM\n"
                        + "        | (SELECT\n"
                        + "        |   TUMBLE_START(proctime, INTERVAL '0.005' SECOND) as s1,\n"
                        + "        |   TUMBLE_END(proctime, INTERVAL '0.005' SECOND) e1,\n"
                        + "        |   TUMBLE_PROCTIME(proctime, INTERVAL '0.005' SECOND) as pt1,\n"
                        + "        |   COUNT(name) as cnt\n"
                        + "        |  FROM testTable\n"
                        + "        |  GROUP BY 'a', TUMBLE(proctime, INTERVAL '0.005' SECOND)\n"
                        + "        |  ) as T\n"
                        + "        | GROUP BY cnt, TUMBLE(pt1, INTERVAL '0.01' SECOND)\n"
                        + "      ")).stripMargin();

        final Table result = tEnv().sqlQuery(query);
        final String actualSchema = result.getResolvedSchema().toString();
        Assert.assertEquals(expectedSchema, actualSchema);
    }

    /**
     * Registers an in-memory stream of six {@code Tuple4} records as the temporary view
     * {@code T1}, runs a SESSION window aggregation (gap {@code INTERVAL '0.005' SECOND}) grouped
     * by {@code string}, and compares the sorted append-sink output with three expected rows.
     *
     * <p>Returns immediately without asserting anything when {@code useTimestampLtz} is set.
     */
    @Test
    public void testEventTimeSessionWindow() {
        if (this.useTimestampLtz) {
            return;
        }
        // NOTE(review): looks like a decompiler artifact standing in for the outer-instance
        // argument of the anonymous CaseClassTypeInfo below — confirm against the Scala source.
        final GroupWindowITCase groupWindowITCase = null;
        // Builds the source: failingDataSource(<6 tuples>) typed by an anonymous CaseClassTypeInfo for
        // Tuple4 (long, int, String, String), then assigns timestamps/watermarks and converts to a table
        // whose expressions are rowtime (marked with .rowtime()), int, string and name.
        // NOTE(review): the body of the anonymous class (instance-initializer `super(...)` call, the
        // discarded inner serializer instance) is decompiler output and is not expected to be valid Java as-is.
        tEnv().createTemporaryView("T1", package$.MODULE$.dataStreamConversions(failingDataSource(new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(1), "Hello", "a"), new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello", "b"), new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(8), "Hello", "a"), new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(9), "Hello World", "b"), new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(4), "Hello", "c"), new $colon.colon(new Tuple4(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(16), "Hello", "d"), Nil$.MODULE$)))))), new CaseClassTypeInfo<Tuple4<Object, Object, String, String>>(groupWindowITCase) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$4
            public /* synthetic */ TypeInformation[] protected$types(GroupWindowITCase$$anon$4 groupWindowITCase$$anon$4) {
                return groupWindowITCase$$anon$4.types;
            }

            public TypeSerializer<Tuple4<Object, Object, String, String>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple4<Object, Object, String, String>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$4$$anon$1
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple4<Object, Object, String, String> m2491createInstance(Object[] objArr) {
                        return new Tuple4<>(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[0])), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[1])), (String) objArr[2], (String) objArr[3]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple4.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$)))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$)))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3", "_4"})));
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{(Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "rowtime").dynamicInvoker().invoke() /* invoke-custom */).rowtime(), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "int").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "string").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "name").dynamicInvoker().invoke() /* invoke-custom */)})));
        // Session-window query over T1; stripMargin removes the leading `|` margins from the literal.
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),\n        |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  SUM(`int`),\n        |  COUNT(DISTINCT name)\n        |FROM T1\n        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink testingAppendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(stripMargin)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(testingAppendSink);
        env().execute();
        // Both sides are sorted before comparison, so emission order does not matter.
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1", "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1", "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3"})).sorted(Ordering$String$.MODULE$), testingAppendSink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Sets the allow-lateness option to 10 ms and a late-fire delay of 0 ns, registers an
     * in-memory stream of eight {@code Tuple3} records as the temporary view {@code T1}, runs a
     * 0.005-second tumbling window aggregation (including the {@code weightAvgFun} aggregate)
     * grouped by {@code string}, writes the result into a {@link TestingUpsertTableSink}
     * registered as {@code MySink}, and compares the sorted upsert results with four expected rows.
     *
     * <p>Returns immediately without asserting anything when {@code useTimestampLtz} is set.
     */
    @Test
    public void testEventTimeTumblingWindowWithAllowLateness() {
        if (this.useTimestampLtz) {
            return;
        }
        tEnv().getConfig().set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_ALLOW_LATENESS(), Duration.ofMillis(10L));
        // withLateFireDelay is a private helper defined outside this view.
        withLateFireDelay(tEnv().getConfig(), Time.of(0L, TimeUnit.NANOSECONDS));
        // NOTE(review): looks like a decompiler artifact standing in for the outer-instance
        // argument of the anonymous CaseClassTypeInfo below — confirm against the Scala source.
        final GroupWindowITCase groupWindowITCase = null;
        // Builds the source: failingDataSource(<8 tuples>) typed by an anonymous CaseClassTypeInfo for
        // Tuple3 (long, int, String), then assigns timestamps/watermarks and converts to a table whose
        // expressions are long, int, string plus a trailing rowtime marked with .rowtime().
        // NOTE(review): the body of the anonymous class (instance-initializer `super(...)` call, the
        // discarded inner serializer instance) is decompiler output and is not expected to be valid Java as-is.
        tEnv().createTemporaryView("T1", package$.MODULE$.dataStreamConversions(failingDataSource(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple3[]{new Tuple3(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(1), "Hi"), new Tuple3(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello"), new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(2), "Hello"), new Tuple3(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(3), "Hello world"), new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(3), "Hello"), new Tuple3(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(3), "Hello world"), new Tuple3(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(4), "Hello world"), new Tuple3(BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToInteger(1), "Hi")})), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(groupWindowITCase) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$5
            public /* synthetic */ TypeInformation[] protected$types(GroupWindowITCase$$anon$5 groupWindowITCase$$anon$5) {
                return groupWindowITCase$$anon$5.types;
            }

            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$5$$anon$2
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple3<Object, Object, String> m2493createInstance(Object[] objArr) {
                        return new Tuple3<>(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[0])), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[1])), (String) objArr[2]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple3.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3"})));
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(0L))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "long").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "int").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "string").dynamicInvoker().invoke() /* invoke-custom */), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "rowtime").dynamicInvoker().invoke() /* invoke-custom */).rowtime()})));
        tEnv().createTemporarySystemFunction("weightAvgFun", JavaUserDefinedAggFunctions.WeightedAvg.class);
        Table sqlQuery = tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(DISTINCT `long`),\n        |  COUNT(`int`),\n        |  CAST(AVG(`int`) AS INT),\n        |  weightAvgFun(`long`, `int`),\n        |  MIN(`int`),\n        |  MAX(`int`),\n        |  SUM(`int`)\n        |FROM T1\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin());
        // One type per output column of the query above (10 columns).
        TypeInformation<?>[] typeInformationArr = {Types.STRING(), Types.LOCAL_DATE_TIME(), Types.LOCAL_DATE_TIME(), Types.LONG(), Types.LONG(), Types.INT(), Types.LONG(), Types.INT(), Types.INT(), Types.INT()};
        // The sink is created with key indices {0, 1}; its field names are derived from the column
        // indices by a synthetic lambda ($anonfun$...$1) that is defined outside this view.
        TestingUpsertTableSink configure = new TestingUpsertTableSink(new int[]{0, 1}).configure((String[]) ((TraversableOnce) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(typeInformationArr)).indices().map(obj -> {
            return $anonfun$testEventTimeTumblingWindowWithAllowLateness$1(BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class)), typeInformationArr);
        tEnv().registerTableSinkInternal("MySink", configure);
        // Blocks until the insert job has finished before reading the sink contents.
        sqlQuery.executeInsert("MySink").await();
        // Sorted rows are joined with newlines so a mismatch is reported as one readable string diff.
        Assert.assertEquals(((TraversableOnce) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1,1,1,1,1,1,1", "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,2,3,2,3,2,3,7", "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1,1,3,16,3,3,3", "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,2,2,3,8,3,4,7"})).sorted(Ordering$String$.MODULE$)).mkString("\n"), ((TraversableOnce) configure.getUpsertResults().sorted(Ordering$String$.MODULE$)).mkString("\n"));
    }

    /**
     * Runs with parallelism 1: creates the {@code upsert_currency} table (changelog mode
     * {@code UA,D}, primary key {@code currency}) backed by {@link #upsertSourceCurrencyData()},
     * aggregates it per currency over 5-second tumbling windows and checks the rows collected by
     * an append sink.
     */
    @Test
    public void testWindowAggregateOnUpsertSource() {
        env().setParallelism(1);

        final String dataId = TestValuesTableFactory.registerData((Seq<Row>) upsertSourceCurrencyData());
        final StringBuilder ddl = new StringBuilder(621);
        ddl.append(
                "\n"
                        + "                       |CREATE TABLE upsert_currency (\n"
                        + "                       |  currency STRING,\n"
                        + "                       |  currency_no STRING,\n"
                        + "                       |  rate  BIGINT,\n"
                        + "                       |  currency_time TIMESTAMP(3),\n"
                        + "                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n"
                        + "                       |  PRIMARY KEY(currency) NOT ENFORCED\n"
                        + "                       |) WITH (\n"
                        + "                       |  'connector' = 'values',\n"
                        + "                       |  'changelog-mode' = 'UA,D',\n"
                        + "                       |  'data-id' = '");
        ddl.append(dataId);
        ddl.append(
                "'\n"
                        + "                       |)\n"
                        + "                       |");
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin());

        final String query = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT\n"
                        + "        |currency,\n"
                        + "        |COUNT(1) AS cnt,\n"
                        + "        |MAX(rate),\n"
                        + "        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n"
                        + "        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n"
                        + "        |FROM upsert_currency\n"
                        + "        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n"
                        + "        |")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(query)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(sink);
        env().execute();

        final String[] expectedRows = {
            "US Dollar,1,102,1970-01-01T00:00,1970-01-01T00:00:05",
            "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05",
            "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20",
            "RMB,1,702,1970-01-01T00:00,1970-01-01T00:00:05"
        };
        final Object expectedSorted = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actualSorted = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Runs the per-currency 5-second tumbling window aggregation over the upsert source with
     * {@code TABLE_EXEC_EMIT_ALLOW_LATENESS} set to 15 seconds and a late-fire delay of zero
     * (configured through {@link #withLateFireDelay}).
     *
     * <p>The query result is written to a {@code TestingRetractTableSink} registered as
     * {@code MySink1}; the sorted retract results are compared with the sorted expected rows.
     */
    @Test
    public void testWindowAggregateOnUpsertSourceWithAllowLateness() {
        // Emit configuration: allowed lateness of 15 seconds, late firing with a zero delay.
        final Duration allowedLateness = Duration.ofSeconds(15L);
        tEnv().getConfig().set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_ALLOW_LATENESS(), allowedLateness);
        withLateFireDelay(tEnv().getConfig(), Time.of(0L, TimeUnit.NANOSECONDS));

        // Register the changelog rows and splice the returned data id into the table DDL.
        final String dataId = TestValuesTableFactory.registerData((Seq<Row>) upsertSourceCurrencyData());
        final StringBuilder ddl = new StringBuilder(621);
        ddl.append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '");
        ddl.append(dataId);
        ddl.append("'\n                       |)\n                       |");
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin());

        final String windowQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |currency,\n        |COUNT(1) AS cnt,\n        |MAX(rate),\n        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM upsert_currency\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        final Table windowed = tEnv().sqlQuery(windowQuery);
        final TableSchema resultSchema = windowed.getSchema();

        // The sink is configured with the query's field names and the nullable variant of each
        // field data type, converted to TypeInformation.
        final TestingRetractTableSink retractSink = new TestingRetractTableSink().configure(
                resultSchema.getFieldNames(),
                (TypeInformation<?>[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(
                        (Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(resultSchema.getFieldDataTypes()))
                                .map(fieldType -> fieldType.nullable(),
                                        Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class)))))
                        .map(nullableType -> TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(nullableType),
                                Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(TypeInformation.class))));
        tEnv().registerTableSinkInternal("MySink1", retractSink);
        windowed.executeInsert("MySink1").await();

        // Both sides are sorted so the comparison does not depend on emission order.
        final String[] expectedRows = {
            "US Dollar,1,104,1970-01-01T00:00,1970-01-01T00:00:05",
            "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05",
            "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20"
        };
        final Object expected = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actual = retractSink.getRetractResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Runs a 5-second tumbling window {@code MAX(rate)} aggregation over the
     * {@code upsert_currency} table, grouping by the window only (no key column).
     *
     * <p>The job runs with parallelism 1, the result is collected through an append sink, and the
     * sorted sink output is compared with the sorted expected rows.
     */
    @Test
    public void testWindowAggregateOnUpsertSourcePushdownWatermark() {
        env().setParallelism(1);

        // Register the changelog rows and splice the returned data id into the table DDL.
        final String dataId = TestValuesTableFactory.registerData((Seq<Row>) upsertSourceCurrencyData());
        final StringBuilder ddl = new StringBuilder(621);
        ddl.append("\n                       |CREATE TABLE upsert_currency (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                       |  PRIMARY KEY(currency) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'UA,D',\n                       |  'data-id' = '");
        ddl.append(dataId);
        ddl.append("'\n                       |)\n                       |");
        tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ddl.toString())).stripMargin());

        // Window-only aggregation under test.
        final String windowQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |MAX(rate) AS max_rate\n        |FROM upsert_currency\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        final Table windowed = tEnv().sqlQuery(windowQuery);
        package$.MODULE$.tableConversions(windowed)
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        final String[] expectedRows = {
            "1970-01-01T00:00,1970-01-01T00:00:05,702",
            "1970-01-01T00:00:15,1970-01-01T00:00:20,118"
        };
        final Object expected = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Runs a 5-millisecond tumbling window {@code COUNT(1)} per {@code string} value on top of a
     * {@code ROW_NUMBER() ... WHERE rowNum = 1} sub-query over {@code testTable}, i.e. the window
     * aggregate consumes the output of a "latest row per key" query.
     *
     * <p>{@code testTable} is not registered in this method; presumably it is set up elsewhere in
     * the class (verify against the test fixture). The sorted append-sink output is compared with
     * the sorted expected rows.
     */
    @Test
    public void testWindowAggregateOnRetractStream() {
        final String windowQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |`string`,\n        |TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) as w_end,\n        |COUNT(1) AS cnt\n        |FROM\n        | (\n        | SELECT `string`, rowtime\n        | FROM (\n        |  SELECT *,\n        |  ROW_NUMBER() OVER (PARTITION BY `string` ORDER BY rowtime DESC) as rowNum\n        |   FROM testTable\n        | )\n        | WHERE rowNum = 1\n        |)\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n        |")).stripMargin();

        final TestingAppendSink sink = new TestingAppendSink();
        final Table windowed = tEnv().sqlQuery(windowQuery);
        package$.MODULE$.tableConversions(windowed)
                .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
                .addSink(sink);
        env().execute();

        // Both sides are sorted so the comparison does not depend on emission order.
        final String[] expectedRows = {
            "Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
            "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.005,1",
            "Hello,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,1",
            "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1",
            "null,1970-01-01T00:00:00.030,1970-01-01T00:00:00.035,1"
        };
        final Object expected = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(expectedRows)).sorted(Ordering$String$.MODULE$);
        final Object actual = sink.getAppendResults().sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Checks {@code COUNT(DISTINCT b)} grouped by {@code c} and an event-time session window with
     * a gap of {@code INTERVAL '0.005' SECOND}.
     *
     * <p>The input is seven {@code (long, int, String)} tuples read through
     * {@code failingDataSource}, with timestamps and watermarks assigned by
     * {@code TimestampAndWatermarkWithOffset(10L)}, registered as {@code MyTable} with fields
     * {@code a}, {@code b}, {@code c} and a {@code rowtime} attribute. The sorted append-sink
     * output is compared with three expected rows.
     *
     * <p>The test is a no-op when {@code useTimestampLtz} is set.
     */
    @Test
    public void testDistinctAggWithMergeOnEventTimeSessionGroupWindow() {
        // Nothing is executed or asserted for the TIMESTAMP_LTZ variant of this test case.
        if (this.useTimestampLtz) {
            return;
        }
        // NOTE(review): always null and only handed to the anonymous type info below; this looks
        // like a decompiler-generated stand-in for the enclosing instance -- confirm.
        final GroupWindowITCase groupWindowITCase = null;
        // Build the source stream, attach the hand-written Tuple3 type information, assign
        // timestamps/watermarks and register the result as "MyTable".
        tEnv().createTemporaryView("MyTable", package$.MODULE$.dataStreamConversions(failingDataSource(new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToInteger(2), "Hello"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToInteger(2), "Hello"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(8L), BoxesRunTime.boxToInteger(2), "Hello"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(10L), BoxesRunTime.boxToInteger(3), "Hello"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(9L), BoxesRunTime.boxToInteger(9), "Hello World"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(4L), BoxesRunTime.boxToInteger(1), "Hello"), new $colon.colon(new Tuple3(BoxesRunTime.boxToLong(16L), BoxesRunTime.boxToInteger(16), "Hello"), Nil$.MODULE$))))))), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(groupWindowITCase) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$6
            // Synthetic accessor exposing the field type information array of the given instance.
            public /* synthetic */ TypeInformation[] protected$types(GroupWindowITCase$$anon$6 groupWindowITCase$$anon$6) {
                return groupWindowITCase$$anon$6.types;
            }

            // Creates one serializer per tuple field and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                // NOTE(review): this anonymous serializer instance is constructed but never used;
                // the method returns the plain ScalaCaseClassSerializer created below. Presumably a
                // decompilation artifact -- confirm against the original Scala source.
                new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$$anon$6$$anon$3
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    // Rebuilds a Tuple3 from the field values (long, int, String).
                    public Tuple3<Object, Object, String> m2495createInstance(Object[] objArr) {
                        return new Tuple3<>(BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[0])), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[1])), (String) objArr[2]);
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            // Decompiled constructor body: passes Tuple3 as the type class, the field types
            // (long, int, String) and the field names "_1", "_2", "_3" to the super constructor.
            {
                super(Tuple3.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), Nil$.MODULE$))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3"})));
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L))).toTable(tEnv(), Predef$.MODULE$.wrapRefArray(new Expression[]{org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "a").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "b").dynamicInvoker().invoke() /* invoke-custom */), org.apache.flink.table.api.package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "c").dynamicInvoker().invoke() /* invoke-custom */), (Expression) org.apache.flink.table.api.package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "rowtime").dynamicInvoker().invoke() /* invoke-custom */).rowtime()})));
        // Distinct count of b per value of c and per session window.
        String stripMargin = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c,\n        |   COUNT(DISTINCT b),\n        |   SESSION_END(rowtime, INTERVAL '0.005' SECOND)\n        |FROM MyTable\n        |GROUP BY c, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink testingAppendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(tEnv().sqlQuery(stripMargin)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink(testingAppendSink);
        env().execute();
        // Both sides are sorted so the comparison does not depend on emission order.
        Assert.assertEquals(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"Hello World,1,1970-01-01T00:00:00.014", "Hello,1,1970-01-01T00:00:00.021", "Hello,3,1970-01-01T00:00:00.015"})).sorted(Ordering$String$.MODULE$), testingAppendSink.getAppendResults().sorted(Ordering$String$.MODULE$));
    }

    /**
     * Enables late firing on the given table config and sets the late-fire delay to {@code time}.
     *
     * @param tableConfig the config to update
     * @param time the late-fire delay; it is converted to milliseconds before being stored
     * @throws RuntimeException if the config already holds a late-fire delay whose millisecond
     *     value differs from the requested one
     */
    private void withLateFireDelay(TableConfig tableConfig, Time time) {
        final long requestedMillis = time.toMilliseconds();
        final Duration existingDelay = (Duration) tableConfig
                .getOptional(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY())
                .orElse(null);

        // A previously configured delay is only acceptable if it equals the requested one.
        final boolean alreadyConfigured = existingDelay != null;
        if (alreadyConfigured && existingDelay.toMillis() != requestedMillis) {
            throw new RuntimeException("Currently not support different lateFireInterval configs in one job");
        }

        tableConfig.set(
                WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_ENABLED(),
                BoxesRunTime.boxToBoolean(true));
        tableConfig.set(
                WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY(),
                Duration.ofMillis(requestedMillis));
    }

    /**
     * Converts a number of seconds since 1970-01-01T00:00:00 into a {@link LocalDateTime},
     * interpreting the epoch at the UTC offset and using a zero nanosecond part.
     *
     * @param j seconds since the epoch
     * @return the corresponding local date-time at UTC
     */
    private LocalDateTime localDateTime(long j) {
        final int nanoOfSecond = 0;
        final ZoneOffset utc = ZoneOffset.UTC;
        return LocalDateTime.ofEpochSecond(j, nanoOfSecond, utc);
    }

    /**
     * Synthetic lambda body: builds the field name for the given index by prefixing it with
     * {@code "f"} (for example {@code 3 -> "f3"}).
     *
     * @param i the field index
     * @return {@code "f"} followed by the decimal representation of {@code i}
     */
    public static final /* synthetic */ String $anonfun$testEventTimeTumblingWindowWithAllowLateness$1(int i) {
        return "f" + i;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public GroupWindowITCase(StreamingWithStateTestBase.StateBackendMode stateBackendMode, boolean z) {
        super(stateBackendMode);
        this.useTimestampLtz = z;
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
        this.upsertSourceCurrencyData = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Row[]{TestValuesTableFactory.changelogRow("+U", "Euro", "no1", 114L, localDateTime(1L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 100L, localDateTime(1L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 102L, localDateTime(2L)), TestValuesTableFactory.changelogRow("+U", "Yen", "no1", 1L, localDateTime(3L)), TestValuesTableFactory.changelogRow("+U", "RMB", "no1", 702L, localDateTime(4L)), TestValuesTableFactory.changelogRow("+U", "Euro", "no1", 118L, localDateTime(18L)), TestValuesTableFactory.changelogRow("+U", "US Dollar", "no1", 104L, localDateTime(4L)), TestValuesTableFactory.changelogRow("-D", "RMB", "no1", 702L, localDateTime(4L))}));
    }
}
