首页 > 技术文章 > flink-table demo

zpzhue 2021-05-16 23:25 原文

添加依赖

  • Maven 的 pom.xml 如下（Scala 2.11、Flink 1.12.1、Blink planner）：

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.irisz</groupId>
        <artifactId>flink-test1</artifactId>
        <version>0.1-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <!-- Standard Maven property; keeps source decoding independent of the platform default. -->
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.binary.version>2.11</scala.binary.version>
            <flink.version>1.12.1</flink.version>
            <!-- NOTE(review): not referenced by any dependency in this POM. -->
            <hadoop.version>2.7.5</hadoop.version>
            <!-- Entry point written into the jar manifests (the demo program of this project). -->
            <main.class>cn.irisz.connect_test.Demo2</main.class>
        </properties>
    
    
        <dependencies>
            <!--
            "provided" dependencies are expected to come from the Flink installation at runtime
            and are not bundled. When running from an IDE, put provided-scope dependencies on
            the run classpath.
            -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Connectors and the MySQL driver use compile scope, so they are bundled into the shaded jar. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
        </dependencies>
    
    
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.5.1</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                        <encoding>${project.build.sourceEncoding}</encoding>
                    </configuration>
                </plugin>
    
                <!-- Compiles the Scala sources under src/main/scala and src/test/scala. -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <!--<arg>-make:transitive</arg>-->
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
    
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
    
                <!-- Builds an uber jar at package time, containing the non-provided dependencies. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            Signature files of signed dependencies no longer match the merged
                                            jar and make the JVM reject it. This is the same as running
                                            zip -d app.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                            -->
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>${main.class}</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
    
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>${main.class}</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <!-- Copies dependencies into target/lib, matching the lib/ Class-Path prefix set above. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <version>2.10</version>
                    <executions>
                        <execution>
                            <id>copy-dependencies</id>
                            <phase>package</phase>
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
                            <configuration>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

测试代码（先是 Scala 版本，其后是等价的 PyFlink 版本）

package cn.irisz.connect_test

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, TableResult}

/**
 * Flink Table API demo: reads the MySQL table `result` through the JDBC connector
 * and writes up to 100 of its rows to a table backed by the `print` connector.
 *
 * NOTE(review): the JDBC URL, user name and password are hard-coded demo values.
 */
object Demo2 {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Blink planner in streaming mode, bridged onto the DataStream environment above.
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)

    // Source table `log`, backed by MySQL table `result` via the JDBC connector.
    tEnv.executeSql(
      """CREATE TABLE `log` (
          `ip` STRING,
          `i_country` STRING,
          `i_province` STRING,
          `i_city` STRING,
          `i_isp` STRING,
          `url_path` STRING,
          `url_count` BIGINT
        ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:mysql://localhost:3306/test1?useSSL=false',
          'table-name' = 'result',
          'username' = 'root',
          'password' = '123456'
        )""")

    // Sink table `sink` with the same schema, emitted through the `print` connector.
    tEnv.executeSql(
      """
      CREATE TABLE `sink` (
      `ip` STRING,
      `i_country` STRING,
      `i_province` STRING,
      `i_city` STRING,
      `i_isp` STRING,
      `url_path` STRING,
      `url_count` BIGINT
    ) WITH (
      'connector' = 'print'
    )""")

    // For an INSERT, executeSql submits the job itself and returns without waiting.
    // Do not follow it with tEnv.execute(...): no buffered operations remain, so that
    // call fails with "No operators defined in streaming topology". await() blocks
    // until the submitted job has finished.
    tEnv.executeSql("""INSERT INTO sink SELECT * FROM log LIMIT 100""").await()
  }
}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment


def task():
    """Copy up to 100 rows from MySQL table ``result`` into a ``print`` sink.

    Builds a Blink streaming table environment with default parallelism 2.
    It registers a JDBC-backed source table ``log`` and a ``print`` sink table
    ``sink``, then runs ``INSERT INTO sink SELECT * FROM log LIMIT 100`` and
    blocks until that job finishes.

    NOTE(review): the jar paths, JDBC URL and credentials are hard-coded demo
    values for one particular machine.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # MySQL JDBC driver used by the 'jdbc' connector at runtime.
    env.add_jars("file:///root/flink-demo/mysql-connector-java-5.1.38.jar")
    settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    t_env.get_config().get_configuration().set_string('parallelism.default', '2')
    # Flink JDBC connector jar, which provides the 'jdbc' table connector.
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars", "file:///root/flink-demo/flink-connector-jdbc_2.11-1.12.3.jar")

    # Source table `log`, backed by MySQL table `result`.
    t_env.execute_sql("""
    CREATE TABLE `log` (
      `ip` STRING,
      `i_country` STRING,
      `i_province` STRING,
      `i_city` STRING,
      `i_isp` STRING,
      `url_path` STRING,
      `url_count` BIGINT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/test1?useSSL=false',
      'table-name' = 'result',
      'username' = 'root',
      'password' = '123456'
    )""")

    # Sink table `sink` with the same schema, emitted through the `print` connector.
    t_env.execute_sql("""
    CREATE TABLE `sink` (
      `ip` STRING,
      `i_country` STRING,
      `i_province` STRING,
      `i_city` STRING,
      `i_isp` STRING,
      `url_path` STRING,
      `url_count` BIGINT
    ) WITH (
      'connector' = 'print'
    )""")

    # execute_sql() submits the INSERT job immediately. Do not call t_env.execute()
    # afterwards: no buffered operations remain and it fails with "No operators defined
    # in streaming topology". wait() blocks until the submitted job finishes.
    t_env.execute_sql("""INSERT INTO sink SELECT * FROM log LIMIT 100""").wait()


if __name__ == '__main__':
    # Script entry point: run the demo job only when this file is executed directly.
    task()

推荐阅读